AI应用已经渗透到我们生活的方方面面。其中,AI对话系统因其自然、流畅的交流方式而备受瞩目。前段在交流群中有人问到,如何实现AI对话中那种逐字输出的效果,这背后,流式返回技术发挥了关键作用。其实这背后并不是前端做了什么特效,而是采用的流式返回,即不是一次性返回完整的响应。流式返回允许服务器在一次连接中逐步发送数据,而不是一次性返回全部结果。这种方式使得前端可以在等待完整响应的过程中,逐步展示生成的内容,从而极大地提升了用户体验。 那么,前端接收流式返回具体有哪些方式呢?接下来,本文将详细探讨几种常见的技术手段,帮助你更好地理解并应用流式返回技术。 大多数场景下,前端用的最多的就是axios来发送请求,但是axios只有在在Node.js环境中支持设置responseType: 'stream'来接收流式响应。 constaxios=require('axios');constfs=require('fs');axios.get('http://localhost:3000/stream',{responseType:'stream',//设置响应类型为流}).then((response)=>{//将响应流写入文件response.data.pipe(fs.createWriteStream('output.txt'));}).catch((error)=>{console.error('Streamerror:',error);});特点仅限 Node.js:浏览器中的 axios 不支持 responseType: 'stream' 适合文件下载:适合处理大文件下载。
WebSocket 是一种全双工通信协议,适合需要双向实时通信的场景。 前端代码: constsocket=newWebSocket('ws://localhost:3000');socket.onopen=()=>{console.log('WebSocketconnected');};socket.onmessage=(event)=>{console.log('Receiveddata:',event.data);};socket.onerror=(error)=>{console.error('WebSocketerror:',error);};socket.onclose=()=>{console.log('WebSocketclosed');};服务器代码 constWebSocket=require('ws');constwss=newWebSocket.Server({port:3000});wss.on('connection',(ws)=>{console.log('Clientconnected');letcounter=0;constintervalId=setInterval(()=>{counter++;ws.send(JSON.stringify({message:'Hello',counter}));if(counter>=5){clearInterval(intervalId);ws.close();}},1000);ws.on('close',()=>{console.log('Clientdisconnected');clearInterval(intervalId);});});虽然WebSocket作为一种在单个TCP连接上进行全双工通信的协议,具有实时双向数据传输的能力,但AI对话情况下可能并不选择它进行通信。主要有以下几点原因: 特点双向通信:适合实时双向数据传输 低延迟:基于 TCP 协议,延迟低 复杂场景:适合聊天、实时游戏等复杂场景
虽然XMLHttpRequest不能直接支持流式返回,但可以通过监听progress事件模拟逐块接收数据 constxhr=newXMLHttpRequest();xhr.open('GET','/stream',true);xhr.onprogress=(event)=>{constchunk=xhr.responseText;//获取当前接收到的数据console.log(chunk);};xhr.onload=()=>{console.log('Requestcomplete');};xhr.send();服务器代码(Koa 示例): router.get("/XMLHttpRequest",async(ctx,next)=>{ctx.set({"Content-Type":"text/event-stream","Cache-Control":"no-cache",Connection:"keep-alive",});//创建一个PassThrough流conststream=newPassThrough();ctx.body=stream;letcounter=0;constintervalId=setInterval(()=>{counter++;ctx.res.write(JSON.stringify({message:"Hello",counter}));if(counter>=5){clearInterval(intervalId);ctx.res.end();}},1000);ctx.req.on("close",()=>{clearInterval(intervalId);ctx.res.end();});});可以看到以下的输出结果,在onprogress中每次可以拿到当前已经接收到的数据。它并不支持真正的流式响应,用于AI对话场景中,每次都需要将以显示的内容全部替换,或者需要做一些额外的处理。 如果想提前终止请求,可以使用xhr.abort()方法; setTimeout(()=>{xhr.abort();},3000);特点兼容性好:支持所有浏览器 非真正流式:XMLHttpRequest 仍然需要等待整个响应完成,progress 事件只是提供了部分数据的访问能力 内存占用高:不适合处理大文件
SSE 是一种服务器向客户端推送事件的协议,基于 HTTP 长连接。它适合服务器向客户端单向推送实时数据 前端代码: consteventSource=newEventSource('/sse');eventSource.onmessage=(event)=>{console.log('Receiveddata:',event.data);};eventSource.onerror=(event)=>{console.error('EventSourcefailed:',event);};服务器代码(Koa 示例): router.get('/sse',(ctx)=>{ctx.set({'Content-Type':'text/event-stream','Cache-Control':'no-cache','Connection':'keep-alive',});letcounter=0;constintervalId=setInterval(()=>{counter++;ctx.res.write(`data {JSON.stringify({message:'Hello',counter})}\n\n`);if(counter>=5){clearInterval(intervalId);ctx.res.end();}},1000);ctx.req.on('close',()=>{clearInterval(intervalId);ctx.res.end();});})
EventSource 也具有主动关闭请求的能力,在结果没有完全返回前,用户可以提前终止内容的返回。 //在需要时中止请求setTimeout(()=>{eventSource.close();//主动关闭请求},3000);//3秒后中止请求虽然EventSource支持流式请求,但AI对话场景不使用它有以下几点原因: 注意点 返回给EventSource的值必须遵循data:开头并以\n\n结尾的格式,这是因为Server-Sent Events (SSE)协议规定了这种格式。SSE 是一种基于 HTTP 的轻量级协议,用于服务器向客户端推送事件。为了确保客户端能够正确解析服务器发送的数据,SSE 协议定义了一套严格的格式规范。SSE 协议规定,服务器发送的每条消息必须遵循以下格式:
其中field是字段名,value是对应的值。常见的字段包括: data::消息的内容(必须)。
event::事件类型(可选)。
id::消息的唯一标识符(可选)。
retry::客户端重连的时间间隔(可选)。
每条消息必须以两个换行符 (\n\n)结尾,表示消息结束 以下是一个完整的 SSE 消息示例: id:1\nevent:update\ndata:{"message":"Hello","counter":1}\n\n特点单向通信:适合服务器向客户端推送数据 简单易用:基于 HTTP 协议,无需额外协议支持 自动重连:EventSource 会自动处理连接断开和重连
fetchAPI 是现代浏览器提供的原生方法,支持流式响应。通过response.body,可以获取一个ReadableStream,然后逐块读取数据。
前端代码: //发送流式请求fetch("http://localhost:3000/stream/fetch",{method:" OST",signal,}).then(async(response:any)=>{constreader=response.body.getReader();while(true){const{done,value}=awaitreader.read();if(done)break;console.log(newTextDecoder().decode(value));}}).catch((error)=>{console.error("Fetcherror:",error);});服务器代码(Koa 示例): router.post("/fetch",async(ctx)=>{ctx.set({"Content-Type":"text/event-stream","Cache-Control":"no-cache",Connection:"keep-alive",});//创建一个PassThrough流conststream=newPassThrough();ctx.body=stream;letcounter=0;constintervalId=setInterval(()=>{counter++;ctx.res.write(JSON.stringify({message:"Hello",counter}));if(counter>=5){clearInterval(intervalId);ctx.res.end();}},1000);ctx.req.on("close",()=>{clearInterval(intervalId);ctx.res.end();});});fetch也同样可以在客户端主动关闭请求。 //创建一个AbortController实例constcontroller=newAbortController();const{signal}=controller;//发送流式请求fetch("http://localhost:3000/stream/fetch",{method:" OST",signal,}).then(async(response:any)=>{constreader=response.body.getReader();while(true){const{done,value}=awaitreader.read();if(done)break;console.log(newTextDecoder().decode(value));}}).catch((error)=>{console.error("Fetcherror:",error);});//在需要时中止请求setTimeout(()=>{controller.abort();//主动关闭请求},3000);//3秒后中止请求打开控制台,可以看到在Response中可以看到返回的全部数据,在EventStream中没有任何内容。 这是由于返回的信息SSE协议规范,具体规范见上文的Server-Sent Events模块中有介绍到 ctx.res.write(`data {JSON.stringify({message:"Hello",counter})}\n\n`); 但是客户端fetch请求中接收到的数据也包含了规范中的内容,需要前端对数据进一步的处理一下 特点原生支持:现代浏览器均支持 fetch 和 ReadableStream 逐块处理:可以实时处理每个数据块,而不需要等待整个响应完成 内存效率高:适合处理大文件或实时数据
综上所述,在 AI 对话场景中,fetch请求是主流的技术选择,而不是XMLHttpRequest或EventSource。以下是原因和详细分析: fetch是现代浏览器提供的原生 API,基于 Promise,代码更简洁、易读
fetch支持ReadableStream,可以实现流式请求和响应
fetch支持自定义请求头、请求方法(GET、POST 等)和请求体
fetch结合AbortController可以方便地中止请求
fetch的响应对象提供了response.ok和response.status,可以更方便地处理错误
| 方式 | 特点 | 适用场景 |
|---|
fetch | 原生支持,逐块处理,内存效率高 | 大文件下载、实时数据推送 | XMLHttpRequest | 兼容性好,非真正流式,内存占用高 | 旧版浏览器兼容 | | Server-Sent Events (SSE) | 单向通信,简单易用,自动重连 | 服务器向客户端推送实时数据 | | WebSocket | 双向通信,低延迟,适合复杂场景 | 聊天、实时游戏 | axios(Node.js) | 仅限 Node.js,适合文件下载 | Node.js 环境中的大文件下载 |
最后来看一个接入deekseek的完整例子:
服务器代码(Koa 示例): constopenai=newOpenAI({baseURL:"https://api.deepseek.com",apiKey:"这里是你申请的deepseek的apiKey",});//流式请求DeepSeek接口并流式返回router.post("/fetchStream",async(ctx)=>{//设置响应头ctx.set({"Content-Type":"text/event-stream","Cache-Control":"no-cache",Connection:"keep-alive",});try{//创建一个PassThrough流conststream=newPassThrough();ctx.body=stream;//调用OpenAIAPI,启用流式输出constcompletion=awaitopenai.chat.completions.create({model:"deepseek-chat",//或'gpt-3.5-turbo'messages:[{role:"user",content:"请用100字介绍OpenAI"}],stream:true,//启用流式输出});//逐块处理流式数据forawait(constchunkofcompletion){constcontent=chunk.choices[0]?.delta?.content||"";//获取当前块的内容ctx.res.write(content);process.stdout.write(content);//将内容输出到控制台}ctx.res.end();}catch(err){console.error("Requestfailed:",err);ctx.status=500;ctx.res.write({error:"Failedtostreamdata"});}});前端代码: constcontroller=newAbortController();const{signal}=controller;constChat=()=>{const[text,setText]=useState<string>("");const[message,setMessage]=useState<string>("");const[loading,setLoading]=useState<boolean>(false);functionsend(){if(!message)return;setText("");//创建一个AbortController实例setLoading(true);//发送流式请求fetch("http://localhost:3000/deepseek/fetchStream",{method:" OST",headers:{"Content-Type":"application/json",},body:JSON.stringify({message,}),signal,}).then(async(response:any)=>{constreader=response.body.getReader();while(true){const{done,value}=awaitreader.read();if(done)break;constdata=newTextDecoder().decode(value);console.log(data);setText((t)=>t+data);}}).catch((error)=>{console.error("Fetcherror:",error);}).finally(()=>{setLoading(false);});}functionstop(){controller.abort();setLoading(false);}return(<div><Inputvalue={message}onChange={(e)=>setMessage(e.target.value)}/><ButtononClick={send}type="primary"loading={loading}disabled={loading}>发送</Button><ButtononClick={stop}danger>停止回答</Button><div>{text}</div></div>);};
|