前言
在 AI 应用快速发展的今天,向量数据库已成为构建智能检索系统的核心基础设施。Milvus 作为一款开源的高性能向量数据库,在 RAG(Retrieval-Augmented Generation)系统中发挥着关键作用。
本文将带你从零开始,基于 Milvus 构建一个完整的 RAG 系统,涵盖数据准备、向量检索、结果重排、位置优化等核心环节,并分享生产环境中的最佳实践和性能优化技巧。
1. Milvus 简介与核心特性
1.1 什么是 Milvus?
Milvus 是一个开源的向量数据库,专为 AI 应用设计,支持大规模向量数据的存储、索引和检索。它具备以下核心特性:
- 易用性:提供 Python SDK,API 简洁直观
- 多索引支持:HNSW、IVF、DiskANN 等多种索引算法
1.2 为什么选择 Milvus?
在构建 RAG 系统时,我们需要一个能够:
Milvus 完美满足这些需求,已成为 RAG 系统的首选向量数据库。
1.3 Milvus Lite vs Milvus Server
Milvus 提供了两种使用方式:
| | |
|---|
| 部署方式 | | |
| 适用场景 | | |
| 数据存储 | | |
| 性能 | | |
| 使用方式 | MilvusClient(uri="./db") | connections.connect(host="localhost") |
本文使用 Milvus Lite,因为它更简单,适合快速上手和开发测试。
2. 环境准备与安装
2.1 系统要求
2.2 安装依赖
首先创建虚拟环境(推荐):
# 创建虚拟环境
python3 -m venv venv
# 激活虚拟环境
# macOS/Linux:
sourcevenv/bin/activate
# Windows:
# venv\Scripts\activate
安装必要的依赖:
pip install pymilvus[milvus_lite] # 包含Milvus Lite
pip install sentence-transformers # Embedding模型
pip install transformers # 重排模型
pip install torch # PyTorch(重排模型需要)
重要提示:必须安装pymilvus[milvus_lite](不是pymilvus),这样才能使用 Milvus Lite 的本地文件功能。
2.3 验证安装
frompymilvusimportMilvusClient
# 测试Milvus Lite连接
client = MilvusClient(uri="./test.db")
print("✓ Milvus Lite 安装成功!")
3. 数据准备:从文档到向量
3.1 文档预处理
在实际应用中,我们需要将原始文档转换为向量。这个过程包括:
- 向量化:使用 Embedding 模型将文本转换为向量
3.2 文档分块策略
文档分块是 RAG 系统的关键步骤,直接影响检索效果:
defchunk_document(text, chunk_size=512, overlap=50):
"""
文档分块
Args:
text: 原始文档文本
chunk_size: 每块的大小(字符数或tokens)
overlap: 块之间的重叠大小,保证上下文连贯性
Returns:
文档块列表
"""
chunks = []
start =0
whilestart < len(text):
end = start + chunk_size
chunk = text[start:end]
chunks.append(chunk)
start = end - overlap # 重叠,保证上下文连贯
returnchunks
分块策略选择:
3.3 向量化:选择 Embedding 模型
选择合适的 Embedding 模型至关重要:
| | | | |
|---|
| all-MiniLM-L6-v2 | | | | |
| BGE-large-en-v1.5 | | | | |
| BGE-large-zh-v1.5 | | | | |
| E5-mistral-7b | | | | |
代码示例:
fromsentence_transformersimportSentenceTransformer
# 选择模型(根据需求)
embedding_model ="sentence-transformers/all-MiniLM-L6-v2"# 快速,80MB
# embedding_model = "BAAI/bge-large-en-v1.5" # 高精度,1.3GB
# 加载模型
encoder = SentenceTransformer(embedding_model)
# 生成向量
texts = ["Milvus is a vector database...","RAG combines retrieval and generation..."]
embeddings = encoder.encode(texts, normalize_embeddings=True)
print(f"向量维度:{embeddings.shape}") # (2, 384) 或 (2, 1024)
3.4 创建 Milvus 集合
在插入数据之前,需要先创建集合(Collection):
frompymilvusimportMilvusClient
# 连接Milvus(自动使用Milvus Lite)
client = MilvusClient(uri="./milvus_demo.db")
# 集合配置
collection_name ="documents"
dimension =384# 根据Embedding模型选择:all-MiniLM-L6-v2=384, BGE-large=1024
# 检查集合是否存在
ifclient.has_collection(collection_name):
print(f"集合{collection_name}已存在,删除旧集合...")
client.drop_collection(collection_name)
# 创建集合
client.create_collection(
collection_name=collection_name,
dimension=dimension, # 向量维度
metric_type="L2", # 距离度量:L2(欧氏距离)或IP(内积)
auto_id=True, # 自动生成ID
)
print(f"✓ 集合{collection_name}创建成功")
3.5 插入数据
将向量和元数据插入 Milvus:
# 准备数据
documents = [
{
"text":"Milvus is an open-source vector database...",
"doc_id":"doc_001",
"title":"Introduction to Milvus"
},
# ... 更多文档
]
# 生成向量
texts = [doc["text"]fordocindocuments]
embeddings = encoder.encode(texts, normalize_embeddings=True)
# 准备插入数据
data = []
fori, (emb, doc)inenumerate(zip(embeddings, documents)):
data.append({
"vector": emb.tolist(), # 向量(必须)
"text": doc["text"], # 文本内容
"doc_id": doc["doc_id"], # 业务ID
"title": doc["title"], # 标题
})
# 插入数据
client.insert(collection_name=collection_name, data=data)
print(f"✓ 成功插入{len(data)}个文档")
完整的数据准备脚本:
"""
完整的数据准备示例
"""
frompymilvusimportMilvusClient
fromsentence_transformersimportSentenceTransformer
defprepare_data():
# 1. 连接Milvus
client = MilvusClient(uri="./milvus_demo.db")
collection_name ="documents"
# 2. 准备文档
documents = [
"Milvus is an open-source vector database...",
"RAG combines information retrieval with language models...",
# ... 更多文档
]
# 3. 初始化Embedding模型
encoder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
dimension =384
# 4. 创建集合
ifclient.has_collection(collection_name):
client.drop_collection(collection_name)
client.create_collection(
collection_name=collection_name,
dimension=dimension,
metric_type="L2",
auto_id=True,
)
# 5. 生成向量并插入
embeddings = encoder.encode(documents, normalize_embeddings=True)
data = [
{"vector": emb.tolist(),"text": doc}
foremb, docinzip(embeddings, documents)
]
client.insert(collection_name=collection_name, data=data)
print(f"✓ 数据准备完成,共{len(data)}个文档")
if__name__ =="__main__":
prepare_data()
4. 核心实现:构建 RAG 系统
4.1 RAG 系统架构
一个完整的 RAG 系统包含以下组件:
用户查询
↓
向量检索(Milvus)← 知识库
↓
结果重排(可选)
↓
上下文构建
↓
LLM生成答案
4.2 基础 RAG 实现
让我们从最简单的 RAG 系统开始:
frompymilvusimportMilvusClient
fromsentence_transformersimportSentenceTransformer
classSimpleRAGSystem:
"""基础RAG系统"""
def__init__(self, milvus_uri="./milvus_demo.db", collection_name="documents"):
# 连接Milvus
self.client = MilvusClient(uri=milvus_uri)
self.collection_name = collection_name
# 初始化Embedding模型
self.encoder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
defretrieve(self, query: str, top_k: int =10):
"""向量检索"""
# 1. 编码查询
query_vector = self.encoder.encode(query, normalize_embeddings=True)
query_vector = query_vector.tolist()
# 2. 在Milvus中搜索
results = self.client.search(
collection_name=self.collection_name,
data=[query_vector],
limit=top_k,
search_params={"metric_type":"L2","params": {}},
output_fields=["text","doc_id","title"]
)
# 3. 格式化结果
retrieved_docs = []
forhitinresults[0]:
retrieved_docs.append({
"id": hit.get("id",""),
"text": hit.get("entity", {}).get("text",""),
"score": hit.get("distance",0.0),
"doc_id": hit.get("entity", {}).get("doc_id",""),
"title": hit.get("entity", {}).get("title",""),
})
returnretrieved_docs
defquery(self, user_query: str, top_k: int =5):
"""查询流程"""
# 1. 检索
retrieved_docs = self.retrieve(user_query, top_k=top_k)
# 2. 构建上下文
context ="\n\n".join([doc["text"]fordocinretrieved_docs])
# 3. 返回结果(这里只返回上下文,实际应用中会调用LLM)
return{
"query": user_query,
"context": context,
"documents": retrieved_docs
}
# 使用示例
rag = SimpleRAGSystem()
result = rag.query("What is Milvus?")
print(result["context"])
4.3 增强版 RAG:添加重排
向量检索虽然快速,但可能不够精确。我们可以添加重排(Reranking)步骤来提升准确性:
fromtransformersimportAutoModelForSequenceClassification, AutoTokenizer
importtorch
classEnhancedRAGSystem(SimpleRAGSystem):
"""增强版RAG系统(带重排)"""
def__init__(self, milvus_uri="./milvus_demo.db", collection_name="documents"):
super().__init__(milvus_uri, collection_name)
# 初始化重排模型(可选)
self.reranker =None
self.reranker_tokenizer =None
try:
print("正在加载重排模型...")
reranker_model ="BAAI/bge-reranker-base"
self.reranker = AutoModelForSequenceClassification.from_pretrained(reranker_model)
self.reranker_tokenizer = AutoTokenizer.from_pretrained(reranker_model)
self.reranker.eval()
print("✓ 重排模型加载成功")
exceptExceptionase:
print(f"⚠️ 重排模型加载失败:{e},将跳过重排步骤")
defrerank(self, query: str, documents: List[str], top_k: int =10):
"""重排:使用BGE-Reranker对检索结果进行精排"""
ifself.rerankerisNoneorlen(documents) ==0:
returndocuments[:top_k]
# 构建查询-文档对
pairs = [[query, doc]fordocindocuments]
# Tokenize
withtorch.no_grad():
inputs = self.reranker_tokenizer(
pairs,
padding=True,
truncation=True,
return_tensors="pt",
max_length=512
)
# 计算相关性分数
scores = self.reranker(**inputs).logits.squeeze(-1)
# 按分数排序
ranked_indices = scores.argsort(descending=True)
# 返回Top-K
reranked_docs = [documents[idx]foridxinranked_indices[:top_k]]
returnreranked_docs
defquery(self, user_query: str, retrieve_top_k: int =100, rerank_top_k: int =10):
"""查询流程:检索 → 重排 → 构建上下文"""
# 1. 向量检索(粗排)
retrieved_docs = self.retrieve(user_query, top_k=retrieve_top_k)
# 2. 重排(精排)
doc_texts = [doc["text"]fordocinretrieved_docs]
reranked_texts = self.rerank(user_query, doc_texts, top_k=rerank_top_k)
# 3. 构建上下文
context ="\n\n".join(reranked_texts)
return{
"query": user_query,
"context": context,
"retrieved_count": len(retrieved_docs),
"reranked_count": len(reranked_texts)
}
重排的优势:
- 提升准确性:重排模型考虑查询和文档的交互,比单纯向量相似度更准确
- 灵活调整:可以调整重排后的 Top-K,平衡准确性和成本
5. 关键优化:突破 U 型陷阱
5.1 什么是 U 型陷阱?
研究发现,长上下文语言模型存在位置偏差(Position Bias):
- 开头位置(Primacy Bias):准确率 ~75.8% ✅
- 中间位置(Lost in the Middle):准确率 ~53.8% ❌
- 结尾位置(Recency Bias):准确率 ~63.2% ✅
这意味着,如果关键信息放在中间位置,模型可能无法有效利用,导致性能下降。
5.2 位置优化策略
通过位置优化,我们可以突破 U 型陷阱:
核心策略:
- 最相关文档 → 开头:利用 Primacy Bias
- 用户问题 → 结尾:利用 Recency Bias
5.3 实现位置优化
classOptimizedRAGSystem(EnhancedRAGSystem):
"""优化版RAG系统(位置优化)"""
defbuild_context(self, query: str, retrieved_docs: List[Dict[str, Any]])-> str:
"""
构建上下文(位置优化:突破U型陷阱的关键步骤)
策略:
- 最相关文档 → 开头(利用Primacy Bias)
- 次相关文档 → 中间(低优先级)
- 用户问题 → 结尾(利用Recency Bias)
"""
iflen(retrieved_docs) ==0:
returnf"系统提示:请回答问题。\n\n用户问题:{query}"
# 按相关性排序(最相关的在前)
sorted_docs = sorted(
retrieved_docs,
key=lambdax: x.get("score",0)ifisinstance(x.get("score"), (int, float))else0,
reverse=True# 分数越高越好(如果是相似度分数)
)
# 构建上下文
context_parts = []
# 系统提示
context_parts.append("系统提示:请基于以下文档回答问题。\n")
# 最相关文档(开头位置 - 利用Primacy Bias)
context_parts.append("# 最相关文档(开头位置)\n")
top_docs = sorted_docs[:min(3, len(sorted_docs))]
fori, docinenumerate(top_docs,1):
text = doc.get("text","")
context_parts.append(f"文档{i}:{text}\n")
# 次相关文档(中间位置)
iflen(sorted_docs) >3:
context_parts.append("\n# 次相关文档(中间位置)\n")
secondary_docs = sorted_docs[3:min(7, len(sorted_docs))]
fori, docinenumerate(secondary_docs,4):
text = doc.get("text","")
context_parts.append(f"文档{i}:{text}\n")
# 用户问题(结尾位置 - 利用Recency Bias)
context_parts.append("\n# 用户问题(结尾位置)\n")
context_parts.append(f"用户问题:{query}")
return"\n".join(context_parts)
defquery(self, user_query: str, retrieve_top_k: int =100, rerank_top_k: int =10):
"""完整查询流程:检索 → 重排 → 位置优化 → 生成"""
print(f"\n查询:{user_query}\n")
# 1. 向量检索
print(f"步骤 1: 向量检索(Top-{retrieve_top_k})...")
retrieved_docs = self.retrieve(user_query, top_k=retrieve_top_k)
print(f"✓ 检索到{len(retrieved_docs)}个文档")
iflen(retrieved_docs) ==0:
return{"query": user_query,"context":"","answer":"未找到相关文档。"}
# 2. 重排(可选)
ifself.rerankerisnotNone:
print(f"\n步骤 2: 重排(Top-{rerank_top_k})...")
doc_texts = [doc["text"]fordocinretrieved_docs]
reranked_texts = self.rerank(user_query, doc_texts, top_k=rerank_top_k)
# 更新文档列表(只保留重排后的文档)
reranked_docs = [
docfordocinretrieved_docs
ifdoc["text"]inreranked_texts
]
# 按重排顺序重新排序
text_to_doc = {doc["text"]: docfordocinreranked_docs}
reranked_docs = [text_to_doc[text]fortextinreranked_texts]
print(f"✓ 重排完成,返回 Top-{len(reranked_docs)}个文档")
else:
print(f"\n步骤 2: 跳过重排(未配置Reranker)...")
reranked_docs = retrieved_docs[:rerank_top_k]
print(f"✓ 使用检索结果,返回 Top-{len(reranked_docs)}个文档")
# 3. 位置优化构建上下文
print(f"\n步骤 3: 位置优化构建上下文...")
context = self.build_context(user_query, reranked_docs)
print(f"✓ 上下文构建完成(长度:{len(context)}字符)")
# 4. 生成答案(这里只返回上下文,实际应用中会调用LLM)
print(f"\n步骤 4: 上下文已准备就绪\n")
return{
"query": user_query,
"retrieved_docs": retrieved_docs[:5], # 只返回前5个用于展示
"reranked_docs": reranked_docs,
"context": context,
"answer":"【提示】未配置LLM,仅返回上下文。\n\n"+ context
}
5.4 效果对比
位置优化带来的效果提升:
关键结论:位置优化是突破 U 型陷阱的核心步骤,生产环境必须包含!
6. 性能优化实战
6.1 索引选择
Milvus 支持多种索引类型,选择合适的索引对性能至关重要:
| | | |
|---|
| HNSW | | | |
| IVF_FLAT | | | |
| IVF_PQ | | | |
| DiskANN | | | |
代码示例(使用 HNSW 索引):
# 注意:MilvusClient方式创建集合时,索引参数在create_collection中设置
# 如果需要更精细的索引控制,可以使用pymilvus的Collection方式
frompymilvusimportCollection, connections, FieldSchema, CollectionSchema, DataType
# 连接Milvus Server(不是Lite)
connections.connect(alias="default", host="localhost", port="19530")
# 定义Schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=384),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1000),
]
schema = CollectionSchema(fields=fields, description="Documents collection")
collection = Collection(name="documents", schema=schema)
# 创建HNSW索引
index_params = {
"metric_type":"L2",
"index_type":"HNSW",
"params": {
"M":16, # 每个节点的连接数
"efConstruction":200# 构建时的搜索范围
}
}
collection.create_index(field_name="vector", index_params=index_params)
# 搜索时设置ef参数
search_params = {"metric_type":"L2","params": {"ef":64}}
6.2 批量处理
对于大量查询,使用批量处理可以显著提升吞吐量:
defbatch_search(self, queries: List[str], batch_size: int =32):
"""批量检索"""
all_results = []
# 批量编码
query_vectors = self.encoder.encode(queries, normalize_embeddings=True)
# 批量搜索
foriinrange(0, len(query_vectors), batch_size):
batch_vectors = query_vectors[i:i+batch_size]
batch_queries = queries[i:i+batch_size]
results = self.client.search(
collection_name=self.collection_name,
data=batch_vectors.tolist(),
limit=10,
search_params={"metric_type":"L2","params": {}},
output_fields=["text"]
)
all_results.extend(results)
returnall_results
6.3 缓存策略
实现查询缓存可以大幅降低延迟和成本:
fromfunctoolsimportlru_cache
importhashlib
importjson
classCachedRAGSystem(OptimizedRAGSystem):
"""带缓存的RAG系统"""
def__init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cache = {} # 简单的内存缓存,生产环境建议使用Redis
def_get_cache_key(self, query: str, top_k: int)-> str:
"""生成缓存键"""
key_data =f"{query}_{top_k}"
returnhashlib.md5(key_data.encode()).hexdigest()
defretrieve(self, query: str, top_k: int =10, use_cache: bool = True):
"""带缓存的检索"""
ifuse_cache:
cache_key = self._get_cache_key(query, top_k)
ifcache_keyinself.cache:
print(f"✓ 使用缓存结果")
returnself.cache[cache_key]
# 执行检索
results = super().retrieve(query, top_k)
# 存入缓存
ifuse_cache:
self.cache[cache_key] = results
returnresults
6.4 异步处理
对于高并发场景,使用异步处理可以提升性能:
importasyncio
fromconcurrent.futuresimportThreadPoolExecutor
classAsyncRAGSystem(OptimizedRAGSystem):
"""异步RAG系统"""
def__init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=4)
asyncdefasync_retrieve(self, query: str, top_k: int =10):
"""异步检索"""
loop = asyncio.get_event_loop()
results =awaitloop.run_in_executor(
self.executor,
self.retrieve,
query,
top_k
)
returnresults
asyncdefasync_query(self, user_query: str, retrieve_top_k: int =100, rerank_top_k: int =10):
"""异步查询"""
# 异步检索
retrieved_docs =awaitself.async_retrieve(user_query, top_k=retrieve_top_k)
# 其他步骤可以继续异步化...
# 这里简化处理
return{
"query": user_query,
"documents": retrieved_docs
}
# 使用示例
asyncdefmain():
rag = AsyncRAGSystem()
result =awaitrag.async_query("What is Milvus?")
print(result)
# asyncio.run(main())
7. 生产环境最佳实践
7.1 错误处理与重试
生产环境必须包含完善的错误处理:
importtime
fromtypingimportOptional
classProductionRAGSystem(OptimizedRAGSystem):
"""生产环境RAG系统(带错误处理和重试)"""
defretrieve_with_retry(
self,
query: str,
top_k: int =10,
max_retries: int =3,
retry_delay: float =1.0
):
"""带重试的检索"""
forattemptinrange(max_retries):
try:
returnself.retrieve(query, top_k)
exceptExceptionase:
ifattempt == max_retries -1:
raise# 最后一次重试失败,抛出异常
print(f"检索失败(尝试{attempt +1}/{max_retries}):{e}")
time.sleep(retry_delay * (attempt +1)) # 指数退避
return[]
defquery(self, user_query: str, **kwargs):
"""生产环境查询(带错误处理)"""
try:
returnsuper().query(user_query, **kwargs)
exceptExceptionase:
return{
"query": user_query,
"error": str(e),
"context":"",
"answer":"抱歉,查询过程中出现错误,请稍后重试。"
}
7.2 日志记录
添加详细的日志记录,便于监控和调试:
importlogging
fromdatetimeimportdatetime
classLoggedRAGSystem(ProductionRAGSystem):
"""带日志的RAG系统"""
def__init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('rag_system.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
defquery(self, user_query: str, **kwargs):
"""带日志的查询"""
start_time = datetime.now()
self.logger.info(f"开始查询:{user_query}")
try:
result = super().query(user_query, **kwargs)
elapsed_time = (datetime.now() - start_time).total_seconds()
self.logger.info(
f"查询完成:{user_query}, "
f"耗时:{elapsed_time:.2f}秒, "
f"检索到:{len(result.get('retrieved_docs', []))}个文档"
)
returnresult
exceptExceptionase:
self.logger.error(f"查询失败:{user_query}, 错误:{e}")
raise
7.3 监控指标
收集关键性能指标:
fromcollectionsimportdefaultdict
importtime
classMonitoredRAGSystem(LoggedRAGSystem):
"""带监控的RAG系统"""
def__init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.metrics = {
"total_queries":0,
"total_retrieval_time":0,
"total_rerank_time":0,
"total_query_time":0,
"errors":0,
}
defquery(self, user_query: str, **kwargs):
"""带监控的查询"""
self.metrics["total_queries"] +=1
start_time = time.time()
try:
# 监控检索时间
retrieval_start = time.time()
retrieved_docs = self.retrieve(user_query, top_k=kwargs.get("retrieve_top_k",100))
self.metrics["total_retrieval_time"] += time.time() - retrieval_start
# 监控重排时间
ifself.reranker:
rerank_start = time.time()
# ... 重排逻辑
self.metrics["total_rerank_time"] += time.time() - rerank_start
result = super().query(user_query, **kwargs)
self.metrics["total_query_time"] += time.time() - start_time
returnresult
exceptExceptionase:
self.metrics["errors"] +=1
raise
defget_metrics(self):
"""获取监控指标"""
total = self.metrics["total_queries"]
iftotal ==0:
return{}
return{
"total_queries": total,
"avg_retrieval_time": self.metrics["total_retrieval_time"] / total,
"avg_rerank_time": self.metrics["total_rerank_time"] / total,
"avg_query_time": self.metrics["total_query_time"] / total,
"error_rate": self.metrics["errors"] / total,
}
7.4 配置管理
使用配置文件管理参数:
# config.yaml
milvus:
uri:"./milvus_demo.db"
collection_name:"documents"
embedding:
model:"sentence-transformers/all-MiniLM-L6-v2"
dimension:384
reranker:
enabled:true
model:"BAAI/bge-reranker-base"
retrieval:
top_k:100
rerank_top_k:10
position_optimization:
enabled:true
top_relevant:3
secondary_relevant:5
importyaml
classConfigurableRAGSystem(OptimizedRAGSystem):
"""可配置的RAG系统"""
def__init__(self, config_path: str ="config.yaml"):
# 加载配置
withopen(config_path,'r')asf:
config = yaml.safe_load(f)
# 使用配置初始化
super().__init__(
milvus_uri=config["milvus"]["uri"],
collection_name=config["milvus"]["collection_name"]
)
self.config = config
8. 常见问题与解决方案
8.1 集合不存在
问题:Collection 'documents' does not exist
解决方案:
# 检查集合是否存在
ifnotclient.has_collection(collection_name):
print(f"集合{collection_name}不存在,请先创建集合并插入数据")
# 创建集合
client.create_collection(...)
# 插入数据
client.insert(...)
8.2 向量维度不匹配
问题:Dimension mismatch: expected 384, got 1024
解决方案:
# 确保Embedding模型维度与集合维度一致
embedding_model ="sentence-transformers/all-MiniLM-L6-v2"# 384维
# 或
embedding_model ="BAAI/bge-large-en-v1.5"# 1024维
# 创建集合时使用正确的维度
dimension =384if"all-MiniLM-L6-v2"inembedding_modelelse1024
client.create_collection(..., dimension=dimension)
8.3 内存不足
问题:加载大模型时内存不足
解决方案:
使用更小的模型:
# 使用小模型(80MB vs 1.3GB)
encoder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
延迟加载:
classLazyRAGSystem:
def__init__(self):
self._encoder =None
self._reranker =None
@property
defencoder(self):
ifself._encoderisNone:
self._encoder = SentenceTransformer("...")
returnself._encoder
使用 CPU 模式(如果 GPU 内存不足):
encoder = SentenceTransformer("...", device="cpu")
8.4 检索结果不准确
问题:检索到的文档与查询不相关
解决方案:
使用更好的 Embedding 模型:
# 从all-MiniLM-L6-v2升级到BGE-large
encoder = SentenceTransformer("BAAI/bge-large-en-v1.5")
添加重排:
# 使用重排模型提升准确性
reranker = AutoModelForSequenceClassification.from_pretrained("BAAI/bge-reranker-base")
调整检索参数:
# 增加检索数量,然后重排
retrieved_docs = self.retrieve(query, top_k=200) # 增加检索数量
reranked_docs = self.rerank(query, retrieved_docs, top_k=10) # 重排后取Top-10
8.5 性能优化
问题:查询速度慢
解决方案:
使用合适的索引:
# HNSW索引适合低延迟场景
index_params = {
"index_type":"HNSW",
"params": {"M":16,"efConstruction":200}
}
批量处理:
# 批量查询比单个查询更高效
results = batch_search(queries, batch_size=32)
缓存结果:
# 缓存常见查询的结果
cached_results = cache.get(query)
ifcached_results:
returncached_results
9. 总结
本文介绍了如何从零开始基于 Milvus 构建高性能 RAG 系统,涵盖了数据准备、向量检索、结果重排、位置优化等核心环节。
关键要点:
- ✅位置优化是关键:通过将最相关文档放在开头,可以突破 U 型陷阱,提升 22%的准确率
- ✅重排提升准确性:使用 BGE-Reranker 等重排模型可以显著提升检索效果
- ✅索引选择很重要:根据数据规模和性能需求选择合适的索引类型(HNSW、IVF 等)
- ✅生产环境需完善:错误处理、日志记录、性能监控是生产环境必备功能
希望本文能帮助你构建高性能的 RAG 系统。更多技术细节和代码示例请参考文中各章节内容。
附录:完整代码示例
A. 完整的数据准备脚本
"""
完整的数据准备脚本
"""
frompymilvusimportMilvusClient
fromsentence_transformersimportSentenceTransformer
defprepare_data():
# 1. 连接Milvus
client = MilvusClient(uri="./milvus_demo.db")
collection_name ="documents"
# 2. 准备文档
documents = [
{
"text":"Milvus is an open-source vector database designed for AI applications...",
"doc_id":"doc_001",
"title":"Introduction to Milvus"
},
# ... 更多文档
]
# 3. 初始化Embedding模型
encoder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
dimension =384
# 4. 创建集合
ifclient.has_collection(collection_name):
client.drop_collection(collection_name)
client.create_collection(
collection_name=collection_name,
dimension=dimension,
metric_type="L2",
auto_id=True,
)
# 5. 生成向量并插入
texts = [doc["text"]fordocindocuments]
embeddings = encoder.encode(texts, normalize_embeddings=True)
data = [
{
"vector": emb.tolist(),
"text": doc["text"],
"doc_id": doc["doc_id"],
"title": doc["title"],
}
foremb, docinzip(embeddings, documents)
]
client.insert(collection_name=collection_name, data=data)
print(f"✓ 数据准备完成,共{len(data)}个文档")
if__name__ =="__main__":
prepare_data()
B. 完整的 RAG 系统实现
"""
完整的优化RAG系统实现
"""
frompymilvusimportMilvusClient
fromsentence_transformersimportSentenceTransformer
fromtransformersimportAutoModelForSequenceClassification, AutoTokenizer
fromtypingimportList, Dict, Any
importtorch
classOptimizedRAGSystem:
"""优化的RAG系统"""
def__init__(
self,
milvus_uri: str ="./milvus_demo.db",
collection_name: str ="documents",
embedding_model: str ="sentence-transformers/all-MiniLM-L6-v2",
reranker_model: str = None,
):
# 连接Milvus
self.client = MilvusClient(uri=milvus_uri)
self.collection_name = collection_name
# 初始化Embedding模型
self.encoder = SentenceTransformer(embedding_model)
# 初始化重排模型(可选)
self.reranker =None
self.reranker_tokenizer =None
ifreranker_model:
self.reranker = AutoModelForSequenceClassification.from_pretrained(reranker_model)
self.reranker_tokenizer = AutoTokenizer.from_pretrained(reranker_model)
self.reranker.eval()
defretrieve(self, query: str, top_k: int =100):
"""向量检索"""
query_vector = self.encoder.encode(query, normalize_embeddings=True).tolist()
results = self.client.search(
collection_name=self.collection_name,
data=[query_vector],
limit=top_k,
search_params={"metric_type":"L2","params": {}},
output_fields=["text","doc_id","title"]
)
retrieved_docs = []
forhitinresults[0]:
retrieved_docs.append({
"id": hit.get("id",""),
"text": hit.get("entity", {}).get("text",""),
"score": hit.get("distance",0.0),
"doc_id": hit.get("entity", {}).get("doc_id",""),
"title": hit.get("entity", {}).get("title",""),
})
returnretrieved_docs
defrerank(self, query: str, documents: List[str], top_k: int =10):
"""重排"""
ifself.rerankerisNoneorlen(documents) ==0:
returndocuments[:top_k]
pairs = [[query, doc]fordocindocuments]
withtorch.no_grad():
inputs = self.reranker_tokenizer(
pairs, padding=True, truncation=True, return_tensors="pt", max_length=512
)
scores = self.reranker(**inputs).logits.squeeze(-1)
ranked_indices = scores.argsort(descending=True)
return[documents[idx]foridxinranked_indices[:top_k]]
defbuild_context(self, query: str, retrieved_docs: List[Dict[str, Any]])-> str:
"""构建上下文(位置优化)"""
sorted_docs = sorted(
retrieved_docs,
key=lambdax: x.get("score",0)ifisinstance(x.get("score"), (int, float))else0,
reverse=True
)
context_parts = ["系统提示:请基于以下文档回答问题。\n"]
# 最相关文档(开头)
context_parts.append("# 最相关文档(开头位置)\n")
fori, docinenumerate(sorted_docs[:3],1):
context_parts.append(f"文档{i}:{doc.get('text','')}\n")
# 次相关文档(中间)
iflen(sorted_docs) >3:
context_parts.append("\n# 次相关文档(中间位置)\n")
fori, docinenumerate(sorted_docs[3:7],4):
context_parts.append(f"文档{i}:{doc.get('text','')}\n")
# 用户问题(结尾)
context_parts.append("\n# 用户问题(结尾位置)\n")
context_parts.append(f"用户问题:{query}")
return"\n".join(context_parts)
defquery(self, user_query: str, retrieve_top_k: int =100, rerank_top_k: int =10):
"""完整查询流程"""
# 1. 检索
retrieved_docs = self.retrieve(user_query, top_k=retrieve_top_k)
# 2. 重排
ifself.reranker:
doc_texts = [doc["text"]fordocinretrieved_docs]
reranked_texts = self.rerank(user_query, doc_texts, top_k=rerank_top_k)
text_to_doc = {doc["text"]: docfordocinretrieved_docs}
reranked_docs = [text_to_doc[text]fortextinreranked_texts]
else:
reranked_docs = retrieved_docs[:rerank_top_k]
# 3. 构建上下文
context = self.build_context(user_query, reranked_docs)
return{
"query": user_query,
"context": context,
"documents": reranked_docs
}
# 使用示例
if__name__ =="__main__":
rag = OptimizedRAGSystem()
result = rag.query("What is Milvus?")
print(result["context"])