Spaces:
Sleeping
Sleeping
| import logging | |
| import time | |
| import traceback | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from litellm import OpenAI | |
| from classifiers import TFIDFClassifier, LLMClassifier | |
| from utils import load_data, validate_results | |
def update_api_key(api_key):
    """Verify a new OpenAI API key and, on success, install it module-wide.

    A candidate client is built and exercised with a minimal chat completion
    request.  The module-level ``OPENAI_API_KEY`` and ``client`` globals are
    only replaced once that request succeeds, so a rejected key never
    displaces a previously working client.

    Args:
        api_key: The key to install.  Surrounding whitespace is ignored for
            string values; empty or whitespace-only values are rejected.

    Returns:
        A human-readable status message describing success or failure.
    """
    global OPENAI_API_KEY, client

    # Keys are frequently pasted with stray spaces or a trailing newline.
    if isinstance(api_key, str):
        api_key = api_key.strip()
    if not api_key:
        return "API Key cannot be empty"

    try:
        candidate = OpenAI(api_key=api_key)
        # Test the connection with a simple request; the response content is
        # irrelevant, only whether the call raises.
        candidate.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "test"}],
            max_tokens=5,
        )
    except Exception as e:
        error_msg = str(e)
        logging.error("API key update failed: %s", error_msg)
        return f"Failed to update API Key: {error_msg}"

    # Commit the new state only after verification succeeded.
    OPENAI_API_KEY = api_key
    client = candidate
    return "API Key updated and verified successfully"
# Runtime message shown when an LLM-backed classifier is requested before a
# client has been configured (kept verbatim from the original code).
_CLIENT_NOT_INITIALIZED_MESSAGE = (
    "Erreur : Le client API n'est pas initialisé. Veuillez configurer une clé API valide dans l'onglet 'Setup'."
)


def _combine_text_columns(df, text_columns):
    """Return one space-joined string per row built from the selected columns."""
    # Reading each column separately avoids the per-row Series construction
    # (and dtype upcasting) that ``DataFrame.iterrows`` performs.
    column_values = [df[col].tolist() for col in text_columns]
    return [" ".join(str(value) for value in row) for row in zip(*column_values)]


def _parse_categories(categories):
    """Split a comma-separated category string, dropping blank entries."""
    if not categories:
        return []
    stripped = (part.strip() for part in categories.split(","))
    return [name for name in stripped if name]


def _pick_classifier_for_size(num_texts):
    """Map a number of texts to the classifier used by the "auto" setting."""
    if num_texts <= 500:
        return "gpt4"
    if num_texts <= 1000:
        return "gpt35"
    if num_texts <= 5000:
        return "hybrid"
    return "tfidf"


def _classify_hybrid(texts, category_list, llm_client, confidence_threshold=70):
    """Classify with TF-IDF, then re-classify low-confidence texts with the LLM."""
    # Start from the TF-IDF answers so that every position always holds a
    # usable result, even if the LLM returns fewer results than requested.
    results = [
        tfidf_result
        for _, tfidf_result in zip(
            texts, TFIDFClassifier().classify(texts, category_list)
        )
    ]
    low_confidence_indices = [
        index
        for index, result in enumerate(results)
        if result["confidence"] < confidence_threshold
    ]
    if low_confidence_indices:
        llm_classifier = LLMClassifier(client=llm_client, model="gpt-3.5-turbo")
        llm_results = llm_classifier.classify(
            [texts[index] for index in low_confidence_indices], category_list
        )
        for index, llm_result in zip(low_confidence_indices, llm_results):
            results[index] = llm_result
    return results


