ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.1em;color: rgb(63, 63, 63);visibility: visible;"> 检索增强生成(Retrieval-Augmented Generation,RAG)通过将大型语言模型(LLM)与外部知识结合,改变了我们利用LLM的方式。一个典型的RAG系统会将文档(或其他数据)索引为向量嵌入,通过相似性搜索检索相关信息,并将其注入LLM的上下文中。这种方式在处理非结构化文本时非常强大。然而,现实中的企业数据通常包含结构化成分,例如数值属性、聚合和关系,而这些是文本嵌入无法很好处理的。ingFang SC";letter-spacing: 0.1em;color: rgb(63, 63, 63);">在本文中,我们将探讨如何通过集成知识图谱(特别是Neo4j)和结构化工具来处理更复杂的查询,同时保留基于嵌入的非结构化文本搜索的优势。阅读完本文后,您将学会如何构建一个能够有效处理非结构化和结构化查询的强大RAG应用程序。本文包含了学习和实验这种方法所需的所有内容,包括数据、完整代码和详细解释。ingFang SC";letter-spacing: 0.1em;color: rgb(63, 63, 63);">数据链接:数据集(https://www.kaggle.com/datasets/kshitijkutumbe/supply-chain-dataset-dummy/data)ingFang SC";letter-spacing: 0.1em;color: rgb(63, 63, 63);">创建本文中使用的Neo4j实例的链接:图实例(https://console.neo4j.io/)ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;display: table;padding-right: 0.2em;padding-left: 0.2em;color: rgb(255, 255, 255);background: rgb(1, 155, 252);">理解RAG应用ingFang SC";letter-spacing: 0.1em;color: rgb(63, 63, 63);">一个典型的RAG应用包括:ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;color: rgb(63, 63, 63);" class="list-paddingleft-1">ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;text-indent: -1em;display: block;margin: 0.2em 8px;">•ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: inherit;color: rgb(1, 155, 252);">向量存储/嵌入:将非结构化文本转换为向量嵌入。 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;text-indent: -1em;display: block;margin: 0.2em 8px;">•检索:使用相似性搜索找到最相关的文档或知识片段。 •生成:使用LLM处理用户查询并生成格式化答案。 然而,纯粹基于嵌入的方法的弱点在于处理结构化或关系型数据(例如数值过滤、排序、聚合)。这是知识图谱的优势所在。 文本嵌入的局限性文本嵌入在RAG应用中被广泛使用,但在处理结构化数据方面存在显著局限性。这些局限性会阻碍查询的准确性和有效性,尤其是在处理现实世界数据场景时。 过滤和排序文本嵌入擅长理解语义内容,但它们无法自然地处理过滤或排序等操作。例如,当您想要检索供应能力大于40,000的供应商时,文本嵌入无法胜任,因为它们并非为处理数值过滤设计的。它们无法对数值字段应用“>”或“<”这样的条件,而这些条件在许多数据驱动的查询中至关重要。 聚合文本嵌入在执行聚合操作时也存在困难。例如,像“欧洲有多少供应商?”这样的查询需要对数据进行计数或分组。文本嵌入本身并不提供直接的方式来聚合数据。尽管它们在语义搜索方面表现出色,但像计数、求和或分组这样的任务需要结构化操作,而嵌入无法直接完成。 数据复杂性在实际生产系统中,数据通常具有超越简单关联的复杂关系。嵌入旨在捕获语义含义,但可能会忽略实体之间复杂的关系连接。例如,在知识图谱中,供应商、位置和产品等实体是相互关联的,需要同时理解实体及其关系。嵌入无法有效处理这些复杂性,从而难以提取基于关系的准确洞察。 为什么需要知识图谱知识图谱是一种强大的数据结构,它以捕获不同实体之间关系的方式来表示知识。知识图谱不是以孤立的表格或列表形式表示数据,而是通过节点(表示实体)和边(表示实体之间的关系)来建模数据。这种方法特别适合捕获现实世界数据的复杂互联特性。 例如,在商业环境中,供应商、位置和产品等实体可以被建模为节点,节点之间通过“供应”或“位于”等关系连接。这种结构使我们能更直观地理解数据中不同元素的关系,非常适合需要导航复杂数据连接的任务。 存储结构化数据在知识图谱中,每个节点可以具有属性,例如供应能力、名称或位置,这些属性描述了它所表示的实体。节点之间的关系也可以具有属性,从而支持丰富的上下文数据表示。例如,一个供应商节点可能与一个位置节点通过“位于”关系连接,关系中包含城市或国家等属性。 使用Cypher查询Neo4j是一种流行的图数据库,它使用Cypher(一种声明性查询语言)与图进行交互。Cypher专为图设计,使得执行涉及过滤、聚合和路径匹配的复杂查询变得更容易。例如,您可以使用Cypher查找具有特定供应能力的供应商,或者查找图中两个位置之间的最短路径。 结合非结构化和结构化数据知识图谱的一大优势是能够结合结构化和非结构化数据。通过为节点添加嵌入属性,您可以将向量嵌入(表示非结构化数据,如文本描述)与图中的结构化数据一起存储。这使您能够在执行语义搜索(例如查找与“原材料”相关的供应商)的同时,执行传统的结构化查询(如“查找供应能力大于40,000的供应商”)。这种在单一数据库中融合结构化和非结构化数据的方法,使得能够运行全面的查询并做出更明智的决策。 将知识图谱与语言模型结合将知识图谱(KGs)与语言模型(LLMs)集成的关键在于充分利用两者的优势。LLMs擅长理解复杂语言、解释上下文并生成连贯的类人响应。然而,当需要执行结构化查询(如过滤、排序或对大型数据集进行聚合)时,LLMs并不是最佳选择。这时候,像Neo4j这样的知识图谱通过提供高效的结构化数据查询能力脱颖而出。 通过将生成数据库查询(例如Neo4j的Cypher)的复杂性卸载到专用工具函数上,您可以确保应用程序保持稳健、可靠和准确。这种任务分工使LLMs能够专注于其自然语言处理的优势,而知识图谱则高效处理结构化数据。结果是,您可以构建一个结合两者优势的系统,无缝处理非结构化语言数据和结构化、基于关系的查询。 项目概述:供应商管理我们将实现一个系统来存储、搜索和检索来自世界各地的供应商,每个供应商具有以下属性: 系统必须能够处理: • 基于供应能力的过滤。 • 计数和分组。 • 基于描述的向量相似性搜索。 • 聚合(例如按位置分组)。
安装所需库在笔记本或终端中安装以下库: !pipinstall--quietneo4jpyvislangchain-communitylangchain-openailanggraph 这些库包括: •neo4j:用于连接Neo4j的Python驱动。 •pyvis:用于可视化图(可选)。 •langchain-community:LangChain相关的社区工具。 •langchain-openai:用于使用OpenAI的LLM和嵌入。 •langgraph:用于构建带图逻辑的状态机(例如ReAct智能体)。
配置环境变量为了安全地连接Neo4j和OpenAI,我们将所需的凭据存储为环境变量。这使得代码可以访问敏感信息而无需硬编码。以下是设置方式: importos
NEO4J_URI="neo4j+s://<your-instance-id>.databases.neo4j.io" NEO4J_USER="neo4j" NEO4J_PASSWORD="<your-password>" OPENAI_API_KEY="<your-openai-key>"
os.environ["NEO4J_URI"]=NEO4J_URI os.environ["NEO4J_USERNAME"]=NEO4J_USER os.environ["NEO4J_PASSWORD"]=NEO4J_PASSWORD os.environ["OPENAI_API_KEY"]=OPENAI_API_KEY
此代码将Neo4j和OpenAI的凭据设置为环境变量,确保它们可以在整个应用程序中安全访问。 连接到Neo4j我们需要连接到一个Neo4j实例,以便能够执行数据导入和查询,以下代码可以实现这一点。 fromlangchain_community.graphsimportNeo4jGraph
graph=Neo4jGraph(refresh_schema=False)
Neo4jGraph负责建立连接,并可以选择性地刷新模式。
在Neo4j中构建知识图谱使用CSV进行数据导入您有两个CSV文件: 1.nodes.csv,包含列如id、location、name、description。 2.relationships.csv,包含列如START_ID、END_ID、TYPE。
以下是数据导入脚本: importcsv importnumpyasnp fromneo4jimportGraphDatabase
NODES_CSV="nodes.csv" RELATIONSHIPS_CSV="relationships.csv" defget_label_for_type(node_type): mapping={ "Supplier":"Supplier", "Manufacturer":"Manufacturer", "Distributor":"Distributor", "Retailer":"Retailer", "Product":"Product" } returnmapping.get(node_type,"Entity")
defingest_nodes(driver): withdriver.session()assession: withopen(NODES_CSV,mode='r',encoding='utf-8')asf: reader=csv.DictReader(f) forrowinreader: node_id=row['id:ID'] name=row['name'] node_type=row['type'] location=row['location'] supply_capacity=np.random.randint(1000,50001) description=row['description'] label=get_label_for_type(node_type) iflocation.strip(): query=f""" MERGE(n:{label}{{id id}}) SETn.name=$name,n.location=$location, n.description=$description,n.supply_capacity=$supply_capacity """ params={ "id":node_id, "name":name, "location":location, "description":description, "supply_capacity":supply_capacity } else: query=f""" MERGE(n:{label}{{id id}}) SETn.name=$name """ params={"id":node_id,"name":name} session.run(query,params)
defingest_relationships(driver): withdriver.session()assession: withopen(RELATIONSHIPS_CSV,mode='r',encoding='utf-8')asf: reader=csv.DictReader(f) forrowinreader: start_id=row[':START_ID'] end_id=row[':END_ID'] rel_type=row[':TYPE'] product=row['product'] ifproduct.strip(): query=f""" MATCH(start{{id start_id}}) MATCH(end{{id end_id}}) MERGE(start)-[r:{rel_type}{{product product}}]->(end) """ params={ "start_id":start_id, "end_id":end_id, "product":product } else: query=f""" MATCH(start{{id start_id}}) MATCH(end{{id end_id}}) MERGE(start)-[r:{rel_type}]->(end) """ params={ "start_id":start_id, "end_id":end_id } session.run(query,params)
•ingest_nodes函数从CSV中读取供应商数据并在Neo4j中创建节点。 • 每个节点表示一个供应商,具有名称、位置和供应能力等属性。 •MERGE命令确保节点被创建或更新而不会重复。 •ingest_relationships函数从CSV中读取关系并在节点之间创建连接。 • 在供应商和产品节点之间创建关系,关系具有产品等属性。 •MERGE命令确保关系唯一。
创建索引在Neo4j中创建索引或约束对于性能和数据唯一性至关重要: defcreate_indexes(driver): withdriver.session()assession: forlabelin["Supplier","Manufacturer","Distributor","Retailer","Product"]: session.run(f"CREATECONSTRAINTIFNOTEXISTSFOR(n:{label})REQUIREn.idISUNIQUE")
运行数据导入以下代码将之前定义的所有内容整合在一起,并启动数据导入过程到我们的Neo4j实例中。 driver=GraphDatabase.driver(NEO4J_URI,auth=(NEO4J_USER,NEO4J_PASSWORD)) create_indexes(driver) ingest_nodes(driver) ingest_relationships(driver) print("Dataingestioncomplete.") driver.close()
导入完成后,您可以在Neo4j AuraDB实例中可视化模式。  在Neo4j中存储向量嵌入尽管我们已经有了结构化数据,但我们仍希望为描述添加文本嵌入。通过在Neo4j中存储向量嵌入,您可以执行语义查询(例如,查找与查询文本相似的供应商)。 fromlangchain_openaiimportOpenAIEmbeddings fromlangchain_community.vectorstoresimportNeo4jVector
embedding=OpenAIEmbeddings(model="text-embedding-3-small") neo4j_vector=Neo4jVector.from_existing_graph( embedding=embedding, index_name="supply_chain", node_label="Supplier", text_node_properties=["description"], embedding_node_property="embedding", )
通过此配置: •node_label="Supplier"将嵌入存储限制为供应商节点。 •text_node_properties=["description"]指定哪些属性会被嵌入。 •embedding_node_property="embedding"是存储生成嵌入向量的位置。
现在,我们可以在Neo4j中执行语义相似性查询,将结构化数据与非结构化文本搜索相结合。 处理结构化查询的工具为了处理结构化查询(如计数或列出具有数值过滤的供应商),我们定义了一些工具。每个工具本质上是LLM可以调用的一个函数。我们使用Pydantic指定预期输入,确保清晰性和类型检查。 供应商计数工具我们需要一个工具来根据可选的过滤条件(如最小供应能力、最大供应能力或按属性分组)统计供应商数量。以下代码使用Pydantic定义了输入架构,并实现了查询Neo4j并返回供应商计数的函数。 输入架构我们需要指定输入架构以确保正确的信息被发送到工具。 frompydanticimportBaseModel,Field fromtypingimportOptional,Dict,List
classSupplierCountInput(BaseModel): min_supply_amount:Optional[int]=Field( description="Minimumsupplyamountofthesuppliers" ) max_supply_amount:Optional[int]=Field( description="Maximumsupplyamountofthesuppliers" ) grouping_key:Optional[str]=Field( description="Thekeytogroupbytheaggregation", enum=["supply_capacity","location"] )
SupplierCountInput定义了工具的输入结构,包括按供应能力过滤供应商的可选字段。它还提供了按属性(例如位置)分组结果的能力。
函数实现以下代码块中定义了返回供应商计数所需的函数: importre fromlangchain_core.toolsimporttool
defextract_param_name(filter:str)->str: pattern=r'\$\w+' match=re.search(pattern,filter) ifmatch: returnmatch.group()[1:] returnNone
@tool("supplier-count",args_schema=SupplierCountInput) defsupplier_count( min_supply_amount:Optional[int], max_supply_amount:Optional[int], grouping_key:Optional[str], )->List[Dict]: """CalculatethecountofSuppliersbasedonparticularfilters""" filters=[ ("t.supply_capacity>=$min_supply_amount",min_supply_amount), ("t.supply_capacity<=$max_supply_amount",max_supply_amount) ] params={ extract_param_name(condition):value forcondition,valueinfilters ifvalueisnotNone } where_clause="AND".join( [conditionforcondition,valueinfiltersifvalueisnotNone] ) cypher_statement="MATCH(t:Supplier)" ifwhere_clause: cypher_statement+=f"WHERE{where_clause}" return_clause=( f"t.{grouping_key},count(t)ASsupplier_count" ifgrouping_key else"count(t)ASsupplier_count" ) cypher_statement+=f"RETURN{return_clause}" print(cypher_statement)#Debuggingoutput returngraph.query(cypher_statement,params=params)
工作原理: •extract_param_name:此辅助函数从过滤条件中提取参数名称(例如$min_supply_amount)。 •supplier_count:此工具函数构造一个Cypher查询,根据给定的过滤条件统计供应商数量,并可选择按属性(如位置)分组结果。 • 它动态构建查询,添加适当的过滤条件,并使用Neo4j的查询系统执行查询。
供应商列表工具我们需要一个工具来列出供应商,可选地按供应能力排序、按能力过滤,并在提供description时执行向量搜索。 输入架构我们需要指定输入架构以确保正确的信息被发送到工具。 classSupplierListInput(BaseModel): sort_by:str=Field(description="HowtosortSuppliersbysupplycapacity",enum=['supply_capacity']) k:Optional[int]=Field(description="NumberofSupplierstoreturn") description:Optional[str]=Field(description="DescriptionoftheSuppliers") min_supply_amount:Optional[int]=Field(description="Minimumsupplyamountofthesuppliers") max_supply_amount:Optional[int]=Field(description="Maximumsupplyamountofthesuppliers")
SupplierListInput定义了此工具的输入架构,包含按供应能力过滤、按指定键(如supply_capacity)排序以及可选的描述(用于基于向量的搜索)的参数。
函数实现以下代码块中定义了返回供应商列表所需的函数: @tool("supplier-list",args_schema=SupplierListInput) defsupplier_list( sort_by:str="supply_capacity", k:int=4, description:Optional[str]=None, min_supply_amount:Optional[int]=None, max_supply_amount:Optional[int]=None, )->List[Dict]: """Listsuppliersbasedonparticularfilters"""
#Handlevector-onlysearchwhennoprefilteringisapplied ifdescriptionandnotmin_supply_amountandnotmax_supply_amount: returnneo4j_vector.similarity_search(description,k=k) filters=[ ("t.supply_capacity>=$min_supply_amount",min_supply_amount), ("t.supply_capacity<=$max_supply_amount",max_supply_amount) ] params={ key.split("$")[1]:valueforkey,valueinfiltersifvalueisnotNone } where_clause="AND".join([conditionforcondition,valueinfiltersifvalueisnotNone]) cypher_statement="MATCH(t:Supplier)" ifwhere_clause: cypher_statement+=f"WHERE{where_clause}" #Sortingandreturning cypher_statement+="RETURNt.nameASname,t.locationASlocation,t.descriptionasdescription,t.supply_capacityASsupply_capacityORDERBY" ifdescription: cypher_statement+=( "vector.similarity.cosine(t.embedding,$embedding)DESC" ) params["embedding"]=embedding.embed_query(description) elifsort_by=="supply_capacity": cypher_statement+="t.supply_capacityDESC" else: #Fallbackorotherpossiblesorting cypher_statement+="t.yearDESC" cypher_statement+="LIMITtoInteger($limit)" params["limit"]=kor4 print(cypher_statement)#Debuggingoutput data=graph.query(cypher_statement,params=params) returndata
supplier_list:此工具列出供应商,可选地按供应能力过滤,并按供应能力或其他标准排序。如果提供了描述,则使用Neo4j的向量存储执行向量相似性搜索以找到与描述匹配的供应商。该函数还处理查询的构建并在Neo4j中执行。
关键点: •向量搜索:如果仅提供描述,则完全依赖于Neo4j的向量存储。 •结构化+向量结合:如果还提供了其他过滤条件,则它会构建一个Cypher查询,按向量相似性或供应能力排序。 •鲁棒性:通过严格编码查询的构建方式,减少了错误并保持了清晰性。
集成LangChain和LangGraph在本节中,我们将创建一个ReAct风格的智能体,该智能体使用LangGraph决定何时调用工具(如supplier-count和supplier-list)。LangChain用于管理LLM接口,而LangGraph定义了智能体的流程。 构建智能体以下是帮助我们创建绑定到大型语言模型的工具的代码片段: fromlangchain_openaiimportChatOpenAI fromlangchain_core.messagesimportHumanMessage,SystemMessage
llm=ChatOpenAI(model='gpt-4-turbo') tools=[supplier_count,supplier_list] llm_with_tools=llm.bind_tools(tools) sys_msg=SystemMessage(content="YouareahelpfulassistanttaskedwithfindingandexplainingrelevantinformationaboutSupplychain")
使用LangGraph定义流程我们定义了两个节点: 1.assistant:使用LLM解析消息并决定是否需要调用工具。 2.tools:执行任何工具请求。 3. 条件边缘检查LLM的最后一条消息是否包含工具调用。如果是,则运行工具;如果不是,则终止。
fromlanggraph.graphimportStateGraph,START,MessagesState fromlanggraph.prebuiltimporttools_condition,ToolNode fromIPython.displayimportImage,display
defassistant(state:MessagesState): return{"messages":[llm_with_tools.invoke([sys_msg]+state["messages"])]}
builder=StateGraph(MessagesState) builder.add_node("assistant",assistant) builder.add_node("tools",ToolNode(tools)) #Defineedges: builder.add_edge(START,"assistant") #Ifthere'satoolcall,goto'tools';elsefinish builder.add_conditional_edges("assistant",tools_condition) builder.add_edge("tools","assistant") react_graph=builder.compile() display(Image(react_graph.get_graph(xray=True).draw_mermaid_png()))
 演示与测试我们将向智能体发送各种查询,并观察其如何决定调用工具(supplier-count或supplier-list)。 统计供应商当LLM识别到需要基于给定过滤条件统计供应商数量时,会调用supplier-count工具。然后将结果返回给用户。 fromlangchain_core.messagesimportHumanMessage
