|
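In this article, we show how to use LangGraph to build an advanced RAG agent. The agent can rewrite the user's question, classify it, validate document relevance, and even retry with an optimized query before gracefully giving up.

Before we dive into building the advanced RAG agent, it helps to revisit how RAG can be used as a tool inside a LangGraph agent.

Introduction

Traditional RAG (Retrieval-Augmented Generation) systems work well for simple questions but struggle with complex conversational scenarios. What happens when a user asks a follow-up question such as "What about pricing?" Context-dependent queries like this often fail because the system lacks conversation memory and intelligent query processing.

Today we will build an advanced RAG agent that addresses these challenges through:

- Intelligent query reformulation: turns follow-up questions into standalone queries
- Intelligent topic detection: makes sure queries stay within our knowledge domain
- Document quality assessment: validates retrieved content before generating a response
- Adaptive query enhancement: iteratively improves the search when the initial attempt fails
- Persistent conversation memory: maintains context across multiple interactions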
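
Let's build this system step by step using a realistic scenario: a technical support knowledge base.

System Architecture

Our advanced RAG agent implements a multi-stage workflow:

- Query enhancer → reformulates the question using the conversation history
- Topic validator → determines whether the query falls within our knowledge domain
- Content retriever → fetches relevant documents from our knowledge base
- Relevance assessor → evaluates document quality and relevance
- Response generator → creates a contextual answer
- Query optimizer → refines the search when needed (with loop protection)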
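
The result is a system that can handle complex conversations while maintaining quality and relevance.

Implementation

Step 1: Set up the environment with uv

We use uv, a fast Python package manager, to set up the environment quickly.

1.1 Create and activate a virtual environment

    uv venv rag-env
    source rag-env/bin/activate

This creates a virtual environment named rag-env and activates it.

1.2 Install the required packages

Now install the core dependencies for this RAG agent:

    uv pip install \
        langchain \
        langgraph \
        langchain-google-genai \
        langchain-community \
        python-dotenv \
        jupyterlab \
        ipykernel

The imports in section 1.5 also rely on langchain_huggingface and the Chroma vector store. If they are missing from your environment, you will likely need to install langchain-huggingface and chromadb as well.

1.3 Register the virtual environment with Jupyter

To make rag-env available as a Jupyter kernel:

    python -m ipykernel install --user --name=rag-env --display-name "RAG Agent (uv)"

You can now select "RAG Agent (uv)" as the kernel in Jupyter Notebook or JupyterLab.

1.4 Add the LLM API key

Next, create a .env file in the project root and add your Gemini API key:

    GOOGLE_API_KEY=your_google_gemini_api_key_here

1.5 Dependencies

    from dotenv import load_dotenv
    load_dotenv()

    # Core LangChain components
    from langchain_core.documents import Document
    from langchain_google_genai import ChatGoogleGenerativeAI
    from langchain_huggingface import HuggingFaceEmbeddings
    from langchain_community.vectorstores import Chroma
    from langchain_core.prompts import ChatPromptTemplate

    # Graph and state management
    from typing import TypedDict, List
    from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
    from pydantic import BaseModel, Field
    from langgraph.graph import StateGraph, END
    from langgraph.checkpoint.memory import MemorySaver
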
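Technology stack rationale:

- Google Gemini: strong reasoning capabilities for understanding complex queries
- HuggingFace Embeddings: high-quality, cost-effective embedding generation
- Chroma Vector DB: a lightweight, fast vector store for development
- LangGraph: orchestrates complex workflows through state management
- Pydantic: ensures structured, validated output from LLM operations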

Step 2: Build the knowledge base

We create a technical support knowledge base for "TechFlow Solutions":

    # Initialize our embedding model
    embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

    # Create comprehensive technology support knowledge base
    knowledge_documents = [
        Document(
            page_content="TechFlow Solutions offers three main service tiers: Basic Support ($29/month) includes email support and basic troubleshooting, Professional Support ($79/month) includes priority phone support and advanced diagnostics, Enterprise Support ($199/month) includes 24/7 dedicated support and custom integrations.",
            metadata={"source": "pricing_guide.pdf", "category": "pricing"},
        ),
        Document(
            page_content="Our cloud infrastructure services include: Virtual Private Servers starting at $15/month, Managed Databases from $45/month, Content Delivery Network at $0.08/GB, and Load Balancing services at $25/month. All services include 99.9% uptime guarantee.",
            metadata={"source": "infrastructure_pricing.pdf", "category": "services"},
        ),
        Document(
            page_content="TechFlow Solutions was founded in 2018 by Maria Rodriguez, a former Google engineer with 15 years of experience in cloud architecture. The company has grown from 3 employees to over 150 team members across 12 countries, specializing in enterprise cloud solutions.",
            metadata={"source": "company_history.pdf", "category": "company"},
        ),
        Document(
            page_content="Our technical support team operates 24/7 for Enterprise customers, business hours (9 AM - 6 PM EST) for Professional customers, and email-only support for Basic customers. Average response times: Enterprise (15 minutes), Professional (2 hours), Basic (24 hours).",
            metadata={"source": "support_procedures.pdf", "category": "support"},
        ),
    ]

    # Build vector database
    vector_store = Chroma.from_documents(knowledge_documents, embedding_model)
    document_retriever = vector_store.as_retriever(search_kwargs={"k": 2})

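Knowledge base design:

- Diverse content types: covers pricing, services, company information, and support procedures
- Rich metadata: enables better document organization and filtering
- Realistic scope: comprehensive enough to demonstrate complex scenarios
- Business context: reflects real-world enterprise knowledge management needs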
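
To illustrate how the metadata supports filtering, here is a minimal, library-free sketch. The `Doc` dataclass and the `filter_by_category` helper are hypothetical stand-ins introduced only for this example; they are not part of LangChain or Chroma.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Doc:
    """Hypothetical stand-in for a document with text and metadata."""
    page_content: str
    metadata: Dict[str, str] = field(default_factory=dict)


def filter_by_category(docs: List[Doc], category: str) -> List[Doc]:
    """Keep only documents whose metadata 'category' equals the given value."""
    return [d for d in docs if d.metadata.get("category") == category]


docs = [
    Doc("Basic Support costs $29/month.",
        {"source": "pricing_guide.pdf", "category": "pricing"}),
    Doc("Founded in 2018 by Maria Rodriguez.",
        {"source": "company_history.pdf", "category": "company"}),
]

pricing_docs = filter_by_category(docs, "pricing")
print([d.metadata["source"] for d in pricing_docs])
```

Many vector stores can also apply a metadata filter at query time; consult the documentation of the store you use for the exact syntax.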
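
Step 3: State management

Our agent uses a typed state to track the conversation flow:

    class ConversationState(TypedDict):
        conversation_history: List[BaseMessage]  # Full conversation thread
        retrieved_documents: List[Document]      # Currently retrieved documents
        topic_relevance: str                     # On-topic or off-topic classification
        enhanced_query: str                      # Reformulated question
        should_generate: bool                    # Whether to proceed with answer generation
        optimization_attempts: int               # Number of query refinement attempts
        current_query: HumanMessage              # User's current question

Benefits of this state schema:

- Conversation continuity: maintains context across multiple turns
- Quality control: tracks document relevance and generation decisions
- Loop prevention: monitors refinement attempts to avoid infinite loops
- Debugging support: full state tracking for troubleshooting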
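
The split between per-query fields and fields that persist across turns can be sketched without any LangChain types. `SketchState` and `reset_for_new_query` below are hypothetical names that mirror the schema above with plain strings; this is an illustration of the idea, not the article's implementation.

```python
from typing import Any, Dict, List, TypedDict


class SketchState(TypedDict):
    """Simplified, hypothetical mirror of ConversationState using plain types."""
    conversation_history: List[str]
    retrieved_documents: List[str]
    topic_relevance: str
    enhanced_query: str
    should_generate: bool
    optimization_attempts: int
    current_query: str


def reset_for_new_query(state: Dict[str, Any], query: str) -> SketchState:
    """Clear per-query fields while carrying the conversation history forward."""
    return SketchState(
        conversation_history=list(state.get("conversation_history") or []),
        retrieved_documents=[],
        topic_relevance="",
        enhanced_query="",
        should_generate=False,
        optimization_attempts=0,
        current_query=query,
    )


previous = {"conversation_history": ["What plans do you offer?"],
            "optimization_attempts": 2}
fresh = reset_for_new_query(previous, "What about pricing?")
print(fresh["optimization_attempts"], fresh["conversation_history"])
```

Resetting the attempt counter for every new question is what keeps the retry limit scoped to a single query rather than to the whole conversation.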

Step 4: Core agent components

4.1 Query Enhancer: Intelligent Question Reformulation

    def enhance_user_query(state: ConversationState):
        """
        Reformulates user questions based on conversation history to create
        standalone queries optimized for vector search.
        """
        print(f"Enhancing query: {state['current_query'].content}")

        # Initialize state for new query processing
        state["retrieved_documents"] = []
        state["topic_relevance"] = ""
        state["enhanced_query"] = ""
        state["should_generate"] = False
        state["optimization_attempts"] = 0

        # Ensure conversation history exists
        if "conversation_history" not in state or state["conversation_history"] is None:
            state["conversation_history"] = []

        # Add current query to history if not already present
        if state["current_query"] not in state["conversation_history"]:
            state["conversation_history"].append(state["current_query"])

        # Check if we have conversation context
        if len(state["conversation_history"]) > 1:
            # Extract context and current question
            previous_messages = state["conversation_history"][:-1]
            current_question = state["current_query"].content

            # Build context-aware prompt
            context_messages = [
                SystemMessage(
                    content="""You are an expert query reformulator. Transform the user's question into a standalone, search-optimized query that incorporates relevant context from the conversation history.

    Guidelines:
    - Make the question self-contained and clear
    - Preserve the user's intent while adding necessary context
    - Optimize for vector database retrieval
    - Keep the reformulated query concise but comprehensive"""
                )
            ]
            context_messages.extend(previous_messages)
            context_messages.append(HumanMessage(content=f"Current question: {current_question}"))

            # Generate enhanced query
            enhancement_prompt = ChatPromptTemplate.from_messages(context_messages)
            llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.1)

            formatted_prompt = enhancement_prompt.format()
            response = llm.invoke(formatted_prompt)
            enhanced_question = response.content.strip()

            print(f"Enhanced query: {enhanced_question}")
            state["enhanced_query"] = enhanced_question
        else:
            # First question in conversation - use as-is
            state["enhanced_query"] = state["current_query"].content
            print(f"First query - using original: {state['enhanced_query']}")

        return state

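Enhancement strategy:

- Context integration: combines the conversation history with the current question
- Search optimization: creates queries that work well with a vector database
- Intent preservation: keeps the user's original intent while adding clarity
- Efficiency: skips enhancement for the first question to reduce latency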
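
The control flow behind this strategy can be shown in isolation. In the sketch below, `enhance_query` takes the rewriting step as a parameter, and `fake_rewrite` is a hypothetical stand-in for the LLM call so that the example runs without an API key.

```python
from typing import Callable, List


def enhance_query(history: List[str], question: str,
                  rewrite: Callable[[List[str], str], str]) -> str:
    """Return the question unchanged on the first turn; otherwise rewrite it."""
    if not history:
        return question  # nothing to resolve against, so skip the extra call
    return rewrite(history, question).strip()


def fake_rewrite(history: List[str], question: str) -> str:
    """Hypothetical stand-in for the LLM call; it only appends the last turn."""
    return f"{question} (in the context of: {history[-1]})"


print(enhance_query([], "What support plans do you offer?", fake_rewrite))
print(enhance_query(["What support plans do you offer?"],
                    "What about pricing?", fake_rewrite))
```

Passing the rewriter in as a function also makes this step easy to unit test, since the model can be replaced with a deterministic stub.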

4.2 Topic Validator: Intelligent Domain Classification

    class TopicRelevance(BaseModel):
        """Structured output for topic classification"""
        classification: str = Field(
            description="Is the question about TechFlow Solutions services/pricing/company? Answer 'RELEVANT' or 'IRRELEVANT'"
        )
        confidence: str = Field(
            description="Confidence level: 'HIGH', 'MEDIUM', or 'LOW'"
        )


    def validate_topic_relevance(state: ConversationState):
        """
        Determines if the user's question is within our knowledge domain.
        Uses the enhanced query for better classification accuracy.
        """
        print("Validating topic relevance...")

        classification_prompt = SystemMessage(
            content="""You are a topic classifier for TechFlow Solutions support system.

    RELEVANT topics include:
    - TechFlow Solutions services (cloud infrastructure, migration, DevOps)
    - Pricing for any TechFlow Solutions products or services
    - Company information (history, team, locations)
    - Support procedures and response times
    - Security and compliance features
    - Technical specifications and capabilities

    IRRELEVANT topics include:
    - General technology questions not specific to TechFlow
    - Other companies' products or services
    - Personal questions unrelated to business
    - Weather, news, or general knowledge queries

    Classify based on the enhanced query which incorporates conversation context."""
        )

        user_question = HumanMessage(
            content=f"Enhanced query to classify: {state['enhanced_query']}"
        )

        # Create classification chain
        classification_chain = ChatPromptTemplate.from_messages([classification_prompt, user_question])
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)

        structured_llm = llm.with_structured_output(TopicRelevance)
        classifier = classification_chain | structured_llm

        result = classifier.invoke({})
        state["topic_relevance"] = result.classification.strip()

        print(f"Topic classification: {state['topic_relevance']} (Confidence: {result.confidence})")
        return state

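Classification advantages:

- Domain specificity: clearly defines which questions the system can handle
- Confidence scoring: makes the certainty of each classification transparent
- Context awareness: uses the enhanced query for better accuracy
- Structured output: ensures consistent, parseable responses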

4.3 Content Retriever: Intelligent Document Fetching

    def fetch_relevant_content(state: ConversationState):
        """
        Retrieves documents from the knowledge base using the enhanced query.
        """
        print("Fetching relevant documents...")

        # Use enhanced query for better retrieval
        retrieved_docs = document_retriever.invoke(state["enhanced_query"])

        print(f"Retrieved {len(retrieved_docs)} documents")
        for i, doc in enumerate(retrieved_docs):
            print(f"   Document {i+1}: {doc.page_content[:50]}...")

        state["retrieved_documents"] = retrieved_docs
        return state

4.4 Relevance Assessor: Document Quality Control

    class DocumentRelevance(BaseModel):
        """Structured output for document relevance assessment"""
        relevance: str = Field(
            description="Is this document relevant to answering the question? Answer 'RELEVANT' or 'IRRELEVANT'"
        )
        reasoning: str = Field(
            description="Brief explanation of why the document is relevant or irrelevant"
        )


    def assess_document_relevance(state: ConversationState):
        """
        Evaluates each retrieved document to determine if it's relevant
        for answering the user's question.
        """
        print("Assessing document relevance...")

        assessment_prompt = SystemMessage(
            content="""You are a document relevance assessor. Evaluate whether each document contains information that can help answer the user's question.

    A document is RELEVANT if it contains:
    - Direct answers to the question
    - Supporting information that contributes to a complete answer
    - Context that helps understand the topic

    A document is IRRELEVANT if it:
    - Contains no information related to the question
    - Discusses completely different topics
    - Provides no value for answering the question

    Be strict but fair in your assessment."""
        )

        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)
        structured_llm = llm.with_structured_output(DocumentRelevance)

        relevant_documents = []

        for i, doc in enumerate(state["retrieved_documents"]):
            assessment_query = HumanMessage(
                content=f"""Question: {state['enhanced_query']}

    Document to assess:
    {doc.page_content}

    Is this document relevant for answering the question?"""
            )

            assessment_chain = ChatPromptTemplate.from_messages([assessment_prompt, assessment_query])
            assessor = assessment_chain | structured_llm

            result = assessor.invoke({})

            print(f"Document {i+1}: {result.relevance} - {result.reasoning}")

            if result.relevance.strip().upper() == "RELEVANT":
                relevant_documents.append(doc)

        # Update state with filtered documents
        state["retrieved_documents"] = relevant_documents
        state["should_generate"] = len(relevant_documents) > 0

        print(f"Final relevant documents: {len(relevant_documents)}")
        return state

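Quality control advantages:

- Higher precision: filters out irrelevant documents before generation
- Transparency: provides the reasoning behind each relevance decision
- Quality assurance: keeps a high standard for response generation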
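
The filtering step itself is independent of how relevance is judged. The sketch below separates the two: `filter_relevant` applies any grader, and `keyword_grade` is a deliberately crude, hypothetical grader used here in place of the LLM assessor so the example is self-contained.

```python
from typing import Callable, List, Tuple


def filter_relevant(question: str, docs: List[str],
                    grade: Callable[[str, str], bool]) -> Tuple[List[str], bool]:
    """Return the documents the grader accepts and whether any survived."""
    kept = [doc for doc in docs if grade(question, doc)]
    return kept, len(kept) > 0


def keyword_grade(question: str, doc: str) -> bool:
    """Hypothetical grader: relevant if the doc contains a question word of 5+ letters."""
    words = {w.strip("?.,!").lower() for w in question.split()}
    doc_lower = doc.lower()
    return any(len(w) >= 5 and w in doc_lower for w in words)


docs = [
    "Basic Support ($29/month) includes email support.",
    "Virtual Private Servers start at $15/month.",
]
kept, should_generate = filter_relevant("What does support cost?", docs, keyword_grade)
print(kept, should_generate)
```

The boolean returned alongside the filtered list plays the same role as `should_generate` in the state: it tells the router whether to answer or to retry.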

4.5 Response Generator: Context-Aware Answer Creation

    def generate_contextual_response(state: ConversationState):
        """
        Generates final response using conversation history and relevant documents.
        """
        print("Generating contextual response...")

        if "conversation_history" not in state or state["conversation_history"] is None:
            raise ValueError("Conversation history is required for response generation")

        # Extract components for response generation
        conversation_context = state["conversation_history"]
        relevant_docs = state["retrieved_documents"]
        enhanced_question = state["enhanced_query"]

        # Create comprehensive response template
        response_template = """You are a knowledgeable TechFlow Solutions support agent. Generate a helpful, accurate response based on the conversation history and retrieved documents.

    Guidelines:
    - Use information from the provided documents to answer the question
    - Maintain conversation context and refer to previous exchanges when relevant
    - Be conversational and helpful in tone
    - If the documents don't fully answer the question, acknowledge limitations
    - Provide specific details when available (prices, timeframes, etc.)

    Conversation History:
    {conversation_history}

    Retrieved Knowledge:
    {document_context}

    Current Question:
    {current_question}

    Generate a helpful response:"""

        response_prompt = ChatPromptTemplate.from_template(response_template)
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.3)

        # Create response generation chain
        response_chain = response_prompt | llm

        # Generate response
        response = response_chain.invoke({
            "conversation_history": conversation_context,
            "document_context": relevant_docs,
            "current_question": enhanced_question,
        })

        generated_response = response.content.strip()

        # Add response to conversation history
        state["conversation_history"].append(AIMessage(content=generated_response))

        print(f"Generated response: {generated_response[:100]}...")
        return state

4.6 Query Optimizer: Adaptive Search Improvement

    def optimize_search_query(state: ConversationState):
        """
        Refines the search query when initial retrieval doesn't yield relevant results.
        Includes loop prevention to avoid infinite optimization cycles.
        """
        print("Optimizing search query...")

        current_attempts = state.get("optimization_attempts", 0)

        # Prevent infinite optimization loops
        if current_attempts >= 2:
            print("⚠ Maximum optimization attempts reached")
            return state

        current_query = state["enhanced_query"]

        optimization_prompt = SystemMessage(
            content="""You are a search query optimizer. The current query didn't retrieve relevant documents.

    Create an improved version that:
    - Uses different keywords or synonyms
    - Adjusts the query structure for better matching
    - Maintains the original intent while improving searchability
    - Considers alternative ways to express the same concept

    Provide only the optimized query without explanations."""
        )

        optimization_request = HumanMessage(
            content=f"Current query that needs optimization: {current_query}"
        )

        optimization_chain = ChatPromptTemplate.from_messages([optimization_prompt, optimization_request])
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.2)

        formatted_prompt = optimization_chain.format()
        response = llm.invoke(formatted_prompt)
        optimized_query = response.content.strip()

        # Update state
        state["enhanced_query"] = optimized_query
        state["optimization_attempts"] = current_attempts + 1

        print(f"Optimized query (attempt {current_attempts + 1}): {optimized_query}")
        return state

Step 5: Workflow orchestration with intelligent routing

    def route_by_topic(state: ConversationState):
        """Routes based on topic relevance classification"""
        print("Routing based on topic relevance...")

        relevance = state.get("topic_relevance", "").strip().upper()

        if relevance == "RELEVANT":
            print("   → Proceeding to content retrieval")
            return "fetch_content"
        else:
            print("   → Routing to off-topic handler")
            return "handle_off_topic"


    def route_by_document_quality(state: ConversationState):
        """Routes based on document relevance assessment"""
        print("Routing based on document quality...")

        optimization_attempts = state.get("optimization_attempts", 0)

        if state.get("should_generate", False):
            print("   → Generating response with relevant documents")
            return "generate_response"
        elif optimization_attempts >= 2:
            print("   → Maximum optimization attempts reached")
            return "handle_no_results"
        else:
            print("   → Optimizing query for better results")
            return "optimize_query"


    # Helper functions for edge cases
    def handle_off_topic_queries(state: ConversationState):
        """Handles queries outside our knowledge domain"""
        print("Handling off-topic query...")

        if "conversation_history" not in state or state["conversation_history"] is None:
            state["conversation_history"] = []

        off_topic_response = """I'm specialized in helping with TechFlow Solutions services, pricing, and company information. Your question seems to be outside my area of expertise.

    I can help you with:
    - Our cloud infrastructure services and pricing
    - Support procedures and response times
    - Company information and team details
    - Security and compliance features

    Is there something specific about TechFlow Solutions I can help you with?"""

        state["conversation_history"].append(AIMessage(content=off_topic_response))
        return state


    def handle_no_relevant_results(state: ConversationState):
        """Handles cases where no relevant documents are found after optimization"""
        print("No relevant results found after optimization...")

        if "conversation_history" not in state or state["conversation_history"] is None:
            state["conversation_history"] = []

        no_results_response = """I apologize, but I couldn't find specific information to answer your question in our current knowledge base.

    This might be because:
    - The information isn't available in our documentation
    - Your question might need clarification
    - You might need to contact our support team directly

    For immediate assistance, you can reach our support team at support@techflow.com or call 1-800-TECHFLOW."""

        state["conversation_history"].append(AIMessage(content=no_results_response))
        return state

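
To see how the retry limit in the routing logic above plays out, the loop can be simulated in plain Python. This is a sketch under stated assumptions: `run_retrieval_loop`, `never_finds`, and `add_suffix` are hypothetical names, and the retriever and refiner are stubs rather than the vector store and LLM.

```python
from typing import Callable, List

MAX_ATTEMPTS = 2  # mirrors the limit used in the routing function


def run_retrieval_loop(query: str,
                       retrieve: Callable[[str], List[str]],
                       refine: Callable[[str], str]) -> dict:
    """Retrieve, and refine the query up to MAX_ATTEMPTS times if nothing is found."""
    attempts = 0
    trace = []
    while True:
        docs = retrieve(query)
        trace.append(query)
        if docs:
            return {"route": "generate_response", "attempts": attempts, "trace": trace}
        if attempts >= MAX_ATTEMPTS:
            return {"route": "handle_no_results", "attempts": attempts, "trace": trace}
        query = refine(query)
        attempts += 1


def never_finds(query: str) -> List[str]:
    """Hypothetical retriever that always comes back empty."""
    return []


def add_suffix(query: str) -> str:
    """Hypothetical refiner standing in for the LLM-based optimizer."""
    return query + " pricing"


result = run_retrieval_loop("plans", never_finds, add_suffix)
print(result["route"], result["attempts"], len(result["trace"]))
```

With a retriever that never succeeds, the loop performs one initial retrieval plus two retries before giving up, which matches the `>= 2` check in the router.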
Step 6: Assemble the complete workflow

    # Initialize conversation memory
    conversation_memory = MemorySaver()

    # Create workflow graph
    workflow = StateGraph(ConversationState)

    # Add all processing nodes
    workflow.add_node("enhance_query", enhance_user_query)
    workflow.add_node("validate_topic", validate_topic_relevance)
    workflow.add_node("handle_off_topic", handle_off_topic_queries)
    workflow.add_node("fetch_content", fetch_relevant_content)
    workflow.add_node("assess_relevance", assess_document_relevance)
    workflow.add_node("generate_response", generate_contextual_response)
    workflow.add_node("optimize_query", optimize_search_query)
    workflow.add_node("handle_no_results", handle_no_relevant_results)

    # Define workflow connections
    workflow.add_edge("enhance_query", "validate_topic")

    # Conditional routing based on topic relevance
    workflow.add_conditional_edges(
        "validate_topic",
        route_by_topic,
        {
            "fetch_content": "fetch_content",
            "handle_off_topic": "handle_off_topic",
        },
    )

    # Content processing pipeline
    workflow.add_edge("fetch_content", "assess_relevance")

    # Conditional routing based on document quality
    workflow.add_conditional_edges(
        "assess_relevance",
        route_by_document_quality,
        {
            "generate_response": "generate_response",
            "optimize_query": "optimize_query",
            "handle_no_results": "handle_no_results",
        },
    )

    # Optimization loop
    workflow.add_edge("optimize_query", "fetch_content")

    # Terminal nodes
    workflow.add_edge("generate_response", END)
    workflow.add_edge("handle_no_results", END)
    workflow.add_edge("handle_off_topic", END)

    # Set entry point
    workflow.set_entry_point("enhance_query")

    # Compile the workflow
    advanced_rag_agent = workflow.compile(checkpointer=conversation_memory)

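
As a quick sanity check on the wiring, the edges above can be transcribed into a plain adjacency list and tested for unreachable nodes and dead ends. This sketch does not use LangGraph at all; the `END` label, `EDGES` dictionary, and `reachable` helper are hypothetical names introduced for the example.

```python
from typing import Dict, List, Set

END = "__end__"  # placeholder label for the terminal state in this sketch

# Adjacency list transcribed from the edges defined above.
EDGES: Dict[str, List[str]] = {
    "enhance_query": ["validate_topic"],
    "validate_topic": ["fetch_content", "handle_off_topic"],
    "fetch_content": ["assess_relevance"],
    "assess_relevance": ["generate_response", "optimize_query", "handle_no_results"],
    "optimize_query": ["fetch_content"],
    "generate_response": [END],
    "handle_no_results": [END],
    "handle_off_topic": [END],
}


def reachable(start: str, edges: Dict[str, List[str]]) -> Set[str]:
    """Return every node reachable from start via depth-first search."""
    seen: Set[str] = set()
    stack = [start]
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        stack.extend(edges.get(node, []))
    return seen


from_entry = reachable("enhance_query", EDGES)
unreachable = set(EDGES) - from_entry
dead_ends = {n for n in EDGES if END not in reachable(n, EDGES)}
print(sorted(unreachable), sorted(dead_ends))
```

Both sets come out empty for this graph: every node can be reached from the entry point, and every node has a path to the end, including the nodes inside the optimization loop.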
Step 7: Test the advanced RAG agent

Let's test the system with a few scenarios.

Test #1:

    print("Testing Advanced RAG Agent\n")

    # Test 1: Off-topic query
    print("=== Test 1: Off-Topic Query ===")
    test_input = {"current_query": HumanMessage(content="What's the weather like today?")}
    result = advanced_rag_agent.invoke(
        input=test_input,
        config={"configurable": {"thread_id": "test_session_1"}},
    )
    print(f"Response: {result['conversation_history'][-1].content}\n")

Output:

    Testing Advanced RAG Agent

    === Test 1: Off-Topic Query ===
    Enhancing query: What's the weather like today?
    First query - using original: What's the weather like today?
    Validating topic relevance...
    Topic classification: IRRELEVANT (Confidence: HIGH)
    Routing based on topic relevance...
       → Routing to off-topic handler
    Handling off-topic query...
    Response: I'm specialized in helping with TechFlow Solutions services, pricing, and company information. Your question seems to be outside my area of expertise.

    I can help you with:
    - Our cloud infrastructure services and pricing
    - Support procedures and response times
    - Company information and team details
    - Security and compliance features

    Is there something specific about TechFlow Solutions I can help you with?

Test #2:

    # Test 2: On-topic query about pricing
    print("=== Test 2: Service Pricing Query ===")
    test_input = {"current_query": HumanMessage(content="What are your support service pricing options?")}
    result = advanced_rag_agent.invoke(
        input=test_input,
        config={"configurable": {"thread_id": "test_session_2"}},
    )
    print(f"Response: {result['conversation_history'][-1].content}\n")

Output:

    === Test 2: Service Pricing Query ===
    Enhancing query: What are your support service pricing options?
    📝 First query - using original: What are your support service pricing options?
    🎯 Validating topic relevance...
    🏷️ Topic classification: RELEVANT (Confidence: HIGH)
    🚦 Routing based on topic relevance...
       → Proceeding to content retrieval
    📚 Fetching relevant documents...
    📄 Retrieved 2 documents
       Document 1: TechFlow Solutions offers three main service tiers...
       Document 2: Our cloud infrastructure services include: Virtual...
    🔍 Assessing document relevance...
    📋 Document 1: RELEVANT - The document directly answers the question by listing the names, features, and prices of the support service tiers offered by TechFlow Solutions.
    📋 Document 2: IRRELEVANT - The document describes pricing options for cloud infrastructure services, not support services. Therefore, it's not relevant to the question about support service pricing.
    ✅ Final relevant documents: 1
    🚦 Routing based on document quality...
       → Generating response with relevant documents
    💬 Generating contextual response...
    📝 Generated response: We have three support service tiers available. Basic Support is $29 per month and includes email sup...
    Response: We have three support service tiers available. Basic Support is $29 per month and includes email support and basic troubleshooting. Professional Support is $79 per month, providing priority phone support and advanced diagnostics. Finally, Enterprise Support, at $199 per month, includes 24/7 dedicated support and custom integrations.

Conclusion

We have built an advanced RAG agent that goes well beyond simple question answering. The system shows how several AI techniques can work together to produce conversational AI that is smarter, context-aware, and more reliable.

Key innovations include:

- Context-aware question rewriting for natural conversations
- Multi-layer quality control through classification and grading
- Iterative refinement to improve retrieval success
- Robust workflow management with explicit handling of off-topic queries and empty results

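This architecture provides a solid foundation for building production-ready RAG applications that can handle complex multi-turn conversations while keeping responses high-quality and relevant.

Next steps

To enhance this system further, consider:

- Adding a more sophisticated embedding model for better retrieval
- Implementing feedback loops for continuous improvement
- Adding evaluation metrics to measure performance

Full code repository: https://github.com/piyushagni5/langgraph-ai/blob/main/rag/Building%20an%20Advanced%20RAG%20Agent.ipynb |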
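
As one possible starting point for the evaluation metrics mentioned above, here is a minimal sketch of a retrieval hit rate. The `hit_rate` function and the sample data are hypothetical and exist only to illustrate the idea; they are not results from the system built in this article.

```python
from typing import Dict, List


def hit_rate(results: Dict[str, List[str]], expected: Dict[str, str]) -> float:
    """Fraction of queries whose expected source appears among the retrieved sources."""
    if not expected:
        return 0.0
    hits = sum(1 for query, source in expected.items()
               if source in results.get(query, []))
    return hits / len(expected)


# Hypothetical evaluation data: query -> sources returned by the retriever.
retrieved = {
    "support pricing": ["pricing_guide.pdf", "infrastructure_pricing.pdf"],
    "who founded the company": ["support_procedures.pdf", "pricing_guide.pdf"],
}
# Hypothetical ground truth: query -> the source that should be retrieved.
expected = {
    "support pricing": "pricing_guide.pdf",
    "who founded the company": "company_history.pdf",
}

print(hit_rate(retrieved, expected))
```

In practice the retrieved sources could be read from each document's `source` metadata field, and the same labeled queries could be rerun after every change to the embedding model or prompts.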