/**
 * Entry point: parses the command-line arguments and dispatches to the requested action.
 *
 * In verbose mode the parsed arguments are printed first. The action dispatch then
 * runs unconditionally: SUBMIT goes to `submit`, KILL to `kill`, and REQUEST_STATUS
 * to `requestStatus`.
 *
 * @param args raw command-line arguments, handed to [[SparkSubmitArguments]] for parsing
 */
def main(args: Array[String]): Unit = {
  // Parse and initialize the incoming command-line arguments.
  val appArgs = new SparkSubmitArguments(args)

  // In verbose mode, echo the parsed arguments before acting on them.
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }

  // Dispatch on the requested action. This sits outside the verbose check on purpose:
  // the action must run whether or not verbose output was requested.
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
// If the action is SUBMIT, main dispatches to submit() below, which starts running the Spark application:
/**
 * Submit the application using the provided parameters.
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 */
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
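/* Prepare the launch environment. prepareSubmitEnvironment returns four values:
 * (1) childArgs: the application arguments for the child main class
 * (2) childClasspath: the classpath for running the child main class
 * (3) sysProps: the system properties to set
 * (4) childMainClass: the name of the child main class whose main method is invoked
 */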
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
// doRunMain launches the child main class via runMain, impersonating args.proxyUser when one is set.
def doRunMain(): Unit = {
// If a proxy user was requested, run the child main class as that user via UserGroupInformation.doAs.
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}