Source code for zoo.automl.model.tcmf_model

# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import ABCMeta, abstractmethod

from zoo.automl.model.tcmf import DeepGLO
from zoo.automl.common.metrics import Evaluator
from zoo.automl.model.abstract import BaseModel
from zoo.orca.data import SparkXShards, XShards
import pickle
import numpy as np


[docs]class TCMF(BaseModel): """ MF regularized TCN + TCN. This version is not for automated searching yet. """ def __init__(self): """ Initialize hyper parameters :param check_optional_config: :param future_seq_len: """ # models self.model = None self.model_init = False
[docs] def set_params(self, **config): self.vbsize = config.get("vbsize", 128) self.hbsize = config.get("hbsize", 256) self.num_channels_X = config.get("num_channels_X", [32, 32, 32, 32, 32, 1]) self.num_channels_Y = config.get("num_channels_Y", [16, 16, 16, 16, 16, 1]) self.kernel_size = config.get("kernel_size", 7) self.dropout = config.get("dropout", 0.1) self.rank = config.get("rank", 64) self.kernel_size_Y = config.get("kernel_size_Y", 7) self.lr = config.get("learning_rate", 0.0005) self.val_len = config.get("val_len", 24) self.normalize = config.get("normalize", False) self.start_date = config.get("start_date", "2020-4-1") self.freq = config.get("freq", "1H") self.covariates = config.get('covariates', None) self.use_time = config.get("use_time", True) self.dti = config.get("dti", None) self.svd = config.get("svd", True) self.period = config.get("period", 24) self.alt_iters = config.get("alt_iters", 10) self.y_iters = config.get("y_iters", 10) self.init_epoch = config.get("init_FX_epoch", 100) self.max_FX_epoch = config.get("max_FX_epoch", 300) self.max_TCN_epoch = config.get("max_TCN_epoch", 300)
def _build(self, **config): """ build the models and initialize. :param config: hyper parameters for building the model :return: """ self.set_params(**config) self.model = DeepGLO( vbsize=self.vbsize, hbsize=self.hbsize, num_channels_X=self.num_channels_X, num_channels_Y=self.num_channels_Y, kernel_size=self.kernel_size, dropout=self.dropout, rank=self.rank, kernel_size_Y=self.kernel_size_Y, lr=self.lr, val_len=self.val_len, normalize=self.normalize, start_date=self.start_date, freq=self.freq, covariates=self.covariates, use_time=self.use_time, dti=self.dti, svd=self.svd, period=self.period, forward_cov=False ) self.model_init = True
[docs] def fit_eval(self, x, y=None, verbose=0, num_workers=None, **config): """ Fit on the training data from scratch. Since the rolling process is very customized in this model, we enclose the rolling process inside this method. :param x: training data, an array in shape (nd, Td), nd is the number of series, Td is the time dimension :param y: None. target is extracted from x directly :param verbose: :param num_workers: number of workers to use. :return: the evaluation metric value """ if not self.model_init: self._build(**config) if num_workers is None: num_workers = TCMF.get_default_num_workers() val_loss = self.model.train_all_models(x, alt_iters=self.alt_iters, y_iters=self.y_iters, init_epochs=self.init_epoch, max_FX_epoch=self.max_FX_epoch, max_TCN_epoch=self.max_TCN_epoch, num_workers=num_workers, ) return val_loss
[docs] def fit_incremental(self, x): """ Incremental fitting given a pre-trained model. :param x: incremental data :param config: fitting parameters :return: """ # TODO incrementally train models pass
[docs] @staticmethod def get_default_num_workers(): from zoo.ray import RayContext try: ray_ctx = RayContext.get(initialize=False) num_workers = ray_ctx.num_ray_nodes except: num_workers = 1 return num_workers
[docs] def predict(self, x=None, horizon=24, mc=False, num_workers=None): """ Predict horizon time-points ahead the input x in fit_eval :param x: We don't support input x currently. :param horizon: horizon length to predict :param mc: :param num_workers: the number of workers to use. Note that there has to be an activate RayContext if num_workers > 1. :return: """ if x is not None: raise ValueError("We don't support input x directly.") if self.model is None: raise Exception("Needs to call fit_eval or restore first before calling predict") if num_workers is None: num_workers = TCMF.get_default_num_workers() if num_workers > 1: import ray from zoo.ray import RayContext try: RayContext.get(initialize=False) except: try: # detect whether ray has been started. ray.put(None) except: raise RuntimeError(f"There must be an activate ray context while running with " f"{num_workers} workers. You can either start and init a " f"RayContext by init_orca_context(..., init_ray_on_spark=" f"True) or start Ray with ray.init()") out = self.model.predict_horizon( future=horizon, bsize=90, normalize=False, num_workers=num_workers, ) return out[:, -horizon::]
[docs] def evaluate(self, x=None, y=None, metrics=None, num_workers=None): """ Evaluate on the prediction results and y. We predict horizon time-points ahead the input x in fit_eval before evaluation, where the horizon length equals the second dimension size of y. :param x: We don't support input x currently. :param y: target. We interpret the second dimension of y as the horizon length for evaluation. :param metrics: a list of metrics in string format :param num_workers: the number of workers to use in evaluate. It defaults to 1. :return: a list of metric evaluation results """ if x is not None: raise ValueError("We don't support input x directly.") if y is None: raise ValueError("Input invalid y of None") if self.model is None: raise Exception("Needs to call fit_eval or restore first before calling predict") if len(y.shape) == 1: y = np.expand_dims(y, axis=1) horizon = 1 else: horizon = y.shape[1] result = self.predict(x=None, horizon=horizon, num_workers=num_workers) if y.shape[1] == 1: multioutput = 'uniform_average' else: multioutput = 'raw_values' return [Evaluator.evaluate(m, y, result, multioutput=multioutput) for m in metrics]
[docs] def save(self, model_file): pickle.dump(self.model, open(model_file, "wb"))
[docs] def restore(self, model_file): with open(model_file, 'rb') as f: self.model = pickle.load(f) self.model_init = True
def _get_optional_parameters(self): return {} def _get_required_parameters(self): return {}
[docs]class ModelWrapper(metaclass=ABCMeta):
[docs] @abstractmethod def fit(self, **kwargs): pass
[docs] @abstractmethod def evaluate(self, **kwargs): pass
[docs] @abstractmethod def predict(self, **kwargs): pass
[docs] @abstractmethod def is_distributed(self, **kwargs): pass
[docs] @abstractmethod def save(self, **kwargs): pass
[docs] @abstractmethod def load(self, **kwargs): pass
[docs]class TCMFDistributedModelWrapper(ModelWrapper): def __init__(self, config): self.internal = None self.config = config
[docs] def fit(self, x, incremental=False, num_workers=None): if num_workers: raise ValueError("We don't support passing num_workers in fit " "with input of xShards of dict") def orca_train_model(d, config): tcmf = TCMF() tcmf._build(**config) id_arr, train_data = split_id_and_train_data(d, True) if incremental: tcmf.fit_incremental(train_data) else: tcmf.fit_eval(train_data) return [id_arr, tcmf] if isinstance(x, SparkXShards): if x._get_class_name() == "dict": self.internal = x.transform_shard(orca_train_model, self.config) else: raise ValueError("value of x should be an xShards of dict, " "but is an xShards of " + x._get_class_name()) else: raise ValueError("value of x should be an xShards of dict, " "but isn't an xShards")
[docs] def evaluate(self, x, y, metric=None, num_workers=None): """ Evaluate the model :param x: input :param y: target :param metric: :param num_workers: :return: a list of metric evaluation results """ raise NotImplementedError
[docs] def predict(self, x, horizon=24, num_workers=None): """ Prediction. :param x: input :param horizon: :param num_workers :return: result """ if num_workers and num_workers != 1: raise ValueError("We don't support passing num_workers in predict " "with input of xShards of dict") def orca_predict(data): id_arr = data[0] tcmf = data[1] predict_results = tcmf.predict(x=None, horizon=horizon) result = dict() result['id'] = id_arr result["prediction"] = predict_results return result return self.internal.transform_shard(orca_predict)
[docs] def is_distributed(self): return True
[docs] def save(self, model_path): """ save model to file. :param model_path: the model file path to be saved to. :return: """ if self.internal is not None: self.internal.save_pickle(model_path)
[docs] def load(self, model_path, minPartitions=None): """ restore model from model file and config. :param model_path: the model file :return: the restored model """ self.internal = XShards.load_pickle(model_path, minPartitions=minPartitions)
[docs]class TCMFLocalModelWrapper(ModelWrapper): def __init__(self, config): self.internal = TCMF() self.config = config self.internal._build(**self.config) self.id_arr = None
[docs] def fit(self, x, incremental=False, num_workers=None): if isinstance(x, dict): self.id_arr, train_data = split_id_and_train_data(x, False) if incremental: self.internal.fit_incremental(train_data) else: self.internal.fit_eval(train_data, num_workers=num_workers) else: raise ValueError("value of x should be a dict of ndarray")
[docs] def evaluate(self, x, y, metric=None, num_workers=None): """ Evaluate the model :param x: input :param y: target :param metric: :param num_workers: :return: a list of metric evaluation results """ if isinstance(y, dict): if 'y' in y: y = y['y'] if not isinstance(y, np.ndarray): raise ValueError("the value of y should be an ndarray") else: raise ValueError("key y doesn't exist in y") return self.internal.evaluate(y=y, x=x, metrics=metric, num_workers=num_workers) else: raise ValueError("value of y should be a dict of ndarray")
[docs] def predict(self, x, horizon=24, num_workers=None): """ Prediction. :param x: input :param horizon :param num_workers :return: result """ pred = self.internal.predict(x=x, horizon=horizon, num_workers=num_workers) result = dict() if self.id_arr is not None: result['id'] = self.id_arr result["prediction"] = pred return result
[docs] def is_distributed(self): return False
[docs] def save(self, model_path): """ save model to file. :param model_path: the model file path to be saved to. :return: """ with open(model_path + '/id.pkl', 'wb') as f: pickle.dump(self.id_arr, f) self.internal.save(model_path + "/model")
[docs] def load(self, model_path): """ restore model from model file and config. :param model_path: the model file :return: the restored model """ self.internal = TCMF() with open(model_path + '/id.pkl', 'rb') as f: self.id_arr = pickle.load(f) self.internal.restore(model_path + "/model")
[docs]def split_id_and_train_data(d, is_distributed): if 'y' in d: train_data = d['y'] if not isinstance(train_data, np.ndarray): raise ValueError("the value of y should be an ndarray") else: raise ValueError("key `y` doesn't exist in x") id_arr = None if 'id' in d: id_arr = d['id'] if len(id_arr) != train_data.shape[0]: raise ValueError("the length of the id array should be equal to the number of " "rows in the y") elif is_distributed: raise ValueError("key `id` doesn't exist in x") return id_arr, train_data