| import pandas as pd | |
| import numpy as np | |
| from statsmodels.tsa.stattools import adfuller, acf, pacf | |
| from statsmodels.tsa.arima.model import ARIMA | |
| NEW_ANOMALY_COLUMN_NAME = 'anomaly' | |
| class ARIMAforOutbreakDetection: | |
| def __init__(self, window_size=7, stride=1, k=1.5, significance=0.05, max_lag=30): | |
| self.window_size = window_size | |
| self.stride = stride | |
| self.k = k | |
| self.significance = significance | |
| self.max_lag = max_lag | |
| def test_stationarity(self, ts_data, column=''): | |
| if isinstance(ts_data, pd.Series): | |
| adf_test = adfuller(ts_data, autolag='AIC') | |
| else: | |
| adf_test = adfuller(ts_data[column], autolag='AIC') | |
| return "Stationary" if adf_test[1] <= self.significance else "Non-Stationary" | |
| def make_stationary(self, dataframe, column): | |
| df_to_return = None | |
| result = self.test_stationarity(dataframe, column) | |
| if result == "Stationary": | |
| return dataframe | |
| diff_series = dataframe.copy() | |
| for diff_count in range(5): | |
| diff_series = diff_series.diff().fillna(0) | |
| if self.test_stationarity(diff_series, column) == "Stationary": | |
| return diff_series | |
| return diff_series | |
| def create_windows(self, df): | |
| windows, gts = [], [] | |
| for i in range(0, len(df) - self.window_size, self.stride): | |
| end_id = i + self.window_size | |
| windows.append(df.iloc[i:end_id, :]) | |
| gts.append(df.iloc[end_id, :]) | |
| return np.stack(windows), np.stack(gts) | |
| def find_p_q(self, series): | |
| N = len(series) | |
| acf_values, _ = acf(series, nlags=self.max_lag, alpha=self.significance, fft=False) | |
| pacf_values, _ = pacf(series, nlags=self.max_lag, alpha=self.significance) | |
| threshold = 1.96 / np.sqrt(N) | |
| def find_last_consecutive_outlier(values): | |
| for i in range(1, len(values)): | |
| if values[i] < 0 or (values[i] > 0 and abs(values[i]) < threshold): | |
| return i | |
| return len(values) - 1 | |
| return find_last_consecutive_outlier(pacf_values), find_last_consecutive_outlier(acf_values) | |
| def detect_anomalies(self, dataset, news_or_cases='news'): | |
| stationary_data = self.make_stationary(dataset, news_or_cases) | |
| p, q = self.find_p_q(stationary_data[news_or_cases]) | |
| anomalies, means, stdevs, residuals, predictions, gts = self._train_arima_model(stationary_data, p, q) | |
| result_df = self._prepare_resulting_dataframe( | |
| residuals, means, stdevs, dataset.iloc[self.window_size:], | |
| anomalies, gts, predictions | |
| ) | |
| return self._postprocess_anomalies(result_df, news_or_cases), NEW_ANOMALY_COLUMN_NAME | |
| def _train_arima_model(self, dataset, p, q): | |
| predictions, residuals, means, stdevs, anomalies = [], [], [], [], [] | |
| windows, gts = self.create_windows(dataset) | |
| for window, gt in zip(windows, gts): | |
| model = ARIMA(window, order=(p, 0, q)) | |
| model.initialize_approximate_diffuse() | |
| fit = model.fit() | |
| pred = fit.forecast(steps=1)[0] | |
| residual = np.abs(gt - pred) | |
| mu, std = np.mean(fit.resid), np.std(fit.resid) | |
| anomalies.append( | |
| 1 if residual > mu + self.k * std or residual < mu - self.k * std else 0 | |
| ) | |
| means.append(mu) | |
| stdevs.append(std) | |
| residuals.append(residual) | |
| predictions.append(pred) | |
| return anomalies, means, stdevs, residuals, predictions, gts | |
| def _prepare_resulting_dataframe(self, residuals, means, stdevs, original_dataset, | |
| anomalies, gts, predictions): | |
| result_df = original_dataset.copy() | |
| result_df['residuals'] = residuals | |
| result_df['mu'] = means | |
| result_df['sigma'] = stdevs | |
| result_df['anomaly'] = anomalies | |
| result_df['gts_diff'] = gts | |
| result_df['pred_diff'] = predictions | |
| return result_df | |
| def _postprocess_anomalies(self, dataframe, col_name='news'): | |
| dataframe['derivative'] = dataframe[col_name].diff().fillna(0) | |
| dataframe['new_anomaly'] = [ | |
| 0 if row.derivative < 0 and row.anomaly == 1 else row.anomaly | |
| for _, row in dataframe.iterrows() | |
| ] | |
| return dataframe |