返回顶部
热门问答 更多热门问答
技术文章 更多技术文章

从零开始学 Dify - Dify 的 RAG 系统如何有效地处理和检索大量文档?

[复制链接]
链载Ai 显示全部楼层 发表于 1 小时前 |阅读模式 打印 上一主题 下一主题

一、整体架构

1.1 架构概述

dify 的 RAG(检索增强生成)架构是一个完整的文档处理、索引和检索系统,旨在提高大语言模型生成内容的准确性和相关性。该架构由三个主要模块组成:文档处理模块、向量化与索引模块、检索与重排模块。

整个系统的数据流如下:

  1. 用户上传文档或提供URL
  2. 文档处理模块提取文本内容并进行清洗
  3. 文本被分割成适当大小的段落(chunks)
  4. 向量化模块将这些段落转换为向量表示并存储在向量数据库中
  5. 同时创建关键词索引以支持混合搜索
  6. 用户查询时,检索模块根据配置的检索方法找到相关段落
  7. 重排模块对检索结果进行排序优化
  8. 最终将优化后的上下文提供给大语言模型生成回答

1.2 数据模型关系

Dify RAG 系统的核心数据模型包括:

  • Dataset:知识库,包含多个文档,是 RAG 的基本单位
  • Document:文档,包含元数据和处理规则
  • DocumentSegment:文档分段,是实际被索引和检索的最小单位
  • DatasetKeywordTable:关键词表,用于支持关键词搜索

这些实体之间的关系是:一个 Dataset 包含多个 Document,一个 Document 包含多个 DocumentSegment。Dataset 通过index_struct 字段存储向量数据库的配置信息。

二、文档处理模块

2.1 设计思路

文档处理模块负责将各种格式的文档转换为可被索引的文本段落。该模块采用 ETL(提取-转换-加载)模式设计,具有高度的可扩展性和灵活性。

主要功能包括:

  • 支持多种文档格式的文本提取
  • 文本清洗和预处理
  • 文本分段(chunking)
  • 元数据提取和管理

2.1 文档处理架构

2.3 核心组件

ExtractProcessor

ExtractProcessor是文档处理的入口,负责根据文档类型选择合适的提取器进行文本提取。它支持多种文档格式,包括:

  • 文本文件(TXT)
  • PDF文档
  • Word文档(DOCX、DOC)
  • Excel表格(XLSX、XLS)
  • PowerPoint演示文稿(PPTX、PPT)
  • HTML网页
  • Markdown文档
  • 电子邮件(EML、MSG)
  • 电子书(EPUB)
  • XML文件
  • CSV数据
  • Notion导出文件

提取器采用策略模式设计,可以轻松扩展以支持新的文档格式。

ExtractProcessor类实现
classExtractProcessor:
@classmethod
defextract(cls, extract_setting: ExtractSetting, is_automatic: bool = False,
file_path: str = None)-> list[Document]:
# 根据数据源类型和文件类型选择合适的提取器
ifextract_setting.datasource_type == DatasourceType.FILE.value:
# 根据文件扩展名选择不同的提取器
iffile_extension =='.pdf':
extractor = PdfExtractor(file_path)
eliffile_extensionin['.md','.markdown']:
extractor = UnstructuredMarkdownExtractor(file_path) \
ifis_automaticelseMarkdownExtractor(file_path)
# ... 其他文件类型的处理

returnextractor.extract()
文档提取流程

TextSplitter

TextSplitter负责将长文本分割成适当大小的段落,是 RAG 系统性能的关键组件。它提供以下功能:

  • 基于不同分隔符的文本分割
  • 支持设置最大 token 数量
  • 支持段落重叠以保持上下文连贯性
  • 支持不同的分词器(Tiktoken、HuggingFace)进行准确的 token 计算

分割策略可以通过处理规则进行配置,允许用户根据不同类型的文档调整分割参数。

TextSplitter实现类
classTextSplitter(BaseDocument, ABC):
def__init__(
self,
chunk_size: int =4000,
chunk_overlap: int =200,
length_function: Callable[[str], int] = len,
keep_: bool = False,
add_start_index: bool = False,
)->None:
# 初始化分割器参数
self._chunk_size = chunk_size
self._chunk_overlap = chunk_overlap
self._length_function = length_function
self._keep_separator = keep_separator
self._add_start_index = add_start_index

@abstractmethod
defsplit_text(self, text: str)-> list[str]:
"""Split text into multiple components."""

defcreate_documents(
self, texts: list[str], metadatas: Optional[list[dict]] = None
)-> list[Document]:
# 从分割后的文本创建文档对象

文档切块是RAG中的关键步骤,Dify 使用TextSplitter类及其子类来实现不同的切块策略:

  • FixedRecursiveCharacterTextSplitter:固定大小的递归字符分割
  • EnhanceRecursiveCharacterTextSplitter:增强的递归字符分割

