链载Ai

标题: Milvus 查询引擎剖析:从 SQL 到向量检索的执行全流程 [打印本页]

作者: 链载Ai    时间: 昨天 22:43
标题: Milvus 查询引擎剖析:从 SQL 到向量检索的执行全流程

一条查询请求如何在 Milvus 中执行?本文将深入查询引擎的内部,揭示向量检索的完整流程。

全文约 6000 字,建议阅读时间 15 分钟


一、查询引擎概览

1.1 查询类型

Milvus 支持三种主要查询类型:

frompymilvusimportCollection

collection = Collection("documents")

# 1. 向量搜索(ANN Search)
results = collection.search(
data=[[0.1] *768], # 查询向量
anns_field="embedding",
param={"metric_type":"COSINE","params": {"ef":64}},
limit=10
)

# 2. 标量查询(Query)
results = collection.query(
expr="score > 0.8 and category == 'tech'",
output_fields=["id","text","score"]
)

# 3. 混合查询(Hybrid Search)
results = collection.hybrid_search(
reqs=[
# 向量搜索
AnnSearchRequest(
data=[[0.1] *768],
anns_field="dense_vector",
param={"metric_type":"COSINE"},
limit=10
),
# 全文检索
AnnSearchRequest(
data=[[0.2] *1000],
anns_field="sparse_vector",
param={"metric_type":"IP"},
limit=10
)
],
rerank=WeightedRanker(0.7,0.3), # 重排序
limit=10
)

1.2 查询架构

查询执行流程:

┌─────────┐
│ Client │ 发起查询
└────┬────┘

┌────▼────┐
│ Proxy │ 1. 解析请求
└────┬────┘ 2. 查询路由
│ 3. 负载均衡

├──────────────────┬──────────────────┐
│ │ │
┌────▼────┐ ┌───▼────┐ ┌───▼────┐
│QueryNode│ │QueryNode│ │QueryNode│
│ Node1 │ │ Node2 │ │ Node3 │
└────┬────┘ └───┬────┘ └───┬────┘
│ │ │
│ 1. 标量过滤 │ │
│ 2. 向量检索 │ │
│ 3. 返回 TopK │ │
│ │ │
└─────────────────┴──────────────────┘

┌──────────────────────▼────┐
│ Proxy │
│ 1. 合并结果 │
│ 2. 全局排序 │
│ 3. 返回最终 TopK │
└────────────────────────────┘

二、查询解析与规划

2.1 表达式解析

Milvus 支持类 SQL 的表达式语法:

# 基础表达式
expr ="id in [1, 2, 3]"
expr ="score > 0.8"
expr ="category == 'tech'"

# 复合表达式
expr ="score > 0.8 and category == 'tech'"
expr ="(score > 0.8 or views > 1000) and category != 'spam'"

# 数组操作
expr ="tags array_contains 'AI'"
expr ="tags array_contains_any ['AI', 'ML']"
expr ="tags array_contains_all ['AI', 'ML', 'DL']"

# JSON 字段
expr ="metadata['author'] == 'Alice'"
expr ="metadata['tags'][0] == 'featured'"

# 字符串操作
expr ="text like 'Milvus%'"# 前缀匹配
expr ="text like '%database%'"# 包含匹配

表达式 AST(抽象语法树)

// 表达式解析器
typeExprParserstruct{
lexer *Lexer
tokens []Token
}

// 解析表达式
func(p *ExprParser)Parse(exprstring)(*ExprNode, error){
// 1. 词法分析
tokens := p.lexer.Tokenize(expr)

// 2. 语法分析,构建 AST
ast := p.buildAST(tokens)

// 3. 语义分析,类型检查
iferr := p.validate(ast); err !=nil{
returnnil, err
}

returnast,nil
}

// AST 节点
typeExprNodestruct{
Type NodeType // AND, OR, EQ, GT, LT, IN, etc.
Left *ExprNode
Right *ExprNode
Value interface{}
FieldID int64
}

// 示例:score > 0.8 and category == 'tech'
// AST:
// AND
// / \
// GT EQ
// / \ / \
// score 0.8 category 'tech'

2.2 查询计划生成

// 查询计划器
typeQueryPlannerstruct{
schema *Schema
stats *Statistics
}

