Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Swiss BFS API MCP Server | |
| Provides broad access to Swiss Federal Statistical Office data via PxWeb API | |
| Refactored to use FastMCP for consistency with OpenParlData server. | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| from typing import Dict, List, Any, Optional | |
| from enum import Enum | |
| import httpx | |
| from mcp.server.fastmcp import FastMCP | |
| from pydantic import BaseModel, Field, ConfigDict | |
# Configure root logging once at import time; module-level logger for this server.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastMCP server
mcp = FastMCP("swiss-bfs-api")

# API Configuration: root URL of the BFS PxWeb REST API (language code is appended per request)
BASE_URL = "https://www.pxweb.bfs.admin.ch/api/v1"
class Language(str, Enum):
    """Response languages supported by the BFS PxWeb API."""
    DE = "de"  # German
    FR = "fr"  # French
    IT = "it"  # Italian
    EN = "en"  # English
class DataFormat(str, Enum):
    """Output formats the PxWeb API can return for data queries."""
    CSV = "csv"
    JSON = "json"
    JSON_STAT = "json-stat"    # JSON-stat 1.x
    JSON_STAT2 = "json-stat2"  # JSON-stat 2.0
    PX = "px"                  # native PC-Axis file format
class FilterType(str, Enum):
    """Dimension selection modes for PxWeb queries (per PxWeb API convention)."""
    ALL = "all"    # wildcard selection, e.g. values=["*"]
    ITEM = "item"  # select exactly the listed values
    TOP = "top"    # select the first N values, e.g. values=["5"]
# Datacube knowledge base: Maps keywords to known datacube IDs with descriptions
# This helps with semantic search since the API only returns cryptic IDs
# Each entry: lowercase keyword -> list of (datacube_id, human-readable description) tuples.
# NOTE(review): the IDs below are hard-coded; verify against the live catalog periodically.
DATACUBE_KNOWLEDGE_BASE: Dict[str, List[tuple]] = {
    # Population & Demographics (px-x-01)
    "population": [
        ("px-x-0102010000_101", "Permanent resident population by canton"),
        ("px-x-0102020000_101", "Population by age and sex"),
        ("px-x-0102020202_106", "Population statistics and scenarios"),
        ("px-x-0102020300_101", "Population growth and change"),
    ],
    "demographics": [
        ("px-x-0102010000_101", "Permanent resident population by canton"),
        ("px-x-0102020000_101", "Population by age and sex"),
    ],
    "birth": [
        ("px-x-0102020000_101", "Birth rates and statistics"),
    ],
    "death": [
        ("px-x-0102020000_101", "Mortality rates and statistics"),
    ],
    # Employment & Labor (px-x-03)
    "employment": [
        ("px-x-0301000000_103", "Employment by sector"),
        ("px-x-0301000000_104", "Employment statistics"),
    ],
    "unemployment": [
        ("px-x-0301000000_103", "Unemployment rates"),
    ],
    "labor": [
        ("px-x-0301000000_103", "Labor market statistics"),
    ],
    "work": [
        ("px-x-0301000000_103", "Employment and work statistics"),
    ],
    # Prices & Inflation (px-x-05)
    "inflation": [
        ("px-x-0502010000_101", "Consumer price index (CPI)"),
    ],
    "prices": [
        ("px-x-0502010000_101", "Price statistics and indices"),
    ],
    "cost": [
        ("px-x-0502010000_101", "Cost of living indices"),
    ],
    # Income & Consumption (px-x-20)
    "income": [
        ("px-x-2105000000_101", "Income distribution"),
        ("px-x-2105000000_102", "Household income"),
    ],
    "wages": [
        ("px-x-2105000000_101", "Wage statistics"),
    ],
    "salary": [
        ("px-x-2105000000_101", "Salary and compensation"),
    ],
    # Education (px-x-15)
    "education": [
        ("px-x-1502010000_101", "Education statistics"),
        ("px-x-1502010100_101", "Students and schools"),
    ],
    "students": [
        ("px-x-1502010100_101", "Student enrollment"),
    ],
    "schools": [
        ("px-x-1502010100_101", "School statistics"),
    ],
    "university": [
        ("px-x-1502010100_101", "Higher education statistics"),
    ],
    # Health (px-x-14)
    "health": [
        ("px-x-1404010100_101", "Health statistics"),
        ("px-x-1404050000_101", "Healthcare costs"),
    ],
    "hospital": [
        ("px-x-1404010100_101", "Hospital statistics"),
    ],
    "medical": [
        ("px-x-1404010100_101", "Medical care statistics"),
    ],
    # Energy (px-x-07)
    "energy": [
        ("px-x-0702000000_101", "Energy statistics"),
    ],
    "electricity": [
        ("px-x-0702000000_101", "Electricity production and consumption"),
    ],
    "power": [
        ("px-x-0702000000_101", "Power generation"),
    ],
    # Housing (px-x-09)
    "housing": [
        ("px-x-0902020100_104", "Housing statistics"),
    ],
    "rent": [
        ("px-x-0902020100_104", "Rental prices"),
    ],
    "construction": [
        ("px-x-0902020100_104", "Construction statistics"),
    ],
}
# Global HTTP client (created lazily; shared across all tool calls)
http_client: Optional[httpx.AsyncClient] = None