切块后的文档片段会保留原始文档的元数据,并添加唯一标识符(doc_id)和内容哈希值(doc_hash)。

文本分割流程

TextCleaner

TextCleaner负责文本清洗,去除不必要的格式和噪声,提高索引和检索质量。清洗操作包括:

  • 移除多余空格
  • 去除URL和电子邮件(可选)
  • 标准化文本格式
  • 处理特殊字符

三、向量化与索引模块

3.1 设计思路

向量化与索引模块负责将文本段落转换为向量表示并存储在向量数据库中,同时创建关键词索引以支持混合搜索。该模块采用工厂模式和适配器模式设计,支持多种向量数据库和嵌入模型。

主要功能包括:

  • 文本向量化(嵌入)
  • 向量存储和索引
  • 关键词提取和索引
  • 缓存管理

3.2 向量化流程图

3.3 核心组件

嵌入缓存(CacheEmbedding)

Dify使用CacheEmbedding类来管理文本嵌入过程,它具有缓存功能,可以避免重复计算嵌入向量:

  1. 对于每个文本块,首先计算其哈希值
  2. 检查数据库中是否已存在该哈希值对应的嵌入向量
  3. 如果存在,直接使用缓存的向量
  4. 如果不存在,调用嵌入模型生成向量,并将结果存入缓存
classCacheEmbedding(Embeddings):
def__init__(self, model_instance: ModelInstance, user: Optional[str] = None)->None:
self._model_instance = model_instance
self._user = user

defembed_documents(self, texts: list[str])-> list[list[]]:
# 使用文档嵌入缓存或存储(如果不存在)
text_embeddings = [Nonefor_inrange(len(texts))]
embedding_queue_indices = []

# 检查缓存中是否存在嵌入
fori, textinenumerate(texts):
hash = helper.generate_text_hash(text)
embedding = db.session.query(Embedding).filter_by(
model_name=self._model_instance.model,
hash=hash,
provider_name=self._model_instance.provider
).first()

ifembedding:
text_embeddings[i] = embedding.get_embedding()
else:
embedding_queue_indices.append(i)

# 处理未缓存的嵌入
ifembedding_queue_indices:
# 生成嵌入并缓存

嵌入缓存流程

向量工厂(VectorFactory)

Dify 支持多种向量数据库,通过工厂模式实现:

classVector:
def__init__(self, dataset: Dataset, attributes: list = None):
ifattributesisNone:
attributes = ['doc_id','dataset_id','document_id','doc_hash']
self._dataset = dataset
self._embeddings = self._get_embeddings()
self._attributes = attributes
self._vector_processor = self._init_vector()

def_init_vector(self)-> BaseVector:
vector_type = dify_config.VECTOR_STORE
ifself._dataset.index_struct_dict:
vector_type = self._dataset.index_struct_dict['type']

vector_factory_cls = self.get_vector_factory(vector_type)
returnvector_factory_cls().init_vector(self._dataset, self._attributes, self._embeddings)

@staticmethod
defget_vector_factory(vector_type: str)-> type[AbstractVectorFactory]:
match vector_type:
case VectorType.CHROMA:
core.rag.datasource.vdb.chroma.chroma_vectorimportChromaVectorFactory
returnChromaVectorFactory
# ... 其他向量数据库的支持

向量工厂模式

支持的向量数据库包括:

  • Chroma
  • Milvus
  • MyScale
  • PGVector
  • Qdrant
  • Relyt
  • Elasticsearch
  • TiDB Vector
  • Weaviate
  • Tencent
  • Oracle
  • OpenSearch
  • AnalyticDB

索引处理器(IndexingRunner)

IndexingRunner负责协调整个索引过程,包括文档提取、转换和加载。它实现了完整的 ETL 流程:

  • Extract:从原始文档中提取文本
  • Transform:清洗和分割文本
  • Load:将处理后的段落加载到向量数据库和关键词索引中
classIndexingRunner:
def__init__(self, dataset, document_id, document_model, document_content, document_metadata, tenant_id, user_id):
# 初始化

defrun(self):
# 1. 提取文本
documents = self.extract()

# 2. 转换(分割和清洗)
= self.transform(documents)

# 3. 保存段落
self.save_segments()

# 4. 加载到索引
self.load()

索引处理流程

段落索引处理器(ParagraphIndexProcessor)

段落索引处理器实现了提取、转换、加载和检索方法:

classParagraphIndexProcessor(IndexProcessorBase):
defextract(self, extract_setting: ExtractSetting)-> list[Document]:
# 提取文档
returnExtractProcessor.extract(extract_setting)

