Spaces:
Runtime error
Runtime error
| """ | |
| Health Data Store - Persistent storage for health data | |
| Uses JSON files for simplicity, can be upgraded to SQLAlchemy + PostgreSQL | |
| """ | |
| import json | |
| import os | |
| from datetime import datetime, timedelta | |
| from typing import List, Optional, Dict, Any | |
| from pathlib import Path | |
| from .models import ( | |
| UserHealthProfile, HealthRecord, UserPreferences, | |
| FitnessProgress, HealthMetrics | |
| ) | |
| class HealthDataStore: | |
| """Persistent storage for all health data""" | |
| def __init__(self, data_dir: str = "health_data/storage"): | |
| self.data_dir = Path(data_dir) | |
| self.data_dir.mkdir(parents=True, exist_ok=True) | |
| # Create subdirectories | |
| (self.data_dir / "profiles").mkdir(exist_ok=True) | |
| (self.data_dir / "records").mkdir(exist_ok=True) | |
| (self.data_dir / "preferences").mkdir(exist_ok=True) | |
| (self.data_dir / "fitness").mkdir(exist_ok=True) | |
| (self.data_dir / "metrics").mkdir(exist_ok=True) | |
| # ===== User Profile Operations ===== | |
| def save_user_profile(self, profile: UserHealthProfile) -> None: | |
| """Save user profile to storage""" | |
| profile.updated_at = datetime.now() | |
| path = self.data_dir / "profiles" / f"{profile.user_id}.json" | |
| with open(path, 'w', encoding='utf-8') as f: | |
| json.dump(profile.to_dict(), f, ensure_ascii=False, indent=2) | |
| def get_user_profile(self, user_id: str) -> Optional[UserHealthProfile]: | |
| """Get user profile from storage""" | |
| path = self.data_dir / "profiles" / f"{user_id}.json" | |
| if not path.exists(): | |
| return None | |
| with open(path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| return UserHealthProfile.from_dict(data) | |
| def update_user_profile(self, user_id: str, **kwargs) -> None: | |
| """Update specific fields in user profile""" | |
| profile = self.get_user_profile(user_id) | |
| if not profile: | |
| profile = UserHealthProfile(user_id) | |
| for key, value in kwargs.items(): | |
| if hasattr(profile, key): | |
| setattr(profile, key, value) | |
| self.save_user_profile(profile) | |
| # ===== Health History Operations ===== | |
| def add_health_record(self, record: HealthRecord) -> None: | |
| """Add health record to storage""" | |
| user_dir = self.data_dir / "records" / record.user_id | |
| user_dir.mkdir(exist_ok=True) | |
| path = user_dir / f"{record.record_id}.json" | |
| with open(path, 'w', encoding='utf-8') as f: | |
| json.dump(record.to_dict(), f, ensure_ascii=False, indent=2) | |
| def get_health_history(self, user_id: str, days: int = 30) -> List[HealthRecord]: | |
| """Get health history for user (last N days)""" | |
| user_dir = self.data_dir / "records" / user_id | |
| if not user_dir.exists(): | |
| return [] | |
| cutoff_date = datetime.now() - timedelta(days=days) | |
| records = [] | |
| for file_path in user_dir.glob("*.json"): | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| record = HealthRecord.from_dict(data) | |
| if record.timestamp >= cutoff_date: | |
| records.append(record) | |
| # Sort by timestamp descending | |
| records.sort(key=lambda r: r.timestamp, reverse=True) | |
| return records | |
| def get_records_by_type(self, user_id: str, record_type: str) -> List[HealthRecord]: | |
| """Get records of specific type""" | |
| all_records = self.get_health_history(user_id, days=365) | |
| return [r for r in all_records if r.record_type == record_type] | |
| # ===== Preferences Operations ===== | |
| def save_preferences(self, prefs: UserPreferences) -> None: | |
| """Save user preferences""" | |
| prefs.updated_at = datetime.now() | |
| path = self.data_dir / "preferences" / f"{prefs.user_id}.json" | |
| with open(path, 'w', encoding='utf-8') as f: | |
| json.dump(prefs.to_dict(), f, ensure_ascii=False, indent=2) | |
| def get_preferences(self, user_id: str) -> Optional[UserPreferences]: | |
| """Get user preferences""" | |
| path = self.data_dir / "preferences" / f"{user_id}.json" | |
| if not path.exists(): | |
| return None | |
| with open(path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| return UserPreferences.from_dict(data) | |
| # ===== Fitness Operations ===== | |
| def add_fitness_record(self, record: FitnessProgress) -> None: | |
| """Add fitness record""" | |
| user_dir = self.data_dir / "fitness" / record.user_id | |
| user_dir.mkdir(exist_ok=True) | |
| path = user_dir / f"{record.progress_id}.json" | |
| with open(path, 'w', encoding='utf-8') as f: | |
| json.dump(record.to_dict(), f, ensure_ascii=False, indent=2) | |
| def get_fitness_history(self, user_id: str, days: int = 30) -> List[FitnessProgress]: | |
| """Get fitness history""" | |
| user_dir = self.data_dir / "fitness" / user_id | |
| if not user_dir.exists(): | |
| return [] | |
| cutoff_date = datetime.now() - timedelta(days=days) | |
| records = [] | |
| for file_path in user_dir.glob("*.json"): | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| record = FitnessProgress.from_dict(data) | |
| if record.timestamp >= cutoff_date: | |
| records.append(record) | |
| records.sort(key=lambda r: r.timestamp, reverse=True) | |
| return records | |
| # ===== Metrics Operations ===== | |
| def add_metric(self, metric: HealthMetrics) -> None: | |
| """Add health metric""" | |
| user_dir = self.data_dir / "metrics" / metric.user_id | |
| user_dir.mkdir(exist_ok=True) | |
| path = user_dir / f"{metric.metric_id}.json" | |
| with open(path, 'w', encoding='utf-8') as f: | |
| json.dump(metric.to_dict(), f, ensure_ascii=False, indent=2) | |
| def get_metrics(self, user_id: str, metric_type: Optional[str] = None) -> List[HealthMetrics]: | |
| """Get health metrics""" | |
| user_dir = self.data_dir / "metrics" / user_id | |
| if not user_dir.exists(): | |
| return [] | |
| metrics = [] | |
| for file_path in user_dir.glob("*.json"): | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| metric = HealthMetrics.from_dict(data) | |
| if metric_type is None or metric.metric_type == metric_type: | |
| metrics.append(metric) | |
| metrics.sort(key=lambda m: m.timestamp, reverse=True) | |
| return metrics | |
| # ===== Utility Methods ===== | |
| def export_user_data(self, user_id: str) -> Dict[str, Any]: | |
| """Export all user data""" | |
| profile = self.get_user_profile(user_id) | |
| history = self.get_health_history(user_id, days=365) | |
| preferences = self.get_preferences(user_id) | |
| fitness = self.get_fitness_history(user_id, days=365) | |
| metrics = self.get_metrics(user_id) | |
| return { | |
| 'profile': profile.to_dict() if profile else None, | |
| 'health_history': [r.to_dict() for r in history], | |
| 'preferences': preferences.to_dict() if preferences else None, | |
| 'fitness_history': [f.to_dict() for f in fitness], | |
| 'metrics': [m.to_dict() for m in metrics], | |
| 'exported_at': datetime.now().isoformat() | |
| } | |
| def delete_user_data(self, user_id: str) -> None: | |
| """Delete all user data (GDPR compliance)""" | |
| import shutil | |
| # Delete profile | |
| profile_path = self.data_dir / "profiles" / f"{user_id}.json" | |
| if profile_path.exists(): | |
| profile_path.unlink() | |
| # Delete records | |
| records_dir = self.data_dir / "records" / user_id | |
| if records_dir.exists(): | |
| shutil.rmtree(records_dir) | |
| # Delete preferences | |
| prefs_path = self.data_dir / "preferences" / f"{user_id}.json" | |
| if prefs_path.exists(): | |
| prefs_path.unlink() | |
| # Delete fitness | |
| fitness_dir = self.data_dir / "fitness" / user_id | |
| if fitness_dir.exists(): | |
| shutil.rmtree(fitness_dir) | |
| # Delete metrics | |
| metrics_dir = self.data_dir / "metrics" / user_id | |
| if metrics_dir.exists(): | |
| shutil.rmtree(metrics_dir) | |