cojournalist-data/mcp_bfs/bfs_mcp_server.py
#!/usr/bin/env python3
"""
Swiss BFS API MCP Server
Provides broad access to Swiss Federal Statistical Office (BFS) data via the PxWeb API.
Refactored to use FastMCP for consistency with OpenParlData server.
"""
import asyncio
import json
import logging
from typing import List, Optional
from enum import Enum
import httpx
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field, ConfigDict
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastMCP server
mcp = FastMCP("swiss-bfs-api")
# API Configuration
BASE_URL = "https://www.pxweb.bfs.admin.ch/api/v1"
class Language(str, Enum):
DE = "de"
FR = "fr"
IT = "it"
EN = "en"
class DataFormat(str, Enum):
CSV = "csv"
JSON = "json"
JSON_STAT = "json-stat"
JSON_STAT2 = "json-stat2"
PX = "px"
class FilterType(str, Enum):
ALL = "all"
ITEM = "item"
TOP = "top"
# Datacube knowledge base: Maps keywords to known datacube IDs with descriptions
# This helps with semantic search since the API only returns cryptic IDs
DATACUBE_KNOWLEDGE_BASE = {
# Population & Demographics (px-x-01)
"population": [
("px-x-0102010000_101", "Permanent resident population by canton"),
("px-x-0102020000_101", "Population by age and sex"),
("px-x-0102020202_106", "Population statistics and scenarios"),
("px-x-0102020300_101", "Population growth and change"),
],
"demographics": [
("px-x-0102010000_101", "Permanent resident population by canton"),
("px-x-0102020000_101", "Population by age and sex"),
],
"birth": [
("px-x-0102020000_101", "Birth rates and statistics"),
],
"death": [
("px-x-0102020000_101", "Mortality rates and statistics"),
],
# Employment & Labor (px-x-03)
"employment": [
("px-x-0301000000_103", "Employment by sector"),
("px-x-0301000000_104", "Employment statistics"),
],
"unemployment": [
("px-x-0301000000_103", "Unemployment rates"),
],
"labor": [
("px-x-0301000000_103", "Labor market statistics"),
],
"work": [
("px-x-0301000000_103", "Employment and work statistics"),
],
# Prices & Inflation (px-x-05)
"inflation": [
("px-x-0502010000_101", "Consumer price index (CPI)"),
],
"prices": [
("px-x-0502010000_101", "Price statistics and indices"),
],
"cost": [
("px-x-0502010000_101", "Cost of living indices"),
],
# Income & Consumption (px-x-20)
"income": [
("px-x-2105000000_101", "Income distribution"),
("px-x-2105000000_102", "Household income"),
],
"wages": [
("px-x-2105000000_101", "Wage statistics"),
],
"salary": [
("px-x-2105000000_101", "Salary and compensation"),
],
# Education (px-x-15)
"education": [
("px-x-1502010000_101", "Education statistics"),
("px-x-1502010100_101", "Students and schools"),
],
"students": [
("px-x-1502010100_101", "Student enrollment"),
],
"schools": [
("px-x-1502010100_101", "School statistics"),
],
"university": [
("px-x-1502010100_101", "Higher education statistics"),
],
# Health (px-x-14)
"health": [
("px-x-1404010100_101", "Health statistics"),
("px-x-1404050000_101", "Healthcare costs"),
],
"hospital": [
("px-x-1404010100_101", "Hospital statistics"),
],
"medical": [
("px-x-1404010100_101", "Medical care statistics"),
],
# Energy (px-x-07)
"energy": [
("px-x-0702000000_101", "Energy statistics"),
],
"electricity": [
("px-x-0702000000_101", "Electricity production and consumption"),
],
"power": [
("px-x-0702000000_101", "Power generation"),
],
# Housing (px-x-09)
"housing": [
("px-x-0902020100_104", "Housing statistics"),
],
"rent": [
("px-x-0902020100_104", "Rental prices"),
],
"construction": [
("px-x-0902020100_104", "Construction statistics"),
],
}
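# bfs_search (below) matches user keywords against these entries with substring
# matching in both directions, so e.g. "price" and "prices" both hit the "prices" entry.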
# Global HTTP client
http_client: Optional[httpx.AsyncClient] = None
def get_client() -> httpx.AsyncClient:
"""Get or create HTTP client."""
global http_client
if http_client is None:
http_client = httpx.AsyncClient(
timeout=60.0,
headers={
"User-Agent": "Mozilla/5.0 (compatible; BFS-MCP/1.0; +https://github.com/user/bfs-mcp)",
"Accept": "application/json",
"Accept-Language": "en,de,fr,it"
}
)
return http_client
# Pydantic models for input validation
class ListDatacubesInput(BaseModel):
"""Input for listing BFS datacubes."""
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid')
path: str = Field("", description="Category path to explore (e.g., '' for root, 'px-x-01' for population)")
language: Language = Field(Language.EN, description="Response language")
class GetMetadataInput(BaseModel):
"""Input for getting datacube metadata."""
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid')
datacube_id: str = Field(..., description="The BFS datacube identifier (e.g., px-x-0102030000_101)", min_length=1)
language: Language = Field(Language.EN, description="Response language")
class DimensionFilter(BaseModel):
"""Filter for a single dimension."""
code: str = Field(..., description="Dimension code (e.g., 'Jahr', 'Region', 'Geschlecht')")
filter: FilterType = Field(..., description="Filter type")
values: List[str] = Field(..., description="Values to select")
class QueryDataInput(BaseModel):
"""Input for querying BFS datacube data."""
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid')
datacube_id: str = Field(..., description="The BFS datacube identifier", min_length=1)
    filters: List[DimensionFilter] = Field(default_factory=list, description="Query filters for dimensions")
format: DataFormat = Field(DataFormat.CSV, description="Response format")
language: Language = Field(Language.EN, description="Response language")
class SearchDatacubesInput(BaseModel):
"""Input for searching BFS datacubes."""
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid')
keywords: str = Field(..., description="Search keywords (e.g., 'inflation', 'employment', 'education', 'health')", min_length=1)
language: Language = Field(Language.EN, description="Response language")
class GetConfigInput(BaseModel):
"""Input for getting API configuration."""
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True, extra='forbid')
language: Language = Field(Language.EN, description="Response language")
# Tool implementations
@mcp.tool(
name="bfs_list_datacubes",
annotations={
"title": "List BFS Datacubes",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True
}
)
async def list_datacubes(params: ListDatacubesInput) -> str:
"""
List available datacubes from a BFS category path.
Browse the Swiss Federal Statistical Office data catalog by category.
The BFS API has datacube IDs at the root level.
Examples:
- List all datacubes: path=""
- Get specific datacube: path="px-x-0102030000_101"
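    - Explore a category: path="px-x-01" (population and demographics)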
"""
url = f"{BASE_URL}/{params.language.value}"
if params.path:
url += f"/{params.path}"
try:
client = get_client()
response = await client.get(url)
response.raise_for_status()
data = response.json()
result = f"Available datacubes (showing first 50):\n\n"
if isinstance(data, list):
# Limit to first 50 to avoid overwhelming response
for item in data[:50]:
if isinstance(item, dict):
dbid = item.get('dbid') or item.get('id', 'N/A')
text = item.get('text', 'N/A')
result += f"• **{dbid}**: {text}\n"
if item.get('type') == 't':
result += " ↳ Use bfs_query_data with this datacube_id\n"
if len(data) > 50:
result += f"\n... and {len(data) - 50} more datacubes\n"
else:
result += json.dumps(data, indent=2)
return result
except Exception as e:
logger.error(f"Error listing datacubes: {e}")
return f"Error listing datacubes: {str(e)}"
@mcp.tool(
name="bfs_get_metadata",
annotations={
"title": "Get BFS Datacube Metadata",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True
}
)
async def get_metadata(params: GetMetadataInput) -> str:
"""
Get metadata about a BFS datacube including dimensions and available values.
Returns detailed information about a specific datacube including:
- Title and description
- Available dimensions (time, region, category, etc.)
- Possible values for each dimension
- Data structure information
Use this before querying data to understand what filters are available.
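    Example (datacube ID taken from the built-in knowledge base):
    - datacube_id="px-x-0102010000_101" (permanent resident population by canton)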
"""
url = f"{BASE_URL}/{params.language.value}/{params.datacube_id}/{params.datacube_id}.px"
try:
client = get_client()
response = await client.get(url)
response.raise_for_status()
metadata = response.json()
result = f"Metadata for {params.datacube_id}:\n\n"
# Extract key information
if "title" in metadata:
result += f"Title: {metadata['title']}\n\n"
if "variables" in metadata:
result += "Available dimensions:\n"
for var in metadata["variables"]:
result += f"\n• {var.get('code', 'N/A')}: {var.get('text', 'N/A')}\n"
if "values" in var and len(var["values"]) <= 10:
result += f" Values: {', '.join(var['values'][:10])}\n"
elif "values" in var:
result += f" Values: {len(var['values'])} options available\n"
result += f"\n\nFull metadata:\n{json.dumps(metadata, indent=2)}"
return result
except Exception as e:
logger.error(f"Error fetching metadata: {e}")
return f"Error fetching metadata: {str(e)}"
@mcp.tool(
name="bfs_query_data",
annotations={
"title": "Query BFS Datacube Data",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True
}
)
async def query_data(params: QueryDataInput) -> str:
"""
Query any BFS datacube with custom filters.
Retrieve actual statistical data from a datacube. You can filter by:
- Time periods (years, months, quarters)
- Geographic regions (cantons, municipalities)
- Categories (age groups, sectors, types, etc.)
Returns data in the specified format (CSV, JSON, JSON-stat).
    Note: If no filters are provided, the tool will attempt to restrict the query to recent time periods.
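    Example filters (illustrative values; use bfs_get_metadata to discover the actual
    dimension codes and values for a given datacube):
    - {"code": "Jahr", "filter": "item", "values": ["2022"]}
    - {"code": "Jahr", "filter": "top", "values": ["5"]}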
"""
url = f"{BASE_URL}/{params.language.value}/{params.datacube_id}/{params.datacube_id}.px"
# Build query
query = {
"query": [],
"response": {"format": params.format.value}
}
# Convert filters to query format
for f in params.filters:
query["query"].append({
"code": f.code,
"selection": {
"filter": f.filter.value,
"values": f.values
}
})
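    # The assembled body follows the PxWeb query format, e.g.:
    # {"query": [{"code": "Jahr", "selection": {"filter": "item", "values": ["2022"]}}],
    #  "response": {"format": "csv"}}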
# If no filters, try to get recent/limited data
if not params.filters:
# Try to get metadata first to find a time dimension
try:
client = get_client()
meta_response = await client.get(url)
if meta_response.status_code == 200:
metadata = meta_response.json()
# Look for time-related dimension
for var in metadata.get("variables", []):
if var.get("code", "").lower() in ["jahr", "year", "zeit", "time", "periode"]:
query["query"] = [{
"code": var["code"],
"selection": {"filter": "top", "values": ["5"]}
}]
break
        except Exception:
            # Metadata lookup is best-effort; fall back to an unfiltered query
            pass
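    # PxWeb data queries are sent as a POST request with the JSON query body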
try:
client = get_client()
response = await client.post(url, json=query)
response.raise_for_status()
if params.format == DataFormat.CSV:
return response.text
else:
return json.dumps(response.json(), indent=2)
except httpx.HTTPStatusError as e:
error_msg = f"HTTP Error {e.response.status_code}: "
try:
error_detail = e.response.json()
error_msg += json.dumps(error_detail, indent=2)
        except Exception:
error_msg += e.response.text
logger.error(error_msg)
return error_msg
except Exception as e:
logger.error(f"Error querying data: {e}")
return f"Error querying data: {str(e)}"
@mcp.tool(
name="bfs_search",
annotations={
"title": "Search BFS Datacubes",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True
}
)
async def search_datacubes(params: SearchDatacubesInput) -> str:
"""
Search for BFS datacubes by topic keywords using built-in knowledge base.
Find relevant datacubes for topics like:
- Population statistics
- Employment and unemployment
- Education and science
- Health statistics
- Economic indicators
- Inflation and prices
- Energy consumption
- Housing and construction
Returns matching datacubes with descriptions.
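    Example: keywords="inflation" matches the consumer price index datacube
    px-x-0502010000_101 from the built-in knowledge base.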
"""
try:
# Search in knowledge base
keywords_lower = params.keywords.lower().strip()
matches = []
# Split search keywords and match against knowledge base
search_words = [w for w in keywords_lower.split() if len(w) > 2]
# Check each keyword in knowledge base
for keyword, datacubes in DATACUBE_KNOWLEDGE_BASE.items():
# Match if any search word appears in the knowledge base keyword
if any(word in keyword for word in search_words) or any(keyword in word for word in search_words):
for datacube_id, description in datacubes:
# Avoid duplicates
if not any(m['id'] == datacube_id for m in matches):
matches.append({
'id': datacube_id,
'text': description,
'keyword': keyword
})
# Format results
result = f"Search results for '{params.keywords}':\n\n"
if matches:
result += f"Found {len(matches)} matching datacube(s):\n\n"
for i, match in enumerate(matches[:20], 1): # Limit to 20 results
result += f"{i}. **{match['id']}**\n"
result += f" {match['text']}\n"
result += f" ↳ To get data: Use bfs_query_data(datacube_id='{match['id']}')\n"
result += "\n"
if len(matches) > 20:
result += f"... and {len(matches) - 20} more results (showing first 20)\n"
else:
result += "No datacubes found matching your keywords.\n\n"
result += "Try these topics: population, employment, unemployment, health, inflation, "
result += "education, energy, housing, income, wages, prices, cost\n"
return result
except Exception as e:
logger.error(f"Error searching datacubes: {e}")
return f"Error searching datacubes: {str(e)}"
@mcp.tool(
name="bfs_get_config",
annotations={
"title": "Get BFS API Configuration",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True
}
)
async def get_config(params: GetConfigInput) -> str:
"""
Get API configuration and limits.
Returns information about the BFS API including:
- API version
- Rate limits
- Data access restrictions
- Available features
"""
url = f"{BASE_URL}/{params.language.value}/?config"
try:
client = get_client()
response = await client.get(url)
response.raise_for_status()
config = response.json()
result = "BFS API Configuration:\n\n"
result += json.dumps(config, indent=2)
return result
except Exception as e:
logger.error(f"Error fetching config: {e}")
return f"Error fetching config: {str(e)}"
# Cleanup function
async def cleanup():
"""Cleanup resources on shutdown."""
global http_client
if http_client:
await http_client.aclose()
http_client = None
# Main execution
if __name__ == "__main__":
import atexit
# Register cleanup to run when server exits
    def cleanup_sync():
        try:
            asyncio.run(cleanup())
        except Exception:
            pass
atexit.register(cleanup_sync)
# Run FastMCP server (synchronous, blocking call)
mcp.run()
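    # Note: FastMCP's run() uses the stdio transport by default, so an MCP client can
    # launch this server as a subprocess. Example client entry (hypothetical paths):
    #   "swiss-bfs-api": {"command": "python", "args": ["bfs_mcp_server.py"]}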