deftransform(self, documents: list[Document])-> list[Document]:
# 清洗和分割文本
cleaned_documents = self.clean(documents)
returnself.split(cleaned_documents)

defload(self, dataset_id: str,: list[Document])->None:
# 创建向量索引和关键词索引
self.create_vector_index(dataset_id, segments)
self.create_keyword_index(dataset_id, segments)

defretrieve(self, dataset_id: str, query: str, **kwargs)-> list[Document]:
# 调用检索服务
returnService.retrieve(
retrival_method=kwargs.get('_method','semantic_search'),
dataset_id=dataset_id,
query=query,
top_k=kwargs.get('top_k',2),
score_threshold=kwargs.get('score_threshold',0.0),
reranking_model=kwargs.get('reranking_model'),
reranking_mode=kwargs.get('reranking_mode','reranking_model'),
weights=kwargs.get('weights')
)

Jieba关键词索引

Jieba组件负责从文本中提取关键词并建立索引,支持关键词搜索和混合搜索。它提供:

  • 关键词提取
  • TF-IDF权重计算
  • 关键词索引创建和更新

四、检索与重排模块

4.1 设计思路

检索与重排模块负责根据用户查询找到最相关的文本段落,并对结果进行优化排序。该模块支持多种检索方法和重排策略,可以根据不同场景进行配置。

主要功能包括:

  • 语义检索(向量相似度搜索)
  • 关键词检索(基于关键词匹配)
  • 全文检索(基于倒排索引)
  • 混合检索(结合多种检索方法)
  • 结果重排序(基于多种因素)

4.2 检索与重排流程图

4.3 核心组件

检索方法(RetrievalMethod)

RetrievalMethod是一个枚举类,定义了系统支持的检索方法:

  • SEMANTIC_SEARCH:语义搜索,基于向量相似度
  • FULL_TEXT_SEARCH:全文搜索,基于倒排索引
  • HYBRID_SEARCH:混合搜索,结合语义和关键词
classMethod(Enum):
SEMANTIC_SEARCH ='semantic_search'
FULL_TEXT_SEARCH ='full_text_search'
HYBRID_SEARCH ='hybrid_search'

@staticmethod
defis_support_semantic_search(retrieval_method: str)-> bool:
returnretrieval_methodin{Method.SEMANTIC_SEARCH.value, RetrievalMethod.HYBRID_SEARCH.value}

@staticmethod
defis_support_fulltext_search(_method: str)-> bool:
returnretrieval_methodin{RetrievalMethod.FULL_TEXT_SEARCH.value, RetrievalMethod.HYBRID_SEARCH.value}

检索方法流程图

检索服务(RetrievalService)

Service是检索功能的核心实现,负责根据配置的检索方法执行搜索并返回结果。它提供以下功能:

  • 支持多种检索方法
  • 多线程并行检索
  • 结果合并和排序
  • 支持重排序

主要方法包括:

  • retrieve:主检索方法,根据检索方法执行搜索
  • keyword_search:关键词搜索实现
  • embedding_search:向量相似度搜索实现
  • full_text_index_search:全文索引搜索实现

检索服务协调不同的检索方法并处理结果:

classService:
@classmethod
defretrieve(cls, retrival_method: str, dataset_id: str, query: str,
top_k: int, score_threshold: Optional[] =.0,
reranking_model: Optional[dict] = None, reranking_mode: Optional[str] ='reranking_model',
weights: Optional[dict] = None):
# 获取数据集
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
ifnotdatasetordataset._document_count ==0ordataset._segment_count ==0:
return[]

all_documents = []
threads = []
exceptions = []

# 关键词搜索
ifretrival_method =='keyword_search':
keyword_thread = threading.Thread(target=Service.keyword_search, kwargs={...})
threads.append(keyword_thread)
keyword_thread.start()

# 语义搜索
ifMethod.is_support_semantic_search(retrival_method):
embedding_thread = threading.Thread(target=Service.embedding_search, kwargs={...})
threads.append(embedding_thread)
embedding_thread.start()

# 全文搜索
ifMethod.is_support_fulltext_search(retrival_method):
full_text_index_thread = threading.Thread(target=RetrievalService.full_text_index_search, kwargs={...})
threads.append(full_text_index_thread)
full_text_index_thread.start()

# 等待所有线程完成
forthreadinthreads:
thread.join()

# 混合搜索需要后处理
ifretrival_method ==Method.HYBRID_SEARCH.value:
data_post_processor = DataPostProcessor(str(dataset.tenant_id), reranking_mode,
reranking_model, weights,False)
all_documents = data_post_processor.invoke(
query=query,
documents=all_documents,
score_threshold=score_threshold,
top_n=top_k
)

returnall_documents

检索服务时序图

权重重排(WeightRerankRunner)

