from typing import List, Dict, Optional
from datetime import datetime
import logging

from app.models import SearchResponse
from app.main import SessionManager
from app.vectorstore import VectorStore
from app.llm import OllamaMistral
from app.embeddings import EmbeddingHandler

class RAGSystem:
    """
    Main RAG (Retrieval-Augmented Generation) system class that orchestrates:
    - Session management
    - Vector store operations
    - LLM interactions
    - Chat history management
    """

    def __init__(self):
        """Initialize the RAG system components."""
        self.logger = logging.getLogger(__name__)
        # Initialize session manager for tracking conversations
        self.session_manager = SessionManager()
        # Initialize vector store for document storage and retrieval
        self.vectorstore = VectorStore()  # This will create its own EmbeddingHandler
        # Initialize LLM for generating responses
        self.llm = OllamaMistral()

    async def chat(self, session_id: str, query: str) -> SearchResponse:
        """
        Handle a chat query using the RAG pipeline:
        1. Retrieve relevant documents
        2. Generate a response using the LLM
        3. Store the conversation history

        Args:
            session_id: Unique identifier for the conversation session
            query: User's question/input

        Returns:
            SearchResponse: Contains answer, context, and sources
        """
        self.logger.info(f"[RAGSystem] Starting chat for session: {session_id} with query: {query}")
        try:
            # Search for relevant documents in the vector store
            self.logger.debug(f"[RAGSystem] Searching for query: {query} in session: {session_id}")
            search_results = await self.vectorstore.search_similar(
                session_id=session_id,
                query=query,
                k=3  # Number of similar documents to retrieve
            )
            self.logger.debug(f"[RAGSystem] Search results: {search_results}")
            if search_results["status"] == "error":
                return SearchResponse(**search_results)

            # Prepare context from search results
            context = "\n".join([
                result["text"]
                for result in search_results["results"]
                if "text" in result
            ]) or "No relevant context found"

            # Generate response using LLM with context
            prompt = (
                f"You are assisting with a website analysis. Here's relevant context from the website:\n"
                f"{context}\n\n"
                f"Question: {query}\n"
                f"Please provide a detailed answer based on the website content:"
            )
            response = await self.llm.generate_response(prompt)

            # Save conversation to history
            self.session_manager.add_to_history(
                session_id=session_id,
                question=query,
                answer=response
            )

            return SearchResponse(
                status="success",
                results=[{
                    "answer": response,
                    "context": context,
                    "sources": search_results["results"]
                }]
            )
        except Exception as e:
            self.logger.error(f"Error in chat: {str(e)}", exc_info=True)
            return SearchResponse(
                status="error",
                message=f"Chat error: {str(e)}"
            )

    def _validate_session(self, session_id: str) -> bool:
        """
        Validate and potentially initialize a session.

        Args:
            session_id: Session identifier to validate

        Returns:
            bool: True if session is valid/exists, False otherwise
        """
        try:
            # Initialize session if it doesn't exist
            if not self.session_manager.session_exists(session_id):
                self.logger.info(f"Session {session_id} not found, initializing.")
                self.session_manager.get_session(session_id)

            # Verify vectorstore collection exists
            if not self.vectorstore.collection_exists(session_id):
                self.logger.warning(f"Vectorstore collection missing for session: {session_id}")
                return False

            return True
        except Exception as e:
            self.logger.error(f"Session validation failed: {str(e)}")
            return False

    async def get_chat_history(self, session_id: str, limit: int = 100) -> List[Dict]:
        """
        Retrieve chat history for a session.

        Args:
            session_id: Session identifier
            limit: Maximum number of history items to return

        Returns:
            List[Dict]: Chat history entries or an empty list if an error occurs
        """
        try:
            if not self._validate_session(session_id):
                return []
            return self.session_manager.get_history(session_id, limit)
        except Exception as e:
            self.logger.error(f"Error getting chat history: {str(e)}")
            return []
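

# --- Usage sketch (illustrative, not part of the original module) -----------
# A minimal, hypothetical example of how RAGSystem might be exercised, assuming
# the surrounding app has already ingested a website so that a vector store
# collection exists for the chosen session id. The session id and question
# below are placeholders, and the OllamaMistral backend is assumed to be
# reachable locally.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        rag = RAGSystem()
        session_id = "example-session"  # hypothetical session id

        # Ask a question against the previously ingested website content
        response = await rag.chat(session_id, "What does this website offer?")
        print(response.status)

        # Fetch the stored conversation history for the same session
        history = await rag.get_chat_history(session_id, limit=10)
        print(history)

    asyncio.run(_demo())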