Spark on YARN Startup Flow


The command used to submit a Spark application to YARN:

$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
$SPARK_HOME/examples/jars/spark-examples*.jar
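
The same submission can also be driven programmatically through the launcher API. A minimal sketch (the jar path and version in setAppResource are illustrative assumptions, not taken from the original command):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object SubmitPi {
  def main(args: Array[String]): Unit = {
    // Programmatic equivalent of the spark-submit command above
    val handle: SparkAppHandle = new SparkLauncher()
      .setAppResource(sys.env("SPARK_HOME") + "/examples/jars/spark-examples_2.12-3.0.0.jar") // illustrative path
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setMaster("yarn")
      .setDeployMode("client")
      .startApplication()

    // Poll the reported state until the application reaches a terminal state
    while (!handle.getState.isFinal) {
      Thread.sleep(1000)
    }
    println(s"Final state: ${handle.getState}")
  }
}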

Submitting an application to YARN and running it goes through the following stages:

  1. Application submission: determine the application's main class
  2. Execution environment preparation: create the SparkContext
  3. Task scheduling and execution: run the tasks and return the results

The three stages

Stage 1: Application Submission

This stage runs on the Driver side. Its main goal is to prepare the dependencies and determine the main class of the Spark application.

  • Parse and save the parameters passed on the submission command line;
  • Prepare (and possibly download) the dependency files or packages specified by those parameters;
  • Determine the name of the main class to execute, based on the application's deploy mode and the language it is written in;
  • Instantiate that main class to produce a SparkApplication object, then call SparkApplication#start() to run the application (for Java or Scala code this effectively executes the application's main function); a simplified sketch of this step follows the list.
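
A simplified sketch of that last step, based on SparkSubmit#runMain (slightly abridged, not the full source):

// Resolve the main class determined above, wrap it in a SparkApplication and start it
val mainClass = Utils.classForName(childMainClass)

val app: SparkApplication =
  if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    // The class already implements SparkApplication
    // (e.g. org.apache.spark.deploy.yarn.YarnClusterApplication in cluster mode)
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    // A plain Java/Scala main class: wrap it so start() simply invokes main()
    new JavaMainApplication(mainClass)
  }

app.start(childArgs.toArray, sparkConf)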

yarn/Client.scala

def run(): Unit = {
  // Submit the application
  this.appId = submitApplication()
  ...
}

def submitApplication(): ApplicationId = {
  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    // Build the container launch context, including the AM start command
    val containerContext = createContainerLaunchContext(newAppResponse)
    // Build the application submission context from containerContext, ready to hand to the ResourceManager
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    case e: Throwable =>
      if (appId != null) {
        cleanupStagingDir(appId)
      }
      throw e
  }
}

// createContainerLaunchContext

val userClass =
  // Set the user's main class (only passed to the AM in cluster mode)
  if (isClusterMode) {
    Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
  } else {
    Nil
  }
val userJar =
  if (args.userJar != null) {
    Seq("--jar", args.userJar)
  } else {
    Nil
  }
val primaryPyFile =
  if (isClusterMode && args.primaryPyFile != null) {
    Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
  } else {
    Nil
  }
val primaryRFile =
  if (args.primaryRFile != null) {
    Seq("--primary-r-file", args.primaryRFile)
  } else {
    Nil
  }
// Choose the class that will run as the AM
val amClass =
  if (isClusterMode) {
    // In cluster mode this is ApplicationMaster; we look at how it starts below
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
  args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
}
val userArgs = args.userArgs.flatMap { arg =>
  Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}

// Command for the ApplicationMaster
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)

Stage 2: Execution Environment Preparation

Stage 1 has determined the main class of the Spark application that runs on the Driver side and created the SparkApplication object app. app.start() then directly invokes the main function of that class, which brings the application into stage 2.

The main goal of stage 2 is to create the SparkSession (including the SparkContext and SparkEnv), request resources and create the Executors. Once stage 2 is complete, the execution environment for Tasks is ready.

In other words, stage 2 not only initializes the Driver side but also gets the Executors ready. The work of this stage is driven mainly by the Driver-side code that creates the SparkSession, i.e. the following line:

val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
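
For reference, that line normally sits inside an ordinary driver program such as the following minimal sketch (class name and job logic are illustrative):

import org.apache.spark.sql.SparkSession

object HelloWorld {
  def main(args: Array[String]): Unit = {
    // Creating the SparkSession triggers the whole of stage 2:
    // SparkContext + SparkEnv creation, resource requests and Executor startup
    val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

    // Any action here runs on the Executors prepared in this stage (see stage 3 below)
    println(spark.range(0, 100).count())

    spark.stop()
  }
}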

On the Driver side, stage 2 mainly performs the following steps:

  • Create the SparkContext and SparkEnv objects; while creating them, request resources from the Cluster Manager, then start and initialize the various service modules.
  • These service modules include the DAG scheduler, the task scheduler, the shuffle service, the file transfer service, the block manager, the memory manager, and so on (see the condensed sketch after this list).
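
A condensed sketch of the relevant part of the SparkContext constructor (simplified and abridged; field names follow the real source but this is not the complete code):

// Inside new SparkContext(conf), roughly:
_env = createSparkEnv(_conf, isLocal, listenerBus)   // SparkEnv: RPC, shuffle, block manager, memory manager, ...

// Scheduling services: the scheduler backend talks to the cluster manager (YARN here)
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)               // stage-level (DAG) scheduling

// Starting the task scheduler is what ultimately requests resources and launches Executors
_taskScheduler.start()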

On the Executor side, stage 2 mainly performs the following steps:

  • The Driver requests resources from the Cluster Manager. In YARN mode this creates an ApplicationMaster on a NodeManager; the ApplicationMaster then requests resources from the Cluster Manager, starts Containers, and launches an Executor inside each Container.
  • When an Executor starts, it registers its BlockManager with the Driver and creates a heartbeat RPC environment, through which it reports the Executor's status to the Driver (a rough sketch follows this list).
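
A rough sketch of that Executor-side handshake; the message shapes are approximations of CoarseGrainedExecutorBackend / Executor and vary slightly between Spark versions, so treat this as an assumption rather than the exact source:

// 1. On startup the executor backend registers itself with the driver's scheduler backend
driver.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))

// 2. The Executor then periodically reports to the driver's HeartbeatReceiver endpoint;
//    the reply can ask it to re-register its BlockManager
val message = Heartbeat(executorId, accumUpdates, env.blockManager.blockManagerId)
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](message)
if (response.reregisterBlockManager) {
  env.blockManager.reregister()
}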

Relevant code:

ApplicationMaster

From the launch command built above we can see that the AM is also started from the command line, so we can go straight to the AM's main function.

def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  master = new ApplicationMaster(amArgs)
  System.exit(master.run())
}

Stepping into master.run():

final def run(): Int = {
  doAsUser {
    runImpl()
  }
  exitCode
}

Now look at what runImpl does:

private def runImpl(): Unit = {
  val appAttemptId = client.getAttemptId()

  var attemptID: Option[String] = None

  if (isClusterMode) {
    // Set up system properties for cluster mode
  }

  ...

  if (isClusterMode) {
    // The key step: start the driver
    runDriver()
  } else {
    runExecutorLauncher()
  }
}

runDriver()

private def runDriver(): Unit = {
  addAmIpFilter(None)
  // Start the user-submitted class in a separate "Driver" thread
  userClassThread = startUserApplication()
  // Wait for the SparkContext to be initialized by the user code
  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      rpcEnv = sc.env.rpcEnv

      val userConf = sc.getConf
      val host = userConf.get("spark.driver.host")
      val port = userConf.get("spark.driver.port").toInt
      registerAM(host, port, userConf, sc.ui.map(_.webUrl))

      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      // Request resources and create the Executors
      createAllocator(driverRef, userConf)
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      throw new IllegalStateException("User did not initialize spark context!")
    }
    resumeDriver()
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
          "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    resumeDriver()
  }
}
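
The hand-off that awaitResult is waiting for happens on the user ("Driver") thread: when the user code creates a SparkContext with a YARN cluster master, the scheduler's post-start hook calls back into the AM, which completes sparkContextPromise and pauses the user thread until runDriver() calls resumeDriver(). A simplified sketch of that callback (close to, but not literally, the ApplicationMaster source):

private def sparkContextInitialized(sc: SparkContext) = {
  sparkContextPromise.synchronized {
    // Completes the promise that runDriver() is blocked on
    sparkContextPromise.success(sc)
    // Park the user thread here until runDriver() calls resumeDriver()
    sparkContextPromise.wait()
  }
}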

startUserApplication()

Obtain the main method: val mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])

Invoke it and mark the application finished: mainMethod.invoke(null, userArgs.toArray), followed by finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)

private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")

  var userArgs = args.userArgs
  if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
    // When running pyspark, the app is run using PythonRunner. The second argument is the list
    // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
    userArgs = Seq(args.primaryPyFile, "") ++ userArgs
  }
  if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
    // TODO(davies): add R dependencies here
  }

  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])

  val userThread = new Thread {
    override def run() {
      try {
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          logError(s"Could not find static main method in object ${args.userClass}")
          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
        } else {
          // Invoke the user's main function here
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running user class")
        }
      } catch {
        case e: InvocationTargetException =>
          e.getCause match {
            case _: InterruptedException =>
              // Reporter thread can interrupt to stop user class
            case SparkUserAppException(exitCode) =>
              val msg = s"User application exited with status $exitCode"
              logError(msg)
              finish(FinalApplicationStatus.FAILED, exitCode, msg)
            case cause: Throwable =>
              logError("User class threw exception: " + cause, cause)
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
                "User class threw exception: " + StringUtils.stringifyException(cause))
          }
          sparkContextPromise.tryFailure(e.getCause())
      } finally {
        // Notify the thread waiting for the SparkContext, in case the application did not
        // instantiate one. This will do nothing when the user code instantiates a SparkContext
        // (with the correct master), or when the user code throws an exception (due to the
        // tryFailure above).
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}

createAllocator(): requesting resources and creating the Executors

private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
  val appId = client.getAttemptId().getApplicationId().toString()
  val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  // Before we initialize the allocator, let's log the information about how executors will
  // be run up front, to avoid printing this out for every single executor being launched.
  // Use placeholders for information that changes such as executor IDs.
  logInfo {
    val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
    val executorCores = _sparkConf.get(EXECUTOR_CORES)
    val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl,
      "<executorId>", "<hostname>", executorMemory, executorCores, appId, securityMgr,
      localResources)
    dummyRunner.launchContextDebugInfo()
  }
  // Request resources and create the executors
  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)

  credentialRenewer.foreach(_.setDriverRef(driverRef))
  // Register the AM RPC endpoint; Executor status is reported back to the Driver through this RPC environment
  // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
  // that when the driver sends an initial executor request (e.g. after an AM restart),
  // the allocator is ready to service requests.
  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

  allocator.allocateResources()
  val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
  val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
  // Register a metrics source so the allocator can be monitored
  ms.registerSource(new ApplicationMasterSource(prefix, allocator))
  // do not register static sources in this case as per SPARK-25277
  ms.start(false)
  metricsSystem = Some(ms)
  reporterThread = launchReporterThread()
}

Stage 3: Task Scheduling and Execution

After stage 2, the Task execution environment has been initialized: the SparkContext and SparkEnv have been created on the Driver side, the resources have been granted, and the Executors have been started.

On the Driver side, stage 3 mainly performs the following steps:

  • Run the Spark application code; each transformation (such as map) produces a new RDD;
  • When an action is executed, a Job is submitted, which triggers the following steps:
    • Split the Job into interdependent Stages according to the RDD lineage;
    • Split each Stage into one or more Tasks;
    • Submit those Tasks to the already-created Executors for execution;
    • Track the Executors' execution status until all Tasks have completed;
    • Collect the results and the final execution status (a small end-to-end example follows this list).
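
A small runnable illustration of this flow; the class name and input path are illustrative, and the stage split described in the comments is the usual DAG split at shuffle boundaries:

import org.apache.spark.sql.SparkSession

object WordCountStages {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("WordCountStages").getOrCreate()
    val sc = spark.sparkContext

    // Transformations only build up the RDD lineage; nothing runs yet
    val words  = sc.textFile(args(0)).flatMap(_.split("\\s+"))
    val counts = words.map((_, 1)).reduceByKey(_ + _)   // reduceByKey introduces a shuffle

    // The action triggers a Job: it is split into two Stages at the shuffle boundary,
    // each Stage into Tasks (one per partition), which run on the Executors
    counts.collect().foreach { case (w, n) => println(s"$w: $n") }

    spark.stop()
  }
}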