链载Ai

标题: RAG—Chunking策略实战|得物技术 [打印本页]

作者: 链载Ai    时间: 4 天前
标题: RAG—Chunking策略实战|得物技术

目录

一、背景

二、什么是分块(Chunking)

三、为何要对内容做分块处理

四、分块策略详解

1. 基础分块

2. 结构感知分块

3. 语义与主题分块

4. 高级分块

5. 混合分块

五、结论


背 景

在 RAG 系统中,即便采用性能卓越的 LLM 并反复打磨 Prompt,问答仍可能出现上下文缺失、事实性错误或拼接不连贯等问题。多数团队会频繁更换检索算法与 Embedding模型,但收益常常有限。真正的瓶颈,往往潜伏在数据入库之前的一个细节——文档分块(chunking)。不当的分块会破坏语义边界,拆散关键线索并与噪声纠缠,使被检索的片段呈现“顺序错乱、信息残缺”的面貌。在这样的输入下,再强大的模型也难以基于支离破碎的知识推理出完整、可靠的答案。某种意义上,分块质量几乎决定了RAG的性能上限——它决定知识是以连贯的上下文呈现,还是退化为无法拼合的碎片。


在实际场景中,最常见的错误是按固定长度生硬切割,忽略文档的结构与语义:定义与信息被切开、表头与数据分离、步骤说明被截断、代码与注释脱节,结果就是召回命中却无法支撑结论,甚至诱发幻觉与错误引用。相反,高质量的分块应尽量贴合自然边界(标题、段落、列表、表格、代码块等),以适度重叠保持上下文连续,并保留必要的来源与章节元数据,确保可追溯与重排可用。当分块尊重文档的叙事与结构时,检索的相关性与答案的事实一致性往往显著提升,远胜于一味更换向量模型或调参;换言之,想要真正改善 RAG 的稳健性与上限,首先要把“知识如何被切开并呈现给模型”这件事做好。


PS:本文主要是针对中文文档类型的嵌入进行实战。

什么是分块(Chunking)

分块是将大块文本分解成较小段落的过程,这使得文本数据更易于管理和处理。通过分块,我们能够更高效地进行内容嵌入(embedding),并显著提升从向量数据库中召回内容的相关性和准确性。


在实际操作中,分块的好处是多方面的。首先,它能够提高模型处理的效率,因为较小的文本段落更容易进行嵌入和检索。


其次,分块后的文本能够更精确地匹配用户查询,从而提供更相关的搜索结果。这对于需要高精度信息检索和内容生成的应用程序尤为重要。


通过优化内容的分块和嵌入策略,我们可以最大化LLM在各种应用场景中的性能。分块技术不仅提高了内容召回的准确性,还提升了整体系统的响应速度和用户体验。


因此,在构建和优化基于LLM的应用程序时,理解和应用分块技术是不可或缺的步骤。


分块过程中主要的两个概念:chunk_size块的大小,chunk_overlap重叠窗口。


为何要对内容做分块处理


总之理想的分块是在“上下文完整性”和“信息密度”之间取得动态平衡:chunk_size决定信息承载量,chunk_overlap用于弥补边界断裂并维持语义连续。只要边界对齐语义、粒度贴合内容,检索与生成的质量就能提升。


分块策略详解

基础分块

基于固定长度分块

