func(f*FreeAskUsecase)freeAskHandle(ctxcontext.Context,req*FreeAskReq,output*FreeAskResp){//上下文控制ctx,cancel:=context.WithTimeout(ctx,time.Minute*2)defercancel()//初始化请求管道modelCh:=make(chan*ModelResponse,10)interruptCh:=make(chanstruct{},1)varmsgIdStrstringvarmsg*Message//前置处理:创建消息、安全审核等msg,err:=f.createInitialMessage(ctx,req)iferr!=nil{f.log.Errorf("创建初始消息失败:%v",err)output.Status=StatusFailedoutput.Message="创建消息失败"return}msgIdStr=msg.Msg.Idoutput.MsgId=msgIdStr//获取历史对话记录chatHistory,err:=f.repo.GetRecentChatHistory(ctx,req.TalId,req.SubjectId,2)iferr!=nil{f.log.Warnf("获取历史对话记录失败:%v",err)//继续处理,不影响主流程}//构建消息历史messages:=make([]Message,0,len(chatHistory)*2+1)//系统提示词systemPrompt:=f.buildSystemPrompt(req.SubjectId,intent,intentDetails)messages=append(messages,Message{Role:"system",//系统角色Content:systemPrompt,})//添加历史对话for_,chat:=rangechatHistory{//用户问题messages=append(messages,Message{Role:"user",//用户角色Content:chat.Question,})//AI回答messages=append(messages,Message{Role:"assistant",//模型角色Content:chat.Answer,})}//添加当前问题messages=append(messages,Message{Role:"user",Content:req.Question,})//用户意图识别intent,intentDetails:=f.recognizeUserIntent(ctx,req.Question)f.log.Infof("用户意图识别结果:%s,详情:%+v",intent,intentDetails)//根据意图处理特殊请求iff.handleSpecialIntent(ctx,intent,intentDetails,req,output){//如果特殊意图已处理完毕,直接返回return}//安全审核safeResult,err:=f.safetyCheck(ctx,req.Question)iferr!=nil||!safeResult.IsSafe{f.log.Errorf("内容安全审核未通过:%v",err)output.Status=StatusRejectedoutput.Message="内容包含不安全信息,请修改后重试"//更新消息状态为拒绝f.repo.UpdateMessageStatus(ctx,msgIdStr,MessageStatusRejected)return}//2.创建中断监听//用户可能会打断模型输出gof.listenForInterruption(ctx,req.TalId,msgIdStr,interruptCh)//3.构建模型提示词//将用户意图信息添加到提示词中promptOptions:=&
romptOptions{Intent:intent,IntentDetails:intentDetails,}prompt,err:=f.buildPromptWithOptions(ctx,req,promptOptions)iferr!=nil{f.log.Errorf("构建提示词失败:%v",err)output.Status=StatusFailedoutput.Message="系统处理异常"return}//创建DeepSeek-R1模型请求modelRequest:=&DeepSeekModelRequest{Model:"deepseek-r1",Messages:messages,MaxTokens:2048,Temperature:0.7,Stream:true,//流式输出}//构建模型上下文modelCtx,modelCancel:=context.WithCancel(ctx)defermodelCancel()//添加中断处理gofunc(){select{case<-interruptCh://接收到中断信号,取消模型请求modelCancel()case<-ctx.Done()://上下文已结束return}}()//创建响应通道modelCh:=make(chan*DeepSeekResponse,10)//异步调用模型gofunc(){deferclose(modelCh)//调用DeepSeek-R1模型进行流式生成err:=f.deepSeekClient.GenerateStream(modelCtx,modelRequest,func(chunk*DeepSeekChunk)error{ifchunk.Error!=nil{modelCh<-&DeepSeekResponse{Error:chunk.Error,}returnchunk.Error}//处理模型流式响应modelCh<-&DeepSeekResponse{Content:chunk.Content,IsFinal:chunk.IsFinal,ToolCalls:chunk.ToolCalls,GeneratedText:chunk.GeneratedText,Usage:chunk.Usage,}returnnil})iferr!=nil&&!errors.Is(err,context.Canceled){f.log.Errorf("DeepSeek-R1模型调用失败:%v",err)modelCh<-&DeepSeekResponse{Error:err,}}}()//5.处理模型响应varfullContentstrings.BuilderisFirstChunk:=truefor{select{case<-ctx.Done()://处理超时f.log.Warnf("请求处理超时:%s",msgIdStr)output.Status=StatusTimeoutoutput.Message="处理超时,请稍后重试"//通过SSE发送超时事件sseWriter.WriteEvent(&SSEEvent{Event:"timeout",Data:map[string]interface{}{"msg_id":msgIdStr,"message":"处理超时,请稍后重试",},})//更新消息状态f.repo.UpdateMessageStatus(ctx,msgIdStr,MessageStatusFailed)returncaseresp,ok:=<-modelCh:if!ok{//处理响应结束gotoEND}//处理模型返回的错误ifresp.Error!=nil{f.log.Errorf("模型返回错误:%v",resp.Error)output.Status=StatusFailedoutput.Message="AI生成回答失败"//通过SSE发送错误事件sseWriter.WriteEvent(&SSEEvent{Event:"error",Data:map[string]interface{}{"msg_id":msgIdStr,"message":"AI生成回答失败",},})f.repo.UpdateMessageStatus(ctx,msgIdStr,MessageStatusFailed)return}//处理模型返回的数据包//追加内容、安全检查、发送给客户端等content:=resp.Content//安全检查每个片段iflen(content)>0{safeResult,_:=f.safetyCheck(ctx,content)if!safeResult.IsSafe{f.log.Warnf("模型回复内容存在安全风险:%s",content)content="对不起,我无法提供这方面的回答。"}}//追加到完整内容fullContent.WriteString(content)//如果是第一个数据包,更新消息状态为进行中ifisFirstChunk{isFirstChunk=falsef.repo.UpdateMessageStatus(ctx,msgIdStr,MessageStatusInProgress)//返回初始响应给客户端output.Status=StatusSuccessoutput.AnswerBegin=content//通过SSE发送开始事件sseWriter.WriteEvent(&SSEEvent{Event:"answer_begin",Data:map[string]interface{}{"msg_id":msgIdStr,"content":content,},})}else{//非首个数据包,通过SSE发送内容片段sseWriter.WriteEvent(&SSEEvent{Event:"answer_chunk",Data:map[string]interface{}{"msg_id":msgIdStr,"content":content,},})}//如果响应中包含特殊标记,处理特殊逻辑ifresp.HasSpecialFunction{f.handleSpecialFunction(ctx,resp.SpecialFunction,msgIdStr,req.TalId,sseWriter)}case_,ok:=<-interruptCh:if!ok{continue}//处理用户中断f.log.Infof("用户中断请求:%s",msgIdStr)msg.Msg.IsInterrupt=1//通过SSE发送中断事件sseWriter.WriteEvent(&SSEEvent{Event:"interrupted",Data:map[string]interface{}{"msg_id":msgIdStr,},})f.handelInterrupt(ctx,msgIdStr,msg.SubjectId)gotoEND}}END://处理结束逻辑//生成提示词、更新消息等finalContent:=fullContent.String()//更新最终消息内容和状态err=f.repo.UpdateMessageContent(ctx,msgIdStr,finalContent)iferr!=nil{f.log.Errorf("更新消息内容失败:%v",err)}ifmsg.Msg.IsInterrupt==0{//正常结束f.repo.UpdateMessageStatus(ctx,msgIdStr,MessageStatusCompleted)//发送完成事件sseWriter.WriteEvent(&SSEEvent{Event:"answer_complete",Data:map[string]interface{}{"msg_id":msgIdStr,"content":finalContent,},})//记录会话历史f.updateSessionHistory(ctx,req.TalId,req.Question,finalContent,msgIdStr)}else{//中断结束f.repo.UpdateMessageStatus(ctx,msgIdStr,MessageStatusInterrupted)}//设置输出信息output.FullAnswer=finalContentoutput.Status=StatusSuccessoutput.Suggestions=suggestionsoutput.Knowledge=knowledge}