# Weather lookup backed by the QWeather query API.
from typing import Any

import httpx
from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("weather")

# Constants
API_BASE = "https://devapi.qweather.com/v7"
# Placeholder value: replace with your own key before running the server.
API_KEY = "your api key"
async def query_weather(url: str) -> dict[str, Any] | None:
    """Send a GET request to the weather API and return the decoded JSON body.

    The request carries the module-level ``API_KEY`` in the ``X-QW-Api-Key``
    header and uses a 30 second timeout.

    Args:
        url: Fully-formed request URL.

    Returns:
        The parsed JSON response, or ``None`` if sending the request,
        checking the HTTP status, or decoding the body raised an exception.
    """
    headers = {
        "X-QW-Api-Key": API_KEY,
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            # Turn 4xx/5xx responses into exceptions so they take the None path.
            response.raise_for_status()
            return response.json()
        except Exception:
            # Deliberately best-effort: every failure is reported to the
            # caller as None rather than propagated.
            return None
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """Get weather forecast for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location

    Returns:
        One text section per daily forecast entry, separated by ``---``
        lines, or a short error message when no forecast data is available.
    """
    # The location parameter is built with longitude first, then latitude.
    weather_url = f"{API_BASE}/weather/7d?location={longitude},{latitude}"
    weather_data = await query_weather(weather_url)

    # query_weather returns None on any failure; a payload without a 'daily'
    # key (presumably an API-level error response) is unusable as well.
    if not weather_data or "daily" not in weather_data:
        return "Unable to fetch forecast data for this location."

    # Format every daily entry the API returned.
    forecasts = []
    for period in weather_data["daily"]:
        forecast = f"""
{period['fxDate']} {period['textDay']}:
Temperature: {period['tempMin']}~{period['tempMax']}°C
Wind: {period['windSpeedDay']} {period['windDirDay']}
"""
        forecasts.append(forecast)

    return "\n---\n".join(forecasts)
if __name__ == "__main__":
    # Run the MCP server over the stdio transport when executed as a script.
    mcp.run(transport="stdio")