#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pyspark.ml.param.shared import *
from pyspark.ml.wrapper import JavaModel, JavaEstimator, JavaTransformer
from bigdl.optim.optimizer import SGD
from zoo.common.utils import callZooFunc
from bigdl.util.common import *
from zoo.feature.common import *
import sys

# Python 3 folded ``long`` into ``int`` and ``unicode`` into ``str``. Alias the old
# Python 2 names so code in this module written against them keeps working.
# ``sys.version_info`` is compared as a tuple; comparing the ``sys.version`` string
# lexically is fragile. ``sys`` is imported explicitly rather than relying on a star import.
if sys.version_info[0] >= 3:
    long = int
    unicode = str
class HasBatchSize(Params):
    """
    Mixin providing the ``batchSize`` param (batch size), with a default of 1.
    """

    # Class-level placeholder so the param is listed in the generated documentation;
    # each instance replaces it with its own Param in __init__.
    batchSize = Param(Params._dummy(), "batchSize", "batchSize (>= 0).")

    def __init__(self):
        super(HasBatchSize, self).__init__()
        # Per-instance param owned by this object, defaulting to 1.
        self.batchSize = Param(self, "batchSize", "batchSize")
        self._setDefault(batchSize=1)

    def setBatchSize(self, val):
        """
        Sets the value of :py:attr:`batchSize`.

        :param val: the batch size.
        :return: self, to allow call chaining.
        """
        param_map = self._paramMap
        param_map[self.batchSize] = val
        return self

    def getBatchSize(self):
        """
        Returns the explicitly set batchSize, falling back to its default value.
        """
        return self.getOrDefault(self.batchSize)
class HasSamplePreprocessing:
    """
    Mixin that keeps a sample preprocessing both on the wrapped Java object and on the
    Python instance.

    The host class must provide ``bigdl_type`` and ``value`` (the Java object), which
    ``setSamplePreprocessing`` passes to ``callZooFunc``.
    """

    # Class-level fallback returned by getSamplePreprocessing until an instance value
    # is assigned.
    samplePreprocessing = None

    def __init__(self):
        super(HasSamplePreprocessing, self).__init__()

    def setSamplePreprocessing(self, val):
        """
        Sets samplePreprocessing on the Java object, then mirrors it on this instance.

        :param val: the sample preprocessing to use.
        :return: self, to allow call chaining.
        """
        # Java side first: if the call fails, the Python attribute is left untouched.
        callZooFunc(self.bigdl_type, "setSamplePreprocessing", self.value, val)
        self.samplePreprocessing = val
        return self

    def getSamplePreprocessing(self):
        """
        Returns the stored sample preprocessing (``None`` unless one has been assigned).
        """
        return self.samplePreprocessing
class HasOptimMethod:
    """
    Mixin holding the optimization method (e.g. SGD, Adam, LBFGS from
    bigdl.optim.optimizer) used for training.

    The host class must provide ``bigdl_type`` and ``value`` (the Java object), which
    ``setOptimMethod`` passes to ``callZooFunc``.
    """

    # Class-level fallback. When this mixin follows pyspark's ``Params`` in a subclass's
    # MRO (as in NNEstimator), pyspark's ``__init__`` chain may end before reaching
    # ``HasOptimMethod.__init__``. Without this attribute, ``getOptimMethod`` would raise
    # AttributeError until ``setOptimMethod`` had been called.
    optimMethod = None

    def __init__(self):
        super(HasOptimMethod, self).__init__()
        self.optimMethod = SGD()

    def setOptimMethod(self, val):
        """
        Sets the optimization method, e.g. SGD, Adam, LBFGS from bigdl.optim.optimizer.
        default: SGD()

        :param val: the optimization method; it is forwarded to the Java object and also
                    kept on this instance.
        :return: self, to allow call chaining.
        """
        pythonBigDL_method_name = "setOptimMethod"
        callZooFunc(self.bigdl_type, pythonBigDL_method_name, self.value, val)
        self.optimMethod = val
        return self

    def getOptimMethod(self):
        """
        Gets the optimization method.

        :return: the value last passed to ``setOptimMethod``. If none is stored, a fresh
                 ``SGD()`` is created and stored. That default is created on the Python
                 side only; it is not read back from the Java object.
        """
        if self.optimMethod is None:
            self.optimMethod = SGD()
        return self.optimMethod
