|
本文通过100行代码看到MCP的核心原理并不复杂,但它的设计巧妙深入理解使我们能够超越简单的SDK使用,创建更强大、更灵活的AI应用集成方案。 当我开始研究 Model Context Protocol (MCP)接入的时候,发现一个问题,绝大多数的文档都是以@mcp.tool这样注解的方式注入。但如果当前有很多异步的业务流程,接入会非常麻烦,它并没有一个代码实体的存在可以加注解。难道需要为一个个流程编写同步函数吗? 好奇心驱使我进一步分析MCP的通信原理,看看是不是有什么办法能更方便地接入MCP,理解MCP的原理。 MCP提供了STDIO和SSE两种传输协议,当前很多实验性的工具都是使用STDIO传输。不过如果提供服务的话,基本就是SSE(Server-Sent Events)。所以本文重点分析讨论SSE的MCP接入模式。 在搜索SSE的时候,看到了阮一峰老师在2017年对于SSE的特点归纳: SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。 总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。 这个特点让我更加好奇了,stdio中可以使用stdin来进行输入,使用stdout来进行输出。但是SSE是单向通道,MCP要如何实现双向通信呢?是建立两根SSE通道吗?带着这个疑问,我开始了动手实践。 利用MCP官方提供的工具npx @modelcontextprotocol/inspector可以比较方便地拉起一个验证MCP的管理页。针对这个管理页抓包就能发现一些SSE的通信端倪。 1./sse这个URL只负责推送信息,并不能发送信息,发送信息需要另外的URL。 2.Client连接上ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.034em;font-style: normal;font-weight: normal;">/sse这个地址的第一个Event就是告诉Client发送信息需要去哪个URL发,这个URL通常会带上唯一的会话ID。 观察这个抓包情况,我们前面的双向通信疑问基本可以有答案了: 1.只有一根SSE长连接,用来Server向Client推送数据,另外一个Client向Server发送请求的通道是使用普通的HTTP POST请求。 2.Client向Server发送的HTTP POST请求中只使用2xx反馈是否收到指令,所有的数据返回是通过一开始的SSE长连接来推送。 为了验证这个猜想,我还特地做一个实验,使用curl模拟了POST/messsage?sessionId=***发送一个请求包,果不其然在SSE的事件流中多了一条事件。 通过上一个章节的抓包,我们基本摸清了MCP的SSE通信流程: 1./sseURL建立SSE长链之后先返回一个ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.034em;font-style: normal;font-weight: normal;">endpoint(常见为ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.034em;font-style: normal;font-weight: normal;">/message),数据格式为纯文本的同域名URL字符串。 2.client使用POST向ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.034em;font-style: normal;font-weight: normal;">endpoint(/message)发送调用请求,POST中的body满足JSON-RPC规范,包含字段ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.034em;font-style: normal;font-weight: normal;">jsonrpc、ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.034em;font-style: normal;font-weight: normal;">method、ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.034em;font-style: normal;font-weight: normal;">params、ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.034em;font-style: normal;font-weight: normal;">id。 3.在ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.034em;font-style: normal;font-weight: normal;">/sse长连接中返回的event满足JSON-RPC规范,包含字段ingFang SC", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;letter-spacing: 0.034em;font-style: normal;font-weight: normal;">jsonrpc、result、id、error(执行错误时)。 看起好像并不复杂,我们尝试用Python来实现一下(不使用MCP Python SDK)。 fromfastapiimportFastAPI, Requestimportuuidfromsse_starlette.sseimportEventSourceResponsefrompydanticimportBaseModelimportjson
app = FastAPI()mcpHub = {}
classMcpRequest(BaseModel): id:Optional[int] =None jsonrpc:str method:str params:Optional[dict] =None
classMCPServer: def__init__(self): self.queue = asyncio.Queue() asyncdefreader(self): whileTrue: event =awaitself.queue.get() yieldevent
asyncdefrequest(self, payload: McpRequest): ifpayload.method =="initialize": awaitself.queue.put({"event":"message","data": ..}) elifpayload.method =="tools/list": ...
@app.get("/sse")asyncdefsse(): client_id =str(uuid.uuid4()) mcp = MCPServer() mcpHub[client_id] = mcp awaitmcp.queue.put({"event":"endpoint","data":f"/message?client_id={client_id}"}) returnEventSourceResponse(mcp.reader()) @app.post("/message")asyncdefmessage(request: Request, payload: McpRequest): client_id = request.query_params.get("client_id") ifclient_idnotinmcpHub: return"no client" awaitmcpHub[client_id].request(payload) return"ok"
在这段代码中,我们引入了这样几个设计: 1.我们使用了asyncio.Queue()来解耦业务流和MCP服务流。这个消息队列联动EventSourceResponse的数据流。每往这个消息队列打一个消息,就会自动通过EventSourceResponse的数据流向Client推送一条消息。这样Client在Server侧看起来就是一个标准的订阅MQ的消费者。 2.在内存中维护一个client_id映射消息队列的字典,这样一旦有消息进入就可以知晓使用的是哪个MQ,然后往对应的MQ里面投递消息。在分布式系统中,这个client_id可以是消息队列的全局唯一标识,这样无论打到哪台机器上,都能够找到正确的队列。 3.服务侧在处理之后,将消息投递回消息队列之后,Client就能感知。MCPServer和MCPClient保持长链之后,后方的业务系统侧理论上可以进行无限时长执行(如果Client侧不主动超时退出),一切均以消息投递回来为准。 我们可以参考文档来看看有哪些method需要被支持: 在MCP的resource的method中,有个不起眼的resources/subcribe引起了我的注意。首先。我们来看看什么是resource,官方给出的定义是: Resources represent any kind of data that an MCP server wants to make available to clients. This can include:File contents、Database records、API responses、Live system data、Screenshots and images、Log files、And more
所以,如果我们使用resources/subcribe订阅一个Database,那么这个数据库的所有变动就会源源不断地推送过来,这就非常近似流计算的常见使用形态了。 因为SSE已经让Server建立向Client的单向数据流,所以如果Client发起一个订阅,我们就创建一个Flink流计算任务向MQ打消息,就非常原生地实现了资源的订阅。我们可以扩展一下上面的这个拓扑结构。 1.从大模型视角看流计算:基于MCP协议,大模型实际上能够非常优雅地接入流计算的能力,来完成复杂业务逻辑构建。 2.从流计算视角看大模型:使用MCP协议之后,大模型似乎就变成了一个标准流计算处理节点,能够接收流式消息,也能给向另外的MQ投递消息。 不得不说,这个确实就是MCP设计上的一个优势。感觉MCP有点像RPC,又有点像MQ,那么这到底是什么呢?我们不妨从编程模型的角度来思考一下。 MCP从编程模型的角度来看,本质上是一种有状态的双向RPC(远程过程调用)模型,结合了事件驱动和请求-响应的特性。这种混合模式使其在AI应用与外部系统集成方面具有独特优势。 MCP的核心特征包括: 1.有状态会话:与传统无状态REST API不同,MCP维护会话状态,客户端和服务器之间建立长期连接,会话具有明确的生命周期。 2.双向通信:不仅客户端可以调用服务器(传统RPC模式),服务器也可以调用客户端(反向RPC)。例如,服务器可以请求客户端执行AI采样。 3.基于能力的协商:初始化阶段进行能力协商,动态发现可用功能,适应不同实现和版本。 4.事件通知机制:支持单向通知,资源变更订阅模式,异步事件处理。 5.标准化接口:定义了一组标准操作,使用JSON Schema定义参数和返回值,促进互操作性。 为了更好地理解MCP的定位,我们可以将其与其他常见的编程模型进行比较: MCP在各种编程模型中占据了一个独特的位置: 比REST API更有状态和双向,但比消息队列更直接和轻量。 比WebSocket更结构化和标准化,但比gRPC更灵活和易于理解。 比GraphQL更专注于工具调用,但比RPC更关注资源和上下文。
同时,正因为MCP这样的一个独特的功能位,不要因为当前的一些能力局限性,就放弃了MCP的原生化的适配。异步任务、事件驱动等架构本身就应该能够原生对接MCP。 既然MCP的整个运行原理并不复杂,我们就尝试自己实现一次,致敬一下这个优秀的设计。 fromfastapiimportFastAPI, Requestfromsse_starlette.sseimportEventSourceResponseimportasyncioimportjsonimportuuidfrompydanticimportBaseModelfromtypingimportOptionalimportuvicornimportinspectapp = FastAPI()mcpHub = {}classMcpRequest(BaseModel): id:Optional[int] =None jsonrpc:str method:str params:Optional[dict] =NoneclassMCPServer: def__init__(self, name, message_path, tools): self.queue = asyncio.Queue() self.client_id =str(uuid.uuid4()) self.message_path = message_path self.info = { "protocolVersion":"2024-11-05", "capabilities": { "experimental": {}, "tools": { "listChanged":False } }, "serverInfo": { "name": name, "version":"1.6.0" } } self.tools = tools deflist_tool(self): result = [] fortoolinself.tools: toolInfo = { "name": tool.__name__, "description": tool.__doc__, "inputSchema": {"type":"object","properties":{}}, } forname, paramininspect.signature(tool).parameters.items(): toolInfo["inputSchema"]["properties"][name] = { "title": name, "type":"string", } result.append(toolInfo) returnresult asyncdefreader(self): whileTrue: event =awaitself.queue.get() yieldevent @staticmethod defresponse(result,id): message = { "jsonrpc":"2.0", "result": result, } ifidisnotNone: message["id"] =id returnjson.dumps(message) asyncdefrequest(self, req: McpRequest): ifreq.method =="initialize": awaitself.queue.put({"event":"message","data": self.response(self.info, req.id)}) elifreq.method =="tools/list": awaitself.queue.put({"event":"message","data": self.response({"tools": self.list_tool()}, req.id)}) elifreq.method =="tools/call": fortoolinself.tools: iftool.__name__ == req.params.get("name"): result =awaittool(**req.params["arguments"]) awaitself.queue.put({"event":"message","data": self.response({"content": result,"isError":False}, req.id)}) breakasyncdeftest(state=None): """ description """ result =f"hi{state}" awaitasyncio.sleep(1) result +="!" returnresult@app.get("/receive_test")asyncdefreceive_test(): mcp = MCPServer(name="mcp-test",message_path="/send_test", tools=[test]) mcpHub[mcp.client_id] = mcp awaitmcp.queue.put({"event":"endpoint","data":f"{mcp.message_path}?client_id={mcp.client_id}"}) returnEventSourceResponse(mcp.reader())@app.post("/send_test")asyncdefsend_test(request: Request, payload: McpRequest): client_id = request.query_params.get("client_id") ifclient_idnotinmcpHub: return"no client" awaitmcpHub[client_id].request(payload) return"ok"if__name__ =="__main__": uvicorn.run(app, host="0.0.0.0", port=8001)
如上大概100行左右的代码,我们实现了一个简易版本的MCP服务,较官方的MCP Python SDK,我们获得了几个重要的特性优化: 1.tool注册不再依赖@mcp.tool这样的注解,完全可以动态传入,针对不同的场景,提供不同MCP URL,上面提供不同的Tool。 2.编程模型为MQ驱动的服务,对接异步系统、事件驱动的系统或平台较为友好。参考该Python实现,转化成其他语言的版本也较为方便。 3.不依赖 /sse /message 这些默认路由地址,也能正常运行,证明MCP的URL可以完全自定义。 本文深入探讨MCP的原理、通信机制和编程模型本质之后,我们看到MCP不仅仅是一个简单的API或SDK,而是一个精心设计的协议,它: 1.采用client-host-server架构,支持多种服务器连接; 2.实现了有状态的双向RPC模型,结合了事件驱动特性; 3.提供了标准化的工具调用和资源访问机制; 4.支持动态能力协商和功能发现; 5.相比较MQ、API、WS,占据了独特的功能位置,专为AI应用与外部系统集成而优化; 正如我们通过100行代码看到的,MCP的核心原理并不复杂,但它的设计巧妙,这种深入理解将使我们能够超越简单的SDK使用,创建更强大、更灵活的AI应用集成方案。 参
|