fromlangchain.retrieversimportContextualCompressionRetrieverfromlangchain.retrievers.document_compressorsimportLLMChainExtractorfromlangchain_openaiimportChatOpenAI#1.定义一个基础检索器(先多检索一些,再压缩)base_retriever_for_comp=vectorstore.as_retriever(search_kwargs={"k":5})#2.定义一个LLMChainExtractor(压缩器)compressor=LLMChainExtractor.from_llm(llm=llm)#3.创建ContextualCompressionRetrievercompression_retriever=ContextualCompressionRetriever(base_compressor=compressor,base_retriever=base_retriever_for_comp)#4.使用query_comp="LangChain的调试工具叫什么?它的主要作用是什么?"retrieved_compressed_docs=compression_retriever.invoke(query_comp)print(f"对查询'{query_comp}'的ContextualCompressionRetriever检索结果:")fori,docinenumerate(retrieved_compressed_docs)
riginal_len=len(doc.metadata.get('original_content',doc.page_content))compressed_len=len(doc.page_content)print(f"文档{i+1}(原始长度:{original_len},压缩后长度:{compressed_len}):")print(doc.page_content)print("-"*30)fromtypingimportList,Tupleimportnumpyasnpfromlangchain_core.documentsimportDocumentdeffind_elbow_point(scores: np.ndarray) ->int: """ 使用点到直线最大距离的纯几何方法。 返回的是拐点在原始列表中的索引。 """ n_points =len(scores) ifn_points <3: returnn_points -1# 返回最后一个点的索引 # 创建点坐标 (x, y),x是索引,y是分数 points = np.column_stack((np.arange(n_points), scores)) # 获取第一个点和最后一个点 first_point = points[0] last_point = points[-1] # 计算每个点到首末点连线的垂直距离 # 使用向量射影的方法 line_vec = last_point - first_point line_vec_normalized = line_vec / np.linalg.norm(line_vec)
vec_from_first = points - first_point
# scalar_product 是每个点向量在直线方向上的投影长度 scalar_product = np.dot(vec_from_first, line_vec_normalized)
# vec_parallel 是投影向量 vec_parallel = np.outer(scalar_product, line_vec_normalized)
# vec_perpendicular 是垂直向量,它的模长就是距离 vec_perpendicular = vec_from_first - vec_parallel
dist_to_line = np.linalg.norm(vec_perpendicular, axis=1) # 找到距离最大的点的索引 elbow_index = np.argmax(dist_to_line) returnelbow_indexdeftruncate_with_elbow_method_final( reranked_docs
ist[Tuple[float, Document]]) ->List[Document]: ifnotreranked_docsorlen(reranked_docs) <3: print("文档数量不足3个,无法进行拐点检测,返回所有文档。") return[docfor_, docinreranked_docs] scores = np.array([scoreforscore, _inreranked_docs]) docs = [docfor_, docinreranked_docs]
# 调用我们验证过有效的拐点检测函数 elbow_index = find_elbow_point(scores)
# 我们需要包含拐点本身,所以截取到 elbow_index + 1 num_docs_to_keep = elbow_index +1 final_docs = docs[:num_docs_to_keep]
print(f"检测到分数拐点在第{elbow_index +1}位。截断后返回{len(final_docs)}个文档。") returnfinal_docsprint("\n--- 拐点检测示例 ---")# 假设 reranked_docs 是你的输入数据reranked_docs = [ (0.98,"文档1"), (0.95,"文档2"), (0.92,"文档3"), (0.75,"文档4"), (0.5,"文档5"), (0.48,"文档6")]final_documents = truncate_with_elbow_method_final(reranked_docs)print(final_documents)# 输出前三个文档