Task submission in yarn-cluster mode: a walk through the Spark source code

Updated: 2025-01-18. Source: Shulou (Shulou.com), SLTechnology News&Howtos, Development. Report dated 06/03.

This article follows the Spark source code step by step to show how a task is submitted in yarn-cluster mode.

I. Run command

    bin/spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --class org.apache.spark.examples.SparkPi \
      examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar

II. Task submission flow chart

III. Startup script

In the spark-submit script file, the program entry is:

    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

Looking at "${SPARK_HOME}"/bin/spark-class, you can see that the script runs a command of the form java -cp <classpath> <main-class>. This starts a Java process named SparkSubmit, whose entry point is the main function of the main class org.apache.spark.deploy.SparkSubmit.

The command that is actually executed is:

    /etc/alternatives/jre/bin/java -Dhdp.version=3.0.1.0-187 -cp /usr/hdp/3.0.1.0-187/spark2/conf/:/usr/hdp/3.0.1.0-187/spark2/jars/*:/usr/hdp/3.0.1.0-187/hadoop/conf/ -Xmx1g org.apache.spark.deploy.SparkSubmit --master yarn --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar

IV. Program entry class org.apache.spark.deploy.SparkSubmit

This class has a companion object that holds the main function. main creates a SparkSubmit object and calls doSubmit() on it.

    override def main(args: Array[String]): Unit = {
      val submit = new SparkSubmit() {...}
      submit.doSubmit(args)
    }

doSubmit parses the args parameter, wraps it in an appArgs: SparkSubmitArguments object, and then executes submit(appArgs, uninitLog).

    def doSubmit(args: Array[String]): Unit = {
      // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
      // be reset before the application starts.
      val uninitLog = initializeLogIfNecessary(true, silent = true)

      val appArgs = parseArguments(args)
      if (appArgs.verbose) {
        logInfo(appArgs.toString)
      }
      appArgs.action match {
        case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
        case SparkSubmitAction.KILL => kill(appArgs)
        case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
        case SparkSubmitAction.PRINT_VERSION => printVersion()
      }
    }

submit(appArgs, uninitLog) in turn calls runMain(args: SparkSubmitArguments, uninitLog: Boolean).

    private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
      val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
      ...
      try {
        mainClass = Utils.classForName(childMainClass)
      } catch {...}

      val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
        mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
      } else {
        new JavaMainApplication(mainClass)
      }
      ...
      try {
        app.start(childArgs.toArray, sparkConf)
      } catch {
        case t: Throwable =>
          throw findCause(t)
      }
    }

mainClass is the key here. runMain first checks whether mainClass is a subclass of SparkApplication. If it is, it creates an instance by calling the no-argument constructor through reflection.

If it is not, runMain creates a JavaMainApplication (itself a subclass of SparkApplication), whose override def start(args: Array[String], conf: SparkConf) method uses reflection to invoke the main function of mainClass.

Once the SparkApplication has been created, its start(childArgs.toArray, sparkConf) method is executed.

    /**
     * Entry point for a Spark application. Implementations must provide a no-argument constructor.
     */
    private[spark] trait SparkApplication {
      def start(args: Array[String], conf: SparkConf): Unit
    }

    /**
     * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
     *
     * Configuration is propagated to the application via system properties, so running multiple
     * of these in the same JVM may lead to undefined behavior due to configuration leaks.
     */
    private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

      override def start(args: Array[String], conf: SparkConf): Unit = {
        val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          throw new IllegalStateException("The main method in the given main class must be static")
        }

        val sysProps = conf.getAll.toMap
        sysProps.foreach { case (k, v) =>
          sys.props(k) = v
        }

        mainMethod.invoke(null, args)
      }
    }
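The wrap-or-instantiate decision can be tried out without Spark on the classpath. The following is a minimal sketch, not Spark's code: App, MainWrapper, needsWrapper and toApp are hypothetical names, and the configuration is simplified to a plain Map.

```scala
import java.lang.reflect.Modifier

object ReflectionLaunchSketch {
  // Hypothetical stand-in for Spark's SparkApplication trait; conf is simplified to a Map.
  trait App {
    def start(args: Array[String], conf: Map[String, String]): Unit
  }

  // Plays the role of JavaMainApplication: wraps a class that only offers a static main method.
  final class MainWrapper(klass: Class[_]) extends App {
    override def start(args: Array[String], conf: Map[String, String]): Unit = {
      // getMethod throws NoSuchMethodException if there is no public main(String[]).
      val mainMethod = klass.getMethod("main", classOf[Array[String]])
      if (!Modifier.isStatic(mainMethod.getModifiers)) {
        throw new IllegalStateException("The main method in the given main class must be static")
      }
      // Configuration reaches the wrapped program through JVM system properties.
      conf.foreach { case (k, v) => sys.props(k) = v }
      mainMethod.invoke(null, args)
    }
  }

  // The same check runMain performs: only classes that are not already an App need wrapping.
  def needsWrapper(cls: Class[_]): Boolean = !classOf[App].isAssignableFrom(cls)

  def toApp(cls: Class[_]): App =
    if (needsWrapper(cls)) new MainWrapper(cls)
    else cls.getConstructor().newInstance().asInstanceOf[App]
}
```

Because configuration travels through system properties, two wrapped programs started in the same JVM would see each other's settings, which is the leak the source comment warns about.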

If --deploy-mode is client, mainClass is the class given by the command-line argument --class, which here is org.apache.spark.examples.SparkPi.

In that case the user code runs directly in the current JVM. The other cases are more involved.

The run command shown above is one of those other cases: with --deploy-mode cluster on YARN, mainClass is the Class object of org.apache.spark.deploy.yarn.YarnClusterApplication.
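A minimal sketch of that selection, covering only the client case and the yarn-cluster case for a JVM application. The object name, the prepare function and its parameters are hypothetical; only the class name and the --jar, --class and --arg flags come from the Spark source.

```scala
object ChildMainClassSketch {
  val YarnClusterSubmitClass = "org.apache.spark.deploy.yarn.YarnClusterApplication"

  // Returns (childMainClass, childArgs). Python and R applications are not modelled here.
  def prepare(
      deployMode: String,
      userClass: String,
      primaryJar: String,
      appArgs: Seq[String]): (String, Seq[String]) = deployMode match {
    case "client" =>
      // The user's class is launched as-is in the SparkSubmit JVM.
      (userClass, appArgs)
    case "cluster" =>
      // The user's class becomes an argument of the YARN client wrapper.
      val wrapped = Seq("--jar", primaryJar, "--class", userClass) ++
        appArgs.flatMap(arg => Seq("--arg", arg))
      (YarnClusterSubmitClass, wrapped)
    case other =>
      throw new IllegalArgumentException(s"Unsupported deploy mode in this sketch: $other")
  }
}
```

In cluster mode the user class is therefore not loaded by SparkSubmit at all. It only travels as the value of --class until the ApplicationMaster loads it.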

    private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
      "org.apache.spark.deploy.yarn.YarnClusterApplication"
    ...
    if (isYarnCluster) {
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
      if (args.isPython) {
        childArgs += ("--primary-py-file", args.primaryResource)
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
      } else if (args.isR) {
        val mainFile = new Path(args.primaryResource).getName
        childArgs += ("--primary-r-file", mainFile)
        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
      } else {
        if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
          childArgs += ("--jar", args.primaryResource)
        }
        childArgs += ("--class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }

V. The org.apache.spark.deploy.yarn.YarnClusterApplication class

This class is in the spark-yarn package.

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-yarn_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

Execution then enters its override def start(args: Array[String], conf: SparkConf) method.

    private[spark] class YarnClusterApplication extends SparkApplication {

      override def start(args: Array[String], conf: SparkConf): Unit = {
        // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
        // so remove them from sparkConf here for yarn mode.
        conf.remove(JARS)
        conf.remove(FILES)

        new Client(new ClientArguments(args), conf, null).run()
      }
    }

This creates a Client inside the SparkSubmit process. Client is a proxy class that wraps a YarnClient, and its run() method is executed next.

run() submits the Application to the ResourceManager of the YARN cluster and receives the appId once the submission succeeds.

If spark.submit.deployMode=cluster and spark.yarn.submit.waitAppCompletion=true, the SparkSubmit process keeps logging the status of appId periodically until the task ends (monitorApplication(appId)). Otherwise it logs the status once and then exits.

    def run(): Unit = {
      this.appId = submitApplication()
      if (!launcherBackend.isConnected() && fireAndForget) {
        val report = getApplicationReport(appId)
        val state = report.getYarnApplicationState
        logInfo(s"Application report for $appId (state: $state)")
        logInfo(formatReportDetails(report))
        if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
          throw new SparkException(s"Application $appId finished with status: $state")
        }
      } else {
        val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
        if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
          diags.foreach { err =>
            logError(s"Application diagnostics message: $err")
          }
          throw new SparkException(s"Application $appId finished with failed status")
        }
        if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
          throw new SparkException(s"Application $appId is killed")
        }
        if (finalState == FinalApplicationStatus.UNDEFINED) {
          throw new SparkException(s"The final status of application $appId is undefined")
        }
      }
    }
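The difference between reporting once and monitoring until completion can be sketched in a few lines. This is an illustration under assumptions, not Spark's implementation: AppState, pollState and intervalMs are hypothetical, and pollState stands in for a call that asks the ResourceManager for the current application state.

```scala
object MonitorSketch {
  sealed trait AppState
  case object Running extends AppState
  case object Finished extends AppState
  case object Failed extends AppState
  case object Killed extends AppState

  private def failIfBad(appId: String, state: AppState): Unit =
    if (state == Failed || state == Killed) {
      throw new RuntimeException(s"Application $appId finished with status: $state")
    }

  // waitAppCompletion = false: report once and return.
  // waitAppCompletion = true: keep polling until the application leaves the Running state.
  def run(
      appId: String,
      waitAppCompletion: Boolean,
      pollState: () => AppState,
      intervalMs: Long = 0L): AppState = {
    var state = pollState()
    println(s"Application report for $appId (state: $state)")
    if (waitAppCompletion) {
      while (state == Running) {
        Thread.sleep(intervalMs)
        state = pollState()
        println(s"Application report for $appId (state: $state)")
      }
    }
    failIfBad(appId, state)
    state
  }
}
```

With waitAppCompletion set to false the submitting process can exit while the application is still running on the cluster, so the exit status of spark-submit then says nothing about whether the job eventually succeeded.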

Next, follow submitApplication().

    def submitApplication(): ApplicationId = {
      ResourceRequestHelper.validateResources(sparkConf)

      var appId: ApplicationId = null
      try {
        launcherBackend.connect()
        yarnClient.init(hadoopConf)
        yarnClient.start()

        logInfo("Requesting a new application from cluster with %d NodeManagers"
          .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

        // Get a new application from our RM
        val newApp = yarnClient.createApplication()
        val newAppResponse = newApp.getNewApplicationResponse()
        appId = newAppResponse.getApplicationId()

        // The app staging dir based on the STAGING_DIR configuration if configured
        // otherwise based on the users home directory.
        val appStagingBaseDir = sparkConf.get(STAGING_DIR)
          .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
          .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
        stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))

        new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
          Option(appId.toString)).setCurrentContext()

        // Verify whether the cluster has enough resources for our AM
        verifyClusterResources(newAppResponse)

        // Set up the appropriate contexts to launch our AM
        val containerContext = createContainerLaunchContext(newAppResponse)
        val appContext = createApplicationSubmissionContext(newApp, containerContext)

        // Finally, submit and monitor the application
        logInfo(s"Submitting application $appId to ResourceManager")
        yarnClient.submitApplication(appContext)
        launcherBackend.setAppId(appId.toString)
        reportLauncherState(SparkAppHandle.State.SUBMITTED)

        appId
      } catch {
        case e: Throwable =>
          if (stagingDirPath != null) {
            cleanupStagingDir()
          }
          throw e
      }
    }

This method does the following work (corresponding to steps 1, 2 and 3 of the task submission flow chart):

1. Sends a request to the ResourceManager to create an Application and obtains the globally unique appId.

2. Determines the staging directory stagingDirPath of the Application from the configured staging directory and the appId.

3. verifyClusterResources checks whether the cluster has enough resources for the ApplicationMaster and throws an exception if it does not.

4. createContainerLaunchContext creates the container launch context, which encapsulates the start command of the Container process.

5. Submits appContext.
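The five steps can be lined up in a small, self-contained sketch. Everything here is an assumption made for illustration: the ResourceManager trait, Submission and the memory check are hypothetical simplifications and do not mirror the YarnClient API.

```scala
object SubmitStepsSketch {
  // Hypothetical, much-reduced view of what the client needs from the ResourceManager.
  trait ResourceManager {
    def newApplicationId(): String
    def maxContainerMemoryMb: Int
    def submit(appId: String, amCommand: Seq[String]): Unit
  }

  final case class Submission(appId: String, stagingDir: String)

  def submitApplication(rm: ResourceManager, stagingBase: String, amMemoryMb: Int): Submission = {
    // Step 1: ask the ResourceManager for a new, globally unique application id.
    val appId = rm.newApplicationId()
    // Step 2: derive the per-application staging directory from the base directory and the id.
    val stagingDir = s"$stagingBase/.sparkStaging/$appId"
    // Step 3: fail early if the ApplicationMaster could never fit into a container.
    require(amMemoryMb <= rm.maxContainerMemoryMb,
      s"Required AM memory ($amMemoryMb MB) is above the cluster maximum")
    // Step 4: build the command that will start the ApplicationMaster container.
    val amCommand = Seq("bin/java", "-server", "org.apache.spark.deploy.yarn.ApplicationMaster")
    // Step 5: submit the application.
    rm.submit(appId, amCommand)
    Submission(appId, stagingDir)
  }
}
```

Checking resources before building the launch context means that a request that can never be satisfied fails in the client, before anything reaches the cluster.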

Now look at the createContainerLaunchContext(newAppResponse) code.

    val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }
    ...
    // Command for the ApplicationMaster
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    amContainer.setCommands(printableCommands.asJava)

The start command of the Container is roughly:

    bin/java -server org.apache.spark.deploy.yarn.ApplicationMaster --class ...
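To make the shape of that command concrete, here is a minimal sketch that assembles it from its parts. The function and its parameters are hypothetical. The default values "{{JAVA_HOME}}" and "<LOG_DIR>" are assumed to be the placeholders that YARN expands on the NodeManager, so treat them as illustrative.

```scala
object AmCommandSketch {
  def amCommand(
      isClusterMode: Boolean,
      javaOpts: Seq[String],
      amArgs: Seq[String],
      javaHome: String = "{{JAVA_HOME}}",
      logDir: String = "<LOG_DIR>"): Seq[String] = {
    // Cluster mode runs the driver inside the ApplicationMaster;
    // client mode only needs a launcher for executors.
    val amClass =
      if (isClusterMode) "org.apache.spark.deploy.yarn.ApplicationMaster"
      else "org.apache.spark.deploy.yarn.ExecutorLauncher"
    Seq(s"$javaHome/bin/java", "-server") ++
      javaOpts ++
      Seq(amClass) ++
      amArgs ++
      // Redirect the container's output streams into the YARN log directory.
      Seq("1>", s"$logDir/stdout", "2>", s"$logDir/stderr")
  }
}
```

Calling AmCommandSketch.amCommand(true, Seq("-Xmx1g"), Seq("--class", "org.apache.spark.examples.SparkPi")).mkString(" ") yields a single command line of the kind shown above.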

VI. The org.apache.spark.deploy.yarn.ApplicationMaster class

A NodeManager in the YARN cluster receives the command from the ResourceManager and starts the ApplicationMaster process. This corresponds to step 4 in the task submission flow chart.

Start with the main method in the ApplicationMaster companion object.

    def main(args: Array[String]): Unit = {
      SignalUtils.registerLogger(log)
      val amArgs = new ApplicationMasterArguments(args)
      val sparkConf = new SparkConf()
      if (amArgs.propertiesFile != null) {
        Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
          sparkConf.set(k, v)
        }
      }
      // Set system properties for each config entry. This covers two use cases:
      // - The default configuration stored by the SparkHadoopUtil class
      // - The user application creating a new SparkConf in cluster mode
      //
      // Both cases create a new SparkConf object which reads these configs from system properties.
      sparkConf.getAll.foreach { case (k, v) =>
        sys.props(k) = v
      }

      val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
      master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

      val ugi = sparkConf.get(PRINCIPAL) match {
        // We only need to log in with the keytab in cluster mode. In client mode, the driver
        // handles the user keytab.
        case Some(principal) if master.isClusterMode =>
          val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
          SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
          val newUGI = UserGroupInformation.getCurrentUser()

          if (master.appAttemptId == null || master.appAttemptId.getAttemptId > 1) {
            // Re-obtain delegation tokens if this is not a first attempt, as they might be outdated
            // as of now. Add the fresh tokens on top of the original user's credentials (overwrite).
            // Set the context class loader so that the token manager has access to jars
            // distributed by the user.
            Utils.withContextClassLoader(master.userClassLoader) {
              val credentialManager = new HadoopDelegationTokenManager(sparkConf, yarnConf, null)
              credentialManager.obtainDelegationTokens(originalCreds)
            }
          }

          // Transfer the original user's tokens to the new user, since it may contain needed tokens
          // (such as those user to connect to YARN).
          newUGI.addCredentials(originalCreds)
          newUGI

        case _ =>
          SparkHadoopUtil.get.createSparkUser()
      }

      ugi.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = System.exit(master.run())
      })
    }

main creates the ApplicationMaster object and executes its run() method.

    final def run(): Int = {
      try {
        val attemptID = if (isClusterMode) {
          // Set the web ui port to be ephemeral for yarn so we don't conflict with
          // other spark processes running on the same box
          System.setProperty(UI_PORT.key, "0")

          // Set the master and deploy mode property to match the requested mode.
          System.setProperty("spark.master", "yarn")
          System.setProperty(SUBMIT_DEPLOY_MODE.key, "cluster")

          // Set this internal configuration if it is running on cluster mode, this
          // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
          System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

          Option(appAttemptId.getAttemptId.toString)
        } else {
          None
        }

        new CallerContext(
          "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
          Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()

        logInfo("ApplicationAttemptId: " + appAttemptId)

        // This shutdown hook should run *after* the SparkContext is shut down.
        val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
        ShutdownHookManager.addShutdownHook(priority) { () =>
          val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
          val isLastAttempt = appAttemptId.getAttemptId() >= maxAppAttempts

          if (!finished) {
            // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
            // This behavior is different compared to 1.x version.
            // If user application is exited ahead of time by calling System.exit(N), here mark
            // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
            // System.exit(0) to terminate the application.
            finish(finalStatus,
              ApplicationMaster.EXIT_EARLY,
              "Shutdown hook called before final status was reported.")
          }

          if (!unregistered) {
            // we only want to unregister if we don't want the RM to retry
            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
              unregister(finalStatus, finalMsg)
              cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
            }
          }
        }

        if (isClusterMode) {
          runDriver()
        } else {
          runExecutorLauncher()
        }
      } catch {
        case e: Exception =>
          // catch everything else if not specifically handled
          logError("Uncaught exception: ", e)
          finish(FinalApplicationStatus.FAILED,
            ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
            "Uncaught exception: " + StringUtils.stringifyException(e))
      } finally {
        try {
          metricsSystem.foreach { ms =>
            ms.report()
            ms.stop()
          }
        } catch {
          case e: Exception =>
            logWarning("Exception during stopping of the metric system: ", e)
        }
      }

      exitCode
    }

In cluster mode, run() executes the runDriver() method.

userClassThread = startUserApplication() starts a thread named Driver. Using reflection, this thread invokes the main function of the class given by --class on the command line (org.apache.spark.examples.SparkPi), which initializes the SparkContext. Once the SparkContext is available, the waiting main thread wakes up and registers the ApplicationMaster with the ResourceManager (step 5 of the flow chart).

    private def runDriver(): Unit = {
      addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
      userClassThread = startUserApplication()

      // This a bit hacky, but we need to wait until the spark.driver.port property has
      // been set by the Thread executing the user class.
      logInfo("Waiting for spark context initialization...")
      val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
      try {
        val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
          Duration(totalWaitTime, TimeUnit.MILLISECONDS))
        if (sc != null) {
          val rpcEnv = sc.env.rpcEnv

          val userConf = sc.getConf
          val host = userConf.get(DRIVER_HOST_ADDRESS)
          val port = userConf.get(DRIVER_PORT)
          registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

          val driverRef = rpcEnv.setupEndpointRef(
            RpcAddress(host, port),
            YarnSchedulerBackend.ENDPOINT_NAME)
          createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
        } else {
          // Sanity check; should never happen in normal operation, since sc should only be null
          // if the user app did not create a SparkContext.
          throw new IllegalStateException("User did not initialize spark context!")
        }
        resumeDriver()
        userClassThread.join()
      } catch {
        case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
          logError(
            s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
              "Please check earlier log output for errors. Failing the application.")
          finish(FinalApplicationStatus.FAILED,
            ApplicationMaster.EXIT_SC_NOT_INITED,
            "Timed out waiting for SparkContext.")
      } finally {
        resumeDriver()
      }
    }

    private def startUserApplication(): Thread = {
      logInfo("Starting the user application in a separate Thread")

      var userArgs = args.userArgs
      if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
        // When running pyspark, the app is run using PythonRunner. The second argument is the list
        // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
        userArgs = Seq(args.primaryPyFile, "") ++ userArgs
      }
      if (args.primaryRFile != null &&
          (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {
        // TODO(davies): add R dependencies here
      }

      val mainMethod = userClassLoader.loadClass(args.userClass)
        .getMethod("main", classOf[Array[String]])

      val userThread = new Thread {
        override def run(): Unit = {
          try {
            if (!Modifier.isStatic(mainMethod.getModifiers)) {
              logError(s"Could not find static main method in object ${args.userClass}")
              finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
            } else {
              mainMethod.invoke(null, userArgs.toArray)
              finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
              logDebug("Done running user class")
            }
          } catch {
            case e: InvocationTargetException =>
              e.getCause match {
                case _: InterruptedException =>
                  // Reporter thread can interrupt to stop user class
                case SparkUserAppException(exitCode) =>
                  val msg = s"User application exited with status $exitCode"
                  logError(msg)
                  finish(FinalApplicationStatus.FAILED, exitCode, msg)
                case cause: Throwable =>
                  logError("User class threw exception: " + cause, cause)
                  finish(FinalApplicationStatus.FAILED,
                    ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
                    "User class threw exception: " + StringUtils.stringifyException(cause))
              }
              sparkContextPromise.tryFailure(e.getCause())
          } finally {
            // Notify the thread waiting for the SparkContext, in case the application did not
            // instantiate one. This will do nothing when the user code instantiates a SparkContext
            // (with the correct master), or when the user code throws an exception (due to the
            // tryFailure above).
            sparkContextPromise.trySuccess(null)
          }
        }
      }
      userThread.setContextClassLoader(userClassLoader)
      userThread.setName("Driver")
      userThread.start()
      userThread
    }
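The hand-off between the Driver thread and the waiting main thread relies on a Promise. The following is a minimal sketch of that pattern only. Context, runDriver's signature and the returned string are hypothetical, and the user code here completes the promise directly, whereas in Spark it is completed when the user's main creates a SparkContext. The step that pauses and later resumes the Driver thread is left out.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object DriverThreadSketch {
  // Hypothetical stand-in for the SparkContext created by the user's main method.
  final case class Context(host: String, port: Int)

  def runDriver(userMain: Promise[Context] => Unit, maxWaitMs: Long): String = {
    val contextPromise = Promise[Context]()

    val userThread = new Thread {
      override def run(): Unit = {
        try {
          userMain(contextPromise)
        } catch {
          // A failure in user code wakes the waiting thread with that exception.
          case e: Throwable => contextPromise.tryFailure(e)
        } finally {
          // No effect if the promise is already completed; otherwise signals "no context".
          contextPromise.trySuccess(null)
        }
      }
    }
    userThread.setName("Driver")
    userThread.start()

    // The main thread blocks here until the context exists, the user code fails, or time runs out.
    val ctx = Await.result(contextPromise.future, Duration(maxWaitMs, TimeUnit.MILLISECONDS))
    if (ctx == null) {
      throw new IllegalStateException("User did not initialize spark context!")
    }
    val registration = s"registered AM for driver at ${ctx.host}:${ctx.port}"
    userThread.join()
    registration
  }
}
```

The trySuccess(null) call in the finally block is what keeps the main thread from waiting for the full timeout when the user program finishes without ever creating a context.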

After registration, the main thread calls createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf) to request resources from YARN and handle what YARN returns.

    private def createAllocator(
        driverRef: RpcEndpointRef,
        _sparkConf: SparkConf,
        rpcEnv: RpcEnv,
        appAttemptId: ApplicationAttemptId,
        distCacheConf: SparkConf): Unit = {
      // In client mode, the AM may be restarting after delegation tokens have reached their TTL. So
      // always contact the driver to get the current set of valid tokens, so that local resources can
      // be initialized below.
      if (!isClusterMode) {
        val tokens = driverRef.askSync[Array[Byte]](RetrieveDelegationTokens)
        if (tokens != null) {
          SparkHadoopUtil.get.addDelegationTokens(tokens, _sparkConf)
        }
      }

      val appId = appAttemptId.getApplicationId().toString()
      val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
        CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
      val localResources = prepareLocalResources(distCacheConf)

      // Before we initialize the allocator, let's log the information about how executors will
      // be run up front, to avoid printing this out for every single executor being launched.
      // Use placeholders for information that changes such as executor IDs.
      logInfo {
        val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
        val executorCores = _sparkConf.get(EXECUTOR_CORES)
        val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
          "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
          ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
        dummyRunner.launchContextDebugInfo()
      }

      allocator = client.createAllocator(
        yarnConf,
        _sparkConf,
        appAttemptId,
        driverUrl,
        driverRef,
        securityMgr,
        localResources)

      // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
      // that when the driver sends an initial executor request (e.g. after an AM restart),
      // the allocator is ready to service requests.
      rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

      allocator.allocateResources()
      val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
        sparkConf, securityMgr)
      val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
      ms.registerSource(new ApplicationMasterSource(prefix, allocator))
      // do not register static sources in this case as per SPARK-25277
      ms.start(false)
      metricsSystem = Some(ms)
      reporterThread = launchReporterThread()
    }

The key call is allocator.allocateResources(), which processes the resources that have been allocated.

    def allocateResources(): Unit = synchronized {
      updateResourceRequests()

      val progressIndicator = 0.1f
      // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
      // requests.
      val allocateResponse = amClient.allocate(progressIndicator)

      val allocatedContainers = allocateResponse.getAllocatedContainers()
      allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

      if (allocatedContainers.size > 0) {
        logDebug(("Allocated containers: %d. Current executor count: %d. " +
          "Launching executor count: %d. Cluster resources: %s.")
          .format(
            allocatedContainers.size,
            runningExecutors.size,
            numExecutorsStarting.get,
            allocateResponse.getAvailableResources))

        handleAllocatedContainers(allocatedContainers.asScala)
      }

      val completedContainers = allocateResponse.getCompletedContainersStatuses()
      if (completedContainers.size > 0) {
        logDebug("Completed %d containers".format(completedContainers.size))
        processCompletedContainers(completedContainers.asScala)
        logDebug("Finished processing %d completed containers. Current running executor count: %d."
          .format(completedContainers.size, runningExecutors.size))
      }
    }
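The poll-then-react cycle of allocateResources() can be modelled without YARN. This is a sketch under assumptions: Container, AllocateResponse and Allocator are hypothetical simplifications, and poll stands in for the heartbeat call to the ResourceManager, receiving the number of containers still missing.

```scala
object AllocationSketch {
  final case class Container(id: String, host: String)
  final case class AllocateResponse(allocated: Seq[Container], completed: Seq[String])

  final class Allocator(targetExecutors: Int, poll: Int => AllocateResponse) {
    private var running = Map.empty[String, Container]

    def runningCount: Int = synchronized(running.size)

    def allocateResources(): Unit = synchronized {
      // Work out how many containers are still missing, then poll.
      // Polling with zero missing containers still serves as a heartbeat.
      val missing = math.max(targetExecutors - running.size, 0)
      val response = poll(missing)

      // Newly granted containers: start tracking them (Spark launches an executor in each).
      response.allocated.take(missing).foreach { c =>
        running = running + (c.id -> c)
      }
      // Finished containers: forget them so the next call requests replacements.
      response.completed.foreach { id =>
        running = running - id
      }
    }
  }
}
```

Calling allocateResources() repeatedly, as a reporter thread would, keeps the number of tracked containers converging on the target even when containers exit.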

If the number of allocated Containers is greater than 0, handleAllocatedContainers(allocatedContainers.asScala) is called.

    def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
      val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

      // Match incoming requests by host
      val remainingAfterHostMatches = new ArrayBuffer[Container]
      for (allocatedContainer <- allocatedContainers) {
        // ... the rest of this method is missing from the source text ...

    // ... the listing resumes in the code that builds the executor launch command ...
          val absPath =
            if (new File(uri.getPath()).isAbsolute()) {
              Client.getClusterPath(sparkConf, uri.getPath())
            } else {
              Client.buildPath(Environment.PWD.$(), uri.getPath())
            }
          Seq("--user-class-path", "file:" + absPath)
        }.toSeq

        YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
        val commands = prefixEnv ++
          Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
          javaOpts ++
          Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
            "--driver-url", masterAddress,
            "--executor-id", executorId,
            "--hostname", hostname,
            "--cores", executorCores.toString,
            "--app-id", appId,
            "--resourceProfileId", resourceProfileId.toString) ++
          userClassPath ++
          Seq(
            s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
            s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

        // TODO: it would be nicer to just make sure there are no null commands here
        commands.map(s => if (s == null) "null" else s).toList
      }

      private def prepareEnvironment(): HashMap[String, String] = {
        val env = new HashMap[String, String]()
        Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))

        System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
          .foreach { case (k, v) => env(k) = v }

        sparkConf.getExecutorEnv.foreach { case (key, value) =>
          if (key == Environment.CLASSPATH.name()) {
            // If the key of env variable is CLASSPATH, we assume it is a path and append it.
            // This is kept for backward compatibility and consistency with hadoop
            YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
          } else {
            // For other env variables, simply overwrite the value.
            env(key) = value
          }
        }
        env
      }
    }

That concludes this walk through task submission in yarn-cluster mode in the Spark source code. I hope it is of some help to you.
