返回顶部
热门问答 更多热门问答
技术文章 更多技术文章

让复杂 AI 应用构建就像搭积木:Spring AI Alibaba Graph 使用指南与源码解读

[复制链接]
链载Ai 显示全部楼层 发表于 昨天 21:31 |阅读模式 打印 上一主题 下一主题


目录




Cloud Native


1. 引言与概述
2. 核心架构与设计理念
3. 核心概念深度解析
4. 预定义组件与工具箱
5. 高级特性与扩展能力
6. 快速开始与实战指南


1. 引言与概述

Cloud Native


1.1 Spring AI Alibaba Graph 概述


Spring AI Alibaba Graph 是社区核心实现之一,也是整个框架在设计理念上区别于 Spring AI 只做底层原子抽象的地方,Spring AI Alibaba 期望帮助开发者更容易的构建智能体应用。基于 Graph 开发者可以构建工作流、多智能体应用。


Spring AI Alibaba Graph 在设计理念上借鉴 LangGraph,社区在此基础上增加了大量预置 Node、简化了 State 定义过程等,让开发者更容易编写对等低代码平台的工作流、多智能体等。


1.2 核心特性与优势


相比传统的 AI 应用开发方式,Spring AI Alibaba Graph 具有以下核心优势:


Java 生态深度集成


  • Spring 原生支持:完整的依赖注入、配置管理、监控观测。
  • 高并发处理:Java 天然的多线程优势,支持高并发场景。


丰富的预置组件


  • 15+ 预定义节点类型:QuestionClassifierNode、LlmNode、ToolNode、KnowledgeRetrievalNode 等。
  • 多种 Agent 模式:内置 React、Reflection、Supervisor 等智能体模式。
  • 简化的 State 管理:统一的状态定义和合并策略。


声明式 API 设计


  • 类似 LangGraph 的 API:Java 开发者更容易上手。
  • 链式调用:简洁的流式 API,代码更加优雅。
  • 条件分支:支持复杂的条件逻辑和并行处理。


生产级特性


  • 观测性支持:完整的指标收集、链路追踪。
  • 容错机制:支持检查点、状态恢复、错误处。
  • 人机协作:Human-in-the-loop 支持,支持修改状态、恢复执行。


快速开始:客户评价分类系统

Cloud Native


让我们通过一个具体示例了解 Spring AI Alibaba Graph 的使用方式。这个示例展示了如何构建一个客户评价分类系统:


系统架构



核心代码实现


