返回顶部
热门问答 更多热门问答
技术文章 更多技术文章

基于Agent的金融问答系统:RAG检索模块初建成

[复制链接]
链载Ai 显示全部楼层 发表于 半小时前 |阅读模式 打印 上一主题 下一主题

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 15.4px;font-weight: bold;display: table;margin-right: auto;margin-bottom: 2em;margin-left: auto;padding-right: 0.2em;padding-left: 0.2em;background: rgb(15, 76, 129);color: rgb(255, 255, 255);">前言

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;margin: 1.5em 8px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">我们在上一章《【项目实战】基于Agent的金融问答系统之项目简介》中简单介绍了项目背景以及数据集情况,本章将介绍RAG模块的实现。

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 15.4px;font-weight: bold;display: table;margin: 4em auto 2em;padding-right: 0.2em;padding-left: 0.2em;background: rgb(15, 76, 129);color: rgb(255, 255, 255);">功能列表

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;margin: 1.5em 8px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">参考之前所学内容《大模型之初识RAG》,我们需要实现如下功能:

    ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;padding-left: 1em;list-style: circle;color: rgb(63, 63, 63);" class="list-paddingleft-1">
  • •向量库的基础功能

    • ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;padding-left: 1em;list-style-position: initial;list-style-image: initial;" class="list-paddingleft-1">
    • •连接向量库

    • •数据入库

  • •文件导入

    • ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;padding-left: 1em;list-style-position: initial;list-style-image: initial;" class="list-paddingleft-1">
    • •PDF文件的读取

    • •PDF文件的切分

    • •调用向量库接口入库

  • •文件检索

    • ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;padding-left: 1em;list-style-position: initial;list-style-image: initial;" class="list-paddingleft-1">
    • •连接向量库

    • •检索器检索文件

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 15.4px;font-weight: bold;display: table;margin: 4em auto 2em;padding-right: 0.2em;padding-left: 0.2em;background: rgb(15, 76, 129);color: rgb(255, 255, 255);">开发过程

ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;font-weight: bold;margin-top: 2em;margin-right: 8px;margin-bottom: 0.75em;padding-left: 8px;border-left: 3px solid rgb(15, 76, 129);color: rgb(63, 63, 63);">1、规划工程文件

项目开始之后,我们如果能够抑制住直接撸代码的冲动,改为提前做好规划,这会是一个好的习惯。 为此,我提前做了如下规划:

代码管理

  • •代码使用Git进行管理,这样后期多人协作时方便代码更新、代码Merge、冲突解决等。

  • •由于国内访问Github优势会ban,所以我们将代码仓库放在Gitee上。

  • •为代码仓库起了一个响亮的名称后,仓库地址定为https://gitee.com/deadwalk/smart-finance-bot

项目目录

考虑这个项目会涉及到前端、后端、数据集、模型等,所以项目目录规划如下:

smart-finance-bot\
|-dataset\#该目录用于保存PDF以及SQLite数据
|-doc\#该目录用于保存文档类文件,例如:需求文档、说明文档、数据文档
|-app\#该目录用于服务端代码
|-agent\#该目录用于保存agent相关代码
|-rag\#该目录用于保存rag相关代码
|-test\#该目录用于保存测试类驱动相关代码
|-conf\#该目录用于保存配置文件
|-.qwen#该文件保存QWen的配置文件(请自行添加对应的APIKEY)
|-.ernie#该文件保存百度千帆的配置文件(请自行添加对应的APIKEY)
|-chatweb\#该目录用于保存前端页面代码
|-scripts\#该目录用于保存脚本文件,例如:启动脚本、导入向量数据库脚本等
|-test_result\#该目录用于保存测试结果
|-docker\
|-backend\#该目录对应后端python服务的Dockerfile
|-frontend\#该目录对应前端python服务的Dockerfile

上述目录中,dataset是直接使用git的submodul命令,直接将天池大赛提供的数据集引入到本项目中,方便后续使用。

引入方法:

gitsubmoduleaddhttps://www.modelscope.cn/datasets/BJQW14B/bs_challenge_financial_14b_dataset.gitdataset

命名规范

项目如果能够约束统一的命名规范,这对于后续代码的可读性、可维护性会提供需要便利,在此我沿用了约定俗成的代码命名规范:

  • •类名:使用大驼峰命名法,例如:MyClass

  • •函数名:使用小驼峰命名法,例如:my_function

  • •变量名:使用小驼峰命名法,例如:my_variable

  • •文件夹:使用小驼峰命名法。

整体命名时,要尽量见文知意。

2、实现基本的连接大模型的util库

代码文件及目录:app/utils/util.py


fromdotenvimportload_dotenv
importos

#获取当前文件的目录
current_dir=os.path.dirname(__file__)

#构建到conf/.qwen的相对路径
conf_file_path_qwen=os.path.join(current_dir,'..','conf','.qwen')

#加载千问环境变量
load_dotenv(dotenv_path=conf_file_path_qwen)

defget_qwen_models():
"""
加载千问系列大模型
"""
#llm大模型
fromlangchain_community.llms.tongyiimportTongyi

llm=Tongyi(model="qwen-max",temperature=0.1,top_p=0.7,max_tokens=1024)

#chat大模型
fromlangchain_community.chat_modelsimportChatTongyi

chat=ChatTongyi(model="qwen-max",temperature=0.01,top_p=0.2,max_tokens=1024)
#embedding大模型
fromlangchain_community.embeddingsimportDashScopeEmbeddings

embed=DashScopeEmbeddings(model="text-embedding-v3")

returnllm,chat,embed

在app/conf/.qwen中,添加对应的API KEY,例如:

DASHSCOPE_API_KEY=sk-xxxxxx

3、实现向量库基础功能

向量库文件考虑使用Chroma实现,所以我们先实现一个向量库的类,用于完成基本的向量库连接、数据入库操作。

代码文件及目录:app/rag/chroma_conn.py

importchromadb
fromchromadbimportSettings
fromlangchain_chromaimportChroma


classChromaDB:
def__init__(self,
chroma_server_type="local",#服务器类型:http是http方式连接方式,local是本地读取文件方式
host="localhost",port=8000,#服务器地址,http方式必须指定
persist_path="chroma_db",#数据库的路径:如果是本地连接,需要指定数据库的路径
collection_name="langchain",#数据库的集合名称
embed=None#数据库的向量化函数
):

self.host=host
self.port=port
self.path=persist_path
self.embed=embed
self.store=None

#如果是http协议方式连接数据库
ifchroma_server_type=="http":
client=chromadb.HttpClient(host=host,port=port)

self.store=Chroma(collection_name=collection_name,
embedding_function=embed,
client=client)

ifchroma_server_type=="local":
self.store=Chroma(collection_name=collection_name,
embedding_function=embed,
persist_directory=persist_path)

ifself.storeisNone:
raiseValueError("Chromastoreinitfailed!")

defadd_with_langchain(self,docs):
"""
将文档添加到数据库
"""
self.store.add_documents(documents=docs)

defget_store(self):
"""
获得向量数据库的对象实例
"""
returnself.store

在实际项目实践过程中,我们发现导入Chroma数据时使用本地化连接方式更快一些,所以对连接方式做了两个参数的扩展,local 代表本地连接,http 代表远程连接。

4、实现入库功能

本着先跑通流程,再优化交互过程的思路,对于PDF文件入向量库的过程,我们先通过一段脚本实现(暂不做前端UI的交互)。

代码文件及目录:app/rag/pdf_processor.py

importos
importlogging
importtime
fromtqdmimporttqdm
fromlangchain_community.document_loadersimportPyMuPDFLoader
fromlangchain_text_splittersimportRecursiveCharacterTextSplitter
fromrag.chroma_connimportChromaDB


classPDFProcessor:
def__init__(self,
directory,#PDF文件所在目录
chroma_server_type,#ChromaDB服务器类型
persist_path,#ChromaDB持久化路径
embed):#向量化函数

self.directory=directory
self.file_group_num=80#每组处理的文件数
self.batch_num=6#每次插入的批次数量

self.chunksize=500#切分文本的大小
self.overlap=100#切分文本的重叠大小

self.chroma_db=ChromaDB(chroma_server_type=chroma_server_type,
persist_path=persist_path,
embed=embed)
#配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s-%(levelname)s-%(message)s',
datefmt='%Y-%m-%d%H:%M:%S'
)

defload_pdf_files(self):
"""
加载目录下的所有PDF文件
"""
pdf_files=[]
forfileinos.listdir(self.directory):
iffile.lower().endswith('.pdf'):
pdf_files.append(os.path.join(self.directory,file))

logging.info(f"Found{len(pdf_files)}PDFfiles.")
returnpdf_files

defload_pdf_content(self,pdf_path):
"""
读取PDF文件内容
"""
pdf_loader=PyMuPDFLoader(file_path=pdf_path)
docs=pdf_loader.load()
logging.info(f"Loadingcontentfrom{pdf_path}.")
returndocs

defsplit_text(self,documents):
"""
将文本切分成小段
"""
#切分文档
text_splitter=RecursiveCharacterTextSplitter(
chunk_size=self.chunksize,
chunk_overlap=self.overlap,
length_function=len,
add_start_index=True,
)

docs=text_splitter.split_documents(documents)

logging.info("SplittextintosmallerchunkswithRecursiveCharacterTextSplitter.")
returndocs

definsert_docs_chromadb(self,docs,batch_size=6):
"""
将文档插入到ChromaDB
"""
#分批入库
logging.info(f"Inserting{len(docs)}documentsintoChromaDB.")

#记录开始时间
start_time=time.time()
total_docs_inserted=0

#计算总批次
total_batches=(len(docs)+batch_size-1)//batch_size

withtqdm(total=total_batches,desc="Insertingbatches",unit="batch")aspbar:
foriinrange(0,len(docs),batch_size):
#获取当前批次的样本
batch=docs[i:i+batch_size]

#将样本入库
self.chroma_db.add_with_langchain(batch)
#self.chroma_db.async_add_with_langchain(batch)

#更新已插入的文档数量
total_docs_inserted+=len(batch)

#计算并显示当前的TPM
elapsed_time=time.time()-start_time#计算已用时间(秒)
ifelapsed_time>0:#防止除以零
tpm=(total_docs_inserted/elapsed_time)*60#转换为每分钟插入的文档数
pbar.set_postfix({"TPM":f"{tpm:.2f}"})#更新进度条的后缀信息

#更新进度条
pbar.update(1)

defprocess_pdfs_group(self,pdf_files_group):
#读取PDF文件内容
pdf_contents=[]

forpdf_pathinpdf_files_group:
#读取PDF文件内容
documents=self.load_pdf_content(pdf_path)

#将documents逐一添加到pdf_contents
pdf_contents.extend(documents)

#将文本切分成小段
docs=self.split_text(pdf_contents)

#将文档插入到ChromaDB
self.insert_docs_chromadb(docs,self.batch_num)

defprocess_pdfs(self):
#获取目录下所有的PDF文件
pdf_files=self.load_pdf_files()

group_num=self.file_group_num

#group_num个PDF文件为一组,分批处理
foriinrange(0,len(pdf_files),group_num):
pdf_files_group=pdf_files[i:i+group_num]
self.process_pdfs_group(pdf_files_group)

print("PDFsprocessedsuccessfully!")

5、测试导入功能

因为Python的导入库的原因(一般都是从工作目录查找),所以我们在项目根目录下创建test_framework.py,方便后续统一测试工作。

smart-finance-bot\
|-app\#该目录用于服务端代码
|-rag\#该目录用于保存rag相关代码
|-pdf_processor.py
|-chroma_conn.py
|-test_framework.py

代码文件:app/test_framework.py

#测试导入PDF到向量库主流程
deftest_import():
fromrag.pdf_processorimportPDFProcessor
fromutils.utilimportget_qwen_models

llm,chat,embed=get_qwen_models()
#embed=get_huggingface_embeddings()

directory="./app/dataset/pdf"
persist_path="chroma_db"
server_type="local"

#创建PDFProcessor实例
pdf_processor=PDFProcessor(directory=directory,
chroma_server_type=server_type,
persist_path=persist_path,
embed=embed)

#处理PDF文件
pdf_processor.process_pdfs()

if__name__=="__main__":
test_import()

1、通过命令行启动ChromaDB服务端:

chromarun--pathchroma_db--port8000--hostlocalhost

2、运行test_framework.py

运行效果:

备注:一般测试框架会使用Pytest并且编写相应的单元测试函数,本次项目中由于项目较小且函数返回结果不固定,所以就没有写UnitTest。如果想了解Pytest的使用示例,可以参考我的其他代码仓库,例如:UnitTest的使用

6、实现检索功能

代码文件:app/rag/rag.py

importlogging
fromlangchain_core.promptsimportChatPromptTemplate
fromlangchain_core.runnablesimportRunnablePassthrough
fromlangchain_core.runnables.baseimportRunnableLambda
fromlangchain_core.output_parsersimportStrOutputParser
from.chroma_connimportChromaDB

