在当今信息爆炸的时代,快速准确地获取知识变得尤为重要。传统的问答系统虽然能够处理一些简单问题,但在面对复杂问题时往往显得力不从心。为了解决这一痛点,我们开发了一款基于 LangGraph 的 RAG 多智能体工具,它能够高效地处理复杂问题,整合多源信息,并通过迭代步骤得出精准答案。今天,就让我们深入了解一下这个强大的工具。
一、引言:从简单的 RAG 到智能的多智能体 RAG 在项目开发初期,我们发现传统的“简单 RAG”方法存在诸多不足。简单 RAG 无法拆解复杂问题,只能在单一层面处理查询,无法深入分析每个步骤并得出统一结论;它缺乏对幻觉(即模型生成错误信息)或错误处理的能力,无法通过验证步骤纠正错误;此外,简单 RAG 系统也无法根据工作流条件动态使用工具、调用外部 API 或与数据库交互。
为了解决这些问题,我们引入了多智能体 RAG 研究系统。基于智能体的框架能够实现以下功能:
路由和工具使用 :路由智能体可以对用户的查询进行分类,并将流程引导到合适的节点或工具。例如,它可以判断文档是否需要完整总结、是否需要更详细的信息,或者问题是否超出范围。规划子步骤 :复杂查询通常需要被拆解成更小、更易管理的步骤。从一个查询出发,系统可以生成一系列执行步骤,以探索查询的不同方面并得出结论。比如,如果查询需要比较文档的两个不同部分,基于智能体的方法可以识别这种比较需求,分别检索两个来源,并在最终回应中将它们合并为比较分析。反思和错误纠正 :除了简单的回应生成,智能体方法还可以增加一个验证步骤,以应对潜在的幻觉、错误或未能准确回答用户查询的回应。此外,它还能够整合人工参与的自我纠正机制,将人类输入融入自动化流程。这种功能使基于智能体的 RAG 系统成为企业应用中更稳健、更可靠的解决方案,因为在企业场景中,可靠性至关重要。共享全局状态 :智能体工作流共享一个全局状态,简化了跨多步骤的状态管理。这个共享状态对于维持多智能体过程不同阶段的一致性至关重要。二、项目概览:构建智能问答的“大脑” (一)系统架构图 我们的系统包含两个核心部分:研究者子图和主图。研究者子图负责生成用于检索和重排向量数据库中 top-k 文档的不同查询;主图则包含主要工作流程,例如分析用户查询、生成完成任务所需的步骤、生成回应,并通过人工参与机制检查幻觉。
(二)文档处理与向量数据库构建 1. 文档解析 对于结构复杂的 PDF 文档,尤其是包含复杂布局的表格,选择合适的解析工具至关重要。许多库在处理复杂页面布局或表格结构的 PDF 时精度不足。为此,我们采用了 Docling 这一开源库,它能够高效地解析文档,并将内容导出为所需格式。Docling 支持从 PDF、DOCX、PPTX、XLSX、图片、HTML、AsciiDoc 和 Markdown 等多种常用文档格式读取和导出 Markdown 和 JSON 格式。它对 PDF 文档有全面的理解,包括表格结构、阅读顺序和页面布局,还支持对扫描 PDF 的 OCR 功能。
以下是使用 Docling 将 PDF 转换为 Markdown 格式的代码示例:
fromdocling.document_converterimportDocumentConverter logger.info("Starting document processing.") converter = DocumentConverter() markdown_document = converter.convert(source).document.export_to_markdown()2. 向量数据库构建 我们使用 Chroma 构建向量数据库,将句子存储为向量嵌入,并在数据库中进行搜索。我们将持久化数据库存储在本地目录 “db_vector” 中。通过 OpenAI 的嵌入模型,我们将文档列表转换为向量,并存储在 Chroma 中。
以下是构建向量数据库的代码:
fromlangchain_community.vectorstoresimportChroma fromlangchain_openaiimportOpenAIEmbeddings embd = OpenAIEmbeddings() vectorstore_from_documents = Chroma.from_documents( documents=docs_list, collection_name="rag-chroma-google-v1", embedding=embd, persist_directory='db_vector' )(三)主图构建与状态管理 LangGraph 的核心概念之一是状态。每个图执行都会创建一个状态,该状态在图的节点执行时传递,并在每个节点执行后更新内部状态。
我们定义了两个类:Router 和 GradeHallucinations,分别用于存储用户查询的分类结果和回应中幻觉的存在与否。基于这些状态,我们构建了输入状态(InputState)和智能体状态(AgentState),其中 AgentState 包含用户查询的分类、研究计划的步骤列表、智能体可以引用的检索文档列表,以及幻觉的二进制评分。
以下是状态类的定义代码:
frompydanticimportBaseModel, Field fromtypingimportLiteral, TypedDict classRouter(TypedDict): """Classify user query.""" logic: str type: Literal["more-info","environmental","general"] classGradeHallucinations(BaseModel): """Binary score for hallucination present in generation answer.""" binary_score: str = Field(description="Answer is grounded in the facts, '1' or '0'")(四)工作流程详解 1. 第一步:分析和路由查询 这一步会更新智能体状态中的 Router 对象,其类型变量包含 “more-info”、“environmental” 或 “general” 中的一个值。根据这个信息,工作流将被路由到合适的节点,例如 “create_research_plan”、“ask_for_more_info” 或 “respond_to_general_query”。
以下是实现代码:
asyncdefanalyze_and_route_query( state: AgentState, *, config: RunnableConfig )-> dict[str, Router]: """Analyze the user's query and determine the appropriate routing.""" model = ChatOpenAI(model=GPT_4o, temperature=TEMPERATURE, streaming=True) messages = [ {"role":"system","content": ROUTER_SYSTEM_PROMPT} ] + state.messages logging.info("---ANALYZE AND ROUTE QUERY---") response = cast( Router,awaitmodel.with_structured_output(Router).ainvoke(messages) ) return{"router": response}2. 第二步:创建研究计划 如果查询分类返回 “environmental”,用户的请求与文档相关,工作流将到达 “create_research_plan” 节点,该节点的功能是为回答与环境相关的查询创建逐步研究计划。
以下是实现代码:
asyncdefcreate_research_plan( state: AgentState, *, config: RunnableConfig )-> dict[str, list[str] | str]: """Create a step-by-step research plan for answering an environmental-related query.""" classPlan(TypedDict): """Generate research plan.""" steps: list[str] model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True) messages = [ {"role":"system","content": RESEARCH_PLAN_SYSTEM_PROMPT} ] + state.messages logging.info("---PLAN GENERATION---") response = cast(Plan,awaitmodel.with_structured_output(Plan).ainvoke(messages)) return{"steps": response["steps"],"documents":"delete"}3. 第三步:开展研究 这一步会从研究计划中取出第一个步骤,并调用研究者子图来执行研究。研究者子图会返回一系列文档片段,我们将在后续步骤中进一步处理。
以下是实现代码:
asyncdefconduct_research(state: AgentState)-> dict[str, Any]: """Execute the first step of the research plan.""" result =awaitresearcher_graph.ainvoke({"question": state.steps[0]}) # graph call directly docs = result["documents"] step = state.steps[0] logging.info(f"\n{len(docs)}documents retrieved in total for the step:{step}.") return{"documents": result["documents"],"steps": state.steps[1:]}4. 第四步:研究者子图构建 研究者子图包含查询生成和文档检索两个关键步骤。查询生成步骤会根据研究计划中的问题生成多个搜索查询,以帮助回答问题。文档检索步骤则使用混合搜索和 Cohere 重排技术,从向量数据库中检索相关文档。
以下是查询生成的代码:
asyncdefgenerate_queries( state: ResearcherState, *, config: RunnableConfig )-> dict[str, list[str]]: """Generate search queries based on the question.""" classResponse(TypedDict): queries: list[str] logger.info("---GENERATE QUERIES---") model = ChatOpenAI(model="gpt-4o-mini-2024-07-18", temperature=0) messages = [ {"role":"system","content": GENERATE_QUERIES_SYSTEM_PROMPT}, {"role":"human","content": state.question}, ] response = cast(Response,awaitmodel.with_structured_output(Response).ainvoke(messages)) queries = response["queries"] queries.append(state.question) logger.info(f"Queries:{queries}") return{"queries": response["queries"]}以下是文档检索和重排的代码:
def_setup_vectorstore()-> Chroma: """Set up and return the Chroma vector store instance.""" embeddings = OpenAIEmbeddings() returnChroma( collection_name=VECTORSTORE_COLLECTION, embedding_function=embeddings, persist_directory=VECTORSTORE_DIRECTORY ) # Create base retrievers retriever_bm25 = BM25Retriever.from_documents(documents, search_kwargs={"k": TOP_K}) retriever_vanilla = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": TOP_K}) retriever_mmr = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": TOP_K}) ensemble_retriever = EnsembleRetriever( retrievers=[retriever_vanilla, retriever_mmr, retriever_bm25], weights=ENSEMBLE_WEIGHTS, ) # Set up Cohere re-ranking compressor = CohereRerank(top_n=2, model="rerank-english-v3.0") compression_retriever = ContextualCompressionRetriever( base_compressor=compressor, base_retriever=ensemble_retriever, )5. 第五步:检查是否完成 这一步通过检查研究计划中是否还有剩余步骤来确定研究过程是否完成。如果还有步骤,工作流将返回 “conduct_research” 节点继续执行;如果没有剩余步骤,则进入 “respond” 节点生成最终回应。
以下是实现代码:
defcheck_finished(state: AgentState)-> Literal["respond", "conduct_research"]: """Determine if the research process is complete.""" iflen(state.stepsor[]) >0: return"conduct_research" else: return"respond"6. 第六步:生成回应 这一步根据研究过程中检索到的文档和对话历史,生成对用户查询的最终回应。它利用语言模型将所有相关信息整合成一个全面的答案。
以下是实现代码:
asyncdefrespond( state: AgentState, *, config: RunnableConfig )-> dict[str, list[BaseMessage]]: """Generate the final response to the user's query.""" model = ChatOpenAI(model="gpt-4o-2024-08-06", temperature=0) context = format_docs(state.documents) prompt = RESPONSE_SYSTEM_PROMPT.format(context=context) messages = [{"role":"system","content": prompt}] + state.messages response =awaitmodel.ainvoke(messages) return{"messages": [response]}7. 第七步:检查幻觉 这一步会分析语言模型生成的回应,判断其是否得到了检索到的文档事实的支持,并给出一个二进制评分结果。如果评分表明回应可能包含幻觉,工作流将被中断,并提示用户决定是否重新生成回应或结束流程。
以下是实现代码:
asyncdefcheck_hallucinations( state: AgentState, *, config: RunnableConfig )-> dict[str, Any]: """Analyze the response for hallucinations.""" model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True) system_prompt = CHECK_HALLUCINATIONS.format( documents=state.documents, generation=state.messages[-1] ) messages = [ {"role":"system","content": system_prompt} ] + state.messages logging.info("---CHECK HALLUCINATIONS---") response = cast(GradeHallucinations,awaitmodel.with_structured_output(GradeHallucinations).ainvoke(messages)) return{"hallucination": response}8. 第八步:人工审批(人工参与) 如果语言模型的回应未得到事实支持,可能包含幻觉,此时工作流将暂停,并将控制权交给用户。用户可以选择仅重新执行最后的生成步骤,而无需重新启动整个工作流,或者选择结束流程。这种人工参与机制确保了用户对整个过程的控制,避免了不必要的循环或不期望的操作。
以下是实现代码:
defhuman_approval(state: AgentState): _binary_score = state.hallucination.binary_score if_binary_score =="1": return"END" else: retry_generation = interrupt( { "question":"Is this correct?", "llm_output": state.messages[-1] } ) ifretry_generation =="y": print("Continue with retry...") return"respond" else: return"END"三、实战测试:多智能体 RAG 的强大能力 为了验证系统的性能,我们使用了一份关于谷歌环境可持续性战略的年度报告进行了测试。这份报告包含了丰富的数据和复杂的表格结构,非常适合用来测试系统的多步骤处理能力和文档解析功能。
(一)复杂问题测试 我们提出了一个复杂的问题:“检索新加坡第二个数据中心 2019 年和 2022 年的 PUE 效率值,以及 2023 年亚太地区的区域平均 CFE 值。”
系统成功地将这个问题拆解为多个步骤,并生成了相应的查询:
“查找新加坡第二个数据中心 2019 年和 2022 年的 PUE 效率值。” “查找 2023 年亚太地区的区域平均 CFE 值。” 通过检索和重排文档,系统最终给出了准确的答案:
新加坡第二个数据中心 2019 年的 PUE 效率值未提供,但 2022 年的 PUE 为 1.21。 2023 年亚太地区的区域平均 CFE 为 12%。 (二)与 ChatGPT 的对比测试 为了进一步验证系统的可靠性,我们将同样的问题提交给了 ChatGPT。结果发现,ChatGPT 返回的值是错误的,明显出现了幻觉现象。这表明,在处理复杂问题时,简单的语言模型可能会生成不准确的信息,而我们的多智能体 RAG 系统通过幻觉检查步骤能够有效避免这种情况。
四、技术挑战与展望:多智能体 RAG 的未来之路 尽管多智能体 RAG 在性能上有显著提升,但在实际应用中仍面临一些挑战:
延迟问题 :由于智能体交互的复杂性增加,响应时间可能会变长。如何在速度和准确性之间取得平衡是一个关键挑战。评估与可观测性 :随着多智能体 RAG 系统变得越来越复杂,持续的评估和可观测性变得必不可少。总的来说,多智能体 RAG 是人工智能领域的一项重大突破。它将大型语言模型的能力与自主推理和信息检索相结合,引入了一种新的智能和灵活性标准。随着人工智能的不断发展,多智能体 RAG 将在各个行业中发挥基础性作用,彻底改变我们使用技术的方式。