Spaces:
Sleeping
Sleeping
| """ | |
| Document Service - Core document management functionality | |
| Provides centralized document management capabilities that integrate with | |
| the existing RAG pipeline architecture. Follows the lazy loading pattern | |
| established in the app factory. | |
| """ | |
| import logging | |
| import os | |
| import uuid | |
| from datetime import datetime | |
| from enum import Enum | |
| from pathlib import Path | |
| from typing import Any, Dict | |
| from werkzeug.utils import secure_filename | |
| class DocumentStatus(Enum): | |
| """Document processing status enumeration""" | |
| UPLOADED = "uploaded" | |
| VALIDATING = "validating" | |
| PARSING = "parsing" | |
| CHUNKING = "chunking" | |
| EMBEDDING = "embedding" | |
| INDEXING = "indexing" | |
| COMPLETED = "completed" | |
| FAILED = "failed" | |
| class DocumentService: | |
| """ | |
| Core document management service that integrates with existing RAG infrastructure. | |
| This service manages the document lifecycle from upload through processing, | |
| leveraging the existing ingestion pipeline and vector database. | |
| """ | |
| def __init__(self, upload_dir: str = None): | |
| """ | |
| Initialize the document service. | |
| Args: | |
| upload_dir: Directory for storing uploaded files | |
| """ | |
| self.upload_dir = upload_dir or self._get_default_upload_dir() | |
| self.supported_formats = { | |
| "text": [".txt", ".md", ".csv"], | |
| "documents": [".pdf", ".docx", ".doc"], | |
| "structured": [".json", ".yaml", ".xml"], | |
| "web": [".html", ".htm"], | |
| "office": [".xlsx", ".pptx"], | |
| } | |
| self.max_file_size = 50 * 1024 * 1024 # 50MB | |
| self.max_batch_size = 100 | |
| # Ensure upload directory exists | |
| Path(self.upload_dir).mkdir(parents=True, exist_ok=True) | |
| logging.info(f"DocumentService initialized with upload_dir: {self.upload_dir}") | |
| def _get_default_upload_dir(self) -> str: | |
| """Get default upload directory path""" | |
| project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| return os.path.join(project_root, "data", "uploads") | |
| def validate_file(self, filename: str, file_size: int) -> Dict[str, Any]: | |
| """ | |
| Validate uploaded file. | |
| Args: | |
| filename: Name of the file | |
| file_size: Size of the file in bytes | |
| Returns: | |
| Dict with validation results | |
| """ | |
| errors = [] | |
| warnings = [] | |
| # Check file extension | |
| file_ext = Path(filename).suffix.lower() | |
| all_supported = [] | |
| for format_list in self.supported_formats.values(): | |
| all_supported.extend(format_list) | |
| if file_ext not in all_supported: | |
| errors.append(f"Unsupported file format: {file_ext}") | |
| # Check file size | |
| if file_size > self.max_file_size: | |
| errors.append(f"File too large: {file_size} bytes (max: {self.max_file_size})") | |
| # Check filename security | |
| secure_name = secure_filename(filename) | |
| if secure_name != filename: | |
| warnings.append("Filename was sanitized for security") | |
| return { | |
| "valid": len(errors) == 0, | |
| "errors": errors, | |
| "warnings": warnings, | |
| "secure_filename": secure_name, | |
| } | |
| def save_uploaded_file(self, file_obj, filename: str) -> Dict[str, Any]: | |
| """ | |
| Save uploaded file to disk. | |
| Args: | |
| file_obj: File object from request | |
| filename: Original filename | |
| Returns: | |
| Dict with file information | |
| """ | |
| # Generate unique filename to avoid conflicts | |
| secure_name = secure_filename(filename) | |
| file_id = str(uuid.uuid4()) | |
| file_ext = Path(secure_name).suffix | |
| unique_filename = f"{file_id}{file_ext}" | |
| file_path = os.path.join(self.upload_dir, unique_filename) | |
| try: | |
| file_obj.save(file_path) | |
| file_size = os.path.getsize(file_path) | |
| file_info = { | |
| "file_id": file_id, | |
| "original_name": filename, | |
| "secure_name": secure_name, | |
| "unique_filename": unique_filename, | |
| "file_path": file_path, | |
| "file_size": file_size, | |
| "upload_time": datetime.utcnow().isoformat(), | |
| "status": DocumentStatus.UPLOADED.value, | |
| } | |
| logging.info(f"Saved uploaded file: {filename} -> {unique_filename}") | |
| return file_info | |
| except Exception as e: | |
| logging.error(f"Failed to save uploaded file {filename}: {e}") | |
| raise | |
| def get_file_metadata(self, file_path: str) -> Dict[str, Any]: | |
| """ | |
| Extract metadata from file. | |
| Args: | |
| file_path: Path to the file | |
| Returns: | |
| Dict with file metadata | |
| """ | |
| try: | |
| stat = os.stat(file_path) | |
| file_ext = Path(file_path).suffix.lower() | |
| metadata = { | |
| "file_size": stat.st_size, | |
| "created_time": datetime.fromtimestamp(stat.st_ctime).isoformat(), | |
| "modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat(), | |
| "file_extension": file_ext, | |
| "file_type": self._get_file_type(file_ext), | |
| } | |
| # Try to extract additional metadata based on file type | |
| if file_ext == ".pdf": | |
| metadata.update(self._extract_pdf_metadata(file_path)) | |
| elif file_ext in [".docx", ".doc"]: | |
| metadata.update(self._extract_word_metadata(file_path)) | |
| return metadata | |
| except Exception as e: | |
| logging.error(f"Failed to extract metadata from {file_path}: {e}") | |
| return {} | |
| def _get_file_type(self, file_ext: str) -> str: | |
| """Get file type category from extension""" | |
| for file_type, extensions in self.supported_formats.items(): | |
| if file_ext in extensions: | |
| return file_type | |
| return "unknown" | |
| def _extract_pdf_metadata(self, file_path: str) -> Dict[str, Any]: | |
| """Extract metadata from PDF file""" | |
| try: | |
| # This would use PyPDF2 or similar library in a real implementation | |
| # For now, return basic info | |
| return { | |
| "pages": "unknown", # Would extract actual page count | |
| "title": "unknown", # Would extract PDF title | |
| "author": "unknown", # Would extract PDF author | |
| } | |
| except Exception: | |
| return {} | |
| def _extract_word_metadata(self, file_path: str) -> Dict[str, Any]: | |
| """Extract metadata from Word document""" | |
| try: | |
| # This would use python-docx or similar library in a real implementation | |
| # For now, return basic info | |
| return { | |
| "word_count": "unknown", # Would extract actual word count | |
| "title": "unknown", # Would extract document title | |
| "author": "unknown", # Would extract document author | |
| } | |
| except Exception: | |
| return {} | |
| def delete_file(self, file_path: str) -> bool: | |
| """ | |
| Delete file from disk. | |
| Args: | |
| file_path: Path to file to delete | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| try: | |
| if os.path.exists(file_path): | |
| os.remove(file_path) | |
| logging.info(f"Deleted file: {file_path}") | |
| return True | |
| else: | |
| logging.warning(f"File not found for deletion: {file_path}") | |
| return False | |
| except Exception as e: | |
| logging.error(f"Failed to delete file {file_path}: {e}") | |
| return False | |
| def get_upload_stats(self) -> Dict[str, Any]: | |
| """ | |
| Get statistics about uploaded files. | |
| Returns: | |
| Dict with upload statistics | |
| """ | |
| try: | |
| if not os.path.exists(self.upload_dir): | |
| return {"total_files": 0, "total_size": 0, "file_types": {}} | |
| files = list(Path(self.upload_dir).glob("*")) | |
| total_size = sum(f.stat().st_size for f in files if f.is_file()) | |
| file_types = {} | |
| for file_path in files: | |
| if file_path.is_file(): | |
| ext = file_path.suffix.lower() | |
| file_types[ext] = file_types.get(ext, 0) + 1 | |
| return { | |
| "total_files": len(files), | |
| "total_size": total_size, | |
| "file_types": file_types, | |
| "upload_dir": self.upload_dir, | |
| } | |
| except Exception as e: | |
| logging.error(f"Failed to get upload stats: {e}") | |
| return {"error": str(e)} | |
| def cleanup_old_files(self, days_old: int = 30) -> Dict[str, Any]: | |
| """ | |
| Clean up old uploaded files. | |
| Args: | |
| days_old: Delete files older than this many days | |
| Returns: | |
| Dict with cleanup results | |
| """ | |
| try: | |
| cutoff_time = datetime.now().timestamp() - (days_old * 24 * 60 * 60) | |
| deleted_files = [] | |
| errors = [] | |
| if os.path.exists(self.upload_dir): | |
| for file_path in Path(self.upload_dir).glob("*"): | |
| if file_path.is_file() and file_path.stat().st_mtime < cutoff_time: | |
| try: | |
| file_path.unlink() | |
| deleted_files.append(str(file_path)) | |
| except Exception as e: | |
| errors.append(f"Failed to delete {file_path}: {e}") | |
| result = { | |
| "deleted_count": len(deleted_files), | |
| "deleted_files": deleted_files, | |
| "errors": errors, | |
| } | |
| logging.info(f"Cleanup completed: {len(deleted_files)} files deleted") | |
| return result | |
| except Exception as e: | |
| logging.error(f"Cleanup failed: {e}") | |
| return {"error": str(e)} | |