ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">本文将介绍如何通过引入AI代理(Agent)来简化并模块化这一流程。
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">使用代理的一个关键优势在于它能够生成结构化输出,这极大地简化并优化了层级文档的解析和处理过程。
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">我们先回顾一下本体(Ontology)模式: ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;color: rgb(63, 63, 63);"> ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;margin: 0.1em auto 0.5em;border-radius: 8px;" title="null"/>
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">文章中定义了四个模型:单元(Unit)、章节(Section)、子章节(Sub Section)以及实体(Entity)。但出于简化考虑,本文仅讨论其中的单元、章节和实体三种模型。
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">这是此前的方案: ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;color: rgb(63, 63, 63);"> ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;margin: 0.1em auto 0.5em;border-radius: 8px;" title="null"/> ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;padding-left: 12px;color: rgb(63, 63, 63);border-radius: 6px;background: color-mix(in srgb, rgb(1, 155, 252) 8%, transparent);">原始数据我的数据是一系列.txt文件,每个文件都对应一本书中的一个单元,这些单元是通过光学字符识别(OCR)过程提取出来的。例如:
Political, Economic, and Cultural Situation under the Nguyễn Dynasty (First Half of the 19th Century) Introduction In 1802, the Nguyễn Dynasty was established. During its half-century of rule over a country that had just emerged from major upheavals, the Nguyễn court focused on consolidating power, restoring the economy, and reorganizing cultural affairs. However, entrenched in conservative feudal ideology, the Nguyễn regime failed to lay the foundation for a new phase of development. Government Organization and Foreign Policy After defeating the Tây Sơn Dynasty, Nguyễn Ánh proclaimed himself emperor in 1802, taking the reign title Gia Long and founding the Nguyễn Dynasty, with the capital in Phú Xuân (present-day Huế). In 1804, the country was officially renamed "Việt Nam," though it was later changed to "Đại Nam." The central government followed the Lê Dynasty model but increased the emperor's authority. To manage the newly unified country, Gia Long initially divided the territory into three regions: Bắc Thành (Northern provinces), Gia Định Thành (Southern provinces), and Trực Doanh (Central provinces under direct royal administration). Each region had a Governor-General. In 1831–1832, Emperor Minh Mạng abolished these divisions and reorganized the country into 30 provinces and 1 special prefecture (Thừa Thiên), with officials such as Tổng đốc (Governor-General) and Tuần phủ (Provincial Governor) appointed by the royal court. Subdivisions like districts, counties, and villages remained unchanged. Initially, officials were chosen from Nguyễn Ánh's former followers. Later, Confucian education and civil examinations became the main source of recruitment. Salaries were set, but officials were not granted land, and corruption gradually emerged among many bureaucrats. A new legal code called Hoàng Việt Luật Lệ (also known as the Gia Long Code) was issued, containing nearly 400 articles that strictly protected the monarchy and feudal order. ... Economic Situation and Nguyễn Policies In the early 19th century, the country experienced temporary peace and unity. While the economy had favorable conditions, it also faced many difficulties. Agriculture remained backward and unchanged. Much land was left fallow. In 1804, the state reinstated the "quân điền" (equal-field system), but public land only accounted for 20% of the total. Moreover, land distribution prioritized nobles, officials, and soldiers. The court encouraged land reclamation through different means: allowing free settlement or providing funds and equipment (tools, buffalo) for farming. Despite efforts, land expansion was modest. The state invested annually in irrigation, dredging canals, and reinforcing dykes, but these efforts were insufficient to prevent flooding.

Environment Setup

PydanticAI
I will use the PydanticAI framework to build a multi-agent system. If you are unfamiliar with the framework, I recommend consulting its official documentation.
Large Language Models (LLMs)
For this project, I deploy Ollama on Google Colab and connect to it locally through Ngrok. Interested readers can refer to this guide for detailed instructions: https://medium.com/@jupyter267/learn-to-deploy-llm-apps-run-ollama-on-google-colab-and-connect-locally-with-ngrok-4b84a1106270. Specifically, I use the Qwen3 model.
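The agents defined later all import an ollama_model from agents/_base.py, which the original article does not show. A minimal sketch of what it might look like, assuming a PydanticAI version that exposes OpenAIModel/OpenAIProvider and Ollama's OpenAI-compatible /v1 endpoint (the Ngrok URL is a placeholder):

# agents/_base.py -- a minimal sketch; this file is not shown in the original.
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider

# Hypothetical placeholder: your Ngrok forwarding URL plus Ollama's
# OpenAI-compatible path
OLLAMA_BASE_URL = "https://<your-ngrok-id>.ngrok-free.app/v1"

ollama_model = OpenAIModel(
    "qwen3",  # the Qwen3 model tag pulled in Ollama
    provider=OpenAIProvider(base_url=OLLAMA_BASE_URL, api_key="ollama"),
)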
Project structure:
src/
│
├── agents/                                   # AI agents that handle specific tasks in the pipeline
│   ├── __init__.py
│   ├── _agent_registry.py                    # Registry for managing agent instances
│   ├── _base.py                              # Base LLM model shared by all agents
│   ├── doc_parser_agent.py                   # Agent for document parsing
│   ├── dynamic_relation_extractor_agent.py   # Agent for dynamic relation extraction
│   ├── section_distiller_agent.py            # Agent for distilling document sections
│   └── single_distiller_agent.py             # Single-agent distillation system
│
├── config/                                   # Ollama configuration settings
│   └── __init__.py
│
├── models/                                   # Data models, enums, and schema definitions
│   ├── __init__.py
│   ├── enum.py                               # Custom enums used across the project
│   ├── schema.py                             # Pydantic or custom schema definitions
│   └── state.py                              # State management models
│
├── nodes/                                    # Modular processing nodes used by the agents or pipeline
│   ├── __init__.py
│   ├── doc_parser.py                         # Node logic for document parsing
│   ├── section_distiller.py                  # Node logic for section distillation
│   └── single_distiller.py                   # Node logic for single-agent distillation
│
├── __init__.py
├── _utils.py                                 # Utility functions
└── distill.py                                # Main script / entry point for the distillation pipeline

One-shot approach

First, I will try to handle the problem with a simple one-shot solution. As mentioned earlier, agent frameworks can produce structured output, which makes them well suited to this task. Our main job is to design an output structure aligned with the ontology schema.
In models/schema.py, I define the models of the schema:
from dataclasses import dataclass
from typing import List, Literal

from pydantic import BaseModel, Field

class Mention(BaseModel):
    type: Literal["LOCATION", "TIME", "PERSON", "EVENT", "ORGANIZATION"]
    string: str = Field(..., description="The exact string, as it appears in the source text, of the named entity (e.g., a location, time, person, or event).")

class Relation(BaseModel):
    head: str = Field(..., description="The head entity (mention).")
    tail: str = Field(..., description="The tail entity (mention).")
    relation_description: str = Field(..., description="A brief description of the relation between the head and tail entities.")

class Section(BaseModel):
    title: str = Field(..., description="The section title.")
    summary: str = Field(..., description="A brief summary (150-300 words) highlighting the key points covered in this section.")
    mentions: List[Mention] = Field(..., description="The complete list of entities found in this section.")
    relations: List[Relation] = Field(..., description="The complete list of relations between the mentioned entities in this section.")

@dataclass
class DistilledUnit:
    title: str = Field(..., description="The unit title.")
    summary: str = Field(..., description="A concise summary of the unit content, between 150 and 300 words.")
    sections: List[Section] = Field(..., description="The list of sections contained in the unit.")
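For intuition, here is a hand-written example of what the one-shot agent is expected to produce for the sample unit above. This is an illustrative sketch, not actual model output, and the summaries are elided:

DistilledUnit(
    title="Political, Economic, and Cultural Situation under the Nguyễn Dynasty (First Half of the 19th Century)",
    summary="...",
    sections=[
        Section(
            title="Government Organization and Foreign Policy",
            summary="...",
            mentions=[
                Mention(type="PERSON", string="Nguyễn Ánh"),
                Mention(type="TIME", string="1802"),
                Mention(type="LOCATION", string="Phú Xuân"),
            ],
            relations=[
                Relation(
                    head="Nguyễn Ánh",
                    tail="1802",
                    relation_description="Nguyễn Ánh proclaimed himself emperor in 1802.",
                ),
            ],
        ),
    ],
)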
Note:

• We should write a description for every attribute, because these descriptions are fed to the LLMs and make the instructions clearer.
• This one-shot schema also contains many deeply nested objects, which makes it harder for LLMs to generate output correctly.

Next, in agents/single_distiller_agent.py:
from pydantic_ai import Agent

from src.models import AgentName, DistilledUnit
from src.prompts import SINGLE_DISTILLER_AGENT
from ._base import ollama_model

agent = Agent(
    name=AgentName.single_distiller_agent.value,
    model=ollama_model,
    system_prompt=SINGLE_DISTILLER_AGENT,
    result_type=DistilledUnit,
    retries=3,  # retry up to three times if the output cannot be parsed as DistilledUnit
)

Run the agent:
result = await agent.run(raw_document)
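A quick way to inspect the run result (in recent PydanticAI versions the parsed object is result.output; older versions exposed it as result.data):

unit: DistilledUnit = result.output  # the validated structured output
print(unit.title)
for section in unit.sections:
    print(section.title, len(section.mentions), len(section.relations))
print(result.usage())  # request/response token counts for this call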
Why I did not choose this approach:

• One-shot extraction tends to miss many entity mentions and their corresponding relations. Because all information is extracted in a single pass, subtle or secondary entities can be overlooked, especially in complex or information-dense documents.
• More importantly for my case, the head and tail fields of each Relation object must exactly match strings found in the Mention list. This strict requirement is critical because, during the knowledge-graph construction phase, I need to map each relation back to its corresponding mentions to accurately identify the entities involved. Many existing approaches extract relations first and then derive the mentions from the heads and tails. However, relying on relation extraction alone can miss many entity mentions that appear in the text. This is especially problematic in domains such as history or medicine, where comprehensive mention extraction can significantly improve downstream tasks such as information retrieval.

Multi-agent approach

Given the limitations of the one-shot approach, this article proposes a multi-stage pipeline in which each stage is handled by a dedicated agent. In this design, each agent is responsible for one specific subtask of the pipeline (e.g., mention extraction, section distillation, relation extraction), which makes the whole process more modular, controllable, and interpretable.
The system consists of the following three core agents:
• Doc Parser Agent: receives the raw document, corrects spelling errors, and splits it into semantic chunks (sections).
• Section Distiller Agent: processes each section and extracts structured information such as a summary and entity mentions.
• Relation Extractor Agent: given the text and the extracted mentions, identifies relations between the mentions. It performs dynamic relation extraction to ensure that relations are established strictly between the provided mentions.

Utility functions

# _utils.py
import anyio
from typing import Awaitable, Callable, Sequence, TypeVar

from pydantic_ai.usage import Usage
from src.models import MyUsage

T = TypeVar("T")

async def task_group_gather(tasks: Sequence[Callable[[], Awaitable[T]]]):
    """Run tasks asynchronously."""
    results: list[T] = [None] * len(tasks)
    print('len result', len(results))

    async def _run_task(tsk: Callable[[], Awaitable[T]], index: int):
        """Helper that runs a task and stores its result at the correct index."""
        results[index] = await tsk()

    async with anyio.create_task_group() as tg:
        for i, task in enumerate(tasks):
            tg.start_soon(_run_task, task, i)
    return results

def update_usage(current_usage: MyUsage, new_usage: Usage):
    current_usage.requests += new_usage.requests
    current_usage.request_tokens += new_usage.request_tokens
    current_usage.response_tokens += new_usage.response_tokens
    current_usage.total_tokens += new_usage.total_tokens
    return current_usage

Step 1: Define the data models

# models/enum.py
from enum import Enum

# An enum to manage the agents, so we don't have to hard-code their names
class AgentName(Enum):
    single_distiller_agent = "single_distiller_agent"
    doc_parser_agent = "doc_parser_agent"
    section_distiller_agent = "section_distiller_agent"

# models/schema.py
# The final target data object is `DistilledUnit`
from dataclasses import dataclass
from typing import Any, List, Literal, Optional

from pydantic import BaseModel, Field, create_model

class Mention(BaseModel):
    type: Literal["LOCATION", "TIME", "PERSON", "EVENT", "ORGANIZATION"]
    string: str = Field(..., description="The exact string, as it appears in the source text, of the named entity (e.g., a location, time, person, or event).")

class Relation(BaseModel):
    head: str = Field(..., description="The head entity (mention).")
    tail: str = Field(..., description="The tail entity (mention).")
    relation_description: str = Field(..., description="A brief description of the relation between the head and tail entities.")

class Section(BaseModel):
    title: str = Field(..., description="The section title.")
    summary: str = Field(..., description="A brief summary (150-300 words) highlighting the key points covered in this section.")
    mentions: List[Mention] = Field(..., description="The complete list of entities found in this section.")
    relations: Optional[List[Any]] = None

class SectionContent(BaseModel):
    title: str = Field(..., description="The section title.")
    content: str = Field(..., description="The section content.")

class Unit(BaseModel):
    title: str = Field(..., description="The unit title.")
    summary: str = Field(..., description="A concise summary of the unit content, between 150 and 300 words.")
    sections: List[SectionContent] = Field(..., description="The ordered list of section contents contained in the unit.")

@dataclass
class DistilledUnit:
    title: str = Field(..., description="The unit title.")
    summary: str = Field(..., description="A concise summary of the unit content, between 150 and 300 words.")
    sections: List[Section] = Field(..., description="The list of sections contained in the unit.")

# Used in a later step: builds a dynamic data model in which head and tail
# must take actual values from the mention list, ensuring the agent generates
# consistent mention strings.
def build_dynamic_relation_model(mention_strings: List[str]) -> BaseModel:
    mention_literals = Literal[tuple(mention_strings)]
    DynamicRelation = create_model(
        "DynamicRelation",
        head=(mention_literals, Field(..., description="The mentioned entity (head).")),
        tail=(mention_literals, Field(..., description="The mentioned entity (tail).")),
        relation_description=(str, Field(..., description="A brief description of the relation between the head and tail entities.")),
    )
    return DynamicRelation

# models/state.py
from dataclasses import dataclass

@dataclass
class MyUsage:
    requests: int | None = 0
    request_tokens: int | None = 0
    response_tokens: int | None = 0
    total_tokens: int | None = 0

# When I first used PydanticAI, I also wondered when to use Dependencies vs. State.
# My rule of thumb: for data that must be saved/shared across agents, use State,
# because you can still access the State values after the run finishes.
# For things like connections/databases, use Dependencies.
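Before moving on to the agents, it is worth seeing what build_dynamic_relation_model buys us. In this small check (the mention strings are taken from the sample unit purely for illustration), any head or tail outside the mention list fails Pydantic validation, which is exactly what pushes the agent's retries toward consistent mention strings:

from pydantic import ValidationError

RelModel = build_dynamic_relation_model(["Gia Long", "Phú Xuân"])

# Valid: both endpoints come from the mention list
rel = RelModel(
    head="Gia Long",
    tail="Phú Xuân",
    relation_description="Gia Long established the capital in Phú Xuân.",
)

# Invalid: "Minh Mạng" is not in the mention list, so validation fails
try:
    RelModel(head="Minh Mạng", tail="Phú Xuân", relation_description="...")
except ValidationError as exc:
    print(exc.error_count(), "validation error(s)")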
Step 2: Define the agents

To manage all the agents, I created an AgentRegistry following a singleton factory pattern:

# agents/_agent_registry.py
from typing import Callable

from pydantic_ai import Agent
from src.models import AgentName

class AgentRegistry:
    _agents: dict[AgentName, Agent] = {}

    @classmethod
    def register(cls, agent_name: AgentName):
        """A decorator that registers agents automatically."""
        def decorator(func: Callable) -> Callable:
            cls._agents[agent_name] = func()
            print("Registered agent", agent_name)
            return func
        return decorator

    @classmethod
    def get(cls, agent_name: AgentName) -> Agent:
        return cls._agents[agent_name]

# agents/doc_parser_agent.py
from pydantic_ai import Agent

from src.models import AgentName, Unit
from src.prompts import DOC_PARSER_PROMPT
from ._agent_registry import AgentRegistry
from ._base import ollama_model

@AgentRegistry.register(AgentName.doc_parser_agent)
def create_doc_parser_agent() -> Agent:
    agent = Agent(
        name=AgentName.doc_parser_agent.value,
        model=ollama_model,
        system_prompt=DOC_PARSER_PROMPT,
        result_type=Unit,
        retries=3,
    )
    return agent

# agents/section_distiller_agent.py
from pydantic_ai import Agent

from src.models import AgentName, Section
from src.prompts import SECTION_DISTILLER_AGENT
from ._agent_registry import AgentRegistry
from ._base import ollama_model

@AgentRegistry.register(AgentName.section_distiller_agent)
def create_section_distiller_agent() -> Agent:
    agent = Agent(
        name=AgentName.section_distiller_agent.value,
        model=ollama_model,
        system_prompt=SECTION_DISTILLER_AGENT,
        result_type=Section,
        retries=3,
    )
    return agent

# agents/dynamic_relation_extractor_agent.py
from typing import List

from pydantic import BaseModel
from pydantic_ai import Agent

from src.prompts import DYNAMIC_RELATION_EXTRACTOR_PROMPT
from ._base import ollama_model

def create_dynamic_relation_extractor_agent(
    relation_model: BaseModel,
    mention_strings: List[str],
) -> Agent:
    relation_extractor_agent = Agent(
        name="relation_extractor_agent",
        model=ollama_model,
        retries=5,
        output_type=List[relation_model],
        system_prompt=DYNAMIC_RELATION_EXTRACTOR_PROMPT.format(mentions=mention_strings),
    )
    return relation_extractor_agent
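One subtlety of this registry pattern: the @AgentRegistry.register decorators only run when their modules are imported, so AgentRegistry.get would raise a KeyError if the agent modules were never loaded. The original does not show agents/__init__.py; a plausible version (the exact import list is my assumption) would be:

# agents/__init__.py -- assumed, not shown in the original
from ._agent_registry import AgentRegistry
# Importing these modules triggers their @AgentRegistry.register decorators
from . import doc_parser_agent, section_distiller_agent
from .dynamic_relation_extractor_agent import create_dynamic_relation_extractor_agent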
Step 3: Create the nodes and the workflow in a Graph

Doc Parser Node:

# nodes/doc_parser.py
from dataclasses import dataclass
from typing import Any

from pydantic_graph import BaseNode, End, GraphRunContext

from src.agents import AgentRegistry
from src.models import AgentName, DistilledUnit, MyUsage, Unit
from src.nodes.section_distiller import SectionDistillerNode
from src._utils import task_group_gather, update_usage

@dataclass
class DocParserNode(BaseNode[MyUsage, None, Any]):  # BaseNode[StateT, DepsT, NodeRunEndT]
    raw_document: str

    async def run(
        self,
        ctx: GraphRunContext[MyUsage, None],
    ) -> End:
        result = await AgentRegistry.get(
            AgentName.doc_parser_agent
        ).run(self.raw_document, model_settings={"temperature": 0})

        # Update the total usage after each agent call
        usage = result.usage()
        ctx.state = update_usage(ctx.state, usage)

        unit: Unit = result.output
        sections = unit.sections

        # Process the sections concurrently
        results = await task_group_gather(
            [
                lambda section=section: SectionDistillerNode(section=section).run(ctx)
                for section in sections
            ]
        )

        # Final result
        distilled_unit = DistilledUnit(
            title=unit.title,
            summary=unit.summary,
            sections=[item.data for item in results],
        )
        return End(data=distilled_unit)
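Note the lambda section=section in the list comprehension above: binding section as a default argument freezes its value per task. Without it, Python's late binding would make every task distill the last section. A quick standalone illustration:

# Late binding: every closure sees the final value of i
funcs = [lambda: i for i in range(3)]
print([f() for f in funcs])  # [2, 2, 2]

# Default argument: the current value of i is captured at definition time
funcs = [lambda i=i: i for i in range(3)]
print([f() for f in funcs])  # [0, 1, 2]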
Section Distiller Node:

# nodes/section_distiller.py
from dataclasses import dataclass
from typing import Any

from pydantic_graph import BaseNode, End, GraphRunContext

from src.agents import AgentRegistry, create_dynamic_relation_extractor_agent
from src.models import (AgentName, MyUsage, Section, SectionContent,
                        build_dynamic_relation_model)
from src._utils import update_usage

@dataclass
class SectionDistillerNode(BaseNode[MyUsage, None, Any]):
    section: SectionContent

    async def run(self, ctx: GraphRunContext[MyUsage, None]) -> End:
        # Extract the information: summary, mentions, title
        distilled_result = await AgentRegistry.get(
            AgentName.section_distiller_agent
        ).run(
            user_prompt=f"section_title: {self.section.title}\nsection_content: {self.section.content}",
            model_settings={"temperature": 0},
        )
        usage = distilled_result.usage()
        ctx.state = update_usage(ctx.state, usage)

        distilled_section: Section = distilled_result.output
        mentions = distilled_section.mentions
        mention_strings = [item.string for item in mentions]

        # Call the relation extractor agent to extract relations.
        # Build the relation data model
        relation_model = build_dynamic_relation_model(mention_strings=mention_strings)
        # Build the relation extractor agent
        relation_extractor_agent = create_dynamic_relation_extractor_agent(
            relation_model=relation_model,
            mention_strings=mention_strings,
        )
        relation_result = await relation_extractor_agent.run(
            user_prompt=f"section_title: {self.section.title}\nsection_content: {self.section.content}",
            model_settings={"temperature": 0},
        )
        usage = relation_result.usage()
        ctx.state = update_usage(ctx.state, usage)

        distilled_section.relations = relation_result.output
        return End(distilled_section)

With both nodes in place, we can create the Graph object and run the workflow.
import asyncio

from pydantic_graph import Graph

from src.models import MyUsage
from src.nodes import DocParserNode, SectionDistillerNode

graph = Graph(nodes=[DocParserNode, SectionDistillerNode])
state = MyUsage()

async def main():
    # `sample` holds the raw OCR'd text of one unit
    result = await graph.run(DocParserNode(sample), state=state)
    print(result)
    print("=========")
    print(result.state)

asyncio.run(main())
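As a side note, pydantic_graph can also render the workflow for documentation. If your installed version provides it (an assumption worth checking against the docs), something like the following prints a Mermaid diagram of the two nodes:

# Hypothetical, depending on your pydantic_graph version
print(graph.mermaid_code(start_node=DocParserNode))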
Experiments

To evaluate and compare the performance of different frameworks or approaches, we need annotated test cases. For information-extraction tasks like this one, recall, the fraction of annotated mentions and relations that the system actually extracts, is an effective measure of how completely information is captured. In this article, however, I will only analyze the outputs of the two approaches above on a single example.

• One-shot approach: this method uses and generates fewer tokens, so it runs faster. However, it detected only a limited number of mentions and relations per section, typically between 5 and 7. Interestingly, when the model failed to produce the predefined structured output, it often hallucinated, generating content unrelated to the input.

state=MyUsage(requests=1, request_tokens=2803, response_tokens=3276, total_tokens=6079)

• Multi-stage / multi-agent approach: this method consumes more tokens and takes longer to run (11 requests instead of 1, and roughly 7x the total tokens), but it produces more detailed, higher-quality output. The number of mentions and relations detected per section is significantly higher, typically between 13 and 17 on my test sample.

state=MyUsage(requests=11, request_tokens=17836, response_tokens=24076, total_tokens=41912)

Conclusion

In this article, I showed how to simplify the document-to-graph construction pipeline by replacing heavyweight LLM prompt engineering with a modular, agent-based design. By using agents that produce structured output, we made the parsing and processing of hierarchical documents more reliable and easier to maintain. This shift lays the groundwork for building cleaner, more scalable document-centric knowledge graphs.
That wraps up this walkthrough of the improved document knowledge-graph construction pipeline. Thanks for reading!