Source code for zoo.util.spark

#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import platform

from pyspark import SparkContext
from zoo import init_nncontext, init_spark_conf
from zoo.util.utils import detect_python_location, pack_penv
from zoo.util.utils import get_executor_conda_zoo_classpath, get_zoo_bigdl_classpath_on_driver


class SparkRunner:
    """Create a SparkContext for local, YARN-client, standalone or K8s-client use.

    Each ``init_spark_*`` method assembles ``PYSPARK_SUBMIT_ARGS`` and a Spark
    configuration dict for one kind of master and returns the context built by
    ``init_nncontext``. Configuration dicts passed in by the caller are copied
    before being extended, so the caller's dict is left untouched.
    """

    # Environment handed to the bundled standalone start/stop scripts. It stays
    # None unless init_spark_standalone launched the cluster itself.
    standalone_env = None

    def __init__(self, spark_log_level="WARN", redirect_spark_log=True):
        """Remember the logging options and refuse to run next to a live context.

        :param spark_log_level: forwarded to ``init_nncontext`` as ``spark_log_level``.
        :param redirect_spark_log: forwarded to ``init_nncontext`` as
               ``redirect_spark_log``.
        :raises Exception: if pyspark already has an active SparkContext.
        """
        self.spark_log_level = spark_log_level
        self.redirect_spark_log = redirect_spark_log
        with SparkContext._lock:
            if SparkContext._active_spark_context:
                raise Exception("There's existing SparkContext. Please close it first.")
        import pyspark
        print("Current pyspark location is : {}".format(pyspark.__file__))

    @staticmethod
    def _ensure_pyspark_python(python_location=None):
        """Set PYSPARK_PYTHON unless the environment already defines it."""
        if "PYSPARK_PYTHON" not in os.environ:
            os.environ["PYSPARK_PYTHON"] = \
                python_location if python_location else detect_python_location()

    @staticmethod
    def _python_home():
        """Return the detected python path with its last two components removed."""
        return "/".join(detect_python_location().split("/")[:-2])

    @staticmethod
    def _python_runtime_conf(python_env, conf):
        """Build the executor PYTHONHOME / library path / LD_PRELOAD settings.

        Any ``spark.executor.extraLibraryPath`` already present in ``conf`` is
        kept, appended after the python library directories.
        """
        py_version = ".".join(platform.python_version().split(".")[0:2])
        # NOTE(review): the "m" suffix matches libpython naming up to CPython 3.7;
        # newer interpreters appear to ship libpython<X.Y>.so without it - confirm
        # against the python versions the executors are expected to run.
        preload_so = python_env + "/lib/libpython" + py_version + "m.so"
        ld_path = python_env + "/lib:" + python_env + "/lib/python" + \
            py_version + "/lib-dynload"
        if "spark.executor.extraLibraryPath" in conf:
            ld_path = "{}:{}".format(ld_path, conf["spark.executor.extraLibraryPath"])
        return {"spark.executorEnv.PYTHONHOME": python_env,
                "spark.executor.extraLibraryPath": ld_path,
                "spark.executorEnv.LD_PRELOAD": preload_so}

    @staticmethod
    def _prepend_executor_classpath(conf, classpath):
        """Put ``classpath`` in front of any executor classpath already in ``conf``."""
        key = "spark.executor.extraClassPath"
        if key in conf:
            conf[key] = "{}:{}".format(classpath, conf[key])
        else:
            conf[key] = classpath

    @staticmethod
    def _run_standalone_script(command):
        """Run a shell command with ``standalone_env`` and return its exit code.

        ``subprocess.call`` waits through the Popen object itself instead of
        reaping the pid separately with ``os.waitpid``.
        """
        import subprocess
        return subprocess.call(command, shell=True, env=SparkRunner.standalone_env)

    def create_sc(self, submit_args, conf):
        """Export ``submit_args`` through PYSPARK_SUBMIT_ARGS and create the context.

        :param submit_args: spark-submit style argument string; " pyspark-shell"
               is appended before it is exported.
        :param conf: configuration passed to ``init_spark_conf``.
        :return: the value returned by ``init_nncontext``.
        """
        os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args + " pyspark-shell"
        spark_conf = init_spark_conf(conf)
        return init_nncontext(conf=spark_conf,
                              spark_log_level=self.spark_log_level,
                              redirect_spark_log=self.redirect_spark_log)

    def init_spark_on_local(self, cores, conf=None, python_location=None):
        """Create a context whose master is ``local[<cores>]``.

        :param cores: value placed inside ``local[...]``.
        :param conf: optional configuration passed to ``init_spark_conf``.
        :param python_location: python executable used for PYSPARK_PYTHON when the
               variable is not already set; detected automatically if omitted.
        :return: the value returned by ``init_nncontext``.
        """
        print("Start to getOrCreate SparkContext")
        self._ensure_pyspark_python(python_location)
        master = "local[{}]".format(cores)
        zoo_conf = init_spark_conf(conf).setMaster(master)
        sc = init_nncontext(conf=zoo_conf,
                            spark_log_level=self.spark_log_level,
                            redirect_spark_log=self.redirect_spark_log)
        print("Successfully got a SparkContext")
        return sc

    def init_spark_on_yarn(self,
                           hadoop_conf,
                           conda_name,
                           num_executors,
                           executor_cores,
                           executor_memory="2g",
                           driver_cores=4,
                           driver_memory="1g",
                           extra_executor_memory_for_ray=None,
                           extra_python_lib=None,
                           penv_archive=None,
                           additional_archive=None,
                           hadoop_user_name="root",
                           spark_yarn_archive=None,
                           conf=None,
                           jars=None):
        """Create a context with ``--master yarn --deploy-mode client``.

        :param hadoop_conf: exported as HADOOP_CONF_DIR.
        :param conda_name: passed to ``pack_penv`` when no ``penv_archive`` is given.
        :param num_executors: used for ``--num-executors`` and
               ``spark.executor.instances``.
        :param executor_cores: used for ``--executor-cores`` and
               ``spark.executor.cores``.
        :param executor_memory: used for ``--executor-memory`` and
               ``spark.executor.memory``.
        :param driver_cores: used for ``--driver-cores`` and ``spark.driver.cores``.
        :param driver_memory: used for ``--driver-memory`` and
               ``spark.driver.memory``.
        :param extra_executor_memory_for_ray: stored as
               ``spark.executor.memoryOverhead`` when given.
        :param extra_python_lib: passed to ``--py-files`` when given.
        :param penv_archive: archive shipped with ``--archives`` as "python_env".
        :param additional_archive: appended, comma separated, to ``--archives``.
        :param hadoop_user_name: exported as HADOOP_USER_NAME.
        :param spark_yarn_archive: stored as ``spark.yarn.archive`` when given.
        :param conf: optional dict of extra Spark settings; it is copied, not modified.
        :param jars: passed to ``--jars`` when given.
        :return: the context returned by ``create_sc``.
        :raises AssertionError: if neither ``penv_archive`` nor ``conda_name`` is given.
        """
        print("Initializing SparkContext for yarn-client mode")
        executor_python_env = "python_env"
        os.environ["HADOOP_CONF_DIR"] = hadoop_conf
        os.environ["HADOOP_USER_NAME"] = hadoop_user_name
        os.environ["PYSPARK_PYTHON"] = "{}/bin/python".format(executor_python_env)

        pack_env = False
        assert penv_archive or conda_name, \
            "You should either specify penv_archive or conda_name explicitly"
        try:
            if not penv_archive:
                penv_archive = pack_penv(conda_name, executor_python_env)
                pack_env = True

            archive = "{}#{}".format(penv_archive, executor_python_env)
            if additional_archive:
                archive = archive + "," + additional_archive
            submit_args = "--master yarn --deploy-mode client"
            submit_args = submit_args + " --archives {}".format(archive)
            submit_args = submit_args + gen_submit_args(
                driver_cores, driver_memory, num_executors, executor_cores,
                executor_memory, extra_python_lib, jars)

            # Work on a copy so that reusing one dict across calls does not keep
            # stacking the library path and classpath prefixes onto it.
            conf = enrich_conf_for_spark(dict(conf) if conf else {},
                                         driver_cores, driver_memory, num_executors,
                                         executor_cores, executor_memory,
                                         extra_executor_memory_for_ray)
            settings = {"spark.scheduler.minRegisteredResourcesRatio": "1.0"}
            settings.update(self._python_runtime_conf(executor_python_env, conf))
            conf.update(settings)
            if spark_yarn_archive:
                conf["spark.yarn.archive"] = spark_yarn_archive

            zoo_bigdl_path_on_executor = ":".join(
                list(get_executor_conda_zoo_classpath(executor_python_env)))
            self._prepend_executor_classpath(conf, zoo_bigdl_path_on_executor)

            sc = self.create_sc(submit_args, conf)
        finally:
            # Only delete the archive when it was packed by this call.
            if conda_name and penv_archive and pack_env:
                os.remove(penv_archive)
        return sc

    def init_spark_standalone(self,
                              num_executors,
                              executor_cores,
                              executor_memory="2g",
                              driver_cores=4,
                              driver_memory="1g",
                              master=None,
                              extra_executor_memory_for_ray=None,
                              extra_python_lib=None,
                              conf=None,
                              jars=None,
                              python_location=None,
                              enable_numa_binding=False):
        """Create a context on a Spark standalone cluster.

        Without ``master`` the bundled ``start-master.sh`` and worker scripts are
        run first and ``spark://<node ip>:7077`` is used; the environment given to
        those scripts is kept in ``SparkRunner.standalone_env`` so that
        ``stop_spark_standalone`` can run the matching stop scripts.

        :param num_executors: used for ``--num-executors``,
               ``spark.executor.instances`` and, together with ``executor_cores``,
               ``spark.cores.max``.
        :param executor_cores: used for ``--executor-cores`` and
               ``spark.executor.cores``.
        :param executor_memory: used for ``--executor-memory`` and
               ``spark.executor.memory``.
        :param driver_cores: used for ``--driver-cores`` and ``spark.driver.cores``.
        :param driver_memory: used for ``--driver-memory`` and
               ``spark.driver.memory``.
        :param master: address of an existing cluster; must start with "spark://".
        :param extra_executor_memory_for_ray: stored as
               ``spark.executor.memoryOverhead`` when given.
        :param extra_python_lib: passed to ``--py-files`` when given.
        :param conf: optional dict of extra Spark settings; it is copied, not modified.
        :param jars: passed to ``--jars`` when given.
        :param python_location: python executable used for PYSPARK_PYTHON when the
               variable is not already set; detected automatically if omitted.
        :param enable_numa_binding: use ``start-worker-with-numactl.sh`` and export
               SPARK_WORKER_INSTANCES instead of running ``start-worker.sh``.
        :return: the context returned by ``create_sc``.
        :raises RuntimeError: if the master or worker start script exits non-zero.
        :raises AssertionError: if ``master`` is given without a "spark://" prefix.
        """
        import subprocess
        import pyspark
        from zoo.util.utils import get_node_ip

        self._ensure_pyspark_python(python_location)
        if not master:
            pyspark_home = os.path.abspath(pyspark.__file__ + "/../")
            zoo_standalone_home = os.path.abspath(__file__ + "/../../share/bin/standalone")
            node_ip = get_node_ip()
            SparkRunner.standalone_env = {
                "SPARK_HOME": pyspark_home,
                "ZOO_STANDALONE_HOME": zoo_standalone_home,
                # Without this the master is addressed by hostname rather than ip.
                "SPARK_MASTER_HOST": node_ip}
            if 'JAVA_HOME' in os.environ:
                SparkRunner.standalone_env["JAVA_HOME"] = os.environ["JAVA_HOME"]
            # The scripts installed from pip don't have execution permission
            # and need to first give them permission. The result is deliberately
            # not checked, as before.
            subprocess.call(["chmod", "-R", "+x", "{}/sbin".format(zoo_standalone_home)])

            # Start master
            if self._run_standalone_script(
                    "{}/sbin/start-master.sh".format(zoo_standalone_home)) != 0:
                raise RuntimeError("starting master failed")
            master = "spark://{}:7077".format(node_ip)  # 7077 is the default port

            # Start worker
            if enable_numa_binding:
                worker_script = "start-worker-with-numactl.sh"
                SparkRunner.standalone_env["SPARK_WORKER_INSTANCES"] = str(num_executors)
            else:
                worker_script = "start-worker.sh"
            if self._run_standalone_script(
                    "{}/sbin/{} {}".format(zoo_standalone_home, worker_script, master)) != 0:
                raise RuntimeError("starting worker failed")
        else:
            # A Spark standalone cluster has already been started by the user.
            assert master.startswith("spark://"), \
                "Please input a valid master address for your Spark standalone cluster: " \
                "spark://master:port"

        # Start pyspark-shell
        submit_args = "--master " + master
        submit_args = submit_args + gen_submit_args(
            driver_cores, driver_memory, num_executors, executor_cores,
            executor_memory, extra_python_lib, jars)
        # Copy first so the caller's dict is not modified.
        conf = enrich_conf_for_spark(dict(conf) if conf else {},
                                     driver_cores, driver_memory, num_executors,
                                     executor_cores, executor_memory,
                                     extra_executor_memory_for_ray)
        conf.update({
            "spark.cores.max": num_executors * executor_cores,
            "spark.executorEnv.PYTHONHOME": self._python_home()
        })
        zoo_bigdl_jar_path = ":".join(list(get_zoo_bigdl_classpath_on_driver()))
        self._prepend_executor_classpath(conf, zoo_bigdl_jar_path)
        return self.create_sc(submit_args, conf)

    @staticmethod
    def stop_spark_standalone():
        """Run the bundled stop-worker and stop-master scripts.

        Does nothing when ``standalone_env`` is None, i.e. when the standalone
        cluster was not started by ``init_spark_standalone``.
        """
        env = SparkRunner.standalone_env
        if env is None:
            return
        for script in ("stop-worker.sh", "stop-master.sh"):
            SparkRunner._run_standalone_script(
                "{}/sbin/{}".format(env["ZOO_STANDALONE_HOME"], script))

    def init_spark_on_k8s(self,
                          master,
                          container_image,
                          num_executors,
                          executor_cores,
                          executor_memory="2g",
                          driver_memory="1g",
                          driver_cores=4,
                          extra_executor_memory_for_ray=None,
                          extra_python_lib=None,
                          conf=None,
                          jars=None,
                          python_location=None):
        """Create a context with ``--master <master> --deploy-mode client`` for K8s.

        :param master: value given to ``--master``.
        :param container_image: stored as ``spark.kubernetes.container.image``.
        :param num_executors: used for ``--num-executors``,
               ``spark.executor.instances`` and, together with ``executor_cores``,
               ``spark.cores.max``.
        :param executor_cores: used for ``--executor-cores`` and
               ``spark.executor.cores``.
        :param executor_memory: used for ``--executor-memory`` and
               ``spark.executor.memory``.
        :param driver_memory: used for ``--driver-memory`` and
               ``spark.driver.memory``.
        :param driver_cores: used for ``--driver-cores`` and ``spark.driver.cores``.
        :param extra_executor_memory_for_ray: stored as
               ``spark.executor.memoryOverhead`` when given.
        :param extra_python_lib: passed to ``--py-files`` when given.
        :param conf: optional dict of extra Spark settings; it is copied, not modified.
        :param jars: passed to ``--jars`` when given.
        :param python_location: python executable used for PYSPARK_PYTHON when the
               variable is not already set; detected automatically if omitted.
        :return: the context returned by ``create_sc``.
        """
        print("Initializing SparkContext for k8s-client mode")
        python_env = self._python_home()
        self._ensure_pyspark_python(python_location)

        submit_args = "--master " + master + " --deploy-mode client"
        submit_args = submit_args + gen_submit_args(
            driver_cores, driver_memory, num_executors, executor_cores,
            executor_memory, extra_python_lib, jars)

        # Copy first so the caller's dict is not modified.
        conf = enrich_conf_for_spark(dict(conf) if conf else {},
                                     driver_cores, driver_memory, num_executors,
                                     executor_cores, executor_memory,
                                     extra_executor_memory_for_ray)
        settings = {"spark.cores.max": num_executors * executor_cores}
        settings.update(self._python_runtime_conf(python_env, conf))
        settings["spark.kubernetes.container.image"] = container_image
        conf.update(settings)

        # Not targeted to use pip install. BIGDL_CLASSPATH is supposed to set.
        if "BIGDL_CLASSPATH" in os.environ:
            zoo_bigdl_jar_path = os.environ["BIGDL_CLASSPATH"]
        else:
            zoo_bigdl_jar_path = ":".join(list(get_zoo_bigdl_classpath_on_driver()))
        self._prepend_executor_classpath(conf, zoo_bigdl_jar_path)
        return self.create_sc(submit_args, conf)