class HasThreshold(Params):
    """
    Mixin for the ``threshold`` param used in binary classification.

    The threshold applies to the raw output of the model: an output greater than the
    threshold predicts 1, otherwise 0. A higher threshold makes the model predict 0 more
    often; a lower threshold makes it predict 1 more often.

    Note: this differs from the threshold of Spark's ProbabilisticClassifier, which is
    compared against an estimated probability.

    Default: 0.5.
    """

    def __init__(self):
        super(HasThreshold, self).__init__()
        # Per-instance param owned by this object, defaulting to 0.5.
        self.threshold = Param(self, "threshold", "threshold")
        self._setDefault(threshold=0.5)

    def setThreshold(self, val):
        """
        Sets the value of :py:attr:`threshold`.

        :param val: the decision threshold.
        :return: self, to allow call chaining.
        """
        param_map = self._paramMap
        param_map[self.threshold] = val
        return self

    def getThreshold(self):
        """
        Returns the explicitly set threshold, falling back to its default value.
        """
        return self.getOrDefault(self.threshold)
class NNEstimator(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasBatchSize,
                  HasOptimMethod, HasSamplePreprocessing, JavaValue):
    """
    NNEstimator extends org.apache.spark.ml.Estimator and trains a BigDL model on Spark
    DataFrame data. It can be placed in a standard Spark ML Pipeline so it combines with
    Spark MLlib.

    Different feature and label data types are supported through Preprocessing. Pre-defined
    Preprocessing for common types such as Array or Vector lives in package zoo.feature.
    Users may also write their own by extending feature.common.Preprocessing.

    During fit, NNEstimator extracts feature and label data from the input DataFrame and
    uses the Preprocessing to prepare it for the model. Because conversion happens through
    the Preprocessing, only the raw data needs to be cached. This reduces memory use during
    feature conversion and training.

    More concrete examples are available in package com.intel.analytics.zoo.examples.nnframes
    """

    def __init__(self, model, criterion,
                 feature_preprocessing=None,
                 label_preprocessing=None,
                 jvalue=None, bigdl_type="float"):
        """
        Construct a NNEstimator from a BigDL model, a criterion and Preprocessing for the
        feature and label data.

        :param model: BigDL Model to be trained.
        :param criterion: BigDL criterion.
        :param feature_preprocessing: Converts the data in the feature column to a Tensor,
               or directly to a Sample. It accepts either a List of Int (the size of the
               converted Tensor) or a Preprocessing[F, Tensor[T]].
               A List of Int only handles feature columns of these types:
               Float, Double, Int, Array[Float], Array[Double], Array[Int] and MLlib Vector.
               Internally a SeqToTensor is built from the sizes and used as the
               feature_preprocessing.
               A List of Lists is converted with SeqToMultipleTensors.
               Alternatively, pass a Preprocessing[F, Tensor[T]] that turns the feature
               data into a Tensor[T]. Some pre-defined Preprocessing are provided in
               package zoo.feature. Several Preprocessing can be combined with
               ChainedPreprocessing.
               The feature_preprocessing is also copied to the generated NNModel and
               applied to the feature column during transform.
               Defaults to SeqToTensor() when empty or None.
        :param label_preprocessing: like feature_preprocessing, but for the label data.
               A list must contain only ints and becomes SeqToTensor(list).
               Defaults to SeqToTensor() when empty or None.
        :param jvalue: Java object created by Py4j. When given, it is used as-is instead of
               constructing a new Java estimator.
        :param bigdl_type: optional parameter. data type of model, "float"(default) or "double".
        """
        super(NNEstimator, self).__init__()
        # Defaults are created here, not as argument defaults, so that no Java object is
        # built at import time.
        feature_preprocessing = feature_preprocessing or SeqToTensor()
        label_preprocessing = label_preprocessing or SeqToTensor()
        feature_preprocessing = self._normalize_feature_preprocessing(feature_preprocessing)
        if type(label_preprocessing) is list:
            assert (all(isinstance(x, int) for x in label_preprocessing))
            label_preprocessing = SeqToTensor(label_preprocessing)

        sample_preprocessing = FeatureLabelPreprocessing(feature_preprocessing, label_preprocessing)
        if jvalue:
            self.value = jvalue
        else:
            self.value = callZooFunc(bigdl_type, self.jvm_class_constructor(),
                                     model, criterion, sample_preprocessing)
        self.model = model
        self.samplePreprocessing = sample_preprocessing
        self.bigdl_type = bigdl_type
        self._java_obj = self.value

        # Spark ML params. No Python-side defaults are set for these here.
        self.maxEpoch = Param(self, "maxEpoch", "number of max Epoch")
        self.learningRate = Param(self, "learningRate", "learning rate")
        self.learningRateDecay = Param(self, "learningRateDecay", "learning rate decay")
        self.cachingSample = Param(self, "cachingSample", "cachingSample")

        # Python-side mirrors of configuration forwarded to the Java object.
        self.train_summary = None
        self.validation_config = None
        self.checkpoint_config = None
        self.validation_summary = None
        self.endWhen = None
        self.dataCacheLevel = "DRAM"

    @staticmethod
    def _normalize_feature_preprocessing(preprocessing):
        # Turn the list-of-sizes shorthand into the matching Preprocessing; any other
        # value (including a list whose first element is neither a list nor an int) is
        # returned unchanged.
        if type(preprocessing) is not list:
            return preprocessing
        head = preprocessing[0]
        if type(head) is list:
            return SeqToMultipleTensors(preprocessing)
        if isinstance(head, int):
            return SeqToTensor(preprocessing)
        return preprocessing

    def setSamplePreprocessing(self, val):
        """
        Sets the value of sample_preprocessing.

        :param val: a Preprocessing[(Feature, Option(Label)), Sample]
        :return: self, to allow call chaining.
        """
        super(NNEstimator, self).setSamplePreprocessing(val)
        return self

    def setMaxEpoch(self, val):
        """
        Sets the value of :py:attr:`maxEpoch`.
        """
        param_map = self._paramMap
        param_map[self.maxEpoch] = val
        return self

    def getMaxEpoch(self):
        """
        Returns the explicitly set maxEpoch, or its default value if one exists.
        """
        return self.getOrDefault(self.maxEpoch)

    def setEndWhen(self, trigger):
        """
        Sets when to stop training, given as a Trigger, e.g. maxIterations(100).
        """
        callZooFunc(self.bigdl_type, "setEndWhen", self.value, trigger)
        self.endWhen = trigger
        return self

    def getEndWhen(self):
        """
        Returns the Trigger passed to setEndWhen, or None if it has not been called.
        """
        return self.endWhen

    def setDataCacheLevel(self, level, numSlice=None):
        """
        :param level: string, "DRAM", "PMEM" or "DISK_AND_DRAM".
               DRAM caches the dataset in dynamic random-access memory.
               PMEM caches the dataset in Intel Optane DC Persistent Memory.
               DISK_AND_DRAM caches the dataset on disk and keeps only 1/numSlice of the
               data in memory during training. Once that slice has been processed, the
               current cache is released and the next slice is loaded into memory.
        :param numSlice: number of slices, used with "DISK_AND_DRAM".
        :return: self, to allow call chaining.
        """
        callZooFunc(self.bigdl_type, "setDataCacheLevel", self.value, level, numSlice)
        if numSlice is None:
            self.dataCacheLevel = level
        else:
            self.dataCacheLevel = (level, numSlice)
        return self

    def getDataCacheLevel(self):
        """
        Returns "DRAM" by default, the level string last set, or a (level, numSlice)
        tuple when numSlice was given.
        """
        return self.dataCacheLevel

    def setLearningRate(self, val):
        """
        Sets the value of :py:attr:`learningRate`.
        .. note:: Deprecated in 0.4.0. Please set learning rate with optimMethod directly.
        """
        param_map = self._paramMap
        param_map[self.learningRate] = val
        return self

    def getLearningRate(self):
        """
        Returns the explicitly set learningRate, or its default value if one exists.
        """
        return self.getOrDefault(self.learningRate)

    def setLearningRateDecay(self, val):
        """
        Sets the value of :py:attr:`learningRateDecay`.
        .. note:: Deprecated in 0.4.0. Please set learning rate decay with optimMethod directly.
        """
        param_map = self._paramMap
        param_map[self.learningRateDecay] = val
        return self

    def getLearningRateDecay(self):
        """
        Returns the explicitly set learningRateDecay, or its default value if one exists.
        """
        return self.getOrDefault(self.learningRateDecay)

    def setCachingSample(self, val):
        """
        Whether to cache the Samples after preprocessing. Default: True
        (no Python-side default is set in this class).
        """
        param_map = self._paramMap
        param_map[self.cachingSample] = val
        return self

    def isCachingSample(self):
        """
        Returns the explicitly set cachingSample, or its default value if one exists.
        """
        return self.getOrDefault(self.cachingSample)

    def setTrainSummary(self, val):
        """
        Enables the train logger. Statistics (LearningRate, Loss, Throughput, Parameters)
        collected on the training data during training can then be visualized with
        Tensorboard. The log is saved to logDir/appName/train, as specified by the
        TrainSummary parameters.
        Default: Not enabled

        :param val: a TrainSummary object
        :return: self, to allow call chaining.
        """
        callZooFunc(self.bigdl_type, "setTrainSummary", self.value, val)
        self.train_summary = val
        return self

    def getTrainSummary(self):
        """
        Returns the TrainSummary passed to setTrainSummary, or None.
        """
        return self.train_summary

    def setValidationSummary(self, val):
        """
        Enables the validation logger. If validation data is set, statistics
        (LearningRate, Loss, Throughput, Parameters) collected on it during training can
        be visualized with Tensorboard. The log is saved to logDir/appName/, as specified
        by the validationSummary parameters.
        Default: None

        :return: self, to allow call chaining.
        """
        callZooFunc(self.bigdl_type, "setValidationSummary", self.value, val)
        self.validation_summary = val
        return self

    def getValidationSummary(self):
        """
        Returns the summary passed to setValidationSummary, or None.
        """
        return self.validation_summary

    def setValidation(self, trigger, val_df, val_method, batch_size):
        """
        Sets up validation evaluation during training.

        :param trigger: validation interval
        :param val_df: validation dataset
        :param val_method: the ValidationMethod to use, e.g. "Top1Accuracy", "Top5Accuracy", "Loss"
        :param batch_size: validation batch size
        :return: self, to allow call chaining.
        """
        callZooFunc(self.bigdl_type, "setValidation", self.value,
                    trigger, val_df, val_method, batch_size)
        self.validation_config = [trigger, val_df, val_method, batch_size]
        return self

    def getValidation(self):
        """
        Returns the validation configuration as the list
        [trigger, val_df, val_method, batch_size] passed to setValidation,
        or None if it has not been called.
        """
        return self.validation_config

    def clearGradientClipping(self):
        """
        Clears the clipping params, so no clipping is applied.
        Must be called before fit to take effect.
        """
        callZooFunc(self.bigdl_type, "nnEstimatorClearGradientClipping", self.value)
        return self

    def setConstantGradientClipping(self, min, max):
        """
        Sets constant gradient clipping for the training process.
        Must be called before fit to take effect.

        # Arguments
        min: The minimum value to clip by. Float.
        max: The maximum value to clip by. Float.
        """
        lower, upper = float(min), float(max)
        callZooFunc(self.bigdl_type, "nnEstimatorSetConstantGradientClipping",
                    self.value, lower, upper)
        return self

    def setGradientClippingByL2Norm(self, clip_norm):
        """
        Clips gradients to a maximum L2-Norm during the training process.
        Must be called before fit to take effect.

        # Arguments
        clip_norm: Gradient L2-Norm threshold. Float.
        """
        norm = float(clip_norm)
        callZooFunc(self.bigdl_type, "nnEstimatorSetGradientClippingByL2Norm",
                    self.value, norm)
        return self

    def setCheckpoint(self, path, trigger, isOverWrite=True):
        """
        Sets checkpoints during training. Not enabled by default.

        :param path: the directory to save the model
        :param trigger: how often to save the checkpoint
        :param isOverWrite: whether to overwrite existing snapshots in path. Default is True
        :return: self
        """
        callZooFunc(self.bigdl_type, "setCheckpoint", self.value,
                    path, trigger, isOverWrite)
        self.checkpoint_config = [path, trigger, isOverWrite]
        return self

    def getCheckpoint(self):
        """
        :return: the list [checkpointPath, checkpointTrigger, checkpointOverwrite] passed to
                 setCheckpoint, or None if it has not been called.
        """
        return self.checkpoint_config

    def _create_model(self, java_model):
        # Re-attach the sample preprocessing explicitly, even though java_model already
        # carries it, so the Python NNModel also exposes sample_preprocessing.
        est_preprocessing = self.getSamplePreprocessing()
        nn_model = NNModel(model=self.model, feature_preprocessing=None, jvalue=java_model,
                           bigdl_type=self.bigdl_type)
        nn_model = nn_model.setSamplePreprocessing(
            ChainedPreprocessing([ToTuple(), est_preprocessing]))
        (nn_model.setFeaturesCol(self.getFeaturesCol())
                 .setPredictionCol(self.getPredictionCol())
                 .setBatchSize(java_model.getBatchSize()))
        return nn_model
