from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

NEW_LABEL_COLUMN_NAME = 'new_label'


class IQRforOutbreakDetection:
    """Flag peaks in a time series with a sliding-window IQR rule.

    A point is a peak when, in at least one window containing it, its value
    exceeds ``Q3 + k * IQR`` of that window. Peaks that sit on a falling
    slope (negative first difference) are then cleared in the final label.
    """

    def __init__(self, window_size=7, stride=1, k=1.5):
        """Store the detector configuration.

        Args:
            window_size: Number of consecutive points per window (>= 1).
            stride: Step between the starts of consecutive windows (>= 1).
            k: Multiplier applied to the IQR when computing the bounds.

        Raises:
            ValueError: If ``window_size`` or ``stride`` is smaller than 1.
        """
        if window_size < 1:
            raise ValueError(f'window_size must be >= 1, got {window_size}')
        if stride < 1:
            raise ValueError(f'stride must be >= 1, got {stride}')
        self.window_size = window_size
        self.stride = stride
        self.k = k

    def _iqr_rolling(self, timeseries):
        """Return the ``(upper, lower)`` IQR bounds of ``timeseries``.

        The bounds are ``Q3 + k * IQR`` and ``Q1 - k * IQR``.
        """
        q1, q3 = np.percentile(timeseries, [25, 75])
        iqr = q3 - q1
        return q3 + self.k * iqr, q1 - self.k * iqr

    def detect_anomalies(self, df, news_or_cases='news'):
        """Run the full detection pipeline on one series.

        Args:
            df: Either a ``pd.Series`` holding the values, or an object that
                yields such a series via ``df[news_or_cases]``.
            news_or_cases: Column to analyse. It is also the name given to
                the value column of the returned frame.

        Returns:
            A tuple ``(frame, NEW_LABEL_COLUMN_NAME)``. The frame holds the
            value column plus ``peaks``, ``derivative`` and the final label
            column.
        """
        if isinstance(df, pd.Series):
            # Align the series name with the column name used downstream;
            # otherwise a differently named (or unnamed) series raises
            # KeyError in _postprocess_anomalies.
            timeseries = df.rename(news_or_cases)
        else:
            timeseries = df[news_or_cases]
        _, final_peaks, _ = self._windowed_iqr(timeseries)
        result_df = self._prepare_resulting_dataframe(final_peaks, timeseries)
        processed_df = self._postprocess_anomalies(result_df, news_or_cases)
        # NOTE(review): printing from library code looks like leftover
        # debugging; kept because callers may rely on the output.
        print(processed_df)
        return processed_df, NEW_LABEL_COLUMN_NAME

    def _windowed_iqr(self, df):
        """Evaluate the upper IQR bound over every sliding window.

        Args:
            df: ``pd.Series`` of values (the parameter name is historical).

        Returns:
            A tuple ``(tot_peaks, final_peaks, end_id)``:

            * ``tot_peaks`` maps ``str(index label)`` to the list of
              per-window flags for that point.
            * ``final_peaks`` maps the same keys to ``True`` when any flag
              is set.
            * ``end_id`` is the exclusive end position of the last window,
              or ``0`` when the series is shorter than ``window_size``.

            Points covered by no window are absent from both dicts.
        """
        values = df.to_numpy()
        # Positional access avoids label-based lookups, which return a
        # Series instead of a scalar when index labels are duplicated.
        keys = [f'{label}' for label in df.index]
        tot_peaks: Dict[str, List[bool]] = {}
        end_id = 0  # stays 0 when no window fits
        for start in range(0, len(values) - self.window_size + 1, self.stride):
            end_id = start + self.window_size
            window = values[start:end_id]
            upper_bound, _ = self._iqr_rolling(window)
            flags = window > upper_bound
            for key, flag in zip(keys[start:end_id], flags):
                tot_peaks.setdefault(key, []).append(bool(flag))
        final_peaks = {key: any(flags) for key, flags in tot_peaks.items()}
        return tot_peaks, final_peaks, end_id

    def _prepare_resulting_dataframe(self, peaks_df, news_or_cases_df):
        """Build a frame from the values and attach a 0/1 ``peaks`` column.

        Args:
            peaks_df: Mapping of ``str(index label)`` to a boolean flag, as
                produced by ``_windowed_iqr``.
            news_or_cases_df: The analysed values (series or frame).

        Returns:
            A ``pd.DataFrame`` with an added integer ``peaks`` column.
            Labels missing from ``peaks_df`` are marked ``0``.
        """
        dff = pd.DataFrame(news_or_cases_df)
        # Look flags up by label rather than assigning positionally, so
        # points covered by no window do not cause a length mismatch.
        dff['peaks'] = np.array(
            [int(bool(peaks_df.get(f'{label}', False))) for label in dff.index],
            dtype=np.int64,
        )
        return dff

    def _postprocess_anomalies(self, dataframe, col_name='news'):
        """Add ``derivative`` and the final label column in place.

        The label equals ``peaks``, except that peaks with a negative first
        difference are reset to ``0``.

        Args:
            dataframe: Frame containing ``col_name`` and ``peaks``.
            col_name: Name of the value column to differentiate.

        Returns:
            The same ``dataframe`` object with the two columns added.
        """
        dataframe['derivative'] = dataframe[col_name].diff().fillna(0)
        falling_peak = (dataframe['derivative'] < 0) & (dataframe['peaks'] == 1)
        dataframe[NEW_LABEL_COLUMN_NAME] = dataframe['peaks'].mask(falling_peak, 0)
        return dataframe