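In this article we show how to implement a sophisticated RAG agent with LangGraph. The agent can rewrite user questions, classify them, validate document relevance, and even retry with an optimized query before finally giving up gracefully.
Before we dive into building the advanced RAG agent, it helps to revisit how RAG can be used as a tool inside a LangGraph agent.
Traditional RAG (retrieval-augmented generation) systems work well for simple questions but struggle with complex conversational scenarios. What happens when a user asks a follow-up question such as "What about pricing?" These context-dependent queries often fail because the system lacks conversation memory and intelligent query handling.
Today we will build an advanced RAG agent that addresses these challenges by:
Let's build this system step by step using a real-world scenario: a technical support knowledge base.
Our advanced RAG agent implements a sophisticated multi-stage workflow:
This creates a robust system that can handle complex conversations while maintaining quality and relevance.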
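The control flow described in the introduction (rewrite, classify, retrieve, grade, then answer, retry, or give up) can be sketched in plain Python before any LangGraph code is involved. This is only an illustrative sketch: `run_pipeline`, its callback parameters, and `MAX_RETRIES` are hypothetical names, and the lambdas below are toy stand-ins for the LLM and the vector store.

```python
from typing import Callable, List

MAX_RETRIES = 2  # assumption: mirrors the retry cap used later in the article


def run_pipeline(
    question: str,
    rewrite: Callable[[str], str],
    is_on_topic: Callable[[str], bool],
    retrieve: Callable[[str], List[str]],
    is_relevant: Callable[[str, str], bool],
    refine: Callable[[str], str],
) -> str:
    """Return a label describing which branch of the workflow was taken."""
    query = rewrite(question)
    if not is_on_topic(query):
        return "off_topic"
    attempts = 0
    while True:
        # Retrieve, then keep only the documents graded as relevant.
        docs = [d for d in retrieve(query) if is_relevant(query, d)]
        if docs:
            return "answer_from:" + docs[0]
        if attempts >= MAX_RETRIES:
            return "no_results"
        # Nothing relevant yet: refine the query and try again.
        query = refine(query)
        attempts += 1


# Toy stand-ins so the sketch runs without any LLM or vector store.
toy_docs = ["support pricing tiers", "company history"]
result = run_pipeline(
    "pricing?",
    rewrite=lambda q: q.strip("?"),
    is_on_topic=lambda q: True,
    retrieve=lambda q: toy_docs,
    is_relevant=lambda q, d: q in d,
    refine=lambda q: q + " tiers",
)
print(result)
```

In the real agent each callback becomes a graph node, and the `while` loop becomes a conditional edge that routes back to retrieval.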
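We use uv, a fast Python package manager, to set up the environment quickly.
uv venv rag-env
source rag-env/bin/activate
This creates a virtual environment named rag-env and activates it.
Now install the core dependencies for this RAG agent: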
uv pip install \
  langchain \
  langgraph \
  langchain-google-genai \
  langchain-community \
  langchain-huggingface \
  sentence-transformers \
  chromadb \
  python-dotenv \
  jupyterlab \
  ipykernel
To register your rag-env as a Jupyter kernel:
python -m ipykernel install --user --name=rag-env --display-name "RAG Agent (uv)"
You can now select RAG Agent (uv) as the kernel in Jupyter Notebook or JupyterLab.
Next, create a .env file in the project root and add your Gemini API key:
GOOGLE_API_KEY=your_google_gemini_api_key_here
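A forgotten or placeholder key is an easy mistake to make at this step, so it can help to fail fast before any model call. The helper below is a minimal sketch using only the standard library; `require_api_key` is a hypothetical name and not part of python-dotenv or LangChain. It assumes the environment has already been populated, for example by `load_dotenv()`.

```python
import os
from typing import Mapping

PLACEHOLDER = "your_google_gemini_api_key_here"  # the placeholder from the .env example


def require_api_key(
    env: Mapping[str, str] = os.environ,
    name: str = "GOOGLE_API_KEY",
) -> str:
    """Return the key, or raise if it is missing or still the placeholder."""
    value = env.get(name, "").strip()
    if not value or value == PLACEHOLDER:
        raise RuntimeError(f"{name} is missing or still set to the placeholder value")
    return value


# Demonstration with an explicit mapping instead of the real environment.
print(require_api_key({"GOOGLE_API_KEY": "abc123"}))
```

In a notebook you would call `require_api_key()` with no arguments right after `load_dotenv()`.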
from dotenv import load_dotenv

load_dotenv()

# Core LangChain components
from langchain_core.documents import Document
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate

# Graph and state management
from typing import TypedDict, List
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
Tech stack rationale:
We create a comprehensive technical support knowledge base for "TechFlow Solutions":
# Initialize our embedding model
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

# Create comprehensive technology support knowledge base
knowledge_documents = [
    Document(
        page_content="TechFlow Solutions offers three main service tiers: Basic Support ($29/month) includes email support and basic troubleshooting, Professional Support ($79/month) includes priority phone support and advanced diagnostics, Enterprise Support ($199/month) includes 24/7 dedicated support and custom integrations.",
        metadata={"source": "pricing_guide.pdf", "category": "pricing"},
    ),
    Document(
        page_content="Our cloud infrastructure services include: Virtual Private Servers starting at $15/month, Managed Databases from $45/month, Content Delivery Network at $0.08/GB, and Load Balancing services at $25/month. All services include 99.9% uptime guarantee.",
        metadata={"source": "infrastructure_pricing.pdf", "category": "services"},
    ),
    Document(
        page_content="TechFlow Solutions was founded in 2018 by Maria Rodriguez, a former Google engineer with 15 years of experience in cloud architecture. The company has grown from 3 employees to over 150 team members across 12 countries, specializing in enterprise cloud solutions.",
        metadata={"source": "company_history.pdf", "category": "company"},
    ),
    Document(
        page_content="Our technical support team operates 24/7 for Enterprise customers, business hours (9 AM - 6 PM EST) for Professional customers, and email-only support for Basic customers. Average response times: Enterprise (15 minutes), Professional (2 hours), Basic (24 hours).",
        metadata={"source": "support_procedures.pdf", "category": "support"},
    ),
]

# Build vector database
vector_store = Chroma.from_documents(knowledge_documents, embedding_model)
document_retriever = vector_store.as_retriever(search_kwargs={"k": 2})
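The retriever above is configured with `k=2`, meaning it returns the two stored documents closest to the query embedding. The sketch below illustrates that idea with hand-written three-dimensional vectors and cosine similarity. It is a toy model under stated assumptions: `cosine` and `top_k` are hypothetical helpers, real embeddings have hundreds of dimensions, and Chroma's actual index and distance metric may differ.

```python
import math
from typing import List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two vectors (0.0 if either has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def top_k(
    query_vec: Sequence[float],
    doc_vecs: Sequence[Sequence[float]],
    k: int = 2,
) -> List[Tuple[int, float]]:
    """Return (document index, score) pairs for the k most similar documents."""
    scored = [(i, cosine(query_vec, vec)) for i, vec in enumerate(doc_vecs)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Made-up vectors standing in for four embedded documents.
toy_doc_vecs = [
    [1.0, 0.0, 0.0],
    [0.8, 0.6, 0.0],
    [0.0, 1.0, 0.0],
    [0.0, 0.0, 1.0],
]
toy_query_vec = [1.0, 0.1, 0.0]
hits = top_k(toy_query_vec, toy_doc_vecs, k=2)
print([index for index, _ in hits])
```

With only four documents in the knowledge base, `k=2` keeps the prompt small while still leaving room for one near-miss that the later relevance check can filter out.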
Knowledge base design:
Our agent uses sophisticated state management to track the conversation flow:
class ConversationState(TypedDict):
    conversation_history: List[BaseMessage]  # Full conversation thread
    retrieved_documents: List[Document]  # Currently retrieved documents
    topic_relevance: str  # On-topic or off-topic classification
    enhanced_query: str  # Reformulated question
    should_generate: bool  # Whether to proceed with answer generation
    optimization_attempts: int  # Number of query refinement attempts
    current_query: HumanMessage  # User's current question
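One way to read this state is to split it into fields that persist across turns (the conversation history) and fields that are reset for every new question (everything else). The sketch below illustrates that split with plain strings in place of LangChain message objects; `SketchState` and `start_turn` are hypothetical names used only for illustration.

```python
from typing import List, TypedDict


class SketchState(TypedDict, total=False):
    conversation_history: List[str]
    retrieved_documents: List[str]
    topic_relevance: str
    enhanced_query: str
    should_generate: bool
    optimization_attempts: int
    current_query: str


def start_turn(previous: SketchState, question: str) -> SketchState:
    """Carry the history forward and reset every per-question field."""
    history = list(previous.get("conversation_history", []))  # copy, do not mutate
    history.append(question)
    return SketchState(
        conversation_history=history,
        retrieved_documents=[],
        topic_relevance="",
        enhanced_query="",
        should_generate=False,
        optimization_attempts=0,
        current_query=question,
    )


first_turn = start_turn({}, "What support tiers do you offer?")
second_turn = start_turn(first_turn, "What about pricing?")
print(second_turn["conversation_history"])
```

Resetting the retry counter at the start of each turn matters: without it, a question that exhausted its retries would leave the next question with none.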
Benefits of the state architecture:
def enhance_user_query(state: ConversationState):
    """
    Reformulates user questions based on conversation history to create
    standalone queries optimized for vector search.
    """
    print(f"Enhancing query: {state['current_query'].content}")

    # Initialize state for new query processing
    state["retrieved_documents"] = []
    state["topic_relevance"] = ""
    state["enhanced_query"] = ""
    state["should_generate"] = False
    state["optimization_attempts"] = 0

    # Ensure conversation history exists
    if "conversation_history" not in state or state["conversation_history"] is None:
        state["conversation_history"] = []

    # Add current query to history if not already present
    if state["current_query"] not in state["conversation_history"]:
        state["conversation_history"].append(state["current_query"])

    # Check if we have conversation context
    if len(state["conversation_history"]) > 1:
        # Extract context and current question
        previous_messages = state["conversation_history"][:-1]
        current_question = state["current_query"].content

        # Build context-aware prompt
        context_messages = [
            SystemMessage(
                content="""You are an expert query reformulator.
Transform the user's question into a standalone,
search-optimized query that incorporates relevant context from the conversation history.

Guidelines:
- Make the question self-contained and clear
- Preserve the user's intent while adding necessary context
- Optimize for vector database retrieval
- Keep the reformulated query concise but comprehensive"""
            )
        ]
        context_messages.extend(previous_messages)
        context_messages.append(HumanMessage(content=f"Current question: {current_question}"))

        # Generate enhanced query
        enhancement_prompt = ChatPromptTemplate.from_messages(context_messages)
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.1)
        formatted_prompt = enhancement_prompt.format()
        response = llm.invoke(formatted_prompt)
        enhanced_question = response.content.strip()
        print(f"Enhanced query: {enhanced_question}")
        state["enhanced_query"] = enhanced_question
    else:
        # First question in conversation - use as-is
        state["enhanced_query"] = state["current_query"].content
        print(f"First query - using original: {state['enhanced_query']}")

    return state
Enhancement strategy:
class TopicRelevance(BaseModel):
    """Structured output for topic classification"""

    classification: str = Field(
        description="Is the question about TechFlow Solutions services/pricing/company? Answer 'RELEVANT' or 'IRRELEVANT'"
    )
    confidence: str = Field(
        description="Confidence level: 'HIGH', 'MEDIUM', or 'LOW'"
    )


def validate_topic_relevance(state: ConversationState):
    """
    Determines if the user's question is within our knowledge domain.
    Uses the enhanced query for better classification accuracy.
    """
    print("Validating topic relevance...")

    classification_prompt = SystemMessage(
        content="""You are a topic classifier for TechFlow Solutions support system.

RELEVANT topics include:
- TechFlow Solutions services (cloud infrastructure, migration, DevOps)
- Pricing for any TechFlow Solutions products or services
- Company information (history, team, locations)
- Support procedures and response times
- Security and compliance features
- Technical specifications and capabilities

IRRELEVANT topics include:
- General technology questions not specific to TechFlow
- Other companies' products or services
- Personal questions unrelated to business
- Weather, news, or general knowledge queries

Classify based on the enhanced query which incorporates conversation context."""
    )
    user_question = HumanMessage(
        content=f"Enhanced query to classify: {state['enhanced_query']}"
    )

    # Create classification chain
    classification_chain = ChatPromptTemplate.from_messages([classification_prompt, user_question])
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)
    structured_llm = llm.with_structured_output(TopicRelevance)
    classifier = classification_chain | structured_llm

    result = classifier.invoke({})
    state["topic_relevance"] = result.classification.strip()
    print(f"Topic classification: {state['topic_relevance']} (Confidence: {result.confidence})")
    return state
Classification benefits:
def fetch_relevant_content(state: ConversationState):
    """Retrieves documents from the knowledge base using the enhanced query."""
    print("Fetching relevant documents...")

    # Use enhanced query for better retrieval
    retrieved_docs = document_retriever.invoke(state["enhanced_query"])
    print(f"Retrieved {len(retrieved_docs)} documents")
    for i, doc in enumerate(retrieved_docs):
        print(f"  Document {i+1}: {doc.page_content[:50]}...")

    state["retrieved_documents"] = retrieved_docs
    return state
class DocumentRelevance(BaseModel):
    """Structured output for document relevance assessment"""

    relevance: str = Field(
        description="Is this document relevant to answering the question? Answer 'RELEVANT' or 'IRRELEVANT'"
    )
    reasoning: str = Field(
        description="Brief explanation of why the document is relevant or irrelevant"
    )


def assess_document_relevance(state: ConversationState):
    """
    Evaluates each retrieved document to determine if it's relevant
    for answering the user's question.
    """
    print("Assessing document relevance...")

    assessment_prompt = SystemMessage(
        content="""You are a document relevance assessor. Evaluate whether each document
contains information that can help answer the user's question.

A document is RELEVANT if it contains:
- Direct answers to the question
- Supporting information that contributes to a complete answer
- Context that helps understand the topic

A document is IRRELEVANT if it:
- Contains no information related to the question
- Discusses completely different topics
- Provides no value for answering the question

Be strict but fair in your assessment."""
    )
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)
    structured_llm = llm.with_structured_output(DocumentRelevance)

    relevant_documents = []
    for i, doc in enumerate(state["retrieved_documents"]):
        assessment_query = HumanMessage(
            content=f"""Question: {state['enhanced_query']}

Document to assess:
{doc.page_content}

Is this document relevant for answering the question?"""
        )
        assessment_chain = ChatPromptTemplate.from_messages([assessment_prompt, assessment_query])
        assessor = assessment_chain | structured_llm
        result = assessor.invoke({})
        print(f"Document {i+1}: {result.relevance} - {result.reasoning}")
        if result.relevance.strip().upper() == "RELEVANT":
            relevant_documents.append(doc)

    # Update state with filtered documents
    state["retrieved_documents"] = relevant_documents
    state["should_generate"] = len(relevant_documents) > 0
    print(f"Final relevant documents: {len(relevant_documents)}")
    return state
Quality control benefits:
def generate_contextual_response(state: ConversationState):
    """Generates final response using conversation history and relevant documents."""
    print("Generating contextual response...")

    if "conversation_history" not in state or state["conversation_history"] is None:
        raise ValueError("Conversation history is required for response generation")

    # Extract components for response generation
    conversation_context = state["conversation_history"]
    relevant_docs = state["retrieved_documents"]
    enhanced_question = state["enhanced_query"]

    # Create comprehensive response template
    response_template = """You are a knowledgeable TechFlow Solutions support agent. Generate a helpful,
accurate response based on the conversation history and retrieved documents.

Guidelines:
- Use information from the provided documents to answer the question
- Maintain conversation context and refer to previous exchanges when relevant
- Be conversational and helpful in tone
- If the documents don't fully answer the question, acknowledge limitations
- Provide specific details when available (prices, timeframes, etc.)

Conversation History:
{conversation_history}

Retrieved Knowledge:
{document_context}

Current Question: {current_question}

Generate a helpful response:"""

    response_prompt = ChatPromptTemplate.from_template(response_template)
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.3)

    # Create response generation chain
    response_chain = response_prompt | llm

    # Generate response
    response = response_chain.invoke(
        {
            "conversation_history": conversation_context,
            "document_context": relevant_docs,
            "current_question": enhanced_question,
        }
    )
    generated_response = response.content.strip()

    # Add response to conversation history
    state["conversation_history"].append(AIMessage(content=generated_response))
    print(f"Generated response: {generated_response[:100]}...")
    return state
def optimize_search_query(state: ConversationState):
    """
    Refines the search query when initial retrieval doesn't yield relevant results.
    Includes loop prevention to avoid infinite optimization cycles.
    """
    print("Optimizing search query...")

    current_attempts = state.get("optimization_attempts", 0)

    # Prevent infinite optimization loops
    if current_attempts >= 2:
        print("⚠ Maximum optimization attempts reached")
        return state

    current_query = state["enhanced_query"]
    optimization_prompt = SystemMessage(
        content="""You are a search query optimizer. The current query didn't retrieve relevant documents.

Create an improved version that:
- Uses different keywords or synonyms
- Adjusts the query structure for better matching
- Maintains the original intent while improving searchability
- Considers alternative ways to express the same concept

Provide only the optimized query without explanations."""
    )
    optimization_request = HumanMessage(
        content=f"Current query that needs optimization: {current_query}"
    )

    optimization_chain = ChatPromptTemplate.from_messages([optimization_prompt, optimization_request])
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0.2)
    formatted_prompt = optimization_chain.format()
    response = llm.invoke(formatted_prompt)
    optimized_query = response.content.strip()

    # Update state
    state["enhanced_query"] = optimized_query
    state["optimization_attempts"] = current_attempts + 1
    print(f"Optimized query (attempt {current_attempts + 1}): {optimized_query}")
    return state
def route_by_topic(state: ConversationState):
    """Routes based on topic relevance classification"""
    print("Routing based on topic relevance...")
    relevance = state.get("topic_relevance", "").strip().upper()
    if relevance == "RELEVANT":
        print("  → Proceeding to content retrieval")
        return "fetch_content"
    else:
        print("  → Routing to off-topic handler")
        return "handle_off_topic"


def route_by_document_quality(state: ConversationState):
    """Routes based on document relevance assessment"""
    print("Routing based on document quality...")
    optimization_attempts = state.get("optimization_attempts", 0)
    if state.get("should_generate", False):
        print("  → Generating response with relevant documents")
        return "generate_response"
    elif optimization_attempts >= 2:
        print("  → Maximum optimization attempts reached")
        return "handle_no_results"
    else:
        print("  → Optimizing query for better results")
        return "optimize_query"


# Helper functions for edge cases
def handle_off_topic_queries(state: ConversationState):
    """Handles queries outside our knowledge domain"""
    print("Handling off-topic query...")
    if "conversation_history" not in state or state["conversation_history"] is None:
        state["conversation_history"] = []

    off_topic_response = """I'm specialized in helping with TechFlow Solutions services, pricing, and company information.
Your question seems to be outside my area of expertise.

I can help you with:
- Our cloud infrastructure services and pricing
- Support procedures and response times
- Company information and team details
- Security and compliance features

Is there something specific about TechFlow Solutions I can help you with?"""

    state["conversation_history"].append(AIMessage(content=off_topic_response))
    return state


def handle_no_relevant_results(state: ConversationState):
    """Handles cases where no relevant documents are found after optimization"""
    print("No relevant results found after optimization...")
    if "conversation_history" not in state or state["conversation_history"] is None:
        state["conversation_history"] = []

    no_results_response = """I apologize, but I couldn't
find specific information to answer your question in our current knowledge base.

This might be because:
- The information isn't available in our documentation
- Your question might need clarification
- You might need to contact our support team directly

For immediate assistance, you can reach our support team at support@techflow.com or call 1-800-TECHFLOW."""

    state["conversation_history"].append(AIMessage(content=no_results_response))
    return state
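The document-quality router is a pure function of two state fields, so its behavior can be written out as a small decision table and checked without any model calls. The sketch below restates that logic in isolation; `next_step` and `max_attempts` are hypothetical names, and the cap of 2 is taken from the routing code above.

```python
def next_step(should_generate: bool, attempts: int, max_attempts: int = 2) -> str:
    """Mirror of the document-quality routing decision, without print statements."""
    if should_generate:
        return "generate_response"
    if attempts >= max_attempts:
        return "handle_no_results"
    return "optimize_query"


# Enumerate every combination the graph can reach.
decision_table = {
    (should_generate, attempts): next_step(should_generate, attempts)
    for should_generate in (True, False)
    for attempts in range(3)
}
for key, destination in decision_table.items():
    print(key, "->", destination)
```

Relevant documents always win, even on the last attempt, and the fallback handler is reached only after two refinements have failed.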
# Initialize conversation memory
conversation_memory = MemorySaver()

# Create workflow graph
workflow = StateGraph(ConversationState)

# Add all processing nodes
workflow.add_node("enhance_query", enhance_user_query)
workflow.add_node("validate_topic", validate_topic_relevance)
workflow.add_node("handle_off_topic", handle_off_topic_queries)
workflow.add_node("fetch_content", fetch_relevant_content)
workflow.add_node("assess_relevance", assess_document_relevance)
workflow.add_node("generate_response", generate_contextual_response)
workflow.add_node("optimize_query", optimize_search_query)
workflow.add_node("handle_no_results", handle_no_relevant_results)

# Define workflow connections
workflow.add_edge("enhance_query", "validate_topic")

# Conditional routing based on topic relevance
workflow.add_conditional_edges(
    "validate_topic",
    route_by_topic,
    {
        "fetch_content": "fetch_content",
        "handle_off_topic": "handle_off_topic",
    },
)

# Content processing pipeline
workflow.add_edge("fetch_content", "assess_relevance")

# Conditional routing based on document quality
workflow.add_conditional_edges(
    "assess_relevance",
    route_by_document_quality,
    {
        "generate_response": "generate_response",
        "optimize_query": "optimize_query",
        "handle_no_results": "handle_no_results",
    },
)

# Optimization loop
workflow.add_edge("optimize_query", "fetch_content")

# Terminal nodes
workflow.add_edge("generate_response", END)
workflow.add_edge("handle_no_results", END)
workflow.add_edge("handle_off_topic", END)

# Set entry point
workflow.set_entry_point("enhance_query")

# Compile the workflow
advanced_rag_agent = workflow.compile(checkpointer=conversation_memory)
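Because the graph contains a loop (optimize_query back to fetch_content), it is worth confirming that every node is reachable from the entry point and that every node can still reach the end. The sketch below copies the edges from the workflow above into a plain dictionary and checks both properties with a breadth-first search. It does not use LangGraph at all; `EDGES`, `SKETCH_END`, and `reachable` are hypothetical names for illustration.

```python
from collections import deque
from typing import Dict, List, Set

SKETCH_END = "__end__"  # local placeholder, not LangGraph's END constant

# Edges copied by hand from the workflow definition above.
EDGES: Dict[str, List[str]] = {
    "enhance_query": ["validate_topic"],
    "validate_topic": ["fetch_content", "handle_off_topic"],
    "fetch_content": ["assess_relevance"],
    "assess_relevance": ["generate_response", "optimize_query", "handle_no_results"],
    "optimize_query": ["fetch_content"],
    "generate_response": [SKETCH_END],
    "handle_no_results": [SKETCH_END],
    "handle_off_topic": [SKETCH_END],
}


def reachable(start: str, edges: Dict[str, List[str]]) -> Set[str]:
    """Return every node reachable from start, including start itself."""
    seen = {start}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for successor in edges.get(node, []):
            if successor not in seen:
                seen.add(successor)
                queue.append(successor)
    return seen


unreachable_nodes = set(EDGES) - reachable("enhance_query", EDGES)
dead_end_nodes = [node for node in EDGES if SKETCH_END not in reachable(node, EDGES)]
print("unreachable:", unreachable_nodes)
print("dead ends:", dead_end_nodes)
```

Reachability alone does not prove the loop terminates; that guarantee comes from the attempt counter checked in the routing function.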
Let's test our system with a variety of scenarios:
Test #1:
print("Testing Advanced RAG Agent\n")

# Test 1: Off-topic query
print("=== Test 1: Off-Topic Query ===")
test_input = {"current_query": HumanMessage(content="What's the weather like today?")}
result = advanced_rag_agent.invoke(
    input=test_input,
    config={"configurable": {"thread_id": "test_session_1"}},
)
print(f"Response: {result['conversation_history'][-1].content}\n")
Output:
Testing Advanced RAG Agent

=== Test 1: Off-Topic Query ===
Enhancing query: What's the weather like today?
First query - using original: What's the weather like today?
Validating topic relevance...
Topic classification: IRRELEVANT (Confidence: HIGH)
Routing based on topic relevance...
  → Routing to off-topic handler
Handling off-topic query...
Response: I'm specialized in helping with TechFlow Solutions services, pricing, and company information.
Your question seems to be outside my area of expertise.

I can help you with:
- Our cloud infrastructure services and pricing
- Support procedures and response times
- Company information and team details
- Security and compliance features

Is there something specific about TechFlow Solutions I can help you with?
Test #2:
# Test 2: On-topic query about pricing
print("=== Test 2: Service Pricing Query ===")
test_input = {"current_query": HumanMessage(content="What are your support service pricing options?")}
result = advanced_rag_agent.invoke(
    input=test_input,
    config={"configurable": {"thread_id": "test_session_2"}},
)
print(f"Response: {result['conversation_history'][-1].content}\n")

Output:
=== Test 2: Service Pricing Query ===
Enhancing query: What are your support service pricing options?
📝 First query - using original: What are your support service pricing options?
🎯 Validating topic relevance...
🏷️ Topic classification: RELEVANT (Confidence: HIGH)
🚦 Routing based on topic relevance...
  → Proceeding to content retrieval
📚 Fetching relevant documents...
📄 Retrieved 2 documents
  Document 1: TechFlow Solutions offers three main service tiers...
  Document 2: Our cloud infrastructure services include: Virtual...
🔍 Assessing document relevance...
📋 Document 1: RELEVANT - The document directly answers the question by listing the names, features, and prices of the support service tiers offered by TechFlow Solutions.
📋 Document 2: IRRELEVANT - The document describes pricing options for cloud infrastructure services, not support services. Therefore, it's not relevant to the question about support service pricing.
✅ Final relevant documents: 1
🚦 Routing based on document quality...
  → Generating response with relevant documents
💬 Generating contextual response...
📝 Generated response: We have three support service tiers available. Basic Support is $29 per month and includes email sup...
Response: We have three support service tiers available. Basic Support is $29 per month and includes email support and basic troubleshooting. Professional Support is $79 per month, providing priority phone support and advanced diagnostics. Finally, Enterprise Support, at $199 per month, includes 24/7 dedicated support and custom integrations.
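We have built a sophisticated RAG agent that goes well beyond simple question answering. The system shows how several AI techniques can work together to produce conversational AI that is smarter, context-aware, and more reliable.
Key innovations include:
Context-aware question rewriting for natural conversations
Multi-layer quality control through classification and grading
Iterative refinement to improve retrieval success
Robust workflow management with proper error handling
This architecture provides a solid foundation for building production-ready RAG applications that can handle complex multi-turn conversations while keeping responses high-quality and relevant.
To enhance this system further, consider: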