Source code for zoo.automl.model.Seq2Seq_pytorch

#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import random

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

from zoo.automl.model.abstract import BaseModel
from zoo.automl.common.util import *
from zoo.automl.common.metrics import Evaluator


[docs]class Encoder(nn.Module): def __init__(self, input_dim, hidden_dim, layer_num, dropout): super().__init__() self.hidden_dim = hidden_dim self.layer_num = layer_num self.rnn = nn.LSTM(input_dim, hidden_dim, layer_num, dropout=dropout, batch_first=True) for name, param in self.rnn.named_parameters(): if 'bias' in name: nn.init.constant_(param, 0.0) elif 'weight_ih' in name: nn.init.xavier_normal_(param) elif 'weight_hh' in name: nn.init.orthogonal_(param)
[docs] def forward(self, input_seq): # input_seq = [batch_size, seq_len, feature_num] outputs, (hidden, cell) = self.rnn(input_seq) # outputs = [batch size, seq len, hidden dim] # hidden = [batch size, layer num, hidden dim] # cell = [batch size, layer num, hidden dim] # outputs are always from the top hidden layer return hidden, cell
[docs]class Decoder(nn.Module): def __init__(self, output_dim, hidden_dim, layer_num, dropout): super().__init__() self.output_dim = output_dim self.hidden_dim = hidden_dim self.layer_num = layer_num self.rnn = nn.LSTM(output_dim, hidden_dim, layer_num, dropout=dropout, batch_first=True) self.fc_out = nn.Linear(hidden_dim, output_dim) for name, param in self.rnn.named_parameters(): if 'bias' in name: nn.init.constant_(param, 0.0) elif 'weight_ih' in name: nn.init.xavier_normal_(param) elif 'weight_hh' in name: nn.init.orthogonal_(param)
[docs] def forward(self, decoder_input, hidden, cell): # input = [batch size] # hidden = [batch size, layer num, hidden dim] # cell = [batch size, layer num, hidden dim] # input = decoder_input.view(-1, 1) # decoder_input = [batch size, 1], since output_dim is 1 decoder_input = decoder_input.unsqueeze(1) # decoder_input = [batch_size, 1, 1] output, (hidden, cell) = self.rnn(decoder_input, (hidden, cell)) # output = [batch size, seq len, hidden dim] # hidden = [batch size, layer num, hidden dim] # cell = [batch size, layer num, hidden dim] # seq len will always be 1 in the decoder, therefore: # output = [batch size, 1, hidden dim] # hidden = [batch size, layer num, hidden dim] # cell = [batch size, layer num, hidden dim] prediction = self.fc_out(output.squeeze()) # prediction = [batch size, output dim] return prediction, hidden, cell
[docs]class Seq2Seq(nn.Module): def __init__(self, encoder, decoder, target_seq_len=1): super().__init__() self.encoder = encoder self.decoder = decoder self.target_seq_len = target_seq_len
[docs] def forward(self, source, target=None): # past_seq_len batch_size = source.shape[0] output_dim = self.decoder.output_dim # tensor to store the predicted outputs target_seq = torch.zeros(batch_size, self.target_seq_len, output_dim) # last hidden state of the encoder is used as the initial hidden state of the decoder hidden, cell = self.encoder(source) # Populate the first target sequence with end of encoding series value # decoder_input : [batch_size, output_dim] decoder_input = source[:, -1, :output_dim] for i in range(self.target_seq_len): decoder_output, hidden, cell = self.decoder(decoder_input, hidden, cell) target_seq[:, i] = decoder_output if target is None: # in test mode decoder_input = decoder_output else: decoder_input = target[:, i] return target_seq
[docs]class Seq2SeqPytorch(BaseModel): def __init__(self, check_optional_config=True, future_seq_len=1): """ Constructor of Vanilla LSTM model """ self.model = None self.check_optional_config = check_optional_config self.future_seq_len = future_seq_len self.feature_num = None self.output_dim = None self.metric = None self.batch_size = None self.criterion = None self.optimizer = None def _get_configs(self, config): super()._check_config(**config) self.metric = config.get('metric', 'mean_squared_error') self.batch_size = config.get('batch_size', 32) self.hidden_dim = config.get('hidden_dim', 32) self.layer_num = config.get('layer_num', 2) self.dropout = config.get('dropout', 0.2) self.lr = config.get("lr", 0.001) def _load_data(self, input_data, batch_size): x, y = input_data data = TensorDataset(torch.from_numpy(x), torch.from_numpy(y)) data_loader = DataLoader(data, shuffle=True, batch_size=batch_size) return data_loader def _train_one_epoch(self, train_loader): self.model.train() train_losses = [] for input_seqs, target_seqs in train_loader: self.model.zero_grad() outputs = self.model(input_seqs, target_seqs) loss = self.criterion(outputs, target_seqs) # get gradients loss.backward() # update parameters self.optimizer.step() train_losses.append(loss.item()) return np.mean(train_losses) def _val_one_epoch(self, val_loader): self.model.eval() val_losses = [] for val_input, val_target in val_loader: val_out = self.model(val_input) val_loss = self.criterion(val_out, val_target) val_losses.append(val_loss.item()) return np.mean(val_losses) def _test_one_epoch(self, test_loader, mc=False): if not mc: self.model.eval() else: self.model.train() test_out_list = [] for test_input in test_loader: # test_input is a list with one element test_out = self.model(test_input[0]) test_out_list.append(test_out.detach().numpy()) predictions = np.concatenate(test_out_list) y_pred = np.squeeze(predictions, axis=2) return y_pred def _print_model(self): # print model and parameters print(self.model) print(len(list(self.model.parameters()))) for i in range(len(list(self.model.parameters()))): print(list(self.model.parameters())[i].size()) def _expand_y(self, y): """ expand dims for y. :param y: :return: """ while len(y.shape) < 3: y = np.expand_dims(y, axis=2) return y def _pre_processing(self, x, y, validation_data): """ pre_process input data 1. expand dims for y and vay_y 2. get input lengths :param x: :param y: :param validation_data: :return: """ y = self._expand_y(y) self.feature_num = x.shape[2] self.output_dim = y.shape[2] if validation_data is not None: val_x, val_y = validation_data val_y = self._expand_y(val_y) return x, y, (val_x, val_y)
[docs] def fit_eval(self, x, y, validation_data=None, mc=False, verbose=0, **config): self._get_configs(config) x, y, validation_data = self._pre_processing(x, y, validation_data) # get data train_loader = self._load_data((x, y), self.batch_size) if validation_data: val_loader = self._load_data(validation_data, self.batch_size) encoder = Encoder(self.feature_num, self.hidden_dim, self.layer_num, self.dropout) decoder = Decoder(self.output_dim, self.hidden_dim, self.layer_num, self.dropout) self.model = Seq2Seq(encoder, decoder, target_seq_len=self.future_seq_len) print(encoder) print(decoder) # self._print_model() self.criterion = nn.MSELoss() self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr) epochs = config.get('epochs', 20) assert (epochs > 0) val_epoch = 1 for i in range(epochs): train_loss = self._train_one_epoch(train_loader) if verbose == 1: print("Epoch : {}/{}...".format(i, epochs), "Loss: {:.6f}...".format(train_loss), ) if i % val_epoch == 0: if validation_data: val_loss = self._val_one_epoch(val_loader) if verbose == 1: print("Val loss: {:.6f}...".format(val_loss)) if validation_data: result = val_loss else: result = train_loss return result
[docs] def evaluate(self, x, y, metric=['mse']): """ Evaluate on x, y :param x: input :param y: target :param metric: a list of metrics in string format :return: a list of metric evaluation results """ y_pred = self.predict(x) assert y_pred.shape == y.shape return [Evaluator.evaluate(m, y, y_pred) for m in metric]
[docs] def predict(self, x, mc=False): """ Prediction on x. :param x: input :return: predicted y """ test_x = TensorDataset(torch.from_numpy(x)) test_loader = DataLoader(test_x, shuffle=False, batch_size=self.batch_size) y_pred = self._test_one_epoch(test_loader, mc=mc) return y_pred
[docs] def predict_with_uncertainty(self, x, n_iter=100): test_x = TensorDataset(torch.from_numpy(x)) test_loader = DataLoader(test_x, shuffle=False, batch_size=self.batch_size) result = np.zeros((n_iter,) + (x.shape[0], self.future_seq_len)) for i in range(n_iter): result[i, :, :] = self._test_one_epoch(test_loader, mc=True) prediction = result.mean(axis=0) uncertainty = result.std(axis=0) return prediction, uncertainty
[docs] def save(self, model_path, config_path): """ save model to file. :param model_path: the model file. :param config_path: the config file :return: """ torch.save(self.model.state_dict(), model_path) # os.rename("vanilla_lstm_tmp.h5", model_path) config_to_save = { "future_seq_len": self.future_seq_len, "feature_num": self.feature_num, "metric": self.metric, "batch_size": self.batch_size, "hidden_dim": self.hidden_dim, "dropout": self.dropout, "layer_num": self.layer_num, "output_dim": self.output_dim, # "lr": self.lr } save_config(config_path, config_to_save)
[docs] def restore(self, model_path, **config): """ restore model from file :param model_path: the model file :param config: the trial config :return: the restored model """ # self.model = None # self._build(**config) # self.model = keras.models.load_model(model_path) # self.model.load_weights(file_path) self.future_seq_len = config["future_seq_len"] self.feature_num = config["feature_num"] self.output_dim = config["output_dim"] # for continuous training saved_configs = ["future_seq_len", "metric", "batch_size", "hidden_dim", "dropout", "layer_num", "output_dim"] assert all([c in config for c in saved_configs]) self._get_configs(config) encoder = Encoder(self.feature_num, self.hidden_dim, self.layer_num, self.dropout) decoder = Decoder(self.output_dim, self.hidden_dim, self.layer_num, self.dropout) self.model = Seq2Seq(encoder, decoder, target_seq_len=self.future_seq_len) self.model.load_state_dict(torch.load(model_path)) self.model.eval()
def _get_required_parameters(self): return { # 'input_shape_x', # 'input_shape_y', # 'out_units' } def _get_optional_parameters(self): return { 'hidden_dim', 'layer_num', 'hidden_dim', 'dropout', 'lr', 'epochs', 'batch_size' }