返回顶部
热门问答 更多热门问答
技术文章 更多技术文章

当智能体遇上GraphRAG:构建下一代动态路由知识图谱问答系统

[复制链接]
链载Ai 显示全部楼层 发表于 昨天 18:40 |阅读模式 打印 上一主题 下一主题

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.1em;color: rgb(63, 63, 63);visibility: visible;">

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">在我之前的文章中,我探讨了如何利用基于智能体(Agent)的方法来模块化文档到知识图谱(KG)的构建流程。我们可以将这种方法进一步拓展,不仅利用大型语言模型(LLMs)进行信息抽取,还能让它们参与到本体(Ontology)的设计中。

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">这开启了构建一个真正端到端框架的可能性:LLMs 辅助定义本体,然后直接从文本中提取信息并构建知识图谱。但同时我也意识到,构建一个有效的本体并非易事,它不仅取决于领域的复杂性,还与知识图谱的使用目的(例如推荐、问答、探索等)紧密相关。

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">因此,我没有赋予系统完全自主权,而是引入了一种ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: inherit;color: rgb(1, 155, 252);">融合“人在环路”(Human-in-the-loop)技术的工作流程。这使得人类能够进行最终决策,从而确保本体设计与用户的目的相符。

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">我也注意到,使用知识图谱进行问答(QA)比传统 RAG 方法更复杂,也更具挑战性。在本文中,我将介绍如何在统一的智能体驱动框架内实现 GraphRAG,并通过集成路由机制动态选择最佳策略。通过将基于智能体(Agent-based)的设计与路由相结合,我们可以为每个问题动态选择最适合的策略。

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">总而言之,本文主要包含两大部分:

    ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;color: rgb(63, 63, 63);" class="list-paddingleft-1">
  • ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;text-indent: -1em;display: block;margin: 0.2em 8px;color: rgb(63, 63, 63);">
    ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: inherit;color: rgb(1, 155, 252);">第一部分:用于知识图谱构建的人工参与的多智能体系统
  • 第二部分:智能体驱动的 GraphRAG

数据集

