| """ | |
| The interface to load log datasets. The datasets currently supported include | |
| HDFS and BGL. | |
| Authors: | |
| LogPAI Team | |
| """ | |
| import pandas as pd | |
| import os | |
| import numpy as np | |
| import re | |
| from sklearn.utils import shuffle | |
| from collections import OrderedDict | |
def _split_data(x_data, y_data=None, train_ratio=0, split_type='uniform'):
    if split_type == 'uniform' and y_data is not None:
        # Class-uniform split: take the same ratio of positive (anomalous)
        # and negative (normal) instances for the training set.
        pos_idx = y_data > 0
        x_pos = x_data[pos_idx]
        y_pos = y_data[pos_idx]
        x_neg = x_data[~pos_idx]
        y_neg = y_data[~pos_idx]
        train_pos = int(train_ratio * x_pos.shape[0])
        train_neg = int(train_ratio * x_neg.shape[0])
        x_train = np.hstack([x_pos[0:train_pos], x_neg[0:train_neg]])
        y_train = np.hstack([y_pos[0:train_pos], y_neg[0:train_neg]])
        x_test = np.hstack([x_pos[train_pos:], x_neg[train_neg:]])
        y_test = np.hstack([y_pos[train_pos:], y_neg[train_neg:]])
    elif split_type == 'sequential':
        # Sequential split: the first part of the data is used for training,
        # the remainder for testing.
        num_train = int(train_ratio * x_data.shape[0])
        x_train = x_data[0:num_train]
        x_test = x_data[num_train:]
        if y_data is None:
            y_train = None
            y_test = None
        else:
            y_train = y_data[0:num_train]
            y_test = y_data[num_train:]
    else:
        raise ValueError('split_type must be "uniform" (with labels) or "sequential".')
    # Random shuffle of the training split only; the test split keeps its order.
    indexes = shuffle(np.arange(x_train.shape[0]))
    x_train = x_train[indexes]
    if y_train is not None:
        y_train = y_train[indexes]
    return (x_train, y_train), (x_test, y_test)
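
# Usage sketch for _split_data (illustrative only; the arrays below are made-up
# toy data, not from any LogPAI dataset):
#
#   x = np.arange(10)                              # 10 toy instances
#   y = np.array([1, 0, 0, 1, 0, 0, 0, 1, 0, 0])   # 3 anomalies, 7 normal
#   (x_tr, y_tr), (x_te, y_te) = _split_data(x, y, train_ratio=0.5, split_type='uniform')
#   # With a uniform split, roughly half of the anomalies and half of the normal
#   # instances end up in (x_tr, y_tr); the training half is then shuffled.
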
def load_HDFS(log_file, label_file=None, window='session', train_ratio=0.5, split_type='sequential',
              save_csv=False, window_size=0):
    """ Load HDFS structured log into train and test data

    Arguments
    ---------
        log_file: str, the file path of structured log (.csv or .npz).
        label_file: str, the file path of anomaly labels, None for unlabeled data.
        window: str, the window options; only `session` (default) is supported.
        train_ratio: float, the ratio of training data for train/test split.
        split_type: `uniform` or `sequential`, which determines how to split the dataset. `uniform` means
            to split positive samples and negative samples equally when label_file is set. `sequential`
            means to split the data sequentially without label_file, i.e., the first part is for training
            and the second part is for testing.
        save_csv: bool, whether to save the per-session dataframe to `data_instances.csv`.
        window_size: int, if greater than 0, each session is further sliced into
            fixed-size windows by slice_hdfs().

    Returns
    -------
        (x_train, y_train): the training data
        (x_test, y_test): the testing data
    """
    print('====== Input data summary ======')

    if log_file.endswith('.npz'):
        # Split training and validation set in a class-uniform way
        data = np.load(log_file)
        x_data = data['x_data']
        y_data = data['y_data']
        (x_train, y_train), (x_test, y_test) = _split_data(x_data, y_data, train_ratio, split_type)

    elif log_file.endswith('.csv'):
        assert window == 'session', "Only window=session is supported for HDFS dataset."
        print("Loading", log_file)
        struct_log = pd.read_csv(log_file, engine='c',
                                 na_filter=False, memory_map=True)
        # Group events into sessions by the block id (blk_xxx) found in each log message
        data_dict = OrderedDict()
        for idx, row in struct_log.iterrows():
            blkId_list = re.findall(r'(blk_-?\d+)', row['Content'])
            blkId_set = set(blkId_list)
            for blk_Id in blkId_set:
                if blk_Id not in data_dict:
                    data_dict[blk_Id] = []
                data_dict[blk_Id].append(row['EventId'])
        data_df = pd.DataFrame(list(data_dict.items()), columns=['BlockId', 'EventSequence'])

        if label_file:
            # Split training and validation set in a class-uniform way
            label_data = pd.read_csv(label_file, engine='c', na_filter=False, memory_map=True)
            label_data = label_data.set_index('BlockId')
            label_dict = label_data['Label'].to_dict()
            data_df['Label'] = data_df['BlockId'].apply(lambda x: 1 if label_dict[x] == 'Anomaly' else 0)

            # Split train and test data
            (x_train, y_train), (x_test, y_test) = _split_data(data_df['EventSequence'].values,
                                                               data_df['Label'].values, train_ratio, split_type)
            print(y_train.sum(), y_test.sum())

        if save_csv:
            data_df.to_csv('data_instances.csv', index=False)

        if window_size > 0:
            x_train, window_y_train, y_train = slice_hdfs(x_train, y_train, window_size)
            x_test, window_y_test, y_test = slice_hdfs(x_test, y_test, window_size)
            log = "{} {} windows ({}/{} anomaly), {}/{} normal"
            print(log.format("Train:", x_train.shape[0], y_train.sum(), y_train.shape[0], (1 - y_train).sum(), y_train.shape[0]))
            print(log.format("Test:", x_test.shape[0], y_test.sum(), y_test.shape[0], (1 - y_test).sum(), y_test.shape[0]))
            return (x_train, window_y_train, y_train), (x_test, window_y_test, y_test)

        if label_file is None:
            if split_type == 'uniform':
                split_type = 'sequential'
                print('Warning: Only split_type=sequential is supported if label_file=None.')
            # Split training and validation set sequentially
            x_data = data_df['EventSequence'].values
            (x_train, _), (x_test, _) = _split_data(x_data, train_ratio=train_ratio, split_type=split_type)
            print('Total: {} instances, train: {} instances, test: {} instances'.format(
                  x_data.shape[0], x_train.shape[0], x_test.shape[0]))
            return (x_train, None), (x_test, None), data_df
    else:
        raise NotImplementedError('load_HDFS() only supports csv and npz files!')

    num_train = x_train.shape[0]
    num_test = x_test.shape[0]
    num_total = num_train + num_test
    num_train_pos = sum(y_train)
    num_test_pos = sum(y_test)
    num_pos = num_train_pos + num_test_pos

    print('Total: {} instances, {} anomaly, {} normal'
          .format(num_total, num_pos, num_total - num_pos))
    print('Train: {} instances, {} anomaly, {} normal'
          .format(num_train, num_train_pos, num_train - num_train_pos))
    print('Test: {} instances, {} anomaly, {} normal\n'
          .format(num_test, num_test_pos, num_test - num_test_pos))

    return (x_train, y_train), (x_test, y_test)
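
# Usage sketch for load_HDFS (illustrative only; the file names below are
# hypothetical placeholders for a structured HDFS log and its anomaly labels
# produced by a log parser):
#
#   (x_train, y_train), (x_test, y_test) = load_HDFS(
#       'HDFS.log_structured.csv',
#       label_file='anomaly_label.csv',
#       train_ratio=0.5,
#       split_type='uniform')
#
# Without a label file the call returns unlabeled splits plus the session
# dataframe: (x_train, None), (x_test, None), data_df.
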
def slice_hdfs(x, y, window_size):
    """ Slice each session into fixed-size windows; the event right after each
    window is used as its label, and the final (possibly short) window is padded
    with #Pad.
    """
    results_data = []
    print("Slicing {} sessions, with window {}".format(x.shape[0], window_size))
    for idx, sequence in enumerate(x):
        seqlen = len(sequence)
        i = 0
        while (i + window_size) < seqlen:
            window_seq = sequence[i: i + window_size]
            results_data.append([idx, window_seq, sequence[i + window_size], y[idx]])
            i += 1
        else:
            # The while-loop has no break, so this branch always runs once at the
            # end: pad the final window and label it with #Pad.
            window_seq = sequence[i: i + window_size]
            window_seq += ["#Pad"] * (window_size - len(window_seq))
            results_data.append([idx, window_seq, "#Pad", y[idx]])
    results_df = pd.DataFrame(results_data, columns=["SessionId", "EventSequence", "Label", "SessionLabel"])
    print("Slicing done, {} windows generated".format(results_df.shape[0]))
    return results_df[["SessionId", "EventSequence"]], results_df["Label"], results_df["SessionLabel"]
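
# Usage sketch for slice_hdfs (illustrative only; toy sequences, not real HDFS
# sessions). With window_size=3, a 5-event session yields sliding windows whose
# "Label" is the event that follows each window:
#
#   sessions = np.array([['E1', 'E2', 'E3', 'E4', 'E5'], ['E1', 'E2']], dtype=object)
#   session_labels = np.array([0, 1])
#   windows, window_y, session_y = slice_hdfs(sessions, session_labels, window_size=3)
#   # First session -> (['E1','E2','E3'] -> 'E4'), (['E2','E3','E4'] -> 'E5'),
#   # plus a final window ['E3','E4','E5'] labeled '#Pad'. The short second
#   # session becomes a single padded window ['E1','E2','#Pad'] labeled '#Pad'.
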
def load_BGL(log_file, label_file=None, window='sliding', time_interval=60, stepping_size=60,
             train_ratio=0.8):
    """ TODO: loading of the BGL dataset is not implemented yet; see
    bgl_preprocess_data() below for the sliding-window preprocessing.
    """

def bgl_preprocess_data(para, raw_data, event_mapping_data):
    """ Split logs into sliding windows, build an event count matrix and get the corresponding labels

    Args:
    --------
        para: the parameters dictionary (expects 'save_path', 'window_size' and 'step_size',
            with window and step sizes given in hours)
        raw_data: array of (label, time) rows, one per log line
        event_mapping_data: a list of event indexes, where each row index corresponds to a log line

    Returns:
    --------
        event_count_matrix: event count matrix, where each row is an instance (log sequence vector)
        labels: a list of labels, 1 represents anomaly
    """

    # Create the directory for saving the sliding windows (start_index, end_index),
    # which can be directly loaded in future runs
    if not os.path.exists(para['save_path']):
        os.mkdir(para['save_path'])
    log_size = raw_data.shape[0]
    sliding_file_path = para['save_path'] + 'sliding_' + str(para['window_size']) + 'h_' + str(para['step_size']) + 'h.csv'

    #=============divide into sliding windows=========#
    start_end_index_list = []  # list of (start_index, end_index) tuples marking each sliding time window
    label_data, time_data = raw_data[:, 0], raw_data[:, 1]
    if not os.path.exists(sliding_file_path):
        # Split into sliding windows
        start_time = time_data[0]
        start_index = 0
        end_index = 0

        # Get the first window: grow end_index until the first timestamp that
        # falls outside start_time + window_size hours
        for cur_time in time_data:
            if cur_time < start_time + para['window_size'] * 3600:
                end_index += 1
                end_time = cur_time
            else:
                start_end_pair = tuple((start_index, end_index))
                start_end_index_list.append(start_end_pair)
                break
        # Move the start and end index to the next sliding window, stepping by step_size hours
        while end_index < log_size:
            start_time = start_time + para['step_size'] * 3600
            end_time = end_time + para['step_size'] * 3600
            # Advance i to the first log whose timestamp reaches the new start_time
            for i in range(start_index, end_index):
                if time_data[i] < start_time:
                    i += 1
                else:
                    break
            # Advance j to the first log whose timestamp reaches the new end_time
            for j in range(end_index, log_size):
                if time_data[j] < end_time:
                    j += 1
                else:
                    break
            start_index = i
            end_index = j
            start_end_pair = tuple((start_index, end_index))
            start_end_index_list.append(start_end_pair)
        inst_number = len(start_end_index_list)
        print('There are %d instances (sliding windows) in this dataset\n' % inst_number)
        np.savetxt(sliding_file_path, start_end_index_list, delimiter=',', fmt='%d')
    else:
        print('Loading start_end_index_list from file')
        start_end_index_list = pd.read_csv(sliding_file_path, header=None).values
        inst_number = len(start_end_index_list)
        print('There are %d instances (sliding windows) in this dataset' % inst_number)

    # Get all the log indexes in each time window by ranging from start_index to end_index
    expanded_indexes_list = []
    for t in range(inst_number):
        index_list = []
        expanded_indexes_list.append(index_list)
    for i in range(inst_number):
        start_index = start_end_index_list[i][0]
        end_index = start_end_index_list[i][1]
        for l in range(start_index, end_index):
            expanded_indexes_list[i].append(l)

    event_mapping_data = [row[0] for row in event_mapping_data]
    event_num = len(list(set(event_mapping_data)))
    print('There are %d log events' % event_num)

    #=============get labels and event count of each sliding window =========#
    labels = []
    event_count_matrix = np.zeros((inst_number, event_num))
    for j in range(inst_number):
        label = 0  # 0 represents success, 1 represents failure
        for k in expanded_indexes_list[j]:
            event_index = event_mapping_data[k]
            event_count_matrix[j, event_index] += 1
            # A window is anomalous if any log line inside it is labeled as a failure
            if label_data[k]:
                label = 1
        labels.append(label)
    assert inst_number == len(labels)

    print("Among all instances, %d are anomalies" % sum(labels))
    assert event_count_matrix.shape[0] == len(labels)
    return event_count_matrix, labels
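
# Usage sketch for bgl_preprocess_data (illustrative only; all values below are
# made-up toy data and the save_path is a hypothetical directory):
#
#   para = {'save_path': './bgl_windows/', 'window_size': 1, 'step_size': 1}  # sizes in hours
#   raw_data = np.array([[0, 0], [0, 1800], [1, 4000], [0, 7300]])            # (label, timestamp) per log line
#   event_mapping_data = [[0], [1], [1], [0]]                                 # event index of each log line
#   event_count_matrix, labels = bgl_preprocess_data(para, raw_data, event_mapping_data)
#   # Each row of event_count_matrix counts event occurrences inside one sliding
#   # window; labels[j] is 1 if any log line inside window j is marked anomalous.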