import asyncio
import json
import re
import traceback
from typing import (
    AsyncGenerator,
    Awaitable,
    Callable,
    List,
    Optional,
    Tuple,
    Union,
)

import httpx
from pydantic import BaseModel, Field
class Pipe:
    """Stream chat completions from a DeepSeek-style API as text chunks.

    The pipe posts the request body to
    ``{DEEPSEEK_API_BASE_URL}/chat/completions``, reads the ``data:``-prefixed
    streaming lines, wraps ``reasoning_content`` in ``<think>`` tags and
    rewrites web-search citation markers into Markdown links.
    """

    # Web-search back-ends, detected from the shape of the streamed chunks.
    _SEARCH_NONE = 0         # nothing detected (yet)
    _SEARCH_VOLCANO = 1      # Volcano Engine: top-level "references" list
    _SEARCH_PPLX = 2         # PPLX: top-level "citations" list
    _SEARCH_SILICONFLOW = 3  # SiliconFlow: delta of type "search_result"

    # SiliconFlow marks citations inline as "[citation:N]".
    _CITATION_OPENING = "[citation:"
    _CITATION_TAG = re.compile(r"\[citation:(\d+)\]")

    class Valves(BaseModel):
        # Settings read by the pipe. (A `#` comment is used instead of a
        # docstring so the pydantic model's schema description is unchanged.)
        DEEPSEEK_API_BASE_URL: str = Field(
            default="https://api.deepseek.com/v1",
            description="DeepSeek API的基础请求地址",
        )
        DEEPSEEK_API_KEY: str = Field(
            default="", description="用于身份验证的DeepSeek API密钥,可从控制台获取"
        )
        DEEPSEEK_API_MODEL: str = Field(
            default="deepseek-reasoner",
            description="API请求的模型名称,默认为 deepseek-reasoner,多模型名可使用`,`分隔",
        )

    def __init__(self):
        """Create the pipe with default valves and no event emitter."""
        self.valves = self.Valves()
        self.data_prefix = "data:"
        self.emitter = None

    def pipes(self):
        """Return one ``{"id", "name"}`` entry per configured model name.

        ``DEEPSEEK_API_MODEL`` is split on commas; surrounding whitespace is
        stripped and empty entries (e.g. from a trailing comma) are skipped.
        """
        names = (
            name.strip() for name in self.valves.DEEPSEEK_API_MODEL.split(",")
        )
        return [{"id": name, "name": name} for name in names if name]

    async def pipe(
        self,
        body: dict,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> AsyncGenerator[str, None]:
        """Send ``body`` to the chat-completions endpoint and stream the reply.

        Args:
            body: Request payload. Must contain ``"model"`` (everything up to
                the first ``.`` is stripped before sending) and
                ``"messages"``. The caller's message list is not modified.
            __event_emitter__: Optional async callback stored on
                ``self.emitter`` for status events.

        Yields:
            Text chunks of the answer. Reasoning output is wrapped in
            ``<think>`` ... ``</think>``; detected search results are emitted
            as a ``<details type="search">`` block and citation markers are
            turned into Markdown links. Failures are yielded as a JSON string
            of the form ``{"error": ...}`` instead of being raised.
        """
        thinking_state = {"thinking": -1}  # see _update_thinking_state
        self.emitter = __event_emitter__
        # URLs of the references returned in web-search mode, in citation order.
        reference_urls: List[str] = []
        search_provider = self._SEARCH_NONE
        # Volcano Engine: True while bare numbers should be read as citations.
        waiting_for_reference = False
        # SiliconFlow: trailing text that may be the start of "[citation:N]".
        pending_citation = ""

        # Validate configuration.
        if not self.valves.DEEPSEEK_API_KEY:
            yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
            return

        headers = {
            # The scheme and the credential must be separated by a space.
            "Authorization": f"Bearer {self.valves.DEEPSEEK_API_KEY}",
            "Content-Type": "application/json",
        }

        try:
            # Keep only the part of the model id after the first ".".
            model_id = body["model"].split(".", 1)[-1]
            payload = {
                **body,
                "model": model_id,
                # Built as a new list so the caller's messages stay untouched.
                "messages": self._separate_adjacent_roles(body["messages"]),
            }

            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{self.valves.DEEPSEEK_API_BASE_URL}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=300,
                ) as response:
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return

                    async for line in response.aiter_lines():
                        if not line.startswith(self.data_prefix):
                            continue

                        json_str = line[len(self.data_prefix):].strip()
                        if json_str == "[DONE]":
                            break

                        try:
                            data = json.loads(json_str)
                        except json.JSONDecodeError as e:
                            error_detail = f"解析失败 - 内容:{json_str},原因:{e}"
                            yield self._format_error(
                                "JSONDecodeError", error_detail
                            )
                            return

                        # Chunks without choices carry nothing to process.
                        choices = data.get("choices")
                        if not choices:
                            continue
                        choice = choices[0]
                        delta = choice.get("delta") or {}

                        if search_provider == self._SEARCH_NONE:
                            # SiliconFlow announces results inside the delta.
                            if delta.get("type") == "search_result":
                                search_results = delta.get("search_results", [])
                                if search_results:
                                    entries = [
                                        f'>{idx}. [{result["title"]}]({result["url"]})\n'
                                        for idx, result in enumerate(
                                            search_results, 1
                                        )
                                    ]
                                    for chunk in self._search_details(entries):
                                        yield chunk
                                    search_provider = self._SEARCH_SILICONFLOW
                                    reference_urls = [
                                        result["url"] for result in search_results
                                    ]
                                    continue

                            # Volcano Engine / PPLX put them on the chunk itself.
                            # `or []` also covers an explicit JSON null.
                            references = (data.get("references") or []) + (
                                data.get("citations") or []
                            )
                            if references:
                                if data.get("references"):
                                    entries = [
                                        f'>{idx}. [{reference["title"]}]({reference["url"]})\n'
                                        for idx, reference in enumerate(
                                            references, 1
                                        )
                                    ]
                                    reference_urls = [
                                        reference["url"] for reference in references
                                    ]
                                    search_provider = self._SEARCH_VOLCANO
                                else:
                                    entries = [
                                        f">{idx}.{reference}\n"
                                        for idx, reference in enumerate(
                                            references, 1
                                        )
                                    ]
                                    reference_urls = list(references)
                                    search_provider = self._SEARCH_PPLX
                                for chunk in self._search_details(entries):
                                    yield chunk

                        # A finish reason ends the stream.
                        if choice.get("finish_reason"):
                            break

                        # Emit <think> / </think> when the phase changes.
                        state_output = await self._update_thinking_state(
                            delta, thinking_state
                        )
                        if state_output:
                            if pending_citation:
                                # Keep held-back text on its side of the tag.
                                yield pending_citation
                                pending_citation = ""
                            yield state_output
                            if state_output == "<think>":
                                yield "\n"

                        content = self._process_content(delta)
                        if not content:
                            continue

                        # Some back-ends inline the think tags in the text:
                        # emit the tag on its own, then the remainder.
                        if content.startswith("<think>"):
                            content = content[len("<think>"):]
                            yield "<think>"
                            await asyncio.sleep(0.1)
                            yield "\n"
                        elif content.startswith("</think>"):
                            content = content[len("</think>"):]
                            yield "</think>"
                            await asyncio.sleep(0.1)
                            yield "\n"

                        if search_provider == self._SEARCH_VOLCANO:
                            # After a chunk containing "摘要", chunks that are
                            # only a number (or "、") are citation indices.
                            if "摘要" in content:
                                waiting_for_reference = True
                                yield content
                                continue
                            if waiting_for_reference:
                                if re.match(r"^(\d+|、)$", content.strip()):
                                    numbers = re.findall(r"\d+", content)
                                    if numbers:
                                        content = self._citation_link(
                                            numbers[0], reference_urls
                                        )
                                    # Stay in waiting mode for further numbers.
                                else:
                                    waiting_for_reference = False
                        elif search_provider == self._SEARCH_PPLX:
                            # PPLX cites as "[N]".
                            content = re.sub(
                                r"\[(\d+)\]",
                                lambda m: self._citation_link(
                                    m.group(1), reference_urls
                                ),
                                content,
                            )
                        elif search_provider == self._SEARCH_SILICONFLOW:
                            # A tag may be split across chunks: hold back a
                            # trailing partial tag and retry with the next chunk.
                            content, pending_citation = (
                                self._split_pending_citation(
                                    pending_citation + content
                                )
                            )
                            content = self._CITATION_TAG.sub(
                                lambda m: self._citation_link(
                                    m.group(1), reference_urls
                                ),
                                content,
                            )
                            if not content:
                                continue

                        yield content

                    # The stream ended while text was still held back: it never
                    # became a full citation tag, so emit it unchanged.
                    if pending_citation:
                        yield pending_citation
        except Exception as e:
            # Top-level boundary: report the failure to the consumer as a chunk.
            yield self._format_exception(e)

    @staticmethod
    def _separate_adjacent_roles(messages: List[dict]) -> List[dict]:
        """Return a copy of ``messages`` with no two neighbours sharing a role.

        Between two consecutive messages with the same role a placeholder
        message with the other role ("assistant" after "user", otherwise
        "user") is inserted. The input list is left unmodified.
        """
        separated: List[dict] = []
        for message in messages:
            if separated and separated[-1]["role"] == message["role"]:
                filler_role = "assistant" if message["role"] == "user" else "user"
                separated.append(
                    {"role": filler_role, "content": "[Unfinished thinking]"}
                )
            separated.append(message)
        return separated

    @staticmethod
    def _search_details(entries: List[str]) -> List[str]:
        """Return the chunks of the collapsible "searched N sites" block."""
        return [
            '<details type="search">\n',
            f"<summary>已搜索{len(entries)}个网站</summary>\n",
            *entries,
            "</details>\n",
        ]

    @staticmethod
    def _citation_link(number: str, urls: List[str]) -> str:
        """Render 1-based citation ``number`` as ``[[N]](url)``.

        The URL is empty when ``number`` is outside ``urls``.
        """
        index = int(number) - 1
        url = urls[index] if 0 <= index < len(urls) else ""
        return f"[[{number}]]({url})"

    @classmethod
    def _split_pending_citation(cls, text: str) -> Tuple[str, str]:
        """Split ``text`` into ``(complete, pending)``.

        ``pending`` is a trailing fragment that could still grow into a
        ``[citation:N]`` tag (e.g. ``"[cit"`` or ``"[citation:1"``); it is
        empty when the text does not end in such a fragment.
        """
        start = text.rfind("[")
        if start == -1:
            return text, ""
        tail = text[start:]
        opening = cls._CITATION_OPENING
        is_partial_tag = opening.startswith(tail) or (
            tail.startswith(opening) and tail[len(opening):].isdigit()
        )
        if is_partial_tag:
            return text[:start], tail
        return text, ""

    async def _update_thinking_state(self, delta: dict, thinking_state: dict) -> str:
        """Advance the thinking state machine and return the tag to emit.

        ``thinking_state["thinking"]`` is -1 before any reasoning, 0 while
        reasoning and 1 once the answer has started. Returns ``"<think>"`` on
        the first ``reasoning_content``, ``"\\n</think>\\n\\n"`` on the first
        ``content`` after reasoning, and ``""`` otherwise.
        """
        state_output = ""
        if thinking_state["thinking"] == -1 and delta.get("reasoning_content"):
            thinking_state["thinking"] = 0
            state_output = "<think>"
        elif (
            thinking_state["thinking"] == 0
            and not delta.get("reasoning_content")
            and delta.get("content")
        ):
            thinking_state["thinking"] = 1
            state_output = "\n</think>\n\n"
        return state_output

    def _process_content(self, delta: dict) -> str:
        """Return the delta's ``reasoning_content`` if non-empty, else ``content``."""
        return delta.get("reasoning_content", "") or delta.get("content", "")

    def _emit_status(
        self, description: str, done: bool = False
    ) -> Optional[Awaitable[None]]:
        """Build a status event for the emitter.

        Returns the emitter's awaitable, or ``None`` when no emitter is set —
        callers must check the result before awaiting it.
        """
        if self.emitter:
            return self.emitter(
                {
                    "type": "status",
                    "data": {
                        "description": description,
                        "done": done,
                    },
                }
            )
        return None

    def _format_error(
        self, status_code: Union[int, str], error: Union[bytes, str]
    ) -> str:
        """Return ``{"error": "HTTP<code>:<message>"}`` as a JSON string.

        ``error`` may be raw response bytes or text. If it is a JSON object
        with a ``"message"`` entry that entry is used, otherwise the raw
        text; either way the message is cut to 200 characters.
        """
        if isinstance(error, str):
            error_str = error
        else:
            error_str = error.decode(errors="ignore")
        try:
            err_msg = json.loads(error_str).get("message", error_str)[:200]
        except Exception:
            # Not JSON, not an object, or a non-string message: use raw text.
            err_msg = error_str[:200]
        return json.dumps(
            {"error": f"HTTP{status_code}:{err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """Return ``{"error": <full traceback of e>}`` as a JSON string."""
        tb_lines = traceback.format_exception(type(e), e, e.__traceback__)
        detailed_error = "".join(tb_lines)
        return json.dumps({"error": detailed_error}, ensure_ascii=False)