本次,我将使用 CORD-19(https://github.com/allenai/cord19?tab=readme-ov-file) 数据集,这是一个关于 COVID-19 及相关冠状病毒研究的学术论文语料库,用于构建知识图谱。由于资源限制,我随机抽取了 1,000 篇论文,并提取了它们的摘要,将每篇摘要保存为一个独立的.txt文件。希望尽管数据量有限,我们仍能从中发现有价值的信息。

第一部分:用于从文本构建知识图谱的人工参与的多智能体系统

1. 探讨与概念澄清

在深入探讨系统设计之前,我想先澄清几个关键概念。我研究了 Neo4j,他们将知识图谱分为两种类型:文档结构图(Document Structure Graphs)领域图(Domain Graphs)

来源:Neo4j

我在之前的文章中所做的是这两种类型的结合。实际上,这样做是可行的,并且适用于大多数情况。而选择哪种类型,很大程度上取决于知识图谱的具体用途。领域图虽然缺乏上下文信息,但它非常擅长解决聚合(Aggregation)查询。聚合查询就像是把分散在不同地方(文档段落)的关于同一个主题(例如药物 B 的症状)的信息汇集到一起。例如:针对药物 B的症状,有多篇研究资源进行了探讨。通过使用领域图,我们可以聚合这些信息,并通过一个名为药物 B的共同实体将那些分散的段落关联起来。所以当用户询问:药物 B 有什么症状?我们可以通过类似这样的查询来获取答案:

MATCH(a)-[:HAS_SYMPTOM]->(b)WHEREa.name="drugB"RETURNb

这也解决了 RAG 的一个局限性:RAG 通常需要截取 top-k 个相关段落,但有时我们需要的信息不止于此。使用图查询,我们则不需要使用“top-k”参数。然而,为了提供完整且有依据的答案,我们仍然需要将子图查询结果追溯到其原始文本段落,并将这些段落输入 LLMs。这时,文档结构图便能发挥作用。

接下来,我们进一步探讨实体间的**关系(Relationship)**类型,我将其分为两类:

  • • 开放关系(Open Relationship):这类关系没有预设的约束,关系类型也不是预先定义好的,通常以自然语言的形式表达。
  • • 封闭关系(Closed Relationship):这类关系则从一个固定的、预定义好的类型集合中选取。

选择哪种关系类型取决于下游任务。封闭关系适用于许多机器学习问题,如链接预测、推荐、探索等。而当实体间的关系过于复杂、灵活甚至未知,无法用一个预定义的关系列表来表达时,则会使用开放关系。

2. 实践方法

在本次工作中,我将继续结合领域图和文档结构图,并采用封闭关系类型。

具体来说,我们将涵盖医学(Medical)领域,重点关注药物(DRUG)、疾病(DISEASE)、症状(SYMPTOM)这三类实体以及治疗(TREATS)、引起(CAUSES)、有副作用(HAS_SIDE_EFFECT)、有症状(HAS_SYMPTOM)这些关系。我们还将介绍如何确保提取出的知识图谱满足本体的约束条件,例如,HAS_SIDE_EFFECT关系仅存在于药物(DRUG)症状(SYMPTOM)实体之间,从而使知识图谱更加可靠。

这次我选择了文档解析方法(适合处理短文档),而不是分块方法,来生成文档结构图。我相信将分块选项添加到框架中是可行的,但本文中不做展示。

回顾上一篇文章,我已经在models/schema.py中定义了本体模式(Ontology Schema)。现在我们无需再手动定义它们了。

fromtypingimportList,Literal,Optional,Type,Any, get_args, get_origin,Union
frompydanticimportBaseModel, Field
fromdataclassesimportdataclass

classMention(BaseModel):
typeiteral["LOCATION","TIME","PERSON","EVENT","ORGANIZATION"]
string:str= Field(...,
description="命名实体(如地点、时间、人物或事件)在原文中出现的精确文本字符串。")


classRelation(BaseModel):
head:str= Field(..., description="头部提及的实体。")# 根据LLM3建议修改描述
tail:str= Field(..., description="尾部提及的实体。")# 根据LLM3建议修改描述
relation_description:str= Field(...,
description="对头部(head)和尾部(tail)提及实体之间关系的简要描述。")


classSection(BaseModel):
title:str= Field(..., description="章节标题")
summary:str= Field(...,
description="一份简要总结(150-300 字),突出本章节涵盖的关键点。")
mentionsist[Mention] = Field(...,
description="在本章节中找到的提及(实体)的完整列表。")
relationsist[Relation] = Field(...,
description="在本章节中找到的提及之间的关系的完整列表。")


@dataclass
classDistilledUnit:
title:str= Field(..., description="单元标题")
summary:str= Field(..., description="对单元内容的简明总结,介于 150 至 300 字之间。")
sectionsist[Section] = Field(..., description="单元内容中包含的 `Section` 列表")

下面是我们的新工作流程图:

工作流程概览

我们设计了一个包含三个主要智能体以及若干辅助工具的系统,用于协助构建本体和模式:

本体初始化智能体(Ontology Initialization Agent)

该智能体负责分析数据,以确定领域知识,包括相关的实体类型关系类型。它利用了两个关键的辅助工具:

  • 数据检索工具(Retrieve data):由于无法将所有数据都输入 LLMs,智能体将使用此工具随机选取样本进行分析。
  • 搜索工具(Search Tool):一旦初步确定了领域知识,智能体可以使用此工具在线搜索该领域的常见实体和关系,以辅助最终决策。

重要的是,该智能体还支持人工反馈(Human Feedback),允许用户改进或调整本体,使其更好地符合他们的具体目标。这是“人在环路”的关键环节。

模式设计智能体(Schema Design Agent)

该智能体读取原始数据和前一个智能体生成的本体,以设计一个符合文档结构的模式(Schema)。例如,如果输入文档是学术论文,模式可能包含诸如title(标题)、authors(作者)、publication date(发表日期)、sections(章节)、figures(图表)和tables(表格)等属性。与本体智能体类似,该智能体也支持交互式反馈(Interactive Feedback),允许用户根据需要修改模式。这一步的输出是一个伪代码定义(Pseudo Code Definition),它概述了本体和文档结构。

模式生成智能体(Schema Generation Agent)

该智能体接收上一步生成的伪代码,并将其转换为JSON 模式(JSON Schema)。然后可以使用此模式通过Pydantic等库生成模型。这一步是自动化生成代码模式的关键。

实际上,步骤 2 和步骤 3 可以合并为一步,但我发现当代码生成智能体收到关于模式模型定义的更清晰输入时,其性能会更好。

人工参与(Human-in-the-loop)示例:

首先,我们定义ontology_init_agent(本体初始化智能体)。以下代码片段展示了智能体的基本结构,包括名称、使用的模型、输出类型、指令以及可用的工具。

# agents/ontology_init_agent.py
fromtypingimportList,Optional
frommetagpt.actionsimportAction
frommetagpt.agentsimportAgent
frommetagpt.configimportConfig
frommetagpt.environmentimportEnvironment
# from metagpt.ext.stanford_corenlp.stanford_corenlp import StanfordCoreNLP # Original import commented out
frommetagpt.logsimportlogger
frommetagpt.toolsimportTool
frommetagpt.rolesimportRole
frommetagpt.prompts.baseimportBasePrompt
fromtypingimportAny# 用于占位符类型

# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model =Any# 占位符:指令模型
AgentName =Any# 占位符:智能体名称枚举
Ontology =Any# 占位符:本体模型
ONTOLOGY_INIT_PROMPT =""# 占位符:本体初始化提示词
retrieve_data =Any# 占位符:数据检索工具函数
search =Any# 占位符:搜索工具函数

defcreate_ontology_init_agent() -> Agent:
"""创建本体初始化智能体"""# 添加中文注释
agent = Agent(
name=AgentName.ontology_init_agent.value,
model=instruct_model,
result_type=Ontology,
instructions=ONTOLOGY_INIT_PROMPT,
tools=[
Tool(retrieve_data, takes_ctx=True),# 数据检索工具,需要上下文
Tool(search, takes_ctx=False), # 搜索工具
],
retries=5,# 失败重试次数
)

returnagent

所以流程是一个循环:智能体生成本体 -> 将本体展示给用户 -> 征求用户确认。只有当用户同意设计时,循环才会终止。你可以简单地要求用户输入“agree”来中断循环,但这里我将使用一个“人工偏好智能体(Human Preference Agent)”来将用户意图解析成结构化对象,从而使对话更自然。以下是用于表示人工评审结果的 Pydantic 模型:

frompydanticimportBaseModel, Field
fromtypingimportOptional

classHumanReview(BaseModel):
is_agreed:bool= Field(..., description="指示人工评审员是否同意生成的本体。")
feedback:Optional[str] = Field(description="人工评审员关于本体的评论或建议。")

只有当is_agreedTrue时,循环才会终止。

(可选)你可以使用一个“接口智能体(Interface Agent)”来与非技术用户沟通,用自然语言解释本体。以下代码展示了本体初始化节点的主要逻辑,包括与用户交互和调用智能体:

# nodes/ontology_init_node.py
importasyncio
importos
importglob
fromtypingimportAny
fromdataclassesimportdataclass
frommetagpt.actionsimportAction
frommetagpt.agentsimportAgent
frommetagpt.nodesimportBaseNode
frommetagpt.graph_transimportGraphRunContext, Dependency, End
frommetagpt.utils.task_groupimporttask_group_gather
fromtqdmimporttqdm

# Assume these are defined elsewhere (假定这些在其他地方已定义)
create_ontology_init_agent =Any# 占位符
create_interface_agent =Any# 占位符
create_human_preference_agent =Any# 占位符
select_sample_data =Any# 占位符
Prompt =Any# 占位符
Ontology =Any# 占位符
HumanReview =Any# 占位符
SchemaDesignNode =Any# 占位符

# 创建智能体实例
ontology_init_agent = create_ontology_init_agent()
interface_agent = create_interface_agent()
human_preference_agent = create_human_preference_agent()


@dataclass
classOntologyInitNode(BaseNode):
data_dir:str# 数据目录路径

asyncdefrun(self, ctx: GraphRunContext) -> SchemaDesignNode:
"""运行本体初始化节点"""# 添加中文注释

messages = []# 用于保存对话历史
deps = Dependency(data_dir=self.data_dir)# 创建依赖对象,存储数据目录信息
sample_data = select_sample_data(data_dir=self.data_dir)# 选取样本数据

# 第一步:收集数据背景信息
background = Prompt.ask(
"能否请您详细介绍一下这些数据?\n"
"它们是做什么用的,以及您打算如何使用它们?\n"
"我询问这些是为了更好地理解数据背后的意图和目的,以便我们可以为知识图谱设计一个更合适的本体。"
)

# 进入人工评审循环
whileTrue:
# 第二步:运行本体初始化智能体,生成本体草案
# 第一次运行时提供背景和样本数据,后续运行则依赖消息历史
ontology_result =awaitontology_init_agent.run(
user_prompt=f"""
**背景**:{background}
**样本数据**:{sample_data}
"""iflen(messages)==0elseNone,
message_history=messages,
deps=deps,
)

messages += ontology_result.new_messages()# 添加智能体响应到对话历史

# 第三步:使用接口智能体解释本体草案给用户
interface_response =awaitinterface_agent.run(f"""{str(ontology_result.output)}""")
messages += interface_response.new_messages()# 添加解释响应到对话历史

print(interface_response.output)# 打印解释给用户

# 第四步:获取用户反馈
user_response = input("请提供您的反馈或确认(输入 'agree' 同意):")# 提示用户输入

# 第五步:使用人工偏好智能体解析用户意图
human_review =awaithuman_preference_agent.run(
user_prompt=user_response,
message_history=messages
)

review: HumanReview = human_review.output# 获取解析结果
ifreview.is_agreed:# 如果用户同意,则跳出循环
break
else:
messages += human_review.new_messages()# 如果不同意,添加反馈到历史,继续循环


# 循环结束后,返回包含本体和数据目录的SchemaDesignNode
returnSchemaDesignNode(data_dir=self.data_dir, ontology=ontology_result.output)

我们对模式设计节点(Schema Design Node)采用同样的方式,该节点负责设计并生成 JSON 模式。这是一个输出示例:

{
"$defs":{
"EntityType":{
"type":"string",
"enum":[
"PERSON","ORGANIZATION","DATE","MEDICATION","DISEASE",
"LOCATION","STATISTICAL_MEASURE"
],
"title":"实体类型"
},
"RelationType":{
"type":"string",
"enum":[
"INFECTS","LOCALIZED_IN","TREATED_WITH","OCCURS_IN",
"CAUSES","IS_PART_OF","IS_HOST_TO","IS_VARIANT_OF","IS_ASSOCIATED_WITH"
],
"title":"关系类型"
},
"Mention":{
"type":"object",
"properties":{
"type":{"$ref":"#/$defs/EntityType"},
"text":{"type":"string","description":"提及的实体文本"}
},
"required":["type","text"]
},
"DocUnit":{
"type":"object",
"properties":{
"text":{"type":"string","description":"单元文本内容"},
"section_title":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"单元所属章节标题"},
"mentions":{
"anyOf":[
{"type":"array","items":{"$ref":"#/$defs/Mention"}},
{"type":"null"}
],
"default":null,
"description":"本单元中提取出的提及列表"
},
"relationships":{
"anyOf":[
{"type":"array","items":{"type":"object"}},
{"type":"null"}
],
"default":null,
"description":"本单元中提取出的关系列表"
}
},
"required":["text","section_title"]
}
},
"type":"object",
"title":"文档",
"properties":{
"title":{"type":"string","description":"文档标题"},
"authors":{"type":"array","items":{"type":"string"},"description":"文档作者列表"},
"abstract":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档摘要"},
"units":{"type":"array","items":{"$ref":"#/$defs/DocUnit"},"description":"文档单元列表"},
"source":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档来源"},
"doc_type":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档类型"}
},
"required":["title","authors","units","abstract","source","doc_type"],

"RelationConstraints":{
// 必须包含:关于每种关系类型可拥有的主语/宾语实体类型的约束
"CAUSES":[
{"subject_type":"DISEASE","object_type":"SYMPTOM"},
{"subject_type":"DISEASE","object_type":"DISEASE"}
],
"TREATS":[
{"subject_type":"MEDICATION","object_type":"DISEASE"}
]
}
}

