2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article looks at how to analyze CoarseGrainedSchedulerBackend and CoarseGrainedExecutorBackend. Many readers are not familiar with these two classes, so the main points are summarized below.
CoarseGrainedSchedulerBackend runs on the Driver side and CoarseGrainedExecutorBackend runs on the Executor side. Both are backends. What is a backend? A backend is the component responsible for communication between two endpoints, and these two CoarseGrained backends together handle the communication between Driver and Executor.
What is Driver?
The Driver is the Spark program we write; the code in its main function is what the Driver runs.
What is Executor?
The Executor is where Spark tasks are executed. When the backend receives a LaunchTask message from the Driver, it calls the launchTask method of the Executor class to run the task.
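The dispatch just described can be sketched in a few lines of Java. This is a minimal illustration rather than Spark's actual code: the class names LaunchTask, Executor and ExecutorBackend below are simplified, hypothetical stand-ins, and the real classes carry serialized task descriptions and run tasks on a thread pool.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; these are not Spark's real classes.
final class BackendDispatchSketch {
    // Message the driver sends to an executor backend.
    static final class LaunchTask {
        final long taskId;
        final String payload;
        LaunchTask(long taskId, String payload) {
            this.taskId = taskId;
            this.payload = payload;
        }
    }

    // Runs tasks; in this sketch it only records the IDs it was asked to run.
    static final class Executor {
        final List<Long> launched = new ArrayList<>();
        void launchTask(long taskId, String payload) {
            launched.add(taskId);
        }
    }

    // Receives messages and forwards task launches to its Executor.
    static final class ExecutorBackend {
        private final Executor executor;
        ExecutorBackend(Executor executor) {
            this.executor = executor;
        }

        void receive(Object message) {
            if (message instanceof LaunchTask) {
                LaunchTask m = (LaunchTask) message;
                executor.launchTask(m.taskId, m.payload);
            } else {
                throw new IllegalArgumentException("Unknown message: " + message);
            }
        }
    }
}
```

The point of the split is that the backend only handles messaging, while the Executor only knows how to run a task.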
The Driver starts CoarseGrainedSchedulerBackend and uses it to ask the cluster for machines on which to start Executors. Once a machine is found, a command is sent to start an ExecutorRunner. The ExecutorRunner starts CoarseGrainedExecutorBackend, which registers with the Driver and creates an Executor to process the requests that CoarseGrainedExecutorBackend receives. This is the process under Standalone deployment. Under Yarn it is mostly similar, except that the step of requesting machines from the cluster to start Executors differs considerably, so let's look at that briefly.
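The register-then-create-Executor order can be illustrated with a small Java sketch. Everything here is an assumption made for illustration: DriverBackend, ExecutorBackend, registerExecutor and the executorCreated flag are hypothetical names, and the real exchange happens over RPC messages rather than direct method calls.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the registration handshake; not Spark's real API.
final class RegistrationSketch {
    // Driver side: tracks which executors have registered and their cores.
    static final class DriverBackend {
        final Map<String, Integer> coresByExecutor = new HashMap<>();

        // Returns true if the registration is accepted, false for a duplicate ID.
        boolean registerExecutor(String executorId, int cores) {
            if (coresByExecutor.containsKey(executorId)) {
                return false;
            }
            coresByExecutor.put(executorId, cores);
            return true;
        }
    }

    // Executor side: registers first, and only then becomes ready to run tasks.
    static final class ExecutorBackend {
        final String executorId;
        boolean executorCreated = false;

        ExecutorBackend(String executorId) {
            this.executorId = executorId;
        }

        void start(DriverBackend driver, int cores) {
            if (driver.registerExecutor(executorId, cores)) {
                executorCreated = true; // stands in for constructing the Executor
            }
        }
    }
}
```

In this sketch a second backend that reuses an already registered ID is rejected and never creates its Executor.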
In the Yarn environment, machine deployment and the distribution of partition tasks are handled together by a few classes in the spark-yarn project.
spark-yarn contains two files: client.java and ApplicationMaster.java.
The job of client.java is to request resources from yarn to run the code in ApplicationMaster.java, so here we mainly look at what ApplicationMaster.java does.
ApplicationMaster first does two things: it starts one "/bin/mesos-master" and multiple "/bin/mesos-slave" processes. Both are deployed and run on resources requested from yarn. Although they run inside yarn containers, "/bin/mesos-master" and "/bin/mesos-slave" are Mesos binaries rather than parts of yarn itself; they can be regarded as playing roles similar to Master and Worker in the Standalone environment.
The launchContainer method starts a yarn container, that is, it starts "/bin/mesos-slave" in the container, and the mesos-slave registers with the mesos-master. After all the required slave nodes have started, the startApplication() method is called to start the Driver.
The startApplication() method:
// Start the user's application
private void startApplication() throws IOException {
  try {
    String sparkClasspath = getSparkClasspath();
    String jobJar = new File("job.jar").getAbsolutePath();
    String javaArgs = "-Xms" + (masterMem - 128) + "m -Xmx" + (masterMem - 128) + "m";
    javaArgs += " -Djava.library.path=" + mesosHome + "/lib/java";
    String substitutedArgs = programArgs.replaceAll("\\[MASTER\\]", masterUrl);
    if (mainClass.equals("")) {
      javaArgs += " -cp " + sparkClasspath + " -jar " + jobJar + " " + substitutedArgs;
    } else {
      javaArgs += " -cp " + sparkClasspath + ":" + jobJar + " " + mainClass + " " + substitutedArgs;
    }
    String java = "java";
    if (System.getenv("JAVA_HOME") != null) {
      java = System.getenv("JAVA_HOME") + "/bin/java";
    }
    String bashCommand = java + " " + javaArgs +
        " 1>" + logDirectory + "/application.stdout" +
        " 2>" + logDirectory + "/application.stderr";
    LOG.info("Command: " + bashCommand);
    String[] command = new String[] {"bash", "-c", bashCommand};
    String[] env = new String[] {
        "SPARK_HOME=" + sparkHome,
        "MASTER=" + masterUrl,
        "SPARK_MEM=" + (slaveMem - 128) + "m"};
    application = Runtime.getRuntime().exec(command, env);
    // Wait for the user application on a separate thread and record its exit code
    new Thread("wait for user application") {
      public void run() {
        try {
          appExitCode = application.waitFor();
          appExited = true;
          LOG.info("User application exited with code " + appExitCode);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }.start();
  } catch (SparkClasspathException e) {
    unregister(false);
    System.exit(1);
    return;
  }
}
This is where the Driver is started. masterUrl is the address of "bin/mesos-master", and it is passed to the application through the environment variable "MASTER". Under yarn the master address has the form "mesos://host:port", while under Standalone it is "spark://host:port".
SparkContext handles the master address differently depending on its format. The code is as follows:
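As a worked example of how the master address reaches the user program, the sketch below isolates the two mechanisms used in startApplication(): replacing the literal [MASTER] placeholder in the program arguments, and exporting MASTER in the child environment. The class and method names are hypothetical, and the host and port in the usage note are made-up values. Unlike the original, the sketch wraps the URL in Matcher.quoteReplacement so that a `$` or backslash in the replacement would be taken literally.

```java
import java.util.regex.Matcher;

// Hypothetical helper isolating the [MASTER] handling; not part of spark-yarn.
final class MasterArgsSketch {
    // Replace every literal "[MASTER]" in the program arguments with the URL.
    static String substitute(String programArgs, String masterUrl) {
        // The brackets are escaped because replaceAll takes a regular expression.
        return programArgs.replaceAll("\\[MASTER\\]", Matcher.quoteReplacement(masterUrl));
    }

    // Environment entries in the NAME=value form expected by Runtime.exec.
    static String[] env(String masterUrl) {
        return new String[] {"MASTER=" + masterUrl};
    }
}
```

For example, `MasterArgsSketch.substitute("[MASTER] 10", "mesos://host:5050")` yields `mesos://host:5050 10`.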
master match {
  case "local" =>
    checkResourcesPerTask(clusterMode = false, Some(1))
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
    scheduler.initialize(backend)
    (backend, scheduler)

  case LOCAL_N_REGEX(threads) =>
    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
    // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
    val threadCount = if (threads == "*") localCpuCount else threads.toInt
    if (threadCount <= 0) {
      throw new SparkException(s"Asked to run locally with $threadCount threads")
    }
    checkResourcesPerTask(clusterMode = false, Some(threadCount))
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
    scheduler.initialize(backend)
    (backend, scheduler)

  case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
    // local[*, M] means the number of cores on the computer with M failures
    // local[N, M] means exactly N threads with M failures
    val threadCount = if (threads == "*") localCpuCount else threads.toInt
    checkResourcesPerTask(clusterMode = false, Some(threadCount))
    val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
    scheduler.initialize(backend)
    (backend, scheduler)

  case SPARK_REGEX(sparkUrl) =>
    checkResourcesPerTask(clusterMode = true, None)
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)

  case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
    checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
    // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
    val memoryPerSlaveInt = memoryPerSlave.toInt
    if (sc.executorMemory > memoryPerSlaveInt) {
      throw new SparkException(
        "Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
          memoryPerSlaveInt, sc.executorMemory))
    }
    val scheduler = new TaskSchedulerImpl(sc)
    val localCluster = new LocalSparkCluster(
      numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
    val masterUrls = localCluster.start()
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
      localCluster.stop()
    }
    (backend, scheduler)

  case masterUrl =>
    checkResourcesPerTask(clusterMode = true, None)
    val cm = getClusterManager(masterUrl) match {
      case Some(clusterMgr) => clusterMgr
      case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
    }
    try {
      val scheduler = cm.createTaskScheduler(sc, masterUrl)
      val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
      cm.initialize(scheduler, backend)
      (backend, scheduler)
    } catch {
      case se: SparkException => throw se
      case NonFatal(e) =>
        throw new SparkException("External scheduler cannot be instantiated", e)
    }
}
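To make the branching easier to follow, here is a reduced Java sketch of the same idea: try the known address formats in order and fall through to an external cluster manager when none matches. It is an illustration under assumptions, not Spark's implementation. The class name, the classify method and the returned labels are hypothetical, and only the "local", local[N] and spark:// formats are modelled.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, reduced model of dispatching on the master address format.
final class MasterUrlSketch {
    private static final Pattern LOCAL_N = Pattern.compile("local\\[([0-9]+|\\*)\\]");
    private static final Pattern SPARK = Pattern.compile("spark://(.*)");

    // Returns a label for the kind of scheduler backend the address would select.
    static String classify(String master) {
        if (master.equals("local")) {
            return "local:1";
        }
        Matcher localN = LOCAL_N.matcher(master);
        if (localN.matches()) {
            // group(1) is either a thread count or "*" (use all cores)
            return "local:" + localN.group(1);
        }
        if (SPARK.matcher(master).matches()) {
            return "standalone";
        }
        // Anything else is handed to an external cluster manager lookup.
        return "external";
    }
}
```

Like a Scala regex pattern in a match expression, `matches()` requires the whole string to match, so an address such as "mesos://host:port" falls through to the last branch.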
If it is yarn, the address falls through to the last case statement:
case masterUrl =>
  checkResourcesPerTask(clusterMode = true, None)
  val cm = getClusterManager(masterUrl) match {
    case Some(clusterMgr) => clusterMgr
    case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
  }
  try {
    val scheduler = cm.createTaskScheduler(sc, masterUrl)
    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
    cm.initialize(scheduler, backend)
    (backend, scheduler)
  } catch {
    case se: SparkException => throw se
    case NonFatal(e) =>
      throw new SparkException("External scheduler cannot be instantiated", e)
  }
The ExternalClusterManager class is used here, but what is it? This is part of what makes Spark hard to read: there are many concepts involved.
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val loader = Utils.getContextOrSparkClassLoader
  // Load every registered ExternalClusterManager and keep those that accept the url
  val serviceLoaders =
    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
  if (serviceLoaders.size > 1) {
    throw new SparkException(
      s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
  }
  serviceLoaders.headOption
}
This finds all registered ExternalClusterManager implementations and checks which one's canCreate method returns true for the url. Here we are looking for an implementation that accepts "mesos://host:port".
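The selection rule (exactly zero or one implementation may accept a url) can be sketched in Java as follows. To keep the example self-contained, a plain list stands in for the ServiceLoader discovery step. The ClusterManager interface, the PrefixManager class and the find method are hypothetical names introduced for illustration, and the prefix check is only an assumed stand-in for a real canCreate.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical sketch of picking the one cluster manager that accepts a url.
final class ClusterManagerLookupSketch {
    interface ClusterManager {
        boolean canCreate(String masterUrl);
    }

    // Toy implementation: accepts any url starting with a given prefix.
    static final class PrefixManager implements ClusterManager {
        private final String prefix;
        PrefixManager(String prefix) {
            this.prefix = prefix;
        }
        public boolean canCreate(String masterUrl) {
            return masterUrl.startsWith(prefix);
        }
    }

    // Empty if nobody accepts the url; an error if more than one does.
    static Optional<ClusterManager> find(List<ClusterManager> candidates, String url) {
        List<ClusterManager> matches = candidates.stream()
            .filter(c -> c.canCreate(url))
            .collect(Collectors.toList());
        if (matches.size() > 1) {
            throw new IllegalStateException(
                "Multiple cluster managers registered for the url " + url);
        }
        return matches.stream().findFirst();
    }
}
```

Treating an ambiguous match as an error, rather than picking the first candidate, keeps the choice of cluster manager independent of discovery order.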
After reading the above, you should have a better understanding of how to analyze CoarseGrainedSchedulerBackend and CoarseGrainedExecutorBackend.