| """ | |
| Application factory for creating and configuring the Flask app. | |
| This approach allows for easier testing and management of application state. | |
| """ | |

import concurrent.futures
import logging
import os
import time
from typing import Any, Dict

from dotenv import load_dotenv
from flask import Flask, jsonify, render_template, request

logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()


class InitializationTimeoutError(Exception):
    """Custom exception for initialization timeouts."""

    pass


def ensure_embeddings_on_startup():
    """
    Ensure embeddings exist and have the correct dimension on app startup.

    This is critical for Hugging Face deployments, where the vector store needs
    to be built on startup. A file-based lock prevents race conditions between
    workers.
    """
    import fcntl

    logging.info(f"[PID {os.getpid()}] Starting ensure_embeddings_on_startup function")
    lock_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data", "locks")
    if not os.path.exists(lock_dir):
        try:
            os.makedirs(lock_dir)
            logging.info(f"[PID {os.getpid()}] Created lock directory: {lock_dir}")
        except Exception as e:
            logging.error(f"[PID {os.getpid()}] Failed to create lock directory: {e}")
            return

    lock_file = os.path.join(lock_dir, "ingestion.lock")
    lock_timeout = 300  # 5 minutes, generous enough for Hugging Face hardware
    logging.info(f"[PID {os.getpid()}] Attempting to acquire lock: {lock_file}")

    # Use proper file locking with fcntl for better reliability. Open the file
    # before the try block so the except clause never sees an unbound lock_fd.
    lock_fd = open(lock_file, "w")
    try:
        fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        logging.info(f"[PID {os.getpid()}] Successfully acquired exclusive lock")
        # Write the PID to the lock file for debugging
        lock_fd.write(f"{os.getpid()}\n")
        lock_fd.flush()
    except (IOError, OSError):
        logging.info(f"[PID {os.getpid()}] Lock is held by another process, waiting...")
        lock_fd.close()
        # Wait for the lock to be released
        start_time = time.time()
        while time.time() - start_time < lock_timeout:
            lock_fd = open(lock_file, "w")
            try:
                fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                logging.info(f"[PID {os.getpid()}] Lock acquired after waiting {time.time() - start_time:.1f}s")
                break
            except (IOError, OSError):
                lock_fd.close()
                time.sleep(2)
        else:
            logging.error(f"[PID {os.getpid()}] Timeout waiting for lock after {lock_timeout}s")
            return

    try:
        logging.info(f"[PID {os.getpid()}] Lock acquired, starting ingestion process")
        from src.config import (
            COLLECTION_NAME,
            CORPUS_DIRECTORY,
            DEFAULT_CHUNK_SIZE,
            DEFAULT_OVERLAP,
            EMBEDDING_DIMENSION,
            EMBEDDING_MODEL_NAME,
            RANDOM_SEED,
            VECTOR_DB_PERSIST_PATH,
        )
        from src.ingestion.ingestion_pipeline import IngestionPipeline
        from src.vector_store.vector_db import VectorDatabase

        logging.info(f"[PID {os.getpid()}] Imported modules successfully")
        logging.info(f"[PID {os.getpid()}] Checking vector store at: {VECTOR_DB_PERSIST_PATH}")
        logging.info(f"[PID {os.getpid()}] Collection name: {COLLECTION_NAME}")
        logging.info(f"[PID {os.getpid()}] Corpus directory: {CORPUS_DIRECTORY}")
        logging.info(f"[PID {os.getpid()}] Expected embedding dimension: {EMBEDDING_DIMENSION}")

        # Initialize the vector database to check its state
        try:
            vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
            logging.info(f"[PID {os.getpid()}] Vector database initialized successfully")
        except Exception as e:
            logging.error(f"[PID {os.getpid()}] Failed to initialize vector database: {e}")
            raise

        # Check whether embeddings exist and have the correct dimension
        try:
            current_count = vector_db.get_count()
            current_dimension = vector_db.get_embedding_dimension()
            logging.info(
                f"[PID {os.getpid()}] Current database state: {current_count} embeddings, dimension {current_dimension}"
            )
            has_valid = vector_db.has_valid_embeddings(EMBEDDING_DIMENSION)
            logging.info(f"[PID {os.getpid()}] Has valid embeddings: {has_valid}")
        except Exception as e:
            logging.error(f"[PID {os.getpid()}] Failed to check vector database state: {e}")
            # Assume we need to rebuild
            has_valid = False
            current_count = 0
            current_dimension = 0

        if not has_valid:
            logging.warning(
                f"[PID {os.getpid()}] Vector store is empty or has the wrong dimension. "
                f"Expected: {EMBEDDING_DIMENSION}, Current: {current_dimension}, "
                f"Count: {current_count}"
            )
            logging.info(f"[PID {os.getpid()}] Starting ingestion pipeline with model: {EMBEDDING_MODEL_NAME}")

            # Make sure the corpus directory exists
            if not os.path.exists(CORPUS_DIRECTORY):
                logging.error(f"[PID {os.getpid()}] Corpus directory does not exist: {CORPUS_DIRECTORY}")
                return
            corpus_files = os.listdir(CORPUS_DIRECTORY)
            logging.info(f"[PID {os.getpid()}] Found {len(corpus_files)} files in corpus directory")

            # Run the ingestion pipeline to rebuild embeddings
            try:
                ingestion_pipeline = IngestionPipeline(
                    chunk_size=DEFAULT_CHUNK_SIZE,
                    overlap=DEFAULT_OVERLAP,
                    seed=RANDOM_SEED,
                    store_embeddings=True,
                )
                logging.info(f"[PID {os.getpid()}] Ingestion pipeline created successfully")
            except Exception as e:
                logging.error(f"[PID {os.getpid()}] Failed to create ingestion pipeline: {e}")
                raise

            # Process the corpus directory
            try:
                logging.info(f"[PID {os.getpid()}] Starting to process corpus directory...")
                results = ingestion_pipeline.process_directory(CORPUS_DIRECTORY)
                logging.info(f"[PID {os.getpid()}] Process directory completed, got results: {type(results)}")
            except Exception as e:
                logging.error(f"[PID {os.getpid()}] Failed during directory processing: {e}", exc_info=True)
                raise

            if not results:
                logging.error(
                    f"[PID {os.getpid()}] Ingestion failed or processed 0 chunks. "
                    "Please check the corpus directory and ingestion pipeline for errors."
                )
            else:
                logging.info(f"[PID {os.getpid()}] Ingestion completed successfully: {len(results)} chunks processed")
                # Verify that the embeddings were actually stored
                try:
                    final_count = vector_db.get_count()
                    final_dimension = vector_db.get_embedding_dimension()
                    logging.info(
                        f"[PID {os.getpid()}] Final database state: {final_count} embeddings, "
                        f"dimension {final_dimension}"
                    )
                except Exception as e:
                    logging.error(f"[PID {os.getpid()}] Failed to verify final database state: {e}")
        else:
            logging.info(
                f"[PID {os.getpid()}] Vector store is valid with {current_count} embeddings "
                f"of dimension {current_dimension}"
            )
    except Exception as e:
        logging.error(f"[PID {os.getpid()}] Failed to ensure embeddings on startup: {e}", exc_info=True)
        # Don't crash the app, but log the error.
        # The app will still start, but searches may fail.
    finally:
        # Release the lock
        try:
            fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
            lock_fd.close()
            logging.info(f"[PID {os.getpid()}] Released ingestion lock")
        except Exception as e:
            logging.error(f"[PID {os.getpid()}] Failed to release lock: {e}")


def create_app(
    config_name: str = "default",
    initialize_vectordb: bool = True,
    initialize_llm: bool = True,
) -> Flask:
    """
    Create the Flask application with all necessary configuration.

    Args:
        config_name: Configuration name to use (default, test, production)
        initialize_vectordb: Whether to initialize the vector database connection
        initialize_llm: Whether to initialize the LLM

    Returns:
        The configured Flask application.
    """
    try:
        # Initialize Render-specific monitoring if running on Render
        # (optional - don't break CI)
        is_render = os.environ.get("RENDER", "0") == "1"
        memory_monitoring_enabled = False

        # Only enable memory monitoring if explicitly requested or on Render
        if is_render or os.environ.get("ENABLE_MEMORY_MONITORING", "0") == "1":
            try:
                from src.utils.memory_utils import (
                    clean_memory,
                    log_memory_checkpoint,
                    start_periodic_memory_logger,
                    start_tracemalloc,
                )

                # Initialize advanced memory diagnostics if enabled
                try:
                    start_tracemalloc()
                    logger.info("tracemalloc started successfully")
                except Exception as e:
                    logger.warning(f"Failed to start tracemalloc: {e}")

                # Use Render-specific monitoring if running on Render
                if is_render:
                    try:
                        from src.utils.render_monitoring import init_render_monitoring

                        # Set shorter intervals for memory logging on Render
                        init_render_monitoring(log_interval=10)
                        logger.info("Render-specific memory monitoring activated")
                    except Exception as e:
                        logger.warning(f"Failed to initialize Render monitoring: {e}")
                else:
                    # Use standard memory logging for local development
                    try:
                        start_periodic_memory_logger(interval_seconds=int(os.getenv("MEMORY_LOG_INTERVAL", "60")))
                        logger.info("Periodic memory logging started")
                    except Exception as e:
                        logger.warning(f"Failed to start periodic memory logger: {e}")

                # Clean memory at start
                try:
                    clean_memory("App startup")
                    log_memory_checkpoint("post_startup_cleanup")
                    logger.info("Initial memory cleanup completed")
                except Exception as e:
                    logger.warning(f"Failed to clean memory at startup: {e}")

                memory_monitoring_enabled = True
            except ImportError as e:
                logger.warning(f"Memory monitoring dependencies not available: {e}")
            except Exception as e:
                logger.warning(f"Memory monitoring initialization failed: {e}")
        else:
            logger.info("Memory monitoring disabled (not on Render and not explicitly enabled)")

        logger.info(
            "App factory initialization complete (memory_monitoring=%s)",
            memory_monitoring_enabled,
        )

        # Proactively disable ChromaDB telemetry
        os.environ.setdefault("ANONYMIZED_TELEMETRY", "False")
        os.environ.setdefault("CHROMA_TELEMETRY", "False")

        # Attempt to configure chromadb and monkeypatch telemetry
        try:
            import chromadb

            try:
                chromadb.configure(anonymized_telemetry=False)
            except Exception:
                pass  # Non-fatal
            try:
                from chromadb.telemetry.product import posthog as _posthog

                if hasattr(_posthog, "capture"):
                    setattr(_posthog, "capture", lambda *args, **kwargs: None)
                if hasattr(_posthog, "Posthog") and hasattr(_posthog.Posthog, "capture"):
                    setattr(_posthog.Posthog, "capture", lambda *args, **kwargs: None)
            except Exception:
                pass  # Non-fatal
        except Exception:
            pass  # chromadb not installed

        # Get the absolute path to the project root directory (parent of src)
        project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        template_dir = os.path.join(project_root, "templates")
        static_dir = os.path.join(project_root, "static")
        app = Flask(__name__, template_folder=template_dir, static_folder=static_dir)

        # Force garbage collection after initialization
        # (only if memory monitoring is enabled)
        if memory_monitoring_enabled:
            try:
                from src.utils.memory_utils import clean_memory

                clean_memory("Post-initialization")
            except Exception as e:
                logger.warning(f"Post-initialization memory cleanup failed: {e}")

        # Add a memory circuit breaker. The middleware is registered only when
        # memory monitoring is enabled.
        if memory_monitoring_enabled:

            @app.before_request
            def check_memory():
                try:
                    # Ensure we have the necessary functions imported
                    from src.utils.memory_utils import clean_memory, log_memory_usage

                    try:
                        memory_mb = log_memory_usage("Before request")
                        if memory_mb and memory_mb > 450:  # Critical threshold for the 512MB limit
                            clean_memory("Emergency cleanup")
                            if memory_mb > 480:  # Near crash
                                return (
                                    jsonify(
                                        {
                                            "status": "error",
                                            "message": "Server too busy, try again later",
                                        }
                                    ),
                                    503,
                                )
                    except Exception as e:
                        # Don't let memory monitoring crash the app
                        logger.warning(f"Memory monitoring failed: {e}")
                except ImportError as e:
                    # Memory utils module not available
                    logger.warning(f"Memory monitoring not available: {e}")
                except Exception as e:
                    # Other errors shouldn't crash the app
                    logger.warning(f"Memory monitoring error: {e}")

        @app.before_request
        def soft_ceiling():
            """Block expensive, high-memory endpoints when near the hard limit."""
            path = request.path
            if path in ("/ingest", "/search"):
                try:
                    from src.utils.memory_utils import get_memory_usage

                    mem = get_memory_usage()
                    if mem and mem > 470:  # soft ceiling
                        return (
                            jsonify(
                                {
                                    "status": "error",
                                    "message": "Server memory high; try again later",
                                    "memory_mb": mem,
                                }
                            ),
                            503,
                        )
                except Exception:
                    pass
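
        # Flask runs before_request hooks in registration order; when a hook returns
        # a non-None value (as check_memory and soft_ceiling do under memory
        # pressure), the request is short-circuited with that response and the view
        # never runs. Below the thresholds, a request like this passes straight
        # through (host and port assume a local run):
        #
        #     curl -i -X POST http://localhost:5000/search \
        #          -H "Content-Type: application/json" -d '{"query": "pto"}'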

        # Lazy-load services to avoid high memory usage at startup.
        # These will be initialized on the first request to a relevant endpoint.
        app.config["RAG_PIPELINE"] = None
        app.config["INGESTION_PIPELINE"] = None
        app.config["SEARCH_SERVICE"] = None

        def get_rag_pipeline():
            """
            Initialize and cache the RAG pipeline with a timeout.

            This prevents blocking the main thread for too long during cold starts.
            """
            if app.config.get("RAG_PIPELINE") is not None:
                return app.config["RAG_PIPELINE"]

            def _init_pipeline():
                """The actual initialization logic."""
                from src.config import (
                    COLLECTION_NAME,
                    EMBEDDING_BATCH_SIZE,
                    EMBEDDING_DEVICE,
                    EMBEDDING_MODEL_NAME,
                )
                from src.embedding.embedding_service import EmbeddingService
                from src.llm.llm_service import LLMService
                from src.rag.rag_pipeline import RAGPipeline
                from src.search.search_service import SearchService
                from src.vector_store.vector_db import create_vector_database

                logging.info("RAG pipeline initialization started in worker thread...")
                vector_db = create_vector_database(collection_name=COLLECTION_NAME)
                embedding_service = EmbeddingService(
                    model_name=EMBEDDING_MODEL_NAME,
                    device=EMBEDDING_DEVICE,
                    batch_size=EMBEDDING_BATCH_SIZE,
                )
                search_service = SearchService(vector_db, embedding_service)
                llm_service = LLMService.from_environment()
                pipeline = RAGPipeline(search_service, llm_service)
                logging.info("RAG pipeline initialization finished in worker thread.")
                return pipeline

            timeout = int(os.getenv("RAG_INIT_TIMEOUT", "60"))
            # Note: a plain `with ThreadPoolExecutor()` block would defeat the
            # timeout, because exiting the block calls shutdown(wait=True) and
            # blocks until the worker finishes. Shut down without waiting instead.
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = executor.submit(_init_pipeline)
            try:
                pipeline = future.result(timeout=timeout)
                app.config["RAG_PIPELINE"] = pipeline
                return pipeline
            except concurrent.futures.TimeoutError:
                logging.error(f"RAG pipeline initialization timed out after {timeout}s.")
                raise InitializationTimeoutError("Initialization timed out. Please try again in a moment.")
            except Exception as e:
                logging.error(f"RAG pipeline initialization failed: {e}", exc_info=True)
                raise
            finally:
                executor.shutdown(wait=False)
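
        # A timed-out initialization keeps running in its worker thread but is not
        # cached, so the next request starts a fresh attempt. Nothing serializes
        # concurrent first requests either; a minimal single-flight guard (a sketch,
        # not part of this module) would wrap the body in a module-level lock:
        #
        #     import threading
        #     _rag_init_lock = threading.Lock()
        #
        #     def get_rag_pipeline():
        #         with _rag_init_lock:
        #             ...  # existing body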

        def get_ingestion_pipeline(store_embeddings=True):
            """Initialize the ingestion pipeline."""
            # Ingestion is request-specific, so we don't cache it
            from src.config import (
                DEFAULT_CHUNK_SIZE,
                DEFAULT_OVERLAP,
                EMBEDDING_BATCH_SIZE,
                EMBEDDING_DEVICE,
                EMBEDDING_MODEL_NAME,
                RANDOM_SEED,
            )
            from src.embedding.embedding_service import EmbeddingService
            from src.ingestion.ingestion_pipeline import IngestionPipeline

            embedding_service = None
            if store_embeddings:
                embedding_service = EmbeddingService(
                    model_name=EMBEDDING_MODEL_NAME,
                    device=EMBEDDING_DEVICE,
                    batch_size=EMBEDDING_BATCH_SIZE,
                )
            return IngestionPipeline(
                chunk_size=DEFAULT_CHUNK_SIZE,
                overlap=DEFAULT_OVERLAP,
                seed=RANDOM_SEED,
                store_embeddings=store_embeddings,
                embedding_service=embedding_service,
            )

        def get_search_service():
            """Initialize and cache the search service."""
            if app.config.get("SEARCH_SERVICE") is None:
                logging.info("Initializing search service for the first time...")
                from src.config import (
                    COLLECTION_NAME,
                    EMBEDDING_BATCH_SIZE,
                    EMBEDDING_DEVICE,
                    EMBEDDING_MODEL_NAME,
                    VECTOR_DB_PERSIST_PATH,
                )
                from src.embedding.embedding_service import EmbeddingService
                from src.search.search_service import SearchService
                from src.utils.memory_utils import MemoryManager
                from src.vector_store.vector_db import VectorDatabase

                # Use the memory manager for this expensive operation
                with MemoryManager("search_service_initialization"):
                    vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
                    embedding_service = EmbeddingService(
                        model_name=EMBEDDING_MODEL_NAME,
                        device=EMBEDDING_DEVICE,
                        batch_size=EMBEDDING_BATCH_SIZE,
                    )
                    app.config["SEARCH_SERVICE"] = SearchService(vector_db, embedding_service)
                logging.info("Search service initialized.")
            return app.config["SEARCH_SERVICE"]

        @app.route("/")
        def index():
            return render_template("chat.html")

        # Minimal favicon/apple-touch handlers to eliminate 404 noise without
        # storing binary files. Each returns a 1x1 transparent PNG decoded from
        # base64 on the fly.
        import base64

        from flask import Response

        _TINY_PNG_BASE64 = (
            b"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAusB9YwWtYkAAAAASUVORK5CYII="
        )

        def _tiny_png_response():
            png_bytes = base64.b64decode(_TINY_PNG_BASE64)
            return Response(png_bytes, mimetype="image/png")

        @app.route("/favicon.ico")
        def favicon():  # pragma: no cover - trivial asset route
            return _tiny_png_response()

        @app.route("/apple-touch-icon.png")
        def apple_touch_icon():  # pragma: no cover - trivial asset route
            return _tiny_png_response()
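
        # For reference, an equivalent 1x1 transparent PNG payload can be
        # regenerated with Pillow (an assumption here; Pillow is not a dependency
        # of this module):
        #
        #     from io import BytesIO
        #     from PIL import Image
        #
        #     buf = BytesIO()
        #     Image.new("RGBA", (1, 1), (0, 0, 0, 0)).save(buf, format="PNG")
        #     print(base64.b64encode(buf.getvalue()).decode())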

        @app.route("/management")  # route path assumed
        def management_dashboard():
            """Document management dashboard."""
            return render_template("management.html")

        @app.route("/health")  # route path assumed
        def health():
            try:
                # Default values in case memory_utils is not available
                memory_mb = 0
                status = "ok"
                try:
                    from src.utils.memory_utils import get_memory_usage

                    memory_mb = get_memory_usage() or 0
                except Exception as e:
                    # Don't let a memory monitoring failure break the health check
                    logger.warning(f"Memory usage check failed: {e}")
                    status = "degraded"

                # Check LLM availability
                llm_available = True
                try:
                    # Quick check for LLM configuration without caching
                    has_api_keys = bool(os.getenv("OPENROUTER_API_KEY") or os.getenv("GROQ_API_KEY"))
                    if not has_api_keys:
                        llm_available = False
                except Exception:
                    llm_available = False

                # Flag high memory usage (only when monitoring is enabled). Check
                # the critical threshold first so it isn't shadowed by the warning
                # threshold.
                if memory_monitoring_enabled:
                    if memory_mb > 450:  # Critical threshold
                        status = "critical"
                    elif memory_mb > 400:  # Warning threshold for the 512MB limit
                        status = "warning"

                # Degrade the status if the LLM is not available
                if not llm_available:
                    if status == "ok":
                        status = "degraded"

                response_data = {
                    "status": status,
                    "memory_mb": round(memory_mb, 1),
                    "timestamp": __import__("datetime").datetime.utcnow().isoformat(),
                    "llm_available": llm_available,
                }
                # Return 200 for ok/warning/degraded, 503 for critical
                status_code = 503 if status == "critical" else 200
                return jsonify(response_data), status_code
            except Exception as e:
                # Last-resort error handler
                logger.error(f"Health check failed: {e}")
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "Health check failed",
                            "error": str(e),
                            "timestamp": __import__("datetime").datetime.utcnow().isoformat(),
                        }
                    ),
                    500,
                )
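
        # A healthy response carries these keys (values below are illustrative):
        #
        #     {"status": "ok", "memory_mb": 212.4, "llm_available": true,
        #      "timestamp": "2025-10-15T14:30:00.000000"}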

        @app.route("/memory")  # route path assumed
        def memory_diagnostics():
            """Return detailed memory diagnostics (safe for production use).

            Query params:
                include_top=1 -> include top allocation traces
                limit=N      -> number of top allocation entries (default 5)
            """
            import tracemalloc

            from src.utils.memory_utils import memory_summary

            include_top = request.args.get("include_top") in ("1", "true", "True")
            try:
                limit = int(request.args.get("limit", 5))
            except ValueError:
                limit = 5
            summary = memory_summary()
            diagnostics = {
                "summary": summary,
                "tracemalloc_active": tracemalloc.is_tracing(),
            }
            if include_top and tracemalloc.is_tracing():
                try:
                    snapshot = tracemalloc.take_snapshot()
                    stats = snapshot.statistics("lineno")
                    top_list = []
                    for stat in stats[: max(1, min(limit, 25))]:
                        size_mb = stat.size / 1024 / 1024
                        location = f"{stat.traceback[0].filename}:{stat.traceback[0].lineno}"
                        top_list.append(
                            {
                                "location": location,
                                "size_mb": round(size_mb, 4),
                                "count": stat.count,
                                "repr": str(stat)[:300],
                            }
                        )
                    diagnostics["top_allocations"] = top_list
                except Exception as e:  # pragma: no cover
                    diagnostics["top_allocations_error"] = str(e)
            return jsonify({"status": "success", "memory": diagnostics})
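
        # Example (assuming the route path above and a local run):
        #
        #     curl "http://localhost:5000/memory?include_top=1&limit=3"
        #
        # The limit is clamped to the 1..25 range before the snapshot is sliced.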

        @app.route("/memory/clean", methods=["POST"])  # route path assumed
        def force_clean():
            """Force a full memory cleanup and return the new memory usage."""
            from src.utils.memory_utils import force_clean_and_report

            try:
                data = request.get_json(silent=True) or {}
                label = data.get("label", "manual")
                if not isinstance(label, str):
                    label = "manual"
                summary = force_clean_and_report(label=label)
                # Include the label at the top level for test compatibility
                return jsonify({"status": "success", "label": label, "summary": summary})
            except Exception as e:
                return jsonify({"status": "error", "message": str(e)})

        @app.route("/memory/render")  # route path assumed
        def render_memory_status():
            """Return Render-specific memory monitoring data.

            Detailed metrics are returned when running on Render; otherwise basic
            memory stats are returned.
            """
            try:
                # Default basic response for all environments
                basic_response = {
                    "status": "success",
                    "is_render": False,
                    "memory_mb": 0,
                    "timestamp": __import__("datetime").datetime.utcnow().isoformat(),
                }
                try:
                    # Try to get basic memory usage
                    from src.utils.memory_utils import get_memory_usage

                    basic_response["memory_mb"] = get_memory_usage()

                    # Try to add a summary if available
                    try:
                        from src.utils.memory_utils import memory_summary

                        basic_response["summary"] = memory_summary()
                    except Exception as e:
                        basic_response["summary_error"] = str(e)

                    # If on Render, try to get enhanced metrics
                    if is_render:
                        try:
                            # Import here to avoid errors when not on Render
                            from src.utils.render_monitoring import (
                                check_render_memory_thresholds,
                                get_memory_trends,
                            )

                            # Get the current memory status with threshold checks
                            status = check_render_memory_thresholds("api_request")
                            # Get trend information
                            trends = get_memory_trends()
                            # Return a structured memory status for Render
                            return jsonify(
                                {
                                    "status": "success",
                                    "is_render": True,
                                    "memory_status": status,
                                    "memory_trends": trends,
                                    "render_limit_mb": 512,
                                }
                            )
                        except Exception as e:
                            basic_response["render_metrics_error"] = str(e)
                except Exception as e:
                    basic_response["memory_utils_error"] = str(e)

                # Return the basic response with whatever data we could get
                return jsonify(basic_response)
            except Exception as e:
                return jsonify({"status": "error", "message": str(e)})

        @app.route("/ingest", methods=["POST"])  # path also checked by soft_ceiling()
        def ingest():
            try:
                from src.config import CORPUS_DIRECTORY

                # Use silent=True to avoid exceptions and provide a known dict type
                data: Dict[str, Any] = request.get_json(silent=True) or {}
                store_embeddings = bool(data.get("store_embeddings", True))

                pipeline = get_ingestion_pipeline(store_embeddings)
                result = pipeline.process_directory_with_embeddings(CORPUS_DIRECTORY)

                # Create a response with enhanced information
                response = {
                    "status": result["status"],
                    "chunks_processed": result["chunks_processed"],
                    "files_processed": result["files_processed"],
                    "embeddings_stored": result["embeddings_stored"],
                    "store_embeddings": result["store_embeddings"],
                    "message": (
                        f"Successfully processed {result['chunks_processed']} "
                        f"chunks from {result['files_processed']} files"
                    ),
                }
                # Include failed-files info if any
                if result["failed_files"]:
                    response["failed_files"] = result["failed_files"]
                    failed_count = len(result["failed_files"])
                    response["warnings"] = f"{failed_count} files failed to process"
                return jsonify(response)
            except Exception as e:
                logging.error(f"Ingestion failed: {e}", exc_info=True)
                return jsonify({"status": "error", "message": str(e)}), 500
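
        # Example (local run assumed):
        #
        #     curl -X POST http://localhost:5000/ingest \
        #          -H "Content-Type: application/json" \
        #          -d '{"store_embeddings": true}'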

        @app.route("/search", methods=["POST"])  # path also checked by soft_ceiling()
        def search():
            from src.utils.memory_utils import log_memory_usage

            try:
                log_memory_usage("search_request_start")

                # Validate that the request contains JSON data
                if not request.is_json:
                    return (
                        jsonify(
                            {
                                "status": "error",
                                "message": "Content-Type must be application/json",
                            }
                        ),
                        400,
                    )
                data: Dict[str, Any] = request.get_json() or {}

                # Validate the required query parameter
                query = data.get("query")
                if query is None:
                    return (
                        jsonify({"status": "error", "message": "Query parameter is required"}),
                        400,
                    )
                if not isinstance(query, str) or not query.strip():
                    return (
                        jsonify(
                            {
                                "status": "error",
                                "message": "Query must be a non-empty string",
                            }
                        ),
                        400,
                    )

                # Extract optional parameters with defaults
                top_k = data.get("top_k", 5)
                threshold = data.get("threshold", 0.3)

                # Validate the parameters
                if not isinstance(top_k, int) or top_k <= 0:
                    return (
                        jsonify(
                            {
                                "status": "error",
                                "message": "top_k must be a positive integer",
                            }
                        ),
                        400,
                    )
                if not isinstance(threshold, (int, float)) or not (0.0 <= threshold <= 1.0):
                    return (
                        jsonify(
                            {
                                "status": "error",
                                "message": "threshold must be a number between 0 and 1",
                            }
                        ),
                        400,
                    )

                search_service = get_search_service()
                results = search_service.search(query=query.strip(), top_k=top_k, threshold=threshold)

                # Format the response
                response = {
                    "status": "success",
                    "query": query.strip(),
                    "results_count": len(results),
                    "results": results,
                }
                return jsonify(response)
            except ValueError as e:
                return jsonify({"status": "error", "message": str(e)}), 400
            except Exception as e:
                logging.error(f"Search failed: {e}", exc_info=True)
                return (
                    jsonify({"status": "error", "message": f"Search failed: {str(e)}"}),
                    500,
                )
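
        # Example request and the shape of a successful response (local run
        # assumed; results elided):
        #
        #     curl -X POST http://localhost:5000/search \
        #          -H "Content-Type: application/json" \
        #          -d '{"query": "remote work policy", "top_k": 3, "threshold": 0.3}'
        #
        #     {"status": "success", "query": "remote work policy",
        #      "results_count": 3, "results": [...]}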

        @app.route("/chat", methods=["POST"])  # route path assumed
        def chat():
            try:
                # Validate that the request contains JSON data
                if not request.is_json:
                    return (
                        jsonify(
                            {
                                "status": "error",
                                "message": "Content-Type must be application/json",
                            }
                        ),
                        400,
                    )
                data: Dict[str, Any] = request.get_json() or {}

                # Validate the required message parameter and guard its length
                message = data.get("message")
                if message is None:
                    return (
                        jsonify({"status": "error", "message": "message parameter is required"}),
                        400,
                    )
                if not isinstance(message, str) or not message.strip():
                    return (
                        jsonify(
                            {
                                "status": "error",
                                "message": "message must be a non-empty string",
                            }
                        ),
                        400,
                    )
                # Enforce a maximum chat input size to prevent memory spikes
                try:
                    max_chars = int(os.getenv("CHAT_MAX_CHARS", "5000"))
                except ValueError:
                    max_chars = 5000
                if len(message) > max_chars:
                    return (
                        jsonify(
                            {
                                "status": "error",
                                "message": f"message too long (>{max_chars} chars); please shorten your input",
                            }
                        ),
                        413,
                    )

                # Extract optional parameters
                conversation_id = data.get("conversation_id")
                include_sources = data.get("include_sources", True)
                include_debug = data.get("include_debug", False)

                # Let the global error handler handle LLMConfigurationError
                rag_pipeline = get_rag_pipeline()
                rag_response = rag_pipeline.generate_answer(message.strip())

                from src.rag.response_formatter import ResponseFormatter

                formatter = ResponseFormatter()
                # Format the response for the API
                if include_sources:
                    formatted_response = formatter.format_api_response(rag_response, include_debug)
                else:
                    formatted_response = formatter.format_chat_response(
                        rag_response, conversation_id, include_sources=False
                    )
                return jsonify(formatted_response)
            except InitializationTimeoutError as e:
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "The server is starting up and is not yet ready "
                            "to handle requests. Please try again in a moment.",
                            "details": str(e),
                        }
                    ),
                    503,
                )
            except Exception as e:
                # Re-raise LLMConfigurationError so our custom error handler can catch it
                from src.llm.llm_configuration_error import LLMConfigurationError

                if isinstance(e, LLMConfigurationError):
                    raise
                logging.error(f"Chat failed: {e}", exc_info=True)
                return (
                    jsonify({"status": "error", "message": f"Chat request failed: {str(e)}"}),
                    500,
                )
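
        # Example (local run assumed; conversation_id, include_sources, and
        # include_debug are optional):
        #
        #     curl -X POST http://localhost:5000/chat \
        #          -H "Content-Type: application/json" \
        #          -d '{"message": "How do I request time off?", "include_sources": true}'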

        @app.route("/chat/health")  # route path assumed
        def chat_health():
            try:
                # Let the global error handler handle LLMConfigurationError
                rag_pipeline = get_rag_pipeline()
                health_data = rag_pipeline.health_check()

                from src.rag.response_formatter import ResponseFormatter

                formatter = ResponseFormatter()
                health_response = formatter.create_health_response(health_data)

                # Determine the HTTP status based on health
                if health_data.get("pipeline") == "healthy":
                    return jsonify(health_response), 200
                elif health_data.get("pipeline") == "degraded":
                    return jsonify(health_response), 200  # Still functional
                else:
                    return jsonify(health_response), 503  # Service unavailable
            except Exception as e:
                # Re-raise LLMConfigurationError so our custom error handler can catch it
                from src.llm.llm_configuration_error import LLMConfigurationError

                if isinstance(e, LLMConfigurationError):
                    raise
                logging.error(f"Chat health check failed: {e}", exc_info=True)
                return (
                    jsonify({"status": "error", "message": f"Health check failed: {str(e)}"}),
                    500,
                )

        # Add other non-ML routes directly
        @app.route("/api/suggestions")  # route path assumed
        def get_query_suggestions():
            suggestions = [
                "What is our remote work policy?",
                "How do I request time off?",
                "What are our information security guidelines?",
                "How does our expense reimbursement work?",
                "Tell me about our diversity and inclusion policy",
                "What's the process for employee performance reviews?",
                "How do I report an emergency at work?",
                "What professional development opportunities are available?",
            ]
            return jsonify({"status": "success", "suggestions": suggestions})

        @app.route("/api/feedback", methods=["POST"])  # route path assumed
        def submit_feedback():
            try:
                feedback_data = request.get_json(silent=True)
                if not feedback_data:
                    return (
                        jsonify({"status": "error", "message": "No feedback data provided"}),
                        400,
                    )
                required_fields = ["conversation_id", "message_id", "feedback_type"]
                for field in required_fields:
                    if field not in feedback_data:
                        return (
                            jsonify(
                                {
                                    "status": "error",
                                    "message": f"Missing required field: {field}",
                                }
                            ),
                            400,
                        )
                logging.info(f"Received feedback: {feedback_data}")
                return jsonify(
                    {
                        "status": "success",
                        "message": "Feedback received",
                        "feedback": feedback_data,
                    }
                )
            except Exception as e:
                logging.error(f"Error processing feedback: {e}")
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": f"Error processing feedback: {str(e)}",
                        }
                    ),
                    500,
                )

        @app.route("/api/sources/<source_id>")  # route path assumed
        def get_source_document(source_id: str):
            try:
                from typing import Union

                source_map: Dict[str, Dict[str, Union[str, Dict[str, str]]]] = {
                    "remote_work": {
                        "content": (
                            "# Remote Work Policy\n\n"
                            "Employees may work remotely up to 3 days per week"
                            " with manager approval."
                        ),
                        "metadata": {
                            "filename": "remote_work_policy.md",
                            "last_updated": "2025-09-15",
                        },
                    },
                    "pto": {
                        "content": (
                            "# PTO Policy\n\n"
                            "Full-time employees receive 20 days of PTO annually, "
                            "accrued monthly."
                        ),
                        "metadata": {
                            "filename": "pto_policy.md",
                            "last_updated": "2025-08-20",
                        },
                    },
                    "security": {
                        "content": (
                            "# Information Security Policy\n\n"
                            "All employees must use company-approved devices and "
                            "software for work tasks."
                        ),
                        "metadata": {
                            "filename": "information_security_policy.md",
                            "last_updated": "2025-10-01",
                        },
                    },
                    "expense": {
                        "content": (
                            "# Expense Reimbursement\n\n"
                            "Submit all expense reports within 30 days of incurring "
                            "the expense."
                        ),
                        "metadata": {
                            "filename": "expense_reimbursement_policy.md",
                            "last_updated": "2025-07-10",
                        },
                    },
                }
                if source_id in source_map:
                    source_data = source_map[source_id]
                    return jsonify(
                        {
                            "status": "success",
                            "source_id": source_id,
                            "content": source_data["content"],
                            "metadata": source_data["metadata"],
                        }
                    )
                else:
                    return (
                        jsonify(
                            {
                                "status": "error",
                                "message": f"Source document with ID {source_id} not found",
                            }
                        ),
                        404,
                    )
            except Exception as e:
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": f"Failed to retrieve source document: {str(e)}",
                        }
                    ),
                    500,
                )

        @app.route("/api/conversations")  # route path assumed
        def get_conversations():
            conversations = [
                {
                    "id": "conv-123456",
                    "title": "HR Policy Questions",
                    "timestamp": "2025-10-15T14:30:00Z",
                    "preview": "What is our remote work policy?",
                },
                {
                    "id": "conv-789012",
                    "title": "Project Planning Queries",
                    "timestamp": "2025-10-14T09:15:00Z",
                    "preview": "How do we handle project kickoffs?",
                },
                {
                    "id": "conv-345678",
                    "title": "Security Compliance",
                    "timestamp": "2025-10-12T16:45:00Z",
                    "preview": "What are our password requirements?",
                },
            ]
            return jsonify({"status": "success", "conversations": conversations})

        @app.route("/api/conversations/<conversation_id>")  # route path assumed
        def get_conversation(conversation_id: str):
            try:
                from typing import List, Union

                if conversation_id == "conv-123456":
                    messages: List[Dict[str, Union[str, List[Dict[str, str]]]]] = [
                        {
                            "id": "msg-111",
                            "role": "user",
                            "content": "What is our remote work policy?",
                            "timestamp": "2025-10-15T14:30:00Z",
                        },
                        {
                            "id": "msg-112",
                            "role": "assistant",
                            "content": (
                                "According to our remote work policy, employees may "
                                "work up to 3 days per week with manager approval."
                            ),
                            "timestamp": "2025-10-15T14:30:15Z",
                            "sources": [{"id": "remote_work", "title": "Remote Work Policy"}],
                        },
                    ]
                else:
                    return (
                        jsonify(
                            {
                                "status": "error",
                                "message": f"Conversation {conversation_id} not found",
                            }
                        ),
                        404,
                    )
                return jsonify(
                    {
                        "status": "success",
                        "conversation_id": conversation_id,
                        "messages": messages,
                    }
                )
            except Exception as e:
                app.logger.error(f"An unexpected error occurred: {e}")
                return (
                    jsonify({"status": "error", "message": "An internal error occurred."}),
                    500,
                )

        # Register memory-aware error handlers
        from src.utils.error_handlers import register_error_handlers

        register_error_handlers(app)

        # Register the document management blueprint
        try:
            from src.document_management.routes import document_bp

            app.register_blueprint(document_bp, url_prefix="/api/documents")
            logging.info("Document management blueprint registered successfully")
        except Exception as e:
            logging.warning(f"Failed to register document management blueprint: {e}")

        # Use pre-built embeddings by default for reliable deployment; rebuilding
        # on startup can cause memory spikes. Embeddings are checked and rebuilt
        # before the app starts serving requests only when explicitly requested
        # via the REBUILD_EMBEDDINGS_ON_START environment variable.
        if os.getenv("REBUILD_EMBEDDINGS_ON_START", "false").lower() == "true":
            with app.app_context():
                logging.info("REBUILD_EMBEDDINGS_ON_START is true, rebuilding embeddings on startup.")
                ensure_embeddings_on_startup()
        else:
            logging.info("Using pre-built embeddings. Set REBUILD_EMBEDDINGS_ON_START=true to rebuild.")

        # Add Render-specific memory middleware if running on Render and
        # memory monitoring is enabled
        if is_render and memory_monitoring_enabled:
            try:
                # Import locally and alias to avoid redefinition warnings
                from src.utils.render_monitoring import (
                    add_memory_middleware as _add_memory_middleware,
                )

                _add_memory_middleware(app)
                logger.info("Render memory monitoring middleware added")
            except Exception as e:
                logger.warning(f"Failed to add Render memory middleware: {e}")

        return app
    except Exception as e:
        # This is a critical catch-all for any exception during app creation.
        # Logging it as a critical error is essential for debugging startup failures.
        logging.critical(f"CRITICAL: App creation failed: {e}", exc_info=True)
        # Re-raise the exception so the Gunicorn worker fails loudly
        # and the failure is immediately obvious in the logs.
        raise
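

if __name__ == "__main__":  # pragma: no cover
    # Minimal local entry point (a sketch, not part of the deployment setup;
    # production is expected to serve the factory through a WSGI server such as
    # gunicorn, and the PORT default below is an assumption).
    create_app().run(host="0.0.0.0", port=int(os.getenv("PORT", "5000")))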