Spaces:
Paused
Paused
| import pandas as pd | |
| from sklearn.model_selection import train_test_split | |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments | |
| import torch | |
| from torch.utils.data import Dataset | |
| from torch.utils.data import DataLoader | |
| from transformers import RobertaTokenizer, RobertaForSequenceClassification | |
| import pandas as pd | |
| #from sklearn.linear_model import LogisticRegression | |
| #from sklearn.metrics import accuracy_score, confusion_matrix | |
| #import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| #import numpy as np | |
| import sys | |
| import torch.nn.functional as F | |
| #from torch.nn import CrossEntropyLoss | |
| #from sklearn.decomposition import PCA | |
| import matplotlib.pyplot as plt | |
| import json | |
| import gradio as gr | |
| # Load configuration file | |
| with open('config.json', 'r') as config_file: | |
| config = json.load(config_file) | |
| num_args = len(config) | |
| arg1 = config.get('arg1', 'default_value1') | |
| arg2 = config.get('arg2', 'default_value2') | |
| print(f"Argument 1: {arg1}") | |
| print(f"Argument 2: {arg2}") | |
| print(f"Total argument size: {num_args}") | |
| if num_args > 1: | |
| # sys.argv[0] is the script name, sys.argv[1] is the first argument, etc. | |
| runModel = arg1 | |
| print(f"Passed value: {runModel}") | |
| print (arg2) | |
| else: | |
| print("No argument was passed.") | |
| device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') | |
| modelNameToUse = arg2 | |
| if (runModel=='1'): | |
| dataFileName = arg2 + '.csv' | |
| print (dataFileName) | |
| # Load the data from the CSV file | |
| df = pd.read_csv(dataFileName) | |
| # Access the text and labels | |
| texts = df['text'].tolist() | |
| labels = df['label'].tolist() | |
| print('Train Model') | |
| # Encode the labels | |
| sorted_labels = sorted(df['label'].unique()) | |
| label_mapping = {label: i for i, label in enumerate(sorted_labels)} | |
| df['label'] = df['label'].map(label_mapping) | |
| print(df['label']) | |
| # Train/test split | |
| train_df, test_df = train_test_split(df, test_size=0.2, random_state=42) | |
| # Tokenization | |
| tokenizer = RobertaTokenizer.from_pretrained('roberta-base') | |
| # Model and training setup | |
| model = RobertaForSequenceClassification.from_pretrained('roberta-base', output_attentions=True, num_labels=len(label_mapping)).to('cpu') | |
| model.resize_token_embeddings(len(tokenizer)) | |
| train_encodings = tokenizer(list(train_df['text']), truncation=True, padding=True, max_length=64) | |
| test_encodings = tokenizer(list(test_df['text']), truncation=True, padding=True, max_length=64) | |
| # Dataset class | |
| class IntentDataset(Dataset): | |
| def __init__(self, encodings, labels): | |
| self.encodings = encodings | |
| self.labels = labels | |
| def __getitem__(self, idx): | |
| item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()} | |
| label = self.labels[idx] | |
| item['labels'] = torch.tensor(self.labels[idx]) | |
| return item | |
| def __len__(self): | |
| return len(self.labels) | |
| train_dataset = IntentDataset(train_encodings, list(train_df['label'])) | |
| test_dataset = IntentDataset(test_encodings, list(test_df['label'])) | |
| # Create an instance of the custom loss function | |
| training_args = TrainingArguments( | |
| output_dir='./results_' + modelNameToUse, | |
| num_train_epochs=2, | |
| per_device_train_batch_size=2, | |
| per_device_eval_batch_size=2, | |
| warmup_steps=500, | |
| weight_decay=0.02, | |
| logging_dir='./logs_' + modelNameToUse, | |
| logging_steps=10, | |
| evaluation_strategy="epoch", | |
| ) | |
| trainer = Trainer( | |
| model=model, | |
| args=training_args, | |
| train_dataset=train_dataset, | |
| eval_dataset=test_dataset | |
| ) | |
| # Train the model | |
| trainer.train() | |
| # Evaluate the model | |
| trainer.evaluate() | |
| label_mapping = { | |
| 0: "lastmonth", | |
| 1: "nextweek", | |
| 2: "sevendays", | |
| 3: "today", | |
| 4: "tomorrow", | |
| 5: "yesterday" | |
| } | |
| def evaluate_and_report_errors(model, dataloader, tokenizer): | |
| model.eval() | |
| incorrect_predictions = [] | |
| with torch.no_grad(): | |
| #print(dataloader) | |
| for batch in dataloader: | |
| input_ids = batch['input_ids'].to(device) | |
| attention_mask = batch['attention_mask'].to(device) | |
| labels = batch['labels'].to(device) | |
| outputs = model(input_ids=input_ids, attention_mask=attention_mask) | |
| logits = outputs.logits | |
| predictions = torch.argmax(logits, dim=1) | |
| for i, prediction in enumerate(predictions): | |
| if prediction != labels[i]: | |
| incorrect_predictions.append({ | |
| "prompt": tokenizer.decode(input_ids[i], skip_special_tokens=True), | |
| "predicted": prediction.item(), | |
| "actual": labels[i].item() | |
| }) | |
| # Print incorrect predictions | |
| if incorrect_predictions: | |
| print("\nIncorrect Predictions:") | |
| for error in incorrect_predictions: | |
| print(f"Sentence: {error['prompt']}") | |
| #print(f"Predicted Label: {GetCategoryFromCategoryLong(error['predicted'])} | Actual Label: {GetCategoryFromCategoryLong(error['actual'])}\n") | |
| print(f"Predicted Label: {label_mapping[error['predicted']]} | Actual Label: {label_mapping[error['actual']]}\n") | |
| #print(f"Predicted Label: {error['predicted']} | Actual Label: {label_mapping[error['actual']]}\n") | |
| else: | |
| print("\nNo incorrect predictions found.") | |
| train_dataloader = DataLoader(train_dataset, batch_size=10, shuffle=True) | |
| evaluate_and_report_errors(model,train_dataloader, tokenizer) | |
| # Save the model and tokenizer | |
| model.save_pretrained('./' + modelNameToUse + '_model') | |
| tokenizer.save_pretrained('./' + modelNameToUse + '_tokenizer') | |
| else: | |
| print('Load Pre-trained') | |
| model_save_path = "./" + modelNameToUse + "_model" | |
| tokenizer_save_path = "./" + modelNameToUse + "_tokenizer" | |
| # RobertaTokenizer.from_pretrained(model_save_path) | |
| model = AutoModelForSequenceClassification.from_pretrained(model_save_path).to('cpu') | |
| tokenizer = AutoTokenizer.from_pretrained(tokenizer_save_path) | |
| #Define the label mappings (this must match the mapping used during training) | |
| label_mapping = { | |
| 0: "lastmonth", | |
| 1: "nextweek", | |
| 2: "sevendays", | |
| 3: "today", | |
| 4: "tomorrow", | |
| 5: "yesterday" | |
| } | |
| #Function to classify user input | |
| def classifyTimeFrame(user_input): | |
| # Tokenize and predict | |
| input_encoding = tokenizer(user_input, padding=True, truncation=True, return_tensors="pt").to('cpu') | |
| with torch.no_grad(): | |
| attention_mask = input_encoding['attention_mask'].clone() | |
| # Modify the attention mask to emphasize certain key tokens | |
| # for idx, token_id in enumerate(input_encoding['input_ids'][0]): | |
| # word = tokenizer.decode([token_id]) | |
| # print(word) | |
| # if word.strip() in ["now", "same", "continue", "again", "also"]: # Target key tokens | |
| # attention_mask[0, idx] = 3 # Increase attention weight for these words | |
| # else: | |
| # attention_mask[0, idx] = 0 | |
| # print (attention_mask) | |
| # input_encoding['attention_mask'] = attention_mask | |
| # print (input_encoding) | |
| output = model(**input_encoding, output_hidden_states=True) | |
| probabilities = F.softmax(output.logits, dim=-1) | |
| prediction = torch.argmax(output.logits, dim=1).cpu().numpy() | |
| # Map prediction back to label | |
| print(prediction) | |
| predicted_label = label_mapping[prediction[0]] | |
| print(f"Predicted intent: {predicted_label}\n") | |
| # Print the confidence for each label | |
| print("\nLabel Confidence Scores:") | |
| for i, label in label_mapping.items(): | |
| confidence = probabilities[0][i].item() # Get confidence score for each label | |
| print(f"{label}: {confidence:.4f}") | |
| print("\n") | |
| iface = gr.Interface(fn=classifyTimeFrame, inputs="text", outputs="text") | |
| iface.launch(share=True) | |
| #Run the function | |
| #classifyTimeFrame() | |