返回顶部
热门问答 更多热门问答
技术文章 更多技术文章

RAGFlow中的Embeddings模型选择及向量数据库选型与实现分析

[复制链接]
链载Ai 显示全部楼层 发表于 1 小时前 |阅读模式 打印 上一主题 下一主题

本文继续基于 RAGFlow 源代码和官方文档,详细分析其 Embeddings 模型选择与配置以及向量数据库选型与实现的技术细节。

1. Embeddings 模型选择与配置

RAGFlow 支持多种 Embeddings 模型,通过rag/llm/embedding_model.py实现了丰富的模型接口和配置选项。

1.1 模型架构设计

RAGFlow 采用了抽象基类设计模式,通过Base类定义了所有 Embedding 模型必须实现的接口:

classBase(ABC):
def__init__(self, key, model_name):
pass

defencode(self, texts:list):
raiseNotImplementedError("lease implement encode method!")

defencode_queries(self, text:str):
raiseNotImplementedError("lease implement encode method!")

deftotal_token_count(self, resp):
try:
returnresp.usage.total_tokens
exceptException:
pass
try:
returnresp["usage"]["total_tokens"]
exceptException:
pass
return0

这种设计使得 RAGFlow 可以轻松支持和扩展不同的 Embedding 模型,只需实现特定的接口方法。

1.2 支持的 Embedding 模型

从源码中可以看到,RAGFlow 支持以下 Embedding 模型:

  1. 1.DefaultEmbedding:默认使用 FlagEmbedding 模型,基于 BAAI/bge-large-zh-v1.5
    classDefaultEmbedding(Base):
    os.environ['CUDA_VISIBLE_DEVICES'] ='0'
    _model =None
    _model_name =""
    _model_lock = threading.Lock()

    def__init__(self, key, model_name, **kwargs):
    ifnotsettings.LIGHTEN:
    withDefaultEmbedding._model_lock:
    fromFlagEmbeddingimportFlagModel
    importtorch
    ifnotDefaultEmbedding._modelormodel_name != DefaultEmbedding._model_name:
    try:
    DefaultEmbedding._model = FlagModel(os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z0-9]+/","", model_name)),
    query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
    use_fp16=torch.cuda.is_available())
    DefaultEmbedding._model_name = model_name
    exceptException:
    model_dir = snapshot_download(repo_id="BAAI/bge-large-zh-v1.5",
    local_dir=os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z0-9]+/","", model_name)),
    local_dir_use_symlinks=False)
    DefaultEmbedding._model = FlagModel(model_dir,
    query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
    use_fp16=torch.cuda.is_available())
    self._model = DefaultEmbedding._model
    self._model_name = DefaultEmbedding._model_name
  2. 2.OpenAIEmbed:使用 OpenAI 的 embedding 模型
    classOpenAIEmbed(Base):
    def__init__(self, key, model_name="text-embedding-ada-002", base_url="https://api.openai.com/v1"):
    ifnotbase_url:
    base_url ="https://api.openai.com/v1"
    self.client = OpenAI(api_key=key, base_url=base_url)
    self.model_name = model_name
  3. 3.LocalAIEmbed:支持本地部署的 embedding 模型
    classLocalAIEmbed(Base):
    def__init__(self, key, model_name, base_url):
    ifnotbase_url:
    raiseValueError("Local embedding model url cannot be None")
    base_url = urljoin(base_url,"v1")
    self.client = OpenAI(api_key="empty", base_url=base_url)
    self.model_name = model_name.split("___")[0]
  4. 4.AzureEmbed:Azure OpenAI 的 embedding 模型
    classAzureEmbed(OpenAIEmbed):
    def__init__(self, key, model_name, **kwargs):
    fromopenai.lib.azureimportAzureOpenAI
    api_key = json.loads(key).get('api_key','')
    api_version = json.loads(key).get('api_version','2024-02-01')
    self.client = AzureOpenAI(api_key=api_key, azure_endpoint=kwargs["base_url"], api_version=api_version)
    self.model_name = model_name
  5. 5.BaiChuanEmbed:百川 AI 的 embedding 模型
    classBaiChuanEmbed(OpenAIEmbed):
    def__init__(self, key, model_name='Baichuan-Text-Embedding', base_url='https://api.baichuan-ai.com/v1'):
    ifnotbase_url:
    base_url ="https://api.baichuan-ai.com/v1"
    super().__init__(key, model_name, base_url)
  6. 6.QWenEmbed:通义千问的 embedding 模型
    classQWenEmbed(Base):
    def__init__(self, key, model_name="text_embedding_v2", **kwargs):
    self.key = key
    self.model_name = model_name
  7. 7.ZhipuEmbed:智谱 AI 的 embedding 模型
    classZhipuEmbed(Base):
    def__init__(self, key, model_name="embedding-2", **kwargs):
    self.client = ZhipuAI(api_key=key)
    self.model_name = model_name
  8. 8.OllamaEmbed:Ollama 的 embedding 模型
    classOllamaEmbed(Base):
    def__init__(self, key, model_name, **kwargs):
    self.client = Client(host=kwargs["base_url"])ifnotkeyorkey =="x"else\
    Client(host=kwargs["base_url"], headers={"Authorization":f"Bear{key}"})
    self.model_name = model_name
  9. 9.GoogleEmbed:Google 的 embedding 模型
    classGoogleEmbed(Base):
    def__init__(self, key, model_name="embedding-001", **kwargs):
    genai.configure(api_key=key)
    self.model_name = model_name

