# 命令行提交Spark应用样例:
#./bin/spark-submit \
#  --class com.imooc.spark.Test.TestOfSparkContext2 \
#  --conf spark.master spark://localhost:7077 \
#  --master local[2] \
#  /home/hadoop/data/test-jar/sql-1.0.jar arg1 arg2
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

构建入口类

构建SparkSumit对象,它是一个Spark应用程序的入口类(工具类),可以完成提交、停止、查询状态等功能。

  override def main(args: Array[String]): Unit = {
  	// 创建自定义SparkSubmit类,使用匿名子类的创建方式来override一些方法
    val submit = new SparkSubmit() {
      self => // 创建SparkSubmit类的一个别名
      // 自定义参数解析类匿名子类对象,主要自定义了如何打印日志
      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args) {
          override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
          override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
      override protected def logInfo(msg: => String): Unit = printMessage(msg)
      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
      // 重载此方法,主要是添加try...catch语句,捕获异常
      override def doSubmit(args: Array[String]): Unit = {
        try {
          super.doSubmit(args)
        } catch {
          case e: SparkUserAppException =>
            exitFn(e.exitCode)
    submit.doSubmit(args)

在SparkSubmit类对象主要有2个功能,一个是完成参数的解析及加载,一个是尝试提交、停止、查询某一个Spark应用。

参数解析及加载

参数的解析及加载过程通过SparkSubmitArguments(...)完成,详细的主流程如下代码片段。这段代码也告诉了我们Spark加载参数的顺序,根据参数的作用优先级低到高排列如下:

  • 加载通过参数--properties-file指定的文件中加载配置信息作为默认的属性
  • 加载用户通过命令行指定的各项属性,包括--conf | --jars | --class等,作为
  • 如果用户没有通过参数--properties-file,指定属性文件,则加载环境变量SPARK_CONF_DIR指定的路径或是${SPARK_HOME}/conf路径下的spark-defaults.conf文件中的配置信息,并与前面所有读取的属性合并
  • 加载通过环境变量指定的各种属性,后续在访问每个变量时,优先使用相应的环境变量

此方法会尽最大可能的来符合Spark定义的参数信息,而忽略掉那些可以不符合规则(不以spark.开头的属性)的属性,最终如果参数解析完成就会生成有效的类对象,否则会输出有效的提示信息并退出当前进程。

/** args函数参数:通过启动脚本接收到的所有在/bin/spark-submit之后的参数 **/
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
  extends SparkSubmitArgumentsParser with Logging {
  /** Default properties present in the currently defined defaults file. */
  lazy val defaultSparkProperties: HashMap[String, String] = {
    val defaultProperties = new HashMap[String, String]()
    if (verbose) {
      logInfo(s"Using properties file: $propertiesFile")
    Option(propertiesFile).foreach { filename =>
      val properties = Utils.getPropertiesFromFile(filename)
      properties.foreach { case (k, v) =>
        defaultProperties(k) = v
      // Property files may contain sensitive information, so redact before printing
      if (verbose) {
        Utils.redact(properties).foreach { case (k, v) =>
          logInfo(s"Adding default property: $k=$v")
    defaultProperties
  // Set parameters from command line arguments
  parse(args.asJava)
  // Populate `sparkProperties` map from properties file
  mergeDefaultSparkProperties()
  // Remove keys that don't start with "spark." from `sparkProperties`.
  ignoreNonSparkProperties()
  // Use `sparkProperties` map along with env vars to fill in any missing parameters
  loadEnvironmentArguments()
  useRest = sparkProperties.getOrElse("spark.master.rest.enabled", "false").toBoolean
  validateArguments()

从下面代码可以看到Spark CLI支持4种操作,但这里主要关注submit流程,其它方法暂不深究,详细的分析见下一小节。

  def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)
    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()

submit(...)方法主要的功能就是,解析参数、构建和执行用户指定的入口类或是Spark内部类。有关加载各种参数的过程在前面的小节已经分析过,这里我们主要看一下runMain(...)方法。

* Submit the application using the provided parameters. * This runs in two steps. First, we prepare the launch environment by setting up * the appropriate classpath, system properties, and application arguments for * running the child main class based on the cluster manager and the deploy mode. * Second, we use this launch environment to invoke the main method of the child * main class. @tailrec private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { // 分类参数,child指用户指定的入口类的子进程的概念: // childArgs Array,包含了传递给 // childClasspath Array,包含了用户通过spark.jars属性、--jars参数及指定的 // 入口jar包,其中当提交的任务模式为client时,会首先尝试下载通过spark.jars或 val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args) def doRunMain(): Unit = { if (args.proxyUser != null) { val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, UserGroupInformation.getCurrentUser()) try { proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { override def run(): Unit = { runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose) } catch { case e: Exception => // Hadoop's AuthorizationException suppresses the exception's stack trace, which // makes the message printed to the output by the JVM not very helpful. Instead, // detect exceptions with empty stack traces here, and treat them differently. if (e.getStackTrace().length == 0) { error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}") } else { throw e } else { runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose) // In standalone cluster mode, there are two submission gateways: // (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper // (2) The new REST-based gateway introduced in Spark 1.3 // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over // to use the legacy gateway if the master endpoint turns out to be not a REST server. if (args.isStandaloneCluster && args.useRest) { try { logInfo("Running Spark using the REST application submission protocol.") doRunMain() } catch { // Fail over to use the legacy submission gateway case e: SubmitRestConnectionException => logWarning(s"Master endpoint ${args.master} was not a REST server. " + "Falling back to legacy submission gateway instead.") args.useRest = false submit(args, false) // In all other modes, just run the main class as prepared } else { doRunMain()

runMain方法

此方法通过反射的方式,生成用户指定的入口类或是Spark的内置类,由于生成的类可能是实现了SparkApplication接口的子类抑或是一个自定义的类,因此需要根据这两种情况分析选择是直接执行生成类的start(...)方法间接调用生成类的main(...)方法,同时传递所有解析到的spark参数及需要应用接收的各个args。

* Run the main method of the child class using the provided launch environment. * Note that this main class will not be the one provided by the user if we're * running cluster deploy mode or python applications. private def runMain( childArgs: Seq[String], childClasspath: Seq[String], sparkConf: SparkConf, childMainClass: String, verbose: Boolean): Unit = { // ... 忽略添加jar包到JAVA的系统路径下的代码逻辑,这里会根据用户是否指定了 // spark.driver.userClassPathFirst // 这个参数,来选择添加jar包的优先级 var mainClass: Class[_] = null try { mainClass = Utils.classForName(childMainClass) } catch { case e: ClassNotFoundException => logWarning(s"Failed to load $childMainClass.", e) if (childMainClass.contains("thriftserver")) { logInfo(s"Failed to load main class $childMainClass.") logInfo("You need to build Spark with -Phive and -Phive-thriftserver.") throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS) case e: NoClassDefFoundError => logWarning(s"Failed to load $childMainClass: ${e.getMessage()}") if (e.getMessage.contains("org/apache/hadoop/hive")) { logInfo(s"Failed to load hive class.") logInfo("You need to build Spark with -Phive and -Phive-thriftserver.") throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS) val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) { mainClass.newInstance().asInstanceOf[SparkApplication] } else { // SPARK-4170 if (classOf[scala.App].isAssignableFrom(mainClass)) { logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.") new JavaMainApplication(mainClass) @tailrec def findCause(t: Throwable): Throwable = t match { case e: UndeclaredThrowableException => if (e.getCause() != null) findCause(e.getCause()) else e case e: InvocationTargetException => if (e.getCause() != null) findCause(e.getCause()) else e case e: Throwable => try { app.start(childArgs.toArray, sparkConf) } catch { case t: Throwable => throw findCause(t)

到此,整个通过spark-submit命令提交任务的流程已简单剖析完毕,更详细的内容读者可以自行查看源码。

下一篇浅析创建Kubernetes任务的流程。

spark-submit方式提交应用启动脚本文件# 命令行提交Spark应用样例:#./bin/spark-submit \# --class com.imooc.spark.Test.TestOfSparkContext2 \# --conf spark.master spark://localhost:7077 \# --master local[2] \# /home/hadoop/data/test-jar/sql-1.0.jar arg1 arg2#if [ -z "${
spark-submit 命令使用详解 spark-submit 用户打包 Spark 应用程序并部署到 Spark 支持的集群管理气上,命令语法如下: spark-submit [options] <python file> [app arguments] app arguments 是传递给应用程序的参数,常用的命令行参数如下所示: –master: 设置主节点 URL 的参数...
启动spark-shell或执行spark-submit失败的问题 楼主搭建了一套CDH大数据平台,其中包括Spark2服务,近来在终端启动spark-shell以及利用spark-submit提交任务时,分别报以下错误: [root@cdh0 ~]# spark-shell Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream at org.apac
String[] args = new String[]{"--jars", "/root/kafka_2.12-0.10.2.0/libs/kafka-clients-0.10.2.0.jar,/root/spark-streaming-kafka-0-10_2.11-2.1.0.jar", "--driver-class-path", "/roo
 --master        MASTER_URL, 可 以 是 spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local --deploy-mode        DEPLOY_MODE, Driver 程序运行的地方,client 或者 cluster,默认是client。 Spark-submit脚本提交任务时最简易的命令格式如下: ./bin/spark-submit \ --master spark://localhost:7077 \ 任务任务参数 而实际开发中用的一般是如下的格式 ./bin/spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 11
今天主要分析一下Spark源码中提交任务脚本的处理逻辑,从spark-submit一步步深入进去看看任务提交的整体流程,首先看一下整体的流程概要图: 接下来按照图中结构出发一步步看源码:spark-submit#!/usr/bin/env bash# # Licensed to the Apache Software Foundation (ASF) under one or more # con
./spark-submit --master spark://node1:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.4.0.jar 100 其中 spark - submit 脚...
大数据实验教学系统使用spark-submit工具提交Spark作业对于数据的批处理,通常采用编写程序、打.jar包提交给集群来执行,这需要使用Spark自带的spark-submit工具。   一般的部署策略是在一个网关机器上提交应用程序,这个机器和Worker机器部署在一个网络中(例如,Standalone模式的集群中的Master节点)。在此部署策略中,client模式更为合适,client模式中的driver直接跟spark-submit进程一起启动,spark-submit进程在此扮演集群中一个c
原来程序是将所有jar打包到libs目录下,然后运行生成好的run.sh。现在要使用spark-submit将它提交spark上运行。几经波折之后,终于圆满完成。 首先遇到的问题是如何使用gradle将工程打包成可执行的jar文件。这个问题网上已有答案,就是使用插件 "com.github.johnrengelman.shadow"。gradle的配置如下: apply plugin: '
其中,`[options]`是可选的命令行选项,可以用来设置Spark应用程序的配置参数,例如`--master`用于设置Spark集群的主节点地址,`--num-executors`用于设置执行器的数量等等。`<app jar | python file>`是必须的参数,用于指定要提交的应用程序的jar包或Python文件。`[app arguments]`是可选的应用程序参数,用于传递给应用程序的命令行参数。 例如,以下命令将提交一个Java应用程序到Spark集群中运行: spark2-submit --class com.example.MyApp --master yarn --num-executors 10 myapp.jar arg1 arg2 该命令指定了应用程序的主类为`com.example.MyApp`,使用YARN作为集群管理器,设置了10个执行器,并传递了两个应用程序参数`arg1`和`arg2`。 总之,spark2-submitSpark应用程序提交的重要工具,可以通过命令行选项来配置应用程序的运行环境和参数,方便地将应用程序提交Spark集群中运行。
Ahxing1985: 除了你说的这种pipeline,还有其他的为了更高效率的算子,比如MergingAggregatedTransform,是将已有的聚合结果拿出来合并成最终结果。比如: [code=sql] explain pipeline select count() from events; [/code] 得到的执行pipeline是这样的,SourceFromSingleChunk -> MergingAggregatedTransform. ┌─explain─────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (MergingAggregated) │ │ MergingAggregatedTransform │ │ (ReadFromPreparedSource) │ │ SourceFromSingleChunk 0 → 1 │ └─────────────────────────────────┘ 如果是开了optimize_aggregation_in_order 优化,且group by 的列恰好是定义MergeTree表的ORDER BY关键字列时,会采用两阶段聚合,用AggregatingInOrderTransform, FinishAggregatingInOrderTransform 和 MergingAggregatedBucketTransform 算子。 ClickHouse 聚合函数的执行过程 Ahxing1985: 写得相当好 ClickHouse 聚合函数的执行过程 weixin_40866498: 真是厉害呢~学习了 ClickHouse 聚合函数的执行过程 weixin_40866498: 真是厉害呢~学习了 轻量、高可用的任务调度系统实之背包问题 weixin_46333924: 您好! github的链接是指向本文的。