AsyncGenerator<E>接口实现流式输出。其核心设计思路是将异步数据流抽象为一个可迭代的对象,消费者通过调用next()方法逐个获取数据元素。该模式遵循以下原则:
Data<E>类封装异步操作的各种状态(正常数据、完成状态、错误状态)传统迭代器模式更符合命令式编程思维,而响应式流模式则体现了声明式编程的理念。
传统迭代器模式主要依赖以下 Java 并发机制:
thenCompose、thenApply等方法构建异步操作链// AsyncGenerator接口中的toCompletableFuture方法体现了链式调用思想
defaultCompletableFuture<Object>toCompletableFuture(){
finalData<E> next = next();
if(next.isDone()) {
returncompletedFuture(next.resultValue);
}
returnnext.data.thenCompose(v -> toCompletableFuture());
}AsyncGenerator<E>接口定义了异步生成器的核心契约:
publicinterfaceAsyncGenerator<E>extendsIterable<E> {
Data<E>next();// 获取下一个异步数据元素
defaultCompletableFuture<Object>toCompletableFuture(){ ... }// 转换为CompletableFuture
defaultStream<E>stream(){ ... }// 转换为Stream
defaultIterator<E>iterator(){ ... }// 获取迭代器
}主要实现类包括:
Data<E>类是异步数据元素的核心封装,设计上考虑了多种状态:
classData<E> {
finalCompletableFuture<E> data;// 异步数据
finalEmbed<E> embed;// 嵌入式生成器
finalObject resultValue;// 结果值
publicbooleanisDone(){// 完成状态判断
returndata ==null&& embed ==null;
}
publicbooleanisError(){// 错误状态判断
returndata !=null&& data.isCompletedExceptionally();
}
}设计考量包括:
classWithResult<E>implementsAsyncGenerator<E>, HasResultValue {
protectedfinalAsyncGenerator<E> delegate;
privateObject resultValue;
@Override
publicfinalData<E>next(){
finalData<E> result = delegate.next();
if(result.isDone()) {
resultValue = result.resultValue;
}
returnresult;
}
}classWithEmbed<E>implementsAsyncGenerator<E>, HasResultValue {
protectedfinalDeque<Embed<E>> generatorsStack =newArrayDeque<>(2);
privatefinalDeque<Data<E>> returnValueStack =newArrayDeque<>(2);
@Override
publicData<E>next(){
// 处理嵌套生成器栈
// 实现生成器组合逻辑
}
}传统迭代器模式的背压处理主要通过以下方式实现:
next()方法调用会阻塞局限性:
AsyncGenerator继承Iterable接口WithResult和WithEmbed类next()方法定义算法框架Spring AI Alibaba Graph 模块的流式处理设计采用了响应式编程范式,基于 Project Reactor 框架实现。其核心理念是:
OverAllState管理流式处理过程中的状态变化Flux 作为流式处理的核心组件具有以下优势:
在ParallelNode中,流合并策略采用以下设计:
Flux.zip操作符合并多个 Flux 流OverAllState.updateState方法维护状态一致性为保持向后兼容性,项目提供了AsyncGenerator与 Flux 的双向转换:
AsyncGenerator.fromFlux:将 Flux 转换为 AsyncGeneratorFlowGenerator.fromPublisher:将 Publisher 转换为 AsyncGenerator在ParallelNode中,多个 Flux 流的合并通过以下步骤实现:
// 检查是否有Flux类型的输出
booleanhasFlux=results.stream()
.flatMap(map -> map.values().stream())
.anyMatch(value -> valueinstanceofFlux);
if(hasFlux) {
// 收集所有Flux流
List<Flux<Object>> fluxList =newArrayList<>();
// ... 处理非Flux输出 ...
// 合并Flux流
if(!fluxList.isEmpty()) {
Flux<Object> mergedFlux = Flux.zip(fluxList,newFunction<Object[], Object>() {
@Override
publicObjectapply(Object[] objects){
returnnull;// 简化的合并逻辑
}
});
mergedState.put("__merged_stream__", mergedFlux);
}
}StreamingOutput类封装了流式输出的数据:
publicclassStreamingOutputextendsNodeOutput{
privatefinalString chunk;
privatefinalChatResponse chatResponse;
publicStreamingOutput(ChatResponse chatResponse, String node, OverAllState state){
super(node, state);
this.chatResponse = chatResponse;
this.chunk =null;
}
publicStreamingOutput(String chunk, String node, OverAllState state){
super(node, state);
this.chunk = chunk;
this.chatResponse =null;
}
}StreamingChatGenerator构建流式聊天生成器:
publicAsyncGenerator<?extendsNodeOutput> buildInternal(Flux<ChatResponse> flux,
Function<ChatResponse, StreamingOutput> outputMapper) {
varresult=newAtomicReference<ChatResponse>(null);
Consumer<ChatResponse> mergeMessage = (response) -> {
result.updateAndGet(lastResponse -> {
// 合并消息逻辑
// ...
});
};
varprocessedFlux=flux
.filter(response -> response.getResult() !=null&& response.getResult().getOutput() !=null)
.doOnNext(mergeMessage)
.map(next ->newStreamingOutput(next.getResult().getOutput().getText(), startingNode, startingState));
returnFlowGenerator.fromPublisher(FlowAdapters.toFlowPublisher(processedFlux),
() -> mapResult.apply(result.get()));
}OverAllState通过以下方式管理流式处理状态:
publicstaticMap<String, Object>updateState(Map<String, Object> state, Map<String, Object> partialState,
Map<String, KeyStrategy> keyStrategies){
Objects.requireNonNull(state,"state cannot be null");
if(partialState ==null|| partialState.isEmpty()) {
returnstate;
}
Map<String, Object> updatedPartialState = updatePartialStateFromSchema(state, partialState, keyStrategies);
returnStream.concat(state.entrySet().stream(), updatedPartialState.entrySet().stream())
.collect(toMapRemovingNulls(Map.Entry::getKey, Map.Entry::getValue, (currentValue, newValue) -> newValue));
}AsyncGeneratorUtils提供多生成器合并功能:
publicstatic<T> AsyncGenerator<T>createMergedGenerator(List<AsyncGenerator<T>> generators,
Map<String, KeyStrategy> keyStrategyMap){
returnnewAsyncGenerator<>() {
privatefinalStampedLocklock=newStampedLock();
privateAtomicIntegerpollCounter=newAtomicInteger(0);
privateMap<String, Object> mergedResult =newHashMap<>();
privatefinalList<AsyncGenerator<T>> activeGenerators =newCopyOnWriteArrayList<>(generators);
@Override
publicData<T>next(){
// 轮询各个生成器,合并结果
// ...
}
};
}在并行执行中,流式数据通过以下方式传递和聚合:
CompletableFuture.allOf并行执行多个节点OverAllState.updateState更新全局状态__merged_stream__键用于标识合并后的 Flux 流,在后续处理中可以识别和处理合并的流数据。
项目通过类型检查来处理不同数据类型:
instanceof Flux检查,收集到fluxList中进行合并OverAllState.updateState方法更新状态onBackpressureBuffer()处理背压CompletableFuture实现异步执行graph TD
A[StateGraph] --> B[ParallelNode]
B --> C[AsyncParallelNodeAction]
C --> D[Node Actions]
D --> E[Flux Streams]
E --> F[Flux Merge]
F --> G[__merged_stream__]
G --> H[OverAllState]sequenceDiagram
participant Client
participant ParallelNode
participant NodeAction1
participant NodeAction2
participant FluxMerge
Client->>ParallelNode: Execute
ParallelNode->>NodeAction1: Execute Async
ParallelNode->>NodeAction2: Execute Async
NodeAction1-->>ParallelNode: Return Flux
NodeAction2-->>ParallelNode: Return Flux
ParallelNode->>FluxMerge: Merge Flux Streams
FluxMerge-->>ParallelNode: Merged Flux
ParallelNode->>Client: Return Resultnext()调用可能涉及线程阻塞响应式流模式在资源利用方面具有明显优势:
| 欢迎光临 链载Ai (https://www.lianzai.com/) | Powered by Discuz! X3.5 |