from langchain_text_splitters import CharacterTextSplitter
splitter = CharacterTextSplitter( separator="", # 纯按长度切 chunk_size=600, # 依据实验与模型上限调整 chunk_overlap=90, # 15% 重叠)chunks = splitter.split_text(text)


基于句子的分块

importre
defsplit_sentences_zh(text:str): # 在句末标点(。!?;)后面带可选引号的场景断句 pattern = re.compile(r'([^。!?;]*[。!?;]+|[^。!?;]+$)') sentences = [m.group(0).strip()forminpattern.finditer(text)ifm.group(0).strip()] returnsentences
defsentence_chunk(text:str, chunk_size=600, overlap=80): sents = split_sentences_zh(text) chunks, buf = [],"" forsinsents: iflen(buf) +len(s) <= chunk_size: buf += s else: ifbuf: chunks.append(buf) # 简单重叠:从当前块尾部截取 overlap 字符与下一句拼接 buf = (buf[-overlap:]ifoverlap >0andlen(buf) > overlapelse"") + s ifbuf: chunks.append(buf) returnchunks
chunks = sentence_chunk(text, chunk_size=600, overlap=90)

HanLP 分句示例:

fromhanlp_common.constantimportROOTimporthanlp
tokenizer = hanlp.load('KU_NAME_MERGED_SIX_MONTHS_CONVSEG') # 或句法/句子级管线# HanLP 高层 API 通常通过句法/语料管线获得句子边界,具体以所用版本 API 为准# 将句子列表再做聚合为 chunk_size


基于递归字符分块

importrefromlangchain_text_splittersimportRecursiveCharacterTextSplitter
separators = [ r"\n#{1,6}\s", # 标题 r"\n\d+(?:\.\d+)*\s", # 数字编号标题 1. / 2.3. 等 "\n\n", # 段落 "\n", # 行 " ", # 空格 "", # 兜底字符级]splitter = RecursiveCharacterTextSplitter( separators=separators, chunk_size=700, chunk_overlap=100, is_separator_regex=True, # 告诉分割器上面包含正则)chunks = splitter.split_text(text)

总结





结构感知分块

利用文档固有结构(标题层级、列表、代码块、表格、对话轮次)作为分块边界,逻辑清晰、可追溯性强,能在保证上下文完整性的同时提升检索信噪比。


结构化文本分块



importrefromtypingimportList,Dict
heading_pat = re.compile(r'^(#{1,6})\s+(.*)$') # 标题fence_pat = re.compile(r'^```') # fenced code fence
defsplit_markdown_structure(text:str, chunk_size=900, min_chunk=250, overlap_ratio=0.1) ->List[Dict]: lines = text.splitlines() sections = [] in_code =False current = {"level":0,"title":"","content": [],"path": []} path_stack = [] # [(level, title)] forlninlines: iffence_pat.match(ln): in_code =notin_code m = heading_pat.match(ln)ifnotin_codeelseNone ifm: ifcurrent["content"]: sections.append(current) level =len(m.group(1)) title = m.group(2).strip()
whilepath_stackandpath_stack[-1][0] >= level: path_stack.pop() path_stack.append((level, title)) breadcrumbs = [tfor_, tinpath_stack] current = {"level": level,"title": title,"content": [],"path": breadcrumbs} else: current["content"].append(ln) ifcurrent["content"]: sections.append(current) # 通过二次拆分/合并将部分平铺成块 chunks = [] defemit_chunk(text_block:str, pathist[str], level:int): chunks.append({ "text": text_block.strip(), "meta": { "section_title": path[-1]ifpathelse"", "breadcrumbs": path, "section_level": level, } }) forsecinsections: raw ="\n".join(sec["content"]).strip() ifnotraw: continue iflen(raw) <= chunk_size: emit_chunk(raw, sec["path"], sec["level"]) else: paras = [p.strip()forpinraw.split("\n\n")ifp.strip()] buf ="" forpinparas: iflen(buf) +len(p) +2<= chunk_size: buf += (("\n\n"+ p)ifbufelsep) else: ifbuf: emit_chunk(buf, sec["path"], sec["level"]) buf = p ifbuf: emit_chunk(buf, sec["path"], sec["level"]) merged = [] forchinchunks: ifnotmerged: merged.append(ch) continue iflen(ch["text"]) < min_chunkandmerged[-1]["meta"]["breadcrumbs"] == ch["meta"]["breadcrumbs"]: merged[-1]["text"] +="\n\n"+ ch["text"] else: merged.append(ch) overlap =int(chunk_size * overlap_ratio) forchinmerged: bc =" > ".join(ch["meta"]["breadcrumbs"][-3:]) prefix =f"[{bc}]\n"ifbcelse"" ifprefixandnotch["text"].startswith(prefix): ch["text"] = prefix + ch["text"] # optional character overlap can在检索阶段用邻接聚合替代,这里略 returnmerged


对话式分块





fromtypingimportList,Dict
defchunk_dialogue(turnsist[Dict], max_turns=10, max_chars=900, overlap_turns=2): """ turns: [{"speaker":"User","text":"..." , "ts_start":123, "ts_end":130}, ...] """ chunks = [] i =0 whilei <len(turns): j = i char_count =0 speakers =set() whilej <len(turns): t = turns[j] uttr_len =len(t["text"]) # 若单条超长,允许在句级二次切分(此处略),但不跨 speaker if(j - i +1) > max_turnsor(char_count + uttr_len) > max_chars: break char_count += uttr_len speakers.add(t["speaker"]) j +=1 ifj > i: window = turns[i:j] elifi <len(turns): window = [turns[i]] else: break text ="\n".join([f'{t["speaker"]}:{t["text"]}'fortinwindow]) meta = { "speakers":list(speakers), "turns_range": (i, j -1), "ts_start": window[0].get("ts_start"), "ts_end": window[-1].get("ts_end"), } chunks.append({"text": text,"meta": meta}) # 按轮次重叠回退 ifj >=len(turns): break next_start = i +len(window) - overlap_turns i =max(next_start, i +1) # 确保至少前进1步 returnchunks


总结


语义与主题分块

该方法不依赖文档的物理结构,而是依据语义连续性与话题转移来决定切分点,尤其适合希望“块内高度内聚、块间清晰分界”的知识库与研究类文本。


语义分块





fromtypingimportList,Dict,Tupleimportnumpyasnpfromsentence_transformersimportSentenceTransformerimportre
defsplit_sentences_zh(text:str) ->List[str]: # 简易中文分句,可替换为 HanLP/Stanza 更稳健的实现 pattern = re.compile(r'([^。!?;]*[。!?;]+|[^。!?;]+$)') return[m.group(0).strip()forminpattern.finditer(text)ifm.group(0).strip()]
defrolling_mean(vecs: np.ndarray, i:int, w:int) -> np.ndarray: s =max(0, i - w) e =min(len(vecs), i + w +1) returnvecs[s:e].mean(axis=0)
defsemantic_chunk( text:str, model_name:str="BAAI/bge-m3", window_size:int=2, min_chars:int=350, max_chars:int=1100, lambda_std:float=0.8, overlap_chars:int=80,) ->List[Dict]: sents = split_sentences_zh(text) ifnotsents: return[] model = SentenceTransformer(model_name) emb = model.encode(sents, normalize_embeddings=True, batch_size=64, show_progress_bar=False) emb = np.asarray(emb) # 基于窗口均值的“新颖度”分数 novelties = [] foriinrange(len(sents)): ref = rolling_mean(emb, i-1, window_size)ifi >0elseemb[0] ref = ref / (np.linalg.norm(ref) +1e-8) novelty =1.0-float(np.dot(emb[i], ref)) novelties.append(novelty) novelties = np.array(novelties) # 相对阈值:μ + λσ mu, sigma =float(novelties.mean()),float(novelties.std() +1e-8) threshold = mu + lambda_std * sigma chunks, buf, start_idx = [],"",0 defflush(end_idx:int): nonlocalbuf, start_idx ifbuf.strip(): chunks.append({ "text": buf.strip(), "meta": {"start_sent": start_idx,"end_sent": end_idx-1} }) buf, start_idx ="", end_idx fori, sinenumerate(sents): # 若超长则先冲洗 iflen(buf) +len(s) > max_charsandlen(buf) >= min_chars: flush(i) # 结构化重叠:附加上一个块的尾部 ifoverlap_chars >0andlen(s) < overlap_chars: buf = s continue buf += s # 达到最小长度后遇到突变则切分 iflen(buf) >= min_charsandnovelties[i] > threshold: flush(i +1) ifbuf: flush(len(sents)) returnchunks


主题的分块




fromtypingimportList,Dictimportnumpyasnpfromsentence_transformersimportSentenceTransformerfromsklearn.clusterimportKMeansimportre
defsplit_sentences_zh(text:str) ->List[str]: pattern = re.compile(r'([^。!?;]*[。!?;]+|[^。!?;]+$)') return[m.group(0).strip()forminpattern.finditer(text)ifm.group(0).strip()]
deftopic_chunk( text:str, k_topics:int=5, min_chars:int=500, max_chars:int=1400, smooth_window:int=2, model_name:str="BAAI/bge-m3") ->List[Dict]: sents = split_sentences_zh(text) ifnotsents: return[] model = SentenceTransformer(model_name) emb = model.encode(sents, normalize_embeddings=True, batch_size=64, show_progress_bar=False) emb = np.asarray(emb) km = KMeans(n_clusters=k_topics, n_init="auto", random_state=42) labels = km.fit_predict(emb) # 简单序列平滑:滑窗多数投票 smoothed = labels.copy() foriinrange(len(labels)): s =max(0, i - smooth_window) e =min(len(labels), i + smooth_window +1) window = labels[s:e] vals, counts = np.unique(window, return_counts=True) smoothed[i] =int(vals[np.argmax(counts)]) chunks, buf, start_idx, cur_label = [],"",0, smoothed[0] defflush(end_idx:int): nonlocalbuf, start_idx ifbuf.strip(): chunks.append({ "text": buf.strip(), "meta": {"start_sent": start_idx,"end_sent": end_idx-1,"topic":int(cur_label)} }) buf, start_idx ="", end_idx fori, sinenumerate(sents): switched = smoothed[i] != cur_label over_max =len(buf) +len(s) > max_chars under_min =len(buf) < min_chars # 尝试延后切分,保证最小块长 ifswitchedandnotunder_min: flush(i) cur_label = smoothed[i] ifover_maxandnotunder_min: flush(i) buf += s ifbuf: flush(len(sents)) returnchunks


高级分块

小-大分块



# 离线:构建小块索引,并保存 parent_id -> 大块文本 的映射# 在线检索:small_hits = small_index.search(embed(query), top_k=30)groups = group_by_parent(small_hits)scored_parents = score_groups(groups, agg="max")candidates = top_m(scored_parents, m=3)
# 交叉编码重排rerank_inputs = [(query, parent_text(pid)) for pid in candidates]reranked = cross_encoder_rerank(rerank_inputs)
# 组装上下文:对每个父块,仅保留命中句及其邻近窗口,并加上标题路径contexts = []for pid, _ in reranked: hits = groups[pid] context = build_local_window(parent_text(pid), hits, window_sents=1) contexts.append(prefix_with_breadcrumbs(pid) + context)
final_context = pack_under_budget(contexts, token_budget=3000) # 留出回答空间

父子段分块






fromtypingimportList,Dict,Tupleimportnumpyasnpfromsentence_transformersimportSentenceTransformer

embedder = SentenceTransformer("BAAI/bge-m3")
defsearch_parent_child(query:str, top_k_child=40, top_m_parent=3, window_chars=180): q = embedder.encode([query], normalize_embeddings=True)[0] hits = small_index.search(q, top_k=top_k_child) # 返回 [(child_id, score), ...] # 分组 groupsict[str,List[Tuple[str,float]]] = {} forcid, scoreinhits: p = child_parent_id[cid] groups.setdefault(p, []).append((cid,float(score))) # 聚合打分(max + coverage) scored = [] forpid, itemsingroups.items(): scores = np.array([sfor_, sinitems]) agg =0.7* scores.max() +0.3* (len(items) / (len(parents[pid]["sent_spans"]) +1e-6)) scored.append((pid,float(agg))) scored.sort(key=lambdax: x[1], reverse=True) candidates = [pidforpid, _inscored[:top_m_parent]] # 为每个父块构造“命中窗口” contexts = [] forpidincandidates: ptext = parents[pid]["text"] # 找到子块命中区间并合并窗口 spans =sorted([(children[cid]["start"], children[cid]["end"])forcid, _ingroups[pid]]) merged = [] fors, einspans: s =max(0, s - window_chars) e =min(len(ptext), e + window_chars) ifnotmergedors > merged[-1][1] +50: merged.append([s, e]) else: merged[-1][1] =max(merged[-1][1], e) windows = [ptext[s:e]fors, einmerged] prefix =" > ".join(parents[pid]["meta"].get("breadcrumbs", [])[-3:]) contexts.append((pid,f"[{prefix}]\n"+"\n...\n".join(windows))) # 交叉编码重排(此处用占位函数) reranked = cross_encoder_rerank(query, [c[1]forcincontexts]) # 返回 indices 顺序 ordered = [contexts[i]foriinreranked] returnordered # [(parent_id, context_text), ...]


代理式分块




系统:你是分块器。目标:为RAG检索创建高内聚、可追溯的块。规则:1) 不得在代码/表格/公式中间切分;2) 每块400-1000字;3) 保持标题路径完整;4) 尽量让“定义+解释”在同一块;5) 输出JSON,含start_offset/end_offset/title_path。
用户:<文档片段文本>助手(示例输出):{"segments": [ {"start":0,"end":812,"title_path": ["指南","安装"],"reason":"完整步骤+注意事项"}, {"start":813,"end":1620,"title_path": ["指南","配置"],"reason":"参数表与示例紧密相关"} ]}


