链载Ai

标题: ragflow架构解析及性能优化方式 [打印本页]

作者: 链载Ai    时间: 前天 21:00
标题: ragflow架构解析及性能优化方式

目前好用的rag系统有非常多,而其中ragflow就是最有代表性的佼佼者,如果把ragflow用到自己的生产系统中,有哪些需要基本了解的点呢,趁着周末打开了很久之前写的一个内容,把我的理解和各位朋友分享一下。由于ragflow比较大,我把之前写的解析数据处理、Retriever和Embedding模块的设计与实现逻辑的这一部分给修改出来。感兴趣的朋友们可以基于此作性能优化或二开。

数据处理模块

模块职能与流程:ragflow 的数据处理模块负责将原始资料(文档、图片、音频等)加载并预处理成适合检索的文本“切片”。其核心流程包括:数据加载(读取文件内容)、解析分块(根据文件类型提取文本和相关信息,按策略切分成多个片段)、向量化嵌入(对每个片段生成语义向量)、以及持久化存储(将片段及向量写入向量索引或数据库)。这一流程主要由DocumentService等服务类配合各类型文件的解析器完成。

关键类与方法:DocumentServiceapi/db/services/document_service.py)是数据处理的核心调度者。它利用解析器工厂根据文档类型选择相应的解析器,对每个上传的文档进行并行处理。DocumentService维护一个解析器映射表(如FACTORY),将文件的parser_id映射到具体解析器模块,如Word/PDF使用通用文本解析,PPT使用presentation解析,图片使用picture解析,音频使用audio解析等。

FACTORY={ParserType.PRESENTATION.value:presentation,ParserType.PICTURE.value:picture,ParserType.AUDIO.value:audio,ParserType.EMAIL.value:email}parser_config={"chunk_token_num":4096,"delimiter":"\n!?;。;!?","layout_recognize":"lainText"}exe=ThreadPoolExecutor(max_workers=12)threads=[]...ford,blobinfiles:kwargs={"callback":dummy,"parser_config":parser_config,"from_page":0,"to_page":100000,"tenant_id":kb.tenant_id,"lang":kb.language}threads.append(exe.submit(FACTORY.get(d["parser_id"],naive).chunk,d["name"],blob,**kwargs))

每种解析器实现一个统一的chunk接口,负责将输入文件解析成若干带有元数据的文本块。例如,PPT 解析器会提取每页幻灯片的文本和缩略图,PDF 解析器则按照页码拆分文本并结合OCR提取扫描图片。

ifre.search(r"\.pptx?$", filename, re.IGNORECASE):    ppt_parser = Ppt()   forpn, (txt, img)inenumerate(ppt_parser(        filenameifnotbinaryelsebinary, from_page,1000000, callback)):      d = copy.deepcopy(doc)      pn += from_page      d["image"] = img      d["page_num_int"] = [pn +1]      d["top_int"] = [0]      d["position_int"] = [(pn +1,0, img.size[0],0, img.size[1])]      tokenize(d, txt, eng)      res.append(d)   returnreselifre.search(r"\.pdf$", filename, re.IGNORECASE):    pdf_parser = Pdf()   ifkwargs.get("layout_recognize","DeepDOC") =="lain Text":      pdf_parser = PlainParser()   forpn, (txt, img)inenumerate(pdf_parser(filename, binary,                         from_page=from_page, to_page=to_page, callback=callback)):      d = copy.deepcopy(doc)      pn += from_page     ifimg:        d["image"] = img      d["page_num_int"] = [pn +1]      d["top_int"] = [0]      d["position_int"] = [(pn +1,0, img.size[0]ifimgelse0,0, img.size[1]ifimgelse0)]      tokenize(d, txt, eng)      res.append(d)   returnres

