返回顶部
热门问答 更多热门问答
技术文章 更多技术文章

4o-realtime API调用分析

[复制链接]
链载Ai 显示全部楼层 发表于 昨天 11:45 |阅读模式 打印 上一主题 下一主题


4o-realtime相关的类主要有以下三个:

RealtimeAPI Class、RealtimeConversation Class、RealtimeClient Class,调用逻辑示意图如下:

调用流程示例:

示例:用户查询订单状态

  1. 客户端发送音频数据

  • 用户通过客户端应用发送语音查询:“我的订单状态是什么?”

  • 客户端将音频数据传递给RealtimeClientappend_input_audio方法。

awaitrealtime_client.append_input_audio(audio_chunk)


2.RealtimeClient处理音频数据

  • RealtimeClient调用RealtimeAPIsend方法,将音频数据通过WebSocket协议发送到服务器。

awaitself.realtime.send("input_audio_buffer.append",{"audio":array_buffer_to_base64(np.array(audio_chunk))})


3.服务器处理请求并返回响应

  • RealtimeAPI通过WebSocket协议接收来自服务器的消息。

  • 服务器处理音频数据,识别用户的查询,并生成响应。

  1. RealtimeAPI分发事件

  • RealtimeAPI接收到服务器的响应消息后,分发事件到RealtimeClient

async for message in self.ws:
event = json.loads(message)
self.dispatch(f"server.{event['type']}", event)
self.dispatch("server.*", event)


5.RealtimeClient处理服务器响应

  • RealtimeClient调用RealtimeConversation的方法来处理事件并更新对话状态。

item, delta = self.conversation.process_event(event)
if item:
self.dispatch("conversation.updated", {"item": item, "delta": delta})


6.RealtimeConversation管理对话状态

  • RealtimeConversation处理不同类型的事件(如消息创建、转录完成、音频流等),并更新对话状态。

def process_event(self, event, *args):
event_processor = self.EventProcessors.get(event['type'])
if not event_processor:
raise Exception(f"Missing conversation event processor for {event['type']}")
return event_processor(self, event, *args)


7.客户端接收并播放响应

  • RealtimeClient将处理后的响应发送回客户端。

  • 客户端接收响应并播放音频或显示文本。

awaitcl.context.emitter.send_audio_chunk(cl.OutputAudioChunk(mimeType="pcm16",data=audio,track=cl.user_session.get("track_id")))

使用协议说明:

  • WebSocket协议

    • WebSocket是一种在单个TCP连接上进行全双工通信的协议。

    • 在本项目中,RealtimeAPI类使用WebSocket协议与OpenAI的实时API进行通信。

    • WebSocket连接的建立和维护由RealtimeAPI类负责,确保低延迟和高效的数据传输。

    • 通过WebSocket协议,RealtimeAPI类能够实时发送和接收音频数据和文本消息,确保对话的流畅性和自然性。


示例:用户通过文本查询订单状态

  1. 客户端发送文本数据

  • 用户通过客户端应用输入文本查询:“我的订单状态是什么?”

  • 客户端将文本数据传递给RealtimeClientsend_user_message_content方法。

awaitrealtime_client.send_user_message_content([{"type":"text","text":"我的订单状态是什么?"}])


2.RealtimeClient处理文本数据

  • RealtimeClient调用RealtimeAPIsend方法,将文本数据通过WebSocket协议发送到服务器。

await self.realtime.send("conversation.item.create", {
"item": {
"type": "message",
"role": "user",
"content": [{"type": "text", "text": "我的订单状态是什么?"}]
}
})


3.服务器处理请求并返回响应

  • RealtimeAPI通过WebSocket协议接收来自服务器的消息。

  • 服务器处理文本数据,识别用户的查询,并生成响应。

  1. RealtimeAPI分发事件

  • RealtimeAPI接收到服务器的响应消息后,分发事件到RealtimeClient

async for message in self.ws:
event = json.loads(message)
self.dispatch(f"server.{event['type']}", event)
self.dispatch("server.*", event)


5.RealtimeClient处理服务器响应

  • RealtimeClient调用RealtimeConversation的方法来处理事件并更新对话状态。

item, delta = self.conversation.process_event(event)
if item:
self.dispatch("conversation.updated", {"item": item, "delta": delta})


6.RealtimeConversation管理对话状态

  • RealtimeConversation处理不同类型的事件(如消息创建、转录完成、音频流等),并更新对话状态。

def process_event(self, event, *args):
event_processor = self.EventProcessors.get(event['type'])
if not event_processor:
raise Exception(f"Missing conversation event processor for {event['type']}")
return event_processor(self, event, *args)


7.客户端接收并显示响应

  • RealtimeClient将处理后的响应发送回客户端。

  • 客户端接收响应并显示文本。

awaitcl.context.emitter.send_text_message(cl.OutputTextMessage(content=response_text))

使用协议说明:

  • WebSocket协议

    • WebSocket是一种在单个TCP连接上进行全双工通信的协议。

    • 在本项目中,RealtimeAPI类使用WebSocket协议与OpenAI的实时API进行通信。

    • WebSocket连接的建立和维护由RealtimeAPI类负责,确保低延迟和高效的数据传输。

    • 通过WebSocket协议,RealtimeAPI类能够实时发送和接收文本消息,确保对话的流畅性和自然性。


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

链载AI是专业的生成式人工智能教程平台。提供Stable Diffusion、Midjourney AI绘画教程,Suno AI音乐生成指南,以及Runway、Pika等AI视频制作与动画生成实战案例。从提示词编写到参数调整,手把手助您从入门到精通。
  • 官方手机版

  • 微信公众号

  • 商务合作

  • Powered by Discuz! X3.5 | Copyright © 2025-2025. | 链载Ai
  • 桂ICP备2024021734号 | 营业执照 | |广西笔趣文化传媒有限公司|| QQ