MCP 的通信基于JSON-RPC 2.0协议,这是一种轻量级的远程过程调用协议,使用JSON格式进行数据交换。该协议支持状态化连接,允许在一个会话中进行多次请求和响应,适用于多轮对话和复杂的工具调用流程。
MCP 支持以下两种主要的传输机制:
标准输入/输出(Stdio)
• 适用场景:客户端和服务器在同一台机器上运行的本地集成。
• 特点:使用标准输入输出流进行通信,适合访问本地文件系统或执行本地脚本的场景。 
HTTP + Server-Sent Events(SSE)
• 适用场景:客户端和服务器分布式部署的远程通信。
• 特点:客户端通过 HTTPPOST向服务器发送请求,服务器通过SSE向客户端推送实时消息,支持实时数据流和事件驱动的通信。
在 MCP 架构中,通信流程如下: 1. 客户端:位于主机应用(如聊天机器人、IDE 助手)中,负责构建请求并发送给 MCP 服务器。
服务器:提供外部数据源或工具的访问接口,接收客户端的请求,处理后返回结构化的响应。
通信协议:客户端和服务器之间通过 JSON-RPC 2.0 协议进行通信,传输机制可以是 Stdio 或 HTTP + SSE。
MCP 提供了一个统一的通信协议和多样的传输机制,支持大型语言模型与外部工具和数据源之间的高效集成。通过标准化的结构和灵活的传输方式,MCP 使得开发者能够构建更强大、可扩展的 AI 应用。
Function Calling(函数调用)是模型内部的函数调用机制,是由 LLM 提供商(如 OpenAI、Anthropic)实现的一种机制,允许模型根据用户输入,生成结构化的函数调用请求。不同平台之间可能存在差异。适合处理边界清晰、描述明确的任务,如数据提取、分类或外部 API 调用等。代码的适配性和复用性较差(我们需要把每个 Function 编码到程序中)。
而MCP(模型上下文协议)是模型与外部系统的通用通信协议。使得不同的 AI 模型和外部系统能够无缝集成,降低了适配成本。 更擅长处理复杂、多步骤的对话场景,尤其是在需要维持上下文连贯性和动态适应用户需求的场景中,其优势尤为明显。
fromtypingimportAny
importhttpx
frommcp.server.fastmcpimportFastMCP
# 初始化 FastMCP server
mcp=FastMCP("weather")
# 获取美国某个州的天气警报
@mcp.tool()
asyncdefget_alerts(state:str)->str:
"""Getweather alertsforaUSstate.
Args:
state:Two-letterUSstatecode(e.g.CA,NY)
"""
url=f"{NWS_API_BASE}/alerts/active/area/{state}"
data=awaitmake_nws_request(url)
ifnot data or"features"notindata:
return"Unable to fetch alerts or no alerts found."
ifnot data["features"]:
return"No active alerts for this state."
alerts=[format_alert(feature)forfeatureindata["features"]]
return"\n---\n".join(alerts)
# 获取某个地点的天气警报
@mcp.tool()
asyncdefget_forecast(latitude:float,longitude:float)->str:
"""Getweather forecastforalocation.
Args:
latitudeatitudeofthelocation
longitudeongitudeofthelocation
"""
#Firstgetthe forecast grid endpoint
points_url=f"{NWS_API_BASE}/points/{latitude},{longitude}"
points_data=awaitmake_nws_request(points_url)
ifnot points_data:
return"Unable to fetch forecast data for this location."
#Getthe forecastURLfromthe points response
forecast_url=points_data["properties"]["forecast"]
forecast_data=awaitmake_nws_request(forecast_url)
ifnot forecast_data:
return"Unable to fetch detailed forecast."
#Formatthe periods into a readable forecast
periods=forecast_data["properties"]["periods"]
forecasts=[]
forperiodinperiods[:5]: #Onlyshow next5periods
forecast=f"""
{period['name']}:
Temperature:{period['temperature']}°{period['temperatureUnit']}
Wind:{period['windSpeed']}{period['windDirection']}
Forecast:{period['detailedForecast']}
"""
forecasts.append(forecast)
return"\n---\n".join(forecasts)
使用main方法启动 Weather MCP Server
if__name__=="__main__":
#Initializeand run the server
mcp.run(transport='stdio')
importasyncio
fromtypingimportOptional
fromcontextlibimportAsyncExitStack
frommcpimportClientSession,StdioServerParameters
frommcp.client.stdioimportstdio_client
fromanthropicimportAnthropic
fromdotenvimportload_dotenv
load_dotenv() # load environment variablesfrom.env
classMCPClient:
def__init__(self):
#Initializesession and client objects
self.session:Optional[ClientSession]=None
self.exit_stack=AsyncExitStack()
self.anthropic=Anthropic()
# methods will go here
asyncdefconnect_to_server(self,server_script_path:str):
"""Connectto anMCPserver
Args:
server_script_pathathto the serverscript(.pyor.js)
"""
is_python=server_script_path.endswith('.py')
is_js=server_script_path.endswith('.js')
ifnot(is_python or is_js):
raiseValueError("Server script must be a .py or .js file")
command="python"ifis_pythonelse"node"
server_params=StdioServerParameters(
command=command,
args=[server_script_path],
env=None
)
stdio_transport=awaitself.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio,self.write=stdio_transport
self.session=awaitself.exit_stack.enter_async_context(ClientSession(self.stdio,self.write))
awaitself.session.initialize()
#Listavailable tools
response=awaitself.session.list_tools()
tools=response.tools
print("\nConnected to server with tools:",[tool.namefortoolintools])
asyncdefprocess_query(self,query:str)->str:
"""Process a query using Claude and available tools"""
messages=[
{
"role":"user",
"content":query
}
]
response=awaitself.session.list_tools()
available_tools=[{
"name":tool.name,
"description":tool.description,
"input_schema":tool.inputSchema
}fortoolinresponse.tools]
#InitialClaudeAPIcall
response=self.anthropic.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
messages=messages,
tools=available_tools
)
#Processresponse and handle tool calls
final_text=[]
assistant_message_content=[]
forcontentinresponse.content:
ifcontent.type'text':
final_text.append(content.text)
assistant_message_content.append(content)
elif content.type'tool_use':
tool_name=content.name
tool_args=content.input
#Executetool call
result=awaitself.session.call_tool(tool_name,tool_args)
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
assistant_message_content.append(content)
messages.append({
"role":"assistant",
"content":assistant_message_content
})
messages.append({
"role":"user",
"content":[
{
"type":"tool_result",
"tool_use_id":content.id,
"content":result.content
}
]
})
#Getnext responsefromClaude
response=self.anthropic.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
messages=messages,
tools=available_tools
)
final_text.append(response.content[0].text)
return"\n".join(final_text)
注意:不同 LLM 的 request 和 response 的结构是不一样的,DeepSeek 和千问模型遵循 OpenAI 的规范,而 Claude 与 Gemini 的 resposne 与 openai 不一样。 如:
Clauderesponse.content[0].text 与 OpenAIresponse.choices[0].message.content
async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")
whileTrue:
try:
query=input("\nQuery: ").strip()
ifquery.lower()=='quit':
break
response=await self.process_query(query)
print("\n"+ response)
except Exception as e:
print(f"\nError: {str(e)}")
async def cleanup(self):
"""Clean up resources"""
await self.exit_stack.aclose()
asyncdefmain():
iflen(sys.argv)<2:
print("Usage: python client.py <path_to_server_script>")
sys.exit(1)
client=MCPClient()
try:
awaitclient.connect_to_server(sys.argv[1])
awaitclient.chat_loop()
finally:
awaitclient.cleanup()
if__name__=="__main__":
importsys
asyncio.run(main())
python client.py./weather.py
#Relativepath
uv run client.py./server/weather.py
#Absolutepath
uv run client.py/Users/username/projects/mcp-server/weather.py
#Windowspath(either format works)
uv run client.pyC:/projects/mcp-server/weather.py
uv run client.pyC:\\projects\\mcp-server\\weather.py
MCP Server Github提供很多现成的MCP Server,GitHub MCP Server是一个模型上下文协议 (MCP) 服务器,可与 GitHub API 无缝集成,为开发人员和工具提供高级自动化和交互功能。
使用npx直接运行 GitHub MCP Server。需要创建一个 GitHub 个人访问令牌
(https://github.com/settings/tokens)
npx-y @modelcontextprotocol/server-github
使用官方提供的MCP Inspector工具来测试和调试 MCP Server。
npx @modelcontextprotocol/inspector npx-y @modelcontextprotocol/server-github
浏览器访问http://localhost:5173
LLM 集成 MCP Server 有很多种方式,可以自定义,也可以直接使用已有的库。
使用mcp-use进行 LLM 与 MCP 的集成,mcp-use是一个开源的 Python 库,可以非常轻松地将任何 LLM 连接到本地和远程的任何 MCP 服务器。
importasyncio
importos
fromdotenvimportload_dotenv
fromlangchain_openaiimportChatOpenAI
frommcp_useimportMCPAgent,MCPClient
"""
pip install mcp-use
"""
asyncdefmain():
#Loadenvironment variables
load_dotenv()
#Createconfiguration dictionary
config={
"mcpServers":{
"github":{
"command":"npx",
"args":["-y","@modelcontextprotocol/server-github"],
"env":{
"GITHUB_PERSONAL_ACCESS_TOKEN":"<GITHUB PERSONAL ACCESS TOKEN>"
}
}
}
}
#CreateMCPClientfromconfiguration dictionary
client=MCPClient.from_dict(config)
#CreateLLM
llm=ChatOpenAI(api_key=os.getenv("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com",
model="deepseek-chat")
#Createagentwiththe client
agent=MCPAgent(llm=llm,client=client,max_steps=30)
#Runthe query
result=awaitagent.run(
"search ai-agent-demo repo",
)
print(f"\nResult: {result}")
if__name__=="__main__":
asyncio.run(main())
| 欢迎光临 链载Ai (https://www.lianzai.com/) | Powered by Discuz! X3.5 |