WeKnora is a production-grade RAG framework open-sourced by Tencent, with a clearly stated goal: solving knowledge-augmentation problems in real business scenarios where documents are complex and heterogeneous, corpus size is manageable, but quality requirements are extremely high. Some in the community regard it as an open-source counterpart of ima. That claim has no official confirmation, but what is certain is that WeKnora's engineering completeness, edge-case handling, and graceful-degradation strategies reflect a system design hardened by real-world use.
From document ingestion, parsing, chunking, vectorization, and multimodal enrichment to knowledge graphs, question generation, and summary generation, WeKnora covers nearly every key problem a complete RAG system runs into in production. It is especially strong in document parsing and chunk construction, the stage that is most often underestimated yet has the greatest impact on retrieval and generation quality, where WeKnora offers a mature and reusable design.
This article focuses on WeKnora's document ingestion and parsing pipeline. Starting from the three entry points (file, URL, and manual creation), we will take apart its parser architecture, the unified Markdown intermediate representation, the semantic chunking strategy, multimodal processing, and finally how it builds the Chunk data structure used for retrieval and reasoning.
Upload file → compute hash for deduplication → store file → parse asynchronously
```go
// Service layer - core logic
func (s *knowledgeService) CreateKnowledgeFromFile(ctx context.Context, kbID string,
	file *multipart.FileHeader, metadata map[string]string, enableMultimodel *bool, customFileName string,
) (*types.Knowledge, error) {
	// 1. Compute the file hash (duplicate detection)
	fileContent, _ := file.Open()
	fileHash := calculateFileHash(fileContent)

	// 2. Check whether an identical file already exists
	existing, _ := s.repo.FindByFileHash(ctx, kbID, fileHash)
	if existing != nil {
		return existing, &types.DuplicateKnowledgeError{
			ExistingID: existing.ID,
			Message:    "相同文件已存在",
		}
	}

	// 3. Upload the file to the storage service (MinIO/COS)
	filePath, _ := s.storage.Upload(fileContent, fileName)

	// 4. Create the Knowledge record
	knowledge := &types.Knowledge{
		ID:          uuid.New().String(),
		Type:        "file",
		FileName:    fileName,
		FileType:    getFileType(fileName), // pdf, docx, xlsx...
		FileHash:    fileHash,
		FilePath:    filePath,
		FileSize:    file.Size,
		ParseStatus: types.ParseStatusPending, // waiting for async processing
		Metadata:    metadata,
	}
	s.repo.CreateKnowledge(ctx, knowledge)

	// 5. Enqueue the async parsing task
	payload := types.DocumentProcessPayload{
		KnowledgeID:      knowledge.ID,
		EnableMultimodel: enableMultimodel,
	}
	task := asynq.NewTask(types.TypeDocumentProcess, payload)
	s.task.Enqueue(task)

	return knowledge, nil
}
```
Receive URL → deduplicate by URL → fetch and parse asynchronously
```go
// Service layer - core logic
func (s *knowledgeService) CreateKnowledgeFromURL(ctx context.Context,
	kbID, url string, enableMultimodel *bool, title string,
) (*types.Knowledge, error) {
	// 1. Validate the URL format
	if !isValidURL(url) {
		return nil, errors.NewBadRequestError("无效的URL格式")
	}

	// 2. Check whether the same URL already exists
	existing, _ := s.repo.FindBySourceURL(ctx, kbID, url)
	if existing != nil {
		return existing, &types.DuplicateKnowledgeError{
			ExistingID: existing.ID,
			Message:    "相同URL已存在",
		}
	}

	// 3. Create the Knowledge record
	knowledge := &types.Knowledge{
		ID:          uuid.New().String(),
		Type:        "url",
		SourceURL:   url,
		Title:       title,
		FileName:    extractFileNameFromURL(url), // derive a file name from the URL
		FileType:    "html",
		ParseStatus: types.ParseStatusPending, // waiting for async processing
	}
	s.repo.CreateKnowledge(ctx, knowledge)

	// 4. Enqueue the async fetch task
	payload := types.DocumentProcessPayload{
		KnowledgeID:      knowledge.ID,
		SourceURL:        url, // passed along for the async task to fetch
		EnableMultimodel: enableMultimodel,
	}
	task := asynq.NewTask(types.TypeDocumentProcess, payload)
	s.task.Enqueue(task)

	return knowledge, nil
}
```
Receive Markdown content → no deduplication → process synchronously
```go
// Service layer - core logic (synchronous)
func (s *knowledgeService) CreateKnowledgeFromManual(ctx context.Context,
	kbID string, req *types.ManualKnowledgePayload,
) (*types.Knowledge, error) {
	// 1. Create the Knowledge record
	knowledge := &types.Knowledge{
		ID:          uuid.New().String(),
		Type:        "manual",
		Title:       req.Title,
		FileName:    req.Title + ".md",
		FileType:    "md",
		ParseStatus: types.ParseStatusProcessing, // processing starts immediately
	}
	s.repo.CreateKnowledge(ctx, knowledge)

	// 2. Parse the Markdown content synchronously (no async task needed)
	chunks := s.parseMarkdown(req.Content)

	// 3. Process the chunks synchronously (embeddings, indexing)
	s.processChunks(ctx, kb, knowledge, chunks)

	// 4. Mark as completed
	knowledge.ParseStatus = types.ParseStatusCompleted
	s.repo.UpdateKnowledge(ctx, knowledge)

	return knowledge, nil
}
```
Try multiple parsers in order until the first one succeeds
```python
class FirstParser(BaseParser):
    _parser_cls: Tuple[Type["BaseParser"], ...] = ()

    def parse_into_text(self, content: bytes) -> Document:
        """Try each parser in order."""
        for p in self._parsers:
            try:
                document = p.parse_into_text(content)
                if document.is_valid():
                    return document  # <- return on the first success
            except Exception:
                continue  # <- on failure, move on to the next parser
        return Document()  # <- if all fail, return an empty Document
```
Multiple parsers chained in series: the output of one parser becomes the input of the next
```python
class PipelineParser(BaseParser):
    _parser_cls: Tuple[Type["BaseParser"], ...] = ()

    def parse_into_text(self, content: bytes) -> Document:
        """Invoke each parser in turn, accumulating images."""
        images: Dict[str, str] = {}
        document = Document()
        for p in self._parsers:
            document = p.parse_into_text(content)
            content = endecode.encode_bytes(document.content)  # <- becomes the next parser's input
            images.update(document.images)  # <- accumulate images
        document.images.update(images)
        return document
```
```python
# Order of attempts:
#   1. MinerUParser     (primary parser)
#   2. MarkitdownParser (fallback parser)
class PDFParser(FirstParser):
    _parser_cls = (MinerUParser, MarkitdownParser)

# Pipeline:
#   PDF -> StdMinerUParser (calls the MinerU API)
#       -> MarkdownTableFormatter (formats tables)
#       -> final Document
class MinerUParser(PipelineParser):
    _parser_cls = (StdMinerUParser, MarkdownTableFormatter)

class MarkitdownParser(PipelineParser):
    """Markdown parser built on the pipeline pattern.

    Data flow:
        PDF bytes
          |
        StdMarkitdownParser (stage 1)
          |- calls the MarkItDown library
          |- returns Markdown text with embedded data URIs
          |- Document(content, images={})
          |
        MarkdownParser (stage 2)
          |- processes the Markdown content
          |- extracts the images from data URIs
          |- uploads them to storage
          |- returns the final Document(content, images)
    """
    _parser_cls = (StdMarkitdownParser, MarkdownParser)
    # The two parsers run as a pipeline, in order
```
MinerU is tried first to convert the file to Markdown; note that you must configure a MinerU token yourself for this to work. The fallback is MarkItDown, Microsoft's open-source framework, which also converts the PDF to Markdown but cannot handle scanned PDFs. Once the content is in Markdown, an initial formatting pass (the MarkdownTableFormatter stage above) is applied; see the sketch below.
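The formatter's source isn't shown in this article. As a rough sketch of what such a normalization pass could look like (the function name and rules below are assumptions, not WeKnora's actual code), it might re-pad pipe-table rows so that the protected table regexes used later by the splitter match reliably:

```python
def normalize_tables(md: str) -> str:
    """Hypothetical table-normalization pass: trim cell whitespace and make
    sure every table row starts and ends with a pipe."""
    out = []
    for line in md.splitlines():
        stripped = line.strip()
        if stripped.startswith("|") and stripped.count("|") >= 2:
            cells = [c.strip() for c in stripped.strip("|").split("|")]
            out.append("| " + " | ".join(cells) + " |")
        else:
            out.append(line)
    return "\n".join(out)
```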
```python
# Order of attempts:
#   1. MarkitdownParser (primary parser, same as the PDF fallback)
#   2. DocxParser       (fallback parser)
class Docx2Parser(FirstParser):
    _parser_cls = (MarkitdownParser, DocxParser)
```
The fallback, DocxParser, is built on python-docx and ultimately emits Markdown-formatted text. The processing flow is as follows:
```python
# Filter out small images (decorative elements)
if image.width < 50 or image.height < 50:
    return None

# Downscale oversized images
if image.width > max_image_size:
    image = image.resize((new_width, new_height))
```
```python
def _convert_table_to_html(self, table):
    html = "<table>"
    for r in table.rows:
        html += "<tr>"
        for c in r.cells:
            # Handle merged cells (span detection elided in this excerpt)
            html += f"<td colspan='{span}'>{c.text}</td>"
        html += "</tr>"
    html += "</table>"
    return html
```
```python
from dataclasses import dataclass
from typing import Dict

@dataclass
class DocumentModel:
    content: str            # Markdown-formatted text (including image links)
    images: Dict[str, str]  # {url: base64_data}
```
.doc files go through _parse_with_docx first, which uses LibreOffice to convert the .doc file to .docx and then parses it via the Docx2Parser flow.
If _parse_with_docx fails, the fallback _parse_with_antiword uses the antiword tool to extract the .doc's text directly. antiword extracts plain text only; it cannot recover images or tables.
```python
class DocParser(Docx2Parser):
    """DOC document parser"""

    def parse_into_text(self, content: bytes) -> Document:
        logger.info(f"Parsing DOC document, content size: {len(content)} bytes")
        handle_chain = [
            # 1. Try to convert to docx format to extract images
            self._parse_with_docx,
            # 2. If image extraction is not needed or conversion failed,
            #    try using antiword to extract text
            self._parse_with_antiword,
            # 3. If antiword extraction fails, use textract
            # NOTE: _parse_with_textract is disabled due to SSRF vulnerability
            # self._parse_with_textract,
        ]
```
CSV files are read with pandas into a DataFrame, which is then converted to text: the first row is treated as the header by default, and every other row is serialized against it as a data row.
```python
# Read CSV content into a pandas DataFrame, skipping malformed lines
df = pd.read_csv(BytesIO(content), on_bad_lines="skip")

# Process each row in the DataFrame
for i, (idx, row) in enumerate(df.iterrows()):
    # Format row as "column: value" pairs separated by commas
    content_row = (
        ",".join(f"{col.strip()}: {str(row[col]).strip()}" for col in df.columns)
        + "\n"
    )
    # Update end position for this chunk
    end += len(content_row)
    text.append(content_row)
    # Create a chunk for this row with position tracking
    chunks.append(Chunk(content=content_row, seq=i, start=start, end=end))
    # Update start position for next chunk
    start = end

return Document(
    content="".join(text),
    chunks=chunks,
)
```
The final output looks like this:
```python
Document(
    content="姓名: 张三,年龄: 25,城市: 北京\n姓名: 李四,年龄: 30,城市: 上海\n...",
    chunks=[
        Chunk(content="姓名: 张三,年龄: 25,城市: 北京\n", seq=0, start=0, end=25),
        Chunk(content="姓名: 李四,年龄: 30,城市: 上海\n", seq=1, start=25, end=50),
        Chunk(content="姓名: 王五,年龄: 28,城市: 深圳\n", seq=2, start=50, end=75),
    ],
)
```
Tips: treating the first row as the header preserves the CSV's structure as much as possible, but when the first row is not actually a header, it risks producing garbled semantics.
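If you know in advance that a file has no header row, the caller could sidestep this by supplying an explicit schema. A minimal pandas sketch (the column names are made up; `content` is the raw CSV bytes as in the excerpt above):

```python
from io import BytesIO

import pandas as pd

# For a CSV known to lack a header row, supply explicit column names instead
# of letting pandas promote the first data row to the header.
df = pd.read_csv(
    BytesIO(content),
    header=None,                    # row 0 is data, not a header
    names=["name", "age", "city"],  # assumed schema, provided by the caller
    on_bad_lines="skip",
)
```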
Excel files are read with pandas (backed by the openpyxl engine), and each worksheet is converted to text. The strategy mirrors the CSV path: the first row of each sheet is treated as the header, and the remaining rows are combined with it as data rows.
```python
# Load Excel file from bytes into pandas ExcelFile object
excel_file = pd.ExcelFile(BytesIO(content))

# Process each sheet in the Excel file
for excel_sheet_name in excel_file.sheet_names:
    # Parse the sheet into a DataFrame
    df = excel_file.parse(sheet_name=excel_sheet_name)
    # Remove rows where all values are NaN (completely empty rows)
    df.dropna(how="all", inplace=True)

    # Process each row in the DataFrame
    for _, row in df.iterrows():
        page_content = []
        # Build key-value pairs for non-null values
        for k, v in row.items():
            if pd.notna(v):  # Skip NaN/null values
                page_content.append(f"{k}: {v}")
        # Skip rows with no valid content
        if not page_content:
            continue
        # Format row as comma-separated key-value pairs
        content_row = ",".join(page_content) + "\n"
        end += len(content_row)
        text.append(content_row)
        # Create a chunk for this row with position tracking
        chunks.append(Chunk(content=content_row, seq=len(chunks), start=start, end=end))
        start = end

# Combine all text and return as Document
return Document(content="".join(text), chunks=chunks)
```
Tips: when pandas meets a complex workbook with multi-level headers or many merged cells, the structural information is lost and the resulting text can be semantically garbled.
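When the sheet layout is known ahead of time, one mitigation is to read the multi-level header explicitly and flatten it before the row-by-row serialization. A sketch under the assumption that the sheet's first two rows form the header:

```python
from io import BytesIO

import pandas as pd

# Read a sheet whose first two rows form a multi-level header, then flatten
# the resulting MultiIndex columns into single "level1/level2" labels so the
# "column: value" serialization above keeps both header levels.
df = pd.read_excel(BytesIO(content), header=[0, 1])
df.columns = ["/".join(map(str, col)) for col in df.columns]
```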
Image handling is simple: upload the image to the storage service, turn the returned URL into Markdown image syntax, and keep a base64 encoding of the bytes.
```python
def parse_into_text(self, content: bytes) -> Document:
    logger.info(f"Parsing image content, size: {len(content)} bytes")
    # Get file extension
    ext = os.path.splitext(self.file_name)[1].lower()
    # Upload image to storage
    image_url = self.storage.upload_bytes(content, file_ext=ext)
    logger.info(f"Successfully uploaded image, URL: {image_url[:50]}...")
    # Generate markdown text
    text = f"![image]({image_url})"
    images = {image_url: base64.b64encode(content).decode()}
    # Create image object and add to map
    return Document(content=text, images=images)
```
```python
# Pipeline:
#   URL -> StdWebParser (fetches the page content)
#       -> MarkdownParser (formats the Markdown content)
#       -> final Document
class WebParser(PipelineParser):
    _parser_cls = (StdWebParser, MarkdownParser)
```
```python
async def scrape(self, url: str) -> str:
    async with async_playwright() as p:
        # Configure a proxy (optional)
        kwargs = {}
        if self.proxy:
            kwargs["proxy"] = {"server": self.proxy}

        # Launch a WebKit browser
        browser = await p.webkit.launch(**kwargs)
        page = await browser.new_page()

        # Visit the page with a 30-second timeout
        await page.goto(url, timeout=30000)

        # Grab the full HTML
        content = await page.content()
        await browser.close()
        return content
```
```python
def parse_into_text(self, content: bytes) -> Document:
    url = endecode.decode_bytes(content)

    # Fetch the HTML
    chtml = asyncio.run(self.scrape(url))

    # Extract the main content and convert it to Markdown
    md_text = extract(
        chtml,
        output_format="markdown",  # output format
        with_metadata=True,        # keep metadata
        include_images=True,       # keep images
        include_tables=True,       # keep tables
        include_links=True,        # keep links
    )
    return Document(content=md_text)
```
StdWebParser outputs a Markdown document that keeps metadata (title, author, publish date, and so on) plus images, tables, and links. It is then fed into MarkdownParser, which converts the Markdown into the final Document.
What follows is the core of document processing: since every file type above is first converted into a Markdown document structure, everything from this point on is handled by one unified pipeline.
```
Raw Markdown text
  |
TextSplitter.split_text(text)
  |- Step 1: _split(text)
  |    `- split recursively, ensuring every split <= chunk_size
  |
  |- Step 2: _split_protected(text)
  |    `- locate the position and extent of all protected content
  |
  |- Step 3: _join(splits, protect)
  |    `- merge splits with the protected spans, keeping protected content intact
  |
  `- Step 4: _merge(splits)
       |- merge splits into the final chunks
       |- handle overlap
       `- return List[Tuple[start, end, text]]

Returns: List[(start_pos, end_pos, chunk_text), ...]
```
```python
def split_text(self, text: str) -> List[Tuple[int, int, str]]:
    if text == "":
        return []
    # Step 1: Split text by separators recursively
    splits = self._split(text)
    # Step 2: Extract protected content positions
    protect = self._split_protected(text)
    # Step 3: Merge splits with protected content to ensure integrity
    splits = self._join(splits, protect)
    # Verify that joining all splits reconstructs the original text
    assert "".join(splits) == text
    # Step 4: Merge splits into final chunks with overlap
    chunks = self._merge(splits)
    return chunks
```
Six protected regex patterns are defined: LaTeX math formulas, Markdown image links, Markdown plain links, Markdown table headers (header row plus separator row), Markdown table body rows, and code-block openers with a language identifier. Content matching any of these is never cut mid-span.
```python
protected_regex: List[str] = [
    # math formula - LaTeX style formulas enclosed in $$
    r"\$\$[\s\S]*?\$\$",
    # image - Markdown image syntax
    r"!\[.*?\]\(.*?\)",
    # link - Markdown link syntax [text](url)
    r"\[.*?\]\(.*?\)",
    # table header - Markdown table header with separator line
    r"(?:\|[^|\n]*)+\|[\r\n]+\s*(?:\|\s*:?-{3,}:?\s*)+\|[\r\n]+",
    # table body - Markdown table rows
    r"(?:\|[^|\n]*)+\|[\r\n]+",
    # code header - Code block start with language identifier
    r"```(?:\w+)[\r\n]+[^\r\n]*",
],
```
```python
# Semantic rules
self._split_fns = [
    split_by_sep("\n"),  # [1] newline (highest priority)
    split_by_sep("。"),  # [2] Chinese full stop
    split_by_sep(" "),   # [3] space
    split_by_char(),     # [last] character by character (fallback)
]
```
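split_by_sep and split_by_char are not shown in the excerpt; here is a minimal sketch of how such helpers could behave (the bodies are assumptions). The key property is that the separator stays attached to the preceding piece, so `"".join(splits) == text` holds, which the assert in split_text depends on:

```python
from typing import Callable, List

def split_by_sep(sep: str) -> Callable[[str], List[str]]:
    # Split on `sep`, keeping the separator attached to the preceding piece
    # so that "".join(parts) reconstructs the original text exactly.
    def fn(text: str) -> List[str]:
        parts = text.split(sep)
        glued = [p + sep for p in parts[:-1]]
        if parts[-1]:
            glued.append(parts[-1])
        return glued
    return fn

def split_by_char() -> Callable[[str], List[str]]:
    # Last-resort fallback: split into single characters.
    return lambda text: list(text)
```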
```python
def _split(self, text: str) -> List[str]:
    """Split recursively, ensuring every split <= chunk_size."""
    # Base case: the text is already small enough
    if len(text) <= chunk_size:
        return [text]

    # Recursive case: split using separators
    splits = []
    # Try each separator in priority order
    for split_fn in self._split_fns:
        splits = split_fn(text)
        # If this separator yields more than one part, use it
        if len(splits) > 1:
            break
    # If none of the separators succeed, split_by_char() at the end of the
    # list splits character by character (this rarely happens unless the
    # text contains no separators at all)

    new_splits = []
    for split in splits:
        if len(split) <= chunk_size:
            new_splits.append(split)
        else:
            # Recurse into oversized parts
            new_splits.extend(self._split(split))
    return new_splits
```
```
Assume chunk_size = 200 and text length = 1000.

Attempt 1: split on "\n"
  |- if it yields multiple parts, check that each is <= 200
  |- for any part > 200, recurse into _split() on that part
  `- the recursion tries "\n" again; if parts are still > 200
       `- fall through to "。"

Attempt 2: split on "。"
  |- likely produces finer parts
  `- keep checking

Attempt 3: split on " " (space)
  |- finer still

Last resort: split_by_char()
  |- split into individual characters
  `- guarantees no part exceeds chunk_size
```
```python
def _split_protected(self, text: str) -> List[Tuple[int, str]]:
    """Scan all protected_regex patterns and locate every protected span.

    Returns: [(start_pos, protected_text), ...]
    """
    # Step 1: run every protected pattern against the text
    matches = [
        (match.start(), match.end())
        for pattern in self._protected_fns  # 6 regex patterns
        for match in pattern.finditer(text)
    ]

    # Step 2: sort by start position to resolve overlaps
    matches.sort(key=lambda x: (x[0], -x[1]))
    # Sort order:
    #   - ascending start_pos (front to back)
    #   - for equal start_pos, descending length (longer match wins)

    # Step 3: filter out overlapping matches
    res = []
    initial = -1  # end position of the previous accepted match
    for current_start, current_end in matches:
        # Only accept matches that do not overlap an earlier one
        if current_start >= initial:
            # Only keep protected spans shorter than chunk_size
            if current_end - current_start < chunk_size:
                res.append((current_start, text[current_start:current_end]))
            else:
                logger.warning(f"Protected text ignore: {text[current_start:current_end]}")
                # A protected span larger than chunk_size is dropped (with a warning)
        # Advance `initial` past this match
        initial = max(initial, current_end)
    return res
```
```
1. Table match (protected_regex[3] + [4]):
   |- start=20, end=80
   |- text="| 列1 | 列2 |\n| :--- | ---: |\n| 数据1 | 数据2 |"
   `- length=60 < chunk_size ✅ kept

2. Math formula match (protected_regex[0]):
   |- start=100, end=120
   |- text="$$f(x) = x^2$$"
   `- length=14 < chunk_size ✅ kept

Returns:
[
    (20, "| 列1 | 列2 |\n| :--- | ---: |\n| 数据1 | 数据2 |"),
    (100, "$$f(x) = x^2$$"),
]
```
_join() reorganizes the split list: using position arithmetic over the original text, it guarantees that protected content (such as a complete table) is never torn apart. If a table was broken across raw splits, _join() merges the broken fragments and re-inserts the protected content as one intact unit.
```python
def _join(self, splits: List[str], protect: List[Tuple[int, str]]) -> List[str]:
    """Key goal: keep protected content intact.

    Problem scenario: suppose _split() returned
        splits = [
            "普通文本 ABC。\n\n",
            "| 列1 | 列2 |\n|",        # <- the table got split (bad!)
            " :--- | ---: |\n| 数据1 | 数据2 |",
        ]
    while protect tells us:
        protect = [(20, "| 列1 | 列2 |\n| :--- | ---: |\n| 数据1 | 数据2 |")]

    _join() reorganizes the splits so the table appears as one intact unit.
    """
    j = 0       # index into the protected spans
    point = 0   # current position in the original text
    start = 0   # start position of the current split
    res = []    # result list

    for split in splits:
        # Compute this split's range in the original text
        end = start + len(split)
        # Slice off the part of this split not yet consumed
        cur = split[point - start:]

        # Handle every protected span overlapping the current split
        while j < len(protect):
            p_start, p_content = protect[j]
            p_end = p_start + len(p_content)

            # If the protected span starts after this split, stop
            if end <= p_start:
                break

            # Emit the part before the protected span
            if point < p_start:
                local_end = p_start - point
                res.append(cur[:local_end])
                cur = cur[local_end:]
                point = p_start

            # Emit the whole protected span as one intact unit
            res.append(p_content)
            j += 1

            # Skip past the protected span inside the original split
            if point < p_end:
                local_start = p_end - point
                cur = cur[local_start:]
                point = p_end

            # If this split is fully consumed, move on
            if not cur:
                break

        # Emit whatever remains of this split
        if cur:
            res.append(cur)
            point = end

        # Advance to the next split
        start = end

    return res
```
In short, _join() walks the splits and the protected matches in parallel by their positions in the original text; whenever a protected span overlaps the current split, it emits the text before it, emits the protected span as a single unbroken unit, and skips whatever fragments of it the raw splits contained.
_merge() handles the overlap between chunks and resolves each chunk's multi-level header information, building the final chunk contents.
```python
def _merge(self, splits: List[str]) -> List[Tuple[int, int, str]]:
    """Merge splits into the final chunks, handling overlap and header tracking."""
    chunks: List[Tuple[int, int, str]] = []
    cur_chunk: List[Tuple[int, int, str]] = []  # elements of the chunk being built
    cur_headers = ""
    cur_len = 0
    cur_start, cur_end = 0, 0

    for split in splits:
        # Compute this split's position in the original text
        cur_end = cur_start + len(split)
        split_len = len(split)

        # Update the header information
        self.header_hook.update(split)
        cur_headers = self.header_hook.get_headers()
        cur_headers_len = len(cur_headers)

        # Check whether chunk_size would be exceeded
        if cur_len + split_len + cur_headers_len > chunk_size:
            # The current chunk is full; save it
            if len(cur_chunk) > 0:
                chunks.append((
                    cur_chunk[0][0],   # start of the first element
                    cur_chunk[-1][1],  # end of the last element
                    "".join([c[2] for c in cur_chunk])
                ))

            # Handle overlap: drop elements from the front until only the
            # tail of the current chunk remains as the overlap
            while cur_chunk and (cur_len > chunk_overlap
                                 or cur_len + split_len + cur_headers_len > chunk_size):
                # Remove the first element
                first = cur_chunk.pop(0)
                cur_len -= len(first[2])

            # Prepend the headers to the new chunk (when present)
            if (cur_headers
                    and split_len + cur_headers_len < chunk_size
                    and cur_headers not in split):
                next_start = cur_chunk[0][0] if cur_chunk else cur_start
                cur_chunk.insert(0, (0, 0, cur_headers))
                cur_len += cur_headers_len

        # Append the current split to the chunk
        cur_chunk.append((cur_start, cur_end, split))
        cur_len += split_len
        cur_start = cur_end

    # Handle the last chunk
    if cur_chunk:
        chunks.append((
            cur_chunk[0][0],
            cur_chunk[-1][1],
            "".join([c[2] for c in cur_chunk])
        ))
    return chunks
```
The multi-level header tracking deserves a closer look.
```python
def update(self, split: str):
    """Parse every Markdown heading inside the split."""
    # The regex matches "# Title", "## Subtitle", and so on
    for match in re.finditer(r"^(#+)\s+(.+)$", split, re.MULTILINE):
        level = len(match.group(1))  # number of '#' (1 = H1, 2 = H2, ...)
        title = match.group(2)       # heading text
        # Update the current heading stack
        # e.g. on seeing "##", update the level-2 heading
```
```python
split = "## Q2\n\nQ2 sales..."

# Match: level = 2 (because of "##")
#        title = "Q2"

# Updated state:
headers = {
    1: "2024 Sales Report",  # level-1 heading (set earlier)
    2: "Q2",                 # level-2 heading (just updated)
}
```
```python
def get_headers(self) -> str:
    """Return all current headings, formatted by level."""
    # Output looks like:
    #   # 2024 Sales Report
    #   ## Q2
    return "\n".join(
        f"{'#' * level} {title}" for level, title in sorted(self.headers.items())
    )
```
An example:
```
Current heading stack:
{
    1: "2024 Sales Report",
    2: "Q2",
}

get_headers() returns:
"# 2024 Sales Report\n## Q2"
```
```python
for split in splits:
    cur_end = cur_start + len(split)
    split_len = self.len_function(split)

    # [1] Update the heading info
    self.header_hook.update(split)                # parse the headings in this split
    cur_headers = self.header_hook.get_headers()  # current heading chain
    cur_headers_len = self.len_function(cur_headers)

    # [2] Guard against oversized headers
    if cur_headers_len > self.chunk_size:
        logger.error(f"Got headers of size {cur_headers_len}, ...")
        cur_headers, cur_headers_len = "", 0  # discard headers that are too large

    # [3] Decide whether a new chunk is needed
    if cur_len + split_len + cur_headers_len > self.chunk_size:
        # ...save the current chunk...
        # ...handle overlap...

        # [4] If there is room, prepend the headers to the new chunk
        if (cur_headers
                and split_len + cur_headers_len < self.chunk_size
                and cur_headers not in split):
            # Only prepend when the headers are not already inside the split
            # (avoids duplication)
            cur_chunk.insert(0, (header_start, header_end, cur_headers))
            cur_len += cur_headers_len

    # [5] Append the current split
    cur_chunk.append((cur_start, cur_end, split))
    cur_len += split_len
    cur_start = cur_end
```
Finally, the chunk list is truncated at a maximum count (1000 by default); anything beyond it is simply dropped. This guards against out-of-memory failures and database-performance problems, but it also sacrifices completeness, so consider tuning the maximum chunk count to your actual workload.
```python
# Limit the number of returned chunks
if len(chunks) > self.max_chunks:
    logger.warning(f"Limiting chunks from {len(chunks)} to maximum {self.max_chunks}")
    chunks = chunks[: self.max_chunks]
```
The multimodal image pipeline: for every Chunk, images are extracted, downloaded and re-uploaded, run through OCR text recognition, and captioned by a VLM.
```python
if self.enable_multimodal:
    # Supported file types
    allowed_types = [
        # Text documents
        ".pdf", ".md", ".markdown", ".doc", ".docx",
        # Plain image files
        ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp",
    ]
    if file_ext in allowed_types:
        chunks = self.process_chunks_images(chunks, document.images)
```
Processing the images inside a single Chunk starts by extracting the Markdown image syntax from chunk.content; each extracted URL is then normalized into storage:
```python
async def download_and_upload_image(self, img_url: str):
    """Handles three cases:
    1. Already in storage (COS/MinIO) -> use as-is
    2. A local file                   -> upload to storage
    3. A remote URL                   -> download, then upload to storage
    """
```
URLs are validated to guard against SSRF attacks:
```python
@staticmethod
def _is_safe_url(url: str) -> bool:
    """URL types that are rejected:
    1. Non-HTTP/HTTPS schemes
    2. Private IPs (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16)
    3. Loopback IPs (127.0.0.1, ::1)
    4. Cloud metadata endpoints (169.254.169.254)
    5. Local hostnames (.local, localhost)
    """
```
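The body is elided in the excerpt. As an illustration of those rules, here is a sketch built on Python's standard `urllib.parse`, `socket`, and `ipaddress` modules (not WeKnora's actual implementation):

```python
import ipaddress
import socket
from urllib.parse import urlparse

def is_safe_url(url: str) -> bool:
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        return False  # rule 1: HTTP/HTTPS only
    host = (parsed.hostname or "").lower()
    if not host or host == "localhost" or host.endswith(".local"):
        return False  # rule 5: local hostnames
    try:
        ip = ipaddress.ip_address(socket.gethostbyname(host))
    except (socket.gaierror, ValueError):
        return False  # unresolvable host
    # Rules 2-4: private, loopback, and link-local (cloud metadata) ranges
    return not (ip.is_private or ip.is_loopback or ip.is_link_local)
```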
Each normalized image then goes through two enrichment passes: OCR (text extraction) and VLM (caption generation).
```python
ocr_config = {
    "use_gpu": False,                      # CPU only, no GPU
    "text_det_limit_side_len": 960,        # cap the image's long side at 960px
    "use_doc_orientation_classify": True,  # auto-detect document orientation (0/90/180/270 degrees)
    "use_textline_orientation": True,      # text-line orientation detection
    # Model selection (PP-OCRv4)
    "text_recognition_model_name": "PP-OCRv4_server_rec",
    "text_detection_model_name": "PP-OCRv4_server_det",
    # Detection thresholds
    "text_det_thresh": 0.3,        # candidate-box detection threshold
    "text_det_box_thresh": 0.6,    # text-box confidence threshold
    "text_det_unclip_ratio": 1.5,  # text-box expansion ratio
    # High-accuracy mode
    "use_dilation": True,         # dilation improves accuracy
    "det_db_score_mode": "slow",  # slower but more accurate scoring
    "lang": "ch",                 # recognize Chinese
}
```
PaddleOCR relies on AVX instructions for acceleration, which older CPUs may lack:
```python
# Detect whether the CPU supports AVX
if platform.system() == "Linux":
    result = subprocess.run(
        ["grep", "-o", "avx", "/proc/cpuinfo"],
        capture_output=True, text=True, timeout=5,
    )
    has_avx = "avx" in result.stdout.lower()
    if not has_avx:
        # Fall back to compatibility mode
        os.environ["FLAGS_use_avx2"] = "0"
        os.environ["FLAGS_use_avx"] = "1"
```
```python
def _predict(self, image: Image.Image) -> str:
    # 1. Ensure RGB format
    if image.mode != "RGB":
        image = image.convert("RGB")
    # 2. Convert to a numpy array
    image_array = np.array(image)
    # 3. Run OCR
    ocr_result = self.ocr.ocr(image_array, cls=False)
    # Result format: [[[box coords], ("text", confidence)], ...]
    # 4. Extract the text
    text = [line[1][0] for line in ocr_result[0] if line and line[1]]
    return " ".join(text)
```
```python
chunk.images = [{
    # Original info
    "original_url": "https://example.com/chart.png",
    "start": 40,
    "end": 90,
    "alt_text": "sales chart",
    "match_text": "",
    # Storage info
    "cos_url": "https://storage.local/abc123.png",
    # OCR result
    "ocr_text": "Q1 sales\nJan: 1.00M\nFeb: 1.10M",
    # VLM result
    "caption": "A bar chart showing monthly Q1 sales figures...",
}]
```
Once the final chunk structures are built, the chunks are vectorized and, depending on configuration, the corresponding RAG enhancements are applied.
Old chunks and index data are cleaned up first to avoid duplicates; existing knowledge-graph data, if any, is removed as well.
```go
// Delete the old chunks
err := s.chunkService.DeleteChunksByKnowledgeID(ctx, knowledge.ID)
// Delete the old index data
err = retrieveEngine.DeleteByKnowledgeIDList(ctx, []string{knowledge.ID}, embeddingModel.GetDimensions(), knowledge.Type)
// Delete knowledge-graph data (if present)
err = s.graphEngine.DelGraph(ctx, []types.NameSpace{namespace})
```
The indexed content covers the chunk's text plus each image's OCR text and caption.
```go
for _, chunkData := range chunks {
	// 1. Create the main text Chunk
	textChunk := &types.Chunk{
		ID:              uuid.New().String(),
		TenantID:        knowledge.TenantID,
		KnowledgeID:     knowledge.ID,
		KnowledgeBaseID: knowledge.KnowledgeBaseID,
		Content:         chunkData.Content,
		ChunkIndex:      int(chunkData.Seq),
		ChunkType:       types.ChunkTypeText, // "text"
		// ...
	}
	insertChunks = append(insertChunks, textChunk)

	// 2. Handle image information
	if len(chunkData.Images) > 0 {
		for i, img := range chunkData.Images {
			// 2.1 Create an OCR Chunk (when OCR text exists)
			if img.OcrText != "" {
				ocrChunk := &types.Chunk{
					ID:            uuid.New().String(),
					Content:       img.OcrText,
					ChunkType:     types.ChunkTypeImageOCR, // "image_ocr"
					ParentChunkID: textChunk.ID,            // link to the parent Chunk
					ImageInfo:     string(imageInfoJSON),
					// ...
				}
				insertChunks = append(insertChunks, ocrChunk)
			}

			// 2.2 Create a Caption Chunk (when a caption exists)
			if img.Caption != "" {
				captionChunk := &types.Chunk{
					ID:            uuid.New().String(),
					Content:       img.Caption,
					ChunkType:     types.ChunkTypeImageCaption, // "image_caption"
					ParentChunkID: textChunk.ID,
					ImageInfo:     string(imageInfoJSON),
					// ...
				}
				insertChunks = append(insertChunks, captionChunk)
			}
		}
		// Save the image info onto the text Chunk
		textChunk.ImageInfo = string(imageInfoJSON)
	}
}
```
Text chunks are wired to their neighbors, forming a linked list of chunks that supports context expansion at retrieval time.
```go
for i, chunk := range textChunks {
	if i > 0 {
		textChunks[i-1].NextChunkID = chunk.ID
	}
	if i < len(textChunks)-1 {
		textChunks[i+1].PreChunkID = chunk.ID
	}
}
```
```go
indexInfoList := make([]*types.IndexInfo, 0, len(insertChunks))
for _, chunk := range insertChunks {
	indexInfoList = append(indexInfoList, &types.IndexInfo{
		Content:         chunk.Content, // used to generate the embedding
		SourceID:        chunk.ID,
		SourceType:      types.ChunkSourceType,
		ChunkID:         chunk.ID,
		KnowledgeID:     knowledge.ID,
		KnowledgeBaseID: knowledge.KnowledgeBaseID,
	})
}
```
The storage size of the vectors about to be inserted is roughly estimated and checked against the tenant's quota: storage size ≈ number of index entries × (vector dimension × 4 + metadata size).
```go
// Estimate the storage size
totalStorageSize := retrieveEngine.EstimateStorageSize(ctx, embeddingModel, indexInfoList)

// Check the quota
if tenantInfo.StorageQuota > 0 {
	if tenantInfo.StorageUsed+totalStorageSize > tenantInfo.StorageQuota {
		knowledge.ParseStatus = types.ParseStatusFailed
		knowledge.ErrorMessage = "存储空间不足"
		s.repo.UpdateKnowledge(ctx, knowledge)
		return
	}
}
```
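As a quick sanity check of that formula with made-up numbers: 1,000 index entries with a 1,024-dimensional float32 embedding (4 bytes per dimension) and roughly 200 bytes of metadata each come to about 1,000 × (1,024 × 4 + 200) ≈ 4.3 MB.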
The chunk list is persisted to the database first, then vectorized in batch.
```go
// Persist the chunk list
err := s.chunkService.CreateChunks(ctx, insertChunks)
// Vectorize in batch
err = retrieveEngine.BatchIndex(ctx, embeddingModel, indexInfoList)
```
```go
if kb.ExtractConfig != nil && kb.ExtractConfig.Enabled {
	for _, chunk := range textChunks {
		err := NewChunkExtractTask(ctx, s.task, chunk.TenantID, chunk.ID, kb.SummaryModelID)
		...
	}
}
```
A structured prompt template for knowledge-graph extraction is built; the LLM uses it to extract entities and relations.
```go
template := &types.PromptTemplateStructured{
	Description: s.template.Description,
	Tags:        kb.ExtractConfig.Tags, // entity-type definitions
	Examples: []types.GraphData{{
		Text:     kb.ExtractConfig.Text,      // example text
		Node:     kb.ExtractConfig.Nodes,     // example nodes
		Relation: kb.ExtractConfig.Relations, // example relations
	}},
}
```
The LLM extracts entities and relations from the chunk, generating a structured knowledge graph according to the template. If the LLM produces a relation without a corresponding node, an auto-completion step adds the missing node to the graph.
```go
extractor := chatpipline.NewExtractor(chatModel, template)
graph, err := extractor.Extract(ctx, chunk.Content)
```
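The auto-completion itself happens inside the extractor. As a language-neutral illustration of the idea (the field names below are assumptions, not WeKnora's actual types):

```python
def complete_missing_nodes(graph: dict) -> dict:
    """If a relation references a node the LLM never emitted, add a stub node
    so the graph stays structurally consistent."""
    known = {node["name"] for node in graph["nodes"]}
    for rel in graph["relations"]:
        for endpoint in (rel["source"], rel["target"]):
            if endpoint not in known:
                graph["nodes"].append({"name": endpoint, "type": "unknown"})
                known.add(endpoint)
    return graph
```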
The extracted graph is associated with its source Chunk and persisted to the database.
```go
// Associate each node with its source Chunk
for _, node := range graph.Node {
	node.Chunks = []string{chunk.ID}
}

// Write to the graph database
err = s.graphEngine.AddGraph(ctx,
	types.NameSpace{
		KnowledgeBase: chunk.KnowledgeBaseID,
		Knowledge:     chunk.KnowledgeID,
	},
	[]*types.GraphData{graph},
)
```
```go
if options.EnableQuestionGeneration && len(textChunks) > 0 {
	questionCount := options.QuestionCount
	if questionCount <= 0 {
		questionCount = 3
	}
	if questionCount > 10 {
		questionCount = 10
	}
	s.enqueueQuestionGenerationTask(ctx, knowledge.KnowledgeBaseID, knowledge.ID, questionCount)
}
```
Text chunks are first sorted by their start offset (StartAt); then, walking the chunk linked list, each chunk's neighbors are combined with it to build the context for question generation.
```go
// Sort by StartAt (used for building context)
sort.Slice(textChunks, func(i, j int) bool {
	return textChunks[i].StartAt < textChunks[j].StartAt
})

// Gather the neighboring chunks' content as context for question generation
for i, chunk := range textChunks {
	// Use the previous/next chunks as context
	var prevContent, nextContent string
	if i > 0 {
		prevContent = textChunks[i-1].Content
		if len(prevContent) > 500 {
			prevContent = prevContent[len(prevContent)-500:] // keep the last 500 characters
		}
	}
	if i < len(textChunks)-1 {
		nextContent = textChunks[i+1].Content
		if len(nextContent) > 500 {
			nextContent = nextContent[:500] // keep the first 500 characters
		}
	}
	...
}
```
The LLM generates questions relevant to the current chunk from this context (previous chunk, next chunk, and the knowledge title), and the questions are saved into the chunk's metadata.
```go
// 4. Generate questions with the LLM
questions, _ := s.generateQuestionsWithContext(ctx, chatModel,
	chunk.Content, prevContent, nextContent, knowledge.Title, questionCount)

// 5. Save the questions into the chunk's metadata
generatedQuestions := make([]types.GeneratedQuestion, len(questions))
for j, question := range questions {
	questionID := fmt.Sprintf("q%d", time.Now().UnixNano()+int64(j))
	generatedQuestions[j] = types.GeneratedQuestion{
		ID:       questionID,
		Question: question,
	}
}
```
The generated questions are themselves vectorized so they can be retrieved.
```go
// Build the question indexes
for _, gq := range generatedQuestions {
	sourceID := fmt.Sprintf("%s-%s", chunk.ID, gq.ID)
	indexInfoList = append(indexInfoList, &types.IndexInfo{
		Content:         gq.Question,
		SourceID:        sourceID,
		ChunkID:         chunk.ID, // points back to the original chunk
		KnowledgeID:     knowledge.ID,
		KnowledgeBaseID: knowledge.KnowledgeBaseID,
	})
}

// Vectorize
retrieveEngine.BatchIndex(ctx, embeddingModel, indexInfoList)
```
If any text chunks exist, a summary-generation task is triggered.
```go
if len(textChunks) > 0 {
	s.enqueueSummaryGenerationTask(ctx, knowledge.KnowledgeBaseID, knowledge.ID)
}
```
Text chunks are sorted by ChunkIndex so the summary input is concatenated in document order.
```go
// Sort by ChunkIndex
sort.Slice(textChunks, func(i, j int) bool {
	return textChunks[i].ChunkIndex < textChunks[j].ChunkIndex
})
```
The sorted chunk contents are concatenated, capped at 4,096 characters, as the raw input for the summary; image captions and OCR text found along the way are folded in as well.
```go
// Concatenate chunk contents (in sorted order, capped at 4096 characters)
chunkContents := ""
for _, chunk := range sortedChunks {
	if chunk.EndAt > 4096 {
		break // cap the content length
	}
	chunkContents = string([]rune(chunkContents)[:chunk.StartAt]) + chunk.Content

	// Collect image info
	if chunk.ImageInfo != "" {
		var images []*types.ImageInfo
		json.Unmarshal([]byte(chunk.ImageInfo), &images)
		allImageInfos = append(allImageInfos, images...)
	}
}

// Strip Markdown image syntax
re := regexp.MustCompile(`!\[[^\]]*\]\([^)]+\)`)
chunkContents = re.ReplaceAllString(chunkContents, "")

// Append image captions and OCR text
if len(allImageInfos) > 0 {
	var imageAnnotations string
	for _, img := range allImageInfos {
		if img.Caption != "" {
			imageAnnotations += fmt.Sprintf("\n[图片描述: %s]", img.Caption)
		}
		if img.OCRText != "" {
			imageAnnotations += fmt.Sprintf("\n[图片文字: %s]", img.OCRText)
		}
	}
	chunkContents = chunkContents + imageAnnotations
}

// Return directly when the content is too short
if len(chunkContents) < 300 {
	return chunkContents, nil
}

// Prepend document metadata
metadataIntro := fmt.Sprintf("文档类型: %s\n文件名称: %s\n", knowledge.FileType, knowledge.FileName)
contentWithMetadata = metadataIntro + "\n内容:\n" + chunkContents
```
```go
summary, err := summaryModel.Chat(ctx, []chat.Message{
	{Role: "system", Content: s.config.Conversation.GenerateSummaryPrompt},
	{Role: "user", Content: contentWithMetadata},
}, &chat.ChatOptions{
	Temperature: 0.3,
	MaxTokens:   1024,
	Thinking:    &thinking,
})
```
```go
// Save the summary chunk
summaryChunk := &types.Chunk{
	ID:              uuid.New().String(),
	TenantID:        knowledge.TenantID,
	KnowledgeID:     knowledge.ID,
	KnowledgeBaseID: knowledge.KnowledgeBaseID,
	Content:         fmt.Sprintf("# 文档名称\n%s\n\n# 摘要\n%s", knowledge.FileName, summary),
	ChunkIndex:      maxChunkIndex + 1,
	IsEnabled:       true,
	ChunkType:       types.ChunkTypeSummary,
	ParentChunkID:   textChunks[0].ID,
}
s.chunkService.CreateChunks(ctx, []*types.Chunk{summaryChunk})

// Vectorize the summary chunk
indexInfo := []*types.IndexInfo{{
	Content:         summaryChunk.Content,
	SourceID:        summaryChunk.ID,
	SourceType:      types.ChunkSourceType,
	ChunkID:         summaryChunk.ID,
	KnowledgeID:     knowledge.ID,
	KnowledgeBaseID: knowledge.KnowledgeBaseID,
}}
retrieveEngine.BatchIndex(ctx, embeddingModel, indexInfo)
```
This article has walked through WeKnora's knowledge-construction design from an engineering perspective:
Starting from the multiple ingestion paths, safeguarded by asynchronous parsing and fallback strategies; then the unified parsing architecture centered on Markdown as the intermediate representation; then a TextSplitter with semantic awareness, structure protection, and header tracking that produces high-quality Chunks; and finally the layered enrichments of image OCR, VLM captions, multimodal vectors, knowledge-graph extraction, question generation, and summary generation, adding up to a knowledge base that is retrievable, traceable, and extensible.
Clearly, WeKnora does not reduce RAG to a lightweight "chunk and embed" flow. It takes document structure, context integrity, and information-loss control seriously, the concerns that matter most in production, and that is what fundamentally separates it from demo-grade RAG projects.
In the next article we will shift from "how knowledge is built" to "how knowledge is recalled" and analyze WeKnora's retrieval design:
How does it combine sparse indexes (keyword/inverted), vector indexes (semantic retrieval), and knowledge-graph retrieval?
How does recall make use of document summaries, chunk-level generated questions, image OCR text, and formatted tables?
In which scenarios does each recall channel kick in, and how are the results fused, pruned, and ranked?
These choices directly determine WeKnora's hit rate and explainability in real question-answering scenarios, and they form the pivotal link connecting the two halves of the RAG system.