#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from bigdl.util.common import *
from zoo.common.utils import callZooFunc
from bigdl.dataset.dataset import DataSet
from pyspark.serializers import CloudPickleSerializer
import sys
import math
import warnings
if sys.version >= '3':
long = int
unicode = str
[docs]class Relation(object):
"""
It represents the relationship between two items.
"""
def __init__(self, id1, id2, label, bigdl_type="float"):
self.id1 = id1
self.id2 = id2
self.label = int(label)
self.bigdl_type = bigdl_type
def __reduce__(self):
return Relation, (self.id1, self.id2, self.label)
def __str__(self):
return "Relation [id1: %s, id2: %s, label: %s]" % (
self.id1, self.id2, self.label)
[docs] def to_tuple(self):
return self.id1, self.id2, self.label
[docs]class Relations(object):
[docs] @staticmethod
def read(path, sc=None, min_partitions=1, bigdl_type="float"):
"""
Read relations from csv or txt file.
Each record is supposed to contain the following three fields in order:
id1(string), id2(string) and label(int).
For csv file, it should be without header.
For txt file, each line should contain one record with fields separated by comma.
:param path: The path to the relations file, which can either be a local or disrtibuted file
system (such as HDFS) path.
:param sc: An instance of SparkContext.
If specified, return RDD of Relation.
Default is None and in this case return list of Relation.
:param min_partitions: Int. A suggestion value of the minimal partition number for input
texts. Only need to specify this when sc is not None. Default is 1.
"""
if sc:
jvalue = callZooFunc(bigdl_type, "readRelations", path, sc, min_partitions)
res = jvalue.map(lambda x: Relation(str(x[0]), str(x[1]), int(x[2])))
else:
jvalue = callZooFunc(bigdl_type, "readRelations", path)
res = [Relation(str(x[0]), str(x[1]), int(x[2])) for x in jvalue]
return res
[docs] @staticmethod
def read_parquet(path, sc, bigdl_type="float"):
"""
Read relations from parquet file.
Schema should be the following:
"id1"(string), "id2"(string) and "label"(int).
:param path: The path to the parquet file.
:param sc: An instance of SparkContext.
:return: RDD of Relation.
"""
jvalue = callZooFunc(bigdl_type, "readRelationsParquet", path, sc)
return jvalue.map(lambda x: Relation(str(x[0]), str(x[1]), int(x[2])))
[docs]class Preprocessing(JavaValue):
"""
Preprocessing defines data transform action during feature preprocessing. Python wrapper for
the scala Preprocessing
"""
def __init__(self, bigdl_type="float", *args):
self.bigdl_type = bigdl_type
self.value = callZooFunc(bigdl_type, JavaValue.jvm_class_constructor(self), *args)
def __call__(self, input):
"""
Transform ImageSet or TextSet.
"""
# move the import here to break circular import
if "zoo.feature.image.imageset.ImageSet" not in sys.modules:
from zoo.feature.image import ImageSet
if "zoo.feature.text.text_set.TextSet" not in sys.modules:
from zoo.feature.text import TextSet
# if type(input) is ImageSet:
if isinstance(input, ImageSet):
jset = callZooFunc(self.bigdl_type, "transformImageSet", self.value, input)
return ImageSet(jvalue=jset)
elif isinstance(input, TextSet):
jset = callZooFunc(self.bigdl_type, "transformTextSet", self.value, input)
return TextSet(jvalue=jset)
[docs]class ChainedPreprocessing(Preprocessing):
"""
chains two Preprocessing together. The output type of the first
Preprocessing should be the same with the input type of the second Preprocessing.
"""
def __init__(self, transformers, bigdl_type="float"):
for transfomer in transformers:
assert isinstance(transfomer, Preprocessing), \
str(transfomer) + " should be subclass of Preprocessing "
super(ChainedPreprocessing, self).__init__(bigdl_type, transformers)
[docs]class ScalarToTensor(Preprocessing):
"""
a Preprocessing that converts a number to a Tensor.
"""
def __init__(self, bigdl_type="float"):
super(ScalarToTensor, self).__init__(bigdl_type)
[docs]class SeqToTensor(Preprocessing):
"""
a Transformer that converts an Array[_] or Seq[_] to a Tensor.
:param size dimensions of target Tensor.
"""
def __init__(self, size=[], bigdl_type="float"):
super(SeqToTensor, self).__init__(bigdl_type, size)
[docs]class SeqToMultipleTensors(Preprocessing):
"""
a Transformer that converts an Array[_] or Seq[_] or ML Vector to several tensors.
:param size, list of int list, dimensions of target Tensors, e.g. [[2],[4]]
"""
def __init__(self, size=[], bigdl_type="float"):
super(SeqToMultipleTensors, self).__init__(bigdl_type, size)
[docs]class ArrayToTensor(Preprocessing):
"""
a Transformer that converts an Array[_] to a Tensor.
:param size dimensions of target Tensor.
"""
def __init__(self, size, bigdl_type="float"):
super(ArrayToTensor, self).__init__(bigdl_type, size)
[docs]class MLlibVectorToTensor(Preprocessing):
"""
a Transformer that converts MLlib Vector to a Tensor.
.. note:: Deprecated in 0.4.0. NNEstimator will automatically extract Vectors now.
:param size dimensions of target Tensor.
"""
def __init__(self, size, bigdl_type="float"):
super(MLlibVectorToTensor, self).__init__(bigdl_type, size)
[docs]class FeatureLabelPreprocessing(Preprocessing):
"""
construct a Transformer that convert (Feature, Label) tuple to a Sample.
The returned Transformer is robust for the case label = null, in which the
Sample is derived from Feature only.
:param feature_transformer transformer for feature, transform F to Tensor[T]
:param label_transformer transformer for label, transform L to Tensor[T]
"""
def __init__(self, feature_transformer, label_transformer, bigdl_type="float"):
super(FeatureLabelPreprocessing, self).__init__(bigdl_type,
feature_transformer, label_transformer)
[docs]class TensorToSample(Preprocessing):
"""
a Transformer that converts Tensor to Sample.
"""
def __init__(self, bigdl_type="float"):
super(TensorToSample, self).__init__(bigdl_type)
[docs]class FeatureToTupleAdapter(Preprocessing):
def __init__(self, sample_transformer, bigdl_type="float"):
super(FeatureToTupleAdapter, self).__init__(bigdl_type, sample_transformer)
[docs]class BigDLAdapter(Preprocessing):
def __init__(self, bigdl_transformer, bigdl_type="float"):
super(BigDLAdapter, self).__init__(bigdl_type, bigdl_transformer)
[docs]class ToTuple(Preprocessing):
"""
a Transformer that converts Feature to (Feature, None).
"""
def __init__(self, bigdl_type="float"):
super(ToTuple, self).__init__(bigdl_type)
# todo support padding param
[docs]class SampleToMiniBatch(Preprocessing):
"""
a Transformer that converts Feature to (Feature, None).
"""
def __init__(self,
batch_size,
bigdl_type="float"):
super(SampleToMiniBatch, self).__init__(bigdl_type, batch_size)
[docs]class FeatureSet(DataSet):
"""
A set of data which is used in the model optimization process. The FeatureSet can be accessed in
a random data sample sequence. In the training process, the data sequence is a looped endless
sequence. While in the validation process, the data sequence is a limited length sequence.
Different from BigDL's DataSet, this FeatureSet could be cached to Intel Optane DC Persistent
Memory, if you set memory_type to PMEM when creating FeatureSet.
"""
def __init__(self, jvalue=None, bigdl_type="float"):
self.bigdl_type = bigdl_type
if jvalue:
self.value = jvalue
[docs] @classmethod
def image_frame(cls, image_frame, memory_type="DRAM",
sequential_order=False,
shuffle=True, bigdl_type="float"):
"""
Create FeatureSet from ImageFrame.
:param image_frame: ImageFrame
:param memory_type: string, DRAM, PMEM or a Int number.
If it's DRAM, will cache dataset into dynamic random-access memory
If it's PMEM, will cache dataset into Intel Optane DC Persistent Memory
If it's a Int number n, will cache dataset into disk, and only hold 1/n
of the data into memory during the training. After going through the
1/n, we will release the current cache, and load another 1/n into
memory.
:param sequential_order: whether to iterate the elements in the feature set
in sequential order for training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training
:param bigdl_type: numeric type
:return: A feature set
"""
jvalue = callZooFunc(bigdl_type, "createFeatureSetFromImageFrame",
image_frame, memory_type, sequential_order, shuffle)
return cls(jvalue=jvalue)
[docs] @classmethod
def image_set(cls, imageset, memory_type="DRAM",
sequential_order=False,
shuffle=True, bigdl_type="float"):
"""
Create FeatureSet from ImageFrame.
:param imageset: ImageSet
:param memory_type: string, DRAM or PMEM
If it's DRAM, will cache dataset into dynamic random-access memory
If it's PMEM, will cache dataset into Intel Optane DC Persistent Memory
If it's a Int number n, will cache dataset into disk, and only hold 1/n
of the data into memory during the training. After going through the
1/n, we will release the current cache, and load another 1/n into
memory.
:param sequential_order: whether to iterate the elements in the feature set
in sequential order for training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training
:param bigdl_type: numeric type
:return: A feature set
"""
jvalue = callZooFunc(bigdl_type, "createFeatureSetFromImageFrame",
imageset.to_image_frame(), memory_type,
sequential_order, shuffle)
return cls(jvalue=jvalue)
[docs] @classmethod
def sample_rdd(cls, rdd, memory_type="DRAM",
sequential_order=False,
shuffle=True, bigdl_type="float"):
"""
Create FeatureSet from RDD[Sample].
:param rdd: A RDD[Sample]
:param memory_type: string, DRAM or PMEM
If it's DRAM, will cache dataset into dynamic random-access memory
If it's PMEM, will cache dataset into Intel Optane DC Persistent Memory
If it's a Int number n, will cache dataset into disk, and only hold 1/n
of the data into memory during the training. After going through the
1/n, we will release the current cache, and load another 1/n into
memory.
:param sequential_order: whether to iterate the elements in the feature set
in sequential order when training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training
:param bigdl_type:numeric type
:return: A feature set
"""
jvalue = callZooFunc(bigdl_type, "createSampleFeatureSetFromRDD", rdd,
memory_type, sequential_order, shuffle)
return cls(jvalue=jvalue)
[docs] @classmethod
def rdd(cls, rdd, memory_type="DRAM", sequential_order=False,
shuffle=True, bigdl_type="float"):
"""
Create FeatureSet from RDD.
:param rdd: A RDD
:param memory_type: string, DRAM, PMEM or a Int number.
If it's DRAM, will cache dataset into dynamic random-access memory
If it's PMEM, will cache dataset into Intel Optane DC Persistent Memory
If it's a Int number n, will cache dataset into disk, and only hold 1/n
of the data into memory during the training. After going through the
1/n, we will release the current cache, and load another 1/n into
memory.
:param sequential_order: whether to iterate the elements in the feature set
in sequential order when training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training
:param bigdl_type:numeric type
:return: A feature set
"""
jvalue = callZooFunc(bigdl_type, "createFeatureSetFromRDD", rdd,
memory_type, sequential_order, shuffle)
return cls(jvalue=jvalue)
[docs] @classmethod
def tf_dataset(cls, func, total_size, bigdl_type="float"):
"""
:param func: a function return a tensorflow dataset
:param total_size: total size of this dataset
:param bigdl_type: numeric type
:return: A feature set
"""
func = CloudPickleSerializer.dumps(CloudPickleSerializer, func)
jvalue = callZooFunc(bigdl_type, "createFeatureSetFromTfDataset", func, total_size)
return cls(jvalue=jvalue)
[docs] @classmethod
def pytorch_dataloader(cls, dataloader,
features="_data[0]", labels="_data[1]", bigdl_type="float"):
"""
Create FeatureSet from pytorch dataloader
:param dataloader: a pytorch dataloader, or a function return pytorch dataloader.
:param features: features in _data, _data is get from dataloader.
:param labels: lables in _data, _data is get from dataloader.
:param bigdl_type: numeric type
:return: A feature set
"""
import torch
if isinstance(dataloader, torch.utils.data.DataLoader):
node_num, core_num = get_node_and_core_number()
if dataloader.batch_size % node_num != 0:
true_bs = math.ceil(dataloader.batch_size / node_num) * node_num
warning_msg = "Detect dataloader's batch_size is not divisible by node number(" + \
str(node_num) + "), will adjust batch_size to " + str(true_bs) + \
" automatically"
warnings.warn(warning_msg)
bys = CloudPickleSerializer.dumps(CloudPickleSerializer, dataloader)
jvalue = callZooFunc(bigdl_type, "createFeatureSetFromPyTorch", bys,
False, features, labels)
return cls(jvalue=jvalue)
elif callable(dataloader):
bys = CloudPickleSerializer.dumps(CloudPickleSerializer, dataloader)
jvalue = callZooFunc(bigdl_type, "createFeatureSetFromPyTorch", bys,
True, features, labels)
return cls(jvalue=jvalue)
else:
raise ValueError("Unsupported dataloader type, please pass pytorch dataloader" +
" or a function to create pytorch dataloader.")
[docs] def to_dataset(self):
"""
To BigDL compatible DataSet
:return:
"""
jvalue = callZooFunc(self.bigdl_type, "featureSetToDataSet", self.value)
return FeatureSet(jvalue=jvalue)