from typing import Dict, List
import logging

from app.models import SearchResponse
from app.main import SessionManager
from app.vectorstore import VectorStore
from app.llm import OllamaMistral


class RAGSystem:
    """
    Main RAG (Retrieval-Augmented Generation) system class that orchestrates:
    - Session management
    - Vector store operations
    - LLM interactions
    - Chat history management
    """

    def __init__(self):
        """Initialize the RAG system components."""
        self.logger = logging.getLogger(__name__)

        # Initialize session manager for tracking conversations
        self.session_manager = SessionManager()

        # Initialize vector store for document storage and retrieval;
        # it creates its own EmbeddingHandler internally
        self.vectorstore = VectorStore()

        # Initialize LLM for generating responses
        self.llm = OllamaMistral()

    async def chat(self, session_id: str, query: str) -> SearchResponse:
        """
        Handle a chat query using the RAG pipeline:
        1. Retrieve relevant documents
        2. Generate a response using the LLM
        3. Store the conversation history

        Args:
            session_id: Unique identifier for the conversation session
            query: User's question/input

        Returns:
            SearchResponse: Contains answer, context, and sources
        """
        self.logger.info(f"Starting chat for session {session_id} with query: {query}")

        try:
            # Search for relevant documents in the vector store
            search_results = await self.vectorstore.search_similar(
                session_id=session_id,
                query=query,
                k=3,  # Number of similar documents to retrieve
            )
            self.logger.debug(f"Search results: {search_results}")

            if search_results["status"] == "error":
                return SearchResponse(**search_results)

            # Prepare context from the retrieved documents
            context = "\n".join(
                result["text"]
                for result in search_results["results"]
                if "text" in result
            ) or "No relevant context found"

            # Generate a response using the LLM with the retrieved context
            prompt = (
                f"You are assisting with a website analysis. Here's relevant context from the website:\n"
                f"{context}\n\n"
                f"Question: {query}\n"
                f"Please provide a detailed answer based on the website content:"
            )
            response = await self.llm.generate_response(prompt)

            # Save the conversation turn to history
            self.session_manager.add_to_history(
                session_id=session_id,
                question=query,
                answer=response,
            )

            return SearchResponse(
                status="success",
                results=[{
                    "answer": response,
                    "context": context,
                    "sources": search_results["results"],
                }],
            )

        except Exception as e:
            self.logger.error(f"Error in chat: {str(e)}", exc_info=True)
            return SearchResponse(
                status="error",
                message=f"Chat error: {str(e)}",
            )

    def _validate_session(self, session_id: str) -> bool:
        """
        Validate and potentially initialize a session.

        Args:
            session_id: Session identifier to validate

        Returns:
            bool: True if the session is valid/exists, False otherwise
        """
        try:
            # Initialize the session if it doesn't exist
            if not self.session_manager.session_exists(session_id):
                self.logger.info(f"Session {session_id} not found, initializing.")
                self.session_manager.get_session(session_id)

            # Verify the vectorstore collection exists
            if not self.vectorstore.collection_exists(session_id):
                self.logger.warning(f"Vectorstore collection missing for session: {session_id}")
                return False

            return True

        except Exception as e:
            self.logger.error(f"Session validation failed: {str(e)}")
            return False
    async def get_chat_history(self, session_id: str, limit: int = 100) -> List[Dict]:
        """
        Retrieve chat history for a session.

        Args:
            session_id: Session identifier
            limit: Maximum number of history items to return

        Returns:
            List[Dict]: Chat history entries, or an empty list if an error occurs
        """
        try:
            if not self._validate_session(session_id):
                return []
            return self.session_manager.get_history(session_id, limit)
        except Exception as e:
            self.logger.error(f"Error getting chat history: {str(e)}")
            return []
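
# A minimal usage sketch, not part of the original module's behavior. It
# assumes an asyncio entry point, that the app.* modules above are importable,
# and that documents for the session were already indexed into the VectorStore
# elsewhere (e.g., by a crawler). The session id and query are placeholders.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        rag = RAGSystem()

        # Ask one question; chat() returns a SearchResponse with answer,
        # context, and source documents.
        response = await rag.chat(
            session_id="demo-session",  # hypothetical session id
            query="What services does the website offer?",
        )
        print(response)

        # Fetch the stored conversation turns for the same session.
        history = await rag.get_chat_history("demo-session", limit=10)
        print(history)

    asyncio.run(_demo())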