混合分块

单一策略难覆盖所有文档与场景。混合分块通过“先粗后细、按需细化”,在效率、可追溯性与答案质量之间取得稳健平衡。






fromtypingimportList,Dict
defhybrid_chunk( doc_text:str, parse_structure, # 函数:返回 [{'type': 'text|code|table|dialogue', 'text': str, 'breadcrumbs': [...], 'anchor': str}] recursive_splitter, # 函数:text -> [{'text': str}] sentence_splitter, # 函数:text -> [{'text': str}] semantic_splitter, # 函数:text -> [{'text': str}] dialogue_splitter, # 函数:turns(list) -> [{'text': str}],若无对话则忽略 max_coarse_len:int=1100, min_chunk_len:int=320, target_len:int=750, overlap_ratio:float=0.1,) ->List[Dict]: """ 返回格式: [{'text': str, 'meta': {...}}] """ blocks = parse_structure(doc_text) # 先拿到结构块 chunksist[Dict] = [] defemit(t:str, meta_baseict): t = t.strip() ifnott: return # 结构重叠前缀(标题路径) bc =" > ".join(meta_base.get("breadcrumbs", [])[-3:]) prefix =f"[{bc}]\n"ifbcelse"" chunks.append({ "text": (prefix + t)ifnott.startswith(prefix)elset, "meta": meta_base }) forbinblocks: t = b["text"] btype = b.get("type","text") # 原子块:代码/表格 ifbtypein{"code","table","formula"}: emit(t, {**b,"splitter":"atomic"}) continue # 对话块 ifbtype =="dialogue": forckindialogue_splitter(b.get("turns", [])): emit(ck["text"], {**b,"splitter":"dialogue"}) continue # 普通文本:依据长度与“可读性”启用不同细分器 iflen(t) <= max_coarse_len: # 中短文本:递归 or 句子 sub = recursive_splitter(t) # 合并过短子块 buf ="" forsinsub: txt = s["text"] iflen(buf) +len(txt) < min_chunk_len: buf += txt else: emit(bufortxt, {**b,"splitter":"recursive"}) buf =""ifbufelse"" ifbuf: emit(buf, {**b,"splitter":"recursive"}) else: # 超长文本:语义分块优先 forckinsemantic_splitter(t): emit(ck["text"], {**b,"splitter":"semantic"}) # 轻量字符重叠(可选) ifoverlap_ratio >0: overlapped = [] fori, chinenumerate(chunks): overlapped.append(ch) ifi +1<len(chunks)andch["meta"].get("breadcrumbs") == chunks[i+1]["meta"].get("breadcrumbs"): # 在相邻同章节块间引入小比例重叠 ov =int(len(ch["text"]) * overlap_ratio) ifov >0: head = ch["text"][-ov:] chunks[i+1]["text"] = head + chunks[i+1]["text"] chunks = overlapped returnchunks

结论







欢迎光临 链载Ai (https://www.lianzai.com/) Powered by Discuz! X3.5