最近,MCP SDK 新版本更新发布(最新为v1.9.0),其中最大的更新莫过于终于提供了新版协议中的传输模式 — streamable HTTP。不过由于MCP SDK的文档一直以来”语焉不详“的风格,很多开发者知其然却不知其所以然,很容易在应用中踩坑。本文将对这种模式进行全面剖析与实测,帮助大家深入认识这种新的模式。从版本SDK 1.8.0开始,MCP支持三种传输模式:stdio、sse与streamable-http。这三种模式的服务端与客户端必须匹配使用,即streamable HTTP的客户端无法与sse模式服务端交互,所以在一些客户端工具中配置MCP时,不要盲目采用新模式。服务端开启streamable HTTP模式仍然建议借助FastMCP高层API。方法如下:# 创建FastMCP实例 app = FastMCP( name="SimpleServer", port=5050, stateless_http=False, json_response=False, streamable_http_path="/mcp" )
....工具....
if__name__ =='__main__': app.run(transport='streamable-http') 主要变化来自三个参数:其中transport参数增加了streamable-http选项;而stateless_http和json_response则控制了不同的工作模式(默认都为False)。对应的客户端代码修改如下,注意这里的server_url端点默认为/mcp,比如:http://localhost:5050/mcp try: asyncwithstreamablehttp_client(url=server_url)as(read, write, get_session_id): asyncwithClientSession(read, write)assession: print(f"连接成功!") # 初始化会话 awaitsession.initialize() print("会话初始化完成")
# 获取会话ID session_id = get_session_id() print(f"会话ID:{session_id}") ......
- 需要使用streamablehttp_client这一客户端传输模块
- 新增了可回调的get_session_id方法,用来获取session-id
经过FastMCP的精心“包装”,streamable HTTP的使用看上去挺简洁,但背后的实现要比SSE更复杂(这也让很多人质疑为什么不直接使用WebSocket)。首先看到stateless_http与json_response这两个服务端的核心控制参数。还记得SSE模式中的“伪双工”通信吗:Post通道用作客户端到服务端的JSON-RPC消息传输,SSE通道则用作服务端到客户端的JSON-RPC消息传输:在streamable HTTP模式下的规则变为:- Post通道:用作客户端到服务端的所有消息传输;同时服务端对客户端的响应消息也通过Post通道直接返回(基本类似Restful HTTP Post交互)。响应形式可能是SSE流,也可能是JSON数据。
- SSE通道:变为可选的通道,仅用作服务端发往客户端的通知(Notification,比如进度更新)、请求(Request,比如Sampling请求)。很显然这里的形式只能是SSE流。(实际还有一种“会话恢复”功能会用到SSE通道)
那么什么时候会有SSE通道;Post的响应形式怎么确定?这就是上面两个控制参数的作用。这两个参数的不同组合,产生的效果用下图简洁的来表示:- 控制是否会建立长连接的SSE通道。为False时则会开启SSE通道。
- 控制是否会管理客户端的会话。为False时会对客户端会话进行管理。
此外,当stateless_http与json_response同时为False时,可具备一项额外的能力:会话恢复。即:服务端返回的事件流具备“断点”续传的功能。该功能需要自行实现服务端事件的持久化接口,且完善性有待验证,本文暂不对此深入。session-id是streamable HTTP模式下服务端用来跟踪与管理客户端会话使用。客户端可以使用连接时获得的回调方法get_session_id来获得。- 什么时候有:只有当stateless_http=False时才会有session-id
- 什么时候生成:在发起初始化请求时服务端生成并在headers中返回
- 客户端如何使用:后续所有的客户端发起的请求会自动携带该session-id;你也可开发其他用途,比如用来关联一组与MCP服务的多次交互
- 什么时候失效:客户端退出streamablehttp_client的上下文区域时会自动触发一个HTTP Delete请求,服务端会删除会话,session-id失效
- 在服务端的作用:后续的请求无需每次建立新的连接与会话,而是从一个“池”里查询出已经创建的会话模块用来处理请求
我们用一个例子来体验streamable HTTP的效果以加强认识。首先我们在服务端中创建一个简单的工具,并使用streamable-http模式启动:@app.tool(name='hello') asyncdefhello(ctx: Context, name: str)-> str:
steps =10 awaitctx.report_progress(0.0, steps,'MCP Server正在处理请求...')
# 模拟计算过程的多个步骤 forstepinrange(1, steps +1): awaitasyncio.sleep(1) logger.info(f"正在处理第{step}步,发送进度通知...") awaitctx.report_progress(float(step), float(steps),f'正在处理第{step}步...')
awaitctx.report_progress(steps, steps,'MCP Server请求处理完成!')
returnf'Hello,{name}' 这个工具模拟一个长时间处理的任务,并在任务过程中定期报告进度,用来模拟服务端发送通知消息(Notification)的过程。【服务端:stateless_http=Fase,json_response=False】在这种服务端模式下,我们启动客户端,调用“hello”工具,过程如下图。可以看到,能够获取到服务端生成的session-id,而且可以接收到服务端发送的进度通知(这里用进度条做美化)。这表示该模式下成功建立了SSE通道。注意:服务端的通知消息一定是通过SSE通道以流的形式发送。接着调用另外一个工具,过程如下图。可以看到session-id没有变化:现在我们选择主菜单中的“开启新的会话”功能。该功能会退出streamablehttp_client管理的上下文,并重新进入:此时再次测试上面的工具,就会看到session-id已经发生变化:同时观察服务端的日志输出,会发现如下图所示的信息。图中信息的含义是:服务端先接收到了一个DELETE请求,终止了一个会话;然后接收到新的POST请求(初始化),开始创建了一个新的session-id与传输模块(transport):【服务端:stateless_http=True,json_response=False】现在我们开启服务端的无状态模式,其他不变。我们测试相同的“hello”工具,此时发现,尽管仍能够获得最终结果,但无法接收到进度通知:观察服务端后台的日志,可以看到如下图的提示信息。该信息表明,没有找到对应的发送流(stream,服务端的一种内部通信机制。这里的"_GET_stream"是独立的SSE通道使用的流名称)。这表明无状态模式下不会建立SSE通道。再次强调:服务端发起的通知与请求消息都只会通过独立的SSE通道进行,而客户端Post请求的响应也不会“借用”SSE通道(哪怕是流形式的响应)。用下图来整理streamable HTTP模式下客户端与服务端的交互过程。其中:- 红色代表SSE通道的交互;其他都是HTTP Post/Get/Delete。
- 虚线代表只有在有状态模式下才会有出现的交互或信息。
1. 连接:在streamable HTTP模式下,并没有和SSE模式类似的“连接”过程(在sse_client调用时),因为无需事先创建SSE连接;2. 客户端发起初始化请求(Initialize)。如果是有状态模式,会在返回消息的HTTP头中携带session-id;3. 客户端发起初始化确认(Initialized)。此时如果已有session-id(有状态),客户端会首先发起一次HTTP Get请求,以建立独立的SSE通道;4. 后续正常交互:普通的交互都是通过Post通道来进行,只有两种情况会使用SSE通道:服务端发起的通知与请求、以及会话恢复的事件发送。Low-level API开发服务端要比SSE模式下更为简洁。这是由于服务端现在引入了SessionManager模块来管理客户端会话。因此只需要:创建SessionManager模块,并把所有/mcp端点的请求都路由到该模块的handle_request方法即可。此处注意SessionManager模块需要首先通过run方法初始化。... mcp_server = Server(name="example")
...call_tool等实现...
try:
# 创建会话管理器 session_manager = StreamableHTTPSessionManager( app=mcp_server, json_response=True, stateless=False ) starlette_app = Starlette( debug=True, routes=[ Mount("/mcp", app=session_manager.handle_request), ], lifespan=lambdaapp:session_manager.run(), )
config = uvicorn.Config(starlette_app, host="127.0.0.1", port=5050) server = uvicorn.Server(config) awaitserver.serve() logger.info("MCP server is running on http://127.0.0.1:5050") ...... 现在MCP服务端可以支持多应用实例模式:你可以创建多个FastMCP的应用实例,不同实例采用不同的参数,这有助灵活的根据不同的场景需求来设计MCP服务端,比如可以把企业API访问的工具全部用无状态模式提供;其他需要长连接的工具使用有状态模式提供。参考如下实现:...... app = FastMCP( name="SimpleServer",... stateless_http=True, json_response=False )
app2 = FastMCP( name="SimpleServer2",... stateless_http=False, json_response=False )
if__name__ =='__main__': ...... @asynccontextmanager asyncdeflifespan(server): asyncwithcontextlib.AsyncExitStack()asstack: awaitstack.enter_async_context(app.session_manager.run()) awaitstack.enter_async_context(app2.session_manager.run()) yield
server = FastAPI(lifespan=lifespan) server.mount("/server1", app.streamable_http_app()) server.mount("/server2", app2.streamable_http_app()) print("Starting FastAPI server on http://localhost:5050") print("- App1 available at: http://localhost:5050/server1") print("- App2 available at: http://localhost:5050/server2") uvicorn.run(server, host="0.0.0.0", port=5050) 多应用实例模式下,你不能直接使用FastMCP提供的run方法。你只能调用其streamable_http_app()函数获得内部的Starlette应用模块,然后映射到不同的路径。有两个问题需要注意:- 每一个FastMCP应用实例内的session_manager都必须在启动时初始化(借助lifespan生命周期管理器);
- 这里的mount映射是将应用实例挂载到指定路径;与streamable HTTP默认的服务端点/mcp不矛盾(类似FastAPI中的@app.post("/mcp")。因此这种模式下客户端连接的URL要变为(以上面的代码中的server1为例):
http://xxxx:port/server1/mcp多应用实例的好处是相互独立,每一个都有自己的配置和生命周期,又可以同时运行在一个服务器实例中。以上就是我们对最新MCP SDK中推出的streamable HTTP通信模式的详细解析。新的模式在灵活性上有了较大提升,比如你可以选择彻底退化成无状态模式,以获得更好的横向扩展能力。在实际测试中,也发现了一些Bug,还有一些功能尚未完善(比如会话恢复),期待后面的更新吧。ingFang SC', 'Hiragino Sans GB', 'Microsoft YaHei UI', 'Microsoft YaHei', Arial, sans-serif;font-size: 17px;">
|