链载Ai

标题: 高级 RAG 检索策略之流程与模块化 [打印本页]

作者: 链载Ai    时间: 2 小时前
标题: 高级 RAG 检索策略之流程与模块化


我们介绍了很多关于高级 RAG(Retrieval Augmented Generation)的检索策略,每一种策略就像是机器中的零部件,我们可以通过对这些零部件进行不同的组合,来实现不同的 RAG 功能,从而满足不同的需求。今天我们就来介绍高级 RAG 检索中一些常见的 RAG 模块,以及如何通过流程的方式来组合这些模块,实现高级 RAG 检索功能。

RAG 模块化

模块化 RAG 提出了一种高度可扩展的范例,将 RAG 系统分为模块类型、模块和操作符的三层结构。每个模块类型代表 RAG 系统中的一个核心流程,包含多个功能模块。每个功能模块又包含多个特定的操作符。整个 RAG 系统变成了多个模块和相应操作符的排列组合,形成了我们所说的 RAG 流程。在流程中,每种模块类型可以选择不同的功能模块,并且在每个功能模块中可以选择一个或多个操作符。

RAG 流程

RAG 流程是指在 RAG 系统中,从输入查询到输出生成文本的整个工作流程。这个流程通常涉及多个模块和操作符的协同工作,包括但不限于检索器、生成器以及可能的预处理和后处理模块。RAG 流程的设计旨在使得 LLM(大语言模型)能够在生成文本时利用外部知识库或文档集,从而提高回答的准确性和相关性。

RAG 推理阶段的流程一般分为以下几种模式:

下图是 Loop 模式的 RAG 流程图:

后面我们主要以 Sequential 模式为例,介绍如何通过模块化和流水线的方式来实现高级 RAG 检索功能。

代码示例

LlamaIndex[1]的查询流水线(Query Pipeline)功能提供了一种模块化的方式来组合 RAG 检索策略。我们可以通过定义不同的模块,然后将这些模块按照一定的顺序组合起来,形成一个完整的查询流水线。下面我们通过一个从简单到复杂的示例来演示如何使用 LlamaIndex 的查询流水线功能实现高级 RAG 检索。

普通 RAG

首先我们定义一个普通 RAG 的流水线,这个流水线包含了 3 个模块,分别是:输入、检索和输出。其中输入模块用于接收用户输入的查询,检索模块用于从知识库中检索相关文档,输出模块用于根据检索结果生成回答。

在定义查询流水线之前,我们先将我们的测试文档索引入库,这里的测试文档还是用维基百科上的复仇者联盟[2]电影剧情,示例代码如下:

import os
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core import (
Settings,
SimpleDirectoryReader,
StorageContext,
VectorStoreIndex,
load_index_from_storage,
)
from llama_index.core.node_parser import SentenceSplitter

documents = SimpleDirectoryReader("./data").load_data()
node_parser = SentenceSplitter()
llm = OpenAI(model="gpt-3.5-turbo")
embed_model = OpenAIEmbedding(model="text-embedding-3-small")
Settings.llm = llm
Settings.embed_model = embed_model
Settings.node_parser = node_parser

if not os.path.exists("storage"):
index = VectorStoreIndex.from_documents(documents)
index.set_index_id("avengers")
index.storage_context.persist("./storage")
else:
store_context = StorageContext.from_defaults(persist_dir="./storage")
index = load_index_from_storage(
storage_context=store_context, index_id="avengers"
)

接下来我们定义一个普通的 RAG 流水线,示例代码如下:

from llama_index.core.query_pipeline import QueryPipeline, InputComponent
from llama_index.core.response_synthesizers.simple_summarize import SimpleSummarize

retriever =index.as_retriever()
p = QueryPipeline(verbose=True)
p.add_modules(
{
"input": InputComponent(),
"retriever": retriever,
"output": SimpleSummarize(),
}
)

p.add_link("input", "retriever")
p.add_link("input", "output", dest_key="query_str")
p.add_link("retriever", "output", dest_key="nodes")

查询流水线添加模块和连接关系的方式除了add_modulesadd_link方法外,还可以通过add_chain方法添加,示例代码如下:

p = QueryPipeline(verbose=True)
p.add_chain([InputComponent(), retriever])

这种方式可以一次性添加模块与连接关系,但这种方式只能添加单参数的模块,如果模块有多个参数则需要使用add_modulesadd_link方法。

接下来我们再来运行查询流水线,示例代码如下:

question = "Which two members of the Avengers created Ultron?"
output = p.run(input=question)
print(str(output))

# 结果显示
> Running module input with input:
input: Which two members of the Avengers created Ultron?