为了确保当 LLMs 未能正确生成 JSON 时流程不会中断,我们可以添加一些后处理/验证和重新生成的循环。例如,为模式生成智能体添加一个输出验证器:

importre
importjson
fromtypingimportType,Any
frommetagpt.agentsimportAgent
frommetagpt.utils.model_utilsimportcreate_model_from_schema
frommetagpt.constimportModelRetry
frommetagpt.graph_transimportRunContext# 假设 RunContext 存在
frompydanticimportBaseModel
fromtypingimportget_args, get_origin,Union# 导入 Union

# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model =Any# 占位符
AgentName =Any# 占位符
SCHEMA_GENERATION_PROMPT =""# 占位符
extract_json_from_markdown =Any# 占位符 (用于从 Markdown 提取 JSON)

defcreate_schema_generation_agent() -> Agent:
"""创建模式生成智能体"""# 添加中文注释
agent = Agent(
name=AgentName.schema_generation_agent.value,
model=instruct_model,
instructions=SCHEMA_GENERATION_PROMPT,
result_type=str,# 智能体直接输出字符串形式的 JSON
retries=5,# 失败重试次数
)

# 添加输出验证器
@agent.output_validator
asyncdefvalidate_schema(_: RunContext, output:str) ->str:
"""验证生成的 JSON 模式是否有效且符合要求"""# 添加中文注释
try:
# 从可能的 Markdown 代码块中提取 JSON 字符串
match= re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', output)
ifnotmatch:
raiseModelRetry("输出不是有效的 JSON Markdown 代码块。")# 改进错误消息