#配置日志记录
logging.basicConfig(level=logging.INFO,format='%(asctime)s-%(levelname)s-%(message)s')


classRagManager:
def__init__(self,
chroma_server_type="http",
host="localhost",port=8000,
persist_path="chroma_db",
llm=None,embed=None):
self.llm=llm
self.embed=embed

chrom_db=ChromaDB(chroma_server_type=chroma_server_type,
host=host,port=port,
persist_path=persist_path,
embed=embed)
self.store=chrom_db.get_store()

defget_chain(self,retriever):
"""获取RAG查询链"""

#RAG系统经典的Prompt(A增强的过程)
prompt=ChatPromptTemplate.from_messages([
("human","""Youareanassistantforquestion-answeringtasks.Usethefollowingpieces
ofretrievedcontexttoanswerthequestion.
Ifyoudon'tknowtheanswer,justsaythatyoudon'tknow.
Usethreesentencesmaximumandkeeptheanswerconcise.
Question:{question}
Context:{context}
Answer:""")
])
#将format_docs方法包装为Runnable
format_docs_runnable=RunnableLambda(self.format_docs)
#RAG链
rag_chain=(
{"context":retriever|format_docs_runnable,
"question":RunnablePassthrough()}
|prompt
|self.llm
|StrOutputParser()
)

returnrag_chain

defformat_docs(self,docs):
#返回检索到的资料文件名称
logging.info(f"检索到资料文件个数:{len(docs)}")
retrieved_files="\n".join([doc.metadata["source"]fordocindocs])
logging.info(f"资料文件分别是:\n{retrieved_files}")

retrieved_content="\n\n".join(doc.page_contentfordocindocs)
logging.info(f"检索到的资料为:\n{retrieved_content}")

returnretrieved_content

defget_retriever(self,k=4,mutuality=0.3):
retriever=self.store.as_retriever(search_type="similarity_score_threshold",
search_kwargs={"k":k,"score_threshold":mutuality})

returnretriever

defget_result(self,question,k=4,mutuality=0.3):
"""获取RAG查询结果"""

retriever=self.get_retriever(k,mutuality)

rag_chain=self.get_chain(retriever)

returnrag_chain.invoke(input=question)

以上是实现了一个使用基本检索器的RAG,其中:

  • •代码中通过chroma_conn.py模块连接到ChromaDB数据库,并使用ChromaDB的as_retriever方法创建一个检索器。

7、测试检索效果

在test_framework.py中添加RAG的测试调用函数。

代码文件:app/test_framework.py

#测试RAG主流程
deftest_rag():
fromrag.ragimportRagManager
fromutils.utilimportget_qwen_models

llm,chat,embed=get_qwen_models()
rag=RagManager(host="localhost",port=8000,llm=llm,embed=embed)

result=rag.get_result("湖南长远锂科股份有限公司变更设立时作为发起人的法人有哪些?")

print(result)


if__name__=="__main__":
test_rag()#RAG测试函数
#test_import()#批量导入PDF测试函数

注释掉批量导入函数,开启test_rag()函数,运行效果:

至此,我们完成了RAG模块的基本功能,它包括PDF文件的批量导入以及检索功能。

内容小结

  • •首先,我们创建了一个ChromaDB的类,封装了基础的Chroma连接、插入、检索。

  • •其次,我们实现了PDFProcessor类,该类中会调用ChromaDB类的插入函数,将批量读取的PDF文件进行切分后保存至向量库。

  • •然后,我们实现了RagManager类,该类中封装了RAG的检索链,并定义了检索的参数。

  • •最后,我们实现了一个测试函数,用于测试RAG的检索功能。

  • •除此之外,有两个注意事项:

    • •在项目初期,进行合理的项目文件目录规划,可以有效减少项目维护的难度。

    • •在项目行进中,通过搭建测试框架,可以方便函数验证以及后续重构的回归测试。

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

链载AI是专业的生成式人工智能教程平台。提供Stable Diffusion、Midjourney AI绘画教程,Suno AI音乐生成指南,以及Runway、Pika等AI视频制作与动画生成实战案例。从提示词编写到参数调整,手把手助您从入门到精通。
  • 官方手机版

  • 微信公众号

  • 商务合作

  • Powered by Discuz! X3.5 | Copyright © 2025-2025. | 链载Ai
  • 桂ICP备2024021734号 | 营业执照 | |广西笔趣文化传媒有限公司|| QQ