// 生成查询计划
func(qp *QueryPlanner)Plan(req *SearchRequest)(*QueryPlan, error){
plan := &QueryPlan{}

// 1. 解析表达式
ifreq.Expr !=""{
exprNode, err := qp.parseExpr(req.Expr)
iferr !=nil{
returnnil, err
}
plan.FilterNode = exprNode
}

// 2. 选择索引
plan.VectorIndex = qp.selectVectorIndex(req.AnnsField)
plan.ScalarIndexes = qp.selectScalarIndexes(plan.FilterNode)

// 3. 估算成本
plan.Cost = qp.estimateCost(plan)

// 4. 优化计划
plan = qp.optimize(plan)

returnplan,nil
}

// 查询计划
typeQueryPlanstruct{
// 过滤条件
FilterNode *ExprNode

// 向量检索
VectorIndex *IndexInfo
VectorField string
MetricType string
SearchParamsmap[string]interface{}

// 标量索引
ScalarIndexes []*IndexInfo

// 执行顺序
ExecutionOrder []PlanNode

// 成本估算
Costfloat64
}

2.3 查询优化

优化 1:谓词下推(Predicate Pushdown)

// 将过滤条件尽早应用,减少数据量
func(qp *QueryPlanner)pushDownPredicate(plan *QueryPlan){
// 原始计划:
// 1. 向量检索 → 10000 个结果
// 2. 标量过滤 → 100 个结果

// 优化后:
// 1. 标量过滤 → 1000 个候选
// 2. 向量检索 → 100 个结果

ifplan.FilterNode !=nil&& qp.hasScalarIndex(plan.FilterNode) {
// 先执行标量过滤
plan.ExecutionOrder = []PlanNode{
&ScalarFilterNode{Expr: plan.FilterNode},
&VectorSearchNode{...},
}
}
}

优化 2:索引选择

// 选择最优索引
func(qp *QueryPlanner)selectBestIndex(expr *ExprNode)*IndexInfo{
candidates := qp.findApplicableIndexes(expr)

// 评估每个索引的成本
bestIndex := candidates[0]
minCost := qp.estimateIndexCost(bestIndex, expr)

for_, idx :=rangecandidates[1:] {
cost := qp.estimateIndexCost(idx, expr)
ifcost < minCost {
minCost = cost
bestIndex = idx
}
}

returnbestIndex
}

优化 3:并行执行

// 并行执行多个 Segment 的查询
func(qn *QueryNode)parallelSearch(plan *QueryPlan)[]*SearchResult{
segments := qn.getSegments(plan.CollectionID)

// 创建工作池
numWorkers := runtime.NuMCPU()
jobs :=make(chan*Segment,len(segments))
results :=make(chan*SearchResult,len(segments))

// 启动 worker
fori :=0; i < numWorkers; i++ {
gofunc(){
forseg :=rangejobs {
result := qn.searchSegment(seg, plan)
results <- result
}
}()
}

// 分发任务
for_, seg :=rangesegments {
jobs <- seg
}
close(jobs)

// 收集结果
allResults :=make([]*SearchResult,0,len(segments))
fori :=0; i <len(segments); i++ {
allResults =append(allResults, <-results)
}

returnallResults
}

三、向量检索执行

3.1 Segment 内检索

// Segment 的向量检索
func(seg *Segment)Search(query []float32, topKint, filter *Bitset)*SearchResult{
// 1. 应用标量过滤(生成 Bitset)
bitset := seg.applyFilter(filter)

// 2. 向量检索
varresult *SearchResult
ifseg.hasIndex() {
// 使用索引检索
result = seg.index.Search(query, topK, bitset)
}else{
// 暴力搜索
result = seg.bruteForceSearch(query, topK, bitset)
}

// 3. 应用删除过滤
result = seg.filterDeleted(result)

returnresult
}

// HNSW 索引检索
func(idx *HNSWIndex)Search(query []float32, kint, bitset *Bitset)*SearchResult{
// 1. 从入口点开始
ep := idx.entryPoint

// 2. 逐层贪心搜索
forlevel := idx.maxLevel; level >0; level-- {
ep = idx.searchLayer(query, ep,1, level, bitset)
}

// 3. 在第 0 层精确搜索
candidates := idx.searchLayer(query, ep, idx.ef,0, bitset)

// 4. 返回 TopK
returncandidates.TopK(k)
}

