#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import shutil
import tempfile
import zipfile
import numpy as np
import pandas as pd
import os
import json
IDENTIFIER_LEN = 27
[docs]def train_val_test_split(df,
val_ratio=0,
test_ratio=0.1,
look_back=0,
horizon=1):
"""
split input dataframe into train_df, val_df and test_df according to split ratio.
The dataframe is splitted in its originally order in timeline.
e.g. |......... train_df(80%) ........ | ... val_df(10%) ...| ...test_df(10%)...|
:param df: dataframe to be splitted
:param val_ratio: validation ratio
:param test_ratio: test ratio
:param look_back: the length to look back
:param horizon: num of steps to look forward
:return:
"""
# suitable to nyc taxi dataset.
total_num = df.index.size
test_num = int(total_num * test_ratio)
val_num = int(total_num * val_ratio)
test_split_index = test_num + look_back + horizon - 1
val_split_index = test_split_index + val_num
train_df = df.iloc[:-(test_num + val_num)]
val_df = df.iloc[-val_split_index: -test_num]
test_df = df.iloc[-test_split_index:]
if not pd.api.types.is_datetime64_any_dtype(df.index.dtype):
val_df = val_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)
return train_df, val_df, test_df
[docs]class NumpyEncoder(json.JSONEncoder):
"""
convert numpy array to list for JSON serialize
"""
[docs] def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
return json.JSONEncoder.default(self, obj)
[docs]def save_config(file_path, config, replace=False):
"""
:param file_path: the file path of config to be saved.
:param config: dict. The config to be saved
:param replace: whether to replace if the config file already existed.
:return:
"""
if os.path.isfile(file_path) and not replace:
with open(file_path, "r") as input_file:
old_config = json.load(input_file)
old_config.update(config)
config = old_config.copy()
file_dirname = os.path.dirname(os.path.abspath(file_path))
if file_dirname and not os.path.exists(file_dirname):
os.makedirs(file_dirname)
with open(file_path, "w") as output_file:
json.dump(config, output_file, cls=NumpyEncoder)
[docs]def load_config(file_path):
with open(file_path, "r") as input_file:
data = json.load(input_file)
return data
[docs]def save(file_path, feature_transformers=None, model=None, config=None):
if not os.path.isdir(file_path):
os.mkdir(file_path)
config_path = os.path.join(file_path, "config.json")
model_path = os.path.join(file_path, "weights_tune.h5")
if feature_transformers is not None:
feature_transformers.save(config_path, replace=True)
if model is not None:
model.save(model_path, config_path)
if config is not None:
save_config(config_path, config)
[docs]def save_zip(file, feature_transformers=None, model=None, config=None):
file_dirname = os.path.dirname(os.path.abspath(file))
if file_dirname and not os.path.exists(file_dirname):
os.makedirs(file_dirname)
dirname = tempfile.mkdtemp(prefix="automl_save_")
try:
save(dirname,
feature_transformers=feature_transformers,
model=model,
config=config)
with zipfile.ZipFile(file, 'w') as f:
for dirpath, dirnames, filenames in os.walk(dirname):
for filename in filenames:
f.write(os.path.join(dirpath, filename), filename)
assert os.path.isfile(file)
finally:
shutil.rmtree(dirname)
[docs]def process(cmd):
import subprocess
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
# process.wait()
outs, errors = proc.communicate()
# if outs:
# print("hdfs std out:", outs)
if errors:
print("hdfs errors:", errors)
return outs, errors
[docs]def get_remote_list(dir_in):
# dir_in = "hdfs://172.16.0.103:9000/yushan/"
args = "hdfs dfs -ls " + dir_in + " | awk '{print $8}'"
s_output, _ = process(args)
all_dart_dirs = s_output.split()
names = []
for filename in all_dart_dirs:
filename = filename.decode()
name_list = filename.split('/')
names.append(name_list[-1])
# print(names)
return names
[docs]def upload_ppl_hdfs(upload_dir, ckpt_name):
# The default upload_dir is {remote_root}/ray_results/automl
# The name of ray checkpoint_dir is train_func_0_{config}_{time}_{tmp},
# with a max identifier length of 130.
# If there is a list([]) in config and is truncated into part of [],
# then the path name can't be identified by hadoop command.
# Therefore we use the last IDENTIFIER_LEN=27 of checkpoint_dir as upload_dir_name,
# with a format of {time}_{tmp}, in order to avoid misinterpretation.
log_dir = os.path.abspath(".")
log_name = os.path.basename(log_dir)[-IDENTIFIER_LEN:]
remote_log_dir = os.path.join(upload_dir, log_name)
if log_name not in get_remote_list(upload_dir):
cmd = "hadoop fs -mkdir {remote_log_dir};" \
" hadoop fs -put -f {local_file} {remote_log_dir}"\
.format(local_file=ckpt_name, remote_log_dir=remote_log_dir)
else:
cmd = " hadoop fs -put -f {local_file} {remote_log_dir}".format(
local_file=ckpt_name,
remote_log_dir=remote_log_dir)
# print("upload hdfs cmd is:", sync_cmd)
process(cmd)
[docs]def restore(file, feature_transformers=None, model=None, config=None):
model_path = os.path.join(file, "weights_tune.h5")
config_path = os.path.join(file, "config.json")
local_config = load_config(config_path)
if config is not None:
all_config = config.copy()
all_config.update(local_config)
else:
all_config = local_config
if model:
model.restore(model_path, **all_config)
if feature_transformers:
feature_transformers.restore(**all_config)
return all_config
[docs]def restore_zip(file, feature_transformers=None, model=None, config=None):
dirname = tempfile.mkdtemp(prefix="automl_save_")
try:
with zipfile.ZipFile(file) as zf:
zf.extractall(dirname)
all_config = restore(dirname, feature_transformers, model, config)
finally:
shutil.rmtree(dirname)
return all_config
[docs]def restore_hdfs(model_path, remote_dir, feature_transformers=None, model=None, config=None):
model_name = os.path.basename(model_path)
local_best_dirname = os.path.basename(os.path.dirname(model_path))
remote_model = os.path.join(remote_dir, local_best_dirname[-IDENTIFIER_LEN:], model_name)
tmp_dir = tempfile.mkdtemp(prefix="automl_save_")
try:
cmd = "hadoop fs -get {} {}".format(remote_model, tmp_dir)
# print("get hdfs cmd is:", cmd)
process(cmd)
with zipfile.ZipFile(os.path.join(tmp_dir, model_name)) as zf:
zf.extractall(tmp_dir)
# print(os.listdir(tmp_dir))
all_config = restore(tmp_dir, feature_transformers, model, config)
finally:
shutil.rmtree(tmp_dir)
return all_config
[docs]def convert_bayes_configs(config):
selected_features = []
new_config = {}
for config_name, config_value in config.items():
if config_name.startswith('bayes_feature'):
# print(config_name, config_value)
if config_value >= 0.5:
feature_name = config_name.replace('bayes_feature_', '')
selected_features.append(feature_name)
elif config_name == 'batch_size_log':
batch_size = int(2 ** config_value)
new_config['batch_size'] = batch_size
elif config_name.endswith('float'):
int_config_name = config_name.replace('_float', '')
int_config_value = int(config_value)
new_config[int_config_name] = int_config_value
else:
new_config[config_name] = config_value
if selected_features:
new_config['selected_features'] = json.dumps(selected_features)
# print("config after bayes conversion is ", new_config)
return new_config