import requests
from bs4 import BeautifulSoup
import pandas as pd
import torch
from transformers import pipeline
from sentence_transformers import SentenceTransformer, util
import concurrent.futures
import time
import sys
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel
import numpy as np
from scipy import stats
from PyDictionary import PyDictionary
import matplotlib.pyplot as plt
import litellm
import re
import sentencepiece
import random
def score_with_llm(title, topic, llm_model):
    prompt = f"""Evaluate the relevance of the following article to the topic '{topic}'.
Article title: {title}
Give a final relevance score between 0 and 1, where 1 is very relevant and 0 is not relevant at all.
Respond only with a number between 0 and 1."""
    try:
        response = litellm.completion(
            model=llm_model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=10
        )
        score_match = re.search(r'\d+(\.\d+)?', response.choices[0].message.content.strip())
        if score_match:
            score = float(score_match.group())
            print(f"LLM score: {score}")
            return max(0, min(score, 1))
        else:
            print(f"Could not extract a score from LLM response: {response.choices[0].message.content}")
            return None
    except Exception as e:
        print(f"Error in scoring with LLM {llm_model}: {str(e)}")
        return None
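# Example usage (a minimal sketch; assumes a local Ollama model is reachable through litellm,
# and the article title below is made up for illustration):
# relevance = score_with_llm("New drug slows cellular aging in mice", "longevity", "ollama/qwen2")
# print(relevance)  # e.g. 0.9, or None if no score could be parsed from the response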
def expand_keywords_llm(keyword, max_synonyms=3, llm_model="ollama/qwen2"):
    prompt = f"""Please provide up to {max_synonyms} synonyms or closely related terms for the word or phrase: "{keyword}".
Return only the list of synonyms, separated by commas, without any additional explanation."""
    try:
        response = litellm.completion(
            model=llm_model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=50
        )
        synonyms = [s.strip() for s in response.choices[0].message.content.split(',')]
        return [keyword] + synonyms[:max_synonyms]
    except Exception as e:
        print(f"Error in expanding keywords with LLM {llm_model}: {str(e)}")
        return [keyword]
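# Example usage (sketch; the exact synonyms depend on the model and may vary between runs):
# expand_keywords_llm("longevity", max_synonyms=3)
# -> ["longevity", "lifespan", "long life", "durability"]  (illustrative output only)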
# Function to get the links from the homepage
def get_homepage_links(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    links = soup.find_all('a', href=True)
    return [(link.text.strip(), link['href']) for link in links if link.text.strip()]
# Function to get the content of an article
def get_article_content(url):
    try:
        print(f"Fetching content from: {url}")
        response = requests.get(url)
        print(f"HTTP response size: {len(response.content)} bytes")  # Number of bytes in the HTTP response
        soup = BeautifulSoup(response.text, 'html.parser')
        print(f"Size of the soup object: {sys.getsizeof(soup)} bytes")  # In-memory size of the soup object
        article = soup.find('article')
        if article:
            paragraphs = article.find_all('p')
            content = ' '.join([p.text for p in paragraphs])
            print(f"Paragraphs retrieved: {len(content)} characters")
            return content
        print("No article content found")
        return ""
    except Exception as e:
        print(f"Error while fetching content: {str(e)}")
        return ""
# Function for zero-shot analysis
def zero_shot_analysis(text, topic, classifier):
    if not text:
        print("Empty text for zero-shot analysis")
        return 0.0
    result = classifier(text, candidate_labels=[topic, f"not {topic}"], multi_label=False)
    # The pipeline returns labels sorted by score, so look up the topic label explicitly
    score = result['scores'][result['labels'].index(topic)]
    print(f"Zero-shot score: {score}")
    return score
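# Example usage (sketch; loading the pipeline downloads the model on first run):
# classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
# zero_shot_analysis("New drug slows cellular aging in mice", "longevity", classifier)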
# Function for embedding-based analysis
def embedding_analysis(text, topic_embedding, model):
    if not text:
        print("Empty text for embedding analysis")
        return 0.0
    text_embedding = model.encode([text], convert_to_tensor=True)
    similarity = util.pytorch_cos_sim(text_embedding, topic_embedding).item()
    print(f"Embedding score: {similarity}")
    return similarity
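# Example usage (sketch, assuming a SentenceTransformer model such as 'all-MiniLM-L6-v2' is available):
# model = SentenceTransformer('all-MiniLM-L6-v2')
# topic_embedding = model.encode(["longevity"], convert_to_tensor=True)
# embedding_analysis("New research on extending human lifespan", topic_embedding, model)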
import nltk
from nltk.corpus import wordnet
nltk.download('wordnet')  # wordnet is required by preprocess_text below

def preprocess_text(text):
    # Tokenize the text
    tokens = text.lower().split()
    # Expand each token with its synonyms
    expanded_tokens = []
    for token in tokens:
        synonyms = set()
        for syn in wordnet.synsets(token):
            for lemma in syn.lemmas():
                synonyms.add(lemma.name())
        expanded_tokens.extend(list(synonyms))
    return ' '.join(expanded_tokens)
def improved_tfidf_similarity(texts, query):
    # Preprocess texts and query
    preprocessed_texts = [preprocess_text(text) for text in texts]
    preprocessed_query = preprocess_text(query)
    # Combine texts and query for vectorization
    all_texts = preprocessed_texts + [preprocessed_query]
    # Use TfidfVectorizer with custom parameters
    vectorizer = TfidfVectorizer(ngram_range=(1, 2), min_df=1, smooth_idf=True)
    tfidf_matrix = vectorizer.fit_transform(all_texts)
    # Calculate cosine similarity
    cosine_similarities = cosine_similarity(tfidf_matrix[-1], tfidf_matrix[:-1]).flatten()
    # Normalize similarities to avoid zero scores (the epsilon guards against division by zero when all scores are equal)
    normalized_similarities = (cosine_similarities - cosine_similarities.min()) / (cosine_similarities.max() - cosine_similarities.min() + 1e-8)
    return normalized_similarities
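# Example usage (sketch; scores are min-max normalized over the input texts):
# improved_tfidf_similarity(["article about healthy aging", "article about databases"],
#                           "longevity and life extension")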
def improved_tfidf_similarity_v2(texts, query):
    # Combine texts and query, treating each word or phrase as a separate document
    all_docs = [word.strip() for text in texts for word in text.split(',')] + [word.strip() for word in query.split(',')]
    # Create TF-IDF matrix
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(all_docs)
    # Calculate document vectors by summing the TF-IDF vectors of their words
    doc_vectors = []
    query_vector = np.zeros((1, tfidf_matrix.shape[1]))
    current_doc = 0
    for i, doc in enumerate(all_docs):
        if i < len(all_docs) - len(query.split(',')):  # If it's part of the texts
            if current_doc == len(texts):
                break
            if doc in texts[current_doc]:
                doc_vectors.append(tfidf_matrix[i].toarray())
            else:
                current_doc += 1
                doc_vectors.append(tfidf_matrix[i].toarray())
        else:  # If it's part of the query
            query_vector += tfidf_matrix[i].toarray()
    doc_vectors = np.array([np.sum(doc, axis=0) for doc in doc_vectors])
    # Calculate cosine similarity
    similarities = cosine_similarity(query_vector, doc_vectors).flatten()
    # Normalize similarities to avoid zero scores
    normalized_similarities = (similarities - similarities.min()) / (similarities.max() - similarities.min() + 1e-8)
    return normalized_similarities
# Example usage:
# texts = ["longevity, health, aging", "computer science, AI"]
# query = "longevity, life extension, anti-aging"
# results = improved_tfidf_similarity_v2(texts, query)
# print(results)
# New functions
def tfidf_similarity(texts, query):
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(texts + [query])
    cosine_similarities = cosine_similarity(tfidf_matrix[-1], tfidf_matrix[:-1]).flatten()
    return cosine_similarities
def bert_similarity(texts, query, model_name='bert-base-uncased'):
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)

    def get_embedding(text):
        inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)
        return outputs.last_hidden_state.mean(dim=1).squeeze().numpy()

    query_embedding = get_embedding(query)
    text_embeddings = [get_embedding(text) for text in texts]
    similarities = [cosine_similarity([query_embedding], [text_embedding])[0][0] for text_embedding in text_embeddings]
    return similarities
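# Example usage (sketch; downloads bert-base-uncased on first call, which can be slow):
# bert_similarity(["article about healthy aging", "article about databases"], "longevity")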
# Main analysis function (modified)
def analyze_link(title, link, topic, zero_shot_classifiers, embedding_models, expanded_query, llm_models, testcontent):
    print(f"\nAnalyzing: {title}")
    results = {
        "Titre": title,
        #"TF-IDF (titre)": improved_tfidf_similarity_v2([title], expanded_query)[0],
        #"BERT (titre)": bert_similarity([title], expanded_query)[0],
    }
    # Zero-shot analysis
    for name, classifier in zero_shot_classifiers.items():
        results[f"Zero-shot (titre) - {name}"] = zero_shot_analysis(title, topic, classifier)
    # Embedding analysis
    for name, model in embedding_models.items():
        topic_embedding = model.encode([expanded_query], convert_to_tensor=True)
        results[f"Embeddings (titre) - {name}"] = embedding_analysis(title, topic_embedding, model)
    # LLM analysis
    for model in llm_models:
        results[f"LLM Score - {model}"] = score_with_llm(title, topic, model)
    if testcontent:
        content = get_article_content(link)
        #results["TF-IDF (contenu)"] = improved_tfidf_similarity_v2([content], expanded_query)[0]
        #results["BERT (contenu)"] = bert_similarity([content], expanded_query)[0]
        # Zero-shot analysis
        for name, classifier in zero_shot_classifiers.items():
            results[f"Zero-shot (contenu) - {name}"] = zero_shot_analysis(content, topic, classifier)
        # Embedding analysis
        for name, model in embedding_models.items():
            topic_embedding = model.encode([expanded_query], convert_to_tensor=True)
            results[f"Embeddings (contenu) - {name}"] = embedding_analysis(content, topic_embedding, model)
        # LLM analysis
        for model in llm_models:
            results[f"LLM Content Score - {model}"] = score_with_llm(content, topic, model)
    return results
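# Example usage (sketch; the classifier/model dictionaries are normally built elsewhere in the app,
# and the title/URL below are made up for illustration):
# zero_shot_classifiers = {"bart-mnli": pipeline("zero-shot-classification", model="facebook/bart-large-mnli")}
# embedding_models = {"MiniLM": SentenceTransformer('all-MiniLM-L6-v2')}
# analyze_link("Scientists extend lifespan of worms", "https://example.com/article", "longevity",
#              zero_shot_classifiers, embedding_models, "longevity, life extension, anti-aging",
#              ["ollama/qwen2"], testcontent=False)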
def evaluate_ranking(reference_data_valid, reference_data_rejected, method_scores, threshold, silent):
    simple_score = 0
    true_positives = 0
    false_positives = 0
    true_negatives = 0
    false_negatives = 0
    # Build a list of all items with their status (1 for valid, 0 for rejected)
    all_items = [(item, 1) for item in reference_data_valid] + [(item, 0) for item in reference_data_rejected]
    # Sort the items by their score under the method
    all_items_temp = all_items.copy()
    # Correct false positives if the method spits out the same score for every item
    #random.shuffle(all_items_temp)
    all_items_temp.reverse()
    sorted_method = sorted([(item, method_scores.get(item, 0)) for item, _ in all_items_temp],
                           key=lambda x: x[1], reverse=True)
    # Build the rank lists used to compute the Spearman correlation
    reference_ranks = []
    method_ranks = []
    for i, (item, status) in enumerate(all_items):
        method_score = method_scores.get(item, 0)
        method_rank = next(j for j, (it, score) in enumerate(sorted_method) if it == item)
        reference_ranks.append(i)
        method_ranks.append(method_rank)
        if status == 1:  # Valid item
            if method_score >= threshold:
                simple_score += 1
                true_positives += 1
            else:
                simple_score -= 1
                false_negatives += 1
        else:  # Rejected item
            if method_score < threshold:
                simple_score += 1
                true_negatives += 1
            else:
                simple_score -= 1
                false_positives += 1
    # Compute the Spearman rank correlation coefficient
    if not silent:
        print("+++")
        print(reference_ranks)
        print("---")
        print(method_ranks)
    spearman_corr, _ = stats.spearmanr(reference_ranks, method_ranks)
    # Compute precision, recall, and F1-score
    precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
    recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return {
        "simple_score": simple_score,
        "spearman_correlation": spearman_corr,
        "precision": precision,
        "recall": recall,
        "f1_score": f1_score,
    }
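# Example usage (sketch with made-up reference items and scores):
# valid = ["Aging reversal study", "New longevity drug"]
# rejected = ["Stock market update"]
# scores = {"Aging reversal study": 0.9, "New longevity drug": 0.7, "Stock market update": 0.2}
# evaluate_ranking(valid, rejected, scores, threshold=0.5, silent=True)
# -> dict with simple_score, spearman_correlation, precision, recall, f1_score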
def find_optimal_threshold(reference_data_valid, reference_data_rejected, method_scores):
    best_score = float('-inf')
    best_threshold = 0
    for threshold in np.arange(0, 1.05, 0.05):
        result = evaluate_ranking(
            reference_data_valid,
            reference_data_rejected,
            method_scores,
            threshold, True
        )
        if result['simple_score'] > best_score:
            best_score = result['simple_score']
            best_threshold = threshold
    return best_threshold
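# Example usage (sketch, reusing the valid/rejected/scores example above):
# best_threshold = find_optimal_threshold(valid, rejected, scores)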
def reset_cuda_context():
    torch.cuda.empty_cache()
    torch.cuda.ipc_collect()
    if torch.cuda.is_available():
        torch.cuda.set_device(torch.cuda.current_device())
        torch.cuda.synchronize()
import gc

def clear_models():
    global zero_shot_classifiers, embedding_models_dict, bert_models, tfidf_objects
    for classifier in zero_shot_classifiers.values():
        del classifier
    zero_shot_classifiers.clear()
    for model in embedding_models_dict.values():
        del model
    embedding_models_dict.clear()
    for model in bert_models:
        del model
    bert_models.clear()
    for vectorizer in tfidf_objects:
        del vectorizer
    tfidf_objects.clear()
    torch.cuda.empty_cache()
    gc.collect()
def clear_globals():
    for name in list(globals()):
        if isinstance(globals()[name], (torch.nn.Module, torch.Tensor)):
            del globals()[name]
def release_vram(zero_shot_classifiers, embedding_models, bert_models, tfidf_objects):
    # Delete the zero-shot classifier objects
    for model in zero_shot_classifiers.values():
        del model
    # Delete the embedding model objects
    for model in embedding_models.values():
        del model
    # Delete the BERT model objects
    for model in bert_models:
        del model
    # Delete the TF-IDF objects
    for obj in tfidf_objects:
        del obj
    # Empty the GPU memory cache
    torch.cuda.empty_cache()
    torch.cuda.synchronize()
    gc.collect()
    clear_globals()
    reset_cuda_context()
def load_finetuned_model(model_path):
    # Assumes the checkpoint was saved with 'base_model_name' and 'model_state_dict' keys,
    # and that the EmbeddingModel wrapper class is defined elsewhere in the app
    checkpoint = torch.load(model_path)
    base_model = AutoModel.from_pretrained(checkpoint['base_model_name'])
    model = EmbeddingModel(base_model)
    model.load_state_dict(checkpoint['model_state_dict'])
    return model
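# Example usage (sketch; the checkpoint path below is hypothetical):
# finetuned = load_finetuned_model("finetuned_embedding_model.pt")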