// 搜索层
func(idx *HNSWIndex)searchLayer(
query []float32,
entryPoint *Node,
numClosestint,
layerint,
bitset *Bitset,
)[]*Candidate{
visited :=make(map[int64]bool)
candidates := NewMinHeap()
results := NewMaxHeap()

// 初始化
dist := idx.distance(query, entryPoint.Vector)
candidates.Push(&Candidate{Node: entryPoint, Distance: dist})
results.Push(&Candidate{Node: entryPoint, Distance: dist})
visited[entryPoint.ID] =true

// 贪心搜索
forcandidates.Len() >0{
current := candidates.Pop()

// 如果当前点比结果中最远的点还远,停止
ifcurrent.Distance > results.Top().Distance {
break
}

// 扩展邻居
for_, neighbor :=rangecurrent.Node.Neighbors[layer] {
ifvisited[neighbor.ID] {
continue
}
visited[neighbor.ID] =true

// 应用 Bitset 过滤
ifbitset !=nil&& !bitset.Test(neighbor.ID) {
continue
}

dist := idx.distance(query, neighbor.Vector)

ifdist < results.Top().Distance || results.Len() < numClosest {
candidates.Push(&Candidate{Node: neighbor, Distance: dist})
results.Push(&Candidate{Node: neighbor, Distance: dist})

ifresults.Len() > numClosest {
results.Pop()
}
}
}
}

returnresults.ToSlice()
}

3.2 标量过滤

// 标量过滤器
typeScalarFilterstruct{
expr *ExprNode
schema *Schema
}

// 生成 Bitset
func(sf *ScalarFilter)Filter(segment *Segment)*Bitset{
bitset := NewBitset(segment.NumRows)

// 遍历所有行
forrowID :=0; rowID < segment.NumRows; rowID++ {
ifsf.evaluate(segment, rowID) {
bitset.Set(rowID)
}
}

returnbitset
}

// 评估表达式
func(sf *ScalarFilter)evaluate(segment *Segment, rowIDint)bool{
returnsf.evaluateNode(sf.expr, segment, rowID)
}

func(sf *ScalarFilter)evaluateNode(node *ExprNode, segment *Segment, rowIDint)bool{
switchnode.Type {
caseNodeType_AND:
returnsf.evaluateNode(node.Left, segment, rowID) &&
sf.evaluateNode(node.Right, segment, rowID)

caseNodeType_OR:
returnsf.evaluateNode(node.Left, segment, rowID) ||
sf.evaluateNode(node.Right, segment, rowID)

caseNodeType_EQ:
fieldValue := segment.GetFieldValue(node.FieldID, rowID)
returnfieldValue == node.Value

caseNodeType_GT:
fieldValue := segment.GetFieldValue(node.FieldID, rowID)
returnfieldValue.(float64) > node.Value.(float64)

caseNodeType_IN:
fieldValue := segment.GetFieldValue(node.FieldID, rowID)
values := node.Value.([]interface{})
for_, v :=rangevalues {
iffieldValue == v {
returntrue
}
}
returnfalse

default:
returnfalse
}
}

3.3 距离计算优化

// SIMD 优化的距离计算
import"golang.org/x/sys/cpu"

// L2 距离(欧氏距离)
funcL2Distance(a, b []float32)float32{
ifcpu.X86.HasAVX2 {
returnl2DistanceAVX2(a, b)
}
returnl2DistanceScalar(a, b)
}

// AVX2 优化版本
funcl2DistanceAVX2(a, b []float32)float32{
// 使用 AVX2 指令集,一次处理 8 个 float32
// 性能提升 4-8 倍
varsumfloat32
// ... AVX2 实现
returnsum
}

// 标量版本
funcl2DistanceScalar(a, b []float32)float32{
varsumfloat32
fori :=rangea {
diff := a[i] - b[i]
sum += diff * diff
}
returnsum
}

// 余弦相似度
funcCosineSimilarity(a, b []float32)float32{
vardotProduct, normA, normBfloat32

fori :=rangea {
dotProduct += a[i] * b[i]
normA += a[i] * a[i]
normB += b[i] * b[i]
}

returndotProduct / (sqrt(normA) * sqrt(normB))
}

