Source code for zoo.models.recommendation.utils

#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np

from bigdl.util.common import JTensor, Sample

from zoo.common.utils import callZooFunc
from zoo.models.recommendation import UserItemFeature


def hash_bucket(content, bucket_size=1000, start=0):
    """
    Map arbitrary content to a bucket id using the hash of its string form.

    :param content: value to bucket; it is converted with str() before hashing
    :param bucket_size: number of buckets
    :param start: offset added to the bucket id
    :return: for a positive int bucket_size, an int in [start, start + bucket_size)

    NOTE: Python salts str hashes per process unless PYTHONHASHSEED is fixed, so
    bucket ids are only reproducible across processes when that seed is pinned.
    """
    # Python's % follows the sign of the divisor, so one modulo is enough to get
    # a non-negative bucket for a positive bucket_size (no "+ size, % size" wrap).
    return hash(str(content)) % bucket_size + start
def categorical_from_vocab_list(sth, vocab_list, default=-1, start=0):
    """
    Look up the position of a value in a vocabulary list.

    :param sth: value to look up
    :param vocab_list: sequence of known values; it must provide ``index``
    :param default: position reported when sth is not in vocab_list
    :param start: offset added to the reported position
    :return: index of sth in vocab_list plus start, or default plus start when
             sth is absent
    """
    # A single index() call replaces the membership test followed by index(),
    # which scanned the vocabulary twice for every hit.
    try:
        position = vocab_list.index(sth)
    except ValueError:
        position = default
    return position + start
def get_boundaries(target, boundaries, default=-1, start=0):
    """
    Find which interval of ``boundaries`` a value falls into.

    :param target: value to bucketize; the literal '?' marks a missing value
    :param boundaries: sequence of upper bounds, checked in the given order
    :param default: interval id reported for a missing ('?') target
    :param start: offset added to the reported interval id
    :return: default + start for '?'; otherwise the position of the first
             boundary strictly greater than target, or len(boundaries) when no
             boundary is greater, each shifted by start
    """
    if target == '?':
        return default + start
    for position, upper in enumerate(boundaries):
        if target < upper:
            return position + start
    # target is not below any boundary: it belongs to the last, open interval.
    return len(boundaries) + start
def get_negative_samples(indexed):
    """
    Request negative samples for the given indexed data from the Zoo backend.

    Delegates to the backend function ``getNegativeSamples`` through
    callZooFunc, using the "float" numeric type.

    :param indexed: indexed data, handed to the backend unchanged
                    (presumably a DataFrame of user/item records; confirm
                    against callers)
    :return: whatever the backend ``getNegativeSamples`` call returns
    """
    return callZooFunc("float", "getNegativeSamples", indexed)
def get_wide_tensor(row, column_info):
    """
    prepare tensor for wide part of WideAndDeep model based on SparseDense

    :param row: Row of userId, itemId, features and label
    :param column_info: ColumnFeatureInfo specify information of different features
    :return: a single sparse JTensor of length sum(wide dims) holding a 1 at the
             offset position of every wide base and wide cross column
    """
    wide_columns = column_info.wide_base_cols + column_info.wide_cross_cols
    wide_dims = column_info.wide_base_dims + column_info.wide_cross_dims

    # Each column owns a slice of the flat wide vector; offset is where the
    # current column's slice begins (sum of the dims of all earlier columns).
    offset = 0
    indices = []
    for position, column in enumerate(wide_columns):
        value = row[column]
        if position > 0:
            offset += wide_dims[position - 1]
            value = offset + value
        indices.append(value)

    values = np.ones(len(indices))
    shape = np.array([sum(wide_dims)])
    return JTensor.sparse(values, np.array(indices), shape)
def get_deep_tensors(row, column_info):
    """
    convert a row to arrays given column feature information of a WideAndDeep model

    :param row: Row of userId, itemId, features and label
    :param column_info: ColumnFeatureInfo specify information of different features
    :return: a list of numpy arrays as input for the deep part of a WideAndDeep
             model, in the order indicator, embedding, continuous; a group is
             left out when it has no columns
    :raises TypeError: when column_info has no indicator, embedding or
                       continuous columns at all
    """
    ind_col = column_info.indicator_cols
    emb_col = column_info.embed_cols
    cont_col = column_info.continuous_cols

    # Indicator columns share one flat multi-hot vector; offset is where the
    # current column's slice begins (sum of the dims of all earlier columns).
    ind_tensor = np.zeros(sum(column_info.indicator_dims))
    offset = 0
    for position, column in enumerate(ind_col):
        if position > 0:
            offset += column_info.indicator_dims[position - 1]
        ind_tensor[offset + row[column]] = 1

    emb_tensor = np.array([float(row[column]) for column in emb_col], dtype=float)
    cont_tensor = np.array([float(row[column]) for column in cont_col], dtype=float)

    # Keep only the groups that actually have columns, preserving the order.
    groups = ((ind_col, ind_tensor), (emb_col, emb_tensor), (cont_col, cont_tensor))
    deep_tensor = [tensor for columns, tensor in groups if len(columns) > 0]
    if not deep_tensor:
        raise TypeError("Empty deep tensors")
    return deep_tensor
def row_to_sample(row, column_info, model_type="wide_n_deep"):
    """
    convert a row to sample given column feature information of a WideAndDeep model

    :param row: Row of userId, itemId, features and label
    :param column_info: ColumnFeatureInfo specify information of different features
    :param model_type: "wide_n_deep", "wide" or "deep" (case-insensitive); selects
                       which tensors make up the sample's feature
    :return: TensorSample as input for WideAndDeep model
    :raises TypeError: when model_type is not one of the supported values
    """
    model_type = model_type.lower()
    if model_type not in ("wide_n_deep", "wide", "deep"):
        raise TypeError("Unsupported model_type: %s" % model_type)

    label = row[column_info.label]

    # Build only the tensors the requested model needs: a wide-only model has
    # no deep columns, and get_deep_tensors raises when there are none.
    if model_type == "wide":
        feature = get_wide_tensor(row, column_info)
    else:
        deep_tensors = [JTensor.from_ndarray(ele)
                        for ele in get_deep_tensors(row, column_info)]
        if model_type == "deep":
            feature = deep_tensors
        else:
            feature = [get_wide_tensor(row, column_info)] + deep_tensors
    return Sample.from_jtensor(feature, label)
def to_user_item_feature(row, column_info, model_type="wide_n_deep"):
    """
    convert a row to UserItemFeature given column feature information of a WideAndDeep model

    :param row: Row of userId, itemId, features and label; it is read through
                the keys "userId" and "itemId"
    :param column_info: ColumnFeatureInfo specify information of different features
    :param model_type: model type forwarded to row_to_sample
    :return: UserItemFeature for recommender model
    """
    sample = row_to_sample(row, column_info, model_type)
    return UserItemFeature(row["userId"], row["itemId"], sample)