json_string =match.group(1).strip()
schema =json.loads(json_string)# 解析 JSON

# 基本模式结构验证
if"$defs"notinschema:
raiseModelRetry("模式必须包含 '$defs' 部分。")# 改进错误消息
if"type"notinschemaorschema["type"] !="object":
raiseModelRetry("模式根节点必须是类型为 'object' 的对象。")# 改进错误消息


# 验证关键模型的存在和结构
try:
doc_model = create_model_from_schema(schema,"Doc")
exceptExceptionase:
raiseModelRetry(f"无法从模式创建 'Doc' 模型:{e}")

try:
doc_unit_model = create_model_from_schema(schema,"DocUnit")
exceptExceptionase:
raiseModelRetry(f"无法从模式创建 'DocUnit' 模型:{e}")

try:
# 检查 RelationType 枚举定义
create_model_from_schema(schema,"RelationType")
except:
raiseModelRetry("模式必须定义 `RelationType` (Enum) 模型。")

# 检查 Doc 模型必须有 'units' 属性
if'units'notindoc_model.model_fields:
raiseModelRetry("模型 'Doc' 必须包含属性 'units'。")

# 检查 DocUnit 模型必须有可选的 'mentions' 属性
mentions_field_info = doc_unit_model.model_fields.get("mentions")

ifmentions_field_info:
# 检查是否是 Union 且包含 NoneType
ifnot(get_origin(mentions_field_info.annotation)isUnionand
type(None)inget_args(mentions_field_info.annotation)):
raiseModelRetry(f"模型 'DocUnit' 的属性 'mentions' 必须是可选的 (允许 null)。")
else:
# 如果 'mentions' 属性缺失,也视为错误
raiseModelRetry(f"模型 'DocUnit' 必须包含属性 'mentions'。")


# 检查并验证 RelationConstraints 部分
if"RelationConstraints"notinschema:
raiseModelRetry(f"模式必须包含 `RelationConstraints`。")
else:
relation_constraints = schema["RelationConstraints"]
ifnotisinstance(relation_constraints,dict):
raiseModelRetry("`RelationConstraints` 必须是一个字典。")# 改进错误消息
forkey, valueinrelation_constraints.items():
ifnotisinstance(key,str)ornotisinstance(value,list):
raiseModelRetry("RelationConstraints 的键必须是字符串 (关系类型),值必须是列表。")# 改进错误消息
forconstraintinvalue:
ifnotisinstance(constraint,dict)or"subject_type"notinconstraintor"object_type"notinconstraint:
raiseModelRetry(f"RelationConstraints 中的每个约束必须是包含 'subject_type' 和 'object_type' 的字典。无效约束:{constraint}")# 改进错误消息


# 验证通过,返回原始输出字符串
returnoutput
exceptExceptionase:
# 捕获所有验证中的异常,返回 ModelRetry
raiseModelRetry(f"无效的 JSON 模式:{e}")

根据我的经验,使用单次提示(One Shot Prompting),qwen2.5 codergpt4o-mini通常能在第一次尝试时就生成正确的结果。以下代码展示了模式设计节点如何使用这些智能体并集成人工反馈:

# nodes/schema_design_node.py
importasyncio
importos
importglob
fromtypingimportAny
fromdataclassesimportdataclass
frommetagpt.actionsimportAction
frommetagpt.agentsimportAgent
frommetagpt.nodesimportBaseNode
frommetagpt.graph_transimportGraphRunContext, Dependency, End
frommetagpt.utils.task_groupimporttask_group_gather
fromtqdmimporttqdm

# Assume these are defined elsewhere (假定这些在其他地方已定义)
create_schema_design_agent =Any# 占位符
create_interface_agent =Any# 占位符
create_human_preference_agent =Any# 占位符
select_sample_data =Any# 占位符
Ontology =Any# 占位符
HumanReview =Any# 占位符
InformationExtractionNode =Any# 占位符
create_schema_generation_agent =Any# 占位符
extract_json_from_markdown =Any# 占位符

