Continuing from the RAGFlow source code and official documentation, this article takes a detailed look at how RAGFlow selects and configures its embedding models, and how it chooses and implements its vector database.

RAGFlow supports a wide range of embedding models; rag/llm/embedding_model.py provides the model interfaces and configuration options.

RAGFlow uses the abstract-base-class pattern: the Base class defines the interface every embedding model must implement:
```python
class Base(ABC):
    def __init__(self, key, model_name):
        pass

    def encode(self, texts: list):
        raise NotImplementedError("Please implement encode method!")

    def encode_queries(self, text: str):
        raise NotImplementedError("Please implement encode method!")

    def total_token_count(self, resp):
        try:
            return resp.usage.total_tokens
        except Exception:
            pass
        try:
            return resp["usage"]["total_tokens"]
        except Exception:
            pass
        return 0
```

This design lets RAGFlow support and extend different embedding models easily: adding a model only requires implementing these interface methods.
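To show how this extension point is meant to be used, here is a minimal sketch of a custom provider plugging into the Base class above. The provider name, endpoint, and response shape are hypothetical and not part of RAGFlow.

```python
import numpy as np
import requests

# Hypothetical provider built on the Base class from rag/llm/embedding_model.py above.
# The endpoint URL and JSON response layout are assumptions for illustration only.
class MyHTTPEmbed(Base):
    def __init__(self, key, model_name, base_url="https://embeddings.example.com/v1/embeddings"):
        self.key = key
        self.model_name = model_name
        self.base_url = base_url

    def encode(self, texts: list):
        resp = requests.post(
            self.base_url,
            headers={"Authorization": f"Bearer {self.key}"},
            json={"model": self.model_name, "input": texts},
            timeout=60,
        ).json()
        vectors = [item["embedding"] for item in resp["data"]]
        # total_token_count() from Base handles both object- and dict-style responses.
        return np.array(vectors), self.total_token_count(resp)

    def encode_queries(self, text: str):
        embeddings, token_count = self.encode([text])
        return embeddings[0], token_count
```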
The source code shows that RAGFlow supports, among others, the following embedding models:
```python
class DefaultEmbedding(Base):
    os.environ['CUDA_VISIBLE_DEVICES'] = '0'
    _model = None
    _model_name = ""
    _model_lock = threading.Lock()

    def __init__(self, key, model_name, **kwargs):
        if not settings.LIGHTEN:
            with DefaultEmbedding._model_lock:
                from FlagEmbedding import FlagModel
                import torch
                if not DefaultEmbedding._model or model_name != DefaultEmbedding._model_name:
                    try:
                        DefaultEmbedding._model = FlagModel(
                            os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z0-9]+/", "", model_name)),
                            query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
                            use_fp16=torch.cuda.is_available())
                        DefaultEmbedding._model_name = model_name
                    except Exception:
                        model_dir = snapshot_download(
                            repo_id="BAAI/bge-large-zh-v1.5",
                            local_dir=os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z0-9]+/", "", model_name)),
                            local_dir_use_symlinks=False)
                        DefaultEmbedding._model = FlagModel(
                            model_dir,
                            query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
                            use_fp16=torch.cuda.is_available())
        self._model = DefaultEmbedding._model
        self._model_name = DefaultEmbedding._model_name


class OpenAIEmbed(Base):
    def __init__(self, key, model_name="text-embedding-ada-002", base_url="https://api.openai.com/v1"):
        if not base_url:
            base_url = "https://api.openai.com/v1"
        self.client = OpenAI(api_key=key, base_url=base_url)
        self.model_name = model_name


class LocalAIEmbed(Base):
    def __init__(self, key, model_name, base_url):
        if not base_url:
            raise ValueError("Local embedding model url cannot be None")
        base_url = urljoin(base_url, "v1")
        self.client = OpenAI(api_key="empty", base_url=base_url)
        self.model_name = model_name.split("___")[0]


class AzureEmbed(OpenAIEmbed):
    def __init__(self, key, model_name, **kwargs):
        from openai.lib.azure import AzureOpenAI
        api_key = json.loads(key).get('api_key', '')
        api_version = json.loads(key).get('api_version', '2024-02-01')
        self.client = AzureOpenAI(api_key=api_key, azure_endpoint=kwargs["base_url"], api_version=api_version)
        self.model_name = model_name


class BaiChuanEmbed(OpenAIEmbed):
    def __init__(self, key, model_name='Baichuan-Text-Embedding', base_url='https://api.baichuan-ai.com/v1'):
        if not base_url:
            base_url = "https://api.baichuan-ai.com/v1"
        super().__init__(key, model_name, base_url)


class QWenEmbed(Base):
    def __init__(self, key, model_name="text_embedding_v2", **kwargs):
        self.key = key
        self.model_name = model_name


class ZhipuEmbed(Base):
    def __init__(self, key, model_name="embedding-2", **kwargs):
        self.client = ZhipuAI(api_key=key)
        self.model_name = model_name


class OllamaEmbed(Base):
    def __init__(self, key, model_name, **kwargs):
        self.client = Client(host=kwargs["base_url"]) if not key or key == "x" else \
            Client(host=kwargs["base_url"], headers={"Authorization": f"Bearer {key}"})
        self.model_name = model_name


class GoogleEmbed(Base):
    def __init__(self, key, model_name="embedding-001", **kwargs):
        genai.configure(api_key=key)
        self.model_name = model_name
```

To improve throughput, RAGFlow's embedding implementations encode texts in batches:
```python
def encode(self, texts: list):
    batch_size = 16
    texts = [truncate(t, 2048) for t in texts]
    token_count = 0
    for t in texts:
        token_count += num_tokens_from_string(t)
    ress = []
    for i in range(0, len(texts), batch_size):
        ress.extend(self._model.encode(texts[i:i + batch_size]).tolist())
    return np.array(ress), token_count
```

This batching reduces the number of model or API calls and improves efficiency. Each provider applies its own batching strategy, for example:
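An API-backed provider such as OpenAIEmbed sends one request per batch rather than one per text. The sketch below shows the general pattern; it is simplified for illustration and not copied verbatim from the source.

```python
import numpy as np
from openai import OpenAI

# Hedged sketch of the batching pattern used by API-backed providers; the exact
# batch size and truncation limit vary per provider.
def encode_batched(client: OpenAI, model_name: str, texts: list[str], batch_size: int = 16):
    vectors, total_tokens = [], 0
    for i in range(0, len(texts), batch_size):
        # One HTTP request per batch of up to batch_size texts.
        res = client.embeddings.create(input=texts[i:i + batch_size], model=model_name)
        vectors.extend([d.embedding for d in res.data])
        total_tokens += res.usage.total_tokens
    return np.array(vectors), total_tokens
```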
RAGFlow also respects each model's maximum input length:

```python
# In OpenAIEmbed
texts = [truncate(t, 8191) for t in texts]

# In QWenEmbed
texts = [truncate(t, 2048) for t in texts]

# In ZhipuEmbed
if self.model_name.lower() == "embedding-2":
    MAX_LEN = 512
if self.model_name.lower() == "embedding-3":
    MAX_LEN = 3072
if MAX_LEN > 0:
    texts = [truncate(t, MAX_LEN) for t in texts]
```

This truncation keeps inputs within each model's maximum length and avoids API call errors.
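For reference, a token-based truncate helper can be as small as the sketch below. RAGFlow ships its own helper in rag/utils; the tiktoken-based version here is an assumption for illustration.

```python
import tiktoken

# Assumed token-based truncation; counts tokens the way OpenAI-style APIs do.
_encoder = tiktoken.get_encoding("cl100k_base")

def truncate(text: str, max_len: int) -> str:
    # Encode, keep at most max_len tokens, and decode back to text so the
    # limit matches what the embedding API actually meters.
    return _encoder.decode(_encoder.encode(text)[:max_len])
```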
According to the official documentation and the source code, RAGFlow's Docker image (the non-slim build) comes with two optimized embedding models pre-installed.

Both models are tuned for Chinese and English and provide good bilingual support. In the DefaultEmbedding class, BAAI/bge-large-zh-v1.5 is used by default when no model is specified.

RAGFlow's vector database layer is equally flexible: an abstract interface allows multiple vector store backends.

The abstraction is defined in rag/utils/doc_store_conn.py:
```python
class DocStoreConnection(ABC):
    """
    Database operations
    """
    @abstractmethod
    def dbType(self) -> str:
        """
        Return the type of the database.
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def health(self) -> dict:
        """
        Return the health status of the database.
        """
        raise NotImplementedError("Not implemented")

    """
    Table operations
    """
    @abstractmethod
    def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
        """
        Create an index with given name
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def deleteIdx(self, indexName: str, knowledgebaseId: str):
        """
        Delete an index with given name
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
        """
        Check if an index with given name exists
        """
        raise NotImplementedError("Not implemented")

    """
    CRUD operations
    """
    @abstractmethod
    def search(
            self, selectFields: list[str], highlightFields: list[str],
            condition: dict, matchExprs: list[MatchExpr],
            orderBy: OrderByExpr, offset: int, limit: int,
            indexNames: str | list[str], knowledgebaseIds: list[str],
            aggFields: list[str] = [], rank_feature: dict | None = None
    ):
        """
        Search with given conjunctive equivalent filtering condition and return all fields of matched documents
        """
        raise NotImplementedError("Not implemented")
```

This abstract interface makes it straightforward to support additional vector databases: a new backend only has to implement these methods.
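Because indexing and retrieval code depend only on DocStoreConnection, a backend can be chosen at startup without touching the callers. A minimal sketch of such a factory follows; the function name and the selection logic are assumptions, not RAGFlow's code.

```python
# Hypothetical factory selecting a DocStoreConnection implementation by name.
def get_doc_store(engine: str) -> DocStoreConnection:
    if engine == "opensearch":
        return OSConnection()   # concrete implementation shown below
    raise ValueError(f"Unsupported doc engine: {engine}")

# Callers only ever see the abstract interface, so swapping the backend
# does not change indexing or retrieval logic.
store = get_doc_store("opensearch")
print(store.dbType(), store.health())
```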
RAGFlow can use OpenSearch as its vector database (doc engine); the integration lives in rag/utils/opensearch_conn.py:
```python
@singleton
class OSConnection(DocStoreConnection):
    def __init__(self):
        self.info = {}
        logger.info(f"Use OpenSearch {settings.OS['hosts']} as the doc engine.")
        for _ in range(ATTEMPT_TIME):
            try:
                self.os = OpenSearch(
                    settings.OS["hosts"].split(","),
                    http_auth=(settings.OS["username"], settings.OS["password"]) if "username" in settings.OS and "password" in settings.OS else None,
                    verify_certs=False,
                    timeout=600
                )
                if self.os:
                    self.info = self.os.info()
                    break
            except Exception as e:
                logger.warning(f"{str(e)}. Waiting OpenSearch {settings.OS['hosts']} to be healthy.")
                time.sleep(5)
```

OSConnection implements all of the required interface methods, including index creation, deletion, and search.
RAGFlow creates a separate index per knowledge base and uses the naming convention ragflow_{uid} to keep index names unique:
```python
def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
    if self.indexExist(indexName, knowledgebaseId):
        return True
    try:
        from opensearchpy.client import IndicesClient
        return IndicesClient(self.os).create(index=indexName, body=self.mapping)
    except Exception:
        logger.exception("OSConnection.createIndex error %s" % (indexName))
```

The index structure itself is defined in the configuration file conf/os_mapping.json, which ensures vector data is stored and retrieved correctly.
RAGFlow implements hybrid retrieval, combining keyword (full-text) search with vector similarity search:
```python
def search(
        self, selectFields: list[str], highlightFields: list[str],
        condition: dict, matchExprs: list[MatchExpr],
        orderBy: OrderByExpr, offset: int, limit: int,
        indexNames: str | list[str], knowledgebaseIds: list[str],
        aggFields: list[str] = [], rank_feature: dict | None = None
):
    use_knn = False
    if isinstance(indexNames, str):
        indexNames = indexNames.split(",")
    assert isinstance(indexNames, list) and len(indexNames) > 0
    assert "_id" not in condition
    bqry = Q("bool", must=[])
    condition["kb_id"] = knowledgebaseIds
    # ... build filter conditions ...
    s = Search()
    vector_similarity_weight = 0.5
    for m in matchExprs:
        if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
            assert len(matchExprs) == 3 and isinstance(matchExprs[0], MatchTextExpr) and isinstance(matchExprs[1], MatchDenseExpr) and isinstance(matchExprs[2], FusionExpr)
            weights = m.fusion_params["weights"]
            vector_similarity_weight = float(weights.split(",")[1])
    knn_query = {}
    for m in matchExprs:
        if isinstance(m, MatchTextExpr):
            # Keyword (full-text) search
            minimum_should_match = m.extra_options.get("minimum_should_match", 0.0)
            if isinstance(minimum_should_match, float):
                minimum_should_match = str(int(minimum_should_match * 100)) + "%"
            bqry.must.append(Q("query_string", fields=m.fields, type="best_fields", query=m.matching_text, minimum_should_match=minimum_should_match, boost=1))
            bqry.boost = 1.0 - vector_similarity_weight
        elif isinstance(m, MatchDenseExpr):
            # Vector similarity (k-NN) search
            assert (bqry is not None)
            similarity = 0.0
            if "similarity" in m.extra_options:
                similarity = m.extra_options["similarity"]
            use_knn = True
            vector_column_name = m.vector_column_name
            knn_query[vector_column_name] = {}
            knn_query[vector_column_name]["vector"] = list(m.embedding_data)
            knn_query[vector_column_name]["k"] = m.topn
            knn_query[vector_column_name]["filter"] = bqry.to_dict()
            knn_query[vector_column_name]["boost"] = similarity
```

This hybrid strategy improves retrieval quality by combining the precision of keyword search with the semantic matching of vector similarity search.
The relative weights of keyword search and vector similarity search are configurable:
```python
vector_similarity_weight = 0.5
for m in matchExprs:
    if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
        assert len(matchExprs) == 3 and isinstance(matchExprs[0], MatchTextExpr) and isinstance(matchExprs[1], MatchDenseExpr) and isinstance(matchExprs[2], FusionExpr)
        weights = m.fusion_params["weights"]
        vector_similarity_weight = float(weights.split(",")[1])
```

In this code path the vector similarity weight defaults to 0.5, leaving 0.5 for keyword search. Users can adjust these weights to tune retrieval results.
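To make the weights parameter concrete: it arrives as a comma-separated "text_weight,vector_weight" string, and the keyword query's boost is the complement of the vector weight. A minimal parsing sketch (illustrative, not RAGFlow's code):

```python
# Parse a "text_weight,vector_weight" string as used by the weighted_sum fusion
# above and derive the boosts applied to the two sub-queries.
def parse_fusion_weights(weights: str = "0.5,0.5") -> tuple[float, float]:
    vector_similarity_weight = float(weights.split(",")[1])
    keyword_boost = 1.0 - vector_similarity_weight   # applied to the bool (keyword) query
    return keyword_boost, vector_similarity_weight

print(parse_fusion_weights("0.25,0.75"))   # (0.25, 0.75): keyword boost 0.25, vector weight 0.75
```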
RAGFlow uses bulk operations to make writes efficient:
```python
def insert(self, rows: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]:
    """
    Update or insert a bulk of rows
    """
    if len(rows) == 0:
        return []
    actions = []
    ids = []
    for row in rows:
        if "_id" in row:
            _id = row["_id"]
            del row["_id"]
        else:
            _id = None
        action = {
            "_index": indexName,
            "_source": row
        }
        if _id:
            action["_id"] = _id
            ids.append(_id)
        actions.append(action)
    try:
        from opensearchpy.helpers import bulk
        success, failed = bulk(self.os, actions, stats_only=True)
        return ids
    except Exception:
        logger.exception("OSConnection.insert error")
        return []
```

Bulk writes like this reduce the number of round trips to OpenSearch and improve indexing throughput.
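As a hedged usage sketch, a caller could split chunks into fixed-size batches before handing them to insert. The batch size and chunk layout below are assumptions for illustration.

```python
# Illustrative caller-side batching before bulk insert; the batch size of 64
# and the chunk dictionary layout are assumptions for this sketch.
def index_chunks(conn, chunks: list[dict], index_name: str, kb_id: str, batch_size: int = 64):
    inserted = []
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        for c in batch:
            c["kb_id"] = kb_id            # tag every chunk with its knowledge base
        inserted.extend(conn.insert(batch, index_name, kb_id))
    return inserted
```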
According to the official documentation, each knowledge base in RAGFlow can use a different embedding model:
An embedding model converts chunks into embeddings. It cannot be changed once the knowledge base has chunks. To switch to a different embedding model, you must delete all existing chunks in the knowledge base. The obvious reason is that we must ensure that files in a specific knowledge base are converted to embeddings using the same embedding model (ensure that they are compared in the same embedding space).
This design guarantees that every document in a knowledge base is embedded with the same model, keeping all vectors in a single, comparable embedding space.
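A hedged sketch of how such a constraint can be enforced at the application layer; the function and field names here are hypothetical, not RAGFlow's actual API.

```python
# Hypothetical guard: refuse to change the embedding model once a knowledge
# base already contains chunks, mirroring the documented behaviour.
def change_embedding_model(kb: dict, new_model: str, chunk_count: int) -> dict:
    if chunk_count > 0 and kb["embd_id"] != new_model:
        raise ValueError(
            "Cannot switch embedding model: delete all existing chunks first "
            "so that every chunk lives in the same embedding space."
        )
    kb["embd_id"] = new_model
    return kb
```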
The vector database connection parameters come from RAGFlow's settings:
```python
self.os = OpenSearch(
    settings.OS["hosts"].split(","),
    http_auth=(settings.OS["username"], settings.OS["password"]) if "username" in settings.OS and "password" in settings.OS else None,
    verify_certs=False,
    timeout=600
)
```

These parameters can be supplied through environment variables or configuration files, so RAGFlow can connect to different OpenSearch instances flexibly.
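The sketch below shows one way such a settings.OS dictionary could be populated from the environment. The variable names (OPENSEARCH_HOSTS and so on) are assumptions for illustration, not RAGFlow's actual configuration keys.

```python
import os

# Hypothetical wiring of settings.OS from environment variables.
OS = {
    "hosts": os.environ.get("OPENSEARCH_HOSTS", "http://localhost:9201"),
}
if os.environ.get("OPENSEARCH_USERNAME"):
    OS["username"] = os.environ["OPENSEARCH_USERNAME"]
    OS["password"] = os.environ.get("OPENSEARCH_PASSWORD", "")
```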
RAGFlow also exposes several retrieval parameters, such as the similarity threshold and the vector similarity weight:
RAGFlow uses multiple recall of both full-text search and vector search in its chats. Prior to setting up an AI chat, consider adjusting the following parameters to ensure that the intended information always turns up in answers:
* Similarity threshold: Chunks with similarities below the threshold will be filtered. By default, it is set to 0.2.
* Vector similarity weight: The percentage by which vector similarity contributes to the overall score. By default, it is set to 0.3.

These parameters can be tuned per deployment to optimize retrieval results, as the sketch below illustrates.
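A minimal sketch of how these two parameters interact during scoring; the helper and the example scores are illustrative, not RAGFlow's implementation.

```python
# Combine keyword and vector scores with the configured vector similarity
# weight, then drop chunks whose blended similarity falls below the threshold.
def rank_chunks(chunks, vector_weight: float = 0.3, threshold: float = 0.2):
    ranked = []
    for text_sim, vec_sim, chunk_id in chunks:
        score = (1 - vector_weight) * text_sim + vector_weight * vec_sim
        if score >= threshold:
            ranked.append((score, chunk_id))
    return sorted(ranked, reverse=True)

# Example: the second chunk is filtered out by the 0.2 threshold.
print(rank_chunks([(0.6, 0.8, "c1"), (0.1, 0.2, "c2")]))
```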
In summary, RAGFlow's approach to embedding model selection and configuration and to vector database integration is characterized by an extensible abstract-interface design, batch processing with per-model length handling, per-knowledge-base index isolation, and hybrid keyword-plus-vector retrieval with configurable weights.

These characteristics let RAGFlow adapt to a wide range of application scenarios while delivering high-quality retrieval results.