1.3 批处理优化

RAGFlow 在 embedding 模型实现中采用了批处理优化,以提高处理效率:

defencode(self, texts:list):
batch_size =16
texts = [truncate(t,2048)fortintexts]
token_count =0
fortintexts:
token_count += num_tokens_from_string(t)
ress = []
foriinrange(0,len(texts), batch_size):
ress.extend(self._model.encode(texts[i:i + batch_size]).tolist())
returnnp.array(ress), token_count

这种批处理方式可以减少 API 调用次数,提高效率。不同的模型实现了不同的批处理策略,例如:

  • • DefaultEmbedding:批量大小为 16
  • • OpenAIEmbed:批量大小为 16,并限制文本长度为 8191
  • • QWenEmbed:批量大小为 4,并限制文本长度为 2048

1.4 文本长度处理

RAGFlow 对不同模型的文本长度限制进行了处理:

# 在OpenAIEmbed中
texts = [truncate(t,8191)fortintexts]

# 在QWenEmbed中
texts = [truncate(t,2048)fortintexts]

# 在ZhipuEmbed中
ifself.model_name.lower() =="embedding-2":
MAX_LEN =512
ifself.model_name.lower() =="embedding-3":
MAX_LEN =3072
ifMAX_LEN >0:
texts = [truncate(t, MAX_LEN)fortintexts]

这种处理确保了文本不会超出模型的最大长度限制,避免了 API 调用错误。

1.5 默认模型选择

根据官方文档和源码,RAGFlow 的 Docker 镜像(非 slim 版本)预装了两个优化的 embedding 模型:

  1. 1. BAAI/bge-large-zh-v1.5
  2. 2. maidalun1020/bce-embedding-base_v1

这两个模型专为中英文优化,提供了良好的多语言支持。在DefaultEmbedding类中,如果未指定模型,默认使用 BAAI/bge-large-zh-v1.5。

2. 向量数据库选型与实现

RAGFlow 采用了灵活的向量数据库架构,通过抽象接口支持多种向量数据库。

2.1 数据库抽象接口

RAGFlow 在rag/utils/doc_store_conn.py中定义了向量数据库的抽象接口:

classDocStoreConnection(ABC):
"""
Database operations
"""
@abstractmethod
defdbType(self) ->str:
"""
Return the type of the database.
"""
raiseNotImplementedError("Not implemented")

@abstractmethod
defhealth(self) ->dict:
"""
Return the health status of the database.
"""
raiseNotImplementedError("Not implemented")

"""
Table operations
"""
@abstractmethod
defcreateIdx(self, indexName:str, knowledgebaseId:str, vectorSize:int):
"""
Create an index with given name
"""
raiseNotImplementedError("Not implemented")

@abstractmethod
defdeleteIdx(self, indexName:str, knowledgebaseId:str):
"""
Delete an index with given name
"""
raiseNotImplementedError("Not implemented")

@abstractmethod
defindexExist(self, indexName:str, knowledgebaseId:str) ->bool:
"""
Check if an index with given name exists
"""
raiseNotImplementedError("Not implemented")

"""
CRUD operations
"""
@abstractmethod
defsearch(
self, selectFields:list[str], highlightFields:list[str],
condition:dict, matchExprs:list[MatchExpr],
orderBy: OrderByExpr, offset:int, limit:int,
indexNames:str|list[str], knowledgebaseIds:list[str],
aggFields:list[str] = [], rank_feature:dict|None=None
):
"""
Search with given conjunctive equivalent filtering condition and return all fields of matched documents
"""
raiseNotImplementedError("Not implemented")