# 创建智能体实例
schema_design_agent = create_schema_design_agent()
interface_agent = create_interface_agent()
human_preference_agent = create_human_preference_agent()
schema_generation_agent = create_schema_generation_agent()# 使用上面定义的带验证器的智能体


@dataclass
classSchemaDesignNode(BaseNode):
data_dir:str# 数据目录路径
ontology: Ontology# 本体信息

asyncdefrun(self, ctx: GraphRunContext[None, Dependency]) -> InformationExtractionNode:
"""运行模式设计节点"""# 添加中文注释

messages = []# 用于保存对话历史
# 第 1 步:选取数据样本
sample_data:str= select_sample_data(self.data_dir)

# 进入人工评审循环
whileTrue:
# 第 2 步:运行模式设计智能体,生成模式草案
# 第一次运行时提供文档样本和本体信息,后续依赖消息历史
schema_design_result =awaitschema_design_agent.run(
user_prompt=f"文档样本:{sample_data}\n"
"\n=============\n"
f"本体:{str(self.ontology)}"iflen(messages) ==0elseNone,
message_history=messages,
)

messages += schema_design_result.new_messages()# 添加智能体响应到对话历史

# 第 3 步:使用接口智能体解释模式草案给用户
# 移除思维标签,清理输出以便展示
interface_response_result =awaitinterface_agent.run(
user_prompt=""
f"{str(schema_design_result.output).replace('<think>','').replace('</think>','')}"
)

print(interface_response_result.output)# 打印解释给用户

# 第 4 步:获取用户反馈
user_response =input("请提供您的反馈或确认(输入 'agree' 同意):")# 提示用户输入

# 第 5 步:使用人工偏好智能体解析用户意图
human_review =awaithuman_preference_agent.run(
user_prompt=user_response,
message_history=messages
)

review: HumanReview = human_review.output# 获取解析结果

ifreview.is_agreed: # 如果用户同意,则跳出循环
break
else:
messages += human_review.new_messages()# 如果不同意,添加反馈到历史,继续循环

# 循环结束后,运行模式生成智能体,将伪代码转换为 JSON Schema
schema_result =awaitschema_generation_agent.run(
user_prompt=schema_design_result.output# 输入是模式设计的伪代码输出
)
schema = extract_json_from_markdown(schema_result.output)# 从带 Markdown 格式的输出中提取 JSON

# 返回包含数据目录和最终模式的 InformationExtractionNode
returnInformationExtractionNode(data_dir=self.data_dir, schema=schema)

这就是模式设计(Schema Design)部分。

现在我们进入信息抽取(Information Extraction)阶段,这部分与上一篇文章中的做法非常相似。但由于我们没有预定义的 Mention 模型,数据模型只能在运行时(runtime)定义。因此,与其在智能体中显式定义输出模型,我们可以通过Type[BaseModel]对其进行抽象,并将output_type作为函数的参数传递。以下是用于提及检测智能体的定义:

fromtypingimportList,Type,Any
frompydanticimportBaseModel
frommetagpt.agentsimportAgent
frommetagpt.graph_transimportRunContext# 假设 RunContext 存在

# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model =Any# 占位符
AgentName =Any# 占位符
MENTION_DETECTION_PROMPT =""# 占位符

defcreate_mention_detection_agent(
mention_model:Type[BaseModel],# 提及模型的类型
entity_typesist[str] # 需要检测的实体类型列表
) -> Agent:
"""创建提及检测智能体"""# 添加中文注释
agent = Agent(
name=AgentName.mention_detection_agent.value,
model=instruct_model,
system_prompt=MENTION_DETECTION_PROMPT.format(entity_types=entity_types),# 格式化系统提示词
result_type=List[mention_model],# 预期结果是提及模型的列表
retries=5,# 失败重试次数
)
# 提及检测智能体此处没有额外的输出验证器

returnagent

类似地,我们创建relation_extraction_agent(关系抽取智能体)。我也使用了output_validation来确保抽取出的关系符合本体的约束。这是为确保知识图谱质量设置的“质量控制关卡”。

# agent/relation_extraction_agent.py
importre
fromtypingimportList,Type,Any,Dict# 导入 Dict
frompydanticimportBaseModel
frommetagpt.agentsimportAgent
frommetagpt.constimportModelRetry
frommetagpt.graph_transimportRunContext, Dependency# 假设 RunContext 和 Dependency 存在

# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model =Any# 占位符
AgentName =Any# 占位符
RELATION_EXTRACTION_PROMPT =""# 占位符
build_dynamic_relation_model =Any# 占位符 (用于构建动态关系模型)

defcreate_relation_extraction_agent(relation_model:Type[BaseModel], mention_stringsist[str], relation_typesist[str], constraintsict[str,List[Dict[str,str]]]) -> Agent:
"""创建关系抽取智能体"""# 添加中文注释
agent = Agent(
name=AgentName.relation_extraction_agent.value,
model=instruct_model,
system_prompt=RELATION_EXTRACTION_PROMPT.format(
mention_strings=mention_strings,
relation_types=relation_types,
constraints=constraints),# 将约束传递给提示词
result_type=List[relation_model],# 预期结果是关系模型的列表
retries=5,# 失败重试次数
)

# 添加输出验证器以检查关系是否符合本体约束
@agent.output_validator
asyncdefvalidate_relation_constraint(ctx: RunContext[Dependency], list_relationsist[Any]):# 接收 LLM 的原始输出列表
"""验证抽取出的关系是否符合本体约束"""# 添加中文注释
importre

# 这个正则表达式预期提及内容的格式为 <TYPE>text</TYPE>
pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>")# 预编译正则表达式

