Source code for zoo.ray.utils

#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import re
import os
import signal


[docs]def to_list(input): if isinstance(input, (list, tuple)): return list(input) else: return [input]
[docs]def resource_to_bytes(resource_str): if not resource_str: return resource_str matched = re.compile("([0-9]+)([a-z]+)?").match(resource_str.lower()) fraction_matched = re.compile("([0-9]+\\.[0-9]+)([a-z]+)?").match(resource_str.lower()) if fraction_matched: raise Exception( "Fractional values are not supported. Input was: {}".format(resource_str)) try: value = int(matched.group(1)) postfix = matched.group(2) if postfix == 'b': value = value elif postfix == 'k': value = value * 1000 elif postfix == "m": value = value * 1000 * 1000 elif postfix == 'g': value = value * 1000 * 1000 * 1000 else: raise Exception("Not supported type: {}".format(resource_str)) return value except Exception: raise Exception("Size must be specified as bytes(b)," "kilobytes(k), megabytes(m), gigabytes(g). " "E.g. 50b, 100k, 250m, 30g")
[docs]def gen_shutdown_per_node(pgids, node_ips=None): import ray.services as rservices pgids = to_list(pgids) def _shutdown_per_node(iter): print("Stopping pgids: {}".format(pgids)) if node_ips: current_node_ip = rservices.get_node_ip_address() effect_pgids = [pair[0] for pair in zip(pgids, node_ips) if pair[1] == current_node_ip] else: effect_pgids = pgids for pgid in effect_pgids: print("Stopping by pgid {}".format(pgid)) try: os.killpg(pgid, signal.SIGTERM) except Exception: print("WARNING: cannot kill pgid: {}".format(pgid)) return _shutdown_per_node
[docs]def is_local(sc): master = sc.getConf().get("spark.master") return master == "local" or master.startswith("local[")