ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">在我之前的文章中,我探讨了如何利用基于智能体(Agent)的方法来模块化文档到知识图谱(KG)的构建流程。我们可以将这种方法进一步拓展,不仅利用大型语言模型(LLMs)进行信息抽取,还能让它们参与到本体(Ontology)的设计中。
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">这开启了构建一个真正端到端框架的可能性:LLMs 辅助定义本体,然后直接从文本中提取信息并构建知识图谱。但同时我也意识到,构建一个有效的本体并非易事,它不仅取决于领域的复杂性,还与知识图谱的使用目的(例如推荐、问答、探索等)紧密相关。
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">因此,我没有赋予系统完全自主权,而是引入了一种 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: inherit;color: rgb(1, 155, 252);">融合“人在环路”(Human-in-the-loop)技术的工作流程。这使得人类能够进行最终决策,从而确保本体设计与用户的目的相符。
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">我也注意到,使用知识图谱进行问答(QA)比传统 RAG 方法更复杂,也更具挑战性。在本文中,我将介绍如何在统一的智能体驱动框架内实现 GraphRAG,并通过集成路由机制动态选择最佳策略。通过将基于智能体(Agent-based)的设计与路由相结合,我们可以为每个问题动态选择最适合的策略。
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">总而言之,本文主要包含两大部分:ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;color: rgb(63, 63, 63);" class="list-paddingleft-1"> ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;text-indent: -1em;display: block;margin: 0.2em 8px;color: rgb(63, 63, 63);">• ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: inherit;color: rgb(1, 155, 252);">第一部分:用于知识图谱构建的人工参与的多智能体系统 数据集 本次,我将使用 CORD-19(https://github.com/allenai/cord19?tab=readme-ov-file) 数据集,这是一个关于 COVID-19 及相关冠状病毒研究的学术论文语料库,用于构建知识图谱。由于资源限制,我随机抽取了 1,000 篇论文,并提取了它们的摘要,将每篇摘要保存为一个独立的.txt文件。希望尽管数据量有限,我们仍能从中发现有价值的信息。
第一部分:用于从文本构建知识图谱的人工参与的多智能体系统 1. 探讨与概念澄清 在深入探讨系统设计之前,我想先澄清几个关键概念。我研究了 Neo4j,他们将知识图谱分为两种类型:文档结构图(Document Structure Graphs) 和领域图(Domain Graphs) 。
我在之前的文章中所做的是这两种类型的结合。实际上,这样做是可行的 ,并且适用于大多数情况。而选择哪种类型,很大程度上取决于知识图谱的具体用途。领域图虽然缺乏上下文信息,但它非常擅长解决聚合(Aggregation)查询。聚合查询就像是把分散在不同地方(文档段落)的关于同一个主题(例如药物 B 的症状)的信息汇集到一起。例如:针对 药物 B 的症状,有多篇 研究资源进行了探讨。通过使用领域图,我们可以聚合这些信息,并通过一个名为药物 B 的共同实体将那些分散的段落关联起来。所以当用户询问:药物 B 有什么症状?我们可以通过类似这样的查询来获取答案:
MATCH(a)-[:HAS_SYMPTOM]->(b)WHEREa.name="drugB"RETURNb 这也解决了 RAG 的一个局限性:RAG 通常需要截取 top-k 个相关段落,但有时我们需要的信息不止于此。使用图查询,我们则不需要使用“top-k”参数。然而,为了提供完整且有依据的答案,我们仍然需要将子图查询结果追溯到其原始文本段落,并将这些段落输入 LLMs。这时,文档结构图便能发挥作用。
接下来,我们进一步探讨实体间的**关系(Relationship)**类型,我将其分为两类:
• 开放关系(Open Relationship):这类关系没有预设的约束,关系类型也不是预先定义好的,通常以自然语言的形式表达。 • 封闭关系(Closed Relationship):这类关系则从一个固定的、预定义好的类型集合中选取。 选择哪种关系类型取决于下游任务。封闭关系适用于许多机器学习问题,如链接预测、推荐、探索等。而当实体间的关系过于复杂、灵活甚至未知,无法用一个预定义的关系列表来表达时,则会使用开放关系。
2. 实践方法 在本次工作中,我将继续结合 领域图和文档结构图,并采用封闭关系类型。
具体来说,我们将涵盖医学(Medical)领域,重点关注药物(DRUG)、疾病(DISEASE)、症状(SYMPTOM)这三类实体以及治疗(TREATS)、引起(CAUSES)、有副作用(HAS_SIDE_EFFECT)、有症状(HAS_SYMPTOM)这些关系。我们还将介绍如何确保提取出的知识图谱满足本体的约束条件 ,例如,HAS_SIDE_EFFECT关系仅存在于药物(DRUG)和症状(SYMPTOM)实体之间,从而使知识图谱更加可靠。
这次我选择了文档解析方法(适合处理短文档),而不是分块方法,来生成文档结构图。我相信将分块选项添加到框架中是可行的,但本文中不做展示。
回顾上一篇文章,我已经在models/schema.py中定义了本体模式(Ontology Schema)。现在我们无需再手动定义它们了。
fromtypingimportList,Literal,Optional,Type,Any, get_args, get_origin,Union frompydanticimportBaseModel, Field fromdataclassesimportdataclass classMention(BaseModel): type iteral["LOCATION","TIME","PERSON","EVENT","ORGANIZATION"] string:str= Field(..., description="命名实体(如地点、时间、人物或事件)在原文中出现的精确文本字符串。") classRelation(BaseModel): head:str= Field(..., description="头部提及的实体。")# 根据LLM3建议修改描述 tail:str= Field(..., description="尾部提及的实体。")# 根据LLM3建议修改描述 relation_description:str= Field(..., description="对头部(head)和尾部(tail)提及实体之间关系的简要描述。") classSection(BaseModel): title:str= Field(..., description="章节标题") summary:str= Field(..., description="一份简要总结(150-300 字),突出本章节涵盖的关键点。") mentions ist[Mention] = Field(..., description="在本章节中找到的提及(实体)的完整列表。") relations ist[Relation] = Field(..., description="在本章节中找到的提及之间的关系的完整列表。") @dataclass classDistilledUnit: title:str= Field(..., description="单元标题") summary:str= Field(..., description="对单元内容的简明总结,介于 150 至 300 字之间。") sections ist[Section] = Field(..., description="单元内容中包含的 `Section` 列表")下面是我们的新工作流程图:
我们设计了一个包含三个主要智能体 以及若干辅助工具的系统,用于协助构建本体和模式:
本体初始化智能体(Ontology Initialization Agent)
该智能体负责分析数据,以确定领域知识 ,包括相关的实体类型 和关系类型 。它利用了两个关键的辅助工具:
•数据检索工具(Retrieve data) :由于无法将所有数据都输入 LLMs,智能体将使用此工具随机选取样本进行分析。 •搜索工具(Search Tool) :一旦初步确定了领域知识,智能体可以使用此工具在线搜索该领域的常见实体和关系,以辅助最终决策。 重要的是,该智能体还支持人工反馈(Human Feedback) ,允许用户改进或调整本体,使其更好地符合他们的具体目标。这是“人在环路”的关键环节。
模式设计智能体(Schema Design Agent)
该智能体读取原始数据和前一个智能体生成的本体,以设计一个符合文档结构的模式(Schema) 。例如,如果输入文档是学术论文,模式可能包含诸如title(标题)、authors(作者)、publication date(发表日期)、sections(章节)、figures(图表)和tables(表格)等属性。与本体智能体类似,该智能体也支持交互式反馈(Interactive Feedback) ,允许用户根据需要修改模式。这一步的输出是一个伪代码定义(Pseudo Code Definition) ,它概述了本体和文档结构。
模式生成智能体(Schema Generation Agent)
该智能体接收上一步生成的伪代码,并将其转换为JSON 模式(JSON Schema) 。然后可以使用此模式通过Pydantic 等库生成模型。这一步是自动化生成代码模式的关键。
实际上,步骤 2 和步骤 3 可以合并为一步,但我发现当代码生成智能体收到关于模式模型定义的更清晰输入时,其性能会更好。
人工参与(Human-in-the-loop)示例:
首先,我们定义ontology_init_agent(本体初始化智能体)。以下代码片段展示了智能体的基本结构,包括名称、使用的模型、输出类型、指令以及可用的工具。
# agents/ontology_init_agent.py fromtypingimportList,Optional frommetagpt.actionsimportAction frommetagpt.agentsimportAgent frommetagpt.configimportConfig frommetagpt.environmentimportEnvironment # from metagpt.ext.stanford_corenlp.stanford_corenlp import StanfordCoreNLP # Original import commented out frommetagpt.logsimportlogger frommetagpt.toolsimportTool frommetagpt.rolesimportRole frommetagpt.prompts.baseimportBasePrompt fromtypingimportAny# 用于占位符类型 # Assume these are defined elsewhere (假定这些在其他地方已定义) instruct_model =Any# 占位符:指令模型 AgentName =Any# 占位符:智能体名称枚举 Ontology =Any# 占位符:本体模型 ONTOLOGY_INIT_PROMPT =""# 占位符:本体初始化提示词 retrieve_data =Any# 占位符:数据检索工具函数 search =Any# 占位符:搜索工具函数 defcreate_ontology_init_agent() -> Agent: """创建本体初始化智能体"""# 添加中文注释 agent = Agent( name=AgentName.ontology_init_agent.value, model=instruct_model, result_type=Ontology, instructions=ONTOLOGY_INIT_PROMPT, tools=[ Tool(retrieve_data, takes_ctx=True),# 数据检索工具,需要上下文 Tool(search, takes_ctx=False), # 搜索工具 ], retries=5,# 失败重试次数 ) returnagent所以流程是一个循环:智能体生成本体 -> 将本体展示给用户 -> 征求用户确认。只有当用户同意设计时,循环才会终止。你可以简单地要求用户输入“agree”来中断循环,但这里我将使用一个“人工偏好智能体(Human Preference Agent)”来将用户意图解析成结构化对象,从而使对话更自然。以下是用于表示人工评审结果的 Pydantic 模型:
frompydanticimportBaseModel, Field fromtypingimportOptional classHumanReview(BaseModel): is_agreed:bool= Field(..., description="指示人工评审员是否同意生成的本体。") feedback:Optional[str] = Field(description="人工评审员关于本体的评论或建议。")只有当is_agreed为True时,循环才会终止。
(可选)你可以使用一个“接口智能体(Interface Agent)”来与非技术用户沟通,用自然语言解释本体。以下代码展示了本体初始化节点的主要逻辑,包括与用户交互和调用智能体:
# nodes/ontology_init_node.py importasyncio importos importglob fromtypingimportAny fromdataclassesimportdataclass frommetagpt.actionsimportAction frommetagpt.agentsimportAgent frommetagpt.nodesimportBaseNode frommetagpt.graph_transimportGraphRunContext, Dependency, End frommetagpt.utils.task_groupimporttask_group_gather fromtqdmimporttqdm # Assume these are defined elsewhere (假定这些在其他地方已定义) create_ontology_init_agent =Any# 占位符 create_interface_agent =Any# 占位符 create_human_preference_agent =Any# 占位符 select_sample_data =Any# 占位符 Prompt =Any# 占位符 Ontology =Any# 占位符 HumanReview =Any# 占位符 SchemaDesignNode =Any# 占位符 # 创建智能体实例 ontology_init_agent = create_ontology_init_agent() interface_agent = create_interface_agent() human_preference_agent = create_human_preference_agent() @dataclass classOntologyInitNode(BaseNode): data_dir:str# 数据目录路径 asyncdefrun(self, ctx: GraphRunContext) -> SchemaDesignNode: """运行本体初始化节点"""# 添加中文注释 messages = []# 用于保存对话历史 deps = Dependency(data_dir=self.data_dir)# 创建依赖对象,存储数据目录信息 sample_data = select_sample_data(data_dir=self.data_dir)# 选取样本数据 # 第一步:收集数据背景信息 background = Prompt.ask( "能否请您详细介绍一下这些数据?\n" "它们是做什么用的,以及您打算如何使用它们?\n" "我询问这些是为了更好地理解数据背后的意图和目的,以便我们可以为知识图谱设计一个更合适的本体。" ) # 进入人工评审循环 whileTrue: # 第二步:运行本体初始化智能体,生成本体草案 # 第一次运行时提供背景和样本数据,后续运行则依赖消息历史 ontology_result =awaitontology_init_agent.run( user_prompt=f""" **背景**:{background} **样本数据**:{sample_data} """iflen(messages)==0elseNone, message_history=messages, deps=deps, ) messages += ontology_result.new_messages()# 添加智能体响应到对话历史 # 第三步:使用接口智能体解释本体草案给用户 interface_response =awaitinterface_agent.run(f"""{str(ontology_result.output)}""") messages += interface_response.new_messages()# 添加解释响应到对话历史 print(interface_response.output)# 打印解释给用户 # 第四步:获取用户反馈 user_response = input("请提供您的反馈或确认(输入 'agree' 同意):")# 提示用户输入 # 第五步:使用人工偏好智能体解析用户意图 human_review =awaithuman_preference_agent.run( user_prompt=user_response, message_history=messages ) review: HumanReview = human_review.output# 获取解析结果 ifreview.is_agreed:# 如果用户同意,则跳出循环 break else: messages += human_review.new_messages()# 如果不同意,添加反馈到历史,继续循环 # 循环结束后,返回包含本体和数据目录的SchemaDesignNode returnSchemaDesignNode(data_dir=self.data_dir, ontology=ontology_result.output)我们对模式设计节点(Schema Design Node)采用同样的方式,该节点负责设计并生成 JSON 模式。这是一个输出示例:
{ "$defs":{ "EntityType":{ "type":"string", "enum":[ "PERSON","ORGANIZATION","DATE","MEDICATION","DISEASE", "LOCATION","STATISTICAL_MEASURE" ], "title":"实体类型" }, "RelationType":{ "type":"string", "enum":[ "INFECTS","LOCALIZED_IN","TREATED_WITH","OCCURS_IN", "CAUSES","IS_PART_OF","IS_HOST_TO","IS_VARIANT_OF","IS_ASSOCIATED_WITH" ], "title":"关系类型" }, "Mention":{ "type":"object", "properties":{ "type":{"$ref":"#/$defs/EntityType"}, "text":{"type":"string","description":"提及的实体文本"} }, "required":["type","text"] }, "DocUnit":{ "type":"object", "properties":{ "text":{"type":"string","description":"单元文本内容"}, "section_title":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"单元所属章节标题"}, "mentions":{ "anyOf":[ {"type":"array","items":{"$ref":"#/$defs/Mention"}}, {"type":"null"} ], "default":null, "description":"本单元中提取出的提及列表" }, "relationships":{ "anyOf":[ {"type":"array","items":{"type":"object"}}, {"type":"null"} ], "default":null, "description":"本单元中提取出的关系列表" } }, "required":["text","section_title"] } }, "type":"object", "title":"文档", "properties":{ "title":{"type":"string","description":"文档标题"}, "authors":{"type":"array","items":{"type":"string"},"description":"文档作者列表"}, "abstract":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档摘要"}, "units":{"type":"array","items":{"$ref":"#/$defs/DocUnit"},"description":"文档单元列表"}, "source":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档来源"}, "doc_type":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档类型"} }, "required":["title","authors","units","abstract","source","doc_type"], "RelationConstraints":{ // 必须包含:关于每种关系类型可拥有的主语/宾语实体类型的约束 "CAUSES":[ {"subject_type":"DISEASE","object_type":"SYMPTOM"}, {"subject_type":"DISEASE","object_type":"DISEASE"} ], "TREATS":[ {"subject_type":"MEDICATION","object_type":"DISEASE"} ] } }为了确保当 LLMs 未能正确生成 JSON 时流程不会中断,我们可以添加一些后处理/验证和重新生成的循环。例如,为模式生成智能体添加一个输出验证器:
importre importjson fromtypingimportType,Any frommetagpt.agentsimportAgent frommetagpt.utils.model_utilsimportcreate_model_from_schema frommetagpt.constimportModelRetry frommetagpt.graph_transimportRunContext# 假设 RunContext 存在 frompydanticimportBaseModel fromtypingimportget_args, get_origin,Union# 导入 Union # Assume these are defined elsewhere (假定这些在其他地方已定义) instruct_model =Any# 占位符 AgentName =Any# 占位符 SCHEMA_GENERATION_PROMPT =""# 占位符 extract_json_from_markdown =Any# 占位符 (用于从 Markdown 提取 JSON) defcreate_schema_generation_agent() -> Agent: """创建模式生成智能体"""# 添加中文注释 agent = Agent( name=AgentName.schema_generation_agent.value, model=instruct_model, instructions=SCHEMA_GENERATION_PROMPT, result_type=str,# 智能体直接输出字符串形式的 JSON retries=5,# 失败重试次数 ) # 添加输出验证器 @agent.output_validator asyncdefvalidate_schema(_: RunContext, output:str) ->str: """验证生成的 JSON 模式是否有效且符合要求"""# 添加中文注释 try: # 从可能的 Markdown 代码块中提取 JSON 字符串 match= re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', output) ifnotmatch: raiseModelRetry("输出不是有效的 JSON Markdown 代码块。")# 改进错误消息 json_string =match.group(1).strip() schema =json.loads(json_string)# 解析 JSON # 基本模式结构验证 if"$defs"notinschema: raiseModelRetry("模式必须包含 '$defs' 部分。")# 改进错误消息 if"type"notinschemaorschema["type"] !="object": raiseModelRetry("模式根节点必须是类型为 'object' 的对象。")# 改进错误消息 # 验证关键模型的存在和结构 try: doc_model = create_model_from_schema(schema,"Doc") exceptExceptionase: raiseModelRetry(f"无法从模式创建 'Doc' 模型:{e}") try: doc_unit_model = create_model_from_schema(schema,"DocUnit") exceptExceptionase: raiseModelRetry(f"无法从模式创建 'DocUnit' 模型:{e}") try: # 检查 RelationType 枚举定义 create_model_from_schema(schema,"RelationType") except: raiseModelRetry("模式必须定义 `RelationType` (Enum) 模型。") # 检查 Doc 模型必须有 'units' 属性 if'units'notindoc_model.model_fields: raiseModelRetry("模型 'Doc' 必须包含属性 'units'。") # 检查 DocUnit 模型必须有可选的 'mentions' 属性 mentions_field_info = doc_unit_model.model_fields.get("mentions") ifmentions_field_info: # 检查是否是 Union 且包含 NoneType ifnot(get_origin(mentions_field_info.annotation)isUnionand type(None)inget_args(mentions_field_info.annotation)): raiseModelRetry(f"模型 'DocUnit' 的属性 'mentions' 必须是可选的 (允许 null)。") else: # 如果 'mentions' 属性缺失,也视为错误 raiseModelRetry(f"模型 'DocUnit' 必须包含属性 'mentions'。") # 检查并验证 RelationConstraints 部分 if"RelationConstraints"notinschema: raiseModelRetry(f"模式必须包含 `RelationConstraints`。") else: relation_constraints = schema["RelationConstraints"] ifnotisinstance(relation_constraints,dict): raiseModelRetry("`RelationConstraints` 必须是一个字典。")# 改进错误消息 forkey, valueinrelation_constraints.items(): ifnotisinstance(key,str)ornotisinstance(value,list): raiseModelRetry("RelationConstraints 的键必须是字符串 (关系类型),值必须是列表。")# 改进错误消息 forconstraintinvalue: ifnotisinstance(constraint,dict)or"subject_type"notinconstraintor"object_type"notinconstraint: raiseModelRetry(f"RelationConstraints 中的每个约束必须是包含 'subject_type' 和 'object_type' 的字典。无效约束:{constraint}")# 改进错误消息 # 验证通过,返回原始输出字符串 returnoutput exceptExceptionase: # 捕获所有验证中的异常,返回 ModelRetry raiseModelRetry(f"无效的 JSON 模式:{e}")根据我的经验,使用单次提示(One Shot Prompting),qwen2.5 coder 或gpt4o-mini 通常能在第一次尝试时就生成正确的结果。以下代码展示了模式设计节点如何使用这些智能体并集成人工反馈:
# nodes/schema_design_node.py importasyncio importos importglob fromtypingimportAny fromdataclassesimportdataclass frommetagpt.actionsimportAction frommetagpt.agentsimportAgent frommetagpt.nodesimportBaseNode frommetagpt.graph_transimportGraphRunContext, Dependency, End frommetagpt.utils.task_groupimporttask_group_gather fromtqdmimporttqdm # Assume these are defined elsewhere (假定这些在其他地方已定义) create_schema_design_agent =Any# 占位符 create_interface_agent =Any# 占位符 create_human_preference_agent =Any# 占位符 select_sample_data =Any# 占位符 Ontology =Any# 占位符 HumanReview =Any# 占位符 InformationExtractionNode =Any# 占位符 create_schema_generation_agent =Any# 占位符 extract_json_from_markdown =Any# 占位符 # 创建智能体实例 schema_design_agent = create_schema_design_agent() interface_agent = create_interface_agent() human_preference_agent = create_human_preference_agent() schema_generation_agent = create_schema_generation_agent()# 使用上面定义的带验证器的智能体 @dataclass classSchemaDesignNode(BaseNode): data_dir:str# 数据目录路径 ontology: Ontology# 本体信息 asyncdefrun(self, ctx: GraphRunContext[None, Dependency]) -> InformationExtractionNode: """运行模式设计节点"""# 添加中文注释 messages = []# 用于保存对话历史 # 第 1 步:选取数据样本 sample_data:str= select_sample_data(self.data_dir) # 进入人工评审循环 whileTrue: # 第 2 步:运行模式设计智能体,生成模式草案 # 第一次运行时提供文档样本和本体信息,后续依赖消息历史 schema_design_result =awaitschema_design_agent.run( user_prompt=f"文档样本:{sample_data}\n" "\n=============\n" f"本体:{str(self.ontology)}"iflen(messages) ==0elseNone, message_history=messages, ) messages += schema_design_result.new_messages()# 添加智能体响应到对话历史 # 第 3 步:使用接口智能体解释模式草案给用户 # 移除思维标签,清理输出以便展示 interface_response_result =awaitinterface_agent.run( user_prompt="" f"{str(schema_design_result.output).replace('<think>','').replace('</think>','')}" ) print(interface_response_result.output)# 打印解释给用户 # 第 4 步:获取用户反馈 user_response =input("请提供您的反馈或确认(输入 'agree' 同意):")# 提示用户输入 # 第 5 步:使用人工偏好智能体解析用户意图 human_review =awaithuman_preference_agent.run( user_prompt=user_response, message_history=messages ) review: HumanReview = human_review.output# 获取解析结果 ifreview.is_agreed: # 如果用户同意,则跳出循环 break else: messages += human_review.new_messages()# 如果不同意,添加反馈到历史,继续循环 # 循环结束后,运行模式生成智能体,将伪代码转换为 JSON Schema schema_result =awaitschema_generation_agent.run( user_prompt=schema_design_result.output# 输入是模式设计的伪代码输出 ) schema = extract_json_from_markdown(schema_result.output)# 从带 Markdown 格式的输出中提取 JSON # 返回包含数据目录和最终模式的 InformationExtractionNode returnInformationExtractionNode(data_dir=self.data_dir, schema=schema)这就是模式设计(Schema Design)部分。
现在我们进入信息抽取(Information Extraction)阶段,这部分与上一篇文章中的做法非常相似。但由于我们没有预定义的 Mention 模型,数据模型只能在运行时(runtime)定义。因此,与其在智能体中显式定义输出模型,我们可以通过Type[BaseModel]对其进行抽象,并将output_type作为函数的参数传递。以下是用于提及检测智能体的定义:
fromtypingimportList,Type,Any frompydanticimportBaseModel frommetagpt.agentsimportAgent frommetagpt.graph_transimportRunContext# 假设 RunContext 存在 # Assume these are defined elsewhere (假定这些在其他地方已定义) instruct_model =Any# 占位符 AgentName =Any# 占位符 MENTION_DETECTION_PROMPT =""# 占位符 defcreate_mention_detection_agent( mention_model:Type[BaseModel],# 提及模型的类型 entity_types ist[str] # 需要检测的实体类型列表 ) -> Agent: """创建提及检测智能体"""# 添加中文注释 agent = Agent( name=AgentName.mention_detection_agent.value, model=instruct_model, system_prompt=MENTION_DETECTION_PROMPT.format(entity_types=entity_types),# 格式化系统提示词 result_type=List[mention_model],# 预期结果是提及模型的列表 retries=5,# 失败重试次数 ) # 提及检测智能体此处没有额外的输出验证器 returnagent类似地,我们创建relation_extraction_agent(关系抽取智能体)。我也使用了output_validation来确保抽取出的关系符合本体的约束。这是为确保知识图谱质量设置的“质量控制关卡”。
# agent/relation_extraction_agent.py importre fromtypingimportList,Type,Any,Dict# 导入 Dict frompydanticimportBaseModel frommetagpt.agentsimportAgent frommetagpt.constimportModelRetry frommetagpt.graph_transimportRunContext, Dependency# 假设 RunContext 和 Dependency 存在 # Assume these are defined elsewhere (假定这些在其他地方已定义) instruct_model =Any# 占位符 AgentName =Any# 占位符 RELATION_EXTRACTION_PROMPT =""# 占位符 build_dynamic_relation_model =Any# 占位符 (用于构建动态关系模型) defcreate_relation_extraction_agent(relation_model:Type[BaseModel], mention_strings ist[str], relation_types ist[str], constraints ict[str,List[Dict[str,str]]]) -> Agent: """创建关系抽取智能体"""# 添加中文注释 agent = Agent( name=AgentName.relation_extraction_agent.value, model=instruct_model, system_prompt=RELATION_EXTRACTION_PROMPT.format( mention_strings=mention_strings, relation_types=relation_types, constraints=constraints),# 将约束传递给提示词 result_type=List[relation_model],# 预期结果是关系模型的列表 retries=5,# 失败重试次数 ) # 添加输出验证器以检查关系是否符合本体约束 @agent.output_validator asyncdefvalidate_relation_constraint(ctx: RunContext[Dependency], list_relations ist[Any]):# 接收 LLM 的原始输出列表 """验证抽取出的关系是否符合本体约束"""# 添加中文注释 importre # 这个正则表达式预期提及内容的格式为 <TYPE>text</TYPE> pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>")# 预编译正则表达式 validated_relations = []# 用于存放验证通过的关系对象 forrelation_dictinlist_relations:# LLM 输出通常是字典列表 # 检查是否为字典,并包含必要的键 ifnotisinstance(relation_dict,dict)or'subject'notinrelation_dictor'predicate'notinrelation_dictor'object'notinrelation_dict: raiseModelRetry(f"关系输出格式不正确,应为包含 subject, predicate, object 键的字典。无效输出:{relation_dict}")# 改进错误消息 # 解析主语和宾语字符串,获取类型和文本 subject_str = relation_dict.get('subject','') object_str = relation_dict.get('object','') predicate = relation_dict.get('predicate') ifnotsubject_strornotobject_strornotpredicate: raiseModelRetry(f"关系信息不完整,缺少 subject, object 或 predicate。无效输出:{relation_dict}")# 改进错误消息 subject_match = pattern.match(subject_str) object_match = pattern.match(object_str) ifnotsubject_match: raiseModelRetry(f"主语 '{subject_str}' 不符合 <TYPE>text</TYPE> 格式。")# 改进错误消息 ifnotobject_match: raiseModelRetry(f"宾语 '{object_str}' 不符合 <TYPE>text</TYPE> 格式。")# 改进错误消息 subject_type = subject_match.group("type") object_type = object_match.group("type") triplet_type = { "subject_type": subject_type, "object_type": object_type } # 从依赖中获取关系约束并进行检查 relationship_constraints = ctx.deps.relationship_constraints# 确保约束已加载到deps中 allowed_constraints = relationship_constraints.get(predicate) ifallowed_constraintsisNone: # 如果关系类型未在约束中定义,则视为无效 raiseModelRetry(f"关系类型 '{predicate}' 未在 RelationConstraints 中定义。")# 改进错误消息 iftriplet_typenotinallowed_constraints: raiseModelRetry(f"三元组: ({subject_type})-[:{predicate}]->({object_type}) 不符合本体约束!")# 改进错误消息 # 验证通过后,尝试将字典解析为 Pydantic 模型实例 try: validated_relation = relation_model(**relation_dict)# 使用字典解包创建模型实例 validated_relations.append(validated_relation) exceptExceptionase: raiseModelRetry(f"三元组 '{relation_dict}' 无法解析为预期的关系模型 '{relation_model.__name__}':{e}")# 改进错误消息 returnvalidated_relations# 返回验证通过的 Pydantic 模型列表 returnagent信息抽取节点(Information Extraction Node)中的逻辑流程如下。它协调文档蒸馏、提及检测和关系抽取这三个步骤:
importasyncio importos importglob importjson fromtypingimportList,Type,Any,Dict fromdataclassesimportdataclass frommetagpt.nodesimportBaseNode frommetagpt.graph_transimportGraphRunContext, Dependency, End frommetagpt.utils.task_groupimporttask_group_gather fromtqdmimporttqdm frompydanticimportBaseModel fromtypingimportget_args, get_origin,Literal# 导入get_origin, Literal # Assume these are defined elsewhere (假定这些在其他地方已定义) create_doc_distiller_agent =Any# 占位符 create_mention_detection_agent =Any# 占位符 create_relation_extraction_agent =Any# 占位符 create_model_from_schema =Any# 占位符 build_dynamic_relation_model =Any# 占位符 AgentName =Any# 占位符 instruct_model =Any# 占位符 logger =Any# 占位符 @dataclass classInformationExtractionNode(BaseNode[None, Dependency,None]): """信息抽取节点:协调文档解析、提及检测和关系抽取"""# 添加中文注释 def__init__(self, data_dir:str, schema ict, **kwargs): super().__init__(**kwargs) self.data_dir = data_dir self.schema = schema # 从 Schema 中创建 Pydantic 模型和类型定义 # 假定 create_model_from_schema 可以处理枚举定义并返回类型 self.relation_types:Type= create_model_from_schema(schema,"RelationType") # 假定 create_model_from_schema 返回的是 Literal 或 Enum 类型 self.entity_types:Type= create_model_from_schema(schema,"EntityType") # 创建文档主模型和单元模型 self.doc_model:Type[BaseModel] = create_model_from_schema(schema,"Doc") # 创建提及模型 self.mention_model:Type[BaseModel] = create_model_from_schema(schema,"Mention")# 假定返回 Pydantic Mention 模型 # 初始化智能体实例 # 假定 create_doc_distiller_agent 接收输出模型类型 self.doc_distiller_agent = create_doc_distiller_agent(output_model=self.doc_model, schema="")# Schema 参数看起来未使用 # 假定 entity_types 可以转换为字符串列表 self.mention_detection_agent = create_mention_detection_agent( self.mention_model, entity_types=list(get_args(self.entity_types))ifget_origin(self.entity_types)isLiteralelse[]# 从 Literal 类型中提取枚举值 ) asyncdefrun_task(self, ctx: GraphRunContext[None, Dependency], data:str, output_path:str): """处理单个文档的信息抽取任务"""# 添加中文注释 # 第 1 步:蒸馏文档结构 doc_results =awaittask_group_gather( [ lambda:self.doc_distiller_agent.run( user_prompt=f"原始文档:{data}\n"# 翻译提示词 ) ], timeout_seconds=100,# 设定超时时间 ) doc_result = doc_results[0] doc = doc_result.output# doc 是 self.doc_model 的实例 doc_units = doc.units# doc_units 是 DocUnit 实例的列表 # 第 2 步:为每个文档单元检测提及 all_entity_types_list =list(get_args(self.entity_types))ifget_origin(self.entity_types)isLiteralelse[] mentions_results =awaittask_group_gather( [ # 为每个文档单元创建一个提及检测任务 lambdai=i:self.mention_detection_agent.run( user_prompt=f"""段落:{doc_units[i].text}\n 实体类型:{all_entity_types_list}"""# 翻译提示词,传递实体类型列表 ) foriinrange(len(doc_units)) ], timeout_seconds=100# 设定超时时间 ) # 将提取出的提及分配回对应的文档单元 foriinrange(len(doc_units)): doc_units[i].mentions = mentions_results[i].output# mentions_results[i].output 是 List[Mention] # 第 3 步:为包含提及的文档单元抽取关系 relation_extraction_agents = [] relation_indices = []# 记录原始文档单元的索引 all_relation_types_list =list(get_args(self.relation_types))ifget_origin(self.relation_types)isLiteralelse[] foridx, mention_list_resultinenumerate(mentions_results): # 仅处理找到提及的单元 ifmention_list_result.outputandlen(mention_list_result.output) >0: # 格式化提及内容,用于关系抽取提示词/模型构建 # 根据前面 Pydantic 模型定义,使用 .type 和 .string mention_strings = [ f"<{item.type}>{item.string}</{item.type}>" foriteminmention_list_result.output# mention_list_result.output 是 List[Mention] ] # 根据找到的提及和模式中定义的关系类型动态构建关系模型 # 假定 build_dynamic_relation_model 接收提及列表和关系类型列表 relation_model = build_dynamic_relation_model(mention_strings=mention_strings, relation_types=all_relation_types_list) # 为这个单元创建关系抽取智能体 relation_extraction_agents.append( create_relation_extraction_agent( relation_model, constraints=ctx.deps.relationship_constraints,# 从上下文依赖中传递约束 mention_strings=mention_strings, relation_types=all_relation_types_list# 传递所有定义的关系类型 ) ) relation_indices.append(idx)# 存储原始单元索引 # 并行运行关系抽取智能体 ifrelation_extraction_agents:# 仅当存在需要抽取关系的单元时运行 relations_results =awaittask_group_gather( [ # 为每个关系抽取智能体创建一个任务 lambdai=i: relation_extraction_agents[i].run( user_prompt=doc_units[relation_indices[i]].text,# 使用原始文档单元的文本作为输入 deps=ctx.deps# 传递依赖,用于验证器 ) foriinrange(len(relation_extraction_agents)) ], timeout_seconds=100# 设定超时时间 ) # 将提取出的关系分配回对应的文档单元 forrel_idx, doc_idxinenumerate(relation_indices): try: # relations_results[rel_idx].output 是 List[Relation] (动态模型实例) # 假定 DocUnit 有一个 'relationships' 属性可以接受这个列表 doc_units[doc_idx].relationships = relations_results[rel_idx].output exceptExceptionase: # 处理分配过程中可能出现的错误 logger.error(f"为文档单元{doc_idx}分配关系失败:{e}") doc_units[doc_idx].relationships =None# 失败时设置为 None # 使用更新后的单元列表更新原始文档对象 doc.units = doc_units # 可选:保存处理后的文档 (原代码中注释掉了) # with open(output_path, "w", encoding="utf-8") as f: # 添加编码 # json.dump(doc.dict(), f, indent=2, ensure_ascii=False) # 添加 ensure_ascii=False asyncdefrun(self, ctx: GraphRunContext[None, Dependency]) -> End: """批量处理文件进行信息抽取"""# 添加中文注释 # 确保 Dependency 对象包含 relationship_constraints ifnothasattr(ctx.deps,'relationship_constraints'): ctx.deps.relationship_constraints = {}# 初始化如果不存在 # 从 Schema 中加载关系约束到依赖对象中 if"RelationConstraints"inself.schema: ctx.deps.relationship_constraints =self.schema["RelationConstraints"] else: logger.warning("Schema 中不包含 'RelationConstraints'。关系验证可能无法正常工作。")# 增加警告 # 查找所有输入文本文件 # 根据实际设置调整路径。原代码使用 /data/cord-19/articles/ files = glob.glob(os.path.join(self.data_dir,"*.txt"))# 使用 self.data_dir 提高灵活性 # 准备 run_task 的参数列表 args = [] forfileintqdm(files, desc="准备任务参数"):# 添加进度条描述 withopen(file,"r", encoding="utf-8")asf:# 指定编码 basename = os.path.basename(file) sample_data = f.read() output_path =f"data/processed/{basename}.json"# 定义输出路径 args.append([sample_data, output_path]) # 分批处理文件 bs=10# 批次大小 foriintqdm(range(0,len(args),bs), desc="处理文件批次"):# 添加批次进度条描述 batch_args = args[i:i+bs] # task_group_gather 接收一个可等待对象(callable)列表 awaittask_group_gather( [ # 使用 lambda 捕获当前批次的文件数据和输出路径,创建异步任务 (lambdadata=data, output_path=output_path:self.run_task( ctx=ctx,# 传递当前上下文 data=data, output_path=output_path)) fordata, output_pathinbatch_args ], timeout_seconds=120,# 设置批次任务的超时时间 ) # 完成所有批次处理后,返回 End 节点 returnEnd(None)这就是信息抽取(Information Extraction)的全部过程。
3. 集成 Neo4j 将提取出的三元组集成到 Neo4j 时,我们需要注意的另一点是创建节点和节点间关系有两种方式:CREATE(创建)或MERGE(合并)节点。CREATE模式会创建两个独立的节点,即使它们指向的是同一个概念;而MERGE模式则会在表面形式相同时仅保留一个节点,但这要求你确保它们确实指向的是同一个实体。
在实践中,在医学等某些领域存在许多歧义,一个术语可能指代不同的概念。我们需要一个链接(Linking)步骤,在一个词汇表/参考集中识别某个提及的唯一标识符。我们将在后续的文章中更深入地探讨这个话题。目前,我将使用MERGE方法来在提及之间构建桥接关系。
以下是一个将三元组导入 Neo4j 的示例代码片段。请注意,您需要根据实际的文件结构和三元组格式加载数据。
fromneo4jimportGraphDatabase, basic_auth fromtqdmimporttqdm importos importglob importjson importre# 用于解析三元组字符串 # 假设 URI, AUTH, pattern 已定义。您需要根据实际情况替换或定义它们。 # 三元组的结构假定为一个列表,列表元素是字典,例如 {'subject': '<TYPE>text</TYPE>', 'predicate': 'RELATION_TYPE', 'object': '<TYPE>text</TYPE>'} # pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>") # 用于解析主语和宾语字符串 URI ="bolt://localhost:7687"# Neo4j 数据库连接 URI 占位符 AUTH = basic_auth("neo4j","password")# Neo4j 认证信息占位符 - 请替换为您的实际凭据 # 示例:从之前生成的 JSON 文件中加载三元组 triplets = []# 存放加载的三元组 data_dir ="data/processed"# 之前信息抽取生成的 JSON 文件目录 json_files = glob.glob(os.path.join(data_dir,"*.json")) # 用于解析三元组字符串的正则表达式 pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>") forjson_fileintqdm(json_files, desc="加载三元组"):# 为文件加载添加进度条 withopen(json_file,'r', encoding='utf-8')asf:# 指定编码 try: doc_data = json.load(f) # 假设 doc_data 结构包含 units -> relationships if'units'indoc_data: forunitindoc_data['units']: if'relationships'inunitandunit['relationships']: # 遍历抽取出的关系列表 forrelinunit['relationships']: # rel 应该是 Pydantic 模型实例的字典表示,其中 subject 和 object 已经是 <TYPE>text</TYPE> 格式的字符串 ifisinstance(rel,dict)and'subject'inreland'predicate'inreland'object'inrel: triplets.append(rel)# 直接添加符合格式的字典 exceptExceptionase: # print(f"加载文件 {json_file} 中的三元组失败: {e}") # 打印加载失败信息 (可选) pass# 忽略加载单个文件失败的情况 (原代码行为) # 连接 Neo4j 数据库并导入数据 withGraphDatabase.driver(URI, auth=AUTH)asdriver: driver.verify_connectivity() print("连接 Neo4j 数据库成功")# 翻译连接成功消息 fortripletintqdm(triplets, desc="导入三元组到 Neo4j"):# 为导入过程添加进度条 try: # 解析主语和宾语字符串以获取类型和文本 subject_match = pattern.match(triplet.get('subject','')) object_match = pattern.match(triplet.get('object','')) ifnotsubject_matchornotobject_match: # print(f"跳过无效三元组格式: {triplet}") # 打印跳过信息 (可选) continue# 跳过格式不符的三元组 subject_type = subject_match.group("type") subject_text = subject_match.group("text") object_type = object_match.group("type") object_text = object_match.group("text") predicate = triplet.get('predicate') ifnotsubject_typeornotsubject_textornotobject_typeornotobject_textornotpredicate: # print(f"跳过不完整三元组: {triplet}") # 打印跳过信息 (可选) continue# 跳过信息不完整的三元组 # 执行 MERGE 查询,创建或合并节点和关系 result = driver.execute_query(f""" MERGE (a:{subject_type}{{name: $head}}) MERGE (b:{object_type}{{name: $tail}}) MERGE (a)-[:{predicate}]->(b) """, head=subject_text, tail=object_text, database="neo4j",# 如果需要,指定数据库名称 ).summary # print(f"导入三元组 ({subject_text})-[:{predicate}]->({object_text}) 成功") # 打印导入成功信息 (可选) exceptExceptionase: # print(f"导入三元组 {triplet} 失败: {e}") # 打印导入失败信息 (可选) pass# 忽略导入单个三元组失败的情况 (原代码行为)结果展示
我原本期望能提取到更多药物副作用的信息,但目前看来,语料库中关于副作用的提及较少。未来可以通过扩展数据集或优化模型来改善这一情况。
总结 在本文中,我展示了如何将“人工参与(Human-in-the-loop)”设计集成到用于知识图谱构建的多智能体系统中,并以 CORD-19 数据集为例进行了案例研究。通过结合领域图 和文档结构图 ,并借助封闭关系(Closed Relationship)强制执行本体约束 ,我们可以构建出更准确、结构化的知识图谱表示。