这种抽象接口设计使得 RAGFlow 可以轻松支持不同的向量数据库,只需实现特定的接口方法。

2.2 OpenSearch 实现

RAGFlow 默认使用 OpenSearch 作为向量数据库,通过rag/utils/opensearch_coon.py实现:

@singleton
classOSConnection(DocStoreConnection):
def__init__(self):
self.info = {}
logger.info(f"Use OpenSearch{settings.OS['hosts']}as the doc engine.")
for_inrange(ATTEMPT_TIME):
try:
self.os = OpenSearch(
settings.OS["hosts"].split(","),
http_auth=(settings.OS["username"], settings.OS["password"])if"username"insettings.OSand"password"insettings.OSelseNone,
verify_certs=False,
timeout=600
)
ifself.os:
self.info =self.os.info()
break
exceptExceptionase:
logger.warning(f"{str(e)}. Waiting OpenSearch{settings.OS['hosts']}to be healthy.")
time.sleep(5)

OpenSearch 实现了所有必要的接口方法,包括索引创建、删除、搜索等。

2.3 索引设计

RAGFlow 为每个知识库创建独立的索引,使用命名规则ragflow_{uid}确保索引唯一性:

defcreateIdx(self, indexName:str, knowledgebaseId:str, vectorSize:int):
ifself.indexExist(indexName, knowledgebaseId):
returnTrue
try:
fromopensearchpy.clientimportIndicesClient
returnIndicesClient(self.os).create(index=indexName, body=self.mapping)
exceptException:
logger.exception("OSConnection.createIndex error %s"% (indexName))

索引结构通过配置文件conf/os_mapping.json定义,确保了向量数据的正确存储和检索。

2.4 混合检索策略

RAGFlow 实现了混合检索策略,结合关键词搜索和向量相似度搜索:

defsearch(
self, selectFields:list[str], highlightFields:list[str],
condition:dict, matchExprs:list[MatchExpr],
orderBy: OrderByExpr, offset:int, limit:int,
indexNames:str|list[str], knowledgebaseIds:list[str],
aggFields:list[str] = [], rank_feature:dict|None=None
):
use_knn =False
ifisinstance(indexNames,str):
indexNames = indexNames.split(",")
assertisinstance(indexNames,list)andlen(indexNames) >0
assert"_id"notincondition

bqry = Q("bool", must=[])
condition["kb_id"] = knowledgebaseIds

# ... 构建查询条件 ...

s = Search()
vector_similarity_weight =0.5

forminmatchExprs:
ifisinstance(m, FusionExpr)andm.method =="weighted_sum"and"weights"inm.fusion_params:
assertlen(matchExprs) ==3andisinstance(matchExprs[0], MatchTextExpr)andisinstance(matchExprs[1], MatchDenseExpr)andisinstance(matchExprs[2], FusionExpr)
weights = m.fusion_params["weights"]
vector_similarity_weight =float(weights.split(",")[1])

knn_query = {}
forminmatchExprs:
ifisinstance(m, MatchTextExpr):
# 关键词搜索
minimum_should_match = m.extra_options.get("minimum_should_match",0.0)
ifisinstance(minimum_should_match,float):
minimum_should_match =str(int(minimum_should_match *100)) +"%"
bqry.must.append(Q("query_string", fields=m.fields,type="best_fields", query=m.matching_text, minimum_should_match=minimum_should_match, boost=1))
bqry.boost =1.0- vector_similarity_weight

elifisinstance(m, MatchDenseExpr):
# 向量相似度搜索
assert(bqryisnotNone)
similarity =0.0
if"similarity"inm.extra_options:
similarity = m.extra_options["similarity"]
use_knn =True
vector_column_name = m.vector_column_name
knn_query[vector_column_name] = {}
knn_query[vector_column_name]["vector"] =list(m.embedding_data)
knn_query[vector_column_name]["k"] = m.topn
knn_query[vector_column_name]["filter"] = bqry.to_dict()
knn_query[vector_column_name]["boost"] = similarity

这种混合检索策略可以提高检索质量,结合了关键词搜索的精确性和向量相似度搜索的语义理解能力。

2.5 权重配置

RAGFlow 支持配置关键词搜索和向量相似度搜索的权重:

vector_similarity_weight =0.5
forminmatchExprs:
ifisinstance(m, FusionExpr)andm.method =="weighted_sum"and"weights"inm.fusion_params:
assertlen(matchExprs) ==3andisinstance(matchExprs[0], MatchTextExpr)andisinstance(matchExprs[1], MatchDenseExpr)andisinstance(matchExprs[2], FusionExpr)
weights = m.fusion_params["weights"]
vector_similarity_weight =float(weights.split(",")[1])

