
Built from Scratch, 100% Local: Qwen 3 Local RAG Reasoning Agent


Today we'll build, from scratch, a RAG system that runs entirely on local Qwen 3 and Gemma 3 models. It combines document processing, vector search, and web search to give users accurate, context-relevant answers. The project comes from an Unwind AI tutorial (the open-source repository is linked from the original post). Let's walk through how it is put together and its key technical points.

Project Overview

· Name: Qwen 3 Local RAG Reasoning Agent

· Goal: build an efficient RAG system out of locally run lightweight LLMs and a vector database, supporting document Q&A, web-page content extraction, and web search.

· Core features:

1. Document processing: upload PDF files or enter web URLs; the content is extracted and intelligently chunked.

2. Vector search: document embeddings are stored in a Qdrant vector database for efficient similarity search.

3. Web search: when the documents don't contain the needed knowledge, answers can be supplemented with a web search via the Exa API.

4. Flexible modes: supports both a RAG mode (documents plus search) and a direct LLM chat mode.

5. Privacy: all processing happens locally, making the system suitable for sensitive data.

Technical Architecture

1. Language models:

· Several local models are supported: Qwen 3 (1.7B, 8B), Gemma 3 (1B, 4B), and DeepSeek-R1 (1.5B).

· Models run locally through the Ollama framework, removing the dependency on cloud services.

2. Document processing:

· PyPDFLoader handles PDF files, and WebBaseLoader extracts web-page content.

· RecursiveCharacterTextSplitter splits documents into small chunks for embedding and search (see the pipeline sketch after this list).

3. Vector database:

· Qdrant stores the document embedding vectors and supports efficient similarity search.

· Embedding model: snowflake-arctic-embed, served by Ollama.

4. Web search:

· Web search is implemented through the Exa API, with support for custom domain filtering.

5. User interface:

· Streamlit provides an interactive web UI for uploading files, entering URLs, and asking questions.
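
These pieces wire together in only a few lines. Below is a condensed sketch of the ingestion pipeline, reusing the loader, splitter settings (chunk_size=1000, chunk_overlap=200), 1024-dimensional snowflake-arctic-embed vectors, and cosine-distance collection from the full source at the end of this post; the file path "paper.pdf" and the collection name "docs" are placeholders:

from typing import List
from langchain_core.embeddings import Embeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
from agno.embedder.ollama import OllamaEmbedder

class ArcticEmbeddings(Embeddings):
    """LangChain adapter around Ollama's snowflake-arctic-embed (1024-dim)."""
    def __init__(self):
        self.embedder = OllamaEmbedder(id="snowflake-arctic-embed", dimensions=1024)
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        return [self.embed_query(t) for t in texts]
    def embed_query(self, text: str) -> List[float]:
        return self.embedder.get_embedding(text)

# 1. Load a PDF and split it into overlapping chunks
docs = PyPDFLoader("paper.pdf").load()
chunks = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200).split_documents(docs)

# 2. Create a Qdrant collection sized for the embedding model
client = QdrantClient(url="http://localhost:6333")
client.create_collection(
    collection_name="docs",
    vectors_config=VectorParams(size=1024, distance=Distance.COSINE),
)

# 3. Embed the chunks and store them for similarity search
store = QdrantVectorStore(client=client, collection_name="docs", embedding=ArcticEmbeddings())
store.add_documents(chunks)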

Main Features

1. Document Q&A:

· The user uploads a PDF or enters a URL; the system converts the content into embedding vectors and stores them in Qdrant.

· When the user asks a question, the system finds the relevant document chunks via similarity search and generates an answer (see the retrieval sketch after this list).

2. Web search fallback:

· If the documents don't contain enough information, the system triggers a web search, automatically or manually (via a toggle), to fetch supplementary information.

· Search results are clearly labeled with their sources.

3. Flexible configuration:

· Choose among different models (e.g. Qwen 3 or Gemma 3).

· Adjust the similarity threshold to control how strict document retrieval is.

· RAG mode can be disabled for direct conversation with the LLM.

4. Privacy and offline support:

· All models and processing run locally; no data is sent to the cloud.

· Suitable for privacy-sensitive scenarios and environments without network access.
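
The retrieval step behind these features reduces to a thresholded similarity search plus a fallback signal. Here is a minimal sketch of that decision logic, assuming store is the vector store built in the previous sketch and reusing the project's defaults (k=5, threshold 0.7); the helper name retrieve_or_fallback is illustrative, not from the project:

def retrieve_or_fallback(store, query: str, threshold: float = 0.7):
    """Return joined document context if retrieval clears the threshold,
    or None to signal that the Exa web-search fallback should run."""
    retriever = store.as_retriever(
        search_type="similarity_score_threshold",
        search_kwargs={"k": 5, "score_threshold": threshold},
    )
    docs = retriever.invoke(query)
    if docs:
        # Concatenate the retrieved chunks into a context block for the LLM prompt
        return "\n\n".join(d.page_content for d in docs), docs
    return None, []

context, docs = retrieve_or_fallback(store, "What does the paper conclude?")

Raising the threshold makes retrieval stricter (fewer but more relevant chunks, and more frequent web-search fallbacks); lowering it does the opposite.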

Usage

1. Prepare the environment:

· Install Ollama and Python 3.8+.

· Run the Qdrant vector database via Docker.

· Get an Exa API key (optional, for web search).

2. Install the dependencies:

pip install -r requirements.txt

3. Pull the models:

ollama pull qwen3:1.7b
ollama pull snowflake-arctic-embed

4. Run Qdrant:

docker run -p 6333:6333 -p 6334:6334 -v "$(pwd)/qdrant_storage:/qdrant/storage:z" qdrant/qdrant

5. Start the app:

streamlit run qwen_local_rag_agent.py

6. Use it:

· Upload a PDF or enter a URL in the Streamlit interface.

· Adjust the model, the RAG mode, or the search settings.

· Type a question and get an answer with sources.
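
If something doesn't start, two quick checks confirm that the local services are reachable (standard Ollama and Qdrant tooling, not part of the project itself):

# Both qwen3:1.7b and snowflake-arctic-embed should appear in the list
ollama list

# Qdrant's REST API listens on port 6333; this lists the existing collections
curl http://localhost:6333/collections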

Use Cases

· Academic research: quickly query uploaded papers or web pages, supplementing them with up-to-date information from web search.

· Enterprise document management: process internal documents (manuals, reports) and provide intelligent Q&A over them.

· Privacy-sensitive work: process legal, medical, and other sensitive documents locally, avoiding data leakage.

· Offline environments: query knowledge with local models and documents when no network is available.

Project Strengths

· Open source and free: the code is public and can be freely modified and deployed.

· Fully local: no dependency on cloud services, so data privacy is preserved.

· Modular: multiple models and configurations are supported, making it easy to extend.

· User-friendly: the Streamlit interface is simple and intuitive, approachable even for non-technical users.

Summary

This project is a capable and flexible local RAG system that combines local language models, a vector database, and web search. It fits scenarios that call for privacy protection, offline operation, or customized knowledge querying. With minimal configuration, users can quickly stand up an intelligent Q&A assistant that handles documents and web content while keeping their data secure.

Source Code
For readers who can't easily access GitHub, the full source is included below:
requirements.txt
agno
pypdf
exa
qdrant-client
langchain-qdrant
langchain-community
streamlit
ollama
qwen_local_rag_agent.py
import os
import tempfile
from datetime import datetime
from typing import List

import streamlit as st
import bs4
from agno.agent import Agent
from agno.models.ollama import Ollama
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
from langchain_core.embeddings import Embeddings
from agno.tools.exa import ExaTools
from agno.embedder.ollama import OllamaEmbedder


class OllamaEmbedderr(Embeddings):
    def __init__(self, model_name="snowflake-arctic-embed"):
        """Initialize the OllamaEmbedderr with a specific model.

        Args:
            model_name (str): The name of the model to use for embedding.
        """
        self.embedder = OllamaEmbedder(id=model_name, dimensions=1024)

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        return [self.embed_query(text) for text in texts]

    def embed_query(self, text: str) -> List[float]:
        return self.embedder.get_embedding(text)


# Constants
COLLECTION_NAME = "test-qwen-r1"

# Streamlit App Initialization
st.title("? Qwen 3 Local RAG Reasoning Agent")

# --- Add Model Info Boxes ---
st.info("**Qwen3:** The latest generation of large language models in Qwen series, offering a comprehensive suite of dense and mixture-of-experts (MoE) models.")
st.info("**Gemma 3:** These models are multimodal—processing text and images—and feature a 128K context window with support for over 140 languages.")
# -------------------------

# Session State Initialization
if 'model_version' not in st.session_state:
    st.session_state.model_version = "qwen3:1.7b"  # Default to lighter model
if 'vector_store' not in st.session_state:
    st.session_state.vector_store = None
if 'processed_documents' not in st.session_state:
    st.session_state.processed_documents = []
if 'history' not in st.session_state:
    st.session_state.history = []
if 'exa_api_key' not in st.session_state:
    st.session_state.exa_api_key = ""
if 'use_web_search' not in st.session_state:
    st.session_state.use_web_search = False
if 'force_web_search' not in st.session_state:
    st.session_state.force_web_search = False
if 'similarity_threshold' not in st.session_state:
    st.session_state.similarity_threshold = 0.7
if 'rag_enabled' not in st.session_state:
    st.session_state.rag_enabled = True  # RAG is enabled by default

# Sidebar Configuration
st.sidebar.header("⚙️ Settings")

# Model Selection
st.sidebar.header("? Model Choice")
model_help = """- qwen3:1.7b: Lighter model (MoE)
- gemma3:1b: More capable but requires better GPU/RAM (32k context window)
- gemma3:4b: More capable and MultiModal (Vision) (128k context window)
- deepseek-r1:1.5b
- qwen3:8b: More capable but requires better GPU/RAM

Choose based on your hardware capabilities."""
st.session_state.model_version = st.sidebar.radio(
    "Select Model Version",
    options=["qwen3:1.7b", "gemma3:1b", "gemma3:4b", "deepseek-r1:1.5b", "qwen3:8b"],
    help=model_help
)
st.sidebar.info("Run ollama pull qwen3:1.7b")

# RAG Mode Toggle
st.sidebar.header("? RAG Mode")
st.session_state.rag_enabled = st.sidebar.toggle("Enable RAG", value=st.session_state.rag_enabled)

# Clear Chat Button
if st.sidebar.button("✨ Clear Chat"):
    st.session_state.history = []
    st.rerun()

# Show API Configuration only if RAG is enabled
if st.session_state.rag_enabled:
    st.sidebar.header("? Search Tuning")
    st.session_state.similarity_threshold = st.sidebar.slider(
        "Similarity Threshold",
        min_value=0.0,
        max_value=1.0,
        value=0.7,
        help="Lower values will return more documents but might be less relevant. Higher values are more strict."
    )

# Add in the sidebar configuration section, after the existing API inputs
st.sidebar.header("? Web Search")
st.session_state.use_web_search = st.sidebar.checkbox("Enable Web Search Fallback", value=st.session_state.use_web_search)

if st.session_state.use_web_search:
    exa_api_key = st.sidebar.text_input(
        "Exa AI API Key",
        type="password",
        value=st.session_state.exa_api_key,
        help="Required for web search fallback when no relevant documents are found"
    )
    st.session_state.exa_api_key = exa_api_key

    # Optional domain filtering
    default_domains = ["arxiv.org", "wikipedia.org", "github.com", "medium.com"]
    custom_domains = st.sidebar.text_input(
        "Custom domains (comma-separated)",
        value=",".join(default_domains),
        help="Enter domains to search from, e.g.: arxiv.org,wikipedia.org"
    )
    search_domains = [d.strip() for d in custom_domains.split(",") if d.strip()]


# Utility Functions
def init_qdrant() -> QdrantClient | None:
    """Initialize Qdrant client with local Docker setup.

    Returns:
        QdrantClient: The initialized Qdrant client if successful.
        None: If the initialization fails.
    """
    try:
        return QdrantClient(url="http://localhost:6333")
    except Exception as e:
        st.error(f"? Qdrant connection failed: {str(e)}")
        return None


# Document Processing Functions
def process_pdf(file) -> List:
    """Process PDF file and add source metadata."""
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            tmp_file.write(file.getvalue())
            loader = PyPDFLoader(tmp_file.name)
            documents = loader.load()

            # Add source metadata
            for doc in documents:
                doc.metadata.update({
                    "source_type": "pdf",
                    "file_name": file.name,
                    "timestamp": datetime.now().isoformat()
                })

            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200
            )
            return text_splitter.split_documents(documents)
    except Exception as e:
        st.error(f"? PDF processing error: {str(e)}")
        return []


def process_web(url: str) -> List:
    """Process web URL and add source metadata."""
    try:
        loader = WebBaseLoader(
            web_paths=(url,),
            bs_kwargs=dict(
                parse_only=bs4.SoupStrainer(
                    class_=("post-content", "post-title", "post-header", "content", "main")
                )
            )
        )
        documents = loader.load()

        # Add source metadata
        for doc in documents:
            doc.metadata.update({
                "source_type": "url",
                "url": url,
                "timestamp": datetime.now().isoformat()
            })

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        return text_splitter.split_documents(documents)
    except Exception as e:
        st.error(f"? Web processing error: {str(e)}")
        return []


# Vector Store Management
def create_vector_store(client, texts):
    """Create and initialize vector store with documents."""
    try:
        # Create collection if needed
        try:
            client.create_collection(
                collection_name=COLLECTION_NAME,
                vectors_config=VectorParams(
                    size=1024,
                    distance=Distance.COSINE
                )
            )
            st.success(f"? Created new collection: {COLLECTION_NAME}")
        except Exception as e:
            if "already exists" not in str(e).lower():
                raise e

        # Initialize vector store
        vector_store = QdrantVectorStore(
            client=client,
            collection_name=COLLECTION_NAME,
            embedding=OllamaEmbedderr()
        )

        # Add documents
        with st.spinner('? Uploading documents to Qdrant...'):
            vector_store.add_documents(texts)
            st.success("✅ Documents stored successfully!")
            return vector_store

    except Exception as e:
        st.error(f"? Vector store error: {str(e)}")
        return None


def get_web_search_agent() -> Agent:
    """Initialize a web search agent."""
    return Agent(
        name="Web Search Agent",
        model=Ollama(id="llama3.2"),
        tools=[ExaTools(
            api_key=st.session_state.exa_api_key,
            include_domains=search_domains,
            num_results=5
        )],
        instructions="""You are a web search expert. Your task is to:
        1. Search the web for relevant information about the query
        2. Compile and summarize the most relevant information
        3. Include sources in your response
        """,
        show_tool_calls=True,
        markdown=True,
    )


def get_rag_agent() -> Agent:
    """Initialize the main RAG agent."""
    return Agent(
        name="Qwen 3 RAG Agent",
        model=Ollama(id=st.session_state.model_version),
        instructions="""You are an Intelligent Agent specializing in providing accurate answers.

        When asked a question:
        - Analyze the question and answer the question with what you know.

        When given context from documents:
        - Focus on information from the provided documents
        - Be precise and cite specific details

        When given web search results:
        - Clearly indicate that the information comes from web search
        - Synthesize the information clearly

        Always maintain high accuracy and clarity in your responses.
        """,
        show_tool_calls=True,
        markdown=True,
    )


def check_document_relevance(query: str, vector_store, threshold: float = 0.7) -> tuple[bool, List]:
    if not vector_store:
        return False, []
    retriever = vector_store.as_retriever(
        search_type="similarity_score_threshold",
        search_kwargs={"k": 5, "score_threshold": threshold}
    )
    docs = retriever.invoke(query)
    return bool(docs), docs


chat_col, toggle_col = st.columns([0.9, 0.1])

with chat_col:
    prompt = st.chat_input("Ask about your documents..." if st.session_state.rag_enabled else "Ask me anything...")

with toggle_col:
    st.session_state.force_web_search = st.toggle('?', help="Force web search")

# Check if RAG is enabled
if st.session_state.rag_enabled:
    qdrant_client = init_qdrant()

    # --- Document Upload Section (Moved to Main Area) ---
    with st.expander("? Upload Documents or URLs for RAG", expanded=False):
        if not qdrant_client:
            st.warning("⚠️ Please configure Qdrant API Key and URL in the sidebar to enable document processing.")
        else:
            uploaded_files = st.file_uploader(
                "Upload PDF files",
                accept_multiple_files=True,
                type='pdf'
            )
            url_input = st.text_input("Enter URL to scrape")

            if uploaded_files:
                st.write(f"Processing {len(uploaded_files)} PDF file(s)...")
                all_texts = []
                for file in uploaded_files:
                    if file.name not in st.session_state.processed_documents:
                        with st.spinner(f"Processing {file.name}..."):
                            texts = process_pdf(file)
                            if texts:
                                all_texts.extend(texts)
                                st.session_state.processed_documents.append(file.name)
                    else:
                        st.write(f"? {file.name} already processed.")

                if all_texts:
                    with st.spinner("Creating vector store..."):
                        st.session_state.vector_store = create_vector_store(qdrant_client, all_texts)

            if url_input:
                if url_input not in st.session_state.processed_documents:
                    with st.spinner(f"Scraping and processing {url_input}..."):
                        texts = process_web(url_input)
                        if texts:
                            st.session_state.vector_store = create_vector_store(qdrant_client, texts)
                            st.session_state.processed_documents.append(url_input)
                else:
                    st.write(f"? {url_input} already processed.")

            if st.session_state.vector_store:
                st.success("Vector store is ready.")
            elif not uploaded_files and not url_input:
                st.info("Upload PDFs or enter a URL to populate the vector store.")

    # Display sources in sidebar
    if st.session_state.processed_documents:
        st.sidebar.header("? Processed Sources")
        for source in st.session_state.processed_documents:
            if source.endswith('.pdf'):
                st.sidebar.text(f"? {source}")
            else:
                st.sidebar.text(f"? {source}")

if prompt:
    # Add user message to history
    st.session_state.history.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.write(prompt)

    if st.session_state.rag_enabled:
        # Existing RAG flow remains unchanged
        with st.spinner("? Evaluating the Query..."):
            try:
                rewritten_query = prompt
                with st.expander("Evaluating the query"):
                    st.write(f"User's Prompt: {prompt}")
            except Exception as e:
                st.error(f"❌ Error rewriting query: {str(e)}")
                rewritten_query = prompt

        # Step 2: Choose search strategy based on force_web_search toggle
        context = ""
        docs = []
        if not st.session_state.force_web_search and st.session_state.vector_store:
            # Try document search first
            retriever = st.session_state.vector_store.as_retriever(
                search_type="similarity_score_threshold",
                search_kwargs={
                    "k": 5,
                    "score_threshold": st.session_state.similarity_threshold
                }
            )
            docs = retriever.invoke(rewritten_query)
            if docs:
                context = "\n\n".join([d.page_content for d in docs])
                st.info(f"? Found {len(docs)} relevant documents (similarity > {st.session_state.similarity_threshold})")
            elif st.session_state.use_web_search:
                st.info("? No relevant documents found in database, falling back to web search...")

        # Step 3: Use web search if:
        # 1. Web search is forced ON via toggle, or
        # 2. No relevant documents found AND web search is enabled in settings
        if (st.session_state.force_web_search or not context) and st.session_state.use_web_search and st.session_state.exa_api_key:
            with st.spinner("? Searching the web..."):
                try:
                    web_search_agent = get_web_search_agent()
                    web_results = web_search_agent.run(rewritten_query).content
                    if web_results:
                        context = f"Web Search Results:\n{web_results}"
                        if st.session_state.force_web_search:
                            st.info("ℹ️ Using web search as requested via toggle.")
                        else:
                            st.info("ℹ️ Using web search as fallback since no relevant documents were found.")
                except Exception as e:
                    st.error(f"❌ Web search error: {str(e)}")

        # Step 4: Generate response using the RAG agent
        with st.spinner("? Thinking..."):
            try:
                rag_agent = get_rag_agent()

                if context:
                    full_prompt = f"""Context: {context}

Original Question: {prompt}

Please provide a comprehensive answer based on the available information."""
                else:
                    full_prompt = f"Original Question: {prompt}\n"
                    st.info("ℹ️ No relevant information found in documents or web search.")

                response = rag_agent.run(full_prompt)

                # Add assistant response to history
                st.session_state.history.append({
                    "role": "assistant",
                    "content": response.content
                })

                # Display assistant response
                with st.chat_message("assistant"):
                    st.write(response.content)

                    # Show sources if available
                    if not st.session_state.force_web_search and 'docs' in locals() and docs:
                        with st.expander("? See document sources"):
                            for i, doc in enumerate(docs, 1):
                                source_type = doc.metadata.get("source_type", "unknown")
                                source_icon = "?" if source_type == "pdf" else "?"
                                source_name = doc.metadata.get("file_name" if source_type == "pdf" else "url", "unknown")
                                st.write(f"{source_icon} Source {i} from {source_name}:")
                                st.write(f"{doc.page_content[:200]}...")

            except Exception as e:
                st.error(f"❌ Error generating response: {str(e)}")

    else:
        # Simple mode without RAG
        with st.spinner("? Thinking..."):
            try:
                rag_agent = get_rag_agent()
                web_search_agent = get_web_search_agent() if st.session_state.use_web_search else None

                # Handle web search if forced or enabled
                context = ""
                if st.session_state.force_web_search and web_search_agent:
                    with st.spinner("? Searching the web..."):
                        try:
                            web_results = web_search_agent.run(prompt).content
                            if web_results:
                                context = f"Web Search Results:\n{web_results}"
                                st.info("ℹ️ Using web search as requested.")
                        except Exception as e:
                            st.error(f"❌ Web search error: {str(e)}")

                # Generate response
                if context:
                    full_prompt = f"""Context: {context}

Question: {prompt}

Please provide a comprehensive answer based on the available information."""
                else:
                    full_prompt = prompt

                response = rag_agent.run(full_prompt)
                response_content = response.content

                # Extract thinking process and final response
                import re
                think_pattern = r'<think>(.*?)</think>'
                think_match = re.search(think_pattern, response_content, re.DOTALL)

                if think_match:
                    thinking_process = think_match.group(1).strip()
                    final_response = re.sub(think_pattern, '', response_content, flags=re.DOTALL).strip()
                else:
                    thinking_process = None
                    final_response = response_content

                # Add assistant response to history (only the final response)
                st.session_state.history.append({
                    "role": "assistant",
                    "content": final_response
                })

                # Display assistant response
                with st.chat_message("assistant"):
                    if thinking_process:
                        with st.expander("? See thinking process"):
                            st.markdown(thinking_process)
                    st.markdown(final_response)

            except Exception as e:
                st.error(f"❌ Error generating response: {str(e)}")

else:
    st.warning("You can directly talk to qwen and gemma models locally! Toggle the RAG mode to upload documents!")
