ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 13px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">20250925日,随着Spring AI Alibaba Graph从1.0.0.3升级至1.0.0.4,其中的Graph流式输出有了很大的改进,相关的example已更新,欢迎大家随时跟进,PR地址如下:https://github.com/spring-ai-alibaba/examples/pull/364 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 13px;margin: 0.1em auto 0.5em;border-radius: 4px;" title="null"/> ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;display: table;padding: 0px 0.2em;color: rgb(255, 255, 255);background: rgb(0, 152, 116);">1. 背景
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 13px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">随着 spring-ai-alibaba-graph 模块的广泛应用,社区中出现了许多关于其流式处理实现机制的疑问和使用文档需求。其中最突出的问题是:graph 模块的流式实现如何与当前主流的响应式流(Reactive Streams)框架进行集成。
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 13px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">当前 graph 模块采用传统的迭代器模式实现流式输出,这种实现方式与主流的响应式编程范式存在较大差异。在与 Project Reactor、RxJava 等响应式流框架集成时,开发者需要进行大量额外的适配工作,增加了技术复杂性和维护成本。
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 13px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">为解决这一问题并提升框架的现代化程度,团队对 graph 内核中的流式输出机制进行了重构,采用基于 Project Reactor 的响应式流实现,以更好地与现代响应式生态系统集成,降低开发者的使用门槛并提高整体性能表现。 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;display: table;padding: 0px 0.2em;color: rgb(255, 255, 255);background: rgb(0, 152, 116);">2. 迭代器实现流式输出分析 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;padding-left: 8px;color: rgb(63, 63, 63);">2.1 设计理念 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;color: rgb(0, 152, 116);">2.1.1 传统迭代器模式在流式输出场景下的设计思路
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 13px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">传统迭代器模式在 Spring AI Alibaba 项目的AsyncGenerator<E>接口实现流式输出。其核心设计思路是将异步数据流抽象为一个可迭代的对象,消费者通过调用next()方法逐个获取数据元素。该模式遵循以下原则:
• 拉取模式(Pull-based):消费者主动从生成器中拉取数据,而非被动接收推送 • 状态封装:通过Data<E>类封装异步操作的各种状态(正常数据、完成状态、错误状态) • 装饰器扩展:通过装饰器模式为基本生成器添加额外功能(如结果值获取、嵌套生成器支持) 2.1.2 与响应式流模式的架构哲学差异 传统迭代器模式更符合命令式编程思维,而响应式流模式则体现了声明式编程的理念。
2.1.3 Java 并发模型角度的线程模型和资源管理策略 传统迭代器模式主要依赖以下 Java 并发机制:
1. CompletableFuture 链式调用:通过thenCompose、thenApply等方法构建异步操作链 3. 手动资源管理:需要显式管理异步任务的生命周期,防止资源泄漏 // AsyncGenerator接口中的toCompletableFuture方法体现了链式调用思想 defaultCompletableFuture<Object>toCompletableFuture(){ finalData<E> next = next(); if(next.isDone()) { returncompletedFuture(next.resultValue); } returnnext.data.thenCompose(v -> toCompletableFuture()); }2.2 核心源码深度解析 2.2.1 AsyncGenerator 接口及其实现类的职责划分 AsyncGenerator<E>接口定义了异步生成器的核心契约:
publicinterfaceAsyncGenerator<E>extendsIterable<E> { Data<E>next();// 获取下一个异步数据元素 defaultCompletableFuture<Object>toCompletableFuture(){ ... }// 转换为CompletableFuture defaultStream<E>stream(){ ... }// 转换为Stream defaultIterator<E>iterator(){ ... }// 获取迭代器 }主要实现类包括:
1. 基本实现:通过 lambda 表达式直接实现接口 2. WithResult 装饰器:添加结果值获取功能 3. WithEmbed 装饰器:支持生成器嵌套组合 2.2.2 Data 封装类的设计考量 Data<E>类是异步数据元素的核心封装,设计上考虑了多种状态:
classData<E> { finalCompletableFuture<E> data;// 异步数据 finalEmbed<E> embed;// 嵌入式生成器 finalObject resultValue;// 结果值 publicbooleanisDone(){// 完成状态判断 returndata ==null&& embed ==null; } publicbooleanisError(){// 错误状态判断 returndata !=null&& data.isCompletedExceptionally(); } }设计考量包括:
1. 状态分离:通过不同字段表示不同状态,避免状态混淆 3. 不可变性:字段均为 final,确保线程安全 2.2.3 WithEmbed 和 WithResult 装饰器模式的应用 WithResult 装饰器: classWithResult<E>implementsAsyncGenerator<E>, HasResultValue { protectedfinalAsyncGenerator<E> delegate; privateObject resultValue; @Override publicfinalData<E>next(){ finalData<E> result = delegate.next(); if(result.isDone()) { resultValue = result.resultValue; } returnresult; } }WithEmbed 装饰器: classWithEmbed<E>implementsAsyncGenerator<E>, HasResultValue { protectedfinalDeque<Embed<E>> generatorsStack =newArrayDeque<>(2); privatefinalDeque<Data<E>> returnValueStack =newArrayDeque<>(2); @Override publicData<E>next(){ // 处理嵌套生成器栈 // 实现生成器组合逻辑 } }2.2.4 背压处理机制的实现方式和局限性 传统迭代器模式的背压处理主要通过以下方式实现:
1. 阻塞式背压:当消费者处理速度慢于生产者时,next()方法调用会阻塞 2. 缓冲区管理:通过手动管理 CompletableFuture 链实现简单的缓冲 局限性:
1. 缺乏精细化控制:无法根据消费者处理能力动态调整生产速度 2.3 结构图解 2.3.1 组件交互图 2.3.2 典型流式调用时序图 2.3.3 设计模式应用标注 1. 迭代器模式:AsyncGenerator继承Iterable接口 2. 装饰器模式:WithResult和WithEmbed类 3. 响应式流模式实现分析 3.1 设计理念 3.1.1 整体架构设计理念 Spring AI Alibaba Graph 模块的流式处理设计采用了响应式编程范式,基于 Project Reactor 框架实现。其核心理念是:
1.非阻塞异步处理 :通过 Flux 和 Mono 实现非阻塞的数据流处理,提高系统吞吐量 2.背压处理 :利用 Reactor 的背压机制,确保生产者和消费者之间的速率平衡 3.状态一致性 :通过OverAllState管理流式处理过程中的状态变化 4.模块化设计 :将流式处理逻辑封装在独立的组件中,便于维护和扩展 3.1.2 采用 Flux 作为核心组件的原因 Flux 作为流式处理的核心组件具有以下优势:
1.丰富的操作符 :提供 map、filter、zip 等丰富的流操作符,便于处理复杂的数据流 3.错误处理 :完善的错误处理机制,支持异常传播和恢复 4.与 Spring 生态集成 :与 Spring WebFlux 无缝集成,便于构建响应式应用 3.1.3 并行节点流合并策略 在ParallelNode中,流合并策略采用以下设计:
1.分离处理 :将非 Flux 和 Flux 类型的输出分别处理 2.统一合并 :使用Flux.zip操作符合并多个 Flux 流 3.状态管理 :通过OverAllState.updateState方法维护状态一致性 3.1.4 AsyncGenerator 与 Reactor Flux 的集成 为保持向后兼容性,项目提供了AsyncGenerator与 Flux 的双向转换:
1.AsyncGenerator.fromFlux:将 Flux 转换为 AsyncGenerator 2.FlowGenerator.fromPublisher:将 Publisher 转换为 AsyncGenerator 3.2 核心源码解析 3.2.1 ParallelNode 中多个 Flux 流的合并实现 在ParallelNode中,多个 Flux 流的合并通过以下步骤实现:
// 检查是否有Flux类型的输出 booleanhasFlux=results.stream() .flatMap(map -> map.values().stream()) .anyMatch(value -> valueinstanceofFlux); if(hasFlux) { // 收集所有Flux流 List<Flux<Object>> fluxList =newArrayList<>(); // ... 处理非Flux输出 ... // 合并Flux流 if(!fluxList.isEmpty()) { Flux<Object> mergedFlux = Flux.zip(fluxList,newFunction<Object[], Object>() { @Override publicObjectapply(Object[] objects){ returnnull;// 简化的合并逻辑 } }); mergedState.put("__merged_stream__", mergedFlux); } }3.2.2 StreamingOutput 和 StreamingChatGenerator 处理流式输出 StreamingOutput类封装了流式输出的数据:
publicclassStreamingOutputextendsNodeOutput{ privatefinalString chunk; privatefinalChatResponse chatResponse; publicStreamingOutput(ChatResponse chatResponse, String node, OverAllState state){ super(node, state); this.chatResponse = chatResponse; this.chunk =null; } publicStreamingOutput(String chunk, String node, OverAllState state){ super(node, state); this.chunk = chunk; this.chatResponse =null; } }StreamingChatGenerator构建流式聊天生成器:
publicAsyncGenerator<?extendsNodeOutput> buildInternal(Flux<ChatResponse> flux, Function<ChatResponse, StreamingOutput> outputMapper) { varresult=newAtomicReference<ChatResponse>(null); Consumer<ChatResponse> mergeMessage = (response) -> { result.updateAndGet(lastResponse -> { // 合并消息逻辑 // ... }); }; varprocessedFlux=flux .filter(response -> response.getResult() !=null&& response.getResult().getOutput() !=null) .doOnNext(mergeMessage) .map(next ->newStreamingOutput(next.getResult().getOutput().getText(), startingNode, startingState)); returnFlowGenerator.fromPublisher(FlowAdapters.toFlowPublisher(processedFlux), () -> mapResult.apply(result.get())); }3.2.3 OverAllState 在流式处理中的状态管理 OverAllState通过以下方式管理流式处理状态:
publicstaticMap<String, Object>updateState(Map<String, Object> state, Map<String, Object> partialState, Map<String, KeyStrategy> keyStrategies){ Objects.requireNonNull(state,"state cannot be null"); if(partialState ==null|| partialState.isEmpty()) { returnstate; } Map<String, Object> updatedPartialState = updatePartialStateFromSchema(state, partialState, keyStrategies); returnStream.concat(state.entrySet().stream(), updatedPartialState.entrySet().stream()) .collect(toMapRemovingNulls(Map.Entry::getKey, Map.Entry::getValue, (currentValue, newValue) -> newValue)); }3.2.4 AsyncGeneratorUtils 中多生成器合并的实现原理 AsyncGeneratorUtils提供多生成器合并功能:
publicstatic<T> AsyncGenerator<T>createMergedGenerator(List<AsyncGenerator<T>> generators, Map<String, KeyStrategy> keyStrategyMap){ returnnewAsyncGenerator<>() { privatefinalStampedLocklock=newStampedLock(); privateAtomicIntegerpollCounter=newAtomicInteger(0); privateMap<String, Object> mergedResult =newHashMap<>(); privatefinalList<AsyncGenerator<T>> activeGenerators =newCopyOnWriteArrayList<>(generators); @Override publicData<T>next(){ // 轮询各个生成器,合并结果 // ... } }; }3.3 技术细节 3.3.1 流式数据在并行执行节点间的传递和聚合 在并行执行中,流式数据通过以下方式传递和聚合:
1.并行执行 :使用CompletableFuture.allOf并行执行多个节点 3.类型分离 :将 Flux 和非 Flux 类型的结果分别处理 4.状态更新 :通过OverAllState.updateState更新全局状态 3.3.2merged_stream 键在流合并过程中的作用 __merged_stream__键用于标识合并后的 Flux 流,在后续处理中可以识别和处理合并的流数据。
3.3.3 不同数据类型在流合并时的处理策略 项目通过类型检查来处理不同数据类型:
1.Flux 类型 :通过instanceof Flux检查,收集到fluxList中进行合并 2.非 Flux 类型 :直接通过OverAllState.updateState方法更新状态 3.3.4 背压处理和性能优化措施 1.Reactor 背压机制 :利用 Flux 内置的背压处理机制 2.缓冲策略 :使用onBackpressureBuffer()处理背压 3.异步处理 :通过CompletableFuture实现异步执行 3.4 架构图示 3.4.1 关键组件交互图 graph TD A[StateGraph] --> B[ParallelNode] B --> C[AsyncParallelNodeAction] C --> D[Node Actions] D --> E[Flux Streams] E --> F[Flux Merge] F --> G[__merged_stream__] G --> H[OverAllState]3.4.2 并行节点流合并时序图 sequenceDiagram participant Client participant ParallelNode participant NodeAction1 participant NodeAction2 participant FluxMerge Client->>ParallelNode: Execute ParallelNode->>NodeAction1: Execute Async ParallelNode->>NodeAction2: Execute Async NodeAction1-->>ParallelNode: Return Flux NodeAction2-->>ParallelNode: Return Flux ParallelNode->>FluxMerge: Merge Flux Streams FluxMerge-->>ParallelNode: Merged Flux ParallelNode->>Client: Return Result4. 两种实现对比 4.1 性能和可扩展性分析 4.1.1 高并发场景性能对比 传统迭代器模式: • CompletableFuture 链式调用增加内存开销 响应式流模式: 4.1.2 资源利用和内存管理优势 响应式流模式在资源利用方面具有明显优势:
3. 资源回收:自动化的订阅/取消机制确保资源及时回收 4.1.3 扩展性和维护性对比 4.2 适用场景分析 传统迭代器模式适用于: 响应式流模式适用于: