fromlangchain.storageimportInMemoryStorefromlangchain_core.documentsimportDocumentfromlangchain.retrievers.multi_vectorimportMultiVectorRetrieverfromlangchain_core.output_parsersimportStrOutputParserfromlangchain_core.promptsimportChatPromptTemplateimportuuiddocs = [Document(page_content="RAG-Fusion通过生成多个查询变体并使用RRF算法智能排序来提升检索相关性。", metadata={"doc_id":str(uuid.uuid4())}),Document(page_content="假设性文档嵌入(HyDE)先让LLM生成一个理想答案,再用该答案的嵌入来检索真实文档。", metadata={"doc_id":str(uuid.uuid4())}),]doc_ids = [doc.metadata["doc_id"]fordocindocs]question_gen_prompt_str = ("你是一位AI专家。请根据以下文档内容,生成3个用户可能会提出的、高度相关的问题。\n""只返回问题列表,每个问题占一行,不要有其他前缀或编号。\n\n""文档内容:\n""----------\n""{content}\n""----------\n")question_gen_prompt = ChatPromptTemplate.from_template(question_gen_prompt_str)question_generator_chain = question_gen_prompt | llm | StrOutputParser()sub_docs = []fori, docinenumerate(docs):doc_id = doc_ids[i]generated_questions = question_generator_chain.invoke({"content": doc.page_content}).split("\n")generated_questions = [q.strip()forqingenerated_questionsifq.strip()]forqingenerated_questions:sub_docs.append(Document(page_content=q, metadata={"doc_id": doc_id}))vectorstore_qa = Chroma.from_documents(documents=sub_docs, embedding=embeddings)doc_store = InMemoryStore()doc_store.mset(list(zip(doc_ids, docs)))multivector_retriever = MultiVectorRetriever(vectorstore=vectorstore_qa,docstore=doc_store,id_key="doc_id",)user_query ="RAG-Fusion是怎么工作的?"retrieved_qa_docs = multivector_retriever.invoke(user_query)
fromlangchain.retrieversimportMultiQueryRetriever#1.从LLM和向量数据库创建一个MultiQueryRetriever#它会自动处理“生成查询->检索->合并去重”的整个流程multiquery_retriever=MultiQueryRetriever.from_llm(retriever=vectorstore.as_retriever(),#使用我们创建的向量数据库作为基础检索器llm=llm#使用我们初始化的LLM来生成子查询)#2.使用原始查询进行调用user_query="如何通过修改问题来改进检索效果?"retrieved_docs=multiquery_retriever.invoke(user_query)#3.打印结果print_docs(retrieved_docs,f"查询扩展(MultiQuery)对'{user_query}'的检索结果")#深入了解它生成了哪些子查询importlogginglogging.basicConfig()logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)retrieved_docs=multiquery_retriever.invoke(user_query)fromlangchain_core.promptsimportChatPromptTemplatefromlangchain_core.output_parsersimportStrOutputParserfromlangchain_core.documentsimportDocumentimportoperator# 1. 定义一个用于生成子查询的链 (Chain)query_gen_prompt = ChatPromptTemplate.from_messages([("user","你是一位AI研究员。请根据以下问题,生成3个不同角度的、语义相似的查询。\n""每个查询占一行,不要有其他前缀或编号。\n\n""原始问题: {original_question}")])generate_queries_chain = query_gen_prompt | llm | StrOutputParser() | (lambdax: x.split("\n"))# 2. 定义RRF算法函数defreciprocal_rank_fusion(retrieval_results:list[list[Document]], k=60):fused_scores = {}fordoc_listinretrieval_results:forrank, docinenumerate(doc_list):doc_id = doc.page_contentifdoc_idnotinfused_scores:fused_scores[doc_id] =0fused_scores[doc_id] +=1/ (k + rank)reranked_results = [next((docfordoc_listinretrieval_resultsfordocindoc_listifdoc.page_content == doc_id),None)fordoc_id, scoreinsorted(fused_scores.items(), key=operator.itemgetter(1), reverse=True)]return[docfordocinreranked_resultsifdocisnotNone]defrag_fusion_pipeline(original_question:str):# 生成多个查询generated_queries = generate_queries_chain.invoke({"original_question": original_question})all_queries = [original_question] + generated_queriesprint(f"生成的查询:{all_queries}")# 独立检索每个查询retriever = vectorstore.as_retriever()retrieval_results = [retriever.invoke(q)forqinall_queries]# 应用RRF算法对结果进行融合和重排final_docs = reciprocal_rank_fusion(retrieval_results)returnfinal_docs# 4. 调用user_query ="如何处理用户提问太具体的情况?"fusion_docs = rag_fusion_pipeline(user_query)print(fusion_docs,f"RAG-Fusion 对 '{user_query}' 的检索结果")
fromlangchain_core.runnablesimportRunnableParallel,RunnablePassthrough#1.定义生成“后退一步”问题的Prompt和链step_back_prompt_template=ChatPromptTemplate.from_messages([("user","你是一位善于提炼核心问题的专家。请将以下可能很具体的问题,""抽象成一个更通用、更高层次的“后退一步”的问题。\n\n""例如:'LangChain的LCEL和Python的asyncio库是如何交互的?'->'LangChainLCEL的异步执行机制是怎样的?'\n\n""原始问题:{original_question}")])step_back_chain=step_back_prompt_template|llm|StrOutputParser()retriever=vectorstore.as_retriever()chain=({#第一个分支:对原始问题进行检索"original_docs":RunnablePassthrough()|retriever,#第二个分支:先生成后退问题,再用它进行检索"step_back_docs":step_back_chain|retriever,}#将两个分支的结果合并、去重|(lambdax:remove_duplicates_by_id(x["original_docs"]+x["step_back_docs"])))#辅助函数去重defremove_duplicates_by_id(documents):seen_ids=set()unique_docs=[]fordocindocuments:#假设page_content是唯一标识ifdoc.page_contentnotinseen_ids:unique_docs.append(doc)seen_ids.add(doc.page_content)returnunique_docsuser_query="RAG-Fusion里那个RRF算法的平滑参数k有什么用?"step_back_docs=chain.invoke({"original_question":user_query})#示例:使用EnsembleRetriever实现混合搜索fromlangchain.retrieversimportEnsembleRetrieverfromlangchain_community.retrieversimportBM25Retrieverfromlangchain.retrieversimportEnsembleRetriever#假设all_splits和vectorstore已准备好#1.初始化关键词检索器(SparseRetriever)bm25_retriever=BM25Retriever.from_documents(all_splits)bm25_retriever.k=3#检索3个结果#2.初始化向量检索器(DenseRetriever)vector_retriever=vectorstore.as_retriever(search_kwargs={"k":3})#3.初始化EnsembleRetriever,并设置权重#weights参数决定了最终排序时,两种检索器结果的权重ensemble_retriever=EnsembleRetriever(retrievers=[bm25_retriever,vector_retriever],weights=[0.4,0.6]#稍微偏重向量搜索的语义理解能力)#4.使用query="LangChain中的LCEL是什么?"retrieved_docs=ensemble_retriever.invoke(query)print(f"混合搜索召回了{len(retrieved_docs)}个文档。")在本篇文章中,我们探讨了多种用于优化RAG系统的检索机制,包括索引构建的最佳实践、多样的查询转换策略以及混合搜索的实现。这些技术旨在从根本上提升检索的准确性与召回率。
然而,获取初步的文档列表只是整个流程的第一步。这些结果在相关性上可能仍然参差不齐,包含了与问题不直接相关的噪音信息。因此,下一步的关键任务,就是如何对这些初步结果进行有效的后处理与筛选。
| 欢迎光临 链载Ai (https://www.lianzai.com/) | Powered by Discuz! X3.5 |