Spaces:
Runtime error
Runtime error
File size: 11,977 Bytes
eeb0f9c |
|
"""
Personalization Engine - Progressive learning from user interactions
Continuously improves recommendations based on user data and feedback
"""
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from collections import Counter
import json
from health_data import HealthContext, UserPreferences
class PersonalizationEngine:
"""
Progressive personalization engine that learns from user interactions
Improves recommendations over time based on accumulated user data
"""
def __init__(self, health_context: HealthContext):
self.health_context = health_context
self.user_id = health_context.user_id
# ===== Pattern Analysis =====
def analyze_user_patterns(self, days: int = 90) -> Dict[str, Any]:
"""Analyze user interaction patterns"""
history = self.health_context.get_health_history(days)
if not history:
return {
'total_interactions': 0,
'interaction_types': {},
'most_common_topics': [],
'engagement_level': 'low'
}
# Count interactions by type
type_counts = Counter(r.record_type for r in history)
# Calculate engagement level
total_interactions = len(history)
days_active = len(set(r.timestamp.date() for r in history))
engagement_score = min(total_interactions / (days / 7), 1.0) # Normalize to 0-1
engagement_level = 'high' if engagement_score > 0.7 else 'medium' if engagement_score > 0.3 else 'low'
return {
'total_interactions': total_interactions,
'days_active': days_active,
'interaction_types': dict(type_counts),
'engagement_score': round(engagement_score, 2),
'engagement_level': engagement_level,
'most_common_topics': [t[0] for t in type_counts.most_common(3)]
}
def extract_preferences(self) -> UserPreferences:
"""Extract and update user preferences from interaction history"""
prefs = self.health_context.get_preferences()
# Analyze exercise preferences from history
exercise_records = self.health_context.get_records_by_type('exercise')
if exercise_records:
# Extract exercise types mentioned
exercise_types = set()
for record in exercise_records[-10:]: # Last 10 exercise records
if 'exercise_type' in record.data:
exercise_types.add(record.data['exercise_type'])
prefs.preferred_exercise_types = list(exercise_types)
# Analyze nutrition preferences
nutrition_records = self.health_context.get_records_by_type('nutrition')
if nutrition_records:
dietary_prefs = set()
for record in nutrition_records[-10:]:
if 'dietary_preference' in record.data:
dietary_prefs.add(record.data['dietary_preference'])
prefs.dietary_preferences = list(dietary_prefs)
# Analyze goals from history
goals = set()
for record in self.health_context.get_health_history(days=180):
if 'goal' in record.data:
goals.add(record.data['goal'])
prefs.goals = list(goals)
self.health_context.update_preferences(
preferred_exercise_types=prefs.preferred_exercise_types,
dietary_preferences=prefs.dietary_preferences,
goals=prefs.goals
)
return prefs
def identify_health_trends(self, days: int = 90) -> Dict[str, Any]:
"""Identify health trends from historical data"""
trends = {
'symptom_frequency': {},
'health_improvements': [],
'health_concerns': [],
'activity_trends': {}
}
# Analyze symptom frequency
symptom_records = self.health_context.get_records_by_type('symptom')
symptom_counts = Counter()
for record in symptom_records:
if 'symptom' in record.data:
symptom_counts[record.data['symptom']] += 1
trends['symptom_frequency'] = dict(symptom_counts.most_common(5))
# Analyze fitness trends
fitness_history = self.health_context.get_fitness_history(days)
if fitness_history:
total_workouts = len(fitness_history)
total_minutes = sum(f.duration_minutes for f in fitness_history)
avg_intensity = sum(1 for f in fitness_history if f.intensity == 'high') / total_workouts if total_workouts > 0 else 0
trends['activity_trends'] = {
'total_workouts': total_workouts,
'total_minutes': total_minutes,
'avg_intensity': round(avg_intensity, 2),
'adherence': self.health_context.get_workout_adherence(days)
}
return trends
def calculate_engagement_score(self, days: int = 30) -> float:
"""Calculate user engagement score (0-1)"""
patterns = self.analyze_user_patterns(days)
return patterns['engagement_score']
# ===== Adaptation Methods =====
def adapt_nutrition_plan(self, current_plan: Dict[str, Any]) -> Dict[str, Any]:
"""Adapt nutrition plan based on user history"""
adapted_plan = current_plan.copy()
# Get user preferences
prefs = self.health_context.get_preferences()
# Apply dietary restrictions
if prefs.dietary_preferences:
adapted_plan['dietary_preferences'] = prefs.dietary_preferences
# Analyze nutrition history for effectiveness
nutrition_records = self.health_context.get_records_by_type('nutrition')
if nutrition_records:
# Check if user is following recommendations
adherence = len(nutrition_records) / max(1, (30 / 7)) # Expected ~1 per week
adapted_plan['adherence_score'] = min(adherence, 1.0)
# Add personalization note
adapted_plan['personalized'] = True
adapted_plan['personalization_date'] = datetime.now().isoformat()
return adapted_plan
def adapt_exercise_plan(self, current_plan: Dict[str, Any]) -> Dict[str, Any]:
"""Adapt exercise plan based on progress"""
adapted_plan = current_plan.copy()
# Get fitness history
fitness_history = self.health_context.get_fitness_history(days=30)
if fitness_history:
# Calculate adherence
adherence = self.health_context.get_workout_adherence(days=30)
# Adjust difficulty based on adherence
if adherence > 0.8:
adapted_plan['difficulty_adjustment'] = 'increase'
adapted_plan['recommendation'] = 'Great adherence! Consider increasing intensity.'
elif adherence < 0.3:
adapted_plan['difficulty_adjustment'] = 'decrease'
adapted_plan['recommendation'] = 'Let\'s make the plan more manageable.'
else:
adapted_plan['difficulty_adjustment'] = 'maintain'
adapted_plan['recommendation'] = 'Keep up the good work!'
# Add exercise preferences
prefs = self.health_context.get_preferences()
if prefs.preferred_exercise_types:
adapted_plan['preferred_exercises'] = prefs.preferred_exercise_types
adapted_plan['personalized'] = True
adapted_plan['personalization_date'] = datetime.now().isoformat()
return adapted_plan
def adapt_communication_style(self) -> str:
"""Adapt communication style based on user interactions"""
patterns = self.analyze_user_patterns(days=30)
# Analyze interaction frequency
if patterns['engagement_level'] == 'high':
return 'detailed' # More detailed responses
elif patterns['engagement_level'] == 'low':
return 'brief' # Shorter, more concise responses
else:
return 'balanced' # Standard responses
def generate_personalized_insights(self) -> List[str]:
"""Generate personalized health insights"""
insights = []
# Analyze trends
trends = self.identify_health_trends(days=90)
# Symptom insights
if trends['symptom_frequency']:
top_symptom = list(trends['symptom_frequency'].keys())[0]
count = trends['symptom_frequency'][top_symptom]
insights.append(f"You've reported '{top_symptom}' {count} times in the last 90 days. Consider consulting a specialist.")
# Activity insights
if 'activity_trends' in trends and trends['activity_trends']:
activity = trends['activity_trends']
if activity['adherence'] > 0.7:
insights.append(f"Excellent fitness adherence! You've completed {activity['total_workouts']} workouts in the last month.")
elif activity['adherence'] < 0.3:
insights.append("Your fitness adherence is low. Let's create a more achievable plan together.")
# Goal progress
prefs = self.health_context.get_preferences()
if prefs.goals:
insights.append(f"Your current goals: {', '.join(prefs.goals)}")
return insights
# ===== Feedback Methods =====
def record_user_feedback(self, feedback_type: str, feedback_data: Dict[str, Any]) -> None:
"""Record user feedback for learning"""
feedback_record = {
'feedback_type': feedback_type, # helpful/not_helpful/confusing/etc
'data': feedback_data,
'timestamp': datetime.now().isoformat()
}
self.health_context.add_health_record(
'feedback',
feedback_record,
agent_name='personalization_engine',
confidence=1.0
)
def update_preferences_from_feedback(self) -> None:
"""Update preferences based on accumulated feedback"""
# Get recent feedback
feedback_records = self.health_context.get_records_by_type('feedback')
if not feedback_records:
return
# Analyze feedback patterns
helpful_count = sum(1 for r in feedback_records[-20:] if r.data.get('feedback_type') == 'helpful')
total_feedback = min(len(feedback_records), 20)
if total_feedback > 0:
helpfulness_score = helpful_count / total_feedback
# Update communication style if needed
if helpfulness_score < 0.3:
self.health_context.update_preferences(
communication_style='brief'
)
elif helpfulness_score > 0.7:
self.health_context.update_preferences(
communication_style='detailed'
)
def get_personalization_summary(self) -> Dict[str, Any]:
"""Get summary of personalization status"""
patterns = self.analyze_user_patterns()
trends = self.identify_health_trends()
prefs = self.extract_preferences()
return {
'engagement': patterns,
'trends': trends,
'preferences': {
'goals': prefs.goals,
'exercise_types': prefs.preferred_exercise_types,
'dietary_preferences': prefs.dietary_preferences,
'communication_style': prefs.communication_style
},
'insights': self.generate_personalized_insights(),
'last_updated': datetime.now().isoformat()
}
|