Source code for zoo.ray.raycontext

#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import os
import re
import random
import signal
import warnings
import multiprocessing

from zoo.ray.process import session_execute, ProcessMonitor
from zoo.ray.utils import is_local
from zoo.ray.utils import resource_to_bytes


[docs]class JVMGuard: """ The registered pids would be put into the killing list of Spark Executor. """
[docs] @staticmethod def register_pids(pids): import traceback try: from zoo.common.utils import callZooFunc import zoo callZooFunc("float", "jvmGuardRegisterPids", pids) except Exception as err: print(traceback.format_exc()) print("Cannot successfully register pid into JVMGuard") for pid in pids: os.kill(pid, signal.SIGKILL) raise err
[docs]def kill_redundant_log_monitors(redis_address): """ Killing redundant log_monitor.py processes. If multiple ray nodes are started on the same machine, there will be multiple ray log_monitor.py processes monitoring the same log dir. As a result, the logs will be replicated multiple times and forwarded to driver. See issue https://github.com/ray-project/ray/issues/10392 """ import psutil import subprocess log_monitor_processes = [] for proc in psutil.process_iter(["name", "cmdline"]): try: # Avoid throw exception when listing lwsslauncher in macOS if proc.name() is None or proc.name() == "lwsslauncher": continue cmdline = subprocess.list2cmdline(proc.cmdline()) is_log_monitor = "log_monitor.py" in cmdline is_same_redis = "--redis-address={}".format(redis_address) if is_log_monitor and is_same_redis in cmdline: log_monitor_processes.append(proc) except psutil.AccessDenied: # psutil may encounter AccessDenied exceptions # when it's trying to visit core services if psutil.MACOS: continue else: raise if len(log_monitor_processes) > 1: for proc in log_monitor_processes[1:]: proc.kill()
[docs]class RayServiceFuncGenerator(object): """ This should be a pickable class. """ def _prepare_env(self): modified_env = os.environ.copy() if self.python_loc == "python_env/bin/python": # In this case the executor is using the conda yarn archive under the current # working directory. Need to get the full path. executor_python_path = "{}/{}".format( os.getcwd(), "/".join(self.python_loc.split("/")[:-1])) else: executor_python_path = "/".join(self.python_loc.split("/")[:-1]) if "PATH" in os.environ: modified_env["PATH"] = "{}:{}".format(executor_python_path, os.environ["PATH"]) else: modified_env["PATH"] = executor_python_path modified_env["LC_ALL"] = "C.UTF-8" modified_env["LANG"] = "C.UTF-8" modified_env.pop("MALLOC_ARENA_MAX", None) modified_env.pop("RAY_BACKEND_LOG_LEVEL", None) # Unset all MKL setting as Analytics Zoo would give default values when init env. # Running different programs may need different configurations. modified_env.pop("intra_op_parallelism_threads", None) modified_env.pop("inter_op_parallelism_threads", None) modified_env.pop("OMP_NUM_THREADS", None) modified_env.pop("KMP_BLOCKTIME", None) modified_env.pop("KMP_AFFINITY", None) modified_env.pop("KMP_SETTINGS", None) if self.env: # Add in env argument if any MKL setting is needed. modified_env.update(self.env) if self.verbose: print("Executing with these environment settings:") for pair in modified_env.items(): print(pair) print("The $PATH is: {}".format(modified_env["PATH"])) return modified_env def __init__(self, python_loc, redis_port, ray_node_cpu_cores, password, object_store_memory, verbose=False, env=None, extra_params=None): """object_store_memory: integer in bytes""" self.env = env self.python_loc = python_loc self.redis_port = redis_port self.password = password self.ray_node_cpu_cores = ray_node_cpu_cores self.ray_exec = self._get_ray_exec() self.object_store_memory = object_store_memory self.extra_params = extra_params self.verbose = verbose # _mxnet_worker and _mxnet_server are resource tags for distributed MXNet training only # in order to diff worker from server. # This is useful to allocate workers and servers in the cluster. # Leave some reserved custom resources free to avoid unknown crash due to resources. self.labels = \ """--resources '{"_mxnet_worker": %s, "_mxnet_server": %s, "_reserved": %s}'""" \ % (1, 1, 2)
[docs] def gen_stop(self): def _stop(iter): command = "{} stop".format(self.ray_exec) print("Start to end the ray services: {}".format(command)) session_execute(command=command, fail_fast=True) return iter return _stop
@staticmethod def _enrich_command(command, object_store_memory, extra_params): if object_store_memory: command = command + " --object-store-memory {}".format(str(object_store_memory)) if extra_params: for pair in extra_params.items(): command = command + " --{} {}".format(pair[0], pair[1]) return command def _gen_master_command(self): command = "{} start --head " \ "--include-webui true --redis-port {} " \ "--redis-password {} --num-cpus {}". \ format(self.ray_exec, self.redis_port, self.password, self.ray_node_cpu_cores) if self.labels: command = command + " " + self.labels return RayServiceFuncGenerator._enrich_command(command=command, object_store_memory=self.object_store_memory, extra_params=self.extra_params) @staticmethod def _get_raylet_command(redis_address, ray_exec, password, ray_node_cpu_cores, labels="", object_store_memory=None, extra_params=None): command = "{} start --address {} --redis-password {} --num-cpus {}".format( ray_exec, redis_address, password, ray_node_cpu_cores) if labels: command = command + " " + labels return RayServiceFuncGenerator._enrich_command(command=command, object_store_memory=object_store_memory, extra_params=extra_params) def _start_ray_node(self, command, tag): modified_env = self._prepare_env() print("Starting {} by running: {}".format(tag, command)) process_info = session_execute(command=command, env=modified_env, tag=tag) JVMGuard.register_pids(process_info.pids) import ray.services as rservices process_info.node_ip = rservices.get_node_ip_address() return process_info def _get_ray_exec(self): if "envs" in self.python_loc: # conda environment python_bin_dir = "/".join(self.python_loc.split("/")[:-1]) return "{}/python {}/ray".format(python_bin_dir, python_bin_dir) else: # system environment with ray installed; for example: /usr/local/bin/ray return "ray"
[docs] def gen_ray_start(self, master_ip): def _start_ray_services(iter): from pyspark import BarrierTaskContext from zoo.util.utils import get_node_ip tc = BarrierTaskContext.get() current_ip = get_node_ip() print("current address {}".format(current_ip)) print("master address {}".format(master_ip)) redis_address = "{}:{}".format(master_ip, self.redis_port) process_info = None import tempfile import filelock base_path = tempfile.gettempdir() master_flag_path = os.path.join(base_path, "ray_master_initialized") if current_ip == master_ip: # Start the ray master. lock_path = os.path.join(base_path, "ray_master_start.lock") # It is possible that multiple executors are on one node. In this case, # the first executor that gets the lock would be the master and it would # create a flag to indicate the master has initialized. # The flag file is removed when ray start processes finish so that this # won't affect other programs. with filelock.FileLock(lock_path): if not os.path.exists(master_flag_path): print("partition id is : {}".format(tc.partitionId())) process_info = self._start_ray_node(command=self._gen_master_command(), tag="ray-master") process_info.master_addr = redis_address os.mknod(master_flag_path) tc.barrier() if not process_info: # Start raylets. lock_path = os.path.join(base_path, "raylet_start.lock") with filelock.FileLock(lock_path): print("partition id is : {}".format(tc.partitionId())) process_info = self._start_ray_node( command=RayServiceFuncGenerator._get_raylet_command( redis_address=redis_address, ray_exec=self.ray_exec, password=self.password, ray_node_cpu_cores=self.ray_node_cpu_cores, labels=self.labels, object_store_memory=self.object_store_memory, extra_params=self.extra_params), tag="raylet") kill_redundant_log_monitors(redis_address=redis_address) if os.path.exists(master_flag_path): os.remove(master_flag_path) yield process_info return _start_ray_services
[docs]class RayContext(object): _active_ray_context = None def __init__(self, sc, redis_port=None, password="123456", object_store_memory=None, verbose=False, env=None, extra_params=None, num_ray_nodes=None, ray_node_cpu_cores=None): """ The RayContext would initiate a ray cluster on top of the configuration of SparkContext. After creating RayContext, call the init method to set up the cluster. - For Spark local mode: The total available cores for Ray is equal to the number of Spark local cores. - For Spark cluster mode: The number of raylets to be created is equal to the number of Spark executors. The number of cores allocated for each raylet is equal to the number of cores for each Spark executor. You are allowed to specify num_ray_nodes and ray_node_cpu_cores for configurations to start raylets. :param sc: An instance of SparkContext. :param redis_port: redis port for the "head" node. The value would be randomly picked if not specified. :param password: Password for the redis. Default to be "123456" if not specified. :param object_store_memory: The memory size for ray object_store in string. This can be specified in bytes(b), kilobytes(k), megabytes(m) or gigabytes(g). For example, 50b, 100k, 250m, 30g. :param verbose: True for more logs when starting ray. Default is False. :param env: The environment variable dict for running ray processes. Default is None. :param extra_params: The key value dict for extra options to launch ray. For example, extra_params={"temp-dir": "/tmp/ray/"} :param num_ray_nodes: The number of raylets to start across the cluster. For Spark local mode, you don't need to specify this value. For Spark cluster mode, it is default to be the number of Spark executors. If spark.executor.instances can't be detected in your SparkContext, you need to explicitly specify this. It is recommended that num_ray_nodes is not larger than the number of Spark executors to make sure there are enough resources in your cluster. :param ray_node_cpu_cores: The number of available cores for each raylet. For Spark local mode, it is default to be the number of Spark local cores. For Spark cluster mode, it is default to be the number of cores for each Spark executor. If spark.executor.cores or spark.cores.max can't be detected in your SparkContext, you need to explicitly specify this. It is recommended that ray_node_cpu_cores is not larger than the number of cores for each Spark executor to make sure there are enough resources in your cluster. """ assert sc is not None, "sc cannot be None, please create a SparkContext first" self.sc = sc self.initialized = False self.is_local = is_local(sc) self.verbose = verbose self.redis_password = password self.object_store_memory = resource_to_bytes(object_store_memory) self.ray_processesMonitor = None self.env = env self.extra_params = extra_params self._address_info = None if self.is_local: self.num_ray_nodes = 1 spark_cores = self._get_spark_local_cores() if ray_node_cpu_cores: ray_node_cpu_cores = int(ray_node_cpu_cores) if ray_node_cpu_cores > spark_cores: warnings.warn("ray_node_cpu_cores is larger than available Spark cores, " "make sure there are enough resources on your machine") self.ray_node_cpu_cores = ray_node_cpu_cores else: self.ray_node_cpu_cores = spark_cores # For Spark local mode, directly call ray.init() and ray.shutdown(). # ray.shutdown() would clear up all the ray related processes. # Ray Manager is only needed for Spark cluster mode to monitor ray processes. else: if self.sc.getConf().contains("spark.executor.cores"): executor_cores = int(self.sc.getConf().get("spark.executor.cores")) else: executor_cores = None if ray_node_cpu_cores: ray_node_cpu_cores = int(ray_node_cpu_cores) if executor_cores and ray_node_cpu_cores > executor_cores: warnings.warn("ray_node_cpu_cores is larger than Spark executor cores, " "make sure there are enough resources on your cluster") self.ray_node_cpu_cores = ray_node_cpu_cores elif executor_cores: self.ray_node_cpu_cores = executor_cores else: raise Exception("spark.executor.cores not detected in the SparkContext, " "you need to manually specify num_ray_nodes and ray_node_cpu_cores " "for RayContext to start ray services") if self.sc.getConf().contains("spark.executor.instances"): num_executors = int(self.sc.getConf().get("spark.executor.instances")) elif self.sc.getConf().contains("spark.cores.max"): import math num_executors = math.floor( int(self.sc.getConf().get("spark.cores.max")) / self.ray_node_cpu_cores) else: num_executors = None if num_ray_nodes: num_ray_nodes = int(num_ray_nodes) if num_executors and num_ray_nodes > num_executors: warnings.warn("num_ray_nodes is larger than the number of Spark executors, " "make sure there are enough resources on your cluster") self.num_ray_nodes = num_ray_nodes elif num_executors: self.num_ray_nodes = num_executors else: raise Exception("spark.executor.cores not detected in the SparkContext, " "you need to manually specify num_ray_nodes and ray_node_cpu_cores " "for RayContext to start ray services") from zoo.util.utils import detect_python_location self.python_loc = os.environ.get("PYSPARK_PYTHON", detect_python_location()) self.redis_port = random.randint(10000, 65535) if not redis_port else int(redis_port) self.ray_service = RayServiceFuncGenerator( python_loc=self.python_loc, redis_port=self.redis_port, ray_node_cpu_cores=self.ray_node_cpu_cores, password=self.redis_password, object_store_memory=self.object_store_memory, verbose=self.verbose, env=self.env, extra_params=self.extra_params) RayContext._active_ray_context = self
[docs] @classmethod def get(cls, initialize=True): if RayContext._active_ray_context: ray_ctx = RayContext._active_ray_context if initialize and not ray_ctx.initialized: ray_ctx.init() return ray_ctx else: raise Exception("No active RayContext. Please create a RayContext and init it first")
def _gather_cluster_ips(self): """ Get the ips of all Spark executors in the cluster. The first ip returned would be the ray master. """ def info_fn(iter): from zoo.util.utils import get_node_ip yield get_node_ip() ips = self.sc.range(0, self.num_ray_nodes, numSlices=self.num_ray_nodes).barrier().mapPartitions(info_fn).collect() return ips
[docs] def stop(self): if not self.initialized: print("The Ray cluster has not been launched.") return import ray ray.shutdown() if not self.is_local: if not self.ray_processesMonitor: print("Please start the runner first before closing it") else: self.ray_processesMonitor.clean_fn() self.initialized = False
[docs] def purge(self): """ Invoke ray stop to clean ray processes. """ if not self.initialized: print("The Ray cluster has not been launched.") return if self.is_local: import ray ray.shutdown() else: self.sc.range(0, self.num_ray_nodes, numSlices=self.num_ray_nodes).barrier().mapPartitions( self.ray_service.gen_stop()).collect() self.initialized = False
def _get_spark_local_cores(self): local_symbol = re.match(r"local\[(.*)\]", self.sc.master).group(1) if local_symbol == "*": return multiprocessing.cpu_count() else: return int(local_symbol)
[docs] def init(self, driver_cores=0): """ Initiate the ray cluster. :param driver_cores: The number of cores for the raylet on driver for Spark cluster mode. Default is 0 and in this case the local driver wouldn't have any ray workload. :return The dictionary of address information about the ray cluster. Information contains node_ip_address, redis_address, object_store_address, raylet_socket_name, webui_url and session_dir. """ if self.initialized: print("The Ray cluster has been launched.") else: if self.is_local: if self.env: os.environ.update(self.env) import ray self._address_info = ray.init(num_cpus=self.ray_node_cpu_cores, object_store_memory=self.object_store_memory, resources=self.extra_params) else: self.cluster_ips = self._gather_cluster_ips() from bigdl.util.common import init_executor_gateway init_executor_gateway(self.sc) print("JavaGatewayServer has been successfully launched on executors") self._start_cluster() self._address_info = self._start_driver(num_cores=driver_cores) print(self._address_info) kill_redundant_log_monitors(self._address_info["redis_address"]) self.initialized = True return self._address_info
@property def address_info(self): if self._address_info: return self._address_info else: raise Exception("The Ray cluster has not been launched yet. Please call init first") def _start_cluster(self): print("Start to launch ray on cluster") ray_rdd = self.sc.range(0, self.num_ray_nodes, numSlices=self.num_ray_nodes) # The first ip would be used to launch ray master. process_infos = ray_rdd.barrier().mapPartitions( self.ray_service.gen_ray_start(self.cluster_ips[0])).collect() self.ray_processesMonitor = ProcessMonitor(process_infos, self.sc, ray_rdd, self, verbose=self.verbose) self.redis_address = self.ray_processesMonitor.master.master_addr return self def _start_restricted_worker(self, num_cores, node_ip_address): extra_param = {"node-ip-address": node_ip_address} if self.extra_params is not None: extra_param.update(self.extra_params) command = RayServiceFuncGenerator._get_raylet_command( redis_address=self.redis_address, ray_exec="ray", password=self.redis_password, ray_node_cpu_cores=num_cores, object_store_memory=self.object_store_memory, extra_params=extra_param) modified_env = self.ray_service._prepare_env() print("Executing command: {}".format(command)) process_info = session_execute(command=command, env=modified_env, tag="raylet", fail_fast=True) ProcessMonitor.register_shutdown_hook(pgid=process_info.pgid) def _start_driver(self, num_cores=0): print("Start to launch ray driver on local") import ray.services node_ip = ray.services.get_node_ip_address(self.redis_address) self._start_restricted_worker(num_cores=num_cores, node_ip_address=node_ip) ray.shutdown() return ray.init(address=self.redis_address, redis_password=self.ray_service.password, node_ip_address=node_ip)