zoo.orca.data package

Submodules

zoo.orca.data.file module

zoo.orca.data.file.exists(path)[source]

Check whether a path exists. It supports local, HDFS and S3 file systems.

Parameters:
  • path – file or directory path string.
Returns:
  whether the path exists.
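
The file helpers in this module all accept local, HDFS and S3 paths. As a rough illustration of how such a path could be told apart, the sketch below inspects the URI scheme with the standard library only. The function name filesystem_kind and the scheme-to-backend mapping are assumptions made for this example; they are not part of zoo.orca.data.file and may differ from what the library does internally.

```python
from urllib.parse import urlparse


def filesystem_kind(path):
    """Classify a path as 'hdfs', 's3' or 'local' from its URI scheme.

    Hypothetical helper for illustration only.
    """
    scheme = urlparse(path).scheme.lower()
    if scheme == "hdfs":
        return "hdfs"
    if scheme == "s3":
        return "s3"
    # Anything else (no scheme, file://, ...) is treated as a local path here.
    return "local"


kinds = [
    filesystem_kind("hdfs://namenode:9000/data/train.npy"),
    filesystem_kind("s3://my-bucket/data/train.npy"),
    filesystem_kind("/tmp/data/train.npy"),
]
```

The host, bucket and file names above are placeholders.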

zoo.orca.data.file.load_numpy(path)[source]

Load arrays or pickled objects from .npy, .npz or pickled files. It supports local, HDFS and S3 file systems.

Parameters:
  • path – file path.
Returns:
  array, tuple, dict, etc. Data stored in the file. For .npz files, the returned instance of the NpzFile class must be closed to avoid leaking file descriptors.

zoo.orca.data.file.makedirs(path)[source]

Create a directory, creating any intermediate directories as needed. It supports local, HDFS and S3 file systems.

Parameters:
  • path – directory path string to be created.

zoo.orca.data.file.open_image(path)[source]

Open an image file. It supports local, HDFS and S3 file systems.

Parameters:
  • path – an image file path.
Returns:
  an Image object.

zoo.orca.data.file.open_text(path)[source]

Read a text file into a list of lines. It supports local, HDFS and S3 file systems.

Parameters:
  • path – text file path.
Returns:
  list of lines.

zoo.orca.data.file.write_text(path, text)[source]

Write text to a file. It supports local, HDFS and S3 file systems.

Parameters:
  • path – file path.
  • text – text string.
Returns:
  number of bytes written, or the AWS response (for S3 file systems).

zoo.orca.data.shard module

class zoo.orca.data.shard.RayPartition(object_id, node_ip, object_store_address)[source]

Bases: object

A partition of RayXShards containing the plasma ObjectID, the plasma object_store_address, and the node of the partition.

get_data()[source]
class zoo.orca.data.shard.RayXShards(partitions)[source]

Bases: zoo.orca.data.shard.XShards

A collection of data which can be pre-processed in parallel on Ray.

collect()[source]

Return a list that contains all of the elements in this XShards.

Returns:
  list of elements.

colocate_actors(actors)[source]

Sort Ray actors and RayPartitions by node_ip so that each actor is colocated with the data partition on the same node.
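
The sort-then-pair idea described above can be sketched in plain Python. This is only an illustration under stated assumptions: actors and partitions are represented as (node_ip, name) tuples, each node hosts the same number of actors and partitions, and the function name colocate is hypothetical. It does not use Ray and is not the library's implementation.

```python
def colocate(actors, partitions):
    """Pair actors with partitions that live on the same node.

    actors:     list of (node_ip, actor_name) tuples
    partitions: list of (node_ip, partition_name) tuples
    Returns a list of (actor_name, partition_name) pairs.
    """
    # Sort both sides by node IP so that entries for the same node line up.
    sorted_actors = sorted(actors, key=lambda item: item[0])
    sorted_partitions = sorted(partitions, key=lambda item: item[0])

    pairs = []
    for (actor_ip, actor), (part_ip, part) in zip(sorted_actors, sorted_partitions):
        if actor_ip != part_ip:
            # The sketch assumes a one-to-one match of actors and partitions per node.
            raise ValueError("no actor on node %s for partition %s" % (part_ip, part))
        pairs.append((actor, part))
    return pairs


pairs = colocate(
    actors=[("10.0.0.2", "actor_b"), ("10.0.0.1", "actor_a")],
    partitions=[("10.0.0.1", "part_0"), ("10.0.0.2", "part_1")],
)
```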

get_partitions()[source]

Return the list of RayPartition objects in this RayXShards.

num_partitions()[source]

Return the number of partitions in this XShards.

Returns:
  an int.

transform_shard(func, *args)[source]

Transform each shard in the XShards using the specified function.

Parameters:
  • func – pre-processing function.
  • args – arguments for the pre-processing function.
Returns:
  DataShard

class zoo.orca.data.shard.SharedValue(data)[source]

Bases: object

unpersist()[source]
value
class zoo.orca.data.shard.SparkXShards(rdd, transient=False)[source]

Bases: zoo.orca.data.shard.XShards

A collection of data which can be pre-processed in parallel on Spark.

cache()[source]

Persist this SparkXShards in memory.

collect()[source]

Return a list that contains all of the elements in this SparkXShards.

Returns:
  a list of data elements.

compute()[source]
is_cached()[source]
num_partitions()[source]

Get the number of partitions for this SparkXShards.

Returns:
  number of partitions.

partition_by(cols, num_partitions=None)[source]

Return a new SparkXShards partitioned using the specified columns. This is only applicable for SparkXShards of Pandas DataFrame.

Parameters:
  • cols – specified columns to partition by.
  • num_partitions – target number of partitions. If not specified, the new SparkXShards keeps the current partition number.
Returns:
  a new SparkXShards.
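
To make the effect of partitioning by columns concrete, here is a small stand-alone sketch. It assumes hash partitioning on the column values and models rows as plain dicts inside nested lists; neither Spark nor Pandas is used, and the function name partition_by_cols is hypothetical. The actual partitioning strategy used by SparkXShards may differ.

```python
def partition_by_cols(partitions, cols, num_partitions=None):
    """Redistribute rows so that rows with equal values in `cols` share a partition.

    partitions: list of partitions, each a list of dict rows
    cols:       list of column names to partition by
    """
    if num_partitions is None:
        # Keep the current number of partitions when no target is given.
        num_partitions = len(partitions)

    new_partitions = [[] for _ in range(num_partitions)]
    for partition in partitions:
        for row in partition:
            key = tuple(row[c] for c in cols)
            # Equal keys hash to the same value, hence to the same partition.
            new_partitions[hash(key) % num_partitions].append(row)
    return new_partitions


rows = [
    [{"user": 1, "v": 10}, {"user": 2, "v": 20}],
    [{"user": 1, "v": 30}, {"user": 3, "v": 40}],
]
by_user = partition_by_cols(rows, ["user"])
```

After the call, every row for a given user sits in exactly one of the new partitions, and the total number of rows is unchanged.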

repartition(num_partitions)[source]

Return a new SparkXShards that has exactly num_partitions partitions.

Parameters:
  • num_partitions – target number of partitions.
Returns:
  a new SparkXShards object.

save_pickle(path, batchSize=10)[source]

Save this SparkXShards as a SequenceFile of serialized objects. The serializer used is pyspark.serializers.PickleSerializer; the default batch size is 10.

Parameters:
  • path – target path.
  • batchSize – batch size for each sequence file chunk.

split()[source]

Split this SparkXShards into multiple SparkXShards. Each element in the SparkXShards needs to be a list or tuple of the same length.

Returns:
  splits of the SparkXShards. If an element in the input SparkDataShard is not a list or tuple, a list of the input SparkDataShards is returned.
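
The splitting behaviour can be illustrated without Spark. In the sketch below, partitions are modelled as nested Python lists and the function name split_shards is hypothetical; it is one reading of the description above, not the library's code.

```python
def split_shards(partitions):
    """Split partitions of equal-length tuples/lists into one result per position.

    partitions: list of partitions, each a list of elements.
    Returns n new partition lists when every element is a list or tuple of
    length n; otherwise returns the input wrapped in a one-item list.
    """
    elements = [e for part in partitions for e in part]
    all_sequences = all(isinstance(e, (list, tuple)) for e in elements)
    lengths = {len(e) for e in elements} if all_sequences else set()
    if not elements or len(lengths) != 1:
        # Not splittable: hand back the original data unchanged.
        return [partitions]

    n = lengths.pop()
    # The i-th result keeps the partition layout but holds only item i of each element.
    return [[[e[i] for e in part] for part in partitions] for i in range(n)]


features, labels = split_shards([[("x0", "y0"), ("x1", "y1")], [("x2", "y2")]])
```
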
to_ray()[source]

Put the data of this SparkXShards into the Ray cluster object store.

Returns:
  a new RayXShards which contains the data of this SparkXShards.

transform_shard(func, *args)[source]

Return a new SparkXShards by applying a function to each shard of this SparkXShards.

Parameters:
  • func – Python function to process data. The first argument is the data shard.
  • args – other arguments of this function.
Returns:
  a new SparkXShards.
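
The calling convention described above (the shard is passed as the first argument, followed by the extra arguments) can be sketched with plain lists standing in for shards. The names transform_shards and scale are hypothetical and exist only for this example.

```python
def transform_shards(shards, func, *args):
    """Apply func(shard, *args) to every shard and return the new shards."""
    return [func(shard, *args) for shard in shards]


def scale(shard, factor):
    # The shard arrives as the first argument; `factor` comes from *args.
    return [value * factor for value in shard]


shards = [[1, 2], [3]]
scaled = transform_shards(shards, scale, 10)
```

The input shards are left untouched and a new collection is returned, which mirrors the "return a new SparkXShards" wording.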

uncache()[source]

Mark this SparkXShards as non-persistent, and remove all blocks for it from memory.

unique()[source]

Return a unique list of elements of this SparkXShards. This is only applicable for SparkXShards of Pandas Series.

Returns:
  a unique list of elements of this SparkXShards.

zip(other)[source]

Zip this SparkXShards with another one, returning key-value pairs made of the first element in each SparkXShards, the second element in each SparkXShards, etc. Assumes that the two SparkXShards have the same number of partitions and the same number of elements in each partition (e.g. one was made through a transform_shard on the other).

Parameters:
  • other – another SparkXShards.
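
The two preconditions (same number of partitions, same number of elements per partition) are easy to see in a small stand-alone sketch. Partitions are modelled as nested lists and the function name zip_shards is hypothetical; raising ValueError on a mismatch is a choice made for this example and is not taken from the library.

```python
def zip_shards(left, right):
    """Pair up elements of two partitioned collections, position by position."""
    if len(left) != len(right):
        raise ValueError("the two collections must have the same number of partitions")

    zipped = []
    for left_part, right_part in zip(left, right):
        if len(left_part) != len(right_part):
            raise ValueError("matching partitions must have the same number of elements")
        # Each output element is a (left_element, right_element) pair.
        zipped.append(list(zip(left_part, right_part)))
    return zipped


zipped = zip_shards([[1, 2], [3]], [["a", "b"], ["c"]])
```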

class zoo.orca.data.shard.XShards[source]

Bases: object

A collection of data which can be pre-processed in parallel.

collect()[source]

Return a list that contains all of the elements in this XShards.

Returns:
  list of elements.

classmethod load_pickle(path, minPartitions=None)[source]

Load XShards from pickle files.

Parameters:
  • path – the pickle file path/directory.
  • minPartitions – the minimum number of partitions for the XShards.
Returns:
  SparkXShards object.

num_partitions()[source]

Return the number of partitions in this XShards.

Returns:
  an int.

static partition(data)[source]

Partition local in-memory data to form a SparkXShards.

Parameters:
  • data – np.ndarray; a tuple, list or dict of np.ndarray; or a nested structure made of tuple, list and dict with ndarray as the leaf value.
Returns:
  a SparkXShards.

transform_shard(func, *args)[source]

Transform each shard in the XShards using the specified function.

Parameters:
  • func – pre-processing function.
  • args – arguments for the pre-processing function.
Returns:
  DataShard

zoo.orca.data.utils module

zoo.orca.data.utils.check_type_and_convert(data, allow_tuple=True, allow_list=True)[source]
Parameters:
  • allow_tuple – boolean, if the model accepts a tuple as input. Default: True
  • allow_list – boolean, if the model accepts a list as input. Default: True
Returns:

zoo.orca.data.utils.extract_one_path(file_path, env)[source]
zoo.orca.data.utils.flatten_xy(allow_tuple=True, allow_list=True)[source]
Parameters:
  • allow_tuple – boolean, if the model accepts a tuple as input. Default: True
  • allow_list – boolean, if the model accepts a list as input. Default: True
Returns:

zoo.orca.data.utils.get_class_name(obj)[source]
zoo.orca.data.utils.get_spec(allow_tuple=True, allow_list=True)[source]
Parameters:
  • allow_tuple – boolean, if the model accepts a tuple as input. Default: True
  • allow_list – boolean, if the model accepts a list as input. Default: True
Returns:

zoo.orca.data.utils.list_s3_file(file_path, env)[source]
zoo.orca.data.utils.ray_partition_get_data_label(partition_data, allow_tuple=True, allow_list=True)[source]
Parameters:
  • partition_data
  • allow_tuple – boolean, if the model accepts a tuple as input. Default: True
  • allow_list – boolean, if the model accepts a list as input. Default: True
Returns:

zoo.orca.data.utils.read_pd_file(path, file_type, **kwargs)[source]
zoo.orca.data.utils.read_pd_hdfs_file_list(iterator, file_type, **kwargs)[source]
zoo.orca.data.utils.read_pd_s3_file_list(iterator, file_type, **kwargs)[source]
zoo.orca.data.utils.to_sample(data)[source]

Module contents