WeightRerankRunner实现了基于权重的重排序策略,可以根据不同因素对检索结果进行优化排序。它考虑以下因素:

  • 向量相似度得分
  • 关键词匹配得分
  • 自定义权重配置

通过调整不同因素的权重,可以优化检索结果的相关性和质量。

权重重排结合了关键词得分和向量相似度得分:

classWeightRerankRunner:
def__init__(self, tenant_id: str, weights: Weights)->None:
self.tenant_id = tenant_id
self.weights = weights

defrun(self, query: str, documents: list[Document], score_threshold: Optional[float] = None,
top_n: Optional[int] = None, user: Optional[str] = None)-> list[Document]:
# 去重
docs = []
doc_id = []
unique_documents = []
fordocumentindocuments:
ifdocument.metadata['doc_id']notindoc_id:
doc_id.append(document.metadata['doc_id'])
docs.append(document.page_content)
unique_documents.append(document)

documents = unique_documents

# 计算关键词得分和向量得分
rerank_documents = []
query_scores = self._calculate_keyword_score(query, documents)
query_vector_scores = self._calculate_cosine(self.tenant_id, query, documents, self.weights.vector_setting)

# 合并得分
fordocument, query_score, query_vector_scoreinzip(documents, query_scores, query_vector_scores):
score = self.weights.vector_setting.vector_weight * query_vector_score + \
self.weights.keyword_setting.keyword_weight * query_score
ifscore_thresholdandscore < score_threshold:
continue
document.metadata['score'] = score
rerank_documents.append(document)

# 排序并返回结果
rerank_documents = sorted(rerank_documents, key=lambdax: x.metadata['score'], reverse=True)
returnrerank_documents[:top_n]iftop_nelsererank_documents

权重重排流程图

数据后处理器(DataPostProcessor)

DataPostProcessor负责对检索结果进行后处理,包括重排序、去重和格式化。它支持多种重排序策略,可以根据不同场景进行配置。

数据后处理器负责重排序和结果处理:

classDataPostProcessor:
def__init__(self, tenant_id: str, reranking_mode: str, reranking_model: Optional[dict],
weights: Optional[dict], enable_reranking: bool = True):
# 初始化

definvoke(self, query: str, documents: list[Document], score_threshold: Optional[float] = None,
top_n: Optional[int] = None)-> list[Document]:
# 根据模式选择重排序方法
ifself.enable_rerankingandself.reranking_mode == RerankMode.RERANKING_MODEL.value:
# 使用模型重排序
returnself.rerank_model_runner.run(query, documents, score_threshold, top_n)
elifself.enable_rerankingandself.reranking_mode == RerankMode.WEIGHT.value:
# 使用权重重排序
returnself.weight_rerank_runner.run(query, documents, score_threshold, top_n)
else:
# 不重排序,直接返回
returndocuments[:top_n]iftop_nelsedocuments

数据后处理流程图

五、总结

Dify 的 RAG 系统是一个功能完整、灵活可扩展的检索增强生成框架,具有以下特点:

  1. 多格式文档支持:支持 PDF、Word、Excel、Markdown、HTML、CSV 等多种文档格式。

  2. 灵活的文本分块:支持多种分块策略,包括固定大小、基于分隔符和基于语义的分块。

  3. 高效的向量化:实现了嵌入缓存机制,提高了向量化效率。

  4. 多样的存储选项:支持 Chroma、Milvus、PGVector、Qdrant 等多种向量数据库。

  5. 多种检索方法:支持语义搜索、全文搜索、关键词搜索和混合搜索。

  6. 高级重排序:支持基于模型的重排序和基于权重的重排序。

  7. 并行处理:使用多线程并行执行不同的检索方法,提高效率。

  8. 可扩展架构:采用工厂模式和抽象基类,便于扩展新的功能。

通过这些组件的协同工作,Dify 的 RAG 系统能够有效地处理和检索大量文档,为大语言模型提供准确的上下文信息,从而生成更加准确和相关的回答。ingFang SC", system-ui, -apple-system, "system-ui", "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif; font-size: 17px; letter-spacing: 0.544px; text-align: right; word-spacing: 0px; -webkit-tap-highlight-color: transparent; margin: 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

链载AI是专业的生成式人工智能教程平台。提供Stable Diffusion、Midjourney AI绘画教程,Suno AI音乐生成指南,以及Runway、Pika等AI视频制作与动画生成实战案例。从提示词编写到参数调整,手把手助您从入门到精通。
  • 官方手机版

  • 微信公众号

  • 商务合作

  • Powered by Discuz! X3.5 | Copyright © 2025-2025. | 链载Ai
  • 桂ICP备2024021734号 | 营业执照 | |广西笔趣文化传媒有限公司|| QQ