@ConfigurationpublicclassCustomerServiceWorkflow{
@Bean publicStateGraphcustomerServiceGraph(ChatModel chatModel){ ChatClientchatClient=ChatClient.builder(chatModel) .defaultAdvisors(newSimpleLoggerAdvisor()) .build();
// 评价分类器 - 区分正面/负面评价 QuestionClassifierNodefeedbackClassifier=QuestionClassifierNode.builder() .chatClient(chatClient) .inputTextKey("input") .outputKey("classifier_output") .categories(List.of("positive feedback","negative feedback")) .build();
// 问题细分器 - 对负面评价进行细分 QuestionClassifierNodespecificQuestionClassifier=QuestionClassifierNode.builder() .chatClient(chatClient) .inputTextKey("input") .outputKey("classifier_output") .categories(List.of("after-sale service","transportation","product quality","others")) .build();
// 状态工厂定义 - 简化的状态管理 KeyStrategyFactorystateFactory=() -> { Map<String, KeyStrategy> strategies =newHashMap<>(); strategies.put("input",newReplaceStrategy()); strategies.put("classifier_output",newReplaceStrategy()); strategies.put("solution",newReplaceStrategy()); returnstrategies; };
// 构建工作流 - 声明式API returnnewStateGraph("客户服务评价处理", stateFactory) .addNode("feedback_classifier", node_async(feedbackClassifier)) .addNode("specific_question_classifier", node_async(specificQuestionClassifier)) .addNode("recorder", node_async(newRecordingNode())) .addEdge(START,"feedback_classifier") .addConditionalEdges("feedback_classifier", edge_async(newFeedbackQuestionDispatcher()), Map.of("positive","recorder","negative","specific_question_classifier")) .addEdge("recorder", END); }}


以上代码只展示了图结构(StateGraph)的构建,具体的代码实现你可以关注spring-ai-alibaba-example仓库:spring-ai-alibaba-example【1】


这个示例展示了 Spring AI Alibaba Graph 的核心特性:


  • 预置组件:使用 QuestionClassifierNode 快速实现分类功能。
  • 简化状态管理:通过 KeyStrategyFactory 统一管理状态。
  • 声明式 API:链式调用构建复杂工作流。
  • Spring Boot 集成:通过 @Configuration 和 @Bean 完成依赖注入。


2. 核心架构与设计理念

Cloud Native


2.1 整体数据流转架构


Spring AI Alibaba Graph 采用工作流模型,整个框架的数据流转遵循"构建→编译→执行"的三阶段模式:


2.1.1 完整数据流转图



2.1.2 核心执行流程详解


数据流转的核心理念:整个框架围绕OverAllState这个数据载体进行流转,每个节点都是状态的转换器,通过AsyncNodeGenerator这个状态机来驱动整个流程的执行。



2.1.3 关键数据结构流转


StateGraph → CompiledGraph 转换



AsyncNodeGenerator 执行机制



2.2 整体架构设计


基于上述数据流转机制,Spring AI Alibaba Graph 的整体架构设计具有以下特点:


  • 清晰的执行流程:每个节点代表一个处理步骤,边表示数据流向。
  • 灵活的条件分支:支持根据状态动态选择执行路径。
  • 并行处理能力:多个节点可以并行执行,提高处理效率。
  • 状态可追溯:完整的状态变化历史,便于调试和监控。


架构核心理念:Spring AI Alibaba Graph 将复杂的 AI 任务分解为可组合的原子操作,每个节点专注于单一职责,通过状态驱动的方式实现节点间的协调。这种设计让开发者可以像搭积木一样构建复杂的 AI 应用,既保证了系统的可维护性,又提供了足够的灵活性。


2.2.1 系统架构总览



2.2.2 StateGraph 构建流程


StateGraph 是工作流的蓝图设计器,它负责定义整个工作流的结构和执行逻辑,就像建筑师绘制建筑图纸一样。通过声明式的 API,开发者可以轻松定义节点、边和状态管理策略,最终编译成可执行的 CompiledGraph。



关键设计思想:StateGraph 采用了"先定义后执行"的模式,将工作流的结构定义与实际执行分离,这样可以在编译时进行各种验证和优化,确保运行时的稳定性和高效性。


2.2.3 CompiledGraph 执行流程


CompiledGraph 是工作流的运行时引擎,它将 StateGraph 的静态定义转换为可执行的动态流程。就像将建筑图纸变成真正的建筑物一样,CompiledGraph 负责协调各个组件的执行,管理状态流转,确保整个工作流按照预期运行。


AsyncNodeGenerator 是整个图流转执行的唯一状态机,它控制着工作流的每一步执行,包括节点调度、状态更新、条件判断和异常处理。这种单一状态机的设计确保了执行的一致性和可预测性。



核心执行机制:CompiledGraph 采用了基于迭代器模式的异步执行机制,每次调用 next() 方法都会推进工作流的执行,这种设计既支持同步调用,也支持流式处理,为不同的使用场景提供了灵活性。


2.3 核心组件关系图


组件职责说明


  • StateGraph:工作流的架构师,负责定义整个流程的结构和规则。
  • CompiledGraph:工作流的指挥官,负责协调和管理整个执行过程。
  • OverAllState:工作流的记忆中心,负责存储和管理所有状态数据。
  • Node:工作流的执行单元,每个节点专注于特定的业务逻辑。
  • Edge:工作流的连接器,定义节点之间的转换关系和条件。
  • AsyncNodeGenerator:工作流的执行引擎,是推动整个流程运转的核心状态机。


2.4 核心设计理念


2.4.1 声明式编程模型


借鉴 LangGraph 的设计理念,Spring AI Alibaba Graph 采用声明式编程模型,开发者只需要描述"做什么":


//声明式定义工作流StateGraphgraph=newStateGraph("客户服务工作流",stateFactory).addNode("feedback_classifier",node_async(feedbackClassifier)).addNode("specific_question_classifier",node_async(specificQuestionClassifier)).addNode("recorder",node_async(recorderNode)).addEdge(START,"feedback_classifier").addConditionalEdges("feedback_classifier",edge_async(newFeedbackQuestionDispatcher()),Map.of("positive","recorder","negative","specific_question_classifier")).addEdge("recorder",END);

2.4.2 状态驱动的执行模型



所有的数据流转都通过OverAllState进行管理,确保状态的一致性和可追溯性:


//状态工厂定义KeyStrategyFactorystateFactory=()->{Map<String,KeyStrategy>strategies=newHashMap<>();strategies.put("input",newReplaceStrategy());strategies.put("classifier_output",newReplaceStrategy());strategies.put("solution",newReplaceStrategy());returnstrategies;};


2.4.3 异步优先的设计


框架优先支持异步处理,提高系统的吞吐量和响应性,同时还原生支持了节点内模型流式透传



// 异步节点定义AsyncNodeActionasyncNode =node_async(newCustomNodeAction());
// 并行节点处理publicclassParallelNodeextendsNode{ recordAsyncParallelNodeAction( List<AsyncNodeActionWithConfig> actions, Map<String,KeyStrategy> channels )implementsAsyncNodeActionWithConfig{
@Override publicCompletableFuture<Map<String,Object>>apply(OverAllState state, RunnableConfig config) { varfutures = actions.stream() .map(action -> action.apply(state, config)) .toArray(CompletableFuture[]::new);
returnCompletableFuture.allOf(futures) .thenApply(v -> { // 合并所有结果 Map<String,Object> result =newHashMap<>(); for(CompletableFuture<Map<String,Object>> future : futures) { result.putAll(future.join()); } returnresult; }); } }}

2.5 Spring 生态集成


Spring AI Alibaba Graph 与 Spring 生态深度集成,你可以轻松在你的 Spring 应用中引入 AI 模型工作流以开发智能 Java 应用。


2.5.1 依赖注入架构



2.5.2 依赖注入支持


以下代码演示了 Spring AI Alibaba Graph 是如何被 IOC 容器所管理的。


@ConfigurationpublicclassGraphConfiguration{
@Bean publicStateGraphworkflowGraph(ChatModel chatModel) { ChatClientchatClient =ChatClient.builder(chatModel) .defaultAdvisors(newSimpleLoggerAdvisor()) .build();
// 构建图定义... returnstateGraph; }
@Bean publicCompiledGraphcompiledGraph(StateGraph stateGraph, ObservationRegistry observationRegistry) { returnstateGraph.compile(CompileConfig.builder() .withLifecycleListener(newGraphObservationLifecycleListener(observationRegistry)) .build()); }}


2.5.3 观测性集成


Spring AI Alibaba Graph 基于 Micrometer 内置了可观测支持,可以无缝集成 Spring Boot 可观测性。


@RestControllerpublicclassGraphController{
publicGraphController(@Qualifier("workflowGraph")StateGraph stateGraph, ObjectProvider<ObservationRegistry> observationRegistry) { this.compiledGraph = stateGraph.compile(CompileConfig.builder() .withLifecycleListener(new GraphObservationLifecycleListener( observationRegistry.getIfUnique(() -> ObservationRegistry.NOOP))) .build()); }}


3. 核心概念深度解析

Cloud Native


3.1 StateGraph (状态图)


StateGraph 是整个框架的设计蓝图,它就像建筑师的设计图纸一样,定义了工作流的完整结构和执行逻辑。StateGraph 采用声明式 API,让开发者可以用简洁的代码描述复杂的业务流程,而不需要关心底层的执行细节。


核心设计理念:StateGraph 将复杂的工作流抽象为节点和边的组合,每个节点代表一个具体的操作,边定义了操作之间的流转关系。这种抽象让开发者可以专注于业务逻辑的设计,而不是执行机制的实现。


3.1.1 StateGraph 生命周期



3.1.2 基本构造


publicclassStateGraph{ // 核心数据结构 finalNodesnodes=newNodes(); // 存储所有节点 finalEdgesedges=newEdges(); // 存储所有边
// 特殊节点常量 publicstaticfinalStringEND="__END__"; publicstaticfinalStringSTART="__START__"; publicstaticfinalStringERROR="__ERROR__";
// 状态管理 privateKeyStrategyFactory keyStrategyFactory; privatePlainTextStateSerializer stateSerializer;}

3.1.3 节点管理流程



支持的节点添加方式:


// 添加普通节点publicStateGraphaddNode(Stringid, AsyncNodeAction action) { Nodenode =newNode(id, (config) ->AsyncNodeActionWithConfig.of(action)); returnaddNode(id, node);}
// 添加带配置的节点publicStateGraphaddNode(Stringid, AsyncNodeActionWithConfig actionWithConfig) { Nodenode =newNode(id, (config) -> actionWithConfig); returnaddNode(id, node);}
// 添加子图节点publicStateGraphaddNode(Stringid, StateGraph subGraph) { subGraph.validateGraph();// 先验证子图 varnode =newSubStateGraphNode(id, subGraph); returnaddNode(id, node);}


3.1.4 边管理流程



3.1.5 图验证机制



3.2 OverAllState (全局状态)


OverAllState 是工作流的数据中枢,它就像工作流的记忆系统一样,负责在各个节点之间传递和管理状态数据。OverAllState 不仅存储数据,还定义了数据的合并策略,确保不同节点产生的数据能够正确地整合在一起。


设计巧思:OverAllState 采用了策略模式来处理状态更新,不同的数据类型可以采用不同的合并策略(如替换、追加、合并等),这种设计让状态管理变得非常灵活,能够适应各种复杂的业务场景。


3.2.1 状态管理架构



3.2.2 状态更新流程



3.2.3 状态策略详解


策略模式架构



内置策略实现


// 替换策略 - 新值覆盖旧值publicclassReplaceStrategyimplementsKeyStrategy{ @Override publicObjectapply(ObjectoldValue,ObjectnewValue) {   returnnewValue;  }}
// 追加策略 - 新值追加到列表,支持复杂的列表操作publicclassAppendStrategyimplementsKeyStrategy{ @Override publicObjectapply(ObjectoldValue,ObjectnewValue) { if(newValue ==null) { returnoldValue; }
// 处理Optional类型 if(oldValueinstanceofOptional<?> oldValueOptional) { oldValue = oldValueOptional.orElse(null); }
booleanoldValueIsList = oldValueinstanceofList<?>;
// 处理移除操作 if(oldValueIsList && newValueinstanceofAppenderChannel.RemoveIdentifier<?>) { varresult =newArrayList<>((List<Object>) oldValue); removeFromList(result, (AppenderChannel.RemoveIdentifier) newValue); returnunmodifiableList(result); }
// 处理新值为集合的情况 List<Object> list =null; if(newValueinstanceofList) { list =newArrayList<>((List<?>) newValue); }elseif(newValue.getClass().isArray()) { list =Arrays.asList((Object[]) newValue); }elseif(newValueinstanceofCollection) { list =newArrayList<>((Collection<?>) newValue); }
// 合并逻辑 if(oldValueIsList) { List<Object> oldList = (List<Object>) oldValue; if(list !=null) { if(list.isEmpty()) { returnoldValue; } // 合并并去重 varresult = evaluateRemoval(oldList, list); returnStream.concat(result.oldValues().stream(), result.newValues().stream()) .distinct() .collect(Collectors.toList()); }else{ oldList.add(newValue); } returnoldList; }else{ ArrayList<Object> arrayResult =newArrayList<>(); if(list !=null) { arrayResult.addAll(list); }else{ arrayResult.add(newValue); } returnarrayResult; } }}


自定义策略示例


// 自定义Map合并策略publicclassMapMergeStrategyimplementsKeyStrategy{ @Override publicObjectapply(ObjectoldValue,ObjectnewValue) {   if(oldValueinstanceofMap&& newValueinstanceofMap) {     Map<String,Object> merged =newHashMap<>((Map) oldValue);      merged.putAll((Map) newValue);     returnmerged;    }   returnnewValue;// 默认替换  }}
// 自定义字符串连接策略publicclassStringConcatStrategyimplementsKeyStrategy{ privatefinalStringseparator;
publicStringConcatStrategy(Stringseparator) { this.separator= separator; }
@Override publicObjectapply(ObjectoldValue,ObjectnewValue) { if(oldValueinstanceofString&& newValueinstanceofString) { returnoldValue + separator + newValue; } returnnewValue; }}


策略工厂模式


publicclassStrategyFactory{
publicstaticKeyStrategyFactorycreateDefaultFactory() { return() -> { Map<String,KeyStrategy> strategies =newHashMap<>(); strategies.put("messages",newAppendStrategy()); strategies.put("input",newReplaceStrategy()); strategies.put("output",newReplaceStrategy()); returnstrategies; }; }
publicstaticKeyStrategyFactorycreateCustomFactory(Map<String, KeyStrategy> customStrategies) { return() -> { Map<String,KeyStrategy> strategies =newHashMap<>(); // 添加默认策略 strategies.put("messages",newAppendStrategy()); strategies.put("input",newReplaceStrategy()); // 覆盖自定义策略 strategies.putAll(customStrategies); returnstrategies; }; }}

3.3 Node (节点)


Node 是工作流的功能模块,每个节点就像一个专门的工作站,负责执行特定的业务逻辑。Node 的设计遵循单一职责原则,每个节点只关注一件事情,这样既保证了代码的可维护性,也提高了节点的可复用性。


执行特性:Node 支持同步和异步两种执行模式,还支持并行执行多个子任务。这种灵活的执行机制让 Node 既能处理简单的数据转换,也能处理复杂的外部服务调用,满足各种性能要求。


3.3.1 节点执行流程



3.3.2 节点类型层次结构



3.3.3 并行节点处理机制



3.4 Edge (边)


Edge 是工作流的路由器,它决定了数据在节点之间的流转路径。Edge 不仅仅是简单的连接线,它还包含了复杂的条件判断逻辑,能够根据当前状态动态决定下一步的执行路径。


智能路由:Edge 支持静态路由和动态路由两种模式。静态边提供固定的转换路径,而条件边则可以根据状态内容进行智能判断,这种设计让工作流具备了强大的条件分支能力,能够处理各种复杂的业务逻辑。


3.4.1 边的类型与结构



3.4.2 条件边路由流程



3.4.3 边验证机制


publicclassEdge{ publicvoidvalidate(Nodes nodes) throws GraphStateException{   // 验证源节点存在   if(!nodes.anyMatchById(sourceId)) {     throwErrors.missingNodeInEdgeMapping.exception(sourceId);    }
// 验证目标节点 for(EdgeValue target : targets()) { if(target.id() !=null) { // 静态边:直接验证目标节点 if(!nodes.anyMatchById(target.id()) && !END.equals(target.id())) { throwErrors.missingNodeInEdgeMapping.exception(target.id()); } }elseif(target.value() !=null) { // 条件边:验证映射中的所有目标节点 for(String targetNodeId : target.value().mappings().values()) { if(!nodes.anyMatchById(targetNodeId) && !END.equals(targetNodeId)) { throwErrors.missingNodeInEdgeMapping.exception(targetNodeId); } } } } }}


3.5 CompiledGraph (编译图)


CompiledGraph 是工作流的执行引擎,它将 StateGraph 的静态定义转换为高效的运行时代码。就像将高级语言编译成机器码一样,CompiledGraph 对工作流进行了各种优化,包括节点预处理、边路由优化、状态管理策略等。


运行时优化:CompiledGraph 在编译过程中会进行多种优化,如节点依赖分析、并行执行规划、状态访问优化等,这些优化确保了工作流在运行时的高效性和稳定性。


3.5.1 编译过程详解



3.5.2 AsyncNodeGenerator 执行机制


AsyncNodeGenerator 是工作流执行的核心状态机,它负责推动整个工作流的运行。AsyncNodeGenerator 采用了基于迭代器的设计模式,每次调用 next() 方法都会执行一个步骤,这种设计既支持同步执行,也支持异步流式处理。


执行控制:AsyncNodeGenerator 内置了完善的执行控制机制,包括最大迭代次数检查、中断条件处理、错误恢复等,确保工作流在各种情况下都能稳定运行。



3.5.3 状态流转核心逻辑


publicclassAsyncNodeGenerator<Output extends NodeOutput>implementsAsyncGenerator<Output> {
@Override publicData<Output> next() { try{ // 1. 检查最大迭代次数 if(++iteration > maxIterations) { returnData.error(new IllegalStateException( format("Maximum number of iterations (%d) reached!", maxIterations))); }
// 2. 检查是否结束 if(nextNodeId ==null&& currentNodeId ==null) { returnreleaseThread().map(Data::<Output>done) .orElseGet(() -> Data.done(currentState)); }
// 3. 处理START节点 if(START.equals(currentNodeId)) { doListeners(START,null); varnextNodeCommand = getEntryPoint(currentState, config); nextNodeId = nextNodeCommand.gotoNode(); currentState = nextNodeCommand.update();
varcp = addCheckpoint(config, START, currentState, nextNodeId); varoutput = (cp.isPresent() && config.streamMode() == StreamMode.SNAPSHOTS) ? buildStateSnapshot(cp.get()) : buildNodeOutput(currentNodeId);
currentNodeId = nextNodeId; returnData.of(output); }
// 4. 处理END节点 if(END.equals(nextNodeId)) { nextNodeId =null; currentNodeId =null; doListeners(END,null); returnData.of(buildNodeOutput(END)); }
// 5. 检查中断条件 if(shouldInterruptAfter(currentNodeId, nextNodeId)) { returnData.done(currentNodeId); } if(shouldInterruptBefore(nextNodeId, currentNodeId)) { returnData.done(currentNodeId); }
// 6. 执行节点 currentNodeId = nextNodeId; varaction = nodes.get(currentNodeId); returnData.of(evaluateAction(action, overAllState));
}catch(Exception e) { returnData.error(e); } }}


4. 预定义组件与工具箱

Cloud Native


4.1 预定义节点类型


Spring AI Alibaba Graph 提供了丰富的预定义节点工具箱,这些节点就像乐高积木一样,开发者可以通过组合这些预定义节点快速构建复杂的 AI 应用。每个预定义节点都经过了精心设计和优化,不仅功能强大,而且易于使用。


设计理念:预定义节点的设计遵循了"开箱即用"的原则,开发者只需要提供必要的配置参数,就能立即使用这些节点的强大功能,大大降低了 AI 应用的开发门槛。


4.1.1 节点分类架构



4.1.2 QuestionClassifierNode - 智能分类节点


QuestionClassifierNode 是工作流的智能分拣员,它能够理解文本内容并将其归类到预定义的类别中。这个节点内置了少样本学习机制,即使没有大量训练数据,也能实现准确的分类效果。


核心优势:QuestionClassifierNode 采用了提示工程的最佳实践,通过精心设计的提示词模板和少样本示例,让大语言模型能够准确理解分类任务的要求,实现高质量的文本分类。



应用场景:QuestionClassifierNode 特别适合客服系统的问题分类、内容审核的类型判断、邮件的自动分拣等场景,能够显著提高业务处理的自动化程度。


QuestionClassifierNodeclassifier=QuestionClassifierNode.builder().chatClient(chatClient).inputTextKey("input").outputKey("classifier_output").categories(List.of("positivefeedback","negativefeedback")).classificationInstructions(List.of("Trytounderstandtheuser'sfeelingwhengivingfeedback.")).build();


核心实现原理:


@OverridepublicMap<String,Object>apply(OverAllStatestate) throwsException{ // 1. 从状态获取输入文本 if(StringUtils.hasLength(inputTextKey)) {   this.inputText= (String) state.value(inputTextKey).orElse(this.inputText);  }
// 2. 构建少样本学习消息 List<Message> messages =newArrayList<>(); messages.add(newUserMessage(QUESTION_CLASSIFIER_USER_PROMPT_1)); messages.add(newAssistantMessage(QUESTION_CLASSIFIER_ASSISTANT_PROMPT_1)); messages.add(newUserMessage(QUESTION_CLASSIFIER_USER_PROMPT_2)); messages.add(newAssistantMessage(QUESTION_CLASSIFIER_ASSISTANT_PROMPT_2));
// 3. 调用大模型进行分类 ChatResponseresponse = chatClient.prompt() .system(systemPromptTemplate.render(Map.of( "inputText", inputText, "categories", categories, "classificationInstructions", classificationInstructions))) .user(inputText) .messages(messages) .call() .chatResponse();
// 4. 返回分类结果 Map<String,Object> updatedState =newHashMap<>(); updatedState.put(outputKey, response.getResult().getOutput().getText()); returnupdatedState;}


4.1.3 LlmNode - 大模型调用节点


LlmNode 是工作流的智能大脑,它封装了与大语言模型的所有交互逻辑,让开发者可以轻松地在工作流中使用 AI 的强大能力。LlmNode 不仅支持简单的文本生成,还支持复杂的对话管理和流式输出。


智能特性:LlmNode 内置了提示词模板引擎,支持动态参数替换,还能管理完整的对话历史,这些特性让它能够处理各种复杂的 AI 交互场景。



流式处理优势:LlmNode 原生支持流式输出,这意味着用户可以实时看到 AI 的生成过程,而不需要等待完整的响应,大大提升了用户体验。


LlmNodellmNode=LlmNode.builder().chatClient(chatClient).systemPromptTemplate("Youareahelpfulassistant.").userPromptTemplate("leaseprocess:{input}").messagesKey("messages").outputKey("llm_response").stream(true)//启用流式输出.build();


核心特性:


  • 模板支持:支持系统提示词和用户提示词模板。
  • 消息历史:支持消息历史管理。
  • 流式输出:原生支持流式处理。
  • 参数渲染:支持动态参数替换。


4.1.4 ToolNode - 工具调用节点


ToolNode 是工作流的万能工具箱,它让 AI 能够调用外部工具和 API,极大地扩展了 AI 的能力边界。ToolNode 不仅能执行单个工具调用,还能并行执行多个工具,显著提高了处理效率。


核心价值:ToolNode 将 AI 从纯文本生成扩展到了实际的行动能力,让 AI 能够查询数据库、调用 API、执行计算等,真正实现了 AI Agent 的概念。



灵活性设计:ToolNode 支持各种类型的工具调用,从简单的函数调用到复杂的 API 集成,都能轻松处理,这种灵活性让 AI 应用能够适应各种业务场景。


ToolNodetoolNode=ToolNode.builder().toolCallbacks(toolCallbacks).llmResponseKey("llm_response").outputKey("tool_response").build();


执行机制:


@OverridepublicMap<String,Object>apply(OverAllStatestate) throwsException{ // 1. 获取助手消息(包含工具调用) this.assistantMessage= (AssistantMessage) state.value(this.llmResponseKey)    .orElseGet(() -> {     List<Message> messages = (List<Message>) state.value("messages").orElseThrow();     returnmessages.get(messages.size() -1);    });
// 2. 执行工具调用 ToolResponseMessagetoolResponseMessage =executeFunction(assistantMessage, state);
// 3. 返回工具响应 Map<String,Object> updatedState =newHashMap<>(); updatedState.put("messages", toolResponseMessage); if(StringUtils.hasLength(this.outputKey)) { updatedState.put(this.outputKey, toolResponseMessage); } returnupdatedState;}


4.1.5 KnowledgeRetrievalNode - 知识检索节点


KnowledgeRetrievalNode 是工作流的知识专家,它能够从庞大的知识库中快速找到与问题相关的信息,为 AI 提供准确的背景知识。这个节点结合了向量检索和重排序技术,确保检索结果的准确性和相关性。


技术优势:KnowledgeRetrievalNode 采用了先进的 RAG(检索增强生成)技术,通过向量相似度计算找到相关文档,再通过重排序模型进一步优化结果质量,这种两阶段的设计确保了检索的精准性。



应用价值:KnowledgeRetrievalNode 让 AI 能够基于企业的私有知识库回答问题,这对于构建企业级 AI 助手、智能客服等应用具有重要意义。


KnowledgeRetrievalNoderetrievalNode=KnowledgeRetrievalNode.builder().vectorStore(vectorStore).userPromptKey("query").topK(5).similarityThreshold(0.7).enableRanker(true).rerankModel(rerankModel).outputKey("retrieved_docs").build();


4.2 预定义 Agent 类型


4.2.1 ReactAgent - 反应式 Agent


ReactAgent 是工作流的智能决策者,它实现了经典的 ReAct(Reasoning and Acting)模式,能够根据当前情况动态决定是否需要调用工具。ReactAgent 就像一个有经验的助手,知道什么时候需要查找信息,什么时候可以直接回答。


核心思想:ReactAgent 将推理和行动结合在一起,让 AI 不仅能思考,还能行动。这种设计让 AI 具备了解决复杂问题的能力,能够通过多轮推理和工具调用来完成复杂任务。



智能循环:ReactAgent 的执行过程是一个智能循环,每次循环都会评估当前状态,决定下一步行动,这种设计让 AI 能够处理各种复杂和动态的任务场景。


ReactAgentreactAgent=newReactAgent( "weatherAgent",  chatClient,  toolCallbacks, 10// 最大迭代次数);
// 编译并使用CompiledGraphcompiledGraph=reactAgent.getAndCompileGraph();


内部图结构构建:


privateStateGraph initGraph() throws GraphStateException {  StateGraph graph = new StateGraph(name,this.keyStrategyFactory);
// 添加核心节点 graph.addNode("llm", node_async(this.llmNode)); graph.addNode("tool", node_async(this.toolNode));
// 构建执行流程 graph.addEdge(START,"llm") .addConditionalEdges("llm", edge_async(this::think), Map.of("continue","tool","end", END)) .addEdge("tool","llm");
returngraph;}
// 决策逻辑privateString think(OverAllState state) { if(iterations > max_iterations) { return"end"; }
List<Message> messages = (List<Message>) state.value("messages").orElseThrow(); AssistantMessage message = (AssistantMessage) messages.get(messages.size() -1);
// 检查是否有工具调用 returnmessage.hasToolCalls() ?"continue":"end";}


4.2.2 ReflectAgent - 反思 Agent


ReflectAgent 是工作流的质量监督者,它实现了反思模式,能够对自己的输出进行评估和改进。ReflectAgent 就像一个严格的编辑,会反复检查和修改内容,直到达到满意的质量标准。


自我改进机制:ReflectAgent 采用了双节点协作的设计,一个节点负责生成内容,另一个节点负责评估质量,通过多轮迭代不断提升输出质量。这种设计让 AI 具备了自我完善的能力。



质量保证:ReflectAgent 特别适合对输出质量要求较高的场景,如文档写作、代码生成、创意内容等,通过反思机制确保最终输出的质量。


ReflectAgentreflectAgent=ReflectAgent.builder().graph(assistantGraphNode)//生成节点.reflection(judgeGraphNode)//评判节点.maxIterations(3).build();


执行流程详解:


publicStateGraphcreateReflectionGraph(NodeAction graph, NodeAction reflection, int maxIterations) { StateGraphstateGraph =newStateGraph(() -> {   HashMap<String,KeyStrategy> keyStrategyHashMap =newHashMap<>();    keyStrategyHashMap.put(MESSAGES,newReplaceStrategy());    keyStrategyHashMap.put(ITERATION_NUM,newReplaceStrategy());   returnkeyStrategyHashMap;  })  .addNode(GRAPH_NODE_ID,node_async(graph))  .addNode(REFLECTION_NODE_ID,node_async(reflection))  .addEdge(START,GRAPH_NODE_ID)  .addConditionalEdges(GRAPH_NODE_ID,edge_async(this::graphCount),   Map.of(REFLECTION_NODE_ID,REFLECTION_NODE_ID,END,END))  .addConditionalEdges(REFLECTION_NODE_ID,edge_async(this::apply),   Map.of(GRAPH_NODE_ID,GRAPH_NODE_ID,END,END));
returnstateGraph;}
// 迭代次数检查privateStringgraphCount(OverAllState state) { int iterationNum = state.value(ITERATION_NUM,Integer.class).orElse(0); state.updateState(Map.of(ITERATION_NUM, iterationNum +1));
returniterationNum >= maxIterations ?END:REFLECTION_NODE_ID;}
// 消息类型检查privateStringapply(OverAllState state) { List<Message> messages = state.value(MESSAGES,List.class).orElse(newArrayList<>()); if(messages.isEmpty())returnEND;
MessagelastMessage = messages.get(messages.size() -1); returnlastMessageinstanceofUserMessage?GRAPH_NODE_ID:END;}


4.2.3 ReactAgentWithHuman - 人机协作 Agent


ReactAgentWithHuman 是工作流的人机协作专家,它在 ReactAgent 的基础上增加了人工干预能力,让 AI 和人类能够协作完成复杂任务。这种设计特别适合需要人工审核、决策确认或专业判断的场景。


协作机制:ReactAgentWithHuman 内置了完善的中断和恢复机制,当遇到需要人工干预的情况时,系统会自动暂停执行,等待人工处理,然后无缝恢复执行。这种设计让人机协作变得自然而流畅。



人机协作实现:


privateStateGraphinitGraph()throwsGraphStateException { StateGraphgraph=newStateGraph(name, keyStrategyFactory)    .addNode("agent", node_async(this.llmNode))    .addNode("human", node_async(this.humanNode))    .addNode("tool", node_async(this.toolNode))    .addEdge(START,"agent")    .addEdge("agent","human")    .addConditionalEdges("human", edge_async(humanNode::think),      Map.of("agent","agent","tool","tool","end", END))    .addEdge("tool","agent");
returngraph;}
// HumanNode的决策逻辑publicStringthink(OverAllState state){ // 检查是否需要中断 if(shouldInterruptFunc !=null&& shouldInterruptFunc.apply(state)) { // 设置中断消息,等待人工处理 state.setInterruptMessage("等待人工审批"); return"human_interrupt"; }
// 检查是否需要工具调用 List<Message> messages = (List<Message>) state.value("messages").orElse(newArrayList<>()); if(!messages.isEmpty()) { MessagelastMessage=messages.get(messages.size() -1); if(lastMessageinstanceofAssistantMessage && ((AssistantMessage) lastMessage).hasToolCalls()) { return"tool"; } }
return"agent";}


5. 高级特性与扩展能力

Cloud Native


5.1 可观测性


Spring AI Alibaba Graph 提供了企业级的全链路观测能力,基于 OpenTelemetry 和 Micrometer 标准,实现了从 Graph 执行到模型调用的完整追踪。


5.1.1 核心特性


  • 全链路可观测:实时追踪每个节点的输入、输出和状态变化。
  • 流式数据采集:支持异步、并行、流式节点的观测。
  • 异常溯源:快速定位异常节点和数据。
  • 多平台支持:兼容 Langfuse、Jaeger、Zipkin、Prometheus 等主流平台。


5.1.2 快速接入


使用观测性 Starter


<dependency> <groupId>com.alibaba.cloud.ai</groupId> <artifactId>spring-ai-alibaba-starter-graph-observation</artifactId> <version>${spring-ai-alibaba.version}</version></dependency>
@BeanpublicCompiledGraphcompiledGraph(StateGraphobservabilityGraph,CompileConfigobservationCompileConfig)throwsGraphStateException{returnobservabilityGraph.compile(observationCompileConfig);}


5.1.3 详细文档


关于 Spring AI Alibaba Graph 观测性的完整架构设计、实现原理、配置方式、最佳实践等详细内容,请参考官方观测性文档:


📚Graph 观测性完整指南

Spring AI Alibaba Graph 观测性设计与实现【2】


该文档涵盖:


  • 观测性设计理念与架构
  • 并行与流式观测实现
  • 多平台集成配置
  • Langfuse 等可视化平台使用
  • 最佳实践与扩展建议


🔗完整示例代码

graph-observability-langfuse【3】


5.2 并行节点与流式处理


5.2.1 并行节点的两种创建方式


Spring AI Alibaba Graph 提供了两种创建并行节点的方式,这两种方式在底层实现上有所不同,但都能实现并行处理的效果。


方式一:直接创建 ParallelNode

直接创建一个 ParallelNode 实例,并将其注册到 StateGraph 中:


// 创建并行任务列表List<AsyncNodeActionWithConfig> parallelActions =List.of( node_async(newDataProcessingNode1()), node_async(newDataProcessingNode2()), node_async(newDataProcessingNode3()));
// 定义状态合并策略Map<String,KeyStrategy> channels =Map.of( "results",newAppendStrategy(), "metadata",newReplaceStrategy());
// 创建并行节点ParallelNodeparallelNode =newParallelNode( "data_processing", // 节点内部ID parallelActions, // 并行任务列表 channels // KeyStrategy映射);
// 添加到StateGraphstateGraph.addNode("parallel_tasks", parallelNode);


方式二:通过 StateGraph 描述并行边

这是更常用的方式,通过添加多个指向相同目标的边来定义并行结构:


StateGraphworkflow=newStateGraph(keyStrategyFactory)  .addNode("source", node_async(sourceNode))  .addNode("task1", node_async(task1Node))  .addNode("task2", node_async(task2Node))  .addNode("task3", node_async(task3Node))  .addNode("merger", node_async(mergerNode))
// 创建并行分支 - 从source到多个任务 .addEdge("source","task1") .addEdge("source","task2") .addEdge("source","task3")
// 汇聚到merger节点 .addEdge("task1","merger") .addEdge("task2","merger") .addEdge("task3","merger")
.addEdge(START,"source") .addEdge("merger", END);


编译时转换机制


当 StateGraph 编译时,框架会自动检测并行边模式,并在内部创建 ParallelNode:


// CompiledGraph编译过程中的处理逻辑if(targets.size() >1) { // 检测到并行边,获取所有并行目标节点的Action varactions = parallelNodeStream.get()    .map(target -> nodes.get(target.id()))    .toList();
// 自动创建ParallelNode varparallelNode =newParallelNode(e.sourceId(), actions, keyStrategyMap);
// 替换原有节点和边的映射 nodes.put(parallelNode.id(), parallelNode.actionFactory().apply(compileConfig)); edges.put(e.sourceId(),newEdgeValue(parallelNode.id()));}


5.2.2 并行节点的内部执行机制


ParallelNode 的核心实现基于 CompletableFuture.allOf(),实现真正的并行执行:


publicclassParallelNodeextendsNode{
recordAsyncParallelNodeAction( List<AsyncNodeActionWithConfig> actions, Map<String,KeyStrategy> channels )implementsAsyncNodeActionWithConfig{
@Override publicCompletableFuture<Map<String,Object>>apply(OverAllState state, RunnableConfig config) { Map<String,Object> partialMergedStates =newHashMap<>(); Map<String,Object> asyncGenerators =newHashMap<>();
// 并行执行所有Action varfutures = actions.stream() .map(action -> action.apply(state, config) .thenApply(partialState -> { // 分离普通结果和AsyncGenerator partialState.forEach((key, value) -> { if(valueinstanceofAsyncGenerator<?> || valueinstanceofGeneratorSubscriber) { ((List) asyncGenerators.computeIfAbsent(key, k ->newArrayList<>())).add(value); }else{ partialMergedStates.put(key, value); } }); // 立即更新状态 state.updateState(partialMergedStates); returnaction; })) .toList() .toArray(newCompletableFuture[0]);
// 等待所有任务完成 returnCompletableFuture.allOf(futures) .thenApply((p) ->CollectionUtils.isEmpty(asyncGenerators) ? state.data() : asyncGenerators); } }}


5.2.3 并行流式处理的合并机制


核心挑战:当多个并行分支都产生流式输出时,如何将这些异步流合并成统一的输出流?


Spring AI Alibaba Graph 通过AsyncGeneratorUtils.createMergedGenerator框架内核中解决了这个复杂问题:



5.2.4 MergedGenerator 核心实现


AsyncGeneratorUtils.createMergedGenerator是框架内核的核心算法,实现了多个异步流的智能合并:


publicstatic<T>AsyncGenerator<T>createMergedGenerator(  List<AsyncGenerator<T>> generators, Map<String, KeyStrategy> keyStrategyMap) {
returnnewAsyncGenerator<>() { // 使用StampedLock优化并发性能 privatefinalStampedLocklock =newStampedLock(); privateAtomicIntegerpollCounter =newAtomicInteger(0); privateMap<String,Object> mergedResult =newHashMap<>(); privatefinalList<AsyncGenerator<T>> activeGenerators =newCopyOnWriteArrayList<>(generators);
@Override publicAsyncGenerator.Data<T>next() { while(true) { // 乐观读锁快速检查 long stamp = lock.tryOptimisticRead(); booleanempty = activeGenerators.isEmpty(); if(!lock.validate(stamp)) { stamp = lock.readLock(); try{ empty = activeGenerators.isEmpty(); }finally{ lock.unlockRead(stamp); } } if(empty) { returnAsyncGenerator.Data.done(mergedResult); }
// 轮询策略选择Generator finalAsyncGenerator<T> current; long writeStamp = lock.writeLock(); try{ final int size = activeGenerators.size(); if(size ==0)returnAsyncGenerator.Data.done(mergedResult);
int currentIdx = pollCounter.updateAndGet(i -> (i +1) % size); current = activeGenerators.get(currentIdx); }finally{ lock.unlockWrite(writeStamp); }
// 在无锁状态下执行Generator AsyncGenerator.Data<T> data = current.next();
// 处理结果并更新状态 writeStamp = lock.writeLock(); try{ if(!activeGenerators.contains(current)) { continue; }
if(data.isDone() || data.isError()) { handleCompletedGenerator(current, data); if(activeGenerators.isEmpty()) { returnAsyncGenerator.Data.done(mergedResult); } continue; }
handleCompletedGenerator(current, data); returndata; }finally{ lock.unlockWrite(writeStamp); } } }
privatevoidhandleCompletedGenerator(AsyncGenerator<T> generator, AsyncGenerator.Data<T> data) { // 移除完成的Generator if(data.isDone() || data.isError()) { activeGenerators.remove(generator); }
// 使用KeyStrategy合并结果 data.resultValue().ifPresent(result -> { if(resultinstanceofMap) { Map<String,Object> mapResult = (Map<String,Object>) result; mergedResult =OverAllState.updateState(mergedResult, mapResult, keyStrategyMap); } }); } };}


核心算法特点


  • 轮询机制:通过 pollCounter 实现公平的轮询调度。
  • StampedLock 优化:使用乐观读锁提高并发性能。
  • 状态合并:通过 KeyStrategy 实现灵活的状态合并策略。
  • 线程安全:CopyOnWriteArrayList 确保并发访问的安全性。


5.2.5 流式输出配置


@RestController@RequestMapping("/stream")publicclassStreamingController{
privatefinalCompiledGraphcompiledGraph;
@GetMapping(value ="/process", produces =MediaType.TEXT_EVENT_STREAM_VALUE) publicFlux<ServerSentEvent<String>>processStream(@RequestParamStringinput) { returnFlux.create(sink -> { try{ AsyncGenerator<NodeOutput> generator = compiledGraph.stream( Map.of("input", input), RunnableConfig.builder() .threadId(UUID.randomUUID().toString()) .build() );
generator.forEachAsync(output -> { if(outputinstanceofStreamingOutput) { StreamingOutputstreamingOutput = (StreamingOutput) output; Stringchunk = streamingOutput.chunk().toString(); sink.next(ServerSentEvent.builder(chunk).build()); } }).thenRun(() -> { sink.complete(); }).exceptionally(throwable -> { sink.error(throwable); returnnull; });
}catch(Exceptione) { sink.error(e); } }); }}


5.3 子图节点


子图节点是工作流的模块化组件,它允许将复杂的工作流分解为可重用的子模块。子图节点就像函数调用一样,可以在主工作流中调用预定义的子工作流,实现代码复用和模块化设计。


5.3.1 子图节点类型


Spring AI Alibaba Graph 支持两种类型的子图节点:


SubStateGraphNode - 未编译子图节点

publicclassSubStateGraphNodeextendsNode{ privatefinalStateGraph subGraph;
publicSubStateGraphNode(String id, StateGraph subGraph){ super(id, (config) -> { // 在运行时编译子图 CompiledGraphcompiledSubGraph=subGraph.compile(config); returnnewSubGraphAction(compiledSubGraph); }); this.subGraph = subGraph; }}

SubCompiledGraphNode - 预编译子图节点

publicclassSubCompiledGraphNodeextendsNode{ privatefinalCompiledGraph subGraph;
publicSubCompiledGraphNode(String id, CompiledGraph subGraph){ super(id, (config) ->newSubGraphAction(subGraph)); this.subGraph = subGraph; }}

5.3.2 子图定义与使用


定义文档处理子图


publicclassDocumentProcessingSubGraph{
publicstaticStateGraphcreateDocumentProcessingGraph(ChatModel chatModel){ ChatClientchatClient=ChatClient.builder(chatModel).build();
// 文档提取节点 DocumentExtractorNodeextractorNode=newDocumentExtractorNode( "document_path","extracted_text", List.of("pdf","docx","txt") );
// 文档分析节点 LlmNodeanalysisNode=LlmNode.builder() .chatClient(chatClient) .systemPromptTemplate("你是一个文档分析专家,请分析文档内容并提取关键信息。") .userPromptTemplate("请分析以下文档内容:\n{extracted_text}") .outputKey("analysis_result") .build();
KeyStrategyFactorystateFactory=() -> { Map<String, KeyStrategy> strategies =newHashMap<>(); strategies.put("document_path",newReplaceStrategy()); strategies.put("extracted_text",newReplaceStrategy()); strategies.put("analysis_result",newReplaceStrategy()); returnstrategies; };
returnnewStateGraph("文档处理子图", stateFactory) .addNode("extractor", node_async(extractorNode)) .addNode("analyzer", node_async(analysisNode)) .addEdge(START,"extractor") .addEdge("extractor","analyzer") .addEdge("analyzer", END); }}


在主工作流中使用子图


@ConfigurationpublicclassMainWorkflowConfiguration{
@Bean publicStateGraphmainWorkflow(ChatModel chatModel){ // 创建子图 StateGraphdocumentProcessingSubGraph=DocumentProcessingSubGraph .createDocumentProcessingGraph(chatModel);
// 创建其他节点 QuestionClassifierNodeclassifierNode=QuestionClassifierNode.builder() .chatClient(ChatClient.builder(chatModel).build()) .inputTextKey("input") .outputKey("classifier_output") .categories(List.of("document_processing","general_question")) .build();
LlmNodegeneralAnswerNode=LlmNode.builder() .chatClient(ChatClient.builder(chatModel).build()) .systemPromptTemplate("你是一个通用助手,请回答用户的问题。") .userPromptTemplate("用户问题:{input}") .outputKey("general_answer") .build();
KeyStrategyFactorystateFactory=() -> { Map<String, KeyStrategy> strategies =newHashMap<>(); strategies.put("input",newReplaceStrategy()); strategies.put("classifier_output",newReplaceStrategy()); strategies.put("document_path",newReplaceStrategy()); strategies.put("extracted_text",newReplaceStrategy()); strategies.put("analysis_result",newReplaceStrategy()); strategies.put("general_answer",newReplaceStrategy()); returnstrategies; };
returnnewStateGraph("主工作流", stateFactory) .addNode("classifier", node_async(classifierNode)) .addNode("document_processor", documentProcessingSubGraph) // 添加子图 .addNode("general_answer", node_async(generalAnswerNode)) .addEdge(START,"classifier") .addConditionalEdges("classifier", edge_async(newClassifierDispatcher()), Map.of("document_processing","document_processor", "general_question","general_answer")) .addEdge("document_processor", END) .addEdge("general_answer", END); }}


5.3.3 子图执行流程



5.3.4 子图状态管理


状态隔离与传递


publicclassSubGraphActionimplementsAsyncNodeActionWithConfig{ privatefinalCompiledGraphsubGraph;
@Override publicCompletableFuture<Map<String,Object>>apply(OverAllState state, RunnableConfig config) { returnCompletableFuture.supplyAsync(() -> { try{ // 从主状态中提取子图需要的数据 Map<String,Object> subGraphInput =extractSubGraphInput(state);
// 执行子图 Optional<OverAllState> subGraphResult = subGraph.invoke(subGraphInput, config);
// 将子图结果映射回主状态 returnmapSubGraphOutput(subGraphResult.orElse(null));
}catch(Exceptione) { thrownewRuntimeException("子图执行失败", e); } }); }
privateMap<String,Object>extractSubGraphInput(OverAllState state) { Map<String,Object> input =newHashMap<>(); // 根据子图的输入需求提取数据 state.value("document_path").ifPresent(value -> input.put("document_path", value)); state.value("input").ifPresent(value -> input.put("input", value)); returninput; }
privateMap<String,Object>mapSubGraphOutput(OverAllState subGraphState) { Map<String,Object> output =newHashMap<>(); if(subGraphState !=null) { // 将子图的输出映射到主状态 subGraphState.value("analysis_result").ifPresent(value -> output.put("analysis_result", value)); subGraphState.value("extracted_text").ifPresent(value -> output.put("extracted_text", value)); } returnoutput; }}


5.4 中断与恢复机制


中断与恢复机制是工作流的容错保障,它让工作流能够在遇到需要人工干预或外部条件不满足时优雅地暂停执行,并在条件满足后无缝恢复。这种机制对于构建可靠的企业级 AI 应用至关重要。


5.4.1 中断机制原理



5.4.2 中断条件配置


InterruptBefore - 节点执行前中断


@ConfigurationpublicclassInterruptConfiguration{
@Bean publicCompiledGraphinterruptableGraph(StateGraph stateGraph) { returnstateGraph.compile(CompileConfig.builder() .withInterruptBefore("human_approval") // 在human_approval节点前中断 .build()); }}


InterruptAfter - 节点执行后中断


@BeanpublicCompiledGraphinterruptableGraph(StateGraphstateGraph){returnstateGraph.compile(CompileConfig.builder().withInterruptAfter("data_processing")//在data_processing节点后中断.build());}


动态中断条件


publicclassDynamicInterruptNodeimplementsAsyncNodeActionWithConfig{
@Override publicCompletableFuture<Map<String,Object>>apply(OverAllState state, RunnableConfig config) { returnCompletableFuture.supplyAsync(() -> { // 检查是否需要中断 if(shouldInterrupt(state)) { // 设置中断消息 state.setInterruptMessage("需要人工审批,请检查数据质量");
Map<String,Object> result =newHashMap<>(); result.put("interrupt_reason","data_quality_check"); result.put("requires_approval",true); returnresult; }
// 正常处理逻辑 returnprocessData(state); }); }
privatebooleanshouldInterrupt(OverAllState state) { // 自定义中断条件逻辑 Doubleconfidence = (Double) state.value("confidence_score").orElse(1.0); returnconfidence <0.8; // 置信度低于80%时中断 }}


5.4.3 状态快照管理


内存快照存储


@ComponentpublicclassMemorySnapshotManager{
privatefinalMap<String,OverAllState> snapshots =newConcurrentHashMap<>();
publicStringsaveSnapshot(OverAllState state) { StringsnapshotId =UUID.randomUUID().toString(); snapshots.put(snapshotId, state.snapShot().orElse(state)); returnsnapshotId; }
publicOverAllStateloadSnapshot(StringsnapshotId) { OverAllStatesnapshot = snapshots.get(snapshotId); if(snapshot ==null) { thrownewIllegalArgumentException("快照不存在: "+ snapshotId); } returnsnapshot; }
publicvoidremoveSnapshot(StringsnapshotId) { snapshots.remove(snapshotId); }}


持久化快照存储


@ComponentpublicclassPersistentSnapshotManager{
privatefinalRedisTemplate<String,String> redisTemplate; privatefinalObjectMapperobjectMapper;
publicStringsaveSnapshot(OverAllState state) { try{ StringsnapshotId =UUID.randomUUID().toString(); StringserializedState = objectMapper.writeValueAsString(state);
redisTemplate.opsForValue().set( "snapshot:"+ snapshotId, serializedState, Duration.ofHours(24) // 24小时过期 );
returnsnapshotId; }catch(Exceptione) { thrownewRuntimeException("保存快照失败", e); } }
publicOverAllStateloadSnapshot(StringsnapshotId) { try{ StringserializedState = redisTemplate.opsForValue().get("snapshot:"+ snapshotId); if(serializedState ==null) { thrownewIllegalArgumentException("快照不存在: "+ snapshotId); }
returnobjectMapper.readValue(serializedState,OverAllState.class); }catch(Exceptione) { thrownewRuntimeException("加载快照失败", e); } }}


6. 快速开始与实战指南

Cloud Native


6.1 环境准备


6.1.1 依赖配置


在你的 Spring Boot 项目中添加 Spring AI Alibaba Graph 依赖:


<properties> <spring-ai-alibaba.version>1.0.0.3-SNAPSHOT</spring-ai-alibaba.version></properties><dependencies> <dependency>   <groupId>com.alibaba.cloud.ai</groupId>   <artifactId>spring-ai-alibaba-starter-dashscope</artifactId>   <version>${spring-ai-alibaba.version}</version> </dependency> <dependency>   <groupId>com.alibaba.cloud.ai</groupId>   <artifactId>spring-ai-alibaba-graph-core</artifactId>   <version>${spring-ai-alibaba.version}</version> </dependency> <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-web</artifactId> </dependency></dependencies>

6.2 快速开始流程


6.2.1 创建第一个工作流


@ConfigurationpublicclassMyFirstGraphConfiguration{
@Bean publicStateGraphmyFirstGraph(ChatModel chatModel) { // 1. 创建ChatClient ChatClientchatClient =ChatClient.builder(chatModel).build();
// 2. 定义节点 LlmNodewelcomeNode =LlmNode.builder() .chatClient(chatClient) .systemPromptTemplate("你是一个友好的助手") .userPromptTemplate("欢迎用户:{input}") .outputKey("welcome_message") .build();
// 3. 定义状态策略 KeyStrategyFactorystateFactory = () -> { Map<String,KeyStrategy> strategies =newHashMap<>(); strategies.put("input",newReplaceStrategy()); strategies.put("welcome_message",newReplaceStrategy()); returnstrategies; };
// 4. 构建工作流 returnnewStateGraph("我的第一个工作流", stateFactory) .addNode("welcome",node_async(welcomeNode)) .addEdge(START,"welcome") .addEdge("welcome",END); }
@Bean publicCompiledGraphcompiledGraph(StateGraph myFirstGraph) { returnmyFirstGraph.compile(); }}


6.2.2 使用工作流


@RestControllerpublicclassGraphController{
privatefinalCompiledGraphcompiledGraph;
@PostMapping("/chat") publicResponseEntity<Map<String,Object>>chat(@RequestBodyStringinput) { Optional<OverAllState> result = compiledGraph.invoke(Map.of("input", input)); returnResponseEntity.ok(result.map(OverAllState::data).orElse(Map.of())); }}


6.3 完整示例项目


为了帮助开发者更好地理解和使用 Spring AI Alibaba Graph,我们提供了完整的示例项目:


📚官方示例仓库

spring-ai-alibaba-graph-example【1】


快速体验步骤


1. 克隆仓库

gitclonehttps://github.com/springaialibaba/spring-ai-alibaba-examples.gitcdspring-ai-alibaba-examples/spring-ai-alibaba-graph-example

2. 配置环境

#设置DashScopeAPIKeyexportAI_DASHSCOPE_API_KEY=your_api_key_here

3. 运行示例

mvnspring-boot:run

6.4 社区支持


技术支持


  • GitHub Issues
    提交问题和建议【4】
  • 官方文档
    完整文档站点【5】
  • 示例代码
    更多示例【6】


通过以上指南和完整的示例项目,你可以快速掌握 Spring AI Alibaba Graph 的使用方法,并在实际项目中高效地构建智能化应用。

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

链载AI是专业的生成式人工智能教程平台。提供Stable Diffusion、Midjourney AI绘画教程,Suno AI音乐生成指南,以及Runway、Pika等AI视频制作与动画生成实战案例。从提示词编写到参数调整,手把手助您从入门到精通。
  • 官方手机版

  • 微信公众号

  • 商务合作

  • Powered by Discuz! X3.5 | Copyright © 2025-2025. | 链载Ai
  • 桂ICP备2024021734号 | 营业执照 | |广西笔趣文化传媒有限公司|| QQ