class NNModel(JavaTransformer, HasFeaturesCol, HasPredictionCol, HasBatchSize,
              HasSamplePreprocessing, JavaValue):
    """
    NNModel extends Spark ML Transformer and applies a BigDL model to a Spark DataFrame.

    Different feature data types are supported through Preprocessing; some common ones are
    defined in com.intel.analytics.zoo.feature. After transform, the prediction column
    holds the model output as Array[T], where T (Double or Float) is decided by the model
    type.
    """

    def __init__(self, model, feature_preprocessing=None, jvalue=None, bigdl_type="float"):
        """
        Create a NNModel from a BigDL model.

        :param model: trained BigDL model to use in prediction.
        :param feature_preprocessing: Converts the data in the feature column to a Tensor.
               It accepts either a List of Int (the size of the converted Tensor) or a
               Preprocessing[F, Tensor[T]]. Must be None when jvalue is given.
        :param jvalue: Java object created by Py4j. When given, it is wrapped directly.
        :param bigdl_type: optional parameter. data type of model, "float"(default) or "double".
        """
        super(NNModel, self).__init__()
        if jvalue:
            # Wrap an existing Java NNModel.
            assert feature_preprocessing is None
            self.value = jvalue
        else:
            # Build a new Java NNModel from the Python model and preprocessing.
            preprocessing = self._normalize_feature_preprocessing(
                feature_preprocessing or SeqToTensor())
            sample_preprocessing = ChainedPreprocessing([preprocessing, TensorToSample()])
            self.value = callZooFunc(
                bigdl_type, self.jvm_class_constructor(), model, sample_preprocessing)
            self.samplePreprocessing = sample_preprocessing
        self.model = model
        self._java_obj = self.value
        self.bigdl_type = bigdl_type
        # Mirror the Java object's batch size into the Python param.
        self.setBatchSize(self.value.getBatchSize())

    @staticmethod
    def _normalize_feature_preprocessing(preprocessing):
        # Turn the list-of-sizes shorthand into the matching Preprocessing; any other
        # value is returned unchanged.
        if type(preprocessing) is not list:
            return preprocessing
        head = preprocessing[0]
        if type(head) is list:
            return SeqToMultipleTensors(preprocessing)
        if isinstance(head, int):
            return SeqToTensor(preprocessing)
        return preprocessing

    def save(self, path):
        """
        Pushes the Python-side params to the Java object, then saves it to ``path``.

        :return: self
        """
        self._transfer_params_to_java()
        callZooFunc(self.bigdl_type, "saveNNModel", self.value, path)
        return self

    @staticmethod
    def load(path):
        """
        Loads a NNModel saved at ``path`` (loaded with bigdl_type "float").
        """
        return NNModel(model=None, feature_preprocessing=None,
                       jvalue=callZooFunc("float", "loadNNModel", path))