// 内积
funcInnerProduct(a, b []float32)float32{
varsumfloat32
fori :=rangea {
sum += a[i] * b[i]
}
returnsum
}

四、结果聚合与排序

4.1 多 Segment 结果合并

// Proxy 的结果聚合
func(p *Proxy)mergeResults(results []*SearchResult, topKint)*SearchResult{
// 1. 使用最小堆合并多个有序列表
heap := NewMinHeap()

// 2. 初始化:每个结果的第一个元素入堆
fori, result :=rangeresults {
iflen(result.IDs) >0{
heap.Push(&HeapNode{
Distance: result.Distances[0],
ID: result.IDs[0],
ResultIdx: i,
ItemIdx: 0,
})
}
}

// 3. 归并排序
merged := &SearchResult{
IDs: make([]int64,0, topK),
Distances:make([]float32,0, topK),
}

forheap.Len() >0&&len(merged.IDs) < topK {
node := heap.Pop()

// 去重(同一个 ID 可能在多个 Segment 中)
if!merged.Contains(node.ID) {
merged.IDs =append(merged.IDs, node.ID)
merged.Distances =append(merged.Distances, node.Distance)
}

// 将该结果的下一个元素入堆
result := results[node.ResultIdx]
nextIdx := node.ItemIdx +1
ifnextIdx <len(result.IDs) {
heap.Push(&HeapNode{
Distance: result.Distances[nextIdx],
ID: result.IDs[nextIdx],
ResultIdx: node.ResultIdx,
ItemIdx: nextIdx,
})
}
}

returnmerged
}

4.2 分布式结果聚合

// 多 QueryNode 结果聚合
func(p *Proxy)aggregateFromNodes(nodeResultsmap[int64]*SearchResult, topKint)*SearchResult{
// 1. 收集所有结果
allResults :=make([]*SearchResult,0,len(nodeResults))
for_, result :=rangenodeResults {
allResults =append(allResults, result)
}

// 2. 合并结果
merged := p.mergeResults(allResults, topK)

// 3. 填充其他字段(如果需要)
iflen(p.outputFields) >0{
merged = p.fillOutputFields(merged)
}

returnmerged
}

// 填充输出字段
func(p *Proxy)fillOutputFields(result *SearchResult)*SearchResult{
// 批量查询其他字段
ids := result.IDs
fields := p.queryFieldsByIDs(ids, p.outputFields)

result.Fields = fields
returnresult
}

4.3 重排序(Reranking)

frompymilvusimportCollection, AnnSearchRequest, RRFRanker

collection = Collection("documents")

# 多路召回 + 重排序
req1 = AnnSearchRequest(
data=[[0.1] *768],
anns_field="dense_vector",
param={"metric_type":"COSINE"},
limit=100# 召回 100 个
)

req2 = AnnSearchRequest(
data=[[0.2] *1000],
anns_field="sparse_vector",
param={"metric_type":"IP"},
limit=100# 召回 100 个
)

# RRF(Reciprocal Rank Fusion)重排序
results = collection.hybrid_search(
reqs=[req1, req2],
rerank=RRFRanker(k=60), # RRF 参数
limit=10# 最终返回 10 个
)

RRF 算法

// RRF 重排序
funcRRFRerank(results [][]*SearchResult, kint)[]*SearchResult{
// RRF 公式:score(d) = Σ 1 / (k + rank_i(d))
scores :=make(map[int64]float64)

for_, resultList :=rangeresults {
forrank, result :=rangeresultList {
scores[result.ID] +=1.0/float64(k + rank +1)
}
}

// 按分数排序
ranked :=make([]*SearchResult,0,len(scores))
forid, score :=rangescores {
ranked =append(ranked, &SearchResult{
ID: id,
Score: score,
})
}

sort.Slice(ranked,func(i, jint)bool{
returnranked[i].Score > ranked[j].Score
})

returnranked
}

五、查询性能优化

5.1 缓存策略

// QueryNode 的缓存
typeQueryNodeCachestruct{
// Segment 缓存
segmentCache *LRUCache

// 查询结果缓存
resultCache *LRUCache

// 向量缓存
vectorCache *LRUCache
}