def gen_submit_args(driver_cores, driver_memory, num_executors, executor_cores, executor_memory,
                    extra_python_lib=None, jars=None):
    """Build the resource portion of a spark-submit argument string.

    The result starts with a space so it can be appended directly to an
    existing argument string.

    :param driver_cores: value for ``--driver-cores``.
    :param driver_memory: value for ``--driver-memory``.
    :param num_executors: value for ``--num-executors``.
    :param executor_cores: value for ``--executor-cores``.
    :param executor_memory: value for ``--executor-memory``.
    :param extra_python_lib: value for ``--py-files``; omitted when falsy.
    :param jars: value for ``--jars``; omitted when falsy.
    :return: the assembled argument string.
    """
    fragments = [
        " --driver-cores {}".format(driver_cores),
        " --driver-memory {}".format(driver_memory),
        " --num-executors {}".format(num_executors),
        " --executor-cores {}".format(executor_cores),
        " --executor-memory {}".format(executor_memory),
    ]
    # Optional flags keep their original order: python files first, then jars.
    if extra_python_lib:
        fragments.append(" --py-files {}".format(extra_python_lib))
    if jars:
        fragments.append(" --jars {}".format(jars))
    return "".join(fragments)
def enrich_conf_for_spark(conf, driver_cores, driver_memory, num_executors,
                          executor_cores, executor_memory,
                          extra_executor_memory_for_ray=None):
    """Return a Spark configuration dict extended with driver/executor resources.

    The settings are written into a copy of ``conf``, so the dict supplied by
    the caller is not modified. The resource settings overwrite any entries
    with the same keys that ``conf`` already contains.

    :param conf: optional dict of Spark settings; a falsy value means "start empty".
    :param driver_cores: stored as ``spark.driver.cores``.
    :param driver_memory: stored as ``spark.driver.memory``.
    :param num_executors: stored as ``spark.executor.instances``.
    :param executor_cores: stored as ``spark.executor.cores``.
    :param executor_memory: stored as ``spark.executor.memory``.
    :param extra_executor_memory_for_ray: stored as
           ``spark.executor.memoryOverhead`` when truthy.
    :return: a new dict holding the merged settings.
    """
    # Copy rather than update in place: callers may reuse their dict later.
    enriched = dict(conf) if conf else {}
    enriched.update({"spark.driver.cores": driver_cores,
                     "spark.driver.memory": driver_memory,
                     "spark.executor.instances": num_executors,
                     "spark.executor.cores": executor_cores,
                     "spark.executor.memory": executor_memory})
    if extra_executor_memory_for_ray:
        enriched["spark.executor.memoryOverhead"] = extra_executor_memory_for_ray
    return enriched