概述本文介绍LightRAG文档插入的实现逻辑。通过本文的分析,可以对文档插入的实现过程有一个比较清晰的理解。可以根据本文的分析,进一步分析文档插入的细节。
文档查询的基本使用下面的代码是使用LightRAG的最简单的流程。打开一个文本文件,插入到LightRAG中,然后对文档进行对话。 WORKING_DIR="./dickens"
ifnotos.path.exists(WORKING_DIR): os.mkdir(WORKING_DIR) # 构建LightRAG对象 rag=LightRAG( working_dir=WORKING_DIR, llm_model_func=gpt_4o_mini_complete, # Use gpt_4o_mini_complete LLM model # llm_model_func=gpt_4o_complete # Optionally, use a stronger model ) # 打开文档,并插入到LightRAG中 withopen("./dickens/book.txt","r",encoding="utf-8")asf: rag.insert(f.read())
# 执行原始的本地查询 print( rag.query("What are the top themes in this story?",param=QueryParam(mode="naive")) )
文档插入过程实现分析文档插入的实现函数如下: classLightRAG: definsert(self,string_or_strings): loop=always_get_an_event_loop() returnloop.run_until_complete(self.ainsert(string_or_strings))
ainsert()函数实现分析该函数总体实现逻辑: (1)为每个文档生成唯一ID并创建文档字典,移除首尾空白 (2)检查哪些文档是最新的,只保留最新的文档,若没有最新的文档,则告警并返回 (3)对文件内容进行分块,并检查哪些分块是最新的,若没有最新的分块,则直接返回; (4)将分块保存到分块存储字典中,并将分块向量化后保存到向量数据库中 (5)在所有分块中提取实体和关系,若没有提取到最新的实体和关系,则直接返回。 (6)将实体和实体的关系保存到图数据库中。
实现代码如下asyncdefainsert(self,string_or_strings): # 定义异步插入方法,接收字符串或字符串列表参数 update_storage=False # 初始化存储更新标志为False try: ifisinstance(string_or_strings,str): string_or_strings=[string_or_strings] # 如果输入是单个字符串,转换为列表形式
new_docs={ compute_mdhash_id(c.strip(),prefix="doc-"): {"content":c.strip()} forcinstring_or_strings } # 为每个文档生成唯一ID并创建文档字典,移除首尾空白
_add_doc_keys=awaitself.full_docs.filter_keys(list(new_docs.keys())) # 检查哪些文档ID是新的(未存储过的)
new_docs={k:vfork,vinnew_docs.items()ifkin_add_doc_keys} # 只保留新文档
ifnotlen(new_docs): logger.warning("All docs are already in the storage") return # 如果没有新文档,记录警告并返回
update_storage=True logger.info(f"[New Docs] inserting{len(new_docs)}docs") # 设置更新标志,记录新文档数量
inserting_chunks={} # 初始化分块字典
fordoc_key,docintqdm_async(new_docs.items(),desc="Chunking documents",unit="doc"): # 遍历每个文档,显示进度条 chunks={ compute_mdhash_id(dp["content"],prefix="chunk-"): { **dp, "full_doc_id":doc_key, } fordpinchunking_by_token_size( doc["content"], overlap_token_size=self.chunk_overlap_token_size, max_token_size=self.chunk_token_size, tiktoken_model=self.tiktoken_model_name, ) } # 对文档内容进行分块,为每个分块生成ID,并关联原始文档ID
inserting_chunks.update(chunks) # 将当前文档的分块添加到总分块字典中
_add_chunk_keys=awaitself.text_chunks.filter_keys(list(inserting_chunks.keys())) # 检查哪些分块ID是新的
inserting_chunks={k:vfork,vininserting_chunks.items()ifkin_add_chunk_keys} # 只保留新分块
ifnotlen(inserting_chunks): logger.warning("All chunks are already in the storage") return # 如果没有新分块,记录警告并返回
logger.info(f"[New Chunks] inserting{len(inserting_chunks)}chunks") # 记录新分块数量
awaitself.chunks_vdb.upsert(inserting_chunks) # 将分块存入向量数据库
logger.info("[Entity Extraction]...") maybe_new_kg=awaitextract_entities( inserting_chunks, knowledge_graph_inst=self.chunk_entity_relation_graph, entity_vdb=self.entities_vdb, relationships_vdb=self.relationships_vdb, global_config=asdict(self), ) # 从分块中提取实体和关系
ifmaybe_new_kgisNone: logger.warning("No new entities and relationships found") return # 如果没有找到新实体和关系,记录警告并返回
self.chunk_entity_relation_graph=maybe_new_kg # 更新知识图谱
awaitself.full_docs.upsert(new_docs) awaitself.text_chunks.upsert(inserting_chunks) # 将完整文档和分块存入各自的存储
finally: ifupdate_storage: awaitself._insert_done() # 如果进行了存储更新,执行清理工作
小结本文分析了LightRAG的文档插入过程的总体实现过程。通过本文的分析可知,LightRAG首先会对文档进行切割并计算嵌入向量,除此之外,还会从文档中提取实体和实体之间的关系。 |