""" Data Collector for Fine-tuning Collects and stores conversation data for training custom models """ import json import os from datetime import datetime from pathlib import Path from typing import List, Dict, Any, Optional class ConversationDataCollector: """Collects conversation data for fine-tuning""" def __init__(self, data_dir: str = "fine_tuning/data"): self.data_dir = Path(data_dir) self.data_dir.mkdir(parents=True, exist_ok=True) # Create subdirectories for each agent self.agent_dirs = { 'nutrition': self.data_dir / 'nutrition', 'exercise': self.data_dir / 'exercise', 'symptom': self.data_dir / 'symptom', 'mental_health': self.data_dir / 'mental_health', 'general_health': self.data_dir / 'general_health' } for agent_dir in self.agent_dirs.values(): agent_dir.mkdir(exist_ok=True) def log_conversation( self, agent_name: str, user_message: str, agent_response: str, user_data: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, Any]] = None ) -> None: """ Log a conversation turn for fine-tuning Args: agent_name: Name of the agent (nutrition, exercise, etc.) user_message: User's message agent_response: Agent's response user_data: User profile data (age, gender, etc.) metadata: Additional metadata (rating, feedback, etc.) """ conversation_entry = { 'timestamp': datetime.now().isoformat(), 'agent': agent_name, 'user_message': user_message, 'agent_response': agent_response, 'user_data': user_data or {}, 'metadata': metadata or {} } # Save to agent-specific file agent_key = agent_name.replace('_agent', '') if agent_key in self.agent_dirs: filename = f"conversations_{datetime.now().strftime('%Y%m%d')}.jsonl" filepath = self.agent_dirs[agent_key] / filename with open(filepath, 'a', encoding='utf-8') as f: f.write(json.dumps(conversation_entry, ensure_ascii=False) + '\n') def log_multi_turn_conversation( self, agent_name: str, conversation_history: List[tuple], user_data: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, Any]] = None ) -> None: """ Log a multi-turn conversation Args: agent_name: Name of the agent conversation_history: List of (user_msg, agent_msg) tuples user_data: User profile data metadata: Additional metadata """ multi_turn_entry = { 'timestamp': datetime.now().isoformat(), 'agent': agent_name, 'conversation': [ {'user': user_msg, 'agent': agent_msg} for user_msg, agent_msg in conversation_history ], 'user_data': user_data or {}, 'metadata': metadata or {} } agent_key = agent_name.replace('_agent', '') if agent_key in self.agent_dirs: filename = f"multi_turn_{datetime.now().strftime('%Y%m%d')}.jsonl" filepath = self.agent_dirs[agent_key] / filename with open(filepath, 'a', encoding='utf-8') as f: f.write(json.dumps(multi_turn_entry, ensure_ascii=False) + '\n') def get_conversation_count(self, agent_name: Optional[str] = None) -> Dict[str, int]: """ Get count of logged conversations Args: agent_name: Optional agent name to filter by Returns: Dict with agent names and conversation counts """ counts = {} agents_to_check = [agent_name.replace('_agent', '')] if agent_name else self.agent_dirs.keys() for agent_key in agents_to_check: if agent_key in self.agent_dirs: agent_dir = self.agent_dirs[agent_key] count = 0 for file in agent_dir.glob('conversations_*.jsonl'): with open(file, 'r', encoding='utf-8') as f: count += sum(1 for _ in f) counts[agent_key] = count return counts def export_for_openai_finetuning( self, agent_name: str, output_file: Optional[str] = None, min_quality_rating: Optional[float] = None ) -> str: """ Export conversations in OpenAI fine-tuning format Args: agent_name: Agent to export data for output_file: Output file path min_quality_rating: Minimum quality rating to include Returns: Path to exported file """ agent_key = agent_name.replace('_agent', '') if agent_key not in self.agent_dirs: raise ValueError(f"Unknown agent: {agent_name}") if output_file is None: output_file = self.data_dir / f"{agent_key}_finetuning_{datetime.now().strftime('%Y%m%d')}.jsonl" agent_dir = self.agent_dirs[agent_key] exported_count = 0 with open(output_file, 'w', encoding='utf-8') as out_f: # Process single-turn conversations for file in agent_dir.glob('conversations_*.jsonl'): with open(file, 'r', encoding='utf-8') as in_f: for line in in_f: entry = json.loads(line) # Filter by quality rating if specified if min_quality_rating: rating = entry.get('metadata', {}).get('rating') if rating is None or rating < min_quality_rating: continue # Convert to OpenAI format openai_format = { "messages": [ {"role": "system", "content": f"You are a {agent_key} specialist."}, {"role": "user", "content": entry['user_message']}, {"role": "assistant", "content": entry['agent_response']} ] } out_f.write(json.dumps(openai_format, ensure_ascii=False) + '\n') exported_count += 1 # Process multi-turn conversations for file in agent_dir.glob('multi_turn_*.jsonl'): with open(file, 'r', encoding='utf-8') as in_f: for line in in_f: entry = json.loads(line) # Filter by quality rating if specified if min_quality_rating: rating = entry.get('metadata', {}).get('rating') if rating is None or rating < min_quality_rating: continue # Convert to OpenAI format messages = [{"role": "system", "content": f"You are a {agent_key} specialist."}] for turn in entry['conversation']: messages.append({"role": "user", "content": turn['user']}) messages.append({"role": "assistant", "content": turn['agent']}) openai_format = {"messages": messages} out_f.write(json.dumps(openai_format, ensure_ascii=False) + '\n') exported_count += 1 print(f"✅ Exported {exported_count} conversations to {output_file}") return str(output_file) # Global instance _collector = None def get_data_collector() -> ConversationDataCollector: """Get global data collector instance""" global _collector if _collector is None: _collector = ConversationDataCollector() return _collector