messages=[ HumanMessage( content="Howmanysuppliershavesupplycapacitymorethan20000andislocatedinOslo?" ) ] result=react_graph.invoke({"messages":messages}) forminresult["messages"]: m.pretty_print()
 1. LLM识别到需要统计供应商数量 → 调用supplier-count。 2. 工具返回供应商数量。 3. LLM将此信息传达给用户。
列出供应商当查询要求列出供应能力高于某一阈值的供应商时,会调用supplier-list工具。智能体处理请求并检索相关供应商。 messages=[ HumanMessage( content="Whatarethesuppliershavingcapacityabove40000?" ) ] result=react_graph.invoke({"messages":messages}) forminresult["messages"]: m.pretty_print()
  在这里,LLM调用supplier-list以检索供应能力高于40,000的供应商。 组合查询如果您提供的查询同时包含描述和数值过滤条件,智能体将结合向量相似性和Neo4j中的结构化查询: messages=[ HumanMessage( content="Findsuppliersthatdealwithsteelandhaveatleast20000supplycapacity." ) ] result=react_graph.invoke({"messages":messages}) forminresult["messages"]: m.pretty_print()
工具将为供应能力>=20000构建一个WHERE子句,同时使用描述中包含“dealing with steel”的向量属性进行语义匹配。  结论通过超越仅基于嵌入的查询,并结合知识图谱和结构化工具,我们解锁了强大的协同效应: •精确性:数值过滤、计数和分组由Neo4j的Cypher查询完美处理。 •语义相关性:文本嵌入在语义匹配描述方面仍然表现出色。 •稳定性:我们将基于生成的查询创建降至最低,而是依赖确定性的函数调用。这种方法大大减少了生产中的错误。 •可扩展性:随着模式的增长,您可以通过更多的专业工具扩展您的工具集。LLM负责协调这些工具,而不是生成复杂的查询。
您刚刚构建了一个功能齐全的RAG系统,将结构化和非结构化数据结合到一个管道中,确保两全其美——通过文本嵌入实现语义搜索,并通过Neo4j的知识图谱实现强大、精确的查询。
|