> Running module retriever with input:
input: Which two members of the Avengers created Ultron?

> Running module output with input:
query_str: Which two members of the Avengers created Ultron?
nodes: [NodeWithScore(node=TextNode(id_='53d32f3a-a2d5-47b1-aa8f-a9679e83e0b0', embedding=None, metadata={'file_path': '/data/Avengers:Age-of-Ul...

Bruce Banner and Tony Stark.

增加 reranker 模块

接下来我们在普通 RAG 的基础上增加一个 reranker 模块,用于对检索结果进行重新排序。

+from llama_index.postprocessor.cohere_rerank import CohereRerank

+reranker = CohereRerank()
p = QueryPipeline(verbose=True)
p.add_modules(
{
"input": InputComponent(),
"retriever": retriever,
+"reranker": reranker,
"output": SimpleSummarize(),
}
)

p.add_link("input", "retriever")
+p.add_link("input", "reranker", dest_key="query_str")
+p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("input", "output", dest_key="query_str")
-p.add_link("retriever", "output", dest_key="nodes")
+p.add_link("reranker", "output", dest_key="nodes")

查询流水线的运行方法除了run方法外,还有run_with_intermeation方法,这个方法可以获取流水线的中间结果,我们将retrieverrerank模块的中间结果打印出来进行对比,示例代码如下:

output, intermediates = p.run_with_intermediates(input=question)
retriever_output = intermediates["retriever"].outputs["output"]
print(f"retriever output:")
for node in retriever_output:
print(f"node id: {node.node_id}, node score: {node.score}")
reranker_output = intermediates["reranker"].outputs["nodes"]
print(f"\nreranker output:")
for node in reranker_output:
print(f"node id: {node.node_id}, node score: {node.score}")

# 显示结果
retriever output:
node id: 53d32f3a-a2d5-47b1-aa8f-a9679e83e0b0, node score: 0.6608391314791646
node id: dea3844b-789f-46de-a415-df1ef14dda18, node score: 0.5313643379538727

reranker output:
node id: 53d32f3a-a2d5-47b1-aa8f-a9679e83e0b0, node score: 0.9588471
node id: dea3844b-789f-46de-a415-df1ef14dda18, node score: 0.5837967

增加 query rewrite 模块

之前我们在查询流水线中加入了 reranker 模块,相当是对检索结果的后处理操作,现在我们再加入一个 query rewrite 模块,用于对查询问题进行预处理操作。

+query_rewriter = HydeComponent()
p = QueryPipeline(verbose=True)
p.add_modules(
{
"input": InputComponent(),
+"query_rewriter": query_rewriter,
"retriever": retriever,
"reranker": reranker,
"output": SimpleSummarize(),
}
)

-p.add_link("input", "retriever")
+p.add_link("input", "query_rewriter")
+p.add_link("query_rewriter", "retriever")
p.add_link("input", "reranker", dest_key="query_str")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("input", "output", dest_key="query_str")
p.add_link("reranker", "output", dest_key="nodes")

LlamaIndex 的查询流水线提供了自定义组件的功能,我们可以通过继承CustomQueryComponent类来实现自定义组件,下面我们来实现HydeComponent类,示例代码如下:

from llama_index.core.query_pipeline import CustomQueryComponent
from typing import Dict, Any
from llama_index.core.indices.query.query_transform import HyDEQueryTransform

class HydeComponent(CustomQueryComponent):
"""HyDE query rewrite component."""

def _validate_component_inputs(self, input: Dict[str, Any]) -> Dict[str, Any]:
"""Validate component inputs during run_component."""
assert "input" in input, "input is required"
return input

@property
def _input_keys(self) -> set:
"""Input keys dict."""
return {"input"}

@property
def _output_keys(self) -> set:
return {"output"}

def _run_component(self, **kwargs) -> Dict[str, Any]:
"""Run the component."""
hyde = HyDEQueryTransform(include_original=True)
query_bundle = hyde(kwargs["input"])
return {"output": query_bundle.embedding_strs[0]}

关于查询重写的更多策略,可以参考我之前的这篇文章。

替换 output 模块

在之前的查询流水线中,我们使用的是简单的总结输出组件,现在我们将其替换为树形总结组件,用来提高最终的输出结果。

树形总结组件以自底向上的方式递归地合并文本块并对其进行总结(即从叶子到根构建一棵树)。 具体地说,在每个递归步骤中:

  1. 我们重新打包文本块,使得每个块填充大语言模型的上下文窗口

  2. 如果只有一个块,我们给出最终响应

  3. 否则,我们总结每个块,并递归地总结这些摘要

+from llama_index.core.response_synthesizers.tree_summarize import TreeSummarize

p = QueryPipeline(verbose=True)
p.add_modules(
{
"input": InputComponent(),
"query_rewriter": query_rewriter,
"retriever": retriever,
"reranker": reranker,
-"output": SimpleSummarize(),
+"output": TreeSummarize(),
}
)

查询流水线实际上是一个 DAG(有向无环图),每个模块是图中的一个节点,模块之间的连接关系是图中的边,我们可以通过代码来展示这个图形结构,示例代码如下:

from pyvis.network import Network

net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(p.clean_dag)
net.write_html("output/pipeline_dag.html")

保存后的查询流水线图形结构如下:

使用句子窗口检索

在之前的查询流水线中,retriever模块使用的是普通的检索策略,现在我们将其替换为句子窗口检索策略,用于提高检索的准确性。

句子窗口检索的原理:首先在文档切分时,将文档以句子为单位进行切分,同时进行 Embedding 并保存数据库。然后在检索时,通过问题检索到相关的句子,但并不只是将检索到的句子作为检索结果,而是将该句子前面和后面的句子一起作为检索结果,包含的句子数量可以通过参数来进行设置,最后将检索结果再一起提交给 LLM 来生成答案。

+from llama_index.core.node_parser import SentenceWindowNodeParser

-node_parser = SentenceSplitter()
+node_parser = SentenceWindowNodeParser.from_defaults(
+window_size=3,
+window_metadata_key="window",
+original_text_metadata_key="original_text",
+)

+meta_replacer = MetadataReplacementPostProcessor(target_metadata_key="window")
p = QueryPipeline(verbose=True)
p.add_modules(
{
"input": InputComponent(),
"query_rewriter": query_rewriter,
"retriever": retriever,
+"meta_replacer": meta_replacer,
"reranker": reranker,
"output": TreeSummarize(),
}
)
p.add_link("input", "query_rewriter")
p.add_link("query_rewriter", "retriever")
+p.add_link("retriever", "meta_replacer")
p.add_link("input", "reranker", dest_key="query_str")
-p.add_link("retriever", "reranker", dest_key="nodes")
+p.add_link("meta_replacer", "reranker", dest_key="nodes")
p.add_link("input", "output", dest_key="query_str")
p.add_link("reranker", "output", dest_key="nodes")

我们可以打印出retriever模块和meta_replacer模块的中间结果,来对比检索结果的变化,示例代码如下:

output, intermediates = p.run_with_intermediates(input=question)
retriever_output = intermediates["retriever"].outputs["output"]
print(f"retriever output:")
for node in retriever_output:
print(f"node: {node.text}\n")
meta_replacer_output = intermediates["meta_replacer"].outputs["nodes"]
print(f"meta_replacer output:")
for node in meta_replacer_output:
print(f"node: {node.text}\n")

# 显示结果
retriever output:
node: In the Eastern European country of Sokovia, the Avengers—Tony Stark, Thor, Bruce Banner, Steve Rogers, Natasha Romanoff, and Clint Barton—raid a Hydra facility commanded by Baron Wolfgang von Strucker, who has experimented on humans using the scepter previously wielded by Loki.

node: They meet two of Strucker's test subjects—twins Pietro (who has superhuman speed) and Wanda Maximoff (who has telepathic and telekinetic abilities)—and apprehend Strucker, while Stark retrieves Loki's scepter.

meta_replacer output:
node: and attacks the Avengers at their headquarters.Escaping with the scepter, Ultron uses the resources in Strucker's Sokovia base to upgrade his rudimentary body and build an army of robot drones.Having killed Strucker, he recruits the Maximoffs, who hold Stark responsible for their parents' deaths by his company's weapons, and goes to the base of arms dealer Ulysses Klaue in Johannesburg to get vibranium.The Avengers attack Ultron and the Maximoffs, but Wanda subdues them with haunting visions, causing Banner to turn into the Hulk and rampage until Stark stops him with his anti-Hulk armor. [a]
A worldwide backlash over the resulting destruction, and the fears Wanda's hallucinations incited, send the team into hiding at Barton's farmhouse.Thor departs to consult with Dr.Erik Selvig on the apocalyptic future he saw in his hallucination, while Nick Fury arrives and encourages the team to form a plan to stop Ultron.

node: In the Eastern European country of Sokovia, the Avengers—Tony Stark, Thor, Bruce Banner, Steve Rogers, Natasha Romanoff, and Clint Barton—raid a Hydra facility commanded by Baron Wolfgang von Strucker, who has experimented on humans using the scepter previously wielded by Loki.They meet two of Strucker's test subjects—twins Pietro (who has superhuman speed) and Wanda Maximoff (who has telepathic and telekinetic abilities)—and apprehend Strucker, while Stark retrieves Loki's scepter.
Stark and Banner discover an artificial intelligence within the scepter's gem, and secretly decide to use it to complete Stark's "Ultron" global defense program.The unexpectedly sentient Ultron, believing he must eradicate humanity to save Earth, eliminates Stark's A.I.

从结果中我们可以看出,原来的retreiver模块输出的只是简单的一句话,而meta_replacer模块输出的是多个句子,包含了检索节点的前后节点的文本,这样可以让 LLM 生成更准确的答案。

关于句子窗口检索的更多细节,可以参考我之前的这篇文章。

增加评估模块

最后我们再为查询流水线增加一个评估模块,用于评估查询流水线,这里我们使用Ragas[6]来实现评估模块。

Ragas 是一个评估 RAG 应用的框架,拥有很多且详细的评估指标。

+evaluator = RagasComponent()
p = QueryPipeline(verbose=True)
p.add_modules(
{
"input": InputComponent(),
"query_rewriter": query_rewriter,
"retriever": retriever,
"meta_replacer": meta_replacer,
"reranker": reranker,
"output": TreeSummarize(),
+"evaluator": evaluator,
}
)
-p.add_link("input", "query_rewriter")
+p.add_link("input", "query_rewriter", src_key="input")
p.add_link("query_rewriter", "retriever")
p.add_link("retriever", "meta_replacer")
-p.add_link("input", "reranker", dest_key="query_str")
+p.add_link("input", "reranker", src_key="input", dest_key="query_str")
p.add_link("meta_replacer", "reranker", dest_key="nodes")
-p.add_link("input", "output", dest_key="query_str")
+p.add_link("input", "output", src_key="input", dest_key="query_str")
p.add_link("reranker", "output", dest_key="nodes")
+p.add_link("input", "evaluator", src_key="input", dest_key="question")
+p.add_link("input", "evaluator", src_key="ground_truth", dest_key="ground_truth")
+p.add_link("reranker", "evaluator", dest_key="nodes")
+p.add_link("output", "evaluator", dest_key="answer")

我们再来看下RagasComponent的实现,示例代码如下:

from ragas.metrics import faithfulness, answer_relevancy, context_precision, context_recall
from ragas import evaluate
from datasets import Dataset
from llama_index.core.query_pipeline import CustomQueryComponent
from typing import Dict, Any

metrics = [faithfulness, answer_relevancy, context_precision, context_recall]

class RagasComponent(CustomQueryComponent):
"""Ragas evalution component."""

def _validate_component_inputs(self, input: Dict[str, Any]) -> Dict[str, Any]:
"""Validate component inputs during run_component."""
return input

@property
def _input_keys(self) -> set:
"""Input keys dict."""
return {"question", "nodes", "answer", "ground_truth", }

@property
def _output_keys(self) -> set:
return {"answer", "source_nodes", "evaluation"}

def _run_component(self, **kwargs) -> Dict[str, Any]:
"""Run the component."""
question, ground_truth, nodes, answer = kwargs.values()
data = {
"question": [question],
"contexts": [[n.get_content() for n in nodes]],
"answer": [str(answer)],
"ground_truth": [ground_truth],
}
dataset = Dataset.from_dict(data)
evalution = evaluate(dataset, metrics)
return {"answer": str(answer), "source_nodes": nodes, "evaluation": evalution}

最后我们来运行下查询流水线,示例代码如下:

question = "Which two members of the Avengers created Ultron?"
ground_truth = "Tony Stark (Iron Man) and Bruce Banner (The Hulk)."
output = p.run(input=question, ground_truth=ground_truth)
print(f"answer: {output['answer']}")
print(f"evaluation: {output['evaluation']}")

# 显示结果
answer: Tony Stark and Bruce Banner
evaluation: {'faithfulness': 1.0000, 'answer_relevancy': 0.8793, 'context_precision': 1.0000, 'context_recall': 1.0000}

总结

通过上面的示例,我们可以看到如何通过模块化和流程的方式来实现高级 RAG 检索功能,我们可以根据具体的需求,自定义不同的模块,然后将这些模块按照一定的顺序组合起来,形成一个完整的查询流水线。在 RAG 应用中,我们还可以定义多个查询流水线,用于不同的场景,比如问答、对话、推荐等,这样可以更好地满足不同的需求。






欢迎光临 链载Ai (https://www.lianzai.com/) Powered by Discuz! X3.5