首先,让我们看看应用程序的整体架构。
我们的架构与之前一样,涉及双向 WebSocket 连接:一个在客户端和服务器之间,另一个在服务器和 Gemini API 之间。服务器充当中介,转发消息并管理实时流。更具体地说,服务器的代码与我们开发的基本多模态聊天机器人的前一个视频几乎相同。因此,如果您已经阅读过它,可以跳过这个快速回顾,直接进入客户端开发部分。
服务器使用 Python 实现,负责两个主要任务:处理客户端的 WebSocket 连接和管理 Gemini API 连接。
您需要安装并导入WebSockets和google-genai库。为模型gemini-2.0-flash_exp设置 API 密钥,并使用 API 版本v1alpha创建一个 Gemini 客户端。
#### pip install --upgrade google-genai==0.3.0##
importasyncio
importjson
importos
importwebsockets
fromgoogleimportgenai
importbase64
### 从环境中加载 API 密钥
os.environ['GOOGLE_API_KEY'] =''
MODEL ="gemini-2.0-flash-exp"# 使用您的模型 ID
client = genai.Client(
http_options={
'api_version':'v1alpha',
}
)在代码底部,我们定义了一个websockets.serve函数,以在指定端口上建立服务器。每个来自客户端的 WebSocket 连接都会触发处理程序gemini_session_handler。
asyncdefmain() ->None:
asyncwithwebsockets.serve(gemini_session_handler,"localhost",9083):
print("正在运行 websocket 服务器 localhost:9083...")
awaitasyncio.Future() # 保持服务器无限运行
if__name__ =="__main__":
asyncio.run(main())在gemini_session_handler中,我们使用client.aio.live.connect()函数与 Gemini API 建立连接,配置数据包括来自客户端第一条消息的response_modalities和我们设置的system_instruction,以指示模型充当屏幕共享助手。
之后,处理程序将专注于消息转发操作:
send_to_gemini函数捕获来自客户端的消息,提取音频和图像数据,并将其发送到 Gemini API。receive_from_gemini函数监听来自 Gemini API 的响应,并解包文本或音频数据以发送给客户端。为了实现真正的实时交互和中断启用,所有这些任务都在两个并行线程中异步处理。以下是代码:
asyncdefgemini_session_handler(client_websocket: websockets.WebSocketServerProtocol):
"""在 websocket 会话中处理与 Gemini API 的交互。
参数:
client_websocket: 与客户端的 websocket 连接。
"""
try:
config_message =awaitclient_websocket.recv()
config_data = json.loads(config_message)
config = config_data.get("setup", {})
config["system_instruction"] ="""您是屏幕共享会话的有用助手。您的角色是:
1) 分析并描述共享屏幕上的内容
2) 回答有关共享内容的问题
3) 提供与所显示内容相关的信息和背景
4) 协助处理与屏幕共享相关的技术问题
5) 保持专业和乐于助人的语气。专注于简洁明了地回答。"""
asyncwithclient.aio.live.connect(model=MODEL, config=config)assession:
print("已连接到 Gemini API")
asyncdefsend_to_gemini():
"""将来自客户端 websocket 的消息发送到 Gemini API。"""
try:
asyncformessageinclient_websocket:
try:
data = json.loads(message)
if"realtime_input"indata:
forchunkindata["realtime_input"]["media_chunks"]:
ifchunk["mime_type"] =="audio/pcm":
awaitsession.send({"mime_type":"audio/pcm","data": chunk["data"]})
elifchunk["mime_type"] =="image/jpeg":
awaitsession.send({"mime_type":"image/jpeg","data": chunk["data"]})
exceptExceptionase:
print(f"发送到 Gemini 时出错:{e}")
print("客户端连接关闭(发送)")
exceptExceptionase:
print(f"发送到 Gemini 时出错:{e}")
finally:
print("send_to_gemini 关闭")
asyncdefreceive_from_gemini():
"""接收来自 Gemini API 的响应并将其转发给客户端,循环直到回合完成。"""
try:
whileTrue:
try:
print("从 Gemini 接收")
asyncforresponseinsession.receive():
ifresponse.server_contentisNone:
print(f'未处理的服务器消息! -{response}')
continue
model_turn = response.server_content.model_turn
ifmodel_turn:
forpartinmodel_turn.parts:
ifhasattr(part,'text')andpart.textisnotNone:
awaitclient_websocket.send(json.dumps({"text": part.text}))
elifhasattr(part,'inline_data')andpart.inline_dataisnotNone:
print("音频 mime_type:", part.inline_data.mime_type)
base64_audio = base64.b64encode(part.inline_data.data).decode('utf-8')
awaitclient_websocket.send(json.dumps({
"audio": base64_audio,
}))
print("音频已接收")
ifresponse.server_content.turn_complete:
print('\n<回合完成>')
exceptwebsockets.exceptions.ConnectionClosedOK:
print("客户端连接正常关闭(接收)")
break# 如果连接关闭,则退出循环
exceptExceptionase:
print(f"接收来自 Gemini 时出错:{e}")
break
exceptExceptionase:
print(f"接收来自 Gemini 时出错:{e}")
finally:
print("Gemini 连接关闭(接收)")
# 启动发送循环
send_task = asyncio.create_task(send_to_gemini())
# 将接收循环作为后台任务启动
receive_task = asyncio.create_task(receive_from_gemini())
awaitasyncio.gather(send_task, receive_task)
exceptExceptionase:
print(f"Gemini 会话中出错:{e}")
finally:
print("Gemini 会话关闭。")对于HTML和Javascript中的客户端开发,我们将重点关注上一个教程中原始前端代码的主要更改,您可以在我的GitHub 仓库中找到原始代码和修改后的代码。
我们将首先探索此版本中的主要新功能,该功能通过实现startScreenShare函数,替代了之前使用网络摄像头的实现,改为屏幕共享功能。
startScreenShare
asyncfunction startScreenShare() {
try{
stream =awaitnavigator.mediaDevices.getDisplayMedia({
video: {
width: {max:640},
height: {max:480},
},
});
video.srcObject = stream;
awaitnew Promise(resolve => {
video.onloadedmetadata = () => {
console.log("video loaded metadata");
resolve();
}
});
} catch (err) {
console.error("Error accessing the screen: ", err);
}
}这个异步函数利用navigator.mediaDevices.getDisplayMedia()方法获取屏幕捕获流。然后,将 HTML 视频元素的源设置为此流,并等待视频的元数据加载,确保后续操作可以安全地访问视频尺寸。
接下来,让我们检查captureImage()函数,该函数负责定期捕获视频帧并将其转换为 base64 编码的数据,以便传输到服务器。
captureImage
function captureImage() {
if(stream && video.videoWidth >0&& video.videoHeight >0&& context) {
canvas.width =640;
canvas.height =480;
context.drawImage(video,0,0, canvas.width, canvas.height);
const imageData = canvas.toDataURL("image/jpeg").split(",")[1].trim();
currentFrameB64 = imageData;
}
else{
console.log("no stream or video metadata not loaded");
}
}此函数已修改为包含流检查,并在调用drawImage()方法之前验证视频元数据是否已加载。宽度和高度现在固定为 640x480。然后,我们将视频转换为 jpeg 的 base64 表示形式,以便发送到服务器。
定义了这两个函数后,以下是我们如何初始化屏幕共享功能和与 WebSocket 服务器的连接。
window.addEventListener("load",async() => {
awaitstartScreenShare();
setInterval(captureImage,3000);
connect();
});此事件监听器调用startScreenShare来设置用户显示器的初始视频流,设置一个每三秒调用一次captureImage的间隔,当然,您可以更改为更小的值,以根据屏幕操作频率获得更频繁的更新,并调用 WebSocket 连接功能,该功能基本保持不变。
initializeAudioContext
asyncfunction initializeAudioContext() {
if(initialized)return;
audioInputContext = new (window.AudioContext ||
window.webkitAudioContext)({
sampleRate:24000
});
awaitaudioInputContext.audioWorklet.addModule("pcm-processor.js");
workletNode = new AudioWorkletNode(audioInputContext,"pcm-processor");
workletNode.connect(audioInputContext.destination);
initialized = true;
}对于音频部分,我们还展示了音频工作单元初始化函数,该函数与之前版本保持不变,使用相同的sampleRate和pcm-processor.js文件中的 PCM 处理函数。
sendVoiceMessage
function sendVoiceMessage(b64PCM) {
if(webSocket == null) {
console.log("websocket not initialized");
return;
}
payload = {
realtime_input: {
media_chunks: [{
mime_type:"audio/pcm",
data: b64PCM,
},
{
mime_type:"image/jpeg",
data: currentFrameB64,
},
],
},
};
webSocket.send(JSON.stringify(payload));
console.log("sent: ", payload);
}sendVoiceMessage()函数在将音频和图像base64数据发送到服务器之前进行打包。
receiveMessage
function receiveMessage(event) {
const messageData = JSON.parse(event.data);
const response = new Response(messageData);
if(response.text) {
displayMessage("GEMINI: "+ response.text);
}
if(response.audioData) {
injestAudioChuckToPlay(response.audioData);
}
}客户端从服务器接收 JSON 格式的消息,解析后根据内容显示文本或播放音频。
sendInitialSetupMessage
function sendInitialSetupMessage() {
console.log("sending setup message");
setup_client_message = {
setup: {
generation_config: { response_modalities: ["AUDIO"] },
},
};
webSocket.send(JSON.stringify(setup_client_message));
}请不要忘记在客户端的第一次配置消息中选择您首选的响应方式。在这里,我选择了AUDIO,当然您也可以选择TEXT,以便在网页上查看文本输出。但请特别注意,即使它允许列表参数。目前,仅支持单个方式,如果您同时放入文本和音频,模型将只会返回错误。
现在,我们已经成功从网络摄像头实现转向屏幕共享实现,这使我们能够捕获可以发送给 Gemini 的屏幕流,以获得适当的响应。
这种屏幕共享助手的最佳使用方式是帮助您完成日常工作,例如记笔记、浏览网页,甚至玩游戏。对我来说,最有趣的应用是帮助我进行日常论文研究工作。
让我们开始吧!
通过运行 Python 文件启动服务器。WebSocket 服务器将在我们在代码中定义的 8093 端口上运行。
通过运行以下命令启动客户端:
python-mhttp.server
现在我们可以在 8000 端口访问本地服务器。
| 欢迎光临 链载Ai (https://www.lianzai.com/) | Powered by Discuz! X3.5 |