def get_client() -> httpx.AsyncClient:
    """Return the shared async HTTP client, creating it lazily on first use."""
    global http_client
    if http_client is None:
        default_headers = {
            "User-Agent": "Mozilla/5.0 (compatible; BFS-MCP/1.0; +https://github.com/user/bfs-mcp)",
            "Accept": "application/json",
            "Accept-Language": "en,de,fr,it",
        }
        http_client = httpx.AsyncClient(timeout=60.0, headers=default_headers)
    return http_client
# Pydantic models for input validation
class ListDatacubesInput(BaseModel):
    """Input for listing BFS datacubes."""
    # Strict validation: strip whitespace, re-validate on assignment, reject unknown fields.
    model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid')
    path: str = Field("", description="Category path to explore (e.g., '' for root, 'px-x-01' for population)")
    language: Language = Field(Language.EN, description="Response language")
class GetMetadataInput(BaseModel):
    """Input for getting datacube metadata."""
    # Strict validation: strip whitespace, re-validate on assignment, reject unknown fields.
    model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid')
    datacube_id: str = Field(..., description="The BFS datacube identifier (e.g., px-x-0102030000_101)", min_length=1)
    language: Language = Field(Language.EN, description="Response language")
class DimensionFilter(BaseModel):
    """Filter for a single dimension of a PxWeb query."""
    # Consistency fix: every other input model in this file declares this strict
    # config (strip whitespace, validate on assignment, forbid extras); this one
    # previously omitted it.
    model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid')
    code: str = Field(..., description="Dimension code (e.g., 'Jahr', 'Region', 'Geschlecht')", min_length=1)
    filter: FilterType = Field(..., description="Filter type")
    values: List[str] = Field(..., description="Values to select")
class QueryDataInput(BaseModel):
    """Input for querying BFS datacube data."""
    model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid')
    datacube_id: str = Field(..., description="The BFS datacube identifier", min_length=1)
    # default_factory avoids declaring a shared mutable list as the default value
    filters: List[DimensionFilter] = Field(default_factory=list, description="Query filters for dimensions")
    format: DataFormat = Field(DataFormat.CSV, description="Response format")
    language: Language = Field(Language.EN, description="Response language")
class SearchDatacubesInput(BaseModel):
    """Input for searching BFS datacubes."""
    # Strict validation: strip whitespace, re-validate on assignment, reject unknown fields.
    model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid')
    keywords: str = Field(..., description="Search keywords (e.g., 'inflation', 'employment', 'education', 'health')", min_length=1)
    language: Language = Field(Language.EN, description="Response language")
class GetConfigInput(BaseModel):
    """Input for getting API configuration."""
    # Strict validation: strip whitespace, re-validate on assignment, reject unknown fields.
    model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid')
    language: Language = Field(Language.EN, description="Response language")
