返回顶部
热门问答 更多热门问答
技术文章 更多技术文章

MCP协议之MCP Client一文入门

[复制链接]
链载Ai 显示全部楼层 发表于 9 小时前 |阅读模式 打印 上一主题 下一主题

本章将自己动手开发一个MCP客户端,希望大家喜欢。

一、工作原理

核心部分由两部分构成:

  • Session: MCP客户端与MCP服务端的会话,连接后可以获得MCP Server中的工具(tools)、资源(resource)以及提示(prompts)以及远程调用tools的能力。
  • LLM: MCP客户端的大脑,基于用户提出的需求自动判断是否需要调用工具以及调用哪个工具,所以此LLM必须是要能支持Tool Call才可以正常使用。

二、MCP Client构建

1、创建McpClient类

import asyncio
import json
import sys
import time
from typing import Optional
from contextlib import AsyncExitStack
from mcp.client.sse import sse_client
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI

class MCPClient:
def __init__(self):
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.client = AsyncOpenAI(
# 此处使用阿里的qwen-plus大模型
api_key="your api key",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

2、创建连接到MCP服务端函数

MCP服务端传输协议支持stdio和sse,所以此处创建2个函数

async def connect_to_server(self, server_script_path: str):
"""Connect to an MCP server

Args:
server_script_path: Path to the server script (.py or .js)
"""
is_python = server_script_path.endswith(".py")
is_js = server_script_path.endswith(".js")
ifnot (is_python or is_js):
raise ValueError("Server script must be a .py or .js file")

command="python"ifis_pythonelse"node"
server_params = StdioServerParameters(
command=command, args=[server_script_path], env=None
)

stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.stdio, self.write)
)

await self.session.initialize()

# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.namefortoolintools])

async def connect_to_sse_server(self, server_url: str):
"""Connect to an MCP server

Args:
server_script_path: Path to the server script (.py or .js)
"""
self._streams_context = sse_client(url=server_url)

streams = await self._streams_context.__aenter__()
self._session_context = ClientSession(*streams)
self.session = await self._session_context.__aenter__()

await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.namefortoolintools])

3、处理用户需求

async def process_query(self, query: str) -> str:
"""使用 LLM 和 MCP 服务器提供的工具处理查询"""
messages = [
{
"role":"user",
"content": query
}
]

response = await self.session.list_tools()
available_tools = [{
"type":"function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
}fortoolinresponse.tools]

# 初始化 LLM API 调用
response = await self.client.chat.completions.create(
model="qwen-plus",
messages=messages,
tools=available_tools# 将工具列表传递给 LLM
)


final_text = []
message = response.choices[0].message
print(response.choices[0])
final_text.append(message.content or"")

# 处理响应并处理工具调用
ifmessage.tool_calls:
# 处理每个工具调用
fortool_callinmessage.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)

# 执行工具调用
start_time = time.time()
result = await self.session.call_tool(tool_name, tool_args)
end_time = time.time()
print(f"Tool {tool_name} took {end_time - start_time} seconds to execute")
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

# 将工具调用和结果添加到消息历史
messages.append({
"role":"assistant",
"tool_calls": [
{
"id": tool_call.id,
"type":"function",
"function": {
"name": tool_name,
"arguments": json.dumps(tool_args)
}
}
]
})
messages.append({
"role":"tool",
"tool_call_id": tool_call.id,
"content": str(result.content)
})

# 将工具调用的结果交给 LLM
response = await self.client.chat.completions.create(
model="qwen-plus",
messages=messages,
tools=available_tools
)

message = response.choices[0].message
ifmessage.content:
final_text.append(message.content)

return"\n".join(final_text)

4、循环聊天函数

async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")

whileTrue:
try:
query = input("\nQuery: ").strip()

ifquery.lower() =='quit':
break

response = await self.process_query(query)
print("\n"+ response)

except Exception as e:
print(f"\nError: {str(e)}")

5、会话清理

async def cleanup(self):
"""Clean up resources"""
await self.exit_stack.aclose()

6、入口函数

async def main():
iflen(sys.argv) < 2:
print("Usage: python client.py <path_to_server_script>")
sys.exit(1)

client = MCPClient()
try:
# 根据MCP Server传输协议进行选择
await client.connect_to_sse_server(sys.argv[1])
await client.chat_loop()
finally:
await client.cleanup()


if__name__ =="__main__":
# asyncio.run(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

7、程序执行

# MCP Server传输协议为stdio,启动命令为:
uv run client.py"完整server端脚本路径"

