1. 引言
RAGFlow 是一个基于深度文档理解的开源 RAG(检索增强生成)引擎。接上文:关于 RAGFlow 项目中 RAG 技术的实现分析。本文将基于对 RAGFlow 源代码的分析,详细解析 RAGFlow 在检索召回(Retrieval)环节的实现机制。
2. RAGFlow 整体架构
根据 GitHub 仓库首页的介绍,RAGFlow 是一个"基于深度文档理解的开源 RAG 引擎",它提供了一个流线型的 RAG 工作流,结合大型语言模型(LLM)提供真实的问答能力,并通过各种复杂格式数据的引用支持回答。
RAGFlow 的核心特点包括:
- • 支持多种异构数据源(Word、幻灯片、Excel、txt、图像、扫描副本、结构化数据、网页等)
3. 检索找回机制的源码分析
通过分析 RAGFlow 的源代码rag/nlp/search.py、rag/nlp/query.py等核心文件,来详细了解其检索召回机制的实现。
3.1 检索流程概述
RAGFlow 的检索流程主要包括以下几个步骤:
3.2 查询处理与向量化
在rag/nlp/query.py中,FulltextQueryer类负责处理用户查询,将其转换为可用于检索的形式:
defquestion(self, txt, tbl="qa", min_match:float=0.6):
txt = FulltextQueryer.add_space_between_eng_zh(txt)
txt = re.sub(
r"[ :|\\r\\n\\t,,。??/`!!&^%%()\\[\\]{}<>]+"," ",
rag_tokenizer.tradi2simp(rag_tokenizer.strQ2B(txt.lower())),
).strip()
otxt = txt
txt = FulltextQueryer.rmWWW(txt)
ifnotself.isChinese(txt):
txt = FulltextQueryer.rmWWW(txt)
tks = rag_tokenizer.tokenize(txt).split()
keywords = [tfortintksift]
tks_w =self.tw.weights(tks, preprocess=False)
# ...处理权重和同义词
returnMatchTextExpr(self.query_fields, query,100, {"minimum_should_match": min_match}), keywords
这段代码展示了 RAGFlow 如何处理用户查询:
- 1. 对查询文本进行预处理(添加英中间的空格、繁体转简体、全角转半角等)
在rag/nlp/search.py中,Dealer类的get_vector方法负责将查询文本转换为向量:
defget_vector(self, txt, emb_mdl, topk=10, similarity=0.1):
qv, _ = emb_mdl.encode_queries(txt)
shape = np.array(qv).shape
iflen(shape) >1:
raiseException(
f"Dealer.get_vector returned array's shape{shape}doesn't match expectation(exact one dimension).")
embedding_data = [get_float(v)forvinqv]
vector_column_name =f"q_{len(embedding_data)}_vec"
returnMatchDenseExpr(vector_column_name, embedding_data,'float','cosine', topk, {"similarity": similarity})
这段代码展示了如何使用嵌入模型将查询文本转换为向量,并创建用于向量匹配的表达式。
3.3 混合检索机制
RAGFlow 采用了混合检索策略,结合了向量检索和全文检索。在rag/nlp/search.py的search方法中可以看到这一实现:
defsearch(self, req, idx_names:str|list[str], kb_ids:list[str], emb_mdl=None, highlight=False, rank_feature:dict|None=None):
# ...前置处理
ifnotqst:
# 处理无查询情况
# ...
else:
highlightFields = ["content_ltks","title_tks"]ifhighlightelse[]
matchText, keywords =self.qryr.question(qst, min_match=0.3)
ifemb_mdlisNone:
# 仅使用全文检索
matchExprs = [matchText]
res =self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy, offset, limit, idx_names, kb_ids, rank_feature=rank_feature)
# ...
else:
# 混合检索:结合向量检索和全文检索
matchDense =self.get_vector(qst, emb_mdl, topk, req.get("similarity",0.1))
q_vec = matchDense.embedding_data
src.append(f"q_{len(q_vec)}_vec")
fusionExpr = FusionExpr("weighted_sum", topk, {"weights":"0.05, 0.95"})
matchExprs = [matchText, matchDense, fusionExpr]
res =self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy, offset, limit, idx_names, kb_ids, rank_feature=rank_feature)
# ...
这段代码揭示了 RAGFlow 的混合检索策略:
- 2. 如果提供了嵌入模型,则创建向量检索表达式
matchDense - 3. 使用
FusionExpr融合两种检索结果,权重分别为 0.05 和 0.95,表明向量检索在混合检索中占主导地位 - 4. 如果初次检索结果为空,会尝试降低匹配阈值进行二次检索
3.4 重排序机制
RAGFlow 的重排序机制是其检索找回流程中的关键环节,主要通过rerank函数实现。在rag/nlp/search.py中,rerank函数负责对初步检索结果进行多维度重排序:
defrerank(self, sres, query, tkweight=0.3,
vtweight=0.7, rank_feature:dict|None=None):
"""
对检索结果进行重排序
参数:
- sres: 搜索结果
- query: 查询文本
- tkweight: 词元相似度权重
- vtweight: 向量相似度权重
- rank_feature: 排序特征
返回:
- 重排序后的结果
"""
_, keywords =self.qryr.question(query)
vector_size =len(sres.query_vector)
vector_column =f"q_{vector_size}_vec"
zero_vector = [0.0] * vector_size
ins_embd = []
# 提取文档向量
forchunk_idinsres.ids:
vector = sres.field[chunk_id].get(vector_column, zero_vector)
ifisinstance(vector,str):
vector = [get_float(v)forvinvector.split("\t")]
ins_embd.append(vector)
ifnotins_embd:
return[], [], []
# 处理重要关键词
foriinsres.ids:
ifisinstance(sres.field[i].get("important_kwd", []),str):
sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]
# 提取文本特征
ins_tw = []
foriinsres.ids:
content_ltks =list(OrderedDict.fromkeys(sres.field[i]["field"].split()))
title_tks = [tfortinsres.field[i].get("title_tks","").split()ift]
question_tks = [tfortinsres.field[i].get("question_tks","").split()ift]
important_kwd = sres.field[i].get("important_kwd", [])
# 计算加权分数
tks = content_ltks + title_tks *2+ important_kwd *5+ question_tks *6
ins_tw.append(tks)
# 计算排序特征分数
rank_fea =self._rank_feature_scores(rank_feature, sres)
# 计算混合相似度
sim, tksim, vtsim =self.qryr.hybrid_similarity(
sres.query_vector,
ins_embd,
keywords,
ins_tw, tkweight, vtweight)
# 返回综合排序分数
returnsim + rank_fea, tksim, vtsim
这个函数展示了 RAGFlow 的重排序机制如何工作:
- • 文本特征:提取内容、标题、问题和重要关键词等文本特征
- • 排序特征:通过
_rank_feature_scores函数计算额外的排序特征分数
def_rank_feature_scores(self, query_rfea, search_res):
"""计算排序特征分数"""
## For rank feature(tag_fea) scores.
rank_fea = []
pageranks = []
# 提取PageRank分数
forchunk_idinsearch_res.ids:
pageranks.append(search_res.field[chunk_id].get(PAGERANK_FLD,0))
pageranks = np.array(pageranks, dtype=float)
# 如果没有查询特征,直接返回PageRank分数
ifnotquery_rfea:
returnnp.array([0for_inrange(len(search_res.ids))]) + pageranks
# 计算查询特征与文档标签的相似度
q_denor = np.sqrt(np.sum([s*sfors,tinquery_rfea.items()ift != PAGERANK_FLD]))
foriinsearch_res.ids:
nor, denor =0,0
# 如果文档没有标签,添加0分
ifnotsearch_res.field[i].get(TAG_FLD):
rank_fea.append(0)
continue
# 计算标签相似度分数
fort, scineval(search_res.field[i].get(TAG_FLD,"{}"))).items():
iftinquery_rfea:
nor += query_rfea[t] * sc
denor += sc * sc
# 归一化分数
ifdenor ==0:
rank_fea.append(0)
else:
rank_fea.append((nor/np.sqrt(denor)/q_denor))
# 返回标签相似度分数与PageRank分数的加权和
returnnp.array(rank_fea)*10.+ pageranks
此外,RAGFlow 还提供了基于模型的重排序功能,通过rerank_by_model函数实现:
defrerank_by_model(self, rerank_mdl, sres, query, tkweight=0.3,
vtweight=0.7, cfield="content_ltks",
rank_feature:dict|None=None):
"""
使用外部模型进行重排序
参数:
- rerank_mdl: 重排序模型
- sres: 搜索结果
- query: 查询文本
- tkweight: 词元相似度权重
- vtweight: 向量相似度权重
- cfield: 内容字段名
- rank_feature: 排序特征
返回:
- 重排序后的结果
"""
# 提取文档内容
contents = []
foriinsres.ids:
contents.append(sres.field[i].get(cfield,""))
# 使用模型计算相似度分数
scores = rerank_mdl.compute_score(query, contents)
# 计算基本重排序分数
sim, tksim, vtsim =self.rerank(sres, query, tkweight, vtweight, rank_feature)
# 返回模型分数与基本分数的加权和
returnscores *0.7+ sim *0.3, tksim, vtsim
这个函数展示了 RAGFlow 如何结合外部重排序模型与内部重排序机制,进一步提高检索结果的相关性。
3.5 引用插入与来源追踪
RAGFlow 的一个重要特性是能够在生成的回答中插入引用,追踪信息来源。这在insert_citations方法中实现:
definsert_citations(self, answer, chunks, chunk_v, embd_mdl, tkweight=0.1, vtweight=0.9):
assertlen(chunks) ==len(chunk_v)
ifnotchunks:
returnanswer,set([])
# 将回答分割成片段
pieces = re.split(r"(```)", answer)
# ...处理代码块和句子分割
# 计算每个片段与知识库块的相似度
ans_v, _ = embd_mdl.encode(pieces_)
# 计算混合相似度并插入引用
fori, ainenumerate(pieces_):
sim, tksim, vtsim =self.qryr.hybrid_similarity(
ans_v[i], chunk_v,
rag_tokenizer.tokenize(self.qryr.rmWWW(pieces_[i])).split(),
chunks_tks, tkweight, vtweight
)
# ...根据相似度插入引用
这段代码展示了 RAGFlow 如何为生成的回答添加引用:
- 2. 计算每个片段与知识库块的向量相似度和词元相似度
4. 文档存储与检索接口
RAGFlow 使用DocStoreConnection类作为文档存储和检索的接口,支持多种检索表达式:
fromrag.utils.doc_store_connimportDocStoreConnection,MatchDenseExpr,FusionExpr,OrderByExpr
主要的检索表达式包括:
这些表达式被传递给dataStore.search方法执行实际的检索操作。
5. 检索结果处理
检索结果通过SearchResult数据类进行封装:
@dataclass
classSearchResult:
total:int
ids:list[str]
query_vector:list[float] |None=None
field:dict|None=None
highlight:dict|None=None
aggregation:list|dict|None=None
keywords:list[str] |None=None
group_docs:list[list] |None=None
这个数据结构包含了检索结果的各种信息,包括总数、文档 ID、查询向量、高亮显示、聚合结果等。
6. 总结
RAGFlow 提供了"基于模板的分块"和"减少幻觉的引用"等特性。通过源码分析,我们可以看到这些特性在检索召回机制中的具体实现:
- 1.模板化分块:虽然在分析的源码中没有直接看到分块模板的实现细节,但从检索代码可以看出,系统支持对不同字段(如标题、内容)设置不同的权重,这与模板化分块的理念一致。
- 2.减少幻觉的引用:通过
insert_citations方法,RAGFlow 能够为生成的回答添加引用,追踪信息来源,从而减少幻觉。 - 3.混合检索策略:RAGFlow 结合了向量检索和全文检索,并通过可配置的权重进行融合,这与官方文档中提到的"多重召回配对融合重排序"相符。
通过对 RAGFlow 源码的分析,我们可以看到其检索找回机制的核心实现包括:
- 1.查询处理:对用户查询进行预处理、分词、权重计算和同义词扩展
- 2.混合检索:结合向量检索和全文检索,通过可配置的权重进行融合
- 3.重排序:通过
rerank函数实现多维度特征融合的重排序机制,包括:
- • 文本特征(内容、标题、问题、关键词)的加权处理
RAGFlow 的检索找回机制设计全面而灵活,能够有效地从知识库中检索相关信息,并通过多维度重排序提高检索结果的相关性,通过引用机制减少幻觉,提高回答的可靠性。