在处理流程中,DocumentService会启动一个线程池(默认最多12线程)并发执行多个文档的解析。每个线程调用相应解析器的chunk方法将文档切分成片段列表,并返回包含文本内容(以及可能的权重、图像等)的字典结构。例如,对于文本型文档,解析器会对段落或页进行切分,并调用rag_tokenizer.tokenize进行中文分词细粒度标记。解析结果中还包含文档ID、知识库ID等元数据,用于后续索引。对于解析出的图片,如PPT页面截图或PDF扫描图片,系统会将其二进制数据存入STORAGE_IMPL指定的存储(如本地或云存储,默认是MINIO),并在片段数据中仅保留引用(如img_id)以减少向量库负担。

for (docinfo, _),thin zip(files, threads):    docs = []    doc = {      "doc_id": docinfo["id"],     "kb_id": [kb.id]    }    for ck inth.result():      d =deepcopy(doc)      d.update(ck)      d["id"] = xxhash.xxh64((ck["content_with_weight"] +str(d["doc_id"])).encode("utf-8")).hexdigest()      d["create_time"] =str(datetime.now()).replace("T"," ")[:19]      d["create_timestamp_flt"] = datetime.now().timestamp()      if not d.get("image"):        docs.append(d)        continue
output_buffer =BytesIO() ifisinstance(d["image"], bytes): output_buffer =BytesIO(d["image"]) else: d["image"].save(output_buffer, format='JPEG')
STORAGE_IMPL.put(kb.id, d["id"], output_buffer.getvalue()) d["img_id"] ="{}-{}".format(kb.id, d["id"]) d.pop("image", None) docs.append(d)
classStorageFactory:  storage_mapping = {   Storage.MINIO:RAGFlowMinio,   Storage.AZURE_SPN:RAGFlowAzureSpnBlob,   Storage.AZURE_SAS:RAGFlowAzureSasBlob,   Storage.AWS_S3:RAGFlowS3,   Storage.OSS:RAGFlowOSS,  }
@classmethod defcreate(cls,storage:Storage): returncls.storage_mapping[storage]()

STORAGE_IMPL_TYPE= os.getenv('STORAGE_IMPL','MINIO')STORAGE_IMPL=StorageFactory.create(Storage[STORAGE_IMPL_TYPE])

潜在瓶颈:当前数据处理流程虽然通过线程池并行解析多个文档,但仍有性能瓶颈和改进空间:

@classmethod@DB.connection_context()defremove_document(cls,doc,tenant_id):cls.clear_chunk_num(doc.id)try:settings.docStoreConn.delete({"doc_id":doc.id},search.index_name(tenant_id),doc.kb_id)graph_source=settings.docStoreConn.getFields(settings.docStoreConn.search(["source_id"],[],{"kb_id":doc.kb_id,"knowledge_graph_kwd":["graph"]},[],OrderByExpr(),0,1,search.index_name(tenant_id),[doc.kb_id]),["source_id"])iflen(graph_source)>0anddoc.idinlist(graph_source.values())[0]["source_id"]:settings.docStoreConn.update({"kb_id":doc.kb_id,"knowledge_graph_kwd":["entity","relation","graph","subgraph","community_report"],"source_id":doc.id},{"remove":{"source_id":doc.id}},search.index_name(tenant_id),doc.kb_id)settings.docStoreConn.update({"kb_id":doc.kb_id,"knowledge_graph_kwd":["graph"]},{"removed_kwd":"Y"},search.index_name(tenant_id),doc.kb_id)settings.docStoreConn.delete({"kb_id":doc.kb_id,"knowledge_graph_kwd":["entity","relation","graph","subgraph","community_report"],"must_not":{"exists":"source_id"}},search.index_name(tenant_id),doc.kb_id)exceptException:passreturncls.delete_by_id(doc.id)
defembedding(doc_id,cnts,batch_size=16):nonlocalembd_mdl,chunk_counts,token_countsvects=[]foriinrange(0,len(cnts),batch_size):vts,c=embd_mdl.encode(cnts[i:i+batch_size])vects.extend(vts.tolist())chunk_counts[doc_id]+=len(cnts[i:i+batch_size])token_counts[doc_id]+=creturnvects