# MCP Server传输协议为sse,启动命令为:
uv run client.py http://127.0.0.1:8000/sse
# 上面的sse地址根据自己的进行调整

8、运行截图

9、完整源码

import asyncio
import json
import sys
import time
from typing import Optional
from contextlib import AsyncExitStack
from mcp.client.sse import sse_client
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI


class MCPClient:
def __init__(self):
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.client = AsyncOpenAI(
# 如果没有配置环境变量,请用百炼API Key替换:api_key="sk-xxx"
api_key="sk-e6fdadbd5f774ee0be35f6ecc1c52fe7",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

async def connect_to_server(self, server_script_path: str):
"""Connect to an MCP server

Args:
server_script_path: Path to the server script (.py or .js)
"""
is_python = server_script_path.endswith(".py")
is_js = server_script_path.endswith(".js")
ifnot (is_python or is_js):
raise ValueError("Server script must be a .py or .js file")

command="python"ifis_pythonelse"node"
server_params = StdioServerParameters(
command=command, args=[server_script_path], env=None
)

stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.stdio, self.write)
)

await self.session.initialize()

# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.namefortoolintools])

async def connect_to_sse_server(self, server_url: str):
"""Connect to an MCP server

Args:
server_script_path: Path to the server script (.py or .js)
"""
self._streams_context = sse_client(url=server_url)

streams = await self._streams_context.__aenter__()
self._session_context = ClientSession(*streams)
self.session = await self._session_context.__aenter__()

await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.namefortoolintools])

async def process_query(self, query: str) -> str:
"""使用 LLM 和 MCP 服务器提供的工具处理查询"""
messages = [
{
"role":"user",
"content": query
}
]

response = await self.session.list_tools()
available_tools = [{
"type":"function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
}fortoolinresponse.tools]

# 初始化 LLM API 调用
response = await self.client.chat.completions.create(
model="qwen-plus",
messages=messages,
tools=available_tools# 将工具列表传递给 LLM
)


final_text = []
message = response.choices[0].message
print(response.choices[0])
final_text.append(message.content or"")

# 处理响应并处理工具调用
ifmessage.tool_calls:
# 处理每个工具调用
fortool_callinmessage.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)

# 执行工具调用
start_time = time.time()
result = await self.session.call_tool(tool_name, tool_args)
end_time = time.time()
print(f"Tool {tool_name} took {end_time - start_time} seconds to execute")
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

# 将工具调用和结果添加到消息历史
messages.append({
"role":"assistant",
"tool_calls": [
{
"id": tool_call.id,
"type":"function",
"function": {
"name": tool_name,
"arguments": json.dumps(tool_args)
}
}
]
})
messages.append({
"role":"tool",
"tool_call_id": tool_call.id,
"content": str(result.content)
})

# 将工具调用的结果交给 LLM
response = await self.client.chat.completions.create(
model="qwen-plus",
messages=messages,
tools=available_tools
)

message = response.choices[0].message
ifmessage.content:
final_text.append(message.content)

return"\n".join(final_text)

async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")

whileTrue:
try:
query = input("\nQuery: ").strip()

ifquery.lower() =='quit':
break

response = await self.process_query(query)
print("\n"+ response)

except Exception as e:
print(f"\nError: {str(e)}")

async def cleanup(self):
"""Clean up resources"""
await self.exit_stack.aclose()


async def main():
iflen(sys.argv) < 2:
print("Usage: python client.py <path_to_server_script>")
sys.exit(1)

client = MCPClient()
try:
# 根据MCP Server传输协议进行选择
await client.connect_to_sse_server(sys.argv[1])
await client.chat_loop()
finally:
await client.cleanup()


if__name__ =="__main__":
# asyncio.run(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

结语

本文主要讲述了如何通过代码来构建MCP Client,通过对整体构建步骤的分析,相信每个人都能理解MCP Client的工作机制。大家后续可以尝试自己构建一个个性化的MCP Client,配合MCP Server,尽情的发挥你的想象。

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

链载AI是专业的生成式人工智能教程平台。提供Stable Diffusion、Midjourney AI绘画教程,Suno AI音乐生成指南,以及Runway、Pika等AI视频制作与动画生成实战案例。从提示词编写到参数调整,手把手助您从入门到精通。
  • 官方手机版

  • 微信公众号

  • 商务合作

  • Powered by Discuz! X3.5 | Copyright © 2025-2025. | 链载Ai
  • 桂ICP备2024021734号 | 营业执照 | |广西笔趣文化传媒有限公司|| QQ