class NNClassifier(NNEstimator):
    """
    NNClassifier is a specialized NNEstimator that simplifies the data format for
    classification tasks. It only supports a label column of DoubleType, and the fitted
    NNClassifierModel has a prediction column of DoubleType.
    """

    def __init__(self, model, criterion, feature_preprocessing=None,
                 jvalue=None, bigdl_type="float"):
        """
        :param model: BigDL module to be optimized
        :param criterion: BigDL criterion method
        :param feature_preprocessing: Converts the data in the feature column to a Tensor.
               It accepts either a List of Int (the size of the converted Tensor) or a
               Preprocessing[F, Tensor[T]]. Defaults to SeqToTensor().
        :param jvalue: Java object created by Py4j, used as-is when given.
        :param bigdl_type(optional): Data type of BigDL model, "float"(default) or "double".
        """
        # Labels are scalars, so they always go through ScalarToTensor.
        super(NNClassifier, self).__init__(
            model, criterion, feature_preprocessing or SeqToTensor(), ScalarToTensor(),
            jvalue, bigdl_type)

    def setSamplePreprocessing(self, val):
        """
        Sets the value of sample_preprocessing.

        :param val: a Preprocessing[(Feature, Option(Label)), Sample]
        :return: self, to allow call chaining.
        """
        super(NNClassifier, self).setSamplePreprocessing(val)
        return self

    def _create_model(self, java_model):
        # Re-attach the sample preprocessing explicitly, even though java_model already
        # carries it, so the Python NNClassifierModel also exposes sample_preprocessing.
        est_preprocessing = self.getSamplePreprocessing()
        classifier_model = NNClassifierModel(model=self.model, feature_preprocessing=None,
                                             jvalue=java_model, bigdl_type=self.bigdl_type)
        classifier_model = classifier_model.setSamplePreprocessing(
            ChainedPreprocessing([ToTuple(), est_preprocessing]))
        (classifier_model.setFeaturesCol(self.getFeaturesCol())
                         .setPredictionCol(self.getPredictionCol())
                         .setBatchSize(java_model.getBatchSize()))
        return classifier_model
