通过创建一个自定义流程来自动上传业务数据
在这一节,我会带你创建一个自定义流程,通过大语言模型自动生成节点定义、关系和 Cypher 查询,基于数据集进行操作。这种方法也适用于其他 DataFrame,同时该方法也能够自动识别其 Schema。
需要注意的是,这种方法在性能上会是个问题,尤其是与 Langchain 的 LLMGraphTransformer 相比,我将在下一节中进行介绍。而本节主要帮助你理解如果从零开始构建该过程,从原理出发,帮助你有机会设计自己的 Graph-Builder。实际上,目前所谓最佳方法的主要限制来自于它对数据的天然含义和模式高度敏感。因此,需要跳出固有的思维模式就显得至关重要,这样才能够帮助你从零开始设计 GraphRAG,或利用现有的,最佳实践的 GraphRAG 来满足你的业务需求。
现在,让我们深入研究,设置我们将在接下来的练习中使用的大语言模型。你可以使用 Langchain 所支持的任何大语言模型,只要其性能能够满足你真是的业务需要。
这里我们有两个可选的免费方案:DeepSeek-V3(注册后可获得 10 元的额度,有效期一个月)和 Ollama(可以让你轻松的在本地运行开源模型)。对于这两种方案我都进行了测试,尽管 DeepSeek-V3 提供了和 GPT-4o 类似的性能,我仍然推荐你选择 Ollama 进行学习,这样,你可以更深入的了解从模型下载到运行的整个过程。
在 Ollama 示例中,我们将使用 Qwen2.5-Coder:7B,它针对代码任务进行了微调,并在代码生成、推理和修复代码错误方面表现出色。根据你本地计算机的配置来决定是否使用更高参数量的版本,如 14B 或 32B。
让我们从初始化模型开始:
llm=OllamaLLM(model="qwen2.5-coder:latest")
让我们开始提取数据集的结构,并定义节点及其属性:
node_structure="\n".join([f"{col}:{','.join(map(str,movies[col].unique()[:3]))}..."forcolinmovies.columns])print(node_structure)对于数据集中的每一列(例如:电影类型、导演),我们来展示一些样本值。这将帮助大语言模型理解数据格式以及每一列的典型值。
ReleaseYear:1907,1908,1909...Titleanielboone,Laughinggas,Theadventuresofdollie...Origin/Ethnicity:American...Director:Wallacemccutcheonandediwins.porter,Edwinstantonporter,D.w.griffith...Cast:Williamcraven,florencelawrence,Bertharegustus,edwardboulden,Arthurv.johnson,lindaarvidson...Genre:Biographical,Comedy,Drama...Plot:Boone'sdaughterbefriendsanindianmaidenasbooneandhiscompanionstartoutonahuntingexpedition.whileheisaway,boone'scabinisattackedbytheindians,whosetitonfireandabductboone'sdaughter.boonereturns,swearsvengeance,thenheadsoutonthetrailtotheindiancamp.hisdaughterescapesbutischased.theindiansencounterboone,whichsetsoffahugefightontheedgeofacliff.aburningarrowgetsshotintotheindiancamp.boonegetstiedtothestakeandtortured.theburningarrowsetstheindiancamponfire,causingpanic.booneisrescuedbyhishorse,andboonehasaknifefightinwhichhekillstheindianchief.[2],Theplotisthatofablackwomangoingtothedentistforatoothacheandbeinggivenlaughinggas.onherwaywalkinghome,andinothersituations,shecan'tstoplaughing,andeveryoneshemeets"catches"thelaughterfromher,includingavendorandpoliceofficers.,Onabeautifulsummerdayafatherandmothertaketheirdaughterdollieonanoutingtotheriver.themotherrefusestobuyagypsy'swares.thegypsytriestorobthemother,butthefatherdriveshimoff.thegypsyreturnstothecampanddevisesaplan.theyreturnandkidnapdolliewhileherparentsaredistracted.arescuecrewisorganized,butthegypsytakesdollietohiscamp.theygagdollieandhideherinabarrelbeforetherescuepartygetstothecamp.oncetheyleavethegypsiesandescapesintheirwagon.asthewagoncrossestheriver,thebarrelfallsintothewater.stillsealedinthebarrel,dollieissweptdownstreamindangerouscurrents.aboywhoisfishingintheriverfindsthebarrel,anddollieisreunitedsafelywithherparents...
生成节点
接下来,我们使用大语言模型的提示词模板来引导模型如何提取节点及其属性。让我们先看看完整的代码:
# 设置日志logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)def validate_node_definition(node_def: Dict) -> bool:"""验证节点结构定义"""if not isinstance(node_def, dict):return Falsereturn all(isinstance(v, dict) and all(isinstance(k, str) for k in v.keys())for v in node_def.values())@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))def get_node_definitions(chain, structure: str, example: Dict) -> Dict[str, Dict[str, str]]:"""使用重试逻辑来获取节点定义"""try:# 从大语言模型获得响应response = chain.invoke({"structure": structure, "example": example})# 解析响应node_defs = ast.literal_eval(response)# 验证结构if not validate_node_definition(node_defs):raise ValueError("无效的节点结构定")return node_defsexcept (ValueError, SyntaxError) as e:logger.error(f"解析节点定义时出错: {e}")raise# 更新节点定义模板node_example = {"NodeLabel1": {"property1": "row['property1']", "property2": "row['property2']"},"NodeLabel2": {"property1": "row['property1']", "property2": "row['property2']"},"NodeLabel3": {"property1": "row['property1']", "property2": "row['property2']"},}define_nodes_prompt = PromptTemplate(input_variables=["example", "structure"],template=("""分析以下数据集结构并提取节点的实体标签及其属性。\n节点属性应基于数据集列和它们的值。\n返回的结果应为一个字典,其中键是节点标签,值是节点属性。\n\n示例: {example}\n\n数据集结构:\n{structure}\n\n确保包括所有可能的节点标签及其属性。\n如果某个属性可以是其自己的节点,请将其作为单独的节点标签。\n请不要使用三重反引号标识代码块,只需返回元组的列表。\n仅返回包含节点标签和属性的字典,不要包含任何其他文本或引号。"""),)# 带有错误处理机制的执行过程try:node_chain = define_nodes_prompt | llmnode_definitions = get_node_definitions(node_chain, structure=node_structure, example=node_example)logger.info(f"节点定义: {node_definitions}")except Exception as e:logger.error(f"获取节点定义失败: {e}")raise
在这个代码片段中,我们首先使用 logging 库设置日志记录, logging 是一个 Python 模块,用于跟踪执行过程中的事件(如错误或状态更新):
logging.basicConfig(level=logging.INFO)logger=logging.getLogger(__name__)
我们使用 basicConfig 配置日志记录,以显示 INFO 级别或更高的消息,并初始化日志记录器实例,我将在代码中用它来记录消息。
这个步骤其实不是必需的,你也可以用 print 语句来代替它。然而,这是一个良好的工程实践。
接下来,我将创建一个函数来验证大语言模型生成的节点:
defvalidate_node_definition(node_defict)->bool:"""验证节点结构定义"""ifnotisinstance(node_def,dict):returnFalsereturnall(isinstance(v,dict)andall(isinstance(k,str)forkinv.keys())forvinnode_def.values())
该函数的输入是一个字典,其中键是节点标签(例如:Movie),值是属性的字典(例如:title、year)。
首先,函数检查 node_def 是否是一个字典,并验证字典中的每个值是否也是字典,并且这些字典中的所有键是否都是字符串。如果结构有效,则返回 True 。
接下来,创建一个函数来调用 LLM 链并实际生成节点:
@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=1,min=4,max=10))defget_node_definitions(chain,structure:str,exampleict)->Dict[str,Dict[str,str]]:"""获取带有重试逻辑的节点定义"""try:#从大语言模型获取响应response=chain.invoke({"structure":structure,"example":example})#解析响应node_defs=ast.literal_eval(response)#验证结构ifnotvalidate_node_definition(node_defs):raiseValueError("无效的节点结构定义")returnnode_defs
如果你不熟悉 Python 中的装饰器,可能会好奇 @retry(...) 这部分是做什么的,可以将其看作是一个包装函数,围绕着实际的 get_node_definitions 函数。在这种情况下,我调用了 retry 装饰器,如果发生错误,它会自动重试该函数。
● stop_after_attempt(3) : 最多重试 3 次。
● wait_exponential : 在重试之间增加延迟的时长(例如:4 秒、8 秒、16 秒等等)。
函数的输入是:
●chain : LangChain 管道(提示 + LLM)。我会在稍后定义这个管道。
●structure : 数据集结构(列和示例值)。
● example : 用于引导 LLM 的示例节点定义。
接下来,chain.invoke 将结构和示例发送给 LLM,并接收一个字符串作为响应。ast.literal_eval 将字符串响应转换成 Python 字典。
我使用 validate_node_definition 检查解析后的字典是否符合正确的格式,如果结构无效,它会引发 ValueError 。
except(ValueError,SyntaxError)ase:logger.error(f"Errorparsingnodedefinitions:{e}")raise如果响应无法解析或验证,会记录错误信息,该函数会抛出异常。
接下来,我们为 LLM 提供一个提示词模板,以引导其完成节点生成任务:
define_nodes_prompt=PromptTemplate(input_variables=["example","structure"],template=("""分析以下数据集结构并提取节点的实体标签及其属性。\n节点属性应基于数据集列和它们的值。\n返回的结果应为一个字典,其中键是节点标签,值是节点属性。\n\n示例:{example}\n\n数据集结构:\n{structure}\n\n确保包括所有可能的节点标签及其属性。\n如果某个属性可以是其自己的节点,请将其作为单独的节点标签。\n请不要使用三重反引号标识代码块,只需返回元组的列表。\n仅返回包含节点标签和属性的字典,不要包含任何其他文本或引号。"""),)请注意,我提供了本节开始时定义的节点结构,以及如何生成节点字典的示例:
node_example={"NodeLabel1":{"property1":"row['property1']","property2":"row['property2']"},"NodeLabel2":{"property1":"row['property1']","property2":"row['property2']"},"NodeLabel3":{"property1":"row['property1']","property2":"row['property2']"},}在示例中,键是节点标签(例如:Movie、Director),值是映射到数据集列的属性字典(例如:row[’property1’] )。
接下来,让我们执行链:
try:node_chain=define_nodes_prompt|llmnode_definitions=get_node_definitions(node_chain,structure=node_structure,example=node_example)logger.info(f"节点定义:{node_definitions}")exceptExceptionase:logger.error(f"获取节点定义失败:{e}")raise在 LangChain 中,我们使用结构化提示词 | LLM | … 来创建一个链,将提示词模板与 LLM 结合,形成一个管道。我们使用 get_node_definitions 来获取并验证节点定义。
如果过程中出现失败,错误会被记录,并且程序会引发异常。
如果过程成功,它将生成类似于以下内容的结果:
INFO:__main__:NodeDefinitions:{'Movie':{'ReleaseYear':"row['ReleaseYear']",'Title':"row['Title']"},'Director':{'Name':"row['Director']"},'Cast':{'Actor':"row['Cast']"},'Genre':{'Type':"row['Genre']"},'
lot':{'Description':"row['
lot']"}}生成关系
一旦节点被定义,我们就可以识别它们之间的关系。接下来,我们来看看完整的代码是怎样的:
class RelationshipIdentifier:"""识别图数据库中节点之间的关系。"""RELATIONSHIP_EXAMPLE = [("NodeLabel1", "RelationshipLabel", "NodeLabel2"),("NodeLabel1", "RelationshipLabel", "NodeLabel3"),("NodeLabel2", "RelationshipLabel", "NodeLabel3"),]PROMPT_TEMPLATE = PromptTemplate(input_variables=["structure", "node_definitions", "example"],template="""考虑以下数据集结构:\n{structure}\n\n考虑以下节点定义:\n{node_definitions}\n\n根据数据集结构和节点定义,识别节点之间的关系(边)。\n以三元组的形式返回关系,其中每个三元组包含起始节点标签、关系标签和结束节点标签,每个三元组是一个元组。\n请仅返回元组列表。请不要使用三重反引号标识代码块,只返回元组列表。\n\n示例:\n{example}""")def __init__(self, llm: Any, logger: logging.Logger = None):self.llm = llmself.logger = logger or logging.getLogger(__name__)self.chain = self.PROMPT_TEMPLATE | self.llmdef validate_relationships(self, relationships: List[Tuple]) -> bool:"""验证关系结构"""return all(isinstance(rel, tuple) andlen(rel) == 3 andall(isinstance(x, str) for x in rel)for rel in relationships)@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))def identify_relationships(self, structure: str, node_definitions: Dict) -> List[Tuple]:"""识别关系并应用重试逻辑"""try:response = self.chain.invoke({"structure": structure,"node_definitions": str(node_definitions),"example": str(self.RELATIONSHIP_EXAMPLE)})relationships = ast.literal_eval(response)if not self.validate_relationships(relationships):raise ValueError("无效的关系结构")self.logger.info(f"已验证 {len(relationships)} 个关系")return relationshipsexcept Exception as e:self.logger.error(f"验证关系时出现错误:{e}")raisedef get_relationship_types(self) -> List[str]:"""提取唯一的关系类型。"""return list(set(rel[1] for rel in self.identify_relationships()))# 用法identifier = RelationshipIdentifier(llm=llm)relationships = identifier.identify_relationships(node_structure, node_definitions)print("关系:", relationships)
由于这段代码需要进行比节点生成更多的操作,我们将代码组织在一个类中 —— RelationshipIdentifier —— 以封装所有关系提取、验证和日志记录的逻辑。我们使用类似的逻辑,因此我们提供一个关系示例:
RELATIONSHIP_EXAMPLE=[("NodeLabel1","RelationshipLabel","NodeLabel2"),("NodeLabel1","RelationshipLabel","NodeLabel3"),("NodeLabel2","RelationshipLabel","NodeLabel3"),]在这里,每个关系都是一个元组,包含以下内容:
●起始节点标签:源节点的标签(例如:Movie)。
●关系标签:连接类型(例如:DIRECTED_BY)。
●结束节点标签:目标节点的标签(例如:Director)。
接下来,我们定义实际的提示词模板:
PROMPT_TEMPLATE = PromptTemplate(input_variables=["structure", "node_definitions", "example"],template="""考虑以下数据集结构:\n{structure}\n\n考虑以下节点定义:\n{node_definitions}\n\n根据数据集结构和节点定义,识别节点之间的关系(边)。\n以三元组的形式返回关系,其中每个三元组包含起始节点标签、关系标签和结束节点标签,每个三元组是一个元组。\n请仅返回元组列表。请不要使用三重反引号标识代码块,只返回元组列表。\n\n示例:\n{example}""")
在这种情况下,我们有三个输入变量:
●structure:数据集结构,列出了列和示例值。我在本节开始时定义了它。
● node_definitions :节点标签及其属性的字典。这些节点是在上一节中由 LLM 生成的。
● example :三元组格式的示例关系。
接下来,我将使用者三个属性初始化类:
def__init__(self,llm:Any,logger:logging.Logger=None):self.llm=llmself.logger=loggerorlogging.getLogger(__name__)self.chain=self.PROMPT_TEMPLATE|self.llm
●llm :用于处理提示的语言模型(例如:GPT-4o-mini)。
● logger :可选参数,用于记录进度和错误(如果未提供,则默认为标准日志记录器)。
●self.chain :将提示词模板与 LLM 结合,创建一个可重用的管道。
类似之前的做法,我们创建一个方法来验证生成的关系:
defvalidate_relationships(self,relationshipsist[Tuple])->bool:"""验证关系结构。"""returnall(isinstance(rel,tuple)andlen(rel)==3andall(isinstance(x,str)forxinrel)forrelinrelationships)
该方法检查每个项目是否是元组,确保每个元组包含三个元素,并且所有元素都是字符串(例如:节点标签或关系类型)。最后,如果满足这些条件,则返回 TRUE ,否则返回 FALSE 。
接下来,我们创建一个方法来调用链并生成关系:
@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=1,min=4,max=10))defidentify_relationships(self,structure:str,node_definitionsict)->List[Tuple]:"""识别关系并应用重试逻辑。"""try:response=self.chain.invoke({"structure":structure,"node_definitions":str(node_definitions),"example":str(self.RELATIONSHIP_EXAMPLE)})relationships=ast.literal_eval(response)ifnotself.validate_relationships(relationships):raiseValueError("无效的关系结构")self.logger.info(f"已验证{len(relationships)}个关系")returnrelationships
我们再次使用 retry 装饰器来在失败时重新尝试 LLM 链,并以类似于节点生成时的方式调用链。
此外,我们使用 ast.literal_eval 将 LLM 的字符串输出转换成 Python 列表,并使用 validate_relationships 来确保输出格式正确。
exceptExceptionase:self.logger.error(f"Erroridentifyingrelationships:{e}")raise如果该方法失败,它会记录错误并最多重试 3 次。
最后一个方法返回唯一的关系标签(例如:DIRECTED_BY、ACTED_IN):
defget_relationship_types(self)->List[str]:"""Extractuniquerelationshiptypes."""returnlist(set(rel[1]forrelinself.identify_relationships()))
它调用 identify_relationships 方法来获取关系列表。然后,它提取每个元组中的第二个元素(关系标签),使用 set 来去除重复项,并将结果转换回列表。
现在,终于到了生成关系的时候了:
identifier=RelationshipIdentifier(llm=llm)relationships=identifier.identify_relationships(node_structure,node_definitions)print("Relationships:",relationships)如果 LLM 在 3 次尝试内成功,它将返回一个类似以下内容的关系列表,以元组格式表示:
INFO:__main__:Identified4relationshipsRelationships:[('Movie','DirectedBy','Director'),('Movie','Starring','Cast'),('Movie','HasGenre','Genre'),('Movie','ContainsPlot','
lot')]生成 Cypher 查询
在节点和关系定义完成后,我创建了 Cypher 查询将它们加载到 Neo4j 中。这个过程遵循与节点生成和关系生成类似的逻辑。然而,我们增加了几个额外的步骤来进行验证,因为生成的输出将用于将数据加载到我们的知识图谱中。因此,我们需要尽可能提高成功的概率。让我们首先看看完整的代码:
class CypherQueryBuilder:"""构建用于 Neo4j 图数据库的 Cypher 查询。"""INPUT_EXAMPLE = """NodeLabel1: value1, value2NodeLabel2: value1, value2"""EXAMPLE_CYPHER = example_cypher = """CREATE (n1:NodeLabel1 {property1: "row['property1']", property2: "row['property2']"})CREATE (n2:NodeLabel2 {property1: "row['property1']", property2: "row['property2']"})CREATE (n1)-[:RelationshipLabel]->(n2);"""PROMPT_TEMPLATE = PromptTemplate(input_variables=["structure", "node_definitions", "relationships", "example"],template="""考虑以下节点定义:\n{node_definitions}\n\n考虑以下关系:\n{relationships}\n\n生成 Cypher 查询以创建节点和关系,使用下面的节点定义和关系。记得用数据集中的实际数据替换占位符值。\n包括每个节点的所有属性,按照节点定义,并创建关系。\n返回一个包含每个查询用分号分隔的单个字符串。\n请不要在响应中包含任何其他文本或引号。\n请仅返回包含 Cypher 查询的字符串。请不要使用三重反引号标识代码块。\n\n示例输入:\n{input}\n\n示例输出Cypher查询:\n{cypher}""")def __init__(self, llm: Any, logger: logging.Logger = None):self.llm = llmself.logger = logger or logging.getLogger(__name__)# self.chain = LLMChain(llm=llm, prompt=self.PROMPT_TEMPLATE)self.chain = self.PROMPT_TEMPLATE | self.llmdef validate_cypher_query(self, query: str) -> bool:"""使用 LLM 和正则表达式模式验证 Cypher 查询语法。"""VALIDATION_PROMPT = PromptTemplate(input_variables=["query"],template="""验证此Cypher查询并返回 TRUE 或 FALSE:查询: {query}检查规则:1. 有效的 CREATE 语句2. 正确的属性格式3. 有效的关系语法4. 无缺失的括号5. 有效的属性名称6. 有效的关系类型如果查询有效,返回 TRUE;如果无效,返回 FALSE。""")try:# 基本模式验证basic_valid = all(re.search(pattern, query) for pattern in [r'CREATE \(',r'\{.*?\}',r'\)-\[:.*?\]->'])if not basic_valid:return False# LLM 验证validation_chain = VALIDATION_PROMPT | self.llmresult = validation_chain.invoke({"query": query})# 解析结果is_valid = "TRUE" in result.upper()if not is_valid:self.logger.warning(f"LLM 验证查询失败: {query}")return is_validexcept Exception as e:self.logger.error(f"验证错误: {e}")return Falsedef sanitize_query(self, query: str) -> str:"""清理并格式化 Cypher 查询"""return (query.strip().replace('\n', ' ').replace(' ', ' ').replace("'row[", "row['").replace("]'", "']"))@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))def build_queries(self, node_definitions: Dict, relationships: List) -> str:"""构建带有重试逻辑的 Cypher 查询。"""try:response = self.chain.invoke({"node_definitions": str(node_definitions),"relationships": str(relationships),"input": self.INPUT_EXAMPLE,"cypher": self.EXAMPLE_CYPHER})# 获取位于三重反引号内的响应。if '```' in response:response = response.split('```')[1]# 清理响应queries = self.sanitize_query(response)# 验证查询if not self.validate_cypher_query(queries):raise ValueError("无效的 Cypher 查询语法")self.logger.info("成功生成 Cypher 查询")return queriesexcept Exception as e:self.logger.error(f"构建 Cypher 查询出错: {e}")raisedef split_queries(self, queries: str) -> List[str]:"""将组合的查询拆分为单独的语句。"""return [q.strip() for q in queries.split(';') if q.strip()]# 用法builder = CypherQueryBuilder(llm=llm)cypher_queries = builder.build_queries(node_definitions, relationships)print("Cypher 查询:", cypher_queries)
我们提供一个提示词模板来帮助 LLM:
PROMPT_TEMPLATE = PromptTemplate(input_variables=["structure", "node_definitions", "relationships", "example"],template="""考虑以下节点定义:\n{node_definitions}\n\n考虑以下关系:\n{relationships}\n\n生成 Cypher 查询以创建节点和关系,使用下面的节点定义和关系。记得用数据集中的实际数据替换占位符值。\n包括每个节点的所有属性,按照节点定义,并创建关系。\n返回一个包含每个查询用分号分隔的单个字符串。\n请不要在响应中包含任何其他文本或引号。\n请仅返回包含 Cypher 查询的字符串。请不要使用三重反引号标识代码块。\n\n示例输入:\n{input}\n\n示例输出Cypher查询:\n{cypher}""")
现在,我提供了四个变量给提示词模板:
● structure :数据集结构,作为上下文。
●node_definitions :生成的节点及其属性。
●relationships :节点之间生成的关系。
●example : 用于格式参考的示例查询。
def__init__(self,llm:Any,logger:logging.Logger=None):self.llm=llmself.logger=loggerorlogging.getLogger(__name__)self.chain=self.PROMPT_TEMPLATE|self.llm
我们以与关系类相同的方式初始化该类。
接下来,我定义了一个验证方法来检查生成的输出:
defvalidate_cypher_query(self,query:str)->bool:"""使用LLM和正则表达式模式验证Cypher查询语法。"""VALIDATION_PROMPT=PromptTemplate(input_variables=["query"],template="""验证此Cypher查询并返回TRUE或FALSE:查询:{query}检查规则:1.有效的CREATE语句2.正确的属性格式3.有效的关系语法4.无缺失的括号5.有效的属性名称6.有效的关系类型如果查询有效,返回TRUE;如果无效,返回FALSE。""")try:#基本模式验证basic_valid=all(re.search(pattern,query)forpatternin[r'CREATE\(',r'\{.*?\}',r'\)-\[:.*?\]->'])ifnotbasic_valid:returnFalse#LLM验证validation_chain=VALIDATION_PROMPT|self.llmresult=validation_chain.invoke({"query":query})#解析结果is_valid="TRUE"inresult.upper()ifnotis_valid:self.logger.warning(f"LLM验证查询失败:{query}")returnis_validexceptExceptionase:self.logger.error(f"验证错误:{e}")returnFalse该方法执行两个验证步骤。首先是使用正则表达式进行基本验证:
basic_valid=all(re.search(pattern,query)forpatternin[r'CREATE\(',r'\{.*?\}',r'\)-\[:.*?\]->'])ifnotbasic_valid:returnFalse这确保查询包含必要的 Cypher 语法:
●CREATE :确保节点和关系正在被创建。
● {.*?} :确保包含属性。
● -: .*?→ :确保关系格式正确。
然后,它使用 LLM 执行高级验证:
validation_chain=VALIDATION_PROMPT|self.llmresult=validation_chain.invoke({"query":query})is_valid="TRUE"inresult.upper()验证在提示中指定,我要求 LLM 确保以下几点:
1. 有效的 CREATE 语句
2. 正确的属性格式
3. 有效的关系语法
4. 无缺失的括号
5.有效的属性名称
6.有效的关系类型
到目前为止一切看上去工作的都还不错,这里,让我再添加一个方法,进一步清理生成的输出:
defsanitize_query(self,query:str)->str:"""清理并格式化Cypher查询。"""return(query.strip().replace('\n','').replace('','').replace("'row[","row['").replace("]'","']"))我将移除不必要的空格以及换行符(\n),并修复与数据集引用相关的潜在格式问题(例如:row[’property1’])。
请根据你所使用的大语言模型考虑更新此方法,较小参数量的模型可能需要更多的数据清理操作。
接下来,我来定义一个查询调用方法:
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))def build_queries(self, node_definitions: Dict, relationships: List) -> str:"""构建带有重试逻辑的 Cypher 查询。"""try:response = self.chain.invoke({"node_definitions": str(node_definitions),"relationships": str(relationships),"input": self.INPUT_EXAMPLE,"cypher": self.EXAMPLE_CYPHER})# 获取位于三重反引号内的响应if '```' in response:response = response.split('```')[1]# 清理响应queries = self.sanitize_query(response)# 验证查询if not self.validate_cypher_query(queries):raise ValueError("无效的 Cypher 查询语法")self.logger.info("成功生成 Cypher 查询")return queriesexcept Exception as e:self.logger.error(f"构建 Cypher 查询时出错: {e}")raise
这个方法与关系构建器类中的方法类似,唯一的不同之处是:
if'```'inresponse:response=response.split('```')[1]在这里,LLM 可能会提供额外的 Markdown 格式来指定它是一个代码块。如果 LLM 的响应中存在这种格式,我只会提取三重反引号内的代码。
接下来,我定义一个方法,将单一的 Cypher 查询字符串拆分成单独的语句:
defsplit_queries(self,queries:str)->List[str]:"""将组合的查询拆分为单独的语句"""return[q.strip()forqinqueries.split(';')ifq.strip()]例如,以下 Cypher 查询:
CREATE(n1:Movie{title:"Inception"});CREATE(n2
irector{name:"Nolan"});这将转换成以下形式:
["CREATE(n1:Movie{title:'Inception'})","CREATE(n2
irector{name:'Nolan'})"]这将非常有用,因为可以遍历查询列表。
最后,初始化类并生成 Cypher 查询:
builder=CypherQueryBuilder(llm=llm)cypher_queries=builder.build_queries(node_definitions,relationships)print("Cypher查询:",cypher_queries)成功时,输出将如下所示:
INFO:__main__:SuccessfullygeneratedCypherqueriesCypherQueries:CREATE(m:Movie{Release_Year:"row['ReleaseYear']",Title:"row['Title']"})CREATE(d
irector{Name:"row['Director']"})CREATE(c:Cast{Actor:"row['Cast']"})CREATE(g:Genre{Type:"row['Genre']"})CREATE(p
lot{Description:"row['
lot']"})CREATE(m)-[
irected_By]->(d)CREATE(m)-[:Starring]->(c)CREATE(m)-[:Has_Genre]->(g)CREATE(m)-[:Contains_Plot]->(p)最后,遍历数据集,并为每一行执行生成的 Cypher 查询。
logs = ""total_rows = len(df)def sanitize_value(value):if isinstance(value, str):return value.replace('"', '')return str(value)for index, row in tqdm(df.iterrows(),total=total_rows,desc="正在加载数据到 Neo4j",position=0,leave=True):# 将占位符替换为实际的值cypher_query = cypher_queriesfor column in df.columns:cypher_query = cypher_query.replace(f"row['{column}']",f'{sanitize_value(row[column])}')try:# 执行查询并更新进度conn.execute_query(cypher_query)except Exception as e:logs += f"在行 {index+1}: {str(e)} 出现错误\n"
请注意,我定义了一个空字符串变量 logs ,用于捕获潜在的失败。我还添加了一个清理函数,用于传递给每个行输入的值:
defsanitize_value(value):ifisinstance(value,str):returnvalue.replace('"','')returnstr(value)将防止包含双引号的字符串破坏查询语法。
接下来,我们来遍历数据集:
forindex,rowintqdm(df.iterrows(),total=total_rows,desc="正在加载数据到Neo4j",position=0,leave=True):#将占位符替换为实际的值cypher_query=cypher_queriesforcolumnindf.columns:cypher_query=cypher_query.replace(f"row['{column}']",f'{sanitize_value(row[column])}')try:#执行查询并更新进度conn.execute_query(cypher_query)exceptExceptionase:logs+=f"在行{index+1}:{str(e)}出现错误\n"正如我在练习开始时提到的,我使用 tqdm 为进度条添加了一个漂亮的外观,以可视化的方式显示处理了多少行数据。我传递了 df.iterrows() 来遍历 DataFrame,提供索引和行数据。total=total_rows 由 tqdm 用于计算进度。添加 desc=”正在加载数据到 Neo4j” 来为进度条提供标签。最后,position=0, leave=True 确保进度条在控制台中保持可见。
接下来,我将像 row[’column_name’] 这样的占位符替换成实际的数据集值,将每个值传递给 sanitize_value 函数,并执行查询。
让我们检查一下数据集是否已上传。切换到 Neo4j,并运行以下 Cypher 查询:
MATCHp=(m:Movie)-[r]-(n)RETURNpLIMIT100;
在我的机器上,LLM 生成了以下图表:
这与我们手动上传的知识图谱非常相似。对于一个简单的 LLM 来说,这还不赖对吧。虽然这需要相当多的编码工作,但我们现在可以将其重用于多个数据集,更重要的是,可以将其作为基础,创建更复杂的 LLM 图形构建器。
在我提供的示例中,还没有通过提供实体、关系和属性来帮助 LLM。然而,考虑将它们作为示例来提高 LLM 的性能。此外,更现代化的方法利用思维链来提出额外的节点和关系。这使得模型能够顺序推理并进一步改进结果。另一种策略是提供行样本,以更好的适应每行中提供的值。
| 欢迎光临 链载Ai (https://www.lianzai.com/) | Powered by Discuz! X3.5 |