Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to implement ScheduleBackend in Yarn

2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)06/01 Report--

This article will explain in detail how to achieve ScheduleBackend in Yarn. The content of the article is of high quality, so the editor shares it for you as a reference. I hope you will have a certain understanding of the relevant knowledge after reading this article.

What does ScheduleBackend use in Yarn mode?

When you create a ScheduleBackend in SparkContext, you decide which ScheduleBackend to create based on the prefix of the specified "master" parameter. For a URL like "yarn://host:port", if it is cluster mode, it is creating YarnClusterSchedulerBackend, and if it is client mode, it is creating YarnClientSchedulerBackend.

Let's first look at the code structure of YarnClusterSchedulerBackend.

YarnClusterSchedulerBackend inherits YarnSchedulerBackend and doesn't have much code to play with, so let's just look at YarnSchedulerBackend. I guess it's about the same in client mode.

YarnSchedulerBackend inherits CoarseGrainedSchedulerBackend, so let's see what the difference is.

Overrides the doRequestTotalExecutors and doKillExecutors methods, one to apply for Executor and one to kill Executor.

Override def doRequestTotalExecutors (requestedTotal: Int): Future [Boolean] = {yarnSchedulerEndpointRef.ask [Boolean] (prepareRequestExecutors (requestedTotal))} override def doKillExecutors (executorIds: Seq [String]): Future [Boolean] = {yarnSchedulerEndpointRef.ask [Boolean] (KillExecutors (executorIds))}

YarnSchedulerEndpointRef is the endpoint side of the same file. See what the specific execution code is:

Case r: RequestExecutors = > amEndpoint match {case Some (am) = > am.ask [Boolean] (r). AndThen {case Success (b) = > context.reply (b) case Failure (NonFatal (e)) = > logError (s "Sending $r to AM was unsuccessful" E) context.sendFailure (e)} (ThreadUtils.sameThread)} case k: KillExecutors = > amEndpoint match {case Some (am) = > am.ask [Boolean] (k). AndThen {case Success (b) = > context.reply (b) case Failure (NonFatal (e)) = > LogError (s "Sending $k to AM was unsuccessful" E) context.sendFailure (e)} (ThreadUtils.sameThread)}

We see that it relays the message to amEndpoint, that is, to ApplicationManager in the yarn project. I'm going to jump to ApplicationManager again to see the implementation logic inside. It's really a twists and turns.

How do you deal with RequestExecutors and KillExecutors messages in ApplicationManager?

Case r: RequestExecutors = > Option (allocator) match {case Some (a) = > if (r.requestedTotal, r.localityAwareTasks, r.hostToLocalTaskCount) R.nodeBlacklist) {resetAllocatorInterval ()} context.reply (true)} case KillExecutors (executorIds) = > Option (allocator) match {case Some (a) = > executorIds.foreach (a.killExecutor)} context.reply (true)

Call the killExecutor and requestTotalExecutorsWithPreferredLocalities methods of allocator. What is allocator? There are too many classes here.

Allocator = client.createAllocator (yarnConf, _ sparkConf, appAttemptId, driverUrl, driverRef, securityMgr, localResources)

It is created by client's createAllocator method. What is client? It's YarnRMClient. We need to take a look at YarnRMClient first. We can probably guess from the name that YarnRMClient is here to apply for Executor and kill Executor from the Yarn machine.

The createAllocator method returns the following YarnAllocator:

Return new YarnAllocator (driverUrl, driverRef, conf, sparkConf, amClient, appAttemptId, securityMgr

LocalResources, SparkRackResolver.get (conf))

Come to YarnAllocator.

YarnAllocator's killExecutor method is easy to understand, which is to release the Container in Yarn:

Def killExecutor (executorId: String): Unit = synchronized {executorIdToContainer.get (executorId) match {case Some (container) if! releasedContainers.contains (container.getId) = > internalReleaseContainer (container) runningExecutors.remove (executorId) case _ = > logWarning (s "Attempted to kill unknown executor $executorId!")}}

Applying for Executor is ultimately implemented in the runAllocatedContainers method.

Take a look at the core code, you can look at the source code completely:

If (runningExecutors.size ()

< targetNumExecutors) { numExecutorsStarting.incrementAndGet() if (launchContainers) { launcherPool.execute(() =>

{try {new ExecutorRunnable (Some (container), conf, sparkConf, driverUrl, executorId, executorHostname, executorMemory, executorCores, appAttemptId.getApplicationId.toString, securityMgr LocalResources) .run () updateInternalState ()} catch {})}

Apply for targetNumExecutors ExecutorRunner, which corresponds to Standalone's application Executor. All right, that's the whole process.

Eventually, you will apply for the required number of Container in the Yarn cluster, and start ExecutorRunner in the Container to report the results to Driver.

The ExecutorRunner here is the YarnCoarseGrainedExecutorBackend thread, which you can see in the ExecutorRunner class.

On how to achieve ScheduleBackend in Yarn to share here, I hope that the above content can be of some help to you, can learn more knowledge. If you think the article is good, you can share it for more people to see.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report