Source code for zoo.zouwu.model.anomaly

#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import math
import numpy as np
import pandas as pd

from abc import ABCMeta, abstractmethod


[docs]class Distance(metaclass=ABCMeta): """ The Base Distance Class. """
[docs] @abstractmethod def distance(self, x, y): """ Calculate the distance between x and y. a and b should be in same shape. :param x: the first tensor :param y: the second tensor :return: the absolute distance between x and y """ pass
[docs]class EuclideanDistance(Distance): """ Euclidean Distance Measure """ def __init__(self): pass
[docs] def distance(self, x, y): return np.linalg.norm(x - y)
[docs]class ThresholdEstimator: """ An estimator to find the proper threshold. """
[docs] def fit(self, y, yhat, mode="default", ratio=0.01, dist_measure=EuclideanDistance() ): """ fit the y and yhat and find the proper threshold :param y: actual values :param yhat: predicted values :param mode: types of ways to find threshold "default" : fit data to a uniform distribution (the percentile way) "gaussian": fit data to a gaussian distribution *TBD :param ratio: the ratio of anomaly to consider as anomaly. :return: the threshold """ assert y.shape == yhat.shape diff = [dist_measure.distance(m, n) for m, n in zip(y, yhat)] if mode == "default": threshold = np.percentile(diff, (1-ratio)*100) return threshold elif mode == "gaussian": from scipy.stats import norm mu, sigma = norm.fit(diff) t = norm.ppf(1-ratio) return t*sigma+mu else: raise Exception("Does not support", mode)
[docs]class DetectorBase(metaclass=ABCMeta): """ Base class for detector """
[docs] @abstractmethod def detect(self, y, **kwargs): """ Detect anomalies in dataset :param y: the dataset :return: the anomaly indexes in y """ pass
[docs]class ThresholdDetector(DetectorBase): """ Anomaly detector """ def __init__(self): pass
[docs] def detect(self, y, yhat=None, threshold=math.inf, dist_measure=EuclideanDistance()): """ Detect anomalies. Each sample can have 1 or more dimensions. :param dist_measure: :param y: the values to detect. shape could be 1-D (num_samples,) or 2-D array (num_samples, features) :param yhat: the estimated values, a tensor with same shape as y, could be None when threshold is a tuple :param threshold: threshold, could be 1. a single value - absolute distance threshold, same for all samples 2. a 1-D array in shape (num_samples,) - per sample absolute distance threshold 3. a tensor in same shape as y and yhat - per dimension absolute distance threshold 4. a tuple (min, max) min and max tensors, same shape as y, yhat is ignored in this case :return: the anomaly values indexes in the samples, i.e. num_samples dimension. """ self.threshold = threshold self.dist_measure = dist_measure if isinstance(threshold, int) or \ isinstance(threshold, float): return self._check_all_distance(y, yhat) elif isinstance(threshold, np.ndarray): if len(threshold.shape) == 2: self._check_per_dim_distance(y, yhat, threshold) elif len(threshold.shape) == 1: self._check_per_sample_distance(y, yhat, threshold) else: raise ValueError( "threshold shape", str( threshold.shape), "is not valid") elif isinstance(threshold, tuple) \ and len(threshold) == 2 \ and threshold[0].shape == y.shape \ and threshold[-1].shape == y.shape: return self._check_range(y, threshold) else: raise ValueError( "threshold shape", str(threshold), "is not valid")
def _check_all_distance(self, y, yhat): index = [] for i in range(y.shape[0]): diff = self.dist_measure.distance(y[i], yhat[i]) if diff >= self.threshold: index.append(i) return index def _check_per_dim_distance(self, y, yhat, threshold): raise NotImplementedError("Does not support check per dim distance") def _check_per_sample_distance(self, y, yhat, threshold): raise NotImplementedError("Does not support check per sample distance") def _check_range(self, y, threshold): min_diff = y - threshold[0] anomaly_index = set(np.where(min_diff < 0)[0]) max_diff = y - threshold[1] anomaly_index.update(np.where(max_diff > 0)[0]) return list(anomaly_index)