Source code for zoo.models.anomalydetection.anomaly_detector

#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import warnings

from zoo.pipeline.api.keras.models import Sequential
from zoo.pipeline.api.keras.layers import *
from zoo.models.common import *
from zoo.common.utils import callZooFunc
from bigdl.util.common import Sample

# Python 3 has no ``long`` / ``unicode`` builtins; alias them to their Python 3
# equivalents so the rest of the module can use the Python 2 names.
# The check uses ``sys.version_info`` (a tuple of ints) instead of comparing the
# ``sys.version`` string, because string comparison is lexicographic and would
# misorder a two-digit major version.
# NOTE(review): ``sys`` is not imported explicitly in this file; it presumably
# arrives through one of the star imports above -- confirm.
if sys.version_info[0] >= 3:
    long = int
    unicode = str


class AnomalyDetector(KerasZooModel):
    """
    The anomaly detector model for sequence data based on LSTM.

    # Arguments
    feature_shape: The input shape of features, including unroll_length and feature_size.
    hidden_layers: Units of hidden layers of LSTM. Default is [8, 32, 15].
    dropouts: Fraction of the input units to drop out. Float between 0 and 1.
              Must have the same length as hidden_layers. Default is [0.2, 0.2, 0.2].
    bigdl_type: Optional keyword argument. Numeric type tag handed to the base
                class and to backend calls. Default is "float".

    # Raises
    AssertionError: if hidden_layers and dropouts differ in length.
    TypeError: if any keyword argument other than bigdl_type is supplied.
    """

    def __init__(self, feature_shape, hidden_layers=None, dropouts=None, **kwargs):
        # ``None`` stands in for the documented defaults so that no list object
        # is shared between instances (mutable default argument pitfall).
        if hidden_layers is None:
            hidden_layers = [8, 32, 15]
        if dropouts is None:
            dropouts = [0.2, 0.2, 0.2]
        assert len(hidden_layers) == len(dropouts), \
            "sizes of dropouts and hidden_layers should be equal"
        # Take bigdl_type out of kwargs in a single step. The previous code
        # popped the key and then looked it up again, which always yielded None
        # whenever the caller supplied a value.
        self.bigdl_type = kwargs.pop("bigdl_type", "float")
        if kwargs:
            raise TypeError('Wrong arguments for AnomalyDetector: ' + str(kwargs))
        self.feature_shape = feature_shape
        self.hidden_layers = hidden_layers
        self.dropouts = dropouts
        self.model = self.build_model()
        super(AnomalyDetector, self).__init__(None, self.bigdl_type,
                                              feature_shape,
                                              hidden_layers,
                                              dropouts,
                                              self.model)

    def build_model(self):
        """
        Assemble the Sequential network: an input layer, a stack of LSTM layers
        and a final Dense layer with a single output.

        NOTE(review): dropouts[0] is never applied (the first LSTM is not
        followed by a Dropout layer), and a single-entry hidden_layers still
        produces two LSTM layers because both hidden_layers[0] and
        hidden_layers[-1] are added. The layer stack is kept exactly as it was
        so that previously saved weights remain loadable.
        """
        model = Sequential()
        model.add(InputLayer(input_shape=self.feature_shape)) \
            .add(LSTM(input_shape=self.feature_shape,
                      output_dim=self.hidden_layers[0],
                      return_sequences=True))

        # Middle layers: everything except the first and the last entry.
        for ilayer in range(1, len(self.hidden_layers) - 1):
            model.add(LSTM(output_dim=self.hidden_layers[ilayer], return_sequences=True)) \
                .add(Dropout(self.dropouts[ilayer]))

        # The last LSTM emits only its final step (return_sequences=False).
        model.add(LSTM(self.hidden_layers[-1], return_sequences=False)) \
            .add(Dropout(self.dropouts[-1]))

        model.add(Dense(output_dim=1))
        return model

    @staticmethod
    def load_model(path, weight_path=None, bigdl_type="float"):
        """
        Load an existing AnomalyDetector model (with weights).

        # Arguments
        path: The path for the pre-defined model.
              Local file system, HDFS and Amazon S3 are supported.
              HDFS path should be like 'hdfs://[host]:[port]/xxx'.
              Amazon S3 path should be like 's3a://bucket/xxx'.
        weight_path: The path for pre-trained weights if any. Default is None.
        bigdl_type: Numeric type tag forwarded to the backend loader.
                    Default is "float".

        return: the loaded model, with its class set to AnomalyDetector.
        """
        jmodel = callZooFunc(bigdl_type, "loadAnomalyDetector", path, weight_path)
        model = KerasZooModel._do_load(jmodel, bigdl_type)
        # _do_load returns a generic wrapper; rebrand it so the methods of this
        # class are available on the result.
        model.__class__ = AnomalyDetector
        return model

    def predict(self, x, batch_per_thread=8):
        """
        Predict on RDD[Sample].

        # Arguments
        x: RDD[Sample]. The data to predict on.
        batch_per_thread: Int. Forwarded to the backend "modelPredictRDD" call.
                          Default is 8.

        return: an rdd in which every backend result has been converted with
                to_ndarray().
        """
        results = callZooFunc(self.bigdl_type, "modelPredictRDD",
                              self.value, x, batch_per_thread)
        return results.map(lambda data: data.to_ndarray())

    @classmethod
    def unroll(cls, data_rdd, unroll_length, predict_step=1):
        """
        Unroll a rdd of arrays to prepare features and labels.

        # Arguments
        data_rdd: RDD[Array]. data to be unrolled, it holds original time series features
        unroll_length: Int. the length of previous values to predict future value.
        predict_step: Int. How many time steps to predict future value, default is 1.

        return: an rdd of FeatureLableIndex

        a simple example
            data: (1,2,3,4,5,6); unrollLength: 2, predictStep: 1

            features, label, index
            (1,2),    3,     0
            (2,3),    4,     1
            (3,4),    5,     2
            (4,5),    6,     3
        """
        result = callZooFunc("float", "unroll", data_rdd, unroll_length, predict_step)
        return cls._to_indexed_rdd(result)

    @classmethod
    def detect_anomalies(cls, ytruth, ypredict, anomaly_size):
        """
        # Arguments
        :param ytruth: RDD of float or double values. Truth to be compared.
        :param ypredict: RDD of float or double values. Predictions.
        :param anomaly_size: Int. The size to be considered as anomalies.
        :return: RDD of [ytruth, ypredict, anomaly], anomaly is None or ytruth
        """
        return callZooFunc("float", "detectAnomalies", ytruth, ypredict, anomaly_size)

    @staticmethod
    def standardScale(df):
        """
        Hand df to the backend "standardScaleDF" function and return its result.

        # Arguments
        df: the data to scale (presumably a Spark DataFrame, judging by the
            backend function name -- confirm against the caller).
        """
        return callZooFunc("float", "standardScaleDF", df)

    @staticmethod
    def train_test_split(unrolled, test_size):
        """
        Split unrolled records into train and test sets by record index.

        # Arguments
        unrolled: rdd of records exposing feature, label and index attributes
                  (e.g. the result of unroll).
        test_size: Int. Records whose index is at least count - test_size go to
                   the test set; all others go to the train set.

        return: [train, test], each an rdd of Sample.
        """
        cut_point = unrolled.count() - test_size

        def to_sample(record):
            # Shared by both splits so the conversion cannot drift apart.
            return Sample.from_ndarray(np.array(record.feature), np.array(record.label))

        train = unrolled.filter(lambda r: r.index < cut_point).map(to_sample)
        test = unrolled.filter(lambda r: r.index >= cut_point).map(to_sample)
        return [train, test]

    @staticmethod
    def _to_indexed_rdd(unrolled_rdd):
        """
        Convert the backend result of "unroll" into an rdd of FeatureLableIndex.

        Each incoming record is indexed as (feature_str, label, index), where
        feature_str encodes a matrix with "," between rows and "|" between the
        values of a row.
        """

        def row_to_feature(feature_str):
            rows = [row.split("|") for row in feature_str.split(",")]
            # As before, the first row defines how many values are read from
            # every row.
            width = len(rows[0])
            return [[float(row[j]) for j in range(width)] for row in rows]

        return unrolled_rdd \
            .map(lambda y: FeatureLableIndex(row_to_feature(y[0]), float(y[1]), long(y[2])))
class FeatureLableIndex(object):
    """
    Each record should contain the following fields:

    feature: List[List[float]].
    label: float.
    index: long.
    bigdl_type: Numeric type tag stored on the record. Default is "float".
    """

    def __init__(self, feature, label, index, bigdl_type="float"):
        self.feature = feature
        self.label = label
        self.index = index
        self.bigdl_type = bigdl_type

    def __reduce__(self):
        # Pickle support: rebuild the record through the constructor.
        # bigdl_type is part of the arguments so that a non-default value
        # survives a pickle round trip instead of reverting to "float".
        return FeatureLableIndex, (self.feature, self.label, self.index, self.bigdl_type)

    def __str__(self):
        return "FeatureLableIndex [feature: %s, label: %s, index: %s]" % (
            self.feature, self.label, self.index)