完成解析和嵌入后,DocumentService通过docStoreConn接口将片段数据持久化到向量存储。它先检查或创建索引(index)结构,然后按批次(默认每批64条)插入片段。每个片段包含文本、元数据以及生成的向量(字段名形如q_<dim>_vec)。同时,DocumentService会更新关系型数据库中的文档记录(如总切片数、总Token数等)以供监控。整个数据入库流程确保了解析 -> 嵌入 -> 存储的顺序执行,并在出现错误时回滚或记录异常。

fori, d in enumerate(cks):  v = vects[i]  d["q_%d_vec"%len(v)] = vfor b inrange(0,len(cks), es_bulk_size=64):  if try_create_idx:    if not settings.docStoreConn.indexExist(idxnm, kb_id):      settings.docStoreConn.createIdx(idxnm, kb_id,len(vects[0]))    try_create_idx = False  settings.docStoreConn.insert(cks[b:b + es_bulk_size], idxnm, kb_id)

Retriever 模块

结构设计:ragflow的检索模块采用面向接口的设计,屏蔽了不同向量库实现的差异。核心抽象为DocStoreConnectionrag/utils/doc_store_conn.py),以及基于它构建的search.Dealer检索器(位于rag/nlp/search.py)。系统在初始化时根据配置的后端向量引擎类型(DOC_ENGINE)创建相应的DocStoreConnection实例,并赋给全局的settings.docStoreConn

globalDOC_ENGINE, docStoreConn, retrievaler, kg_retrievalerDOC_ENGINE = os.environ.get("DOC_ENGINE","elasticsearch")
lower_case_doc_engine = DOC_ENGINE.lower()iflower_case_doc_engine =="elasticsearch": docStoreConn = rag.utils.es_conn.ESConnection()eliflower_case_doc_engine =="infinity": docStoreConn = rag.utils.infinity_conn.InfinityConnection()eliflower_case_doc_engine =="opensearch": docStoreConn = rag.utils.opensearch_coon.OSConnection()else: raiseException(f"Not supported doc engine:{DOC_ENGINE}")
retrievaler = search.Dealer(docStoreConn)kg_retrievaler = kg_search.KGSearch(docStoreConn)

支持的引擎包括:Elasticsearch(利用其内置稠密向量索引)、OpenSearch(偏传统搜索引擎的向量支持),以及InfiniFlow团队自研的“Infinity”引擎。每种引擎在rag/utils目录下有对应实现类(如ESConnectionOSConnectionInfinityConnection),它们都提供统一的方法,例如:createIdx创建索引、insert插入文档、search执行搜索、getTotal获取结果总数等。在代码中可以看到,根据配置选择不同引擎,否则抛出异常“Not supported doc engine”。这种设计使得更换底层向量库(如从Elasticsearch换成Milvus或FAISS)对上层检索流程影响较小,只需实现新的DocStoreConnection子类并配置启用即可。

增加新的向量库类型:尽管还有FAISS、Weaviate、Milvus等向量库,当前源码里主要直接支持的还是上述三种后端。ragflow架构上具备扩展能力:开发者可以参考现有DocStoreConnection接口封装新的后端(例如利用FAISS在本地内存建立索引,或通过Weaviate/Milvus的Python客户端发送查询)。引入这些向量库有望在纯向量检索性能和分布式扩展上提供优势。

检索策略:ragflow 的 Retriever 模块采用多路召回融合的策略,即同时执行稠密向量检索和稀疏关键字检索,并融合结果进行重排序。在search.Dealer.search方法中,这一逻辑清晰可见:首先,对用户查询进行处理,生成语义向量查询文本关键词查询。语义向量查询通过Embedding模块将用户问题编码成向量(使用emb_mdl.encode_queries得到查询向量);

defget_vector(self, txt, emb_mdl, topk=10, similarity=0.1):    qv, _ = emb_mdl.encode_queries(txt)    shape = np.array(qv).shape   iflen(shape) >1:     raiseException(       f"Dealer.get_vector returned array's shape{shape}doesn't match expectation(exact one dimension).")    embedding_data = [get_float(v)forvinqv]    vector_column_name =f"q_{len(embedding_data)}_vec"   returnMatchDenseExpr(vector_column_name, embedding_data,'float','cosine', topk, {"similarity": similarity})