// 查询时先检查缓存
func(qn *QueryNode)searchWithCache(req *SearchRequest)*SearchResult{
// 1. 生成缓存 key
cacheKey := qn.generateCacheKey(req)

// 2. 检查结果缓存
ifcached, ok := qn.cache.resultCache.Get(cacheKey); ok {
returncached.(*SearchResult)
}

// 3. 执行查询
result := qn.search(req)

// 4. 缓存结果
qn.cache.resultCache.Put(cacheKey, result)

returnresult
}

5.2 预取(Prefetch)

// 预取下一批 Segment
func(qn *QueryNode)prefetchSegments(currentSegmentIDint64){
// 预测下一个可能访问的 Segment
nextSegments := qn.predictNextSegments(currentSegmentID)

// 异步加载
for_, segID :=rangenextSegments {
gofunc(idint64){
qn.loadSegment(id)
}(segID)
}
}

5.3 批量查询

# 单条查询(慢)
forqueryinqueries:
result = collection.search(data=[query], ...)

# 批量查询(快 10 倍)
batch_size =100
foriinrange(0, len(queries), batch_size):
batch = queries[i:i+batch_size]
results = collection.search(data=batch, ...)

5.4 分区裁剪

# 不指定分区(扫描所有分区)
results = collection.search(
data=[query_vector],
anns_field="embedding",
limit=10
)

# 指定分区(只扫描相关分区)
results = collection.search(
data=[query_vector],
anns_field="embedding",
partition_names=["2024_11"], # 只搜索 11 月的数据
limit=10
)

六、实战案例

6.1 复杂查询示例

frompymilvusimportCollection

collection = Collection("products")

# 复杂过滤 + 向量搜索
results = collection.search(
data=[[0.1] *768],
anns_field="image_embedding",
param={"metric_type":"L2","params": {"nprobe":32}},
limit=20,
expr="""
(price >= 100 and price <= 500) and
category in ['electronics', 'computers'] and
rating > 4.0 and
stock > 0 and
tags array_contains_any ['featured', 'bestseller']
""",
output_fields=["name","price","rating","image_url"]
)

# 处理结果
forhitinresults[0]:
print(f"roduct:{hit.entity.get('name')}")
print(f"rice: ${hit.entity.get('price')}")
print(f"Rating:{hit.entity.get('rating')}")
print(f"Distance:{hit.distance}")
print("---")

6.2 分页查询

# 第一页
page_size =20
results_page1 = collection.search(
data=[query_vector],
anns_field="embedding",
limit=page_size,
offset=0
)

# 第二页
results_page2 = collection.search(
data=[query_vector],
anns_field="embedding",
limit=page_size,
offset=page_size
)

6.3 范围查询

# 查找距离在 [0.5, 1.0] 范围内的向量
results = collection.search(
data=[query_vector],
anns_field="embedding",
param={
"metric_type":"L2",
"params": {"nprobe":32},
"radius":1.0, # 最大距离
"range_filter":0.5# 最小距离
},
limit=100
)

七、总结

核心要点

  1. 查询类型:向量搜索、标量查询、混合查询
  2. 查询优化:谓词下推、索引选择、并行执行
  3. 向量检索:HNSW 图搜索、SIMD 加速
  4. 结果聚合:多 Segment 合并、分布式聚合、重排序
  5. 性能优化:缓存、预取、批量查询、分区裁剪

最佳实践

✅ 使用表达式过滤减少搜索空间 ✅ 批量查询提升吞吐量 ✅ 合理设置 TopK 和搜索参数 ✅ 利用分区裁剪加速查询 ✅ 使用混合搜索提高召回率


关注云与数字化,一起探索向量数据库的技术奥秘!

如果觉得文章有帮助,欢迎点赞、转发、收藏!


参考资料


标签:#Milvus#查询引擎#向量检索#查询优化#HNSW


基于身份的网络安全:Cilium策略执行机制

深入理解Cilium eBPF数据平面架构

Cilium 首次集成国内云服务,阿里云 ENI 被纳入新版本特性

Cilium连接跟踪机制深度解析:网络状态管理的基础

Cilium隧道网络实现深度解析:多集群部署的核心








欢迎光临 链载Ai (https://www.lianzai.com/) Powered by Discuz! X3.5