链载Ai

标题: LightRAG实现原理分析-文档的解析和插入 [打印本页]

作者: 链载Ai    时间: 前天 12:05
标题: LightRAG实现原理分析-文档的解析和插入

概述

本文介绍LightRAG文档插入的实现逻辑。通过本文的分析,可以对文档插入的实现过程有一个比较清晰的理解。可以根据本文的分析,进一步分析文档插入的细节。


文档查询的基本使用

下面的代码是使用LightRAG的最简单的流程。打开一个文本文件,插入到LightRAG中,然后对文档进行对话。

WORKING_DIR="./dickens"

ifnotos.path.exists(WORKING_DIR):
os.mkdir(WORKING_DIR)
# 构建LightRAG对象
rag=LightRAG(
working_dir=WORKING_DIR,
llm_model_func=gpt_4o_mini_complete, # Use gpt_4o_mini_complete LLM model
# llm_model_func=gpt_4o_complete # Optionally, use a stronger model
)
# 打开文档,并插入到LightRAG中
withopen("./dickens/book.txt","r",encoding="utf-8")asf:
rag.insert(f.read())

# 执行原始的本地查询
print(
rag.query("What are the top themes in this story?",param=QueryParam(mode="naive"))
)


文档插入过程实现分析

文档插入的实现函数如下:

classLightRAG:
definsert(self,string_or_strings):
loop=always_get_an_event_loop()
returnloop.run_until_complete(self.ainsert(string_or_strings))


ainsert()函数实现分析

该函数总体实现逻辑:

(1)为每个文档生成唯一ID并创建文档字典,移除首尾空白

(2)检查哪些文档是最新的,只保留最新的文档,若没有最新的文档,则告警并返回

(3)对文件内容进行分块,并检查哪些分块是最新的,若没有最新的分块,则直接返回;

(4)将分块保存到分块存储字典中,并将分块向量化后保存到向量数据库中

(5)在所有分块中提取实体和关系,若没有提取到最新的实体和关系,则直接返回。

(6)将实体和实体的关系保存到图数据库中。


实现代码如下

asyncdefainsert(self,string_or_strings):
# 定义异步插入方法,接收字符串或字符串列表参数

update_storage=False
# 初始化存储更新标志为False

try:
ifisinstance(string_or_strings,str):
string_or_strings=[string_or_strings]
# 如果输入是单个字符串,转换为列表形式

new_docs={
compute_mdhash_id(c.strip(),prefix="doc-"): {"content":c.strip()}
forcinstring_or_strings
}
# 为每个文档生成唯一ID并创建文档字典,移除首尾空白

_add_doc_keys=awaitself.full_docs.filter_keys(list(new_docs.keys()))
# 检查哪些文档ID是新的(未存储过的)

new_docs={k:vfork,vinnew_docs.items()ifkin_add_doc_keys}
# 只保留新文档

ifnotlen(new_docs):
logger.warning("All docs are already in the storage")
return
# 如果没有新文档,记录警告并返回

update_storage=True
logger.info(f"[New Docs] inserting{len(new_docs)}docs")
# 设置更新标志,记录新文档数量

inserting_chunks={}
# 初始化分块字典

fordoc_key,docintqdm_async(new_docs.items(),desc="Chunking documents",unit="doc"):
# 遍历每个文档,显示进度条

chunks={
compute_mdhash_id(dp["content"],prefix="chunk-"): {
**dp,
"full_doc_id":doc_key,
}
fordpinchunking_by_token_size(
doc["content"],
overlap_token_size=self.chunk_overlap_token_size,
max_token_size=self.chunk_token_size,
tiktoken_model=self.tiktoken_model_name,
)
}
# 对文档内容进行分块,为每个分块生成ID,并关联原始文档ID

inserting_chunks.update(chunks)
# 将当前文档的分块添加到总分块字典中

_add_chunk_keys=awaitself.text_chunks.filter_keys(list(inserting_chunks.keys()))
# 检查哪些分块ID是新的

inserting_chunks={k:vfork,vininserting_chunks.items()ifkin_add_chunk_keys}
# 只保留新分块

ifnotlen(inserting_chunks):
logger.warning("All chunks are already in the storage")
return
# 如果没有新分块,记录警告并返回

logger.info(f"[New Chunks] inserting{len(inserting_chunks)}chunks")
# 记录新分块数量

awaitself.chunks_vdb.upsert(inserting_chunks)
# 将分块存入向量数据库

logger.info("[Entity Extraction]...")
maybe_new_kg=awaitextract_entities(
inserting_chunks,
knowledge_graph_inst=self.chunk_entity_relation_graph,
entity_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
global_config=asdict(self),
)
# 从分块中提取实体和关系

ifmaybe_new_kgisNone:
logger.warning("No new entities and relationships found")
return
# 如果没有找到新实体和关系,记录警告并返回

self.chunk_entity_relation_graph=maybe_new_kg
# 更新知识图谱

awaitself.full_docs.upsert(new_docs)
awaitself.text_chunks.upsert(inserting_chunks)
# 将完整文档和分块存入各自的存储

finally:
ifupdate_storage:
awaitself._insert_done()
# 如果进行了存储更新,执行清理工作


小结

本文分析了LightRAG的文档插入过程的总体实现过程。通过本文的分析可知,LightRAG首先会对文档进行切割并计算嵌入向量,除此之外,还会从文档中提取实体和实体之间的关系。






欢迎光临 链载Ai (https://www.lianzai.com/) Powered by Discuz! X3.5