logInfo("Requesting a new application from cluster with %d NodeManagers" .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
// Get a new application from our RM val newApp = yarnClient.createApplication() val newAppResponse = newApp.getNewApplicationResponse() appId = newAppResponse.getApplicationId()
// Verify whether the cluster has enough resources for our AM verifyClusterResources(newAppResponse)
    // Set up the appropriate contexts to launch our AM
    // Build the container launch context (the AM start-up command)
    val containerContext = createContainerLaunchContext(newAppResponse)
    // Build the application submission context from containerContext, ready to hand to the ResourceManager
    val appContext = createApplicationSubmissionContext(newApp, containerContext)
    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    case e: Throwable =>
      if (appId != null) {
        cleanupStagingDir(appId)
      }
      throw e
  }
}
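The verifyClusterResources(newAppResponse) call above rejects the submission early when the requested AM or executor memory plus overhead would never fit into a single YARN container. A minimal, self-contained sketch of that kind of check; the method names, the fixed overhead, and the numbers below are illustrative, not Spark's actual code:

object ResourceCheckSketch {
  // Fail fast if a single requested container cannot fit into the cluster's max allocation.
  def verify(maxAllocatableMemMb: Long, requestedMemMb: Long, overheadMb: Long): Unit = {
    val total = requestedMemMb + overheadMb
    if (total > maxAllocatableMemMb) {
      throw new IllegalArgumentException(
        s"Required memory ($requestedMemMb MB + $overheadMb MB overhead) is above " +
        s"the max threshold ($maxAllocatableMemMb MB) of this cluster!")
    }
  }

  def main(args: Array[String]): Unit = {
    verify(maxAllocatableMemMb = 8192, requestedMemMb = 4096, overheadMb = 384) // passes
    verify(maxAllocatableMemMb = 2048, requestedMemMb = 4096, overheadMb = 384) // throws
  }
}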
// The user's main class, passed to the AM only in cluster mode
val userClass =
  if (isClusterMode) {
    Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
  } else {
    Nil
  }
val userJar =
  if (args.userJar != null) {
    Seq("--jar", args.userJar)
  } else {
    Nil
  }
val primaryPyFile =
  if (isClusterMode && args.primaryPyFile != null) {
    Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
  } else {
    Nil
  }
val primaryRFile =
  if (args.primaryRFile != null) {
    Seq("--primary-r-file", args.primaryRFile)
  } else {
    Nil
  }

// Set up the class the AM container actually runs
val amClass =
  if (isClusterMode) {
    // Step into ApplicationMaster to see how the driver is started
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
  args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
}
val userArgs = args.userArgs.flatMap { arg =>
  Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
// Command for the ApplicationMaster
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)
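For reference, once YARN expands the JAVA_HOME and log-dir placeholders, the assembled AM command has roughly the shape printed by the sketch below. The memory option, user class, and jar path are made up for illustration:

object AmCommandSketch {
  def main(args: Array[String]): Unit = {
    val commands = Seq(
      "{{JAVA_HOME}}/bin/java", "-server",
      "-Xmx1024m",                                        // from javaOpts (illustrative)
      "org.apache.spark.deploy.yarn.ApplicationMaster",   // amClass in cluster mode
      "--class", "com.example.MyApp",                     // hypothetical user class
      "--jar", "hdfs:///apps/my-app.jar",                 // hypothetical user jar
      "1>", "<LOG_DIR>/stdout",
      "2>", "<LOG_DIR>/stderr")
    println(commands.mkString(" "))
  }
}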
private def runDriver(): Unit = {
  addAmIpFilter(None)
  // Start the user-submitted class on its own thread
  userClassThread = startUserApplication()

  // Wait for the SparkContext to be initialized.
  // This is a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      rpcEnv = sc.env.rpcEnv

      val userConf = sc.getConf
      val host = userConf.get("spark.driver.host")
      val port = userConf.get("spark.driver.port").toInt
      registerAM(host, port, userConf, sc.ui.map(_.webUrl))

      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      // Request executor resources and create the allocator
      createAllocator(driverRef, userConf)
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      throw new IllegalStateException("User did not initialize spark context!")
    }
    resumeDriver()
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
          "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    resumeDriver()
  }
}
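The coordination between runDriver() and the user-class thread is a Promise handshake: the user thread completes sparkContextPromise exactly once (with the context, with null, or with a failure), and the AM thread blocks on its future with a timeout. A standalone sketch of the same pattern, using plain scala.concurrent.Await in place of Spark's ThreadUtils.awaitResult and a String standing in for the SparkContext:

import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object ContextHandshakeSketch {
  def main(args: Array[String]): Unit = {
    val contextPromise = Promise[String]()

    // Stand-in for the "Driver" thread started by startUserApplication()
    val userThread = new Thread(() => {
      Thread.sleep(500)                                   // pretend the user class is initializing
      contextPromise.trySuccess("SparkContext stand-in")  // signal that the context is ready
    })
    userThread.setName("Driver")
    userThread.start()

    // Stand-in for ThreadUtils.awaitResult(sparkContextPromise.future, ...)
    val sc = Await.result(contextPromise.future, Duration(10, TimeUnit.SECONDS))
    println(s"Got: $sc")                                  // the AM can now register and allocate
    userThread.join()
  }
}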
private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")

  var userArgs = args.userArgs
  if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
    // When running pyspark, the app is run using PythonRunner. The second argument is the list
    // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
    userArgs = Seq(args.primaryPyFile, "") ++ userArgs
  }
  if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
    // TODO(davies): add R dependencies here
  }

  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])
  val userThread = new Thread {
    override def run() {
      try {
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          logError(s"Could not find static main method in object ${args.userClass}")
          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
        } else {
          // Invoke the user class's main method here
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running user class")
        }
      } catch {
        case e: InvocationTargetException =>
          e.getCause match {
            case _: InterruptedException =>
              // Reporter thread can interrupt to stop user class
            case SparkUserAppException(exitCode) =>
              val msg = s"User application exited with status $exitCode"
              logError(msg)
              finish(FinalApplicationStatus.FAILED, exitCode, msg)
            case cause: Throwable =>
              logError("User class threw exception: " + cause, cause)
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
                "User class threw exception: " + StringUtils.stringifyException(cause))
          }
          sparkContextPromise.tryFailure(e.getCause())
      } finally {
        // Notify the thread waiting for the SparkContext, in case the application did not
        // instantiate one. This will do nothing when the user code instantiates a SparkContext
        // (with the correct master), or when the user code throws an exception (due to the
        // tryFailure above).
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}
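Stripped of Spark's error handling, the core of startUserApplication() is plain reflection: load the user class, look up its static main(Array[String]) method, and invoke it on a dedicated "Driver" thread. A minimal standalone version of that pattern; the class name and arguments below are hypothetical, and Spark loads the class through a child-first userClassLoader rather than Class.forName:

import java.lang.reflect.Modifier

object ReflectiveMainSketch {
  def main(args: Array[String]): Unit = {
    val userClass = Class.forName("com.example.MyApp")             // hypothetical user class
    val mainMethod = userClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers),
      s"No static main method in ${userClass.getName}")

    val userThread = new Thread(() => {
      mainMethod.invoke(null, Array("--input", "/tmp/data"))       // hypothetical user args
    })
    userThread.setName("Driver")
    userThread.start()
    userThread.join()
  }
}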
private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
  val appId = client.getAttemptId().getApplicationId().toString()
  val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

  // Before we initialize the allocator, let's log the information about how executors will
  // be run up front, to avoid printing this out for every single executor being launched.
  // Use placeholders for information that changes such as executor IDs.
  logInfo {
    val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
    val executorCores = _sparkConf.get(EXECUTOR_CORES)
    val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
      "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
    dummyRunner.launchContextDebugInfo()
  }

  // Ask YARN for resources and create the executor allocator
  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)

  credentialRenewer.foreach(_.setDriverRef(driverRef))

  // Register the AM RPC endpoint; executor status is reported back to the driver through it.
  // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
  // that when the driver sends an initial executor request (e.g. after an AM restart),
  // the allocator is ready to service requests.
  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

  allocator.allocateResources()
  val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
  val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
  // Register the AM metrics source so allocator state can be monitored
  ms.registerSource(new ApplicationMasterSource(prefix, allocator))
  // do not register static sources in this case as per SPARK-25277
  ms.start(false)
  metricsSystem = Some(ms)
  reporterThread = launchReporterThread()
}
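launchReporterThread() (not shown above) essentially keeps calling allocateResources() in a heartbeat loop on a daemon thread, so new containers keep being requested and completed ones keep being processed. A simplified sketch of that pattern; the Allocator trait and the interval argument are stand-ins, not Spark's YarnAllocator or its heartbeat configuration:

object ReporterThreadSketch {
  // Stand-in for YarnAllocator: only the call the loop needs
  trait Allocator { def allocateResources(): Unit }

  def launchReporter(allocator: Allocator, intervalMs: Long): Thread = {
    val t = new Thread(() => {
      try {
        while (true) {
          allocator.allocateResources()    // heartbeat: request new / collect finished containers
          Thread.sleep(intervalMs)
        }
      } catch {
        case _: InterruptedException => () // interrupt is the shutdown signal
      }
    })
    t.setDaemon(true)
    t.setName("Reporter")
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    val reporter = launchReporter(() => println("allocateResources() tick"), intervalMs = 200)
    Thread.sleep(1000)
    reporter.interrupt()                   // stop the sketch
    reporter.join()
  }
}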