| import pandas as pd | |
| import numpy as np | |
NEW_LABEL_COLUMN_NAME = 'new_label'


class IQRforOutbreakDetection:
    """Flag upward outliers in a time series with a sliding-window IQR rule.

    Every window of ``window_size`` consecutive points gets its own upper
    fence ``Q3 + k * IQR``. A point is a peak if it exceeds the fence of at
    least one window that contains it. Peaks that sit on a falling value
    (negative first difference) are then dropped from the final label.
    """

    def __init__(self, window_size=7, stride=1, k=1.5):
        """Store the detector settings.

        Args:
            window_size: Number of consecutive points per window (>= 1).
            stride: Step between the starts of consecutive windows (>= 1).
            k: Multiplier applied to the IQR when computing the fences.

        Raises:
            ValueError: If ``window_size`` or ``stride`` is smaller than 1.
        """
        if window_size < 1:
            raise ValueError(f'window_size must be >= 1, got {window_size}')
        if stride < 1:
            raise ValueError(f'stride must be >= 1, got {stride}')
        self.window_size = window_size
        self.stride = stride
        self.k = k

    def _iqr_rolling(self, timeseries):
        """Return the ``(upper, lower)`` IQR fences of ``timeseries``."""
        q1, q3 = np.percentile(timeseries, [25, 75])
        iqr = q3 - q1
        return q3 + self.k * iqr, q1 - self.k * iqr

    def detect_anomalies(self, df, news_or_cases='news'):
        """Label the peaks of a series.

        Args:
            df: Either a ``pandas.Series`` holding the values, or a
                ``pandas.DataFrame`` that has a ``news_or_cases`` column.
            news_or_cases: Name of the value column. A Series input is
                renamed to this so the result always exposes the values
                under the same column name.

        Returns:
            A tuple ``(frame, label_column)``. ``frame`` contains the value
            column plus ``'peaks'`` (0/1 raw IQR flag), ``'derivative'``
            (first difference, 0 for the first row) and the final label
            column, whose name is returned as ``label_column``.
        """
        if isinstance(df, pd.Series):
            # The Series name becomes the frame's column name; without the
            # rename a differently named (or unnamed) Series fails later
            # with a KeyError on ``news_or_cases``.
            timeseries = df.rename(news_or_cases)
        else:
            timeseries = df[news_or_cases]
        _, final_peaks, _ = self._windowed_iqr(timeseries)
        result_df = self._prepare_resulting_dataframe(final_peaks, timeseries)
        processed_df = self._postprocess_anomalies(result_df, news_or_cases)
        return processed_df, NEW_LABEL_COLUMN_NAME

    def _windowed_iqr(self, df):
        """Slide the window over the series and collect per-point votes.

        Args:
            df: The ``pandas.Series`` to scan.

        Returns:
            A tuple ``(tot_peaks, final_peaks, end_id)``:
            ``tot_peaks`` maps ``str(index label)`` to the list of booleans
            obtained from every window containing that point;
            ``final_peaks`` maps the same keys to ``True`` when any vote is
            ``True``; ``end_id`` is the exclusive end position of the last
            window, or 0 when the series is shorter than one window.
        """
        tot_peaks = {}
        end_id = 0  # stays 0 if no window fits into the series
        for start in range(0, len(df) - self.window_size + 1, self.stride):
            end_id = start + self.window_size
            window = df.iloc[start:end_id]
            ub, _ = self._iqr_rolling(window)
            # Compare positionally so duplicate index labels cannot turn a
            # single lookup into a multi-row result.
            exceeds = window.to_numpy() > ub
            for label, flag in zip(window.index, exceeds):
                tot_peaks.setdefault(f'{label}', []).append(bool(flag))
        final_peaks = {label: any(votes) for label, votes in tot_peaks.items()}
        return tot_peaks, final_peaks, end_id

    def _prepare_resulting_dataframe(self, peaks_df, news_or_cases_df):
        """Combine the values and their 0/1 peak flags in one DataFrame.

        Args:
            peaks_df: Mapping ``str(index label) -> bool`` as produced by
                ``_windowed_iqr``.
            news_or_cases_df: The series (or frame) of observed values.

        Returns:
            A DataFrame with the values and an integer ``'peaks'`` column.
        """
        dff = pd.DataFrame(news_or_cases_df)
        # Look flags up by label: points that no window covered (tail left
        # over by stride > 1, or series shorter than the window) get 0
        # instead of causing a length mismatch.
        dff['peaks'] = [int(bool(peaks_df.get(f'{label}', False)))
                        for label in dff.index]
        return dff

    def _postprocess_anomalies(self, dataframe, col_name='news'):
        """Drop peaks that occur while the series is decreasing.

        Adds a ``'derivative'`` column (first difference of ``col_name``,
        with 0 for the first row) and the final label column, which equals
        ``'peaks'`` except that a peak with a negative derivative becomes 0.
        The frame is modified in place and also returned.
        """
        dataframe['derivative'] = dataframe[col_name].diff().fillna(0)
        falling_peak = (dataframe['derivative'] < 0) & (dataframe['peaks'] == 1)
        # Vectorised replacement for the row-wise loop; unlike iterrows it
        # does not upcast the integer labels to float.
        dataframe[NEW_LABEL_COLUMN_NAME] = dataframe['peaks'].where(~falling_peak, 0)
        return dataframe