关键词查询则使用FulltextQueryer将问题转换为搜索式(提取关键词并设定匹配度)。接着,Dealer构造一个融合查询,将稠密向量匹配(MatchDenseExpr)和稀疏文本匹配(MatchTextExpr)组合(加权融合则通过FusionExpr实现)。最终通过docStoreConn.search接口提交给底层引擎执行。例如,当底层是ElasticSearch,会同时在索引中进行match文本查询和向量最近邻查询,再根据预设的相似度权重融合得分。当初次检索结果为空或不足时,系统降低文本匹配阈值(min_match)并再次发起结合关键词匹配(matchText) +向量匹配(matchDense) +融合表达式(fusionExpr)的检索,以最大限度召回相关片段。

matchText,keywords=self.qryr.question(qst,min_match=0.3)ifemb_mdlisNone:matchExprs=[matchText]res=self.dataStore.search(src,highlightFields,filters,matchExprs,orderBy,offset,limit,idx_names,kb_ids,rank_feature=rank_feature)total=self.dataStore.getTotal(res)logging.debug("Dealer.searchTOTAL:{}".format(total))else:matchDense=self.get_vector(qst,emb_mdl,topk,req.get("similarity",0.1))q_vec=matchDense.embedding_datasrc.append(f"q_{len(q_vec)}_vec")fusionExpr=FusionExpr("weighted_sum",topk,{"weights":"0.05,0.95"})matchExprs=[matchText,matchDense,fusionExpr]res=self.dataStore.search(src,highlightFields,filters,matchExprs,orderBy,offset,limit,idx_names,kb_ids,rank_feature=rank_feature)total=self.dataStore.getTotal(res)logging.debug("Dealer.searchTOTAL:{}".format(total))#Ifresultisempty,tryagainwithlowermin_matchiftotal==0:iffilters.get("doc_id"):res=self.dataStore.search(src,[],filters,[],orderBy,offset,limit,idx_names,kb_ids)total=self.dataStore.getTotal(res)else:matchText,_=self.qryr.question(qst,min_match=0.1)filters.pop("doc_id",None)matchDense.extra_options["similarity"]=0.17res=self.dataStore.search(src,highlightFields,filters,[matchText,matchDense,fusionExpr],orderBy,offset,limit,idx_names,kb_ids,rank_feature=rank_feature)total=self.dataStore.getTotal(res)logging.debug("Dealer.search2TOTAL:{}".format(total))

除了 dense/sparse 双检索外,ragflow 还支持知识图谱检索(KGSearch)等多模态场景,代码中KGSearch(docStoreConn)表明针对结构化知识使用知识图谱检索器。但就文本QA场景而言,当前主要是稠密与稀疏信息的互补。这样设计的好处是:召回率更高且结果有解释性。稠密向量确保语义相关但可能引入模糊匹配,稀疏关键词确保精确匹配但可能漏掉语义改述,通过融合可以综合两者优点。同时,Dealer在返回结果时也提供了高亮和聚合等信息用于结果呈现。

classSearchResult:total:intids:list[str]query_vector:list[float]|None=Nonefield:dict|None=Nonehighlight:dict|None=Noneaggregation:list|dict|None=Nonekeywords:list[str]|None=Nonegroup_docs:list[list]|None=None

检索结果返回后,ragflow还可以执行重排序(rerank)操作。代码中预留了RERANK_MDL配置以及rag/llm/rerank_model.py模块用于加载例如BCE、BGE这样的cross-encoder重排序模型。在问答流程中,当Retriever找回N条候选片段后,调用similarity(query, texts)对片段与原问句打分,再重新排序以提升相关性。这种两阶段检索在文档中被称为“融合重排序”,可有效提升最终答案依据的准确性。

潜在问题:

Retriever模块通过接口封装实现对不同向量存储的支持,采用稠密+稀疏融合提升召回和准确率,搭配可选的重排序模型以进一步优化结果排序。为让检索模块更为健壮,后续可以:扩展支持更丰富的后端(Faiss/Milvus 等)以提高性能和部署灵活性;引入Agent式检索调优(例如动态调整查询或多跳检索);加强并行与分布式能力,确保在大规模知识库下也能快速稳定地返回结果。

嵌入(Embedding)模块

支持的嵌入模型:ragflow的嵌入模块设计非常灵活,集成了多种来源的Embedding模型,包括本地和云端模型。例如:

Embedding模块通过不同子类囊括了主流的嵌入方案:既支持开源模型离线推理,也支持各大厂商在线服务。配置上,系统允许用户设置EMBEDDING_MDL,选择默认的embedding模型。例如默认embedding模型列表里包括BGE-largeYoudao BCE。根据部署模式(LIGHTEN配置)可以决定是否加载大模型权重还是调用轻量服务。

BUILTIN_EMBEDDING_MODELS=["BAAI/bge-large-zh-v1.5@BAAI","maidalun1020/bce-embedding-base_v1@Youdao"]

Token限制与批处理策略:由于不同模型对输入文本长度有限制,代码对长文本都做了截断处理,避免超过模型支持的token数。比如OpenAI的ada-002模型最多支持8191个token,OpenAIEmbed.encode对每段文本截断至8191长度;智谱embedding-3模型上限3072token,代码中据此截断输入。本地FlagEmbedding模型也将文本裁剪在2048字符左右以控制计算量。这种截断保证了对任意长文段都能安全编码,但也可能丢失部分信息。优化上,未来可考虑对超长文本分段嵌入(如对长段落进一步切分取均值)或者引入长上下文embedding模型。

classZhipuEmbed(Base): def__init__(self, key, model_name="embedding-2", **kwargs):    self.client = ZhipuAI(api_key=key)    self.model_name = model_name
defencode(self, texts:list): arr = [] tks_num =0 MAX_LEN = -1 ifself.model_name.lower() =="embedding-2": MAX_LEN =512 ifself.model_name.lower() =="embedding-3": MAX_LEN =3072 ifMAX_LEN >0: texts = [truncate(t, MAX_LEN)fortintexts]
fortxtintexts: res = self.client.embeddings.create(input=txt, model=self.model_name) arr.append(res.data[0].embedding) tks_num += self.total_token_count(res) returnnp.array(arr), tks_num
defencode_queries(self, text): res = self.client.embeddings.create(input=text, model=self.model_name) returnnp.array(res.data[0].embedding), self.total_token_count(res)

在批处理方面,多数实现将请求按批大小16拆分,以兼顾效率和API限制。例如OpenAI每次请求最多16条输入,代码中显式设置batch_size=16;本地模型也通常16条一批以充分利用矩阵并行。也有特殊情况,如Qwen的DashScope接口批量能力较弱,仅用batch_size=4。批处理逻辑多采用简单的for循环累积结果。例如对一组文本列表,按16条一组调用底层模型获取embedding,并将结果扩展成一个numpy数组返回。每批次调用后,有的实现如OpenAI会累加已处理的token数方便后续记录。另外,有些会提供encode_queries专门处理单条查询的情况,有的则会直接调用encode([text])简化实现。总的来看,当前批处理策略较为保守,没有动态调整批大小或并行发出多个批次请求。

classLocalAIEmbed(Base): def__init__(self, key, model_name, base_url):   ifnotbase_url:     raiseValueError("Local embedding model url cannot be None")   ifbase_url.split("/")[-1] !="v1":      base_url = os.path.join(base_url,"v1")   self.client =OpenAI(api_key="empty", base_url=base_url)   self.model_name = model_name.split("___")[0]
defencode(self,texts:list): batch_size =16 ress = [] foriinrange(0, len(texts), batch_size): res =self.client.embeddings.create(input=texts[i:i + batch_size], model=self.model_name) ress.extend([d.embeddingfordinres.data]) # local embedding for LmStudio donot count tokens returnnp.array(ress),1024
defencode_queries(self, text): embds, cnt =self.encode([text]) returnnp.array(embds[0]), cnt

