# A pre-built Spark job (the LocalPi example from the Spark source) submitted in local mode
bash spark-submit \
--class com.lp.test.app.LocalPi \
--master local \
/Users/lipan/Desktop/spark-local/original-spark-local-train-1.0.jar \
2. The submission flow
We always submit a Spark job through spark-submit (or spark-shell). Starting from the spark-submit script, let's walk step by step through the whole submission flow. First, an overview diagram of the process:
Following the overall flow in the diagram above, we now trace the source code of each step one by one.
2.1 The spark-submit script
#!/usr/bin/env bash
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi
echo "${SPARK_HOME}"
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# As you can see here, the received arguments are handed straight off to the spark-class script,
# with org.apache.spark.deploy.SparkSubmit as the class to run
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
2.2 The spark-class script
#!/usr/bin/env bash
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# Set up some environment variables; this loads everything configured in conf/spark-env.sh:
. "${SPARK_HOME}"/bin/load-spark-env.sh
# Find the java binary: if JAVA_HOME is set, use ${JAVA_HOME}/bin/java as RUNNER,
# otherwise fall back to the java found on the PATH, or fail.
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi
# Find Spark jars.
# This part locates the Spark jar directory. If your jars are numerous and large,
# distributing them to the same path on every machine ahead of time is a worthwhile optimization.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
#
# Note that build_command starts a small JVM whose main class is org.apache.spark.launcher.Main;
# it only builds the final command. The JVM that runs SparkSubmit is started later by
# exec "${CMD[@]}" at the end of this script.
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution.
# build_command invokes org.apache.spark.launcher.Main to assemble the submit command.
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
# With that, ${CMD[@]} contains the following:
# /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/bin/java -cp /Users/lipan/workspace/source_code/spark-2.3.3/conf/:/Users/lipan/workspace/source_code/spark-2.3.3/assembly/target/scala-2.11/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit --master local --class com.lp.test.app.LocalPi /Users/lipan/Desktop/spark-local/original-spark-local-train-1.0.jar 10
exec "${CMD[@]}"
Compared with spark-submit, the execution logic of spark-class is a bit more involved. Overall it:
- checks the SPARK_HOME environment
- runs load-spark-env.sh to load default environment variables (including spark-env.sh)
- checks the JAVA_HOME environment
- locates the Spark jars
- runs org.apache.spark.launcher.Main to parse the arguments and build the CMD command, receiving it back as a NUL-separated list (a minimal demo of this hand-off follows the list below)
- validates the CMD command and the launcher exit code
- finally executes the org.apache.spark.deploy.SparkSubmit class
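To make the NUL-separated hand-off concrete, here is a minimal, self-contained bash sketch (not Spark code) of the same read loop used above; build_command here is a stand-in that just prints a few fake arguments plus an exit code:

```bash
# Stand-alone demo of the pattern spark-class uses to read NUL-delimited
# arguments from build_command into an array.
build_command() {
  # Emit three "arguments", one containing spaces, separated by NUL bytes,
  # then append a fake exit code just like the real script does.
  printf '%s\0' "/usr/bin/java" "-Xmx1g" "an arg with spaces"
  printf '%d\0' 0
}

CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command)

LAST=$((${#CMD[@]} - 1))
echo "launcher exit code: ${CMD[$LAST]}"   # -> 0
CMD=("${CMD[@]:0:$LAST}")                  # drop the exit code
printf 'arg: %s\n' "${CMD[@]}"             # each argument survives intact, spaces included
```

Because the arguments are split on NUL bytes rather than whitespace, an argument containing spaces or shell metacharacters arrives in the array as a single element, which is exactly why the launcher uses this protocol.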
2.3 org.apache.spark.launcher.Main
java -Xmx128m -cp ...jars org.apache.spark.launcher.Main "$@"
In other words, org.apache.spark.launcher.Main is invoked by spark-class and receives its arguments from it. This class is a helper used internally by Spark's scripts; it is not the real execution entry point. It delegates to other classes to parse the arguments and build the command to execute, and finally hands that command back for spark-class to run via exec "${CMD[@]}".
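If you only want to see the command this class produces, you do not need a debugger: setting the SPARK_PRINT_LAUNCH_COMMAND environment variable (checked in main() below) makes it print the assembled command to stderr before spark-class execs it. For the LocalPi job, something like:

```bash
SPARK_PRINT_LAUNCH_COMMAND=1 ./bin/spark-submit \
  --class com.lp.test.app.LocalPi \
  --master local \
  /Users/lipan/Desktop/spark-local/original-spark-local-train-1.0.jar
# stderr will show something like:
# Spark Command: /path/to/java -cp .../conf/:.../jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit \
#   --master local --class com.lp.test.app.LocalPi .../original-spark-local-train-1.0.jar
# ========================================
```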
You can also take the "$@" arguments and run them against org.apache.spark.launcher.Main in IDEA to step through it yourself; the annotated source is listed below:
package org.apache.spark.launcher;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
/**
 * Command line interface for the Spark launcher. Used internally by Spark scripts.
 * This is a helper class meant only for Spark's internal scripts.
 */
class Main {

  /**
   * Usage: Main [class] [class args]
   *
   * Two modes are distinguished here: spark-submit and spark-class.
   * If the submitted class is something other than SparkSubmit (master, worker, history
   * server, and so on), the spark-class branch is taken.
   * On Unix the resulting arguments are printed as a NUL-separated list, while on Windows
   * they are space-separated.
   *
   * The arguments handed over by spark-class in our example are:
   *   org.apache.spark.deploy.SparkSubmit \
   *     --class com.lp.test.app.LocalPi \
   *     --master local \
   *     /Users/lipan/Desktop/spark-local/spark-local-train-1.0.jar
   */
  public static void main(String[] argsArray) throws Exception {
    checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

    List<String> args = new ArrayList<>(Arrays.asList(argsArray));
    String className = args.remove(0);

    boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
    AbstractCommandBuilder builder;
    /*
     * Build the command-builder object for either spark-submit or spark-class:
     * all arguments are taken out, parsed and stored on the builder.
     * In other words, this is where "submit" is told apart from master/worker/etc.
     * and the corresponding execution arguments are collected.
     */
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
      try {
        builder = new SparkSubmitCommandBuilder(args);
      } catch (IllegalArgumentException e) {
        printLaunchCommand = false;
        System.err.println("Error: " + e.getMessage());
        System.err.println();

        MainClassOptionParser parser = new MainClassOptionParser();
        try {
          parser.parse(args);
        } catch (Exception ignored) {
          // Ignore parsing exceptions.
        }

        List<String> help = new ArrayList<>();
        if (parser.className != null) {
          help.add(parser.CLASS);
          help.add(parser.className);
        }
        help.add(parser.USAGE_ERROR);
        builder = new SparkSubmitCommandBuilder(help);
      }
    } else {
      builder = new SparkClassCommandBuilder(className, args);
    }

    /*
     * This is where the command to execute is actually built: the builder's buildCommand
     * method resolves the execution parameters, with extra environment entries collected
     * into the k/v map `env`.
     */
    Map<String, String> env = new HashMap<>();
    List<String> cmd = builder.buildCommand(env);
    if (printLaunchCommand) {
      System.err.println("Spark Command: " + join(" ", cmd));
      System.err.println("========================================");
    }

    if (isWindows()) {
      System.out.println(prepareWindowsCommand(cmd, env));
    } else {
      /*
       * For our example the resulting command is:
       * /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/bin/java
       *   -cp /Users/lipan/workspace/source_code/spark-2.3.3/conf/:/Users/lipan/workspace/source_code/spark-2.3.3/assembly/target/scala-2.11/jars/*
       *   -Xmx1g org.apache.spark.deploy.SparkSubmit
       *   --master local
       *   --class com.lp.test.app.LocalPi
       *   /Users/lipan/Desktop/spark-local/original-spark-local-train-1.0.jar 10
       * i.e. java -cp ... org.apache.spark.deploy.SparkSubmit will start that class.
       * In bash the arguments are printed separated by NUL characters.
       */
      List<String> bashCmd = prepareBashCommand(cmd, env);
      for (String c : bashCmd) {
        System.out.print(c);
        System.out.print('\0');
      }
    }
  }

  // Windows variant: emit a single command line with `set k=v &&` prefixes.
  private static String prepareWindowsCommand(List<String> cmd, Map<String, String> childEnv) {
    StringBuilder cmdline = new StringBuilder();
    for (Map.Entry<String, String> e : childEnv.entrySet()) {
      cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue()));
      cmdline.append(" && ");
    }
    for (String arg : cmd) {
      cmdline.append(quoteForBatchScript(arg));
      cmdline.append(" ");
    }
    return cmdline.toString();
  }

  // Bash variant (e.g. Linux): when child environment variables are needed, prefix the
  // command with `env k=v ...` so they are visible to the final process.
  private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) {
    if (childEnv.isEmpty()) {
      return cmd;
    }

    List<String> newCmd = new ArrayList<>();
    newCmd.add("env");
    for (Map.Entry<String, String> e : childEnv.entrySet()) {
      newCmd.add(String.format("%s=%s", e.getKey(), e.getValue()));
    }
    newCmd.addAll(cmd);
    return newCmd;
  }
  /*
   * When normal spark-submit argument parsing fails, this parser takes one more shot at
   * recovering the main class; only if that also fails is the usage error shown.
   */
  private static class MainClassOptionParser extends SparkSubmitOptionParser {

    String className;

    @Override
    protected boolean handle(String opt, String value) {
      if (CLASS.equals(opt)) {
        className = value;
      }
      return false;
    }

    @Override
    protected boolean handleUnknown(String opt) {
      return false;
    }

    @Override
    protected void handleExtraArgs(List<String> extra) {
    }
  }
}
The classes involved in Main, namely SparkSubmitCommandBuilder, SparkClassCommandBuilder and their buildCommand methods, are all about processing arguments and assembling the command, so we will not expand on each of them here.
2.4 org.apache.spark.deploy.SparkSubmit
org.apache.spark.launcher.Main parses and filters the arguments and builds the execution command, which is returned to the spark-class script; spark-class then really invokes the SparkSubmit class via exec "${CMD[@]}".
You can set the parsed "$@" arguments in an IDEA run configuration and step through the source from there, for example:
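For the LocalPi example, the program arguments of such a run configuration are simply the part of the CMD shown earlier that follows the main class, roughly (paths are machine-specific):

```
--master local --class com.lp.test.app.LocalPi /Users/lipan/Desktop/spark-local/original-spark-local-train-1.0.jar 10
```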
2.4.1 SparkSubmitAction
At the very top of the org.apache.spark.deploy.SparkSubmit source file, an enumeration called SparkSubmitAction is defined.
/**
 * Whether to submit, kill, or request the status of an application.
 * The latter two operations are currently supported only for standalone and Mesos cluster modes.
 */
private[deploy] object SparkSubmitAction extends Enumeration {
type SparkSubmitAction = Value
  val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
}
2.4.2 SparkSubmit
The methods of the SparkSubmit class can be followed in the order below; each of them carries detailed comments. You can also download the source from the link at the end of this article and step through it with breakpoints.
2.4.2.1 Main
override def main(args: Array[String]): Unit = {
val uninitLog = initializeLogIfNecessary(true, silent = true)
  /*
   * Build and populate the arguments needed for the submission (SparkSubmitArguments):
   * 1. parse the command-line arguments
   * 2. populate the "sparkProperties" map from the properties file
   *    (spark-defaults.conf when none is specified)
   * 3. remove every property that does not start with "spark."
   * 4. map the arguments onto the corresponding fields
   * 5. validate the action argument
   */
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    printStream.println(appArgs)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
2.4.2.2 submit
  /*
   * submit() is reached through the SUBMIT case above.
   * It first prepares the launch environment (the class to call, the default configuration
   * and the input arguments) according to the cluster manager and the deploy mode (including
   * the different YARN modes), and then invokes the main method of that child class.
   * Because the method may call itself again (REST fallback), it is annotated with @tailrec,
   * i.e. the repeated call must be the last expression; ultimately it runs
   * runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose).
   */
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    /*
     * First prepare the launch environment from the parsed arguments.
     * Note the lazy val secMgr = new SecurityManager(sparkConf) inside: the SecurityManager
     * is initialized on first use while the environment is being prepared.
     * prepareSubmitEnvironment() is fairly long, so it is analysed further below.
     */
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
            }
          })
        } catch {
          case e: Exception =>
            // Exceptions with an empty stack trace (e.g. Hadoop's AuthorizationException)
            // are reported with a plain error message instead of being rethrown.
            if (e.getStackTrace().length == 0) {
              printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
              exitFn(1)
            } else {
              throw e
            }
        }
      } else {
        runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
      }
    }

    // Let the main class re-initialize the logging system once it starts.
    if (uninitLog) {
      Logging.uninitialize()
    }

    // In standalone cluster mode, try the REST submission gateway first and fall back
    // to the legacy gateway if the master turns out not to be a REST server.
    if (args.isStandaloneCluster && args.useRest) {
      try {
        logInfo("Running Spark using the REST application submission protocol.")
        doRunMain()
      } catch {
        case e: SubmitRestConnectionException =>
          logWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args, false)
      }
    } else {
      doRunMain()
    }
  }
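As a side note on @tailrec: the recursive submit(args, false) call sits in a catch case with no finally block, which Scala still treats as a tail position. The following self-contained sketch (not Spark code) mirrors that shape and is accepted by the compiler:

```scala
import scala.annotation.tailrec

object RestFallbackSketch {
  @tailrec
  def trySubmit(useRest: Boolean): Unit = {
    if (useRest) {
      try {
        println("submitting via the REST gateway ...")
        // simulate SubmitRestConnectionException when the master is not a REST server
        throw new RuntimeException("not a REST server")
      } catch {
        // the recursive call is the last expression of a catch case without a finally,
        // so scalac accepts the @tailrec annotation (the same shape as submit())
        case _: RuntimeException => trySubmit(useRest = false)
      }
    } else {
      println("submitting via the legacy gateway")
    }
  }

  def main(args: Array[String]): Unit = trySubmit(useRest = true)
}
```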
2.4.2.3 prepareSubmitEnvironment
  /**
   * Prepare the environment for submitting an application, for every deploy mode.
   *
   * @param args the parsed SparkSubmitArguments used for environment preparation
   * @param conf the Hadoop configuration; this argument is only set in unit tests
   * @return a 4-tuple (childArgs, childClasspath, sparkConf, childMainClass):
   *         (1) childArgs: the arguments for the child process
   *         (2) childClasspath: a list of classpath entries for the child
   *         (3) sparkConf: a map of system properties
   *         (4) childMainClass: the main class for the child
   *
   * Exposed for testing.
   * Since the main function to run differs between deploy modes, it is determined mainly
   * by the spark-submit arguments.
   */
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
    try {
      doPrepareSubmitEnvironment(args, conf)
    } catch {
      case e: SparkException =>
        printErrorAndExit(e.getMessage)
        throw e
    }
  }
private def doPrepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = new SparkConf()
var childMainClass = ""
val clusterManager: Int = args.master match {
case "yarn" => YARN
case "yarn-client" | "yarn-cluster" =>
printWarning(s"Master ${args.master} is deprecated since 2.0." +
" Please use master \"yarn\" with specified deploy mode instead.")
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local")
var deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
if (clusterManager == YARN) {
(args.master, args.deployMode) match {
case ("yarn-cluster", null) =>
deployMode = CLUSTER
args.master = "yarn"
case ("yarn-cluster", "client") =>
printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
case ("yarn-client", "cluster") =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
case (_, mode) =>
args.master = "yarn"
if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
printErrorAndExit(
"Could not load YARN classes. " +
"This copy of Spark may not have been compiled with YARN support.")
if (clusterManager == KUBERNETES) {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
printErrorAndExit(
"Could not load KUBERNETES classes. " +
"This copy of Spark may not have been compiled with KUBERNETES support.")
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, _) if args.isPython =>
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
case _ =>
(args.deployMode, deployMode) match {
case (null, CLIENT) => args.deployMode = "client"
case (null, CLUSTER) => args.deployMode = "cluster"
case _ =>
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
if (!isMesosCluster && !isStandAloneCluster) {
val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
args.ivySettingsPath)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
if (args.isR && !StringUtils.isBlank(args.jars)) {
RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
val targetDir = Utils.createTempDir()
if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
if (args.principal != null) {
if (args.keytab != null) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
sparkConf.set(KEYTAB, args.keytab)
sparkConf.set(PRINCIPAL, args.principal)
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
lazy val secMgr = new SecurityManager(sparkConf)
var localPrimaryResource: String = null
var localJars: String = null
var localPyFiles: String = null
if (deployMode == CLIENT) {
localPrimaryResource = Option(args.primaryResource).map {
downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
localJars = Option(args.jars).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
localPyFiles = Option(args.pyFiles).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
if (clusterManager == YARN) {
val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
def shouldDownload(scheme: String): Boolean = {
forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
Try {
FileSystem.getFileSystemClass(scheme, hadoopConf)
}.isFailure
def downloadResource(resource: String): String = {
val uri = Utils.resolveURI(resource)
uri.getScheme match {
case "local" | "file" => resource
case e if shouldDownload(e) =>
val file = new File(targetDir, new Path(uri).getName)
if (file.exists()) {
file.toURI.toString
} else {
downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
case _ => uri.toString
args.primaryResource = Option(args.primaryResource).map {
downloadResource
}.orNull
args.files = Option(args.files).map { files =>
Utils.stringToSeq(files).map(downloadResource).mkString(",")
}.orNull
args.pyFiles = Option(args.pyFiles).map { pyFiles =>
Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
}.orNull
args.jars = Option(args.jars).map { jars =>
Utils.stringToSeq(jars).map(downloadResource).mkString(",")
}.orNull
args.archives = Option(args.archives).map { archives =>
Utils.stringToSeq(archives).map(downloadResource).mkString(",")
}.orNull
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
} else {
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
if (clusterManager != YARN) {
args.files = mergeFileLists(args.files, args.primaryResource)
if (clusterManager != YARN) {
args.files = mergeFileLists(args.files, args.pyFiles)
if (localPyFiles != null) {
sparkConf.set("spark.submit.pyFiles", localPyFiles)
if (args.isR && clusterManager == YARN) {
val sparkRPackagePath = RUtils.localSparkRPackagePath
if (sparkRPackagePath.isEmpty) {
printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
if (!sparkRPackageFile.exists()) {
printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString
args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")
if (!RUtils.rPackages.isEmpty) {
val rPackageFile =
RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
printErrorAndExit("Failed to zip all the built R packages.")
val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
printErrorAndExit("Distributing R packages with standalone cluster is not supported.")
if (args.isR && clusterManager == MESOS && !RUtils.rPackages.isEmpty) {
printErrorAndExit("Distributing R packages with mesos cluster is not supported.")
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
args.mainClass = "org.apache.spark.api.r.RBackend"
} else {
args.mainClass = "org.apache.spark.deploy.RRunner"
args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
if (isYarnCluster && args.isR) {
args.files = mergeFileLists(args.files, args.primaryResource)
sys.props("SPARK_SUBMIT") = "true"
val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
confKey = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = "spark.driver.extraLibraryPath"),
OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
confKey = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
CLUSTER, confKey = "spark.jars.excludes"),
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
confKey = "spark.executor.instances"),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
confKey = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
if (localJars != null) {
childClasspath ++= localJars.split(",")
if (isYarnCluster) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
if (args.jars != null) {
childClasspath ++= args.jars.split(",")
if (deployMode == CLIENT) {
if (args.childArgs != null) {
childArgs ++= args.childArgs
for (opt <- options) {
if (opt.value != null &&
(deployMode & opt.deployMode) != 0 &&
(clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) {
childArgs += (opt.clOption, opt.value)
if (opt.confKey != null) {
sparkConf.set(opt.confKey, opt.value)
if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
if (!isYarnCluster && !args.isPython && !args.isR) {
var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
sparkConf.set("spark.jars", jars.mkString(","))
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = REST_CLUSTER_SUBMIT_CLASS
childArgs += (args.primaryResource, args.mainClass)
} else {
childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
if (args.supervise) {
childArgs += "--supervise"
Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
if (args.childArgs != null) {
childArgs ++= args.childArgs
if (clusterManager == YARN) {
if (args.isPython) {
sparkConf.set("spark.yarn.isPython", "true")
if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
setRMPrincipal(sparkConf)
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
childArgs += ("--primary-r-file", mainFile)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += ("--jar", args.primaryResource)
childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
if (isMesosCluster) {
assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
childMainClass = REST_CLUSTER_SUBMIT_CLASS
if (args.isPython) {
childArgs += (args.primaryResource, "")
if (args.pyFiles != null) {
sparkConf.set("spark.submit.pyFiles", args.pyFiles)
} else if (args.isR) {
childArgs += (args.primaryResource, "")
} else {
childArgs += (args.primaryResource, args.mainClass)
if (args.childArgs != null) {
childArgs ++= args.childArgs
if (isKubernetesCluster) {
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += ("--arg", arg)
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
if (deployMode == CLUSTER) {
sparkConf.remove("spark.driver.host")
val pathConfigs = Seq(
"spark.jars",
"spark.files",
"spark.yarn.dist.files",
"spark.yarn.dist.archives",
"spark.yarn.dist.jars")
pathConfigs.foreach { config =>
sparkConf.getOption(config).foreach { oldValue =>
sparkConf.set(config, Utils.resolveURIs(oldValue))
sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
resolvedPyFiles
sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
(childArgs, childClasspath, sparkConf, childMainClass)
}
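For the local-mode LocalPi submission used throughout this article (LOCAL cluster manager, client deploy mode), the returned 4-tuple would look roughly as follows. This is an illustrative sketch derived from the branches above, not output copied from a run:

```scala
// Roughly what prepareSubmitEnvironment returns for the local-mode LocalPi example
// (client mode: childMainClass is the user's class, the user jar goes on the child classpath)
val childArgs      = Seq("10") // the trailing application argument from the submit command
val childClasspath = Seq("/Users/lipan/Desktop/spark-local/original-spark-local-train-1.0.jar")
val childMainClass = "com.lp.test.app.LocalPi"
// sparkConf additionally holds spark.master=local, spark.submit.deployMode=client,
// spark.app.name, spark.jars, and so on, assigned through the OptionAssigner list.
```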
2.4.2.4 doRunMain
def doRunMain(): Unit = {
  if (args.proxyUser != null) {
    // When --proxy-user is given, run the main class as that user via UGI doAs().
    val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
      UserGroupInformation.getCurrentUser())
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
        }
      })
    } catch {
      case e: Exception =>
        // Exceptions with an empty stack trace carry little useful information,
        // so print a plain error message instead of rethrowing them.
        if (e.getStackTrace().length == 0) {
          printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          exitFn(1)
        } else {
          throw e
        }
    }
  } else {
    runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
  }
}
2.4.2.5 runMain
/**
 * Run the main method of the child class using the provided launch environment.
 * Note that this main class will NOT be the user-supplied one when running in a cluster
 * deploy mode or for a Python/R application.
 * The parameters are the child arguments, the child classpath, the SparkConf, the child
 * main class and the verbose flag.
 */
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sparkConf: SparkConf,
childMainClass: String,
verbose: Boolean): Unit = {
    if (verbose) {
      printStream.println(s"Main class:\n$childMainClass")
      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
      printStream.println(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
      printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
      printStream.println("\n")
    }

    // Choose the class loader: user classpath first if configured, otherwise the mutable one.
    val loader = if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
      new ChildFirstURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader)
    }
    Thread.currentThread.setContextClassLoader(loader)

    // Add the child classpath entries (primary resource, --jars, ...) to that class loader.
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    var mainClass: Class[_] = null
    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        e.printStackTrace(printStream)
        if (childMainClass.contains("thriftserver")) {
          printStream.println(s"Failed to load main class $childMainClass.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
      case e: NoClassDefFoundError =>
        e.printStackTrace(printStream)
        if (e.getMessage.contains("org/apache/hadoop/hive")) {
          printStream.println(s"Failed to load hive class.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    }
    /*
     * Build a SparkApplication instance from mainClass: if mainClass already implements
     * the SparkApplication trait, instantiate it directly; otherwise wrap it in a
     * JavaMainApplication. Either way, what gets called next is its start() method.
     */
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.newInstance().asInstanceOf[SparkApplication]
    } else {
      if (classOf[scala.App].isAssignableFrom(mainClass)) {
        printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
      }
      new JavaMainApplication(mainClass)
    }

    @tailrec
    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>
        e
    }

    try {
      app.start(childArgs.toArray, sparkConf)
    } catch {
      case t: Throwable =>
        findCause(t) match {
          case SparkUserAppException(exitCode) =>
            System.exit(exitCode)
          case t: Throwable =>
            throw t
        }
    }
  }
2.4.3 SparkApplication
package org.apache.spark.deploy

import java.lang.reflect.Modifier
import org.apache.spark.SparkConf

/**
 * Entry point for a Spark application. Implementations must provide a no-argument constructor.
 */
private[spark] trait SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}
/**
 * Implementation of SparkApplication that wraps a standard Java class with a main method.
 * Configuration is passed on through system properties, so running too many of these in
 * the same JVM may cause the configuration to leak between applications.
 */
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }
}
In local mode the flow effectively ends here with SparkApplication: JavaMainApplication simply invokes the main method of our user class via reflection.
In yarn cluster mode, however, a different instance is created: the class whose start() gets called is YarnClusterApplication, which also extends SparkApplication. We will follow that path in a subsequent article.
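To make the reflection step concrete, here is a small, self-contained Scala sketch (not Spark code) of what JavaMainApplication.start effectively does: look up a static main(String[]) method on a class and invoke it. DemoMain below is a purely hypothetical stand-in for a user class such as com.lp.test.app.LocalPi:

```scala
import java.lang.reflect.Modifier

object DemoMain {
  // Stand-in for the user's main class (e.g. com.lp.test.app.LocalPi)
  def main(args: Array[String]): Unit = println(s"running with args: ${args.mkString(", ")}")
}

object ReflectiveStart {
  def main(args: Array[String]): Unit = {
    // Load the class by name and find its static main(String[]) method,
    // the same way JavaMainApplication does with the resolved childMainClass.
    val klass = Class.forName("DemoMain")
    val mainMethod = klass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")
    mainMethod.invoke(null, Array[String]("10"))
  }
}
```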
3. Source code
github.com/perkinls/sp…
4. References
《Spark内核设计艺术》 (The Art of Spark Kernel Design)
github.com/apache/spar…
github.com/CrestOfWave…
blog.csdn.net/do_yourself…
blog.csdn.net/lingeio/art…