class NNClassifierModel(NNModel, HasThreshold):
    """
    NNClassifierModel is a specialized [[NNModel]] for classification tasks. Its prediction
    column has the datatype Double.
    """

    def __init__(self, model, feature_preprocessing=None, jvalue=None,
                 bigdl_type="float"):
        """
        :param model: trained BigDL model to use in prediction.
        :param feature_preprocessing: Converts the data in the feature column to a Tensor.
               It accepts either a List of Int (the size of the converted Tensor) or a
               Preprocessing[F, Tensor[T]].
        :param jvalue: Java object created by Py4j
        :param bigdl_type(optional): Data type of BigDL model, "float"(default) or "double".
        """
        super(NNClassifierModel, self).__init__(model, feature_preprocessing, jvalue, bigdl_type)

    @staticmethod
    def load(path):
        """
        Loads a NNClassifierModel saved at ``path`` (loaded with bigdl_type "float").
        """
        java_model = callZooFunc("float", "loadNNClassifierModel", path)
        return NNClassifierModel(model=None, feature_preprocessing=None, jvalue=java_model)
class XGBClassifierModel:
    '''
    XGBClassifierModel wraps a trained XGBoost classification model held as a Java object.
    The prediction column will hold the prediction results.
    '''

    def __init__(self, jvalue):
        """
        :param jvalue: the Java model object; must not be None.
        """
        super(XGBClassifierModel, self).__init__()
        assert jvalue is not None
        self.value = jvalue

    def setFeaturesCol(self, features):
        """
        Sets the features column on the wrapped Java model.

        :param features: the features column to use.
        :return: self, so setters can be chained like the other models in this module.
        """
        callZooFunc("float", "setFeaturesXGBClassifierModel", self.value, features)
        return self

    def setPredictionCol(self, prediction):
        """
        Sets the prediction column on the wrapped Java model.

        :param prediction: the prediction column to use.
        :return: self, so setters can be chained like the other models in this module.
        """
        callZooFunc("float", "setPredictionXGBClassifierModel", self.value, prediction)
        return self

    @staticmethod
    def loadModel(path, numClasses):
        """
        Loads a pretrained XGBoostClassificationModel.

        :param path: pretrained model path
        :param numClasses: number of classes for classification
        :return: an XGBClassifierModel wrapping the loaded Java model.
        """
        jvalue = callZooFunc("float", "loadXGBClassifierModel", path, numClasses)
        return XGBClassifierModel(jvalue=jvalue)