多线程/并发实现:在Embedding模块内部,各模型的encode实现大多是同步的,并未显式使用多线程。但ragflow利用上层流程并发处理不同文档的嵌入来提高吞吐:DocumentService中对不同文档的chunk是并行的,多个文档的向量生成可以借助线程池同时进行(受限于GPU算力)。不过对于单个大文档,其所有chunk向量还是在单线程顺序计算。目前也没有看到使用GPU多卡并行计算同一批次embedding的机制。而且DefaultEmbedding类初始化时将环境变量CUDA_VISIBLE_DEVICES强制设为0锁定使用第一块GPU,这在多GPU机器上可能造成闲置。改进建议:考虑引入多线程/异步的编码方式。例如,对于需要调用外部API(OpenAI等),可以使用asyncio.gather同时发出多个请求批次,充分利用网络带宽;对于本地模型,如PyTorch的embedding模型,可采用多线程批次推理或OpenMP并行(前提是模型计算瓶颈在CPU)。此外,可以让用户配置使用哪些GPU设备,或在初始化时检测多GPU并对模型和数据进行切分,做到多卡并行嵌入(例如FlagEmbedding模型实例化多份各占一卡,分摊批次)。这些措施对大规模文档批量入库时的嵌入吞吐将有明显提升。

效率和可维护性评估:ragflow 的Embedding模块代码虽长但结构清晰:使用抽象基类Base定义接口,然后为不同服务商/模型各建一个子类实现。这种方式在增加新模型支持时需要修改代码增添类,但也提供了高度的控制力。目前来看,可维护性方面,每个子类基本独立,新增其它平台支持(例如TogetherAI、PerfX等,在代码中已经有一些雏形)很方便。如果后续模型种类继续增加,考虑采用插件机制或配置驱动会更灵活,例如通过配置文件列出可用的embedding后端及参数,运行时反射加载对应实现,避免主代码不断膨胀。

大规模文档嵌入情境下,当前结构可能暴露一些效率瓶颈:如所有文档embedding共用单一线程顺序执行,难以充分利用多核;本地大型模型(BGE large)占用显存大且推理速度相对慢,批量处理时GPU成为瓶颈。如果要提升吞吐,可考虑以下建议:

ragflow的Embedding模块具有灵活易扩展的优点。在后续扩展中,可以引入配置化和插件化降低新增模型的代码改动,并通过并行优化模型加速方案,使大规模文档嵌入过程更高效。随着开源社区新的embedding模型出现(如多模态embedding,将图片、音频也编码到同一向量空间),ragflow也可以通过类似架构无缝接入,从而增强系统在多模态检索上的能力。

模块依赖关系概览:数据处理模块负责将原始数据转化为向量库可用的结构,内部依赖解析器(如DeepDoc等库)提取文本,并调用 Embedding 模块完成向量生成;Embedding模块对接外部模型或服务,将文本转为语义向量;生成的文本片段和向量通过DocStore接口持久化到检索引擎。检索查询发生时,Retriever模块调用 Embedding 模块将用户查询转为向量,并结合关键词匹配通过 DocStore 接口检索相关片段;Retriever返回的结果可再送入重排序模型(二次调用Embedding或独立的Rerank模型)以优化顺序,最终将结果提供给上层问答流程使用。因此,数据处理 -> 向量存储 <- 检索查询这一闭环确保了知识库内容与用户问题在同一向量空间匹配。

各模块解耦明确:解析与嵌入模块以配置和接口衔接,检索模块通过统一接口适配不同存储,实现了较好的模块化和可扩展性。今后在此架构上进行二次开发和性能优化,只需针对瓶颈模块局部改进,而不影响整体流程。各改进方案如上所述,将有助于ragflow更高效地处理海量异构数据,服务大规模问答应用。






欢迎光临 链载Ai (https://www.lianzai.com/) Powered by Discuz! X3.5