def process_file(file, text_columns, categories, classifier_type, show_explanations):
    """Process the uploaded file and classify its text data.

    Args:
        file: Either a path string or an object exposing the path as ``.name``.
        text_columns: Names of the columns whose values are joined (with a
            space) to form the text classified for each row.
        categories: Comma-separated category names; may be empty.
        classifier_type: ``"tfidf"``, ``"gpt35"``, ``"gpt4"`` or ``"auto"``
            (chosen from the number of rows).  Any other value is handled as
            the hybrid TF-IDF + LLM strategy.
        show_explanations: When true, an ``"Explanation"`` column is added.

    Returns:
        ``(result_df, validation_report)`` on success, where ``result_df`` is
        a copy of the loaded data with ``"Category"`` and ``"Confidence"``
        columns added and ``validation_report`` is whatever
        ``validate_results`` returns.  On failure, ``(None, message)``.
    """
    # Validate the cheap inputs before touching the file system.
    if file is None:
        return None, "Please upload a file"
    if not text_columns:
        return None, "Please select at least one text column"

    try:
        df = load_data(file if isinstance(file, str) else file.name)

        # Check if all selected columns exist.
        missing_columns = [col for col in text_columns if col not in df.columns]
        if missing_columns:
            # Column labels are not guaranteed to be strings, hence map(str).
            return (
                None,
                f"Columns not found in the file: {', '.join(map(str, missing_columns))}. Available columns: {', '.join(map(str, df.columns))}",
            )

        texts = _combine_text_columns(df, text_columns)
        category_list = _parse_categories(categories)

        if classifier_type == "auto":
            classifier_type = _pick_classifier_for_size(len(texts))

        # The module-level ``client`` may not be bound yet if no API key has
        # been configured; treat "unbound" the same as ``None``.
        api_client = globals().get("client")

        if classifier_type == "tfidf":
            results = TFIDFClassifier().classify(texts, category_list)
        else:
            if api_client is None:
                return None, _CLIENT_NOT_INITIALIZED_MESSAGE
            if classifier_type in ("gpt35", "gpt4"):
                model = "gpt-3.5-turbo" if classifier_type == "gpt35" else "gpt-4"
                classifier = LLMClassifier(client=api_client, model=model)
                results = classifier.classify(texts, category_list)
            else:  # hybrid
                results = _classify_hybrid(texts, category_list, api_client)

        # Create results dataframe.
        result_df = df.copy()
        result_df["Category"] = [r["category"] for r in results]
        result_df["Confidence"] = [r["confidence"] for r in results]
        if show_explanations:
            result_df["Explanation"] = [r["explanation"] for r in results]

        # NOTE(review): on the TF-IDF path ``api_client`` may be None here;
        # confirm that ``validate_results`` tolerates a missing client.
        validation_report = validate_results(result_df, text_columns, api_client)
        return result_df, validation_report
    except Exception as e:
        # UI boundary: report the failure as a message instead of raising.
        error_traceback = traceback.format_exc()
        logging.error("File processing failed: %s", e)
        return None, f"Error: {str(e)}\n{error_traceback}"
def export_results(df, format_type):
    """Export results to a file and return the file path for download.

    Args:
        df: The results table to write, or ``None`` when there is nothing to
            export.
        format_type: ``"excel"`` writes an ``.xlsx`` file; any other value
            writes CSV.

    Returns:
        The path of the written file inside ``temp_exports``, or ``None``
        when ``df`` is ``None``.
    """
    if df is None:
        return None

    import os
    import uuid

    # Create the export directory if it doesn't exist.
    temp_dir = "temp_exports"
    os.makedirs(temp_dir, exist_ok=True)

    # A timestamp alone only has one-second resolution, so two exports in the
    # same second would overwrite each other; a random suffix keeps the
    # filename unique.
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    filename = f"classification_results_{timestamp}_{uuid.uuid4().hex[:8]}"

    if format_type == "excel":
        file_path = os.path.join(temp_dir, f"{filename}.xlsx")
        df.to_excel(file_path, index=False)
    else:
        file_path = os.path.join(temp_dir, f"{filename}.csv")
        df.to_csv(file_path, index=False)
    return file_path