validated_relations = []# 用于存放验证通过的关系对象
forrelation_dictinlist_relations:# LLM 输出通常是字典列表
# 检查是否为字典,并包含必要的键
ifnotisinstance(relation_dict,dict)or'subject'notinrelation_dictor'predicate'notinrelation_dictor'object'notinrelation_dict:
raiseModelRetry(f"关系输出格式不正确,应为包含 subject, predicate, object 键的字典。无效输出:{relation_dict}")# 改进错误消息

# 解析主语和宾语字符串,获取类型和文本
subject_str = relation_dict.get('subject','')
object_str = relation_dict.get('object','')
predicate = relation_dict.get('predicate')

ifnotsubject_strornotobject_strornotpredicate:
raiseModelRetry(f"关系信息不完整,缺少 subject, object 或 predicate。无效输出:{relation_dict}")# 改进错误消息

subject_match = pattern.match(subject_str)
object_match = pattern.match(object_str)

ifnotsubject_match:
raiseModelRetry(f"主语 '{subject_str}' 不符合 <TYPE>text</TYPE> 格式。")# 改进错误消息
ifnotobject_match:
raiseModelRetry(f"宾语 '{object_str}' 不符合 <TYPE>text</TYPE> 格式。")# 改进错误消息

subject_type = subject_match.group("type")
object_type = object_match.group("type")

triplet_type = {
"subject_type": subject_type,
"object_type": object_type
}

# 从依赖中获取关系约束并进行检查
relationship_constraints = ctx.deps.relationship_constraints# 确保约束已加载到deps中
allowed_constraints = relationship_constraints.get(predicate)

ifallowed_constraintsisNone:
# 如果关系类型未在约束中定义,则视为无效
raiseModelRetry(f"关系类型 '{predicate}' 未在 RelationConstraints 中定义。")# 改进错误消息

iftriplet_typenotinallowed_constraints:
raiseModelRetry(f"三元组: ({subject_type})-[:{predicate}]->({object_type}) 不符合本体约束!")# 改进错误消息

# 验证通过后,尝试将字典解析为 Pydantic 模型实例
try:
validated_relation = relation_model(**relation_dict)# 使用字典解包创建模型实例
validated_relations.append(validated_relation)
exceptExceptionase:
raiseModelRetry(f"三元组 '{relation_dict}' 无法解析为预期的关系模型 '{relation_model.__name__}':{e}")# 改进错误消息


returnvalidated_relations# 返回验证通过的 Pydantic 模型列表

returnagent

信息抽取节点(Information Extraction Node)中的逻辑流程如下。它协调文档蒸馏、提及检测和关系抽取这三个步骤:

importasyncio
importos
importglob
importjson
fromtypingimportList,Type,Any,Dict
fromdataclassesimportdataclass
frommetagpt.nodesimportBaseNode
frommetagpt.graph_transimportGraphRunContext, Dependency, End
frommetagpt.utils.task_groupimporttask_group_gather
fromtqdmimporttqdm
frompydanticimportBaseModel
fromtypingimportget_args, get_origin,Literal# 导入get_origin, Literal

# Assume these are defined elsewhere (假定这些在其他地方已定义)
create_doc_distiller_agent =Any# 占位符
create_mention_detection_agent =Any# 占位符
create_relation_extraction_agent =Any# 占位符
create_model_from_schema =Any# 占位符
build_dynamic_relation_model =Any# 占位符
AgentName =Any# 占位符
instruct_model =Any# 占位符
logger =Any# 占位符

@dataclass
classInformationExtractionNode(BaseNode[None, Dependency,None]):
"""信息抽取节点:协调文档解析、提及检测和关系抽取"""# 添加中文注释

def__init__(self, data_dir:str, schemaict, **kwargs):
super().__init__(**kwargs)

self.data_dir = data_dir
self.schema = schema

# 从 Schema 中创建 Pydantic 模型和类型定义
# 假定 create_model_from_schema 可以处理枚举定义并返回类型
self.relation_types:Type= create_model_from_schema(schema,"RelationType")
# 假定 create_model_from_schema 返回的是 Literal 或 Enum 类型
self.entity_types:Type= create_model_from_schema(schema,"EntityType")

# 创建文档主模型和单元模型
self.doc_model:Type[BaseModel] = create_model_from_schema(schema,"Doc")
# 创建提及模型
self.mention_model:Type[BaseModel] = create_model_from_schema(schema,"Mention")# 假定返回 Pydantic Mention 模型

# 初始化智能体实例
# 假定 create_doc_distiller_agent 接收输出模型类型
self.doc_distiller_agent = create_doc_distiller_agent(output_model=self.doc_model, schema="")# Schema 参数看起来未使用

# 假定 entity_types 可以转换为字符串列表
self.mention_detection_agent = create_mention_detection_agent(
self.mention_model,
entity_types=list(get_args(self.entity_types))ifget_origin(self.entity_types)isLiteralelse[]# 从 Literal 类型中提取枚举值
)

asyncdefrun_task(self, ctx: GraphRunContext[None, Dependency], data:str, output_path:str):
"""处理单个文档的信息抽取任务"""# 添加中文注释

# 第 1 步:蒸馏文档结构
doc_results =awaittask_group_gather(
[
lambda:self.doc_distiller_agent.run(
user_prompt=f"原始文档:{data}\n"# 翻译提示词
)
],
timeout_seconds=100,# 设定超时时间
)
doc_result = doc_results[0]
doc = doc_result.output# doc 是 self.doc_model 的实例
doc_units = doc.units# doc_units 是 DocUnit 实例的列表

