Source code for zoo.automl.model.VanillaLSTM_pytorch

#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

from zoo.automl.model.abstract import BaseModel
from zoo.automl.common.util import *
from zoo.automl.common.metrics import Evaluator


[docs]class LSTMModel(nn.Module): def __init__(self, input_dim, hidden_dim, layer_num, dropout, output_dim): super(LSTMModel, self).__init__() self.hidden_dim = hidden_dim self.layer_num = layer_num self.lstm = nn.LSTM(input_dim, hidden_dim, layer_num, dropout=dropout, batch_first=True) self.fc = nn.Linear(hidden_dim, output_dim) for name, param in self.lstm.named_parameters(): if 'bias' in name: nn.init.constant_(param, 0.0) elif 'weight_ih' in name: nn.init.xavier_normal_(param) elif 'weight_hh' in name: nn.init.orthogonal_(param)
[docs] def forward(self, input_seq): # init hidden # input_seq: (batch_size, seq_len, feature_num) # h0 = torch.zeros(self.layer_num, input_seq.size[0], self.hidden_dim) # c0 = torch.zeros(self.layer_num, input_seq.size[0], self.hidden_dim) lstm_out, hidden = self.lstm(input_seq) # lstm_out: (batch_size, hidden_dim # reshaping the outputs to feed in fully connected layer # out = lstm_out[-1].contiguous().view(-1, self.hidden_dim) # out = self.linear(out, len(input_seq), -1) out = self.fc(lstm_out[:, -1, :]) return out
[docs]class VanillaLSTMPytorch(BaseModel): def __init__(self, check_optional_config=True, future_seq_len=1): """ Constructor of Vanilla LSTM model """ self.model = None self.check_optional_config = check_optional_config self.future_seq_len = future_seq_len self.feature_num = None self.output_dim = 1 self.metric = None self.criterion = None self.optimizer = None def _get_configs(self, config): super()._check_config(**config) self.metric = config.get('metric', 'mean_squared_error') self.batch_size = config.get('batch_size', 1024) self.hidden_dim = config.get('hidden_dim', 32) self.layer_num = config.get('layer_num', 2) self.dropout = config.get('dropout', 0.2) self.lr = config.get("lr", 0.001) def _load_data(self, input_data, batch_size): x, y = input_data data = TensorDataset(torch.from_numpy(x), torch.from_numpy(y)) data_loader = DataLoader(data, shuffle=True, batch_size=batch_size) return data_loader def _train_one_epoch(self, train_loader): self.model.train() train_losses = [] for input_seqs, target_seqs in train_loader: self.model.zero_grad() outputs = self.model(input_seqs) loss = self.criterion(outputs, target_seqs) # get gradients loss.backward() # update parameters self.optimizer.step() train_losses.append(loss.item()) return np.mean(train_losses) def _val_one_epoch(self, val_loader): self.model.eval() val_losses = [] for val_input, val_target in val_loader: val_out = self.model(val_input) val_loss = self.criterion(val_out, val_target) val_losses.append(val_loss.item()) return np.mean(val_losses) def _test_one_epoch(self, test_loader, mc=False): if not mc: self.model.eval() else: self.model.train() test_out_list = [] for test_input in test_loader: test_out = self.model(test_input[0]) test_out_list.append(test_out.detach().numpy()) return np.concatenate(test_out_list) def _print_model(self): # print model and parameters print(self.model) print(len(list(self.model.parameters()))) for i in range(len(list(self.model.parameters()))): print(list(self.model.parameters())[i].size())
[docs] def fit_eval(self, x, y, validation_data, mc=False, verbose=0, **config): self._get_configs(config) # get data train_loader = self._load_data((x, y), self.batch_size) if validation_data: val_loader = self._load_data(validation_data, self.batch_size) self.feature_num = x.shape[2] self.output_dim = 1 self.model = LSTMModel(self.feature_num, self.hidden_dim, self.layer_num, self.dropout, self.output_dim) # self._print_model() self.criterion = nn.MSELoss() self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr) epochs = config.get('epochs', 20) assert(epochs > 0) val_epoch = 1 for i in range(epochs): train_loss = self._train_one_epoch(train_loader) if verbose == 1: print("Epoch : {}/{}...".format(i, epochs), "Loss: {:.6f}...".format(train_loss), ) if i % val_epoch == 0: if validation_data: val_loss = self._val_one_epoch(val_loader) if verbose == 1: print("Val loss: {:.6f}...".format(val_loss)) if validation_data: result = val_loss else: result = train_loss return result
[docs] def evaluate(self, x, y, metric=['mse']): """ Evaluate on x, y :param x: input :param y: target :param metric: a list of metrics in string format :return: a list of metric evaluation results """ y_pred = self.predict(x) assert y_pred.shape == y.shape return [Evaluator.evaluate(m, y, y_pred) for m in metric]
[docs] def predict(self, x, mc=False): """ Prediction on x. :param x: input :return: predicted y """ test_x = TensorDataset(torch.from_numpy(x)) test_loader = DataLoader(test_x, shuffle=False, batch_size=self.batch_size) predictions = self._test_one_epoch(test_loader, mc=mc) return predictions
[docs] def predict_with_uncertainty(self, x, n_iter=100): test_x = TensorDataset(torch.from_numpy(x)) test_loader = DataLoader(test_x, shuffle=False, batch_size=self.batch_size) result = np.zeros((n_iter,) + (x.shape[0], self.future_seq_len)) for i in range(n_iter): result[i, :, :] = self._test_one_epoch(test_loader, mc=True) prediction = result.mean(axis=0) uncertainty = result.std(axis=0) return prediction, uncertainty
[docs] def save(self, model_path, config_path): """ save model to file. :param model_path: the model file. :param config_path: the config file :return: """ torch.save(self.model.state_dict(), model_path) # os.rename("vanilla_lstm_tmp.h5", model_path) config_to_save = { "future_seq_len": self.future_seq_len, "feature_num": self.feature_num, "metric": self.metric, "batch_size": self.batch_size, "hidden_dim": self.hidden_dim, "dropout": self.dropout, "layer_num": self.layer_num, "output_dim": self.output_dim, # "lr": self.lr } save_config(config_path, config_to_save)
[docs] def restore(self, model_path, **config): """ restore model from file :param model_path: the model file :param config: the trial config :return: the restored model """ # self.model = None # self._build(**config) # self.model = keras.models.load_model(model_path) # self.model.load_weights(file_path) self.future_seq_len = config["future_seq_len"] self.feature_num = config["feature_num"] self.output_dim = config["output_dim"] # for continuous training saved_configs = ["future_seq_len", "metric", "batch_size", "hidden_dim", "dropout", "layer_num", "output_dim"] assert all([c in config for c in saved_configs]) self._get_configs(config) self.model = LSTMModel(self.feature_num, self.hidden_dim, self.layer_num, self.dropout, self.output_dim) self.model.load_state_dict(torch.load(model_path)) self.model.eval()
def _get_required_parameters(self): return { # 'input_shape_x', # 'input_shape_y', # 'out_units' } def _get_optional_parameters(self): return { 'hidden_dim', 'layer_num', 'hidden_dim', 'dropout', 'lr', 'epochs', 'batch_size' }