今天分享一篇PaperAgent-RAG专栏技术交流群小伙伴(@知乎 奔跑的日月)关于微软最新开源的GraphRAG框架源码解读文章(已授权转载)。
https://zhuanlan.zhihu.com/p/707759736
该数据集的主题是什么这种high level的总结性问题, 作者认为, 这种应用场景本质上一种聚焦于查询的总结性(QueryFocused Summarization, QFS)任务, 单纯只做数据检索是无法解决的. 相应的, 其解决思路也在论文中清楚地描述出来了:ingFang SC", "Microsoft YaHei", "Source Han Sans SC", "Noto Sans CJK SC", "WenQuanYi Micro Hei", sans-serif;font-size: medium;letter-spacing: normal;text-align: start;text-wrap: wrap;background-color: rgb(255, 255, 255);">In contrast with related work that exploits the structured retrieval and traversal affordances of graph indexes (subsection 4.2), we focus on a previously unexplored quality of graphs in this context: their inherent modularity (Newman, 2006) and the ability of community detection algorithms to partition graphs into modular communities of closely-related nodes (e.g., Louvain, Blondel et al., 2008; Leiden, Traag et al., 2019). LLM-generated summaries of these community descriptions provide complete coverage of the underlying graph index and the input documents it represents. Query-focused summarization of an entire corpus is then made possible using a map-reduce approach: first using each community summary to answer the query independently and in parallel, then summarizing all relevant partial answers into a final global answer.ingFang SC", "Microsoft YaHei", "Source Han Sans SC", "Noto Sans CJK SC", "WenQuanYi Micro Hei", sans-serif;font-size: medium;letter-spacing: normal;text-align: start;text-wrap: wrap;background-color: rgb(255, 255, 255);">利用社区检测算法(如Leiden算法)将整个知识图谱划分模块化的社区(包含相关性较高的节点), 然后大模型自下而上对社区进行摘要, 最终再采取map-reduce方式实现QFS: 每个社区先并行执行Query, 最终汇总成全局性的完整答案.ingFang SC", "Microsoft YaHei", "Source Han Sans SC", "Noto Sans CJK SC", "WenQuanYi Micro Hei", sans-serif;font-optical-sizing: inherit;font-kerning: inherit;font-feature-settings: inherit;font-variation-settings: inherit;margin-top: calc(1.90909em);margin-bottom: calc(1.27273em);clear: left;color: rgb(25, 27, 31);letter-spacing: normal;text-align: start;text-wrap: wrap;background-color: rgb(255, 255, 255);">实现方式是什么(How)?论文中给出了解决问题的基本思路, 与其他RAG系统类似, GraphRAG整个Pipeline也可划分为索引(Indexing)与查询(Query)两个阶段。索引过程利用LLM提取出节点(如实体)、边(如关系)和协变量(如 claim),然后利用社区检测技术对整个知识图谱进行划分,再利用LLM进一步总结。最终针对特定的查询,可以汇总所有与之相关的社区摘要生成一个全局性的答案.
官方文档说实话写得已经很清楚了, 不过想要理解一些实现上的细节, 还得深入到源码当中. 接下来, 一块看下代码的具体实现. 项目源码结构树如下:
.├──cache├──config├──emit├──graph│├──embedding│├──extractors││├──claims││├──community_reports││├──graph││└──summarize│├──utils│└──visualization├──input├──llm├──progress├──reporting├──storage├──text_splitting├──utils├──verbs│├──covariates││└──extract_covariates││└──strategies││└──graph_intelligence│├──entities││├──extraction│││└──strategies│││└──graph_intelligence││└──summarize││└──strategies││└──graph_intelligence│├──graph││├──clustering│││└──strategies││├──embed│││└──strategies││├──layout│││└──methods││├──merge││└──report││└──strategies││└──graph_intelligence│├──overrides│└──text│├──chunk││└──strategies│├──embed││└──strategies│├──replace│└──translate│└──strategies└──workflows└──v1
这个文件中的很多文档都值得仔细研究, 后续将结合代码详细说明.
Workflows
此外, console 中会打印很多运行日志, 其中比较重要的一条就是完整的workflows, 会涉及到完整pipeline的编排:
⠹GraphRAGIndexer├──LoadingInput(InputFileType.text)-1filesloaded(0filtered)━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━100%0:00:000:00:00├──create_base_text_units├──create_base_extracted_entities├──create_summarized_entities├──create_base_entity_graph├──create_final_entities├──create_final_nodes├──create_final_communities├──join_text_units_to_entity_ids├──create_final_relationships├──join_text_units_to_relationship_ids├──create_final_community_reports├──create_final_text_units├──create_base_documents└──create_final_documentsAllworkflowscompletedsuccessfully.
索引阶段整体看下来应该算是整个项目的核心, 整体流程还是比较复杂的. 执行indexing的语句如下:
python -m graphrag.index --init --root ./ragtest
python -m graphrag.index --root ./ragtest简单跟一下, 发现实际调用的是graphrag/index/__main__.py文件中的主函数, 使用 argparse 解析输入参数, 实际调用的是graphrag/index/cli.py中的index_cli函数.
继续解读源码前, 先简单看下相关函数的调用链路, 如上图所示, 其中灰色标记的函数是我们需要重点关注的.
cli.py::index_cli()函数首先根据用户输入参数, 如--init, 确定是否需要初始化当前文件夹, 这部分具体由cli.py::index_cli()执行, 其具体实现逻辑比较简单, 检查目录下的一些配置、prompt、.env等文件是否存在, 没有则创建, 具体文件内容, 就是上一节目录中展示的 settings.yaml, prompts等;
真正执行 indexing 操作时, 实际上会执行一个内部函数cli.py::index_cli()._run_workflow_async(), 主要会涉及到cli.py::_create_default_config()与run.py::run_pipeline_with_config()两个函数.
限于篇幅, 我们在此只讨论默认配置运行流程, 大致梳理清楚相关逻辑后, 可自行修改相关配置.
默认配置生成逻辑:cli.py::_create_default_config()首先检查根目录以及配置文件 settings.yaml, 然后执行cli.py::_read_config_parameters()读取系统配置, 如 llm, chunks, cache, storage等, 接下来的操作比较关键, 后续会根据当前参数创建一个pipeline的配置, 具体代码位于create_pipeline_config.py::create_pipeline_config(), 这块绕了蛮久, 可以说是整个项目中逻辑比较复杂的模块了.
如果深入create_pipeline_config.py::create_pipeline_config()的代码, 可以发现其核心逻辑如下
result = PipelineConfig(
root_dir=settings.root_dir,
input=_get_pipeline_input_config(settings),
reporting=_get_reporting_config(settings),
storage=_get_storage_config(settings),
cache=_get_cache_config(settings),
workflows=[
*_document_workflows(settings, embedded_fields),
*_text_unit_workflows(settings, covariates_enabled, embedded_fields),
*_graph_workflows(settings, embedded_fields),
*_community_workflows(settings, covariates_enabled, embedded_fields),
*(_covariate_workflows(settings) if covariates_enabled else []),
],
)这段代码的基本逻辑就是根据不同的功能生成完整的workflow序列, 此处需要注意的是这里并不考虑workflow之间的依赖关系, 单纯基于workflows/v1目录下的各workflow的模板生成一系列的workflow.
Pipeline执行逻辑:run.py::run_pipeline_with_config()首先根据load_pipeline_config.py::load_pipeline_config()加载现有pipeline的配置(由上一步中cli.py::_create_default_config()生成), 然后创建目标文件中的其他子目录, 如cache, storage, input, output等(详见上一节目录结构树), 然后再利用run.py::run_pipeline()依次执行每一个工作流, 并返回相应结果.
这里有必要再单独说明一下run.py::run_pipeline(), 该函数用于真正的执行所有pipeline, 其核心逻辑包含以下两部分:
加载workflows:workflows/load.py::load_workflows(), 除了常规工作流的创建外, 还会涉及到拓扑排序问题.
workflows/load.py::create_workflow(): 利用已有模板, 创建不同的工作流
graphlib::topological_sort(): 根据workflow之间的依赖关系, 计算DAG的拓扑排序
进一步执行inject_workflow_data_dependencies(),write_workflow_stats(),emit_workflow_output等操作, 分别用于依赖注入, 数据写入以及保存, 真正的workflow.run操作会在write_workflow_stats()之前执行, 此处的核心逻辑可参考以下代码
await dump_stats()
for workflow_to_run in workflows_to_run:
# Try to flush out any intermediate dataframes
gc.collect()
workflow = workflow_to_run.workflow
workflow_name: str = workflow.name
last_workflow = workflow_name
log.info("Running workflow: %s...", workflow_name)
if is_resume_run and await storage.has(
f"{workflow_to_run.workflow.name}.parquet"
):
log.info("Skipping %s because it already exists", workflow_name)
continue
stats.workflows[workflow_name] = {"overall": 0.0}
await inject_workflow_data_dependencies(workflow)
workflow_start_time = time.time()
result = await workflow.run(context, callbacks)
await write_workflow_stats(workflow, result, workflow_start_time)
# Save the output from the workflow
output = await emit_workflow_output(workflow)
yield PipelineRunResult(workflow_name, output, None)
output = None
workflow.dispose()
workflow = None
stats.total_runtime = time.time() - start_time
await dump_stats()根据以上信息, 我们可以大致梳理出索引环节的完整工作流
初始化: 生成必要的配置文件, 缓存, input/output目录等
索引: 根据配置文件, 利用workflow模板创建一系列的pipeline, 并依据依赖关系, 调整实际执行顺序, 再依次执行
截至目前, 我们实际上还没有真正分析index阶段的业务逻辑, 只是搞清楚了GraphRAG内置的这套pipeline编排系统该如何工作. 这里以index/workflows/v1/create_final_entities.py为例, 一起看下具体的一个workflow是如何运行的.
DataShaper: 讨论workflow之前, 先简单了解下项目使用的另一个框架:datashaper是微软开源的一款用于执行工作流处理的库, 还内置了很多组件, 内置了很多组件(专业名词叫做`Verb`). 通俗来讲, datashaper就像一条流水线, 每一步定义了作用于input数据的一种操作, 类似pytorch图像变换中clip, rotate, scale等操作, 如果还是不能理解, 建议直接跑一下官方文档中的demo程序examples/single_verb, 应该就大致清楚怎么回事了. 从功能上来讲, 个人感觉有点像Prefect.
create_final_entities.py, 翻阅源码可以发现, 该workflow会依赖于workflow:create_base_extracted_entities, 同时定义了cluster_graph,embed_graph等操作, 其中cluster_graph采用了leiden策略, 具体代码位于index/verbs/graph/clustering/cluster_graph.py,from datashaper import TableContainer, VerbCallbacks, VerbInput, progress_iterable, verb
@verb(name="cluster_graph")
def cluster_graph(
input: VerbInput,
callbacks: VerbCallbacks,
strategy: dict[str, Any],
column: str,
to: str,
level_to: str | None = None,
**_kwargs,
) -> TableContainer:可以看出, 实际上就是加了一个verb装饰器而已, 进一步跟进 strategy 的实现可以发现, 这里的leiden算法实际上也是源自另一个图算法库graspologic-org/graspologic: Python package for graph statistics (github.com)
Pipeline: 搞清楚了workflow的执行逻辑, 再根据上节最后提到的编排日志, 或者artifacts/stats.json文件, 就可以整理出完整工作流了. 官方文档也放出了非常详细的说明, 参见Indexing Dataflow (microsoft.github.io)不过我这边依据源码提取出来的pipeline好像还是有些差异, 有做过相关工作的可以留言探讨下.
查询阶段的pipeline相对而言要简单些, 执行query的语句如下:
# Global search
python -m graphrag.query \
--root ./ragtest \
--method global \
"What are the top themes in this story?"
# Local search
python -m graphrag.query \
--root ./ragtest \
--method local \
"Who is Scrooge, and what are his main relationships?"这里需要注意的是有Global/Local search两种模式. 还是先生成函数调用关系图, 对整体结构能有个大致了解.
graphrag/query/__main__.py中的主函数会依据参数不同, 分别路由至cli::run_local_search()以及cli::run_global_search()
cli::run_global_search()主要调用了factories.py::get_global_search_engine()函数, 返回一个详细定义的GlobalSearch类, 进一步跟进去, 发现该类跟LocalSearch类相似, 都是基于工厂模式创建, 其核心方法为structured_search/global_search/search.py::GlobalSearch.asearch(), 具体使用了map-reduce方法, 首先使用大模型并行地为每个社区的summary生成答案, 然后再汇总所有答案生成最终结果, 此处建议参考map/reduce的相关prompts, 作者给出了非常详细的说明.
也正是因为这种map-reduce的机制, 导致global search对token的消耗量极大.
与全局搜索类似,cli::run_local_search()函数主要也是调用了factories.py::get_local_search_engine(), 返回一个LocalSearch类, 这里的asearch()相对比较简单, 会直接根据上下文给出回复, 这种模式更接近于常规的RAG语义检索策略, 所消耗的Token也比较少.
与全局搜索不同的地方在于, Local 模式综合了nodes, community_reports, text_units, relationships, entities, covariates等多种源数据, 这一点在官方文档中也给出了非常详细的说明, 不再赘述.
GraphRAG最核心的卖点就在于一定程度上解决了聚焦于查询的总结性(QueryFocused Summarization, QFS)任务, 这一点就个人了解到的, 应该还是首创, 在此之前, 思路上最接近的应该就是 [parthsarthi03/raptor: The official implementation of RAPTOR: Recursive Abstractive Processing for Tree-Organized Retrieval (http://github.com)](https://github.com/parthsarthi03/raptor), 不过后者并非针对知识图谱. QFS与Multi-Hop Q&A应该是现阶段RAG系统都暂时无法解决的难点, 但是对于很多场景, 尤其是数据分析领域, 却有着广泛的应用. 虽然当下GraphRAG的使用成本还很高, 不过至少提供了一种可能性.
我个人觉得目前的GraphRAG也仍旧还有很多值得改进的地方, 比如搞了很多让人云里雾里的名词, 诸如 Emit, Claim, Verbs, Reporting 之类, 同时夹带私货, 用了一些微软自家的相对比较小众的库, 这也进一步加大了理解上的难度, 此外, 模块化方面应该也有待加强, 尤其是OpenAI那块, 耦合严重.
综合来看, 微软本次放出的GraphRAG框架确实有不少干货, 值得花些时间去仔细研读和思考.
Welcome to GraphRAG (microsoft.github.io)
GraphRAG: LLM-Derived Knowledge Graphs for RAG (youtube.com)
GraphRAG is now on GitHub | Hacker News (ycombinator.com)
GraphRAG & Ollama - intelligent Search of local data : r/LocalLLaMA (reddit.com)
GraphRAG on narrative private data : r/LocalLLaMA (reddit.com)
| 欢迎光临 链载Ai (https://www.lianzai.com/) | Powered by Discuz! X3.5 |