from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import pickle
import torch
from transformers import PegasusTokenizer, PegasusForConditionalGeneration
import tensorflow as tf
from tensorflow.python.lib.io import file_io
from nltk.tokenize import sent_tokenize
import io
# contents = pickle.load(f) becomes...
# contents = CPU_Unpickler(f).load()
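# The CPU_Unpickler referenced in the comments above is not defined in this listing.
# A minimal sketch of such a helper (an assumption about what it looked like, for a
# model pickled on a GPU machine) remaps torch storages onto the CPU while
# unpickling; it relies on the pickle, io and torch imports above.
class CPU_Unpickler(pickle.Unpickler):
    def find_class(self, module, name):
        # Route torch's storage loader through torch.load with map_location='cpu'
        if module == 'torch.storage' and name == '_load_from_bytes':
            return lambda b: torch.load(io.BytesIO(b), map_location='cpu')
        return super().find_class(module, name)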
model_path = "finbert.sav"
# Load the pickled FinBERT model from disk
with open(model_path, "rb") as f:
    model1 = pickle.load(f)
tf.compat.v1.disable_eager_execution()

# Load the Pegasus summarization model and its tokenizer
model_name = "human-centered-summarization/financial-summarization-pegasus"
tokenizer = PegasusTokenizer.from_pretrained(model_name)
model2 = PegasusForConditionalGeneration.from_pretrained(model_name)
# tokenizer = AutoTokenizer.from_pretrained(checkpoint)
# model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
import nltk
from finbert_embedding.embedding import FinbertEmbedding
import pandas as pd
from nltk.cluster import KMeansClusterer
import numpy as np
import os
from scipy.spatial import distance_matrix

nltk.download('punkt')
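# Newer NLTK releases may also require the 'punkt_tab' resource for sent_tokenize;
# if tokenization raises a LookupError, uncommenting the line below should fix it
# (this is an assumption about the installed NLTK version, not part of the original app).
# nltk.download('punkt_tab')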
def finbert(word):
    # Set up a path for storing each text datafile used to build the dataframe
    data_path = "/tmp/"
    if not os.path.exists(data_path):
        os.makedirs(data_path)
    input_ = "/tmp/input.txt"
    # Write the input to disk so each datapoint can be handled as a txt file
    with open(input_, "w") as file:
        file.write(word)
    # Read the written txt back into a variable to start clustering
    with open(input_, 'r') as f:
        text = f.read()
    # Create sentence tokens from the txt file
    tokens = nltk.sent_tokenize(text)
    # Strip leading and trailing whitespace from the tokens
    sentences = [sentence.strip() for sentence in tokens]
    # Create a DataFrame from the tokens
    data = pd.DataFrame(sentences)
    # Name the column containing the sentence tokens 'Sentences'
    data.columns = ['Sentences']
    # Function to create a numerical embedding for each text token in the dataframe
    def get_sentence_embeddings():
        # Create an empty list for the sentence embeddings
        sentence_list = []
        # Loop through all sentences and append each sentence embedding to the list
        for i in tokens:
            sentence_embedding = model1.sentence_vector(i)
            sentence_list.append(sentence_embedding)
        # Create an empty list for the converted arrays
        sentence_array = []
        # Loop through the sentence list and convert each tensor to a NumPy array
        for i in sentence_list:
            sentence_array.append(i.numpy())
        # Return the sentence embeddings as a list of arrays
        return sentence_array

    # Apply get_sentence_embeddings to the dataframe to create the Embeddings column
    data['Embeddings'] = get_sentence_embeddings()
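    # Note: model1.sentence_vector(...) is assumed to return a 768-dimensional torch
    # tensor (the BERT-base hidden size used by FinBERT), so each entry in the
    # Embeddings column becomes a 768-element NumPy array.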
    # Number of clusters, i.e. the number of sentences expected in the summary
    NUM_CLUSTERS = 10
    iterations = 8
    # Convert the embeddings into an array and store it in variable X
    X = np.array(data['Embeddings'].to_list())

    # Build the k-means clustering algorithm
    Kclusterer = KMeansClusterer(
        NUM_CLUSTERS,
        distance=nltk.cluster.util.cosine_distance,
        repeats=iterations, avoid_empty_clusters=True)
    # If the text is too short, k-means raises an error;
    # use a try/except block to return the text itself as the result in that case.
    try:
        assigned_clusters = Kclusterer.cluster(X, assign_clusters=True)
        # Apply the k-means clusters to the DataFrame and create the Cluster and Centroid columns
        data['Cluster'] = pd.Series(assigned_clusters, index=data.index)
        data['Centroid'] = data['Cluster'].apply(lambda x: Kclusterer.means()[x])
    # Return the text if the clustering algorithm raises an exception and move on to the next text file
    except ValueError:
        return text
    # Function that computes the distance of each embedding from the centroid of its cluster
    def distance_from_centroid(row):
        return distance_matrix([row['Embeddings']], [row['Centroid'].tolist()])[0][0]

    # Apply distance_from_centroid to the data
    data['Distance_From_Centroid'] = data.apply(distance_from_centroid, axis=1)

    ## Return the final summary: the sentence closest to each cluster centroid, kept in document order
    summary = " ".join(data.sort_values(
        'Distance_From_Centroid',
        ascending=True).groupby('Cluster').head(1).sort_index()['Sentences'].tolist())
    # Clean up stray newlines and trailing whitespace in the summary
    import re
    words = list()
    for text in summary.split():
        text = re.sub(r'\n', '', text)
        text = re.sub(r'\s$', '', text)
        words.append(text)
    summary = " ".join(words)
    return (summary, " Length of Input:---->" + str(len(word)), " Length of Output:----> " + str(len(summary)))
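# Example of calling finbert() directly (hypothetical input text; the function returns
# a 3-tuple of the extractive summary plus input/output length notes):
# summary_text, input_note, output_note = finbert("Revenue grew 10% year over year. ...")
# print(summary_text)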
def pegasus(text):
    '''A function to obtain an abstractive summary for each chunk of tokenized sentences.
    It returns a summarized document as output'''
    import nltk
    nltk.download('punkt')
    import os
    data_path = "/tmp/"
    if not os.path.exists(data_path):
        os.makedirs(data_path)
    input_ = "/tmp/input.txt"
    with open(input_, "w") as file:
        file.write(text)
    # Read the written txt back into a variable
    with open(input_, 'r') as f:
        text_ = f.read()
    def tokenized_sentences(file):
        '''A function to split the text into chunks of sentences.
        Returns a list of sentence chunks'''
        # Create empty lists
        tokenized_sentences = []
        sentences = []
        length = 0
        for sentence in sent_tokenize(file):
            length += len(sentence)
            # 512 tokens is the maximum input length for the Pegasus model;
            # the running character count is used here as a rough proxy
            if length < 512:
                sentences.append(sentence)
            else:
                tokenized_sentences.append(sentences)
                sentences = [sentence]
                length = len(sentence)
        sentences = [sentence.strip() for sentence in sentences]
        # Append the remaining sentences as the final chunk
        if sentences:
            tokenized_sentences.append(sentences)
        return tokenized_sentences
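    # Worked example of the chunking above (assumed input): for three sentences of
    # roughly 300 characters each, the running length crosses the 512-character
    # threshold on the second and third sentences, so tokenized_sentences() returns
    # three single-sentence chunks, each summarized separately below.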
    tokenized = tokenized_sentences(text_)
    # Use GPU if available
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    # Create an empty list for the chunk summaries
    summary = []
    # Loop to encode each chunk, generate an abstractive summary, and decode the tokens
    for token in tokenized:
        # Encoding
        inputs = tokenizer.encode(' '.join(token), truncation=True, return_tensors='pt')
        # Move the inputs to CPU or GPU
        inputs = inputs.to(device)
        # Get summaries from the transformer model
        all_summary = model2.to(device).generate(inputs, do_sample=True,
                                                 max_length=50, top_k=50, top_p=0.95,
                                                 num_beams=5, early_stopping=True)
        # num_return_sequences=5)
        # length_penalty=0.2, no_repeat_ngram_size=2
        # min_length=10,
        # max_length=50)
        # Decoding
        output = [tokenizer.decode(each_summary, skip_special_tokens=True, clean_up_tokenization_spaces=False) for each_summary in all_summary]
        # Append each decoded output to the list
        summary.append(output)
    # Flatten the per-chunk outputs and join them into the final summary
    summary = [sentence for each in summary for sentence in each]
    final = "".join(summary)
    return final
import gradio as gr

interface1 = gr.Interface(fn=finbert,
                          inputs=gr.Textbox(lines=15, placeholder="Enter your text !!", label='Input-10k Sections'),
                          outputs=gr.Textbox(label='Output')).launch()
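# The pegasus() function above is never wired into the UI in this listing. One possible
# way to expose both summarizers (a sketch under that assumption, not part of the
# original app) would be to build two Interfaces and combine them, e.g.:
#
# interface1 = gr.Interface(fn=finbert,
#                           inputs=gr.Textbox(lines=15, label='Input-10k Sections'),
#                           outputs=gr.Textbox(label='Output'))
# interface2 = gr.Interface(fn=pegasus,
#                           inputs=gr.Textbox(lines=15, label='Input-10k Sections'),
#                           outputs=gr.Textbox(label='Output'))
# gr.TabbedInterface([interface1, interface2],
#                    ["Extractive (FinBERT)", "Abstractive (Pegasus)"]).launch()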