# Clone the repo and move to the project directory
git clone https://github.com/piyushagni5/langgraph-ai.git
cd langgraph-ai/mcp/04-build-streammable-http-mcp-client
# Create a virtual environment using the uv package manager
uv venv
# Activate the virtual environment (macOS/Linux)
source .venv/bin/activate
# On Windows:
# .venv\Scripts\activate
步骤 2:安装依赖
虚拟环境创建完成后,安装requirements.txt中列出的依赖:
# AI and MCP Core
google-adk>=1.5.0
google-cloud-aiplatform>=1.38.0
google-generativeai>=0.3.0
mcp>=1.0.0
fastmcp>=0.9.0

# Data and Validation
pydantic>=2.5.0

# CLI and UI
click>=8.1.7
rich>=13.7.0

# Environment and Configuration
python-dotenv>=1.0.0

# Async Support
asyncio-mqtt>=0.13.0

# Development (optional)
pytest>=7.4.0
black>=23.0.0
flake8>=6.1.0
安装依赖:
uv pip install -r requirements.txt
步骤 3:环境变量
创建.env文件保存敏感信息(如 API Key)。本文使用两项免费服务:Google Gemini API与Tavily Search API。前者可在 Google AI Studio 申请(登录 Gmail → “Get API Key” → 同意条款 → 复制密钥至.env);后者在 Tavily 官网注册获取。
""" Terminal MCP Server Provides secure local command execution capabilities through MCP stdio transport. """
import os
import subprocess
import logging
from pathlib import Path

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field

# Configure logging for the stdio server. logging.basicConfig() writes to
# stderr by default, which keeps stdout free for the MCP stdio protocol.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create FastMCP server for stdio transport (no HTTP, uses stdin/stdout)
mcp = FastMCP("terminal")

# Workspace directory: read from the WORKSPACE_DIR environment variable,
# falling back to "workspace"; resolved to an absolute path once at import.
WORKSPACE_DIR = Path(os.getenv("WORKSPACE_DIR", "workspace")).resolve()

# Ensure the workspace exists. parents=True lets WORKSPACE_DIR point at a
# nested path whose parent directories have not been created yet.
WORKSPACE_DIR.mkdir(parents=True, exist_ok=True)
logger.info(f"Terminal server workspace: {WORKSPACE_DIR}")
class CommandInput(BaseModel):
    """Request payload for the terminal tool: a single, non-empty command string."""

    # Required field (no default); pydantic rejects an empty string via min_length.
    command: str = Field(
        ...,
        min_length=1,
        description="Shell command to execute in the workspace directory",
    )
class CommandResult(BaseModel):
    """Response payload describing one finished (or failed) command execution."""

    # All five fields are required; none has a default.
    command: str = Field(
        ...,
        description="The command that was executed",
    )
    exit_code: int = Field(
        ...,
        description="Process exit code (0 = success)",
    )
    stdout: str = Field(
        ...,
        description="Standard output from the command",
    )
    stderr: str = Field(
        ...,
        description="Standard error from the command",
    )
    working_directory: str = Field(
        ...,
        description="Directory where command was executed",
    )
