async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Make a request to the NWS API with proper error handling.

    Args:
        url: The URL to fetch.

    Returns:
        The decoded JSON object, or None when the request raises, the
        response has an error status, the body cannot be decoded as JSON,
        or the decoded JSON is not an object.
    """
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/geo+json",
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            payload = response.json()
        except Exception:
            # Deliberately best-effort: callers treat None as "no data" and
            # turn it into a user-facing message instead of failing.
            return None

    # Honour the declared return type: callers index the result by key.
    return payload if isinstance(payload, dict) else None


def format_alert(feature: dict) -> str:
    """Format an alert feature into a readable string.

    Args:
        feature: Alert feature mapping; its fields are read from the
            "properties" entry.

    Returns:
        A multi-line description with one field per line. A field that is
        missing, None, or empty is replaced by its placeholder text.
    """
    props = feature.get("properties") or {}

    # `or` (rather than a .get default) also covers keys that are present
    # but hold None, which would otherwise be rendered as the text "None".
    event = props.get("event") or "Unknown"
    area = props.get("areaDesc") or "Unknown"
    severity = props.get("severity") or "Unknown"
    description = props.get("description") or "No description available"
    instructions = props.get("instruction") or "No specific instructions provided"

    return f"""
Event: {event}
Area: {area}
Severity: {severity}
Description: {description}
Instructions: {instructions}
"""


@mcp.tool()
async def get_alerts(state: str) -> str:
    """Get weather alerts for a US state.

    Args:
        state: Two-letter US state code (e.g. CA, NY)
    """
    # Normalize the code before building the URL.
    # NOTE(review): the NWS API appears to expect upper-case area codes -- confirm.
    state_code = state.strip().upper()
    url = f"{NWS_API_BASE}/alerts/active/area/{state_code}"
    data = await make_nws_request(url)

    if not data or "features" not in data:
        return "Unable to fetch alerts or no alerts found."

    features = data["features"]
    if not features:
        return "No active alerts for this state."

    alerts = [format_alert(feature) for feature in features]
    return "\n---\n".join(alerts)
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """Get weather forecast for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
    """
    # First get the forecast grid endpoint
    points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
    points_data = await make_nws_request(points_url)

    if not points_data:
        return "Unable to fetch forecast data for this location."

    # Get the forecast URL from the points response; a response without one
    # is reported the same way as a failed lookup instead of raising KeyError.
    forecast_url = (points_data.get("properties") or {}).get("forecast")
    if not forecast_url:
        return "Unable to fetch forecast data for this location."

    forecast_data = await make_nws_request(forecast_url)

    if not forecast_data:
        return "Unable to fetch detailed forecast."

    # Format the periods into a readable forecast
    periods = (forecast_data.get("properties") or {}).get("periods")
    if periods is None:
        return "Unable to fetch detailed forecast."

    forecasts = [
        f"""
{period['name']}:
Temperature: {period['temperature']}°{period['temperatureUnit']}
Wind: {period['windSpeed']} {period['windDirection']}
Forecast: {period['detailedForecast']}
"""
        for period in periods[:5]  # Only show next 5 periods
    ]

    return "\n---\n".join(forecasts)
if __name__ == "__main__":
    # Initialize and run the server
    mcp.run(transport='stdio')