# 第 2 步:为每个文档单元检测提及
all_entity_types_list =list(get_args(self.entity_types))ifget_origin(self.entity_types)isLiteralelse[]
mentions_results =awaittask_group_gather(
[
# 为每个文档单元创建一个提及检测任务
lambdai=i:self.mention_detection_agent.run(
user_prompt=f"""段落:{doc_units[i].text}\n
实体类型:{all_entity_types_list}"""# 翻译提示词,传递实体类型列表
)
foriinrange(len(doc_units))
],
timeout_seconds=100# 设定超时时间
)

# 将提取出的提及分配回对应的文档单元
foriinrange(len(doc_units)):
doc_units[i].mentions = mentions_results[i].output# mentions_results[i].output 是 List[Mention]

# 第 3 步:为包含提及的文档单元抽取关系
relation_extraction_agents = []
relation_indices = []# 记录原始文档单元的索引
all_relation_types_list =list(get_args(self.relation_types))ifget_origin(self.relation_types)isLiteralelse[]

foridx, mention_list_resultinenumerate(mentions_results):
# 仅处理找到提及的单元
ifmention_list_result.outputandlen(mention_list_result.output) >0:
# 格式化提及内容,用于关系抽取提示词/模型构建
# 根据前面 Pydantic 模型定义,使用 .type 和 .string
mention_strings = [
f"<{item.type}>{item.string}</{item.type}>"
foriteminmention_list_result.output# mention_list_result.output 是 List[Mention]
]

# 根据找到的提及和模式中定义的关系类型动态构建关系模型
# 假定 build_dynamic_relation_model 接收提及列表和关系类型列表
relation_model = build_dynamic_relation_model(mention_strings=mention_strings,
relation_types=all_relation_types_list)

# 为这个单元创建关系抽取智能体
relation_extraction_agents.append(
create_relation_extraction_agent(
relation_model,
constraints=ctx.deps.relationship_constraints,# 从上下文依赖中传递约束
mention_strings=mention_strings,
relation_types=all_relation_types_list# 传递所有定义的关系类型
)
)

relation_indices.append(idx)# 存储原始单元索引

# 并行运行关系抽取智能体
ifrelation_extraction_agents:# 仅当存在需要抽取关系的单元时运行
relations_results =awaittask_group_gather(
[
# 为每个关系抽取智能体创建一个任务
lambdai=i: relation_extraction_agents[i].run(
user_prompt=doc_units[relation_indices[i]].text,# 使用原始文档单元的文本作为输入
deps=ctx.deps# 传递依赖,用于验证器
)
foriinrange(len(relation_extraction_agents))
],
timeout_seconds=100# 设定超时时间
)

# 将提取出的关系分配回对应的文档单元
forrel_idx, doc_idxinenumerate(relation_indices):
try:
# relations_results[rel_idx].output 是 List[Relation] (动态模型实例)
# 假定 DocUnit 有一个 'relationships' 属性可以接受这个列表
doc_units[doc_idx].relationships = relations_results[rel_idx].output
exceptExceptionase:
# 处理分配过程中可能出现的错误
logger.error(f"为文档单元{doc_idx}分配关系失败:{e}")
doc_units[doc_idx].relationships =None# 失败时设置为 None


# 使用更新后的单元列表更新原始文档对象
doc.units = doc_units

# 可选:保存处理后的文档 (原代码中注释掉了)
# with open(output_path, "w", encoding="utf-8") as f: # 添加编码
# json.dump(doc.dict(), f, indent=2, ensure_ascii=False) # 添加 ensure_ascii=False

asyncdefrun(self, ctx: GraphRunContext[None, Dependency]) -> End:
"""批量处理文件进行信息抽取"""# 添加中文注释

# 确保 Dependency 对象包含 relationship_constraints
ifnothasattr(ctx.deps,'relationship_constraints'):
ctx.deps.relationship_constraints = {}# 初始化如果不存在

# 从 Schema 中加载关系约束到依赖对象中
if"RelationConstraints"inself.schema:
ctx.deps.relationship_constraints =self.schema["RelationConstraints"]
else:
logger.warning("Schema 中不包含 'RelationConstraints'。关系验证可能无法正常工作。")# 增加警告


# 查找所有输入文本文件
# 根据实际设置调整路径。原代码使用 /data/cord-19/articles/
files = glob.glob(os.path.join(self.data_dir,"*.txt"))# 使用 self.data_dir 提高灵活性

# 准备 run_task 的参数列表
args = []
forfileintqdm(files, desc="准备任务参数"):# 添加进度条描述
withopen(file,"r", encoding="utf-8")asf:# 指定编码
basename = os.path.basename(file)
sample_data = f.read()
output_path =f"data/processed/{basename}.json"# 定义输出路径

args.append([sample_data, output_path])

# 分批处理文件
bs=10# 批次大小
foriintqdm(range(0,len(args),bs), desc="处理文件批次"):# 添加批次进度条描述
batch_args = args[i:i+bs]
# task_group_gather 接收一个可等待对象(callable)列表
awaittask_group_gather(
[
# 使用 lambda 捕获当前批次的文件数据和输出路径,创建异步任务
(lambdadata=data, output_path=output_path:self.run_task(
ctx=ctx,# 传递当前上下文
data=data,
output_path=output_path))
fordata, output_pathinbatch_args
],
timeout_seconds=120,# 设置批次任务的超时时间
)

# 完成所有批次处理后,返回 End 节点
returnEnd(None)

这就是信息抽取(Information Extraction)的全部过程。

3. 集成 Neo4j

