|
ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 15.4px;font-weight: bold;display: table;margin-right: auto;margin-bottom: 2em;margin-left: auto;padding-right: 0.2em;padding-left: 0.2em;background: rgb(15, 76, 129);color: rgb(255, 255, 255);">前言ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;margin: 1.5em 8px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">我们在上一章《【项目实战】基于Agent的金融问答系统之项目简介》中简单介绍了项目背景以及数据集情况,本章将介绍RAG模块的实现。ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 15.4px;font-weight: bold;display: table;margin: 4em auto 2em;padding-right: 0.2em;padding-left: 0.2em;background: rgb(15, 76, 129);color: rgb(255, 255, 255);">功能列表ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;margin: 1.5em 8px;letter-spacing: 0.1em;color: rgb(63, 63, 63);">参考之前所学内容《大模型之初识RAG》,我们需要实现如下功能:ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;padding-left: 1em;list-style: circle;color: rgb(63, 63, 63);" class="list-paddingleft-1"> •向量库的基础功能 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;padding-left: 1em;list-style-position: initial;list-style-image: initial;" class="list-paddingleft-1">•连接向量库 •数据入库 •文件导入 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;padding-left: 1em;list-style-position: initial;list-style-image: initial;" class="list-paddingleft-1">•PDF文件的读取 •PDF文件的切分 •调用向量库接口入库 •文件检索 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;padding-left: 1em;list-style-position: initial;list-style-image: initial;" class="list-paddingleft-1">•连接向量库 •检索器检索文件 ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 15.4px;font-weight: bold;display: table;margin: 4em auto 2em;padding-right: 0.2em;padding-left: 0.2em;background: rgb(15, 76, 129);color: rgb(255, 255, 255);">开发过程ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 14px;font-weight: bold;margin-top: 2em;margin-right: 8px;margin-bottom: 0.75em;padding-left: 8px;border-left: 3px solid rgb(15, 76, 129);color: rgb(63, 63, 63);">1、规划工程文件项目开始之后,我们如果能够抑制住直接撸代码的冲动,改为提前做好规划,这会是一个好的习惯。 为此,我提前做了如下规划: 代码管理•代码使用Git进行管理,这样后期多人协作时方便代码更新、代码Merge、冲突解决等。 •由于国内访问Github优势会ban,所以我们将代码仓库放在Gitee上。 •为代码仓库起了一个响亮的名称后,仓库地址定为https://gitee.com/deadwalk/smart-finance-bot
项目目录考虑这个项目会涉及到前端、后端、数据集、模型等,所以项目目录规划如下: smart-finance-bot\ |-dataset\#该目录用于保存PDF以及SQLite数据 |-doc\#该目录用于保存文档类文件,例如:需求文档、说明文档、数据文档 |-app\#该目录用于服务端代码 |-agent\#该目录用于保存agent相关代码 |-rag\#该目录用于保存rag相关代码 |-test\#该目录用于保存测试类驱动相关代码 |-conf\#该目录用于保存配置文件 |-.qwen#该文件保存QWen的配置文件(请自行添加对应的APIKEY) |-.ernie#该文件保存百度千帆的配置文件(请自行添加对应的APIKEY) |-chatweb\#该目录用于保存前端页面代码 |-scripts\#该目录用于保存脚本文件,例如:启动脚本、导入向量数据库脚本等 |-test_result\#该目录用于保存测试结果 |-docker\ |-backend\#该目录对应后端python服务的Dockerfile |-frontend\#该目录对应前端python服务的Dockerfile
上述目录中,dataset是直接使用git的submodul命令,直接将天池大赛提供的数据集引入到本项目中,方便后续使用。 引入方法: gitsubmoduleaddhttps://www.modelscope.cn/datasets/BJQW14B/bs_challenge_financial_14b_dataset.gitdataset 命名规范项目如果能够约束统一的命名规范,这对于后续代码的可读性、可维护性会提供需要便利,在此我沿用了约定俗成的代码命名规范: 整体命名时,要尽量见文知意。 2、实现基本的连接大模型的util库代码文件及目录:app/utils/util.py fromdotenvimportload_dotenv importos
#获取当前文件的目录 current_dir=os.path.dirname(__file__)
#构建到conf/.qwen的相对路径 conf_file_path_qwen=os.path.join(current_dir,'..','conf','.qwen')
#加载千问环境变量 load_dotenv(dotenv_path=conf_file_path_qwen)
defget_qwen_models(): """ 加载千问系列大模型 """ #llm大模型 fromlangchain_community.llms.tongyiimportTongyi
llm=Tongyi(model="qwen-max",temperature=0.1,top_p=0.7,max_tokens=1024)
#chat大模型 fromlangchain_community.chat_modelsimportChatTongyi
chat=ChatTongyi(model="qwen-max",temperature=0.01,top_p=0.2,max_tokens=1024) #embedding大模型 fromlangchain_community.embeddingsimportDashScopeEmbeddings
embed=DashScopeEmbeddings(model="text-embedding-v3")
returnllm,chat,embed
在app/conf/.qwen中,添加对应的API KEY,例如: DASHSCOPE_API_KEY=sk-xxxxxx 3、实现向量库基础功能向量库文件考虑使用Chroma实现,所以我们先实现一个向量库的类,用于完成基本的向量库连接、数据入库操作。 代码文件及目录:app/rag/chroma_conn.py importchromadb fromchromadbimportSettings fromlangchain_chromaimportChroma
classChromaDB: def__init__(self, chroma_server_type="local",#服务器类型:http是http方式连接方式,local是本地读取文件方式 host="localhost",port=8000,#服务器地址,http方式必须指定 persist_path="chroma_db",#数据库的路径:如果是本地连接,需要指定数据库的路径 collection_name="langchain",#数据库的集合名称 embed=None#数据库的向量化函数 ):
self.host=host self.port=port self.path=persist_path self.embed=embed self.store=None
#如果是http协议方式连接数据库 ifchroma_server_type=="http": client=chromadb.HttpClient(host=host,port=port)
self.store=Chroma(collection_name=collection_name, embedding_function=embed, client=client)
ifchroma_server_type=="local": self.store=Chroma(collection_name=collection_name, embedding_function=embed, persist_directory=persist_path)
ifself.storeisNone: raiseValueError("Chromastoreinitfailed!")
defadd_with_langchain(self,docs): """ 将文档添加到数据库 """ self.store.add_documents(documents=docs)
defget_store(self): """ 获得向量数据库的对象实例 """ returnself.store
在实际项目实践过程中,我们发现导入Chroma数据时使用本地化连接方式更快一些,所以对连接方式做了两个参数的扩展,local 代表本地连接,http 代表远程连接。
4、实现入库功能本着先跑通流程,再优化交互过程的思路,对于PDF文件入向量库的过程,我们先通过一段脚本实现(暂不做前端UI的交互)。 代码文件及目录:app/rag/pdf_processor.py importos importlogging importtime fromtqdmimporttqdm fromlangchain_community.document_loadersimportPyMuPDFLoader fromlangchain_text_splittersimportRecursiveCharacterTextSplitter fromrag.chroma_connimportChromaDB
classPDFProcessor: def__init__(self, directory,#PDF文件所在目录 chroma_server_type,#ChromaDB服务器类型 persist_path,#ChromaDB持久化路径 embed):#向量化函数
self.directory=directory self.file_group_num=80#每组处理的文件数 self.batch_num=6#每次插入的批次数量
self.chunksize=500#切分文本的大小 self.overlap=100#切分文本的重叠大小
self.chroma_db=ChromaDB(chroma_server_type=chroma_server_type, persist_path=persist_path, embed=embed) #配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s-%(levelname)s-%(message)s', datefmt='%Y-%m-%d%H:%M:%S' )
defload_pdf_files(self): """ 加载目录下的所有PDF文件 """ pdf_files=[] forfileinos.listdir(self.directory): iffile.lower().endswith('.pdf'): pdf_files.append(os.path.join(self.directory,file))
logging.info(f"Found{len(pdf_files)}PDFfiles.") returnpdf_files
defload_pdf_content(self,pdf_path): """ 读取PDF文件内容 """ pdf_loader=PyMuPDFLoader(file_path=pdf_path) docs=pdf_loader.load() logging.info(f"Loadingcontentfrom{pdf_path}.") returndocs
defsplit_text(self,documents): """ 将文本切分成小段 """ #切分文档 text_splitter=RecursiveCharacterTextSplitter( chunk_size=self.chunksize, chunk_overlap=self.overlap, length_function=len, add_start_index=True, )
docs=text_splitter.split_documents(documents)
logging.info("SplittextintosmallerchunkswithRecursiveCharacterTextSplitter.") returndocs
definsert_docs_chromadb(self,docs,batch_size=6): """ 将文档插入到ChromaDB """ #分批入库 logging.info(f"Inserting{len(docs)}documentsintoChromaDB.")
#记录开始时间 start_time=time.time() total_docs_inserted=0
#计算总批次 total_batches=(len(docs)+batch_size-1)//batch_size
withtqdm(total=total_batches,desc="Insertingbatches",unit="batch")aspbar: foriinrange(0,len(docs),batch_size): #获取当前批次的样本 batch=docs[i:i+batch_size]
#将样本入库 self.chroma_db.add_with_langchain(batch) #self.chroma_db.async_add_with_langchain(batch)
#更新已插入的文档数量 total_docs_inserted+=len(batch)
#计算并显示当前的TPM elapsed_time=time.time()-start_time#计算已用时间(秒) ifelapsed_time>0:#防止除以零 tpm=(total_docs_inserted/elapsed_time)*60#转换为每分钟插入的文档数 pbar.set_postfix({"TPM":f"{tpm:.2f}"})#更新进度条的后缀信息
#更新进度条 pbar.update(1)
defprocess_pdfs_group(self,pdf_files_group): #读取PDF文件内容 pdf_contents=[]
forpdf_pathinpdf_files_group: #读取PDF文件内容 documents=self.load_pdf_content(pdf_path)
#将documents逐一添加到pdf_contents pdf_contents.extend(documents)
#将文本切分成小段 docs=self.split_text(pdf_contents)
#将文档插入到ChromaDB self.insert_docs_chromadb(docs,self.batch_num)
defprocess_pdfs(self): #获取目录下所有的PDF文件 pdf_files=self.load_pdf_files()
group_num=self.file_group_num
#group_num个PDF文件为一组,分批处理 foriinrange(0,len(pdf_files),group_num): pdf_files_group=pdf_files[i:i+group_num] self.process_pdfs_group(pdf_files_group)
print("PDFsprocessedsuccessfully!")
5、测试导入功能因为Python的导入库的原因(一般都是从工作目录查找),所以我们在项目根目录下创建test_framework.py,方便后续统一测试工作。 smart-finance-bot\ |-app\#该目录用于服务端代码 |-rag\#该目录用于保存rag相关代码 |-pdf_processor.py |-chroma_conn.py |-test_framework.py
代码文件:app/test_framework.py #测试导入PDF到向量库主流程 deftest_import(): fromrag.pdf_processorimportPDFProcessor fromutils.utilimportget_qwen_models
llm,chat,embed=get_qwen_models() #embed=get_huggingface_embeddings()
directory="./app/dataset/pdf" persist_path="chroma_db" server_type="local"
#创建PDFProcessor实例 pdf_processor=PDFProcessor(directory=directory, chroma_server_type=server_type, persist_path=persist_path, embed=embed)
#处理PDF文件 pdf_processor.process_pdfs()
if__name__=="__main__": test_import()
1、通过命令行启动ChromaDB服务端: chromarun--pathchroma_db--port8000--hostlocalhost 2、运行test_framework.py 运行效果:  备注:一般测试框架会使用Pytest并且编写相应的单元测试函数,本次项目中由于项目较小且函数返回结果不固定,所以就没有写UnitTest。如果想了解Pytest的使用示例,可以参考我的其他代码仓库,例如:UnitTest的使用
6、实现检索功能代码文件:app/rag/rag.py importlogging fromlangchain_core.promptsimportChatPromptTemplate fromlangchain_core.runnablesimportRunnablePassthrough fromlangchain_core.runnables.baseimportRunnableLambda fromlangchain_core.output_parsersimportStrOutputParser from.chroma_connimportChromaDB
#配置日志记录 logging.basicConfig(level=logging.INFO,format='%(asctime)s-%(levelname)s-%(message)s')
classRagManager: def__init__(self, chroma_server_type="http", host="localhost",port=8000, persist_path="chroma_db", llm=None,embed=None): self.llm=llm self.embed=embed
chrom_db=ChromaDB(chroma_server_type=chroma_server_type, host=host,port=port, persist_path=persist_path, embed=embed) self.store=chrom_db.get_store()
defget_chain(self,retriever): """获取RAG查询链"""
#RAG系统经典的Prompt(A增强的过程) prompt=ChatPromptTemplate.from_messages([ ("human","""Youareanassistantforquestion-answeringtasks.Usethefollowingpieces ofretrievedcontexttoanswerthequestion. Ifyoudon'tknowtheanswer,justsaythatyoudon'tknow. Usethreesentencesmaximumandkeeptheanswerconcise. Question:{question} Context:{context} Answer:""") ]) #将format_docs方法包装为Runnable format_docs_runnable=RunnableLambda(self.format_docs) #RAG链 rag_chain=( {"context":retriever|format_docs_runnable, "question":RunnablePassthrough()} |prompt |self.llm |StrOutputParser() )
returnrag_chain
defformat_docs(self,docs): #返回检索到的资料文件名称 logging.info(f"检索到资料文件个数:{len(docs)}") retrieved_files="\n".join([doc.metadata["source"]fordocindocs]) logging.info(f"资料文件分别是:\n{retrieved_files}")
retrieved_content="\n\n".join(doc.page_contentfordocindocs) logging.info(f"检索到的资料为:\n{retrieved_content}")
returnretrieved_content
defget_retriever(self,k=4,mutuality=0.3): retriever=self.store.as_retriever(search_type="similarity_score_threshold", search_kwargs={"k":k,"score_threshold":mutuality})
returnretriever
defget_result(self,question,k=4,mutuality=0.3): """获取RAG查询结果"""
retriever=self.get_retriever(k,mutuality)
rag_chain=self.get_chain(retriever)
returnrag_chain.invoke(input=question)
以上是实现了一个使用基本检索器的RAG,其中: 7、测试检索效果在test_framework.py中添加RAG的测试调用函数。 代码文件:app/test_framework.py #测试RAG主流程 deftest_rag(): fromrag.ragimportRagManager fromutils.utilimportget_qwen_models
llm,chat,embed=get_qwen_models() rag=RagManager(host="localhost",port=8000,llm=llm,embed=embed)
result=rag.get_result("湖南长远锂科股份有限公司变更设立时作为发起人的法人有哪些?")
print(result)
if__name__=="__main__": test_rag()#RAG测试函数 #test_import()#批量导入PDF测试函数
注释掉批量导入函数,开启test_rag()函数,运行效果:  至此,我们完成了RAG模块的基本功能,它包括PDF文件的批量导入以及检索功能。 内容小结•首先,我们创建了一个ChromaDB的类,封装了基础的Chroma连接、插入、检索。 •其次,我们实现了PDFProcessor类,该类中会调用ChromaDB类的插入函数,将批量读取的PDF文件进行切分后保存至向量库。 •然后,我们实现了RagManager类,该类中封装了RAG的检索链,并定义了检索的参数。 •最后,我们实现了一个测试函数,用于测试RAG的检索功能。 •除此之外,有两个注意事项:
|