| """ | |
| Upload Service - Handle file uploads and validation | |
| Provides upload management functionality that integrates with | |
| the Flask app factory pattern and existing services. | |
| """ | |
import logging
from typing import Any, Dict, List, Optional, Tuple

from werkzeug.datastructures import FileStorage
class UploadService:
    """
    File upload service that handles multi-file uploads with validation.

    Integrates with DocumentService for file management and ProcessingService
    for async processing workflow.
    """

    def __init__(self, document_service, processing_service):
        """
        Initialize upload service.

        Args:
            document_service: DocumentService instance
            processing_service: ProcessingService instance
        """
        self.document_service = document_service
        self.processing_service = processing_service
        logging.info("UploadService initialized")

    def handle_upload_request(self, request_files, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Handle multi-file upload request.

        Args:
            request_files: Files from Flask request (``request.files`` MultiDict
                with repeated "files" keys, or a plain mapping with one "file").
            metadata: Optional metadata merged into every uploaded file's info.

        Returns:
            Upload results dict with overall ``status`` ("success" / "partial" /
            "error"), per-file results, submitted job ids, and counters.
        """
        if not request_files:
            return {"status": "error", "message": "No files provided", "files": []}

        results: Dict[str, Any] = {
            "status": "success",
            "files": [],
            "job_ids": [],
            "total_files": 0,
            "successful_uploads": 0,
            "failed_uploads": 0,
            "errors": [],
        }

        # Accept either a Flask MultiDict (repeated "files" entries) or a plain
        # mapping carrying a single "file" entry.
        if hasattr(request_files, "getlist"):
            files = request_files.getlist("files")
        else:
            files = [request_files.get("file")]
        files = [f for f in files if f]  # Remove None/missing values
        results["total_files"] = len(files)

        for file_obj in files:
            try:
                file_result = self._process_single_file(file_obj, metadata or {})
                results["files"].append(file_result)
                if file_result["status"] == "success":
                    results["successful_uploads"] += 1
                    if file_result.get("job_id"):
                        results["job_ids"].append(file_result["job_id"])
                else:
                    results["failed_uploads"] += 1
                    if file_result.get("error"):
                        results["errors"].append(file_result["error"])
            except Exception as e:
                # Last-resort safety net: one bad file must not abort the batch.
                error_msg = f"Failed to process file: {str(e)}"
                results["errors"].append(error_msg)
                results["failed_uploads"] += 1
                results["files"].append(
                    {
                        "filename": getattr(file_obj, "filename", "unknown"),
                        "status": "error",
                        "error": error_msg,
                    }
                )

        # Derive the overall status from the per-file outcomes.
        if results["failed_uploads"] > 0:
            if results["successful_uploads"] == 0:
                results["status"] = "error"
                results["message"] = "All uploads failed"
            else:
                results["status"] = "partial"
                results["message"] = (
                    f"{results['successful_uploads']} files uploaded, " f"{results['failed_uploads']} failed"
                )
        else:
            results["message"] = f"Successfully uploaded {results['successful_uploads']} files"

        return results

    def _process_single_file(self, file_obj: "FileStorage", metadata: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process a single uploaded file: size it, validate, save, and optionally
        submit it for async processing.

        Args:
            file_obj: File object from the request (must support seek/tell).
            metadata: File metadata; may carry ``chunk_size``, ``overlap`` and
                ``auto_process`` processing options.

        Returns:
            Per-file result dict with ``status`` "success" or "error".
        """
        filename = file_obj.filename or "unknown"
        try:
            # Determine file size without reading the stream into memory.
            file_obj.seek(0, 2)  # Seek to end
            file_size = file_obj.tell()
            file_obj.seek(0)  # Reset to beginning

            # Validate name/size before touching disk.
            validation_result = self.document_service.validate_file(filename, file_size)
            if not validation_result["valid"]:
                error_msg = f"Validation failed: {', '.join(validation_result['errors'])}"
                return {
                    "filename": filename,
                    "status": "error",
                    "error": error_msg,
                    "validation": validation_result,
                }

            # Persist the file, then attach caller metadata and extracted metadata.
            file_info = self.document_service.save_uploaded_file(file_obj, filename)
            file_info.update(metadata)
            file_metadata = self.document_service.get_file_metadata(file_info["file_path"])
            file_info["metadata"] = file_metadata

            # Submit for async processing unless the caller opted out.
            processing_options = {
                "chunk_size": metadata.get("chunk_size", 1000),
                "overlap": metadata.get("overlap", 200),
                "auto_process": metadata.get("auto_process", True),
            }
            job_id = None
            if processing_options.get("auto_process", True):
                job_id = self.processing_service.submit_job(file_info, processing_options)

            upload_msg = "File uploaded"
            if job_id:
                upload_msg += " and submitted for processing"

            return {
                "filename": filename,
                "status": "success",
                "file_info": file_info,
                "job_id": job_id,
                "validation": validation_result,
                "message": upload_msg,
            }
        except Exception as e:
            # FIX: previously logged the literal "(unknown)" even though the
            # filename is known here; log the actual filename for traceability.
            logging.error(f"Error processing file {filename}: {e}", exc_info=True)
            return {"filename": filename, "status": "error", "error": str(e)}

    def get_upload_summary(self) -> Dict[str, Any]:
        """
        Get summary of upload system status.

        Returns:
            Dict with upload stats, processing queue status, and service
            health flags; on failure, a dict with a single ``error`` key
            (best-effort, never raises).
        """
        try:
            upload_stats = self.document_service.get_upload_stats()
            queue_status = self.processing_service.get_queue_status()
            return {
                "upload_stats": upload_stats,
                "processing_queue": queue_status,
                "service_status": {
                    "document_service": "active",
                    "processing_service": ("active" if queue_status["service_running"] else "inactive"),
                },
            }
        except Exception as e:
            logging.error(f"Error getting upload summary: {e}")
            return {"error": str(e)}

    def validate_batch_upload(self, files: List["FileStorage"]) -> Tuple[List["FileStorage"], List[str]]:
        """
        Validate a batch of files before upload.

        Args:
            files: List of file objects.

        Returns:
            Tuple of (valid_files, error_messages). An oversized batch
            short-circuits with no valid files.
        """
        valid_files = []
        errors = []

        # Reject the whole batch early if it exceeds the configured count limit.
        if len(files) > self.document_service.max_batch_size:
            max_batch = self.document_service.max_batch_size
            errors.append(f"Too many files: {len(files)} (max: {max_batch})")
            return [], errors

        total_size = 0
        for file_obj in files:
            if not file_obj or not file_obj.filename:
                errors.append("Empty file or missing filename")
                continue

            # Size each stream without consuming it.
            file_obj.seek(0, 2)
            file_size = file_obj.tell()
            file_obj.seek(0)
            total_size += file_size

            # Validate individual file
            validation = self.document_service.validate_file(file_obj.filename, file_size)
            if validation["valid"]:
                valid_files.append(file_obj)
            else:
                errors.extend([f"{file_obj.filename}: {error}" for error in validation["errors"]])

        # Check total batch size (per-file limit scaled by batch length).
        max_total_size = self.document_service.max_file_size * len(files)
        if total_size > max_total_size:
            errors.append(f"Total batch size too large: {total_size} bytes")

        return valid_files, errors