# ConvoBot — src/rag.py
# (scrape residue preserved as a comment: author "ashish-ninehertz", commit e272f4f)
from typing import List, Dict, Optional
from datetime import datetime
import logging
from app.models import SearchResponse
from app.main import SessionManager
from app.vectorstore import VectorStore
from app.llm import OllamaMistral
from app.embeddings import EmbeddingHandler
class RAGSystem:
    """
    Main RAG (Retrieval-Augmented Generation) system class that orchestrates:
    - Session management
    - Vector store operations
    - LLM interactions
    - Chat history management
    """

    def __init__(self):
        """Initialize the RAG system components."""
        self.logger = logging.getLogger(__name__)
        # Tracks per-session conversation state and history.
        self.session_manager = SessionManager()
        # Document storage and retrieval; creates its own EmbeddingHandler.
        self.vectorstore = VectorStore()
        # LLM backend used to generate answers.
        self.llm = OllamaMistral()

    async def chat(self, session_id: str, query: str, k: int = 3) -> SearchResponse:
        """
        Handle a chat query using the RAG pipeline:
        1. Retrieve the ``k`` most similar documents from the vector store.
        2. Generate a response using the LLM, grounded in that context.
        3. Store the question/answer pair in the session history.

        Args:
            session_id: Unique identifier for the conversation session
            query: User's question/input
            k: Number of similar documents to retrieve (default 3)

        Returns:
            SearchResponse: Contains answer, context, and sources on success,
            or an error status/message if retrieval or generation failed.
        """
        # Use the module logger (lazy %-args) instead of print() so verbosity
        # is controlled by the logging configuration.
        self.logger.debug("Starting chat for session %s with query: %s",
                          session_id, query)
        try:
            # Search for relevant documents in the vector store.
            search_results = await self.vectorstore.search_similar(
                session_id=session_id,
                query=query,
                k=k,
            )
            self.logger.debug("Search results: %s", search_results)
            # Propagate retrieval errors directly to the caller.
            if search_results["status"] == "error":
                return SearchResponse(**search_results)

            # Concatenate retrieved snippets into a single context string;
            # fall back to a sentinel when nothing relevant was found.
            context = "\n".join(
                result["text"]
                for result in search_results["results"]
                if "text" in result
            ) or "No relevant context found"

            # Generate a response using the LLM with the retrieved context.
            prompt = (
                f"You are assisting with a website analysis. Here's relevant context from the website:\n"
                f"{context}\n\n"
                f"Question: {query}\n"
                f"Please provide a detailed answer based on the website content:"
            )
            response = await self.llm.generate_response(prompt)

            # Record the exchange so follow-up questions have history.
            self.session_manager.add_to_history(
                session_id=session_id,
                question=query,
                answer=response,
            )
            return SearchResponse(
                status="success",
                results=[{
                    "answer": response,
                    "context": context,
                    "sources": search_results["results"],
                }],
            )
        except Exception as e:
            # Boundary handler: convert unexpected failures into an error
            # response rather than letting them propagate to the API layer.
            # logger.exception logs the traceback (equivalent to
            # error(..., exc_info=True) but idiomatic inside except).
            self.logger.exception("Error in chat: %s", e)
            return SearchResponse(
                status="error",
                message=f"Chat error: {str(e)}",
            )

    def _validate_session(self, session_id: str) -> bool:
        """
        Validate and potentially initialize a session.

        Args:
            session_id: Session identifier to validate

        Returns:
            bool: True if the session exists (or was initialized) AND its
            vectorstore collection exists; False otherwise.
        """
        try:
            # Initialize the session if it doesn't exist yet.
            if not self.session_manager.session_exists(session_id):
                self.logger.info("Session %s not found, initializing.", session_id)
                self.session_manager.get_session(session_id)
            # A session without a vectorstore collection cannot serve RAG
            # queries, so treat it as invalid.
            if not self.vectorstore.collection_exists(session_id):
                self.logger.warning(
                    "Vectorstore collection missing for session: %s", session_id)
                return False
            return True
        except Exception as e:
            self.logger.error("Session validation failed: %s", e)
            return False

    async def get_chat_history(self, session_id: str, limit: int = 100) -> List[Dict]:
        """
        Retrieve chat history for a session.

        Args:
            session_id: Session identifier
            limit: Maximum number of history items to return

        Returns:
            List[Dict]: Chat history entries, or an empty list when the
            session is invalid or an error occurs.
        """
        try:
            if not self._validate_session(session_id):
                return []
            return self.session_manager.get_history(session_id, limit)
        except Exception as e:
            self.logger.error("Error getting chat history: %s", e)
            return []