默认情况下,向量相似度权重为 0.5,关键词搜索权重为 0.5。用户可以根据需要调整这些权重,以优化检索结果。

2.6 批量操作优化

RAGFlow 实现了批量操作以提高写入效率:

definsert(self, rows:list[dict], indexName:str, knowledgebaseId:str=None) ->list[str]:
"""
Update or insert a bulk of rows
"""
iflen(rows) ==0:
return[]

actions = []
ids = []

forrowinrows:
if"_id"inrow:
_id= row["_id"]
delrow["_id"]
else:
_id=None

action = {
"_index": indexName,
"_source": row
}

if_id:
action["_id"] = _id
ids.append(_id)

actions.append(action)

try:
fromopensearchpy.helpersimportbulk
success, failed = bulk(self.os, actions, stats_only=True)
returnids
exceptException:
logger.exception("OSConnection.insert error")
return[]

这种批量操作方式可以减少 API 调用次数,提高写入效率。

3. 配置与集成

3.1 Embedding 模型配置

根据官方文档,RAGFlow 允许为不同知识库选择不同的 embedding 模型:

Anembeddingmodelconvertschunksintoembeddings.Itcannotbechangedoncetheknowledgebasehaschunks.Toswitchtoadifferentembeddingmodel,youmustdeleteallexistingchunksintheknowledgebase.Theobviousreasonisthatwemustensurethatfilesinaspecificknowledgebaseareconvertedtoembeddingsusingthesameembeddingmodel(ensurethattheyarecomparedinthesameembeddingspace).

这种设计确保了同一知识库内所有文档使用相同的 embedding 模型,保证了向量空间的一致性。

3.2 向量数据库配置

RAGFlow 通过配置文件设置向量数据库连接参数:

self.os = OpenSearch(
settings.OS["hosts"].split(","),
http_auth=(settings.OS["username"], settings.OS["password"])if"username"insettings.OSand"password"insettings.OSelseNone,
verify_certs=False,
timeout=600
)

这些参数可以通过环境变量或配置文件进行设置,使得 RAGFlow 可以灵活地连接不同的 OpenSearch 实例。

3.3 检索参数配置

RAGFlow 支持配置多种检索参数,如相似度阈值、向量相似度权重等:

RAGFlow uses multiple recall of both full-text search and vector search in its chats. Prior to setting up an AI chat, consider adjusting the following parameters to ensure that the intended information always turns up in answers:

* Similarity threshold: Chunks with similarities below the threshold will be filtered. By default, it is set to 0.2.
* Vector similarity weight: The percentage by which vector similarity contributes to the overall score. By default, it is set to 0.3.

这些参数可以根据具体需求进行调整,以优化检索结果。

4. 总结

RAGFlow 在 Embeddings 模型选择与配置以及向量数据库选型与实现方面具有以下特点:

  1. 1.多模型支持:支持多种 embedding 模型,包括 OpenAI、智谱 AI、通义千问等,以及本地部署的模型。
  2. 2.批处理优化:实现了批量编码以提高效率,并自动处理长文本截断。
  3. 3.一致性保证:确保同一知识库内所有文档使用相同的 embedding 模型,保证向量空间的一致性。
  4. 4.灵活配置:允许为不同知识库选择不同的 embedding 模型,支持通过配置文件调整模型参数。
  5. 5.默认使用 OpenSearch:作为默认的向量数据库,提供高性能的检索能力和良好的扩展性。
  6. 6.混合检索策略:结合关键词搜索和向量相似度搜索,通过加权融合提高检索质量。
  7. 7.批量操作优化:实现了批量操作以提高写入效率,减少 API 调用次数。

这些特点使得 RAGFlow 能够灵活地适应不同的应用场景,提供高质量的检索结果。


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

链载AI是专业的生成式人工智能教程平台。提供Stable Diffusion、Midjourney AI绘画教程,Suno AI音乐生成指南,以及Runway、Pika等AI视频制作与动画生成实战案例。从提示词编写到参数调整,手把手助您从入门到精通。
  • 官方手机版

  • 微信公众号

  • 商务合作

  • Powered by Discuz! X3.5 | Copyright © 2025-2025. | 链载Ai
  • 桂ICP备2024021734号 | 营业执照 | |广西笔趣文化传媒有限公司|| QQ