import copy
import logging
import os

import numpy as np
import torch
from scipy.spatial.distance import cdist
from torch import nn
from torch.utils.data import DataLoader

from utils.toolkit import accuracy, tensor2numpy

EPSILON = 1e-8
batch_size = 64


class BaseLearner(object):
    def __init__(self, args):
        self.args = args
        self._cur_task = -1
        self._known_classes = 0
        self._total_classes = 0
        self.class_list = []
        self._network = None
        self._old_network = None
        self._data_memory, self._targets_memory = np.array([]), np.array([])
        self.topk = 5

        self._memory_size = args["memory_size"]
        self._memory_per_class = args.get("memory_per_class", None)
        self._fixed_memory = args.get("fixed_memory", False)
        self._device = args["device"][0]
        self._multiple_gpus = args["device"]

    @property
    def exemplar_size(self):
        assert len(self._data_memory) == len(
            self._targets_memory
        ), "Exemplar size error."
        return len(self._targets_memory)

    @property
    def samples_per_class(self):
        if self._fixed_memory:
            return self._memory_per_class
        else:
            assert self._total_classes != 0, "Total classes is 0"
            return self._memory_size // self._total_classes

    @property
    def feature_dim(self):
        # Accessed as a plain attribute elsewhere (e.g. in _reduce_exemplar),
        # so these three accessors must be properties.
        if isinstance(self._network, nn.DataParallel):
            return self._network.module.feature_dim
        return self._network.feature_dim
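
    # Worked example for samples_per_class above (hypothetical numbers): with
    # memory_size=2000 and fixed_memory=False, a model that has seen 10
    # classes keeps 2000 // 10 = 200 exemplars per class; after 100 classes,
    # only 2000 // 100 = 20.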

    def build_rehearsal_memory(self, data_manager, per_class):
        if self._fixed_memory:
            self._construct_exemplar_unified(data_manager, per_class)
        else:
            self._reduce_exemplar(data_manager, per_class)
            self._construct_exemplar(data_manager, per_class)
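
    # Note on build_rehearsal_memory: with fixed_memory=True the buffer grows
    # by per_class slots for every new class; otherwise a fixed total budget
    # is re-divided, so old classes are first truncated to per_class samples
    # and exemplars for the new classes are then herded in.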

    def load_checkpoint(self, filename):
        pass

    def save_checkpoint(self, filename):
        self._network.cpu()
        save_dict = {
            "tasks": self._cur_task,
            "model_state_dict": self._network.state_dict(),
        }
        torch.save(
            save_dict,
            "./{}/{}_{}.pkl".format(filename, self.args["model_name"], self._cur_task),
        )

    def after_task(self):
        pass

    def _evaluate(self, y_pred, y_true, group=10):
        ret = {}
        grouped = accuracy(y_pred.T[0], y_true, self._known_classes, increment=group)
        ret["grouped"] = grouped
        ret["top1"] = grouped["total"]
        ret["top{}".format(self.topk)] = np.around(
            (y_pred.T == np.tile(y_true, (self.topk, 1))).sum() * 100 / len(y_true),
            decimals=2,
        )
        return ret
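
    # A minimal worked example of the top-k line above (made-up values):
    #   y_pred = np.array([[3, 1], [0, 2]])          # [N=2, topk=2] ranked predictions
    #   y_true = np.array([3, 2])
    #   (y_pred.T == np.tile(y_true, (2, 1))).sum()  # -> 2 hits -> top-2 acc = 100.0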

    def eval_task(self, data=None, save_conf=False, group=10, mode="train"):
        if data is None:
            data = self.test_loader
        y_pred, y_true = self._eval_cnn(data, mode=mode)
        cnn_accy = self._evaluate(y_pred, y_true, group=group)

        if hasattr(self, "_class_means"):
            y_pred, y_true = self._eval_nme(data, self._class_means)
            nme_accy = self._evaluate(y_pred, y_true)
        else:
            nme_accy = None

        if save_conf:
            _pred = y_pred.T[0]
            _pred_path = os.path.join(self.args["logfilename"], "pred.npy")
            _target_path = os.path.join(self.args["logfilename"], "target.npy")
            np.save(_pred_path, _pred)
            np.save(_target_path, y_true)

            _save_dir = f"./results/{self.args['model_name']}/conf_matrix/{self.args['prefix']}"
            os.makedirs(_save_dir, exist_ok=True)
            _save_path = os.path.join(_save_dir, f"{self.args['csv_name']}.csv")
            with open(_save_path, "a+") as f:
                f.write(f"{self.args['model_name']},{_pred_path},{_target_path}\n")

        return cnn_accy, nme_accy

    def incremental_train(self):
        pass

    def _train(self):
        pass

    def _get_memory(self):
        if len(self._data_memory) == 0:
            return None
        return (self._data_memory, self._targets_memory)

    def _compute_accuracy(self, model, loader):
        model.eval()
        correct, total = 0, 0
        for i, (_, inputs, targets) in enumerate(loader):
            inputs = inputs.to(self._device)
            with torch.no_grad():
                outputs = model(inputs)["logits"]
            predicts = torch.max(outputs, dim=1)[1]
            correct += (predicts.cpu() == targets).sum()
            total += len(targets)
        return np.around(tensor2numpy(correct) * 100 / total, decimals=2)

    def _eval_cnn(self, loader, mode="train"):
        self._network.eval()
        if self.topk > self._total_classes:
            self.topk = self._total_classes
        y_pred, y_true = [], []
        for _, (_, inputs, targets) in enumerate(loader):
            inputs = inputs.to(self._device)
            with torch.no_grad():
                outputs = self._network(inputs)["logits"]
            predicts = torch.topk(
                outputs, k=self.topk, dim=1, largest=True, sorted=True
            )[1]  # [bs, topk]
            refine_predicts = predicts.cpu().numpy()
            if mode == "test":
                refine_predicts = self.class_list[refine_predicts]
            y_pred.append(refine_predicts)
            y_true.append(targets.cpu().numpy())
        return np.concatenate(y_pred), np.concatenate(y_true)  # [N, topk]

    def inference(self, image):
        self._network.eval()
        self._network.to(self._device)
        image = image.to(self._device, dtype=torch.float32)
        with torch.no_grad():
            output = self._network(image)["logits"]
        if self.topk > self._total_classes:
            self.topk = self._total_classes
        predict = torch.topk(
            output, k=self.topk, dim=1, largest=True, sorted=True
        )[1]  # [bs, topk]
        confidents = softmax(output.cpu().numpy())
        predicts = predict.cpu().numpy()
        if self.class_list is not None:
            # Map network output indices back to the original class ids.
            self.class_list = np.array(self.class_list)
            result = self.class_list[predicts].tolist()
            result.append([self.label_list[item] for item in result[0]])
            result.append(confidents[0][predicts][0].tolist())
            return result
        elif self.data_manager is not None:
            return self.data_manager.class_list[predicts]
        # Fallback: raw indices plus their human-readable labels.
        result = predicts.tolist()
        result.append([self.label_list[index] for index in result[0]])
        return result
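
    # Hypothetical usage sketch for a single image (the `learner` name and
    # the input shape are assumptions for illustration, not part of this
    # file):
    #   image = torch.randn(1, 3, 224, 224)
    #   ids, labels, confs = learner.inference(image)  # top-k ids, labels, scores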

    def _eval_nme(self, loader, class_means):
        self._network.eval()
        vectors, y_true = self._extract_vectors(loader)
        vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
        dists = cdist(class_means, vectors, "sqeuclidean")  # [nb_classes, N]
        scores = dists.T  # [N, nb_classes]; smaller distance means a better match
        return np.argsort(scores, axis=1)[:, : self.topk], y_true  # [N, topk]
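
    # Sketch of the NME decision rule above with made-up numbers: two unit
    # class means m0=[1, 0], m1=[0, 1] and one normalized feature v=[0.8, 0.6]
    # give squared distances ||v-m0||^2 = 0.4 and ||v-m1||^2 = 0.8, so the
    # sample is assigned to class 0.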

    def _extract_vectors(self, loader):
        self._network.eval()
        vectors, targets = [], []
        for _, _inputs, _targets in loader:
            _targets = _targets.numpy()
            if isinstance(self._network, nn.DataParallel):
                _vectors = tensor2numpy(
                    self._network.module.extract_vector(_inputs.to(self._device))
                )
            else:
                _vectors = tensor2numpy(
                    self._network.extract_vector(_inputs.to(self._device))
                )
            vectors.append(_vectors)
            targets.append(_targets)
        return np.concatenate(vectors), np.concatenate(targets)

    def _reduce_exemplar(self, data_manager, m):
        logging.info("Reducing exemplars...({} per class)".format(m))
        dummy_data, dummy_targets = copy.deepcopy(self._data_memory), copy.deepcopy(
            self._targets_memory
        )
        self._class_means = np.zeros((self._total_classes, self.feature_dim))
        self._data_memory, self._targets_memory = np.array([]), np.array([])

        for class_idx in range(self._known_classes):
            mask = np.where(dummy_targets == class_idx)[0]
            dd, dt = dummy_data[mask][:m], dummy_targets[mask][:m]
            self._data_memory = (
                np.concatenate((self._data_memory, dd))
                if len(self._data_memory) != 0
                else dd
            )
            self._targets_memory = (
                np.concatenate((self._targets_memory, dt))
                if len(self._targets_memory) != 0
                else dt
            )

            # Exemplar mean
            idx_dataset = data_manager.get_dataset(
                [], source="train", mode="test", appendent=(dd, dt)
            )
            idx_loader = DataLoader(
                idx_dataset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(idx_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            mean = np.mean(vectors, axis=0)
            mean = mean / np.linalg.norm(mean)
            self._class_means[class_idx, :] = mean

    def _construct_exemplar(self, data_manager, m):
        logging.info("Constructing exemplars...({} per class)".format(m))
        for class_idx in range(self._known_classes, self._total_classes):
            data, targets, idx_dataset = data_manager.get_dataset(
                np.arange(class_idx, class_idx + 1),
                source="train",
                mode="test",
                ret_data=True,
            )
            idx_loader = DataLoader(
                idx_dataset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(idx_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            class_mean = np.mean(vectors, axis=0)

            # Select exemplars by herding: at step k, pick the sample whose
            # inclusion keeps the mean of the selected set closest to the
            # true class mean.
            selected_exemplars = []
            exemplar_vectors = []  # [n, feature_dim]
            for k in range(1, m + 1):
                S = np.sum(
                    exemplar_vectors, axis=0
                )  # [feature_dim] sum of already-selected exemplar vectors
                mu_p = (vectors + S) / k  # [n, feature_dim] candidate means
                i = np.argmin(np.sqrt(np.sum((class_mean - mu_p) ** 2, axis=1)))
                selected_exemplars.append(
                    np.array(data[i])
                )  # New object to avoid passing by reference
                exemplar_vectors.append(
                    np.array(vectors[i])
                )  # New object to avoid passing by reference
                vectors = np.delete(
                    vectors, i, axis=0
                )  # Remove it to avoid duplicate selection
                data = np.delete(
                    data, i, axis=0
                )  # Remove it to avoid duplicate selection

            selected_exemplars = np.array(selected_exemplars)
            exemplar_targets = np.full(m, class_idx)
            self._data_memory = (
                np.concatenate((self._data_memory, selected_exemplars))
                if len(self._data_memory) != 0
                else selected_exemplars
            )
            self._targets_memory = (
                np.concatenate((self._targets_memory, exemplar_targets))
                if len(self._targets_memory) != 0
                else exemplar_targets
            )

            # Exemplar mean
            idx_dataset = data_manager.get_dataset(
                [],
                source="train",
                mode="test",
                appendent=(selected_exemplars, exemplar_targets),
            )
            idx_loader = DataLoader(
                idx_dataset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(idx_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            mean = np.mean(vectors, axis=0)
            mean = mean / np.linalg.norm(mean)
            self._class_means[class_idx, :] = mean
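
    # Illustrative herding trace (made-up 2-D features, not from the code
    # above): with normalized vectors v1=[1, 0], v2=[0, 1], v3=[0.71, 0.71]
    # and class_mean = [0.57, 0.57], step k=1 selects v3 (the single vector
    # closest to the mean); at k=2, either remaining vector ties for keeping
    # the running exemplar mean nearest to class_mean, and argmin takes the
    # first one.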

    def _construct_exemplar_unified(self, data_manager, m):
        logging.info(
            "Constructing exemplars for new classes...({} per class)".format(m)
        )
        _class_means = np.zeros((self._total_classes, self.feature_dim))

        # Calculate the means of old classes with the newly trained network
        for class_idx in range(self._known_classes):
            mask = np.where(self._targets_memory == class_idx)[0]
            class_data, class_targets = (
                self._data_memory[mask],
                self._targets_memory[mask],
            )

            class_dset = data_manager.get_dataset(
                [], source="train", mode="test", appendent=(class_data, class_targets)
            )
            class_loader = DataLoader(
                class_dset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(class_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            mean = np.mean(vectors, axis=0)
            mean = mean / np.linalg.norm(mean)
            _class_means[class_idx, :] = mean

        # Construct exemplars for new classes and calculate their means
        for class_idx in range(self._known_classes, self._total_classes):
            data, targets, class_dset = data_manager.get_dataset(
                np.arange(class_idx, class_idx + 1),
                source="train",
                mode="test",
                ret_data=True,
            )
            class_loader = DataLoader(
                class_dset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(class_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            class_mean = np.mean(vectors, axis=0)

            # Select exemplars by herding (same procedure as in
            # _construct_exemplar)
            selected_exemplars = []
            exemplar_vectors = []
            for k in range(1, m + 1):
                S = np.sum(
                    exemplar_vectors, axis=0
                )  # [feature_dim] sum of already-selected exemplar vectors
                mu_p = (vectors + S) / k  # [n, feature_dim] candidate means
                i = np.argmin(np.sqrt(np.sum((class_mean - mu_p) ** 2, axis=1)))
                selected_exemplars.append(
                    np.array(data[i])
                )  # New object to avoid passing by reference
                exemplar_vectors.append(
                    np.array(vectors[i])
                )  # New object to avoid passing by reference
                vectors = np.delete(
                    vectors, i, axis=0
                )  # Remove it to avoid duplicate selection
                data = np.delete(
                    data, i, axis=0
                )  # Remove it to avoid duplicate selection

            selected_exemplars = np.array(selected_exemplars)
            exemplar_targets = np.full(m, class_idx)
            self._data_memory = (
                np.concatenate((self._data_memory, selected_exemplars))
                if len(self._data_memory) != 0
                else selected_exemplars
            )
            self._targets_memory = (
                np.concatenate((self._targets_memory, exemplar_targets))
                if len(self._targets_memory) != 0
                else exemplar_targets
            )

            # Exemplar mean
            exemplar_dset = data_manager.get_dataset(
                [],
                source="train",
                mode="test",
                appendent=(selected_exemplars, exemplar_targets),
            )
            exemplar_loader = DataLoader(
                exemplar_dset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(exemplar_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            mean = np.mean(vectors, axis=0)
            mean = mean / np.linalg.norm(mean)
            _class_means[class_idx, :] = mean

        self._class_means = _class_means


def softmax(x):
    """Compute a numerically stable softmax over the last axis."""
    e_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
    return e_x / (e_x.sum(axis=-1, keepdims=True) + 1e-7)
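

if __name__ == "__main__":
    # Minimal sanity check for the softmax helper above (illustrative only;
    # the logits values are made up).
    logits = np.array([[2.0, 1.0, 0.1], [0.5, 0.5, 0.5]])
    probs = softmax(logits)
    print(probs)               # each row is a probability distribution
    print(probs.sum(axis=-1))  # ~[1.0, 1.0] (slightly below 1 due to the 1e-7)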