@mcp.tool(
    description="Execute a shell command in the secure workspace directory. Use for file operations, text processing, and system tasks.",
    title="Terminal Command Executor"
)
async def run_command(params: CommandInput) -> CommandResult:
    """
    Execute a terminal command with the workspace directory as its working directory.

    Safeguards:
    - The command starts in WORKSPACE_DIR (its initial working directory)
    - 30-second timeout to prevent hanging processes
    - Exit code, stdout and stderr are captured and returned to the caller

    Failures are reported in the returned CommandResult (exit_code=-1 and a
    message in stderr) rather than raised.

    Args:
        params: CommandInput containing the command to execute

    Returns:
        CommandResult with the exit code, captured output and working directory
    """
    timeout_seconds = 30
    command = params.command.strip()

    # Log the command execution for debugging
    logger.info(f"Executing command: {command}")

    try:
        # NOTE(review): shell=True runs the string through the system shell and
        # cwd= only sets the starting directory; it does not confine the
        # command to the workspace. The input is fully trusted with the
        # server's privileges - confirm that is acceptable for this deployment.
        result = subprocess.run(
            command,
            shell=True,
            cwd=WORKSPACE_DIR,
            capture_output=True,      # Capture both stdout and stderr
            text=True,                # Return strings instead of bytes
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired:
        logger.error(f"Command '{command}' timed out after {timeout_seconds} seconds")
        return CommandResult(
            command=command,
            exit_code=-1,
            stdout="",
            stderr=f"Command timed out after {timeout_seconds} seconds",
            working_directory=str(WORKSPACE_DIR),
        )
    except Exception as e:
        logger.error(f"Error executing command '{command}': {e}")
        return CommandResult(
            command=command,
            exit_code=-1,
            stdout="",
            stderr=f"Execution error: {e}",
            working_directory=str(WORKSPACE_DIR),
        )

    # Log result summary for monitoring
    status = "SUCCESS" if result.returncode == 0 else "ERROR"
    logger.info(
        f"{status}: Command '{command}' completed with exit code {result.returncode}"
    )

    # Build the result explicitly; the previous code returned an undefined
    # name (`cmd_result`), so every successful run ended in a NameError.
    return CommandResult(
        command=command,
        exit_code=result.returncode,
        stdout=result.stdout,
        stderr=result.stderr,
        working_directory=str(WORKSPACE_DIR),
    )
if __name__ == "__main__":
    # Announce startup, then hand control to FastMCP using the stdio
    # transport (communicates via stdin/stdout).
    startup_messages = (
        "Starting Terminal MCP Server (stdio transport)",
        f"Workspace:{WORKSPACE_DIR}",
    )
    for startup_message in startup_messages:
        logger.info(startup_message)
    mcp.run(transport="stdio")
""" Temperature Conversion MCP Server Provides comprehensive temperature conversion tools supporting Celsius, Fahrenheit, and Kelvin. """
# Standard library
import logging
from typing import Union

# Third-party
import click
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field, validator
@click.command()
@click.option("--port", default=8000, help="Port to run the server on")
@click.option("--host", default="localhost", help="Host to bind the server to")
@click.option("--log-level", default="INFO", help="Logging level")
def main(port: int, host: str, log_level: str) -> None:
    """Launch the Temperature Conversion MCP Server.

    Builds a FastMCP server bound to ``host``/``port``, registers six
    conversion tools (Celsius, Fahrenheit and Kelvin in every direction) and
    serves them over the streamable HTTP transport until interrupted.

    Args:
        port: TCP port to listen on.
        host: Host/interface to bind to.
        log_level: Logging level name (for example "INFO" or "debug").

    Raises:
        ValueError: If ``log_level`` is not a level name known to ``logging``.
        Exception: Any error raised while the server runs is logged and re-raised.
    """
    # Apply --log-level and create the logger used below; previously the
    # option was ignored and `logger` was never defined.
    logging.basicConfig(level=log_level.upper())
    logger = logging.getLogger(__name__)

    # Create FastMCP server with streamable HTTP transport
    mcp = FastMCP(
        "Temperature Converter",
        host=host,
        port=port,
        stateless_http=True,
    )

    # Lowest physically meaningful value on each input scale.
    absolute_zero_celsius = -273.15
    absolute_zero_fahrenheit = -459.67
    absolute_zero_kelvin = 0.0

    below_zero_celsius = "Temperature cannot be below absolute zero (-273.15°C)"
    below_zero_fahrenheit = "Temperature cannot be below absolute zero (-459.67°F)"
    below_zero_kelvin = "Kelvin temperature cannot be negative"

    # Input/Output models for type safety and validation
    class TemperatureInput(BaseModel):
        """Input model for temperature conversion.

        The model does not know which scale the value is in, so the
        absolute-zero check is done per tool. A Celsius bound applied here
        would wrongly reject valid Fahrenheit inputs such as -300°F.
        """

        temperature: float = Field(
            ...,
            description="Temperature value to convert",
        )

    class TemperatureResult(BaseModel):
        """Output model for temperature conversion results."""

        original_value: float = Field(..., description="Original temperature value")
        original_scale: str = Field(..., description="Original temperature scale")
        converted_value: float = Field(..., description="Converted temperature value")
        converted_scale: str = Field(..., description="Target temperature scale")
        formula: str = Field(..., description="Conversion formula used")

    # Core conversion functions (business logic)
    def celsius_to_fahrenheit_calc(celsius: float) -> float:
        """Convert Celsius to Fahrenheit using standard formula."""
        return (celsius * 9 / 5) + 32

    def fahrenheit_to_celsius_calc(fahrenheit: float) -> float:
        """Convert Fahrenheit to Celsius using standard formula."""
        return (fahrenheit - 32) * 5 / 9

    def celsius_to_kelvin_calc(celsius: float) -> float:
        """Convert Celsius to Kelvin by adding absolute zero offset."""
        return celsius + 273.15

    def kelvin_to_celsius_calc(kelvin: float) -> float:
        """Convert Kelvin to Celsius by subtracting absolute zero offset."""
        return kelvin - 273.15

    def fahrenheit_to_kelvin_calc(fahrenheit: float) -> float:
        """Convert Fahrenheit to Kelvin via Celsius intermediate."""
        return celsius_to_kelvin_calc(fahrenheit_to_celsius_calc(fahrenheit))

    def kelvin_to_fahrenheit_calc(kelvin: float) -> float:
        """Convert Kelvin to Fahrenheit via Celsius intermediate."""
        return celsius_to_fahrenheit_calc(kelvin_to_celsius_calc(kelvin))

    def convert(
        params: TemperatureInput,
        *,
        source: str,
        target: str,
        minimum: float,
        minimum_error: str,
        calc,
        formula: str,
    ) -> TemperatureResult:
        """Validate the input against its scale's lower bound and build the result.

        ``calc`` is one of the ``*_calc`` functions above (float -> float).
        The converted value is rounded to two decimal places.

        Raises:
            ValueError: If the input is below ``minimum`` for the source scale.
        """
        if params.temperature < minimum:
            raise ValueError(minimum_error)
        return TemperatureResult(
            original_value=params.temperature,
            original_scale=source,
            converted_value=round(calc(params.temperature), 2),
            converted_scale=target,
            formula=formula,
        )

    # MCP Tool Registrations - These become available to clients

    @mcp.tool(
        description="Convert temperature from Celsius to Fahrenheit",
        title="Celsius to Fahrenheit Converter"
    )
    def celsius_to_fahrenheit(params: TemperatureInput) -> TemperatureResult:
        """Convert Celsius to Fahrenheit with validation and formula info."""
        return convert(
            params,
            source="Celsius",
            target="Fahrenheit",
            minimum=absolute_zero_celsius,
            minimum_error=below_zero_celsius,
            calc=celsius_to_fahrenheit_calc,
            formula="°F = (°C × 9/5) + 32",
        )

    @mcp.tool(
        description="Convert temperature from Fahrenheit to Celsius",
        title="Fahrenheit to Celsius Converter"
    )
    def fahrenheit_to_celsius(params: TemperatureInput) -> TemperatureResult:
        """Convert Fahrenheit to Celsius, rejecting values below -459.67°F."""
        return convert(
            params,
            source="Fahrenheit",
            target="Celsius",
            minimum=absolute_zero_fahrenheit,
            minimum_error=below_zero_fahrenheit,
            calc=fahrenheit_to_celsius_calc,
            formula="°C = (°F - 32) × 5/9",
        )

    @mcp.tool(
        description="Convert temperature from Celsius to Kelvin",
        title="Celsius to Kelvin Converter"
    )
    def celsius_to_kelvin(params: TemperatureInput) -> TemperatureResult:
        """Convert Celsius to Kelvin - simple offset addition."""
        return convert(
            params,
            source="Celsius",
            target="Kelvin",
            minimum=absolute_zero_celsius,
            minimum_error=below_zero_celsius,
            calc=celsius_to_kelvin_calc,
            formula="K = °C + 273.15",
        )

    @mcp.tool(
        description="Convert temperature from Kelvin to Celsius",
        title="Kelvin to Celsius Converter"
    )
    def kelvin_to_celsius(params: TemperatureInput) -> TemperatureResult:
        """Convert Kelvin to Celsius, rejecting negative Kelvin values."""
        return convert(
            params,
            source="Kelvin",
            target="Celsius",
            minimum=absolute_zero_kelvin,
            minimum_error=below_zero_kelvin,
            calc=kelvin_to_celsius_calc,
            formula="°C = K - 273.15",
        )

    @mcp.tool(
        description="Convert temperature from Fahrenheit to Kelvin",
        title="Fahrenheit to Kelvin Converter"
    )
    def fahrenheit_to_kelvin(params: TemperatureInput) -> TemperatureResult:
        """Convert Fahrenheit to Kelvin via two-step conversion."""
        return convert(
            params,
            source="Fahrenheit",
            target="Kelvin",
            minimum=absolute_zero_fahrenheit,
            minimum_error=below_zero_fahrenheit,
            calc=fahrenheit_to_kelvin_calc,
            formula="K = (°F - 32) × 5/9 + 273.15",
        )

    @mcp.tool(
        description="Convert temperature from Kelvin to Fahrenheit",
        title="Kelvin to Fahrenheit Converter"
    )
    def kelvin_to_fahrenheit(params: TemperatureInput) -> TemperatureResult:
        """Convert Kelvin to Fahrenheit via two-step conversion."""
        return convert(
            params,
            source="Kelvin",
            target="Fahrenheit",
            minimum=absolute_zero_kelvin,
            minimum_error=below_zero_kelvin,
            calc=kelvin_to_fahrenheit_calc,
            formula="°F = (K - 273.15) × 9/5 + 32",
        )

    # Server startup with error handling
    try:
        logger.info(f"Temperature server running on {host}:{port}")
        logger.info("Available conversions: °C↔°F, °C↔K, °F↔K")
        mcp.run(transport="streamable-http")
    except KeyboardInterrupt:
        logger.info("Server shutting down gracefully...")
    except Exception as e:
        logger.error(f"Server error: {e}")
        raise
    finally:
        logger.info("Temperature conversion server stopped")


# Script entry point: the server only starts when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()
def start_temperature_server(self, port: int = 8000, host: str = "localhost") -> bool:
    """Start the temperature conversion server and wait until it answers.

    Launches ``temperature_server.py`` (located next to this file) as a
    child process, records it in ``self.processes`` so it can be stopped
    later, and then polls it via ``_wait_for_server``.

    Args:
        port: Port passed to the server's ``--port`` option.
        host: Host passed to the server's ``--host`` option.

    Returns:
        True if the server became ready, False if it did not or if launching
        it raised an exception (the error is logged, not propagated).
    """
    import sys  # local import: only needed to locate the running interpreter

    try:
        server_path = Path(__file__).parent / "temperature_server.py"

        # Use the current interpreter so the child runs in the same
        # (virtual) environment as the launcher.
        cmd = [
            sys.executable,
            str(server_path),
            "--port", str(port),
            "--host", host,
        ]

        logger.info(f"Starting temperature server on {host}:{port}")
        # The child's output is discarded instead of piped: nothing here ever
        # reads the pipes, and an undrained PIPE eventually fills up and
        # blocks the child on write.
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        self.processes.append(process)

        # Wait for server to be ready and verify health
        return self._wait_for_server(host, port)

    except Exception as e:
        logger.error(f"Failed to start temperature server: {e}")
        return False
def _wait_for_server(self, host: str, port: int, timeout: int = 10) -> bool:
    """Poll the server's /mcp endpoint until it responds or the timeout elapses.

    A plain GET is sent roughly every half second. The server counts as ready
    once it answers with HTTP 406; connection errors and any other status
    simply trigger another attempt.

    Args:
        host: Host name used to build the probe URL.
        port: Port used to build the probe URL.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True once a 406 response is seen, False if the timeout expires first.
    """
    endpoint = f"http://{host}:{port}/mcp"
    began = time.time()

    while True:
        if not (time.time() - began < timeout):
            break

        # A 406 "Not Acceptable" reply is treated as the readiness signal for
        # this header-less probe.
        try:
            probe = requests.get(endpoint, timeout=1)
        except requests.RequestException:
            probe = None

        if probe is not None and probe.status_code == 406:
            logger.info(f"Server ready at {host}:{port}")
            return True

        time.sleep(0.5)

    logger.warning(f"Server at {host}:{port} not ready within {timeout}s")
    return False
def stop_all_servers(self) -> None:
    """Stop all managed server processes, escalating to a kill if needed.

    Each process in ``self.processes`` is sent a terminate request and given
    up to 5 seconds to exit. If that fails for any reason the process is
    killed and then waited on so it is reaped. Errors are logged and never
    raised, so one stubborn process cannot prevent the others from being
    stopped. The process list is emptied afterwards.
    """
    for process in self.processes:
        try:
            process.terminate()          # Ask for a graceful shutdown
            process.wait(timeout=5)      # Wait up to 5 seconds
            logger.info(f"Stopped server process {process.pid}")
        except Exception as e:
            logger.error(f"Error stopping process {process.pid}: {e}")
            try:
                process.kill()           # Force kill if graceful shutdown fails
                process.wait(timeout=5)  # Reap the child so it does not linger as a zombie
            except (OSError, subprocess.SubprocessError) as kill_error:
                # Best effort only; report instead of silently swallowing.
                logger.error(f"Failed to kill process {process.pid}: {kill_error}")

    self.processes.clear()
# Module-level ServerLauncher instance (global)
launcher = ServerLauncher()
# Google ADK imports
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPServerParams
from google.adk.tools.mcp_tool import StdioConnectionParams
from mcp import StdioServerParameters

# Local imports
from src.utils.config_loader import config_loader
from src.utils.formatters import formatter

# Module-level logger named after this module
logger = logging.getLogger(__name__)
class AgentWrapper:
    """
    Enhanced wrapper for Google ADK agent with MCP toolset management.

    This class connects the ADK agent to the MCP servers listed in the
    configuration, records a per-server connection status, and applies an
    optional tool filter.
    """

    def __init__(self, tool_filter: Optional[List[str]] = None):
        """
        Initialize the agent wrapper.

        Args:
            tool_filter: Optional list of tool names to allow. If None, all tools are loaded.
        """
        self.tool_filter = tool_filter
        self.agent: Optional[LlmAgent] = None
        # Toolsets that connected successfully; closed again in close().
        self.toolsets: List[MCPToolset] = []
        # server name -> "connected" | "no_tools" | "error:<message>"
        self.server_status: Dict[str, str] = {}

    async def build(self) -> None:
        """
        Build the ADK agent with MCP toolsets.

        Steps:
        1. Load toolsets from every configured server
        2. Create the ADK agent with all loaded toolsets

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        logger.info("Building agent with MCP toolsets...")

        try:
            # Load toolsets from all configured servers
            toolsets = await self._load_toolsets()

            # Remember the toolsets before creating the agent so close() can
            # still release them if agent creation fails.
            self.toolsets = toolsets

            if not toolsets:
                logger.warning("No toolsets loaded - agent will have no tools available")

            self.agent = LlmAgent(
                model="gemini-2.0-flash-exp",
                name="universal_mcp_assistant",
                instruction=self._get_agent_instruction(),
                tools=toolsets,
            )

            logger.info(f"Agent built successfully with {len(toolsets)} toolsets")

        except Exception as e:
            logger.error(f"Failed to build agent: {e}")
            raise

    def _get_agent_instruction(self) -> str:
        """Get the system instruction that defines the agent's behavior and capabilities."""
        return """You are a helpful assistant with access to temperature conversion tools and local file operations.

Your capabilities include:
- Converting temperatures between Celsius, Fahrenheit, and Kelvin
- Executing local commands for file operations
- Providing detailed explanations of conversions and formulas

When handling temperature conversions:
- Always validate input values for physical reasonableness
- Show the conversion formula used
- Round results to appropriate precision
- Handle multiple conversions in sequence when requested

When working with files:
- Use the terminal tools to create, read, and modify files
- Format output clearly and professionally
- Confirm successful file operations

Be precise, helpful, and educational in your responses. Show your work and explain the steps you're taking."""

    async def _load_toolsets(self) -> List[MCPToolset]:
        """
        Load toolsets from all configured MCP servers.

        Each server is handled independently: a failure is logged and recorded
        in ``self.server_status``, and the remaining servers are still tried.

        Returns:
            List of successfully connected MCPToolset instances.
        """
        servers = config_loader.get_servers()
        toolsets: List[MCPToolset] = []

        # NOTE(review): the loop header is missing from the excerpt; this
        # assumes get_servers() returns a mapping of server name -> config
        # dict - confirm against config_loader.
        for server_name, server_config in servers.items():
            toolset: Optional[MCPToolset] = None
            try:
                connection_params = await self._create_connection_params(
                    server_name, server_config
                )
                if connection_params is None:
                    # The helper already logged the cause.
                    self.server_status[server_name] = (
                        "error:could not create connection parameters"
                    )
                    continue

                # Create MCPToolset and connect to server
                toolset = MCPToolset(
                    connection_params=connection_params,
                    tool_filter=self.tool_filter,
                )

                # Test connection by attempting to get available tools
                tools = await toolset.get_tools()
                tool_names = [tool.name for tool in tools]

                if tools:
                    toolsets.append(toolset)
                    self.server_status[server_name] = "connected"
                    formatter.print_tool_summary(server_name, tool_names)
                    logger.info(f"Connected to {server_name}: {len(tool_names)} tools loaded")
                else:
                    logger.warning(f"No tools found on server '{server_name}'")
                    self.server_status[server_name] = "no_tools"
                    # Not kept, so release its connection right away.
                    await self._close_toolset(toolset, server_name)

            except Exception as e:
                logger.error(f"Failed to connect to server '{server_name}': {e}")
                self.server_status[server_name] = f"error:{str(e)}"
                if toolset is not None:
                    await self._close_toolset(toolset, server_name)
                continue

        return toolsets

    async def _create_connection_params(
        self,
        server_name: str,
        server_config: Dict[str, Any]
    ) -> Optional[Any]:
        """
        Create appropriate connection parameters based on server transport type.

        Args:
            server_name: Name of the server for logging
            server_config: Server configuration dictionary

        Returns:
            Connection parameters object or None if creation failed
        """
        try:
            server_type = server_config["type"]

            # NOTE(review): the branch bodies are missing from the excerpt. The
            # type values ("http"/"stdio") and config keys ("url", "command",
            # "args") are assumptions inferred from the imported parameter
            # classes - confirm against the server configuration file.
            if server_type == "http":
                return StreamableHTTPServerParams(url=server_config["url"])

            if server_type == "stdio":
                return StdioConnectionParams(
                    server_params=StdioServerParameters(
                        command=server_config["command"],
                        args=server_config.get("args", []),
                    )
                )

            raise ValueError(f"Unsupported server type: {server_type}")

        except Exception as e:
            logger.error(f"Error creating connection params for '{server_name}': {e}")
            return None

    async def _close_toolset(self, toolset: MCPToolset, label: str) -> None:
        """Close one toolset, logging (not raising) any error."""
        try:
            await toolset.close()
        except Exception as e:
            logger.warning(f"Error closing toolset for '{label}': {e}")

    def get_server_status(self) -> Dict[str, str]:
        """Return a copy of the per-server connection status mapping."""
        return dict(self.server_status)

    async def close(self) -> None:
        """
        Gracefully close all toolset connections and cleanup resources.
        """
        logger.info("Shutting down agent and closing toolset connections...")

        for index, toolset in enumerate(self.toolsets):
            await self._close_toolset(toolset, f"toolset {index}")

        self.toolsets.clear()
        self.agent = None
""" MCP Client Enhanced client interface with detailed debugging of MCP interactions. """
# Standard library
import logging
from typing import Optional, List, AsyncGenerator, Any

# Google GenAI / ADK
from google.genai.types import Content, Part
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

# Local imports
from src.agent.agent_wrapper import AgentWrapper
from src.utils.formatters import formatter

# Module-level logger named after this module
logger = logging.getLogger(__name__)
class MCPClient:
    """
    Enhanced MCP client with session management, streaming responses, and detailed debugging.

    This client provides a high-level interface for interacting with the ADK agent
    while optionally showing detailed information about MCP server interactions.
    """

    def __init__(
        self,
        app_name: str = "universal_mcp_client",
        user_id: str = "default_user",
        session_id: str = "default_session",
        tool_filter: Optional[List[str]] = None,
        debug_mode: bool = False
    ):
        """
        Initialize the MCP client with debugging capabilities.

        Args:
            app_name: Application identifier for ADK
            user_id: User identifier for session context
            session_id: Session identifier for conversation context
            tool_filter: Optional list of allowed tool names
            debug_mode: Enable detailed debugging of MCP interactions
        """
        self.app_name = app_name
        self.user_id = user_id
        self.session_id = session_id
        self.debug_mode = debug_mode

        # State read by initialize(), send_message() and shutdown(); it must
        # exist before any of them runs.
        self.agent_wrapper = AgentWrapper(tool_filter=tool_filter)
        self.session_service: Optional[InMemorySessionService] = None
        self.runner: Optional[Runner] = None
        self.is_initialized = False

        logger.info(f"MCPClient initialized for user '{user_id}', session '{session_id}'")
        if debug_mode:
            logger.info("Debug mode enabled - detailed MCP interactions will be shown")

    async def initialize(self) -> None:
        """
        Initialize the client session and agent.

        This method must be called before using send_message(). It sets up the
        session, builds the agent, and prepares the runner. Calling it again
        after a successful initialization only logs a warning.

        Raises:
            Exception: Any setup failure is logged, the client is shut down,
                and the original exception is re-raised.
        """
        if self.is_initialized:
            logger.warning("Client already initialized")
            return

        try:
            # NOTE(review): the setup steps are missing from the excerpt; they
            # are reconstructed from the imported ADK classes and this
            # docstring - confirm against the original module.
            self.session_service = InMemorySessionService()
            await self.session_service.create_session(
                app_name=self.app_name,
                user_id=self.user_id,
                session_id=self.session_id,
            )

            await self.agent_wrapper.build()

            self.runner = Runner(
                agent=self.agent_wrapper.agent,
                app_name=self.app_name,
                session_service=self.session_service,
            )
            self.is_initialized = True

            # Print server status summary
            status = self.agent_wrapper.get_server_status()
            connected = sum(1 for s in status.values() if s == "connected")
            logger.info(f"Server status: {connected}/{len(status)} servers connected")

        except Exception as e:
            logger.error(f"Failed to initialize MCP client: {e}")
            await self.shutdown()
            raise

    async def send_message(self, message: str) -> AsyncGenerator[Any, None]:
        """
        Send a message to the agent and stream the response with detailed debugging.

        This is an async generator, so the checks below run when iteration
        starts, not when the method is called.

        Args:
            message: User message to send to the agent

        Yields:
            Streaming response events from the agent

        Raises:
            RuntimeError: If client is not initialized
            ValueError: If the message is empty or whitespace only
        """
        if not self.is_initialized:
            raise RuntimeError("Client not initialized. Call initialize() first.")

        if not message.strip():
            raise ValueError("Message cannot be empty")

        logger.info(f"Sending message: {message[:100]}{'...' if len(message) > 100 else ''}")

        # NOTE(review): the streaming loop is missing from the excerpt; it is
        # reconstructed from the imported Content/Part/Runner names - confirm
        # against the original module.
        content = Content(role="user", parts=[Part(text=message)])
        event_count = 0

        async for event in self.runner.run_async(
            user_id=self.user_id,
            session_id=self.session_id,
            new_message=content,
        ):
            event_count += 1
            if self.debug_mode:
                self._analyze_event(event, event_count)
            yield event

    def _analyze_event(self, event: Any, event_count: int) -> None:
        """
        Analyze and display detailed information about MCP events.

        Display problems are logged at debug level and never raised, so a
        formatting error cannot interrupt the response stream.

        Args:
            event: The event object from the agent
            event_count: Sequential event number for tracking
        """
        try:
            # Tool calls requested by the agent
            for tool_call in getattr(event, "tool_calls", None) or []:
                formatter.print_mcp_interaction(
                    "tool_call",
                    {
                        "tool_name": getattr(tool_call, "name", "Unknown"),
                        "parameters": getattr(tool_call, "args", {}),
                        "server": "MCP Server",
                        "event_number": event_count,
                    },
                )

            # Tool responses coming back from the servers
            for tool_response in getattr(event, "tool_responses", None) or []:
                # Only a truthy `error` marks a failure; merely having the
                # attribute (for example error=None) does not.
                failed = bool(getattr(tool_response, "error", None))
                formatter.print_mcp_interaction(
                    "tool_response",
                    {
                        "tool_name": getattr(tool_response, "name", "Unknown"),
                        "result": (
                            str(tool_response.content)
                            if hasattr(tool_response, "content")
                            else "No result"
                        ),
                        "status": "error" if failed else "success",
                        "event_number": event_count,
                    },
                )

            parts = getattr(getattr(event, "content", None), "parts", None) or []
            first_text = getattr(parts[0], "text", None) if parts else None

            final_check = getattr(event, "is_final_response", None)
            is_final = bool(final_check()) if callable(final_check) else False

            # Intermediate agent output (thinking/processing)
            if parts and not is_final:
                formatter.print_mcp_interaction(
                    "agent_thinking",
                    {
                        "content": first_text or "Processing...",
                        "event_number": event_count,
                    },
                )

            # Final response
            if is_final:
                # NOTE(review): the display call for the final response is cut
                # off in the excerpt; the "final_response" interaction type is
                # an assumption - confirm against formatter.
                formatter.print_mcp_interaction(
                    "final_response",
                    {
                        "content": first_text or "",
                        "event_number": event_count,
                    },
                )

        except Exception as e:
            logger.debug(f"Error analyzing event {event_count}: {e}")

    async def shutdown(self) -> None:
        """
        Release the agent's toolset connections and reset the client state.

        Safe to call whether or not initialization completed; errors while
        closing are logged and not raised.
        """
        try:
            await self.agent_wrapper.close()
        except Exception as e:
            logger.error(f"Error during MCP client shutdown: {e}")
        finally:
            self.runner = None
            self.session_service = None
            self.is_initialized = False
async def start_servers(self) -> bool:
    """Start required HTTP servers with health monitoring.

    Starts the temperature server on port 8000 through the module-level
    ``launcher`` and sets ``self.server_started`` on success.

    Returns:
        True if the temperature server started, False otherwise. Exceptions
        are logged and reported as False rather than raised.
    """
    try:
        logger.info("Starting HTTP servers...")

        # Start temperature server
        if launcher.start_temperature_server(port=8000):
            self.server_started = True
            logger.info("Temperature server started successfully")
            return True

        logger.error("Failed to start temperature server")
        return False

    except Exception as e:
        # NOTE(review): the original handler is missing from the excerpt; this
        # minimal one keeps the bool contract - confirm against the source.
        logger.error(f"Error starting servers: {e}")
        return False
# In debug mode, detailed interactions are shown by the client # Here we just count events and look for the final response ifhasattr(event,'is_final_response')andevent.is_final_response(): final_response = event break
# Display final response iffinal_responseandhasattr(final_response,'content'): ifhasattr(final_response.content,'parts')andfinal_response.content.parts: response_text = final_response.content.parts[0].text print(f"\nFinal Response:\n{response_text}\n") else: print("Task completed (no text response)\n") else: print("No final response received\n")
def_show_help(self) ->None: """Show example requests and debugging tips.""" examples = [ "Convert 25 degrees Celsius to Fahrenheit", "What is 100°F in Celsius and Kelvin?", "Convert 300 Kelvin to Celsius and Fahrenheit", "Convert 0°C to all other temperature scales", "Convert room temperature (20°C) to Fahrenheit and save to file", "Create a temperature conversion table for 0, 25, 50, 75, 100°C" ]
debug_tips = [ "Use 'debug on' to see detailed MCP server interactions", "Watch for tool calls, parameters, and server responses", "Each event shows the communication between client and servers", "Use 'debug off' to see only final responses" ]