将提取出的三元组集成到 Neo4j 时,我们需要注意的另一点是创建节点和节点间关系有两种方式:CREATE(创建)或MERGE(合并)节点。CREATE模式会创建两个独立的节点,即使它们指向的是同一个概念;而MERGE模式则会在表面形式相同时仅保留一个节点,但这要求你确保它们确实指向的是同一个实体。

在实践中,在医学等某些领域存在许多歧义,一个术语可能指代不同的概念。我们需要一个链接(Linking)步骤,在一个词汇表/参考集中识别某个提及的唯一标识符。我们将在后续的文章中更深入地探讨这个话题。目前,我将使用MERGE方法来在提及之间构建桥接关系。

以下是一个将三元组导入 Neo4j 的示例代码片段。请注意,您需要根据实际的文件结构和三元组格式加载数据。

fromneo4jimportGraphDatabase, basic_auth
fromtqdmimporttqdm
importos
importglob
importjson
importre# 用于解析三元组字符串

# 假设 URI, AUTH, pattern 已定义。您需要根据实际情况替换或定义它们。
# 三元组的结构假定为一个列表,列表元素是字典,例如 {'subject': '<TYPE>text</TYPE>', 'predicate': 'RELATION_TYPE', 'object': '<TYPE>text</TYPE>'}
# pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>") # 用于解析主语和宾语字符串

URI ="bolt://localhost:7687"# Neo4j 数据库连接 URI 占位符
AUTH = basic_auth("neo4j","password")# Neo4j 认证信息占位符 - 请替换为您的实际凭据

# 示例:从之前生成的 JSON 文件中加载三元组
triplets = []# 存放加载的三元组
data_dir ="data/processed"# 之前信息抽取生成的 JSON 文件目录
json_files = glob.glob(os.path.join(data_dir,"*.json"))

# 用于解析三元组字符串的正则表达式
pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>")

forjson_fileintqdm(json_files, desc="加载三元组"):# 为文件加载添加进度条
withopen(json_file,'r', encoding='utf-8')asf:# 指定编码
try:
doc_data = json.load(f)
# 假设 doc_data 结构包含 units -> relationships
if'units'indoc_data:
forunitindoc_data['units']:
if'relationships'inunitandunit['relationships']:
# 遍历抽取出的关系列表
forrelinunit['relationships']:
# rel 应该是 Pydantic 模型实例的字典表示,其中 subject 和 object 已经是 <TYPE>text</TYPE> 格式的字符串
ifisinstance(rel,dict)and'subject'inreland'predicate'inreland'object'inrel:
triplets.append(rel)# 直接添加符合格式的字典
exceptExceptionase:
# print(f"加载文件 {json_file} 中的三元组失败: {e}") # 打印加载失败信息 (可选)
pass# 忽略加载单个文件失败的情况 (原代码行为)


# 连接 Neo4j 数据库并导入数据
withGraphDatabase.driver(URI, auth=AUTH)asdriver:
driver.verify_connectivity()
print("连接 Neo4j 数据库成功")# 翻译连接成功消息

fortripletintqdm(triplets, desc="导入三元组到 Neo4j"):# 为导入过程添加进度条
try:
# 解析主语和宾语字符串以获取类型和文本
subject_match = pattern.match(triplet.get('subject',''))
object_match = pattern.match(triplet.get('object',''))

ifnotsubject_matchornotobject_match:
# print(f"跳过无效三元组格式: {triplet}") # 打印跳过信息 (可选)
continue# 跳过格式不符的三元组

subject_type = subject_match.group("type")
subject_text = subject_match.group("text")
object_type = object_match.group("type")
object_text = object_match.group("text")
predicate = triplet.get('predicate')

ifnotsubject_typeornotsubject_textornotobject_typeornotobject_textornotpredicate:
# print(f"跳过不完整三元组: {triplet}") # 打印跳过信息 (可选)
continue# 跳过信息不完整的三元组


# 执行 MERGE 查询,创建或合并节点和关系
result = driver.execute_query(f"""
MERGE (a:{subject_type}{{name: $head}})
MERGE (b:{object_type}{{name: $tail}})
MERGE (a)-[:{predicate}]->(b)
""",
head=subject_text,
tail=object_text,
database="neo4j",# 如果需要,指定数据库名称
).summary
# print(f"导入三元组 ({subject_text})-[:{predicate}]->({object_text}) 成功") # 打印导入成功信息 (可选)

exceptExceptionase:
# print(f"导入三元组 {triplet} 失败: {e}") # 打印导入失败信息 (可选)
pass# 忽略导入单个三元组失败的情况 (原代码行为)

结果展示

我原本期望能提取到更多药物副作用的信息,但目前看来,语料库中关于副作用的提及较少。未来可以通过扩展数据集或优化模型来改善这一情况。

总结

在本文中,我展示了如何将“人工参与(Human-in-the-loop)”设计集成到用于知识图谱构建的多智能体系统中,并以 CORD-19 数据集为例进行了案例研究。通过结合领域图文档结构图,并借助封闭关系(Closed Relationship)强制执行本体约束,我们可以构建出更准确、结构化的知识图谱表示。


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

链载AI是专业的生成式人工智能教程平台。提供Stable Diffusion、Midjourney AI绘画教程,Suno AI音乐生成指南,以及Runway、Pika等AI视频制作与动画生成实战案例。从提示词编写到参数调整,手把手助您从入门到精通。
  • 官方手机版

  • 微信公众号

  • 商务合作

  • Powered by Discuz! X3.5 | Copyright © 2025-2025. | 链载Ai
  • 桂ICP备2024021734号 | 营业执照 | |广西笔趣文化传媒有限公司|| QQ