# Tool implementations
# NOTE(review): no @mcp.tool() registration is visible anywhere in this file —
# confirm these coroutines are registered with the FastMCP server elsewhere.
async def list_datacubes(params: ListDatacubesInput) -> str:
    """
    List available datacubes from a BFS category path.

    Browse the Swiss Federal Statistical Office data catalog by category.
    The BFS API has datacube IDs at the root level.

    Examples:
    - List all datacubes: path=""
    - Get specific datacube: path="px-x-0102030000_101"

    Returns a human-readable listing (first 50 entries), or an error message
    string on failure (errors are reported, not raised).
    """
    url = f"{BASE_URL}/{params.language.value}"
    if params.path:
        url += f"/{params.path}"
    try:
        client = get_client()
        response = await client.get(url)
        response.raise_for_status()
        data = response.json()
        # Build output in parts and join once (avoids quadratic string +=)
        parts = ["Available datacubes (showing first 50):\n\n"]
        if isinstance(data, list):
            # Limit to first 50 to avoid overwhelming response
            for item in data[:50]:
                if isinstance(item, dict):
                    dbid = item.get('dbid') or item.get('id', 'N/A')
                    text = item.get('text', 'N/A')
                    parts.append(f"• **{dbid}**: {text}\n")
                    # type 't' marks a table (queryable datacube) in PxWeb listings
                    if item.get('type') == 't':
                        parts.append(" ↳ Use bfs_query_data with this datacube_id\n")
            if len(data) > 50:
                parts.append(f"\n... and {len(data) - 50} more datacubes\n")
        else:
            parts.append(json.dumps(data, indent=2))
        return "".join(parts)
    except Exception as e:
        logger.error(f"Error listing datacubes: {e}")
        return f"Error listing datacubes: {str(e)}"
async def get_metadata(params: GetMetadataInput) -> str:
    """
    Get metadata about a BFS datacube including dimensions and available values.

    Returns detailed information about a specific datacube including:
    - Title and description
    - Available dimensions (time, region, category, etc.)
    - Possible values for each dimension
    - Data structure information

    Use this before querying data to understand what filters are available.
    On failure, returns an error message string instead of raising.
    """
    url = f"{BASE_URL}/{params.language.value}/{params.datacube_id}/{params.datacube_id}.px"
    try:
        client = get_client()
        resp = await client.get(url)
        resp.raise_for_status()
        meta = resp.json()

        out = f"Metadata for {params.datacube_id}:\n\n"
        if "title" in meta:
            out += f"Title: {meta['title']}\n\n"
        if "variables" in meta:
            out += "Available dimensions:\n"
            for dim in meta["variables"]:
                out += f"\n• {dim.get('code', 'N/A')}: {dim.get('text', 'N/A')}\n"
                if "values" in dim:
                    # Show the values inline when few, otherwise just the count
                    if len(dim["values"]) <= 10:
                        out += f" Values: {', '.join(dim['values'][:10])}\n"
                    else:
                        out += f" Values: {len(dim['values'])} options available\n"
        out += f"\n\nFull metadata:\n{json.dumps(meta, indent=2)}"
        return out
    except Exception as e:
        logger.error(f"Error fetching metadata: {e}")
        return f"Error fetching metadata: {str(e)}"
async def query_data(params: QueryDataInput) -> str:
    """
    Query any BFS datacube with custom filters.

    Retrieve actual statistical data from a datacube. You can filter by:
    - Time periods (years, months, quarters)
    - Geographic regions (cantons, municipalities)
    - Categories (age groups, sectors, types, etc.)

    Returns data in the specified format (CSV, JSON, JSON-stat).
    Note: If no filters are provided, will attempt to return recent data.
    On failure, returns an error message string instead of raising.
    """
    url = f"{BASE_URL}/{params.language.value}/{params.datacube_id}/{params.datacube_id}.px"
    # Build PxWeb query body
    query: Dict[str, Any] = {
        "query": [],
        "response": {"format": params.format.value}
    }
    # Convert filters to query format
    for f in params.filters:
        query["query"].append({
            "code": f.code,
            "selection": {
                "filter": f.filter.value,
                "values": f.values
            }
        })
    # If no filters, try to get recent/limited data
    if not params.filters:
        # Best effort: fetch metadata and restrict to the 5 most recent periods
        try:
            client = get_client()
            meta_response = await client.get(url)
            if meta_response.status_code == 200:
                metadata = meta_response.json()
                # Look for time-related dimension
                for var in metadata.get("variables", []):
                    if var.get("code", "").lower() in ["jahr", "year", "zeit", "time", "periode"]:
                        query["query"] = [{
                            "code": var["code"],
                            "selection": {"filter": "top", "values": ["5"]}
                        }]
                        break
        except Exception as e:
            # Deliberately best-effort: fall through to an unfiltered query,
            # but record why pre-filtering failed (was a silent bare except).
            logger.debug(f"Could not pre-filter by time dimension: {e}")
    try:
        client = get_client()
        response = await client.post(url, json=query)
        response.raise_for_status()
        if params.format == DataFormat.CSV:
            return response.text
        return json.dumps(response.json(), indent=2)
    except httpx.HTTPStatusError as e:
        error_msg = f"HTTP Error {e.response.status_code}: "
        try:
            error_msg += json.dumps(e.response.json(), indent=2)
        except ValueError:
            # Body was not JSON (json() raises a ValueError subclass); fall back to raw text
            error_msg += e.response.text
        logger.error(error_msg)
        return error_msg
    except Exception as e:
        logger.error(f"Error querying data: {e}")
        return f"Error querying data: {str(e)}"
async def search_datacubes(params: SearchDatacubesInput) -> str:
    """
    Search for BFS datacubes by topic keywords using built-in knowledge base.

    Find relevant datacubes for topics like:
    - Population statistics
    - Employment and unemployment
    - Education and science
    - Health statistics
    - Economic indicators
    - Inflation and prices
    - Energy consumption
    - Housing and construction

    Returns matching datacubes with descriptions. On failure, returns an
    error message string instead of raising.
    """
    try:
        # Search in knowledge base
        keywords_lower = params.keywords.lower().strip()
        # Ignore very short tokens ("of", "in", ...) that would over-match
        search_words = [w for w in keywords_lower.split() if len(w) > 2]
        matches = []
        seen_ids = set()  # O(1) duplicate check instead of scanning matches each time
        for keyword, datacubes in DATACUBE_KNOWLEDGE_BASE.items():
            # Substring match in either direction ("populations" matches "population")
            if any(word in keyword for word in search_words) or any(keyword in word for word in search_words):
                for datacube_id, description in datacubes:
                    if datacube_id not in seen_ids:
                        seen_ids.add(datacube_id)
                        matches.append({
                            'id': datacube_id,
                            'text': description,
                            'keyword': keyword
                        })
        # Format results: build in parts and join once (avoids quadratic +=)
        parts = [f"Search results for '{params.keywords}':\n\n"]
        if matches:
            parts.append(f"Found {len(matches)} matching datacube(s):\n\n")
            for i, match in enumerate(matches[:20], 1):  # Limit to 20 results
                parts.append(f"{i}. **{match['id']}**\n")
                parts.append(f" {match['text']}\n")
                parts.append(f" ↳ To get data: Use bfs_query_data(datacube_id='{match['id']}')\n")
                parts.append("\n")
            if len(matches) > 20:
                parts.append(f"... and {len(matches) - 20} more results (showing first 20)\n")
        else:
            parts.append("No datacubes found matching your keywords.\n\n")
            parts.append("Try these topics: population, employment, unemployment, health, inflation, ")
            parts.append("education, energy, housing, income, wages, prices, cost\n")
        return "".join(parts)
    except Exception as e:
        logger.error(f"Error searching datacubes: {e}")
        return f"Error searching datacubes: {str(e)}"
async def get_config(params: GetConfigInput) -> str:
    """
    Get API configuration and limits.

    Returns information about the BFS API including:
    - API version
    - Rate limits
    - Data access restrictions
    - Available features

    On failure, returns an error message string instead of raising.
    """
    url = f"{BASE_URL}/{params.language.value}/?config"
    try:
        client = get_client()
        resp = await client.get(url)
        resp.raise_for_status()
        cfg = resp.json()
        return "BFS API Configuration:\n\n" + json.dumps(cfg, indent=2)
    except Exception as e:
        logger.error(f"Error fetching config: {e}")
        return f"Error fetching config: {str(e)}"
# Cleanup function
async def cleanup():
    """Close the shared HTTP client on shutdown, if one was ever created."""
    global http_client
    if http_client is not None:
        await http_client.aclose()
        http_client = None
# Main execution
if __name__ == "__main__":
    import atexit

    def cleanup_sync():
        """Sync wrapper so the async cleanup() can run at interpreter exit."""
        # asyncio is already imported at module level; no local re-import needed.
        try:
            asyncio.run(cleanup())
        except Exception:
            # Shutdown is best-effort: never let cleanup errors mask a clean exit
            # (was a bare `except:`, which also swallowed SystemExit/KeyboardInterrupt).
            pass

    # Register cleanup to run when server exits
    atexit.register(cleanup_sync)

    # Run FastMCP server (synchronous, blocking call)
    mcp.run()