2025-03-28 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
1. Execution process of the wordcount program

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object WordCount {
      def main(args: Array[String]): Unit = {
        // Create the Spark configuration object and set the app name.
        // The master address can also be set here ("local" selects local mode).
        // When submitting to a cluster the master is usually not specified in code:
        // the job may run on several clusters, so hard-coding it is inconvenient.
        val conf = new SparkConf().setAppName("wordCount")

        // Create the Spark context object
        val sc = new SparkContext(conf)

        sc.textFile(args(0))
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .saveAsTextFile(args(1))

        sc.stop()
      }
    }
The core code is very simple. First, look at the function textFile.
SparkContext.scala

    def textFile(
        path: String,
        minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
      assertNotStopped()
      // Pass the file path; the input format class is TextInputFormat, the key
      // type is LongWritable and the value type is Text.
      // map(pair => pair._2.toString) takes the value out of each pair and
      // converts it to a String.
      // The processed values are returned as a new collection, that is, RDD[String].
      // setName(path) sets the RDD name to the path.
      hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
        minPartitions).map(pair => pair._2.toString).setName(path)
    }

The key operation is the call to hadoopFile, which takes several parameters:
path: the file path.
classOf[TextInputFormat]: the class that processes the input file. This is the same TextInputFormat analyzed in MapReduce; Spark uses it directly.
classOf[LongWritable], classOf[Text]: as you can guess, these are the types of the input key and value.
Then map(pair => pair._2.toString) converts the value of each key-value pair to a String.
Let's move on to the hadoopFile method.
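    def hadoopFile[K, V](
        path: String,
        inputFormatClass: Class[_ <: InputFormat[K, V]],
        keyClass: Class[K],
        valueClass: Class[V],
        minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
      assertNotStopped()
      // The Hadoop configuration is broadcast to the executors.
      val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
      val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
      // See here: the final return value is a HadoopRDD object, built from the
      // sc object, the configuration, the input format class, the key and value
      // types, and the number of partitions.
      new HadoopRDD(
        this,
        confBroadcast,
        Some(setInputPathsFunc),
        inputFormatClass,
        keyClass,
        valueClass,
        minPartitions).setName(path)
    }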
Finally, the HadoopRDD object is returned.
Then come flatMap(_.split(" ")) and map((_, 1)), which are relatively simple.
flatMap(_.split(" ")) splits each input line on spaces, which turns every line into an array of words. The arrays produced from all the lines are then merged into one large collection. map((_, 1)) turns each element into a KV pair, where K is the word and V is 1.
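To see what these operators produce, here is a minimal sketch that runs the same steps on plain Scala collections instead of RDDs. The object name LocalWordCount and the sample lines are made up for illustration, and the final groupBy step only stands in for the reduceByKey call that comes next in the pipeline.

```scala
object LocalWordCount {
  /** Counts words in `lines` with ordinary collections (no Spark involved). */
  def count(lines: Seq[String]): Map[String, Int] = {
    // flatMap: split every line on spaces and merge the pieces into one sequence
    val words: Seq[String] = lines.flatMap(_.split(" ").toSeq)
    // map: pair every word with the count 1
    val pairs: Seq[(String, Int)] = words.map((_, 1))
    // stand-in for reduceByKey(_ + _): group by word and add up the 1s
    pairs.groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2).reduce(_ + _) }
  }
}
```

For example, LocalWordCount.count(Seq("a b a", "b c")) yields a map with a -> 2, b -> 2 and c -> 1.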
Next, look at reduceByKey(_ + _).
This groups the KV pairs that share the same key and adds up their values, so each key ends up with a single value: the number of times the word occurs. Look at the function:

    def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
      reduceByKey(defaultPartitioner(self), func)
    }

When no partitioner is given, defaultPartitioner(self) normally supplies a HashPartitioner. Its number of partitions is spark.default.parallelism if that is set, otherwise the largest partition count among the parent RDDs. For an RDD read with textFile, the minimum number of partitions is defaultMinPartitions, which is at most 2. You can also specify the number of partitions yourself.

2. Resource scheduling of spark
2.1 Resource scheduling process
Figure 2.1 Spark resource scheduling
1. Execute the submit command, which starts a spark-submit process on the client (used to request resources for the Driver).
2. The client applies to the Master for resources for the Driver, and the Driver's request is added to the waitingDrivers collection of the Master. The Master looks through the workers collection and picks a suitable Worker node.
3. The Driver process is started on the selected Worker node. Once the Driver is running, spark-submit has completed its mission and its process exits. Note that the Driver itself also needs resources on the Worker node that hosts it.
4. The Driver process requests resources for the Application to run (here the resources are Executor processes). The request for this Application is added to the waitingApps collection of the Master. The Master then works out, from the requested resources, which Worker nodes are needed and how many resources to use on each, and starts the Executor processes on those nodes.
(Note: Executors are started by polling, that is, round-robin. By default each Executor takes 1 GB of memory on its node and all the cores that the Worker can manage.)
5. At this point, the Driver can distribute tasks to the Executor processes on the Worker nodes.
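The five steps above can be sketched as a toy model in plain Scala. This is only an illustration under simplifying assumptions: ToyMaster and its WorkerInfo, DriverInfo and ApplicationInfo classes are hypothetical stand-ins, not Spark's real Master code. The sketch keeps a workers set plus the waitingDrivers and waitingApps queues, calls schedule() whenever one of them changes, and gives each Application one Executor per usable Worker with all of that Worker's free cores.

```scala
import scala.collection.mutable.{ArrayBuffer, HashSet}

object MasterSketch {
  // Hypothetical, simplified records; Spark's own classes carry far more state.
  final class WorkerInfo(val id: String, var freeCores: Int, var freeMemMb: Int)
  final case class DriverInfo(id: String, cores: Int, memMb: Int)
  final case class ApplicationInfo(id: String, executorMemMb: Int = 1024)

  final class ToyMaster {
    val workers = new HashSet[WorkerInfo]()
    val waitingDrivers = new ArrayBuffer[DriverInfo]()
    val waitingApps = new ArrayBuffer[ApplicationInfo]()
    val events = new ArrayBuffer[String]() // what was launched, in order

    // Every change to one of the three collections triggers schedule().
    def registerWorker(w: WorkerInfo): Unit = { workers += w; schedule() }
    def submitDriver(d: DriverInfo): Unit = { waitingDrivers += d; schedule() }
    def submitApp(a: ApplicationInfo): Unit = { waitingApps += a; schedule() }

    // Sorted only to make the toy model deterministic.
    private def sortedWorkers: List[WorkerInfo] = workers.toList.sortBy(_.id)

    def schedule(): Unit = {
      // Steps 2-3: start each waiting Driver on the first Worker that can hold it.
      for (d <- waitingDrivers.toList) {
        sortedWorkers.find(w => w.freeCores >= d.cores && w.freeMemMb >= d.memMb).foreach { w =>
          w.freeCores -= d.cores
          w.freeMemMb -= d.memMb
          waitingDrivers -= d
          events += s"driver ${d.id} on ${w.id}"
        }
      }
      // Step 4: one Executor per usable Worker, taking all of its free cores.
      for (a <- waitingApps.toList) {
        val usable = sortedWorkers.filter(w => w.freeCores > 0 && w.freeMemMb >= a.executorMemMb)
        if (usable.nonEmpty) {
          usable.foreach { w =>
            events += s"executor for ${a.id} on ${w.id} with ${w.freeCores} cores"
            w.freeMemMb -= a.executorMemMb
            w.freeCores = 0
          }
          waitingApps -= a
        }
      }
    }
  }
}
```

Registering two 4-core Workers, then submitting a 1-core Driver and one Application, leaves both waiting collections empty and records one Driver launch followed by two Executor launches.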
Three collections in Master

    val workers = new HashSet[WorkerInfo]()

The workers collection uses a HashSet to store the information of the Worker nodes, which avoids storing the same Worker node twice. Why avoid duplicates? A Worker node may die for some reason. At its next scheduled communication with the Master, the Master learns that the node is dead, removes it from the workers collection, and adds it again the next time the node is used. In theory this leaves no duplicate Worker nodes. There is one special case: if a Worker goes down and restarts on its own before the next communication, duplicate Worker information can appear in workers.

    val waitingDrivers = new ArrayBuffer[DriverInfo]()

When the client applies to the Master for resources for the Driver, the information about the requested Driver is wrapped in a DriverInfo on the Master node and added to waitingDrivers. The Master watches the waitingDrivers collection. When it is not empty, a client has applied to the Master for resources. The Master then looks through the workers collection, finds a Worker node that meets the requirements, and starts the Driver there. When the Driver has started successfully, its request is removed from waitingDrivers.

    val waitingApps = new ArrayBuffer[ApplicationInfo]()

After the Driver has started successfully, it applies to the Master for resources for the Application, and the request is stored in the waitingApps collection on the Master node. Likewise, when waitingApps is not empty, a Driver has requested resources from the Master for its Application. The Master then checks the workers collection to find suitable Worker nodes on which to start the Executor processes. By default, each Worker launches only one Executor for each Application, and this Executor uses 1 GB of memory and all the cores. After the Executors have started, the request is removed from waitingApps.

Note: it was said above that the Master monitors these three collections. How does it do that? The Master does not dedicate separate threads to watching them, which would be wasteful. It "monitors" them in the sense that whenever one of the three collections changes (an element is added or removed), the schedule() method is called. The processing logic described above is encapsulated in schedule().

2.2 Relationship between application and executor
1. By default, each Worker launches only one Executor for each Application. By default, each Executor uses 1 GB of memory and all the cores that the Worker can manage.
2. To start multiple Executors on a single Worker, specify the number of cores used by each Executor when submitting the Application, so that one Executor does not take all the cores of the Worker. Submit command: spark-submit --executor-cores
3. By default, Executors are started by polling (round-robin), which helps data locality to a certain extent.
What is polling startup, and why start Executors this way?
Polling startup: Executors are started one at a time, node by node. For example, suppose five people are each to receive an apple and a banana. With polling, everyone gets an apple first, and the bananas are handed out only after all the apples have been distributed.
Why use polling? In big data computation, the first principle is to move the computation to the data: compute where the data is stored, rather than moving the data to the computation. Suppose we have n Worker nodes. If we computed only on the nodes that store the data, only a few Workers would do the work and most would sit idle, which is clearly not acceptable. So Executors are started by polling, which first lets every node run a task.
Nodes that store the data do not need to fetch it over the network, so they run faster and end up executing more tasks. This avoids wasting cluster resources while still computing on the nodes where the data is stored, which helps data locality to a certain extent.
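The apple-and-banana idea can be made concrete with a small sketch. This is a toy model, not Spark's allocation code: the function name roundRobinAssign and its inputs are hypothetical. It hands out the requested cores one at a time, cycling over the Workers, and skips any Worker that has no free core left.

```scala
object RoundRobinSketch {
  /**
   * freeCores(i) is the number of free cores on Worker i.
   * Returns how many cores each Worker is given when `coresWanted`
   * cores are handed out one at a time in round-robin order.
   */
  def roundRobinAssign(freeCores: Vector[Int], coresWanted: Int): Vector[Int] = {
    val assigned = Array.fill(freeCores.length)(0)
    // Never try to hand out more cores than the cluster has.
    var remaining = math.min(coresWanted, freeCores.sum)
    var pos = 0
    while (remaining > 0) {
      if (assigned(pos) < freeCores(pos)) {
        assigned(pos) += 1
        remaining -= 1
      }
      pos = (pos + 1) % freeCores.length // move on to the next Worker
    }
    assigned.toVector
  }
}
```

With three Workers of 4 free cores each and 6 cores requested, every Worker gets 2 cores, instead of the first Worker getting 4 and the second 2.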
2.3 Coarse-grained scheduling of spark
Coarse-grained (the "rich second generation"):
Resources are requested before any task is executed and are not released until all the tasks have finished. Pros: tasks do not have to apply for resources themselves before they run, which saves startup time. Cons: resources are not released until all tasks have finished (that is, until the entire job is complete), so the resources of the cluster are not fully utilized. This is the scheduling granularity used by Spark, mainly to make the execution of stages, jobs and tasks more efficient.
Fine-grained (the "poor second generation"):
When the Application is submitted, each task applies for its own resources, and the resources are released immediately after the task has finished. Pros: resources are released as soon as each task completes, which helps make full use of them. Cons: because every task has to apply for resources, task startup takes too long, which in turn lengthens the startup time of the stage, the job and the application.
2.4 Resource restrictions on spark-submit submission tasks
When we submit a task, we can specify some parameters to restrict resources:
--executor-cores: the number of cores used by a single Executor. If not specified, all the cores available on the Worker are used by default.
--executor-memory: the amount of memory used by a single Executor, such as 1G. The default is 1G.
--total-executor-cores: the maximum number of cores used by the entire Application, to prevent it from monopolizing the resources of the whole cluster.
3. The whole spark resource scheduling + task scheduling process
3.1 Total scheduling process
https://blog.csdn.net/qq_33247435/article/details/83653584#3Spark_51
The scheduling of an application goes through the following stages:
Application -> Resource scheduling -> Task scheduling (task) -> Parallel computing -> Completion
Figure 3.1 Spark scheduling process
As you can see, after the Driver starts, it has two objects:
DAGScheduler: based on the wide and narrow dependencies of the RDDs, it cuts the DAG (directed acyclic graph) into stages, wraps each stage in another object, a taskSet (taskSet = stage), and hands the taskSets to TaskScheduler one by one.
TaskScheduler: after receiving a taskSet, TaskScheduler traverses it to get each task, calls the HDFS API to get the location of the data, and distributes each task, according to that location, to the thread pool in the Executor process on the corresponding Worker node. It also monitors the execution of each task, and when all tasks have finished it tells the Master to kill all the Executors.
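As a rough illustration of how stages are cut, the sketch below walks a linear chain of operators and starts a new stage at every wide (shuffle) dependency. It is a deliberately simplified model: the Op, Narrow and Wide types and the cutStages function are hypothetical, and the real DAGScheduler works on a graph of RDDs rather than on a flat list.

```scala
object StageCutSketch {
  sealed trait Dep
  case object Narrow extends Dep // child partition depends on one parent partition
  case object Wide extends Dep   // requires a shuffle

  final case class Op(name: String, dep: Dep)

  /** Splits a linear lineage into stages; every Wide operator opens a new stage. */
  def cutStages(lineage: List[Op]): List[List[String]] =
    lineage
      .foldLeft(List(List.empty[String])) { (stages, op) =>
        op.dep match {
          case Wide   => List(op.name) :: stages                  // shuffle boundary
          case Narrow => (op.name :: stages.head) :: stages.tail  // stay in current stage
        }
      }
      .map(_.reverse) // operators were prepended, so restore their order
      .reverse        // stages were prepended too
      .filter(_.nonEmpty)
}
```

Applied to the wordcount pipeline (textFile, flatMap, map, reduceByKey, saveAsTextFile, with only reduceByKey marked Wide), this yields two stages: the first three operators, then reduceByKey and saveAsTextFile.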
Task scheduling mainly involves the following steps:
1) DAGScheduler: based on the wide and narrow dependencies of the RDDs, it cuts the DAG (directed acyclic graph) into stages, wraps each stage in a taskSet (taskSet = stage), and hands the taskSets to TaskScheduler one by one.
2) TaskScheduler: after receiving a taskSet, TaskScheduler traverses it to get each task, calls the HDFS API to get the location of the data, and distributes each task, according to that location, to the thread pool in the Executor process on the corresponding Worker node.
3) TaskScheduler: TaskScheduler tracks the execution of each task. If a task fails, TaskScheduler resubmits it, retrying three times by default. If the task still fails after three retries, the stage that contains it fails, and TaskScheduler reports this to DAGScheduler.
4) DAGScheduler: after receiving the report of the failed stage, DAGScheduler resubmits that stage. Stages that succeeded are not resubmitted; only the failed stage is retried. (Note: if the stage still fails after DAGScheduler has retried it four times, the job fails, and the job is not retried.)
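The two retry levels in steps 3) and 4) can be sketched as nested loops. This is a minimal model under stated assumptions: runTask and runStage are hypothetical names, a task is just a function that returns true on success, and the defaults of three task retries and four stage retries are the numbers quoted above.

```scala
object RetrySketch {
  /** Runs a task once and then up to `retries` more times; true as soon as it succeeds. */
  def runTask(task: () => Boolean, retries: Int = 3): Boolean =
    (0 to retries).exists(_ => task())

  /**
   * Runs all tasks of a stage. When some task exhausts its retries, the
   * stage is resubmitted up to `stageRetries` times, and each resubmission
   * runs only the tasks that have not succeeded yet.
   */
  def runStage(tasks: List[() => Boolean], stageRetries: Int = 4): Boolean = {
    var pending = tasks
    (0 to stageRetries).exists { _ =>
      pending = pending.filterNot(t => runTask(t))
      pending.isEmpty
    }
  }

  /** A job succeeds only if every stage succeeds; finished stages are never rerun. */
  def runJob(stages: List[List[() => Boolean]]): Boolean =
    stages.forall(stage => runStage(stage))
}
```

In this model a task that always fails is attempted 4 times per stage attempt and the stage is attempted 5 times, so the job gives up after 20 attempts of that task.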
The concept of lagging (struggling) tasks:
Once more than 75% of the tasks have run successfully, a check is performed periodically (every 100 ms by default). It computes the median execution time of the tasks that have already succeeded and multiplies it by 1.5. Any task that has been running longer than this is a struggling task.
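The rule can be written as a small function. This is a sketch, not Spark's implementation: the names stragglers and median are hypothetical, and it assumes the median is taken over the durations of the tasks that have already finished, with the 75% and 1.5 values from the text as defaults.

```scala
object StragglerSketch {
  /** Median of a non-empty sequence of durations. */
  def median(xs: Seq[Double]): Double = {
    val s = xs.sorted
    val n = s.length
    if (n % 2 == 1) s(n / 2) else (s(n / 2 - 1) + s(n / 2)) / 2.0
  }

  /**
   * finishedMs: durations of the tasks that already succeeded.
   * runningMs:  task id -> time the task has been running so far.
   * Returns the ids of running tasks that count as struggling.
   */
  def stragglers(
      finishedMs: Seq[Double],
      runningMs: Map[Int, Double],
      quantile: Double = 0.75,
      multiplier: Double = 1.5): Set[Int] = {
    val total = finishedMs.length + runningMs.size
    // Nothing is flagged until more than `quantile` of the tasks have finished.
    if (finishedMs.isEmpty || finishedMs.length <= quantile * total) Set.empty
    else {
      val threshold = multiplier * median(finishedMs)
      runningMs.collect { case (id, elapsed) if elapsed > threshold => id }.toSet
    }
  }
}
```

For example, with eight finished tasks whose median duration is 11.5 ms, the threshold is 17.25 ms, so a task that has been running for 30 ms is flagged and one that has been running for 15 ms is not.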
Overall scheduling process:
== Resource scheduling ==
1. Start the Master and the standby Master (a standby Master is needed only if the cluster is highly available; otherwise there is none).
2. Start the Worker nodes. When a Worker node starts successfully, it registers with the Master and its information is added to the workers collection.
3. Submit the Application on the client side, which starts the spark-submit process. Pseudo code: spark-submit --master --deploy-mode cluster --class jarPath
4. The client applies to the Master for resources for the Driver. When the request arrives at the Master, it is added to the waitingDrivers collection of the Master.
5. When the waitingDrivers collection is not empty, the schedule() method is called. The Master looks through the workers collection and launches the Driver on an eligible Worker node. After the Driver has started successfully, its request is removed from waitingDrivers and the spark-submit process on the client exits. (After the Driver has started successfully, the DAGScheduler and TaskScheduler objects are created.)
6. When TaskScheduler has been created successfully, it applies to the Master for resources for the Application. When the request reaches the Master, it is added to the waitingApps collection.
7. When the elements of the waitingApps collection change, the schedule() method is called. It looks through the workers collection and starts the Executor processes on the Worker nodes that meet the requirements.
8. When the Executor processes have started successfully, the request is removed from waitingApps, and the Executors register back with TaskScheduler. At this point, TaskScheduler holds a list of Executors.
== Task scheduling ==
9. Based on the wide and narrow dependencies of the RDDs, cut the job into stages. Each stage is made up of a set of tasks. Each task is a pipeline computing model.
10. TaskScheduler distributes the tasks according to the data location. (How does TaskScheduler get the data location? It calls the HDFS API to get the blocks of the data and the location information of each block.)
11. TaskScheduler distributes the tasks and monitors their execution.
12. If a task fails or struggles, it is retried. The default is three retries.
13. If the task still fails after three retries, the failure is reported to DAGScheduler, and DAGScheduler retries the failed stage (only the failed stage is retried). The default is four retries.
14. Tell the Master to kill the Executors in the cluster and release the resources.
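Step 10 can be illustrated with a small placement function. It is only a sketch under simplifying assumptions: the name assign, the task ids and the host names are hypothetical, and the block locations are passed in as a plain map rather than fetched through the HDFS API. Each task goes to an Executor on a host that stores its block when one exists, and otherwise falls back to round-robin over the Executors.

```scala
object LocalitySketch {
  /**
   * preferredHosts: task id -> hosts that store the task's input block.
   * executorHosts:  hosts on which an Executor is running.
   * Returns task id -> host chosen to run the task.
   */
  def assign(
      preferredHosts: Map[String, Set[String]],
      executorHosts: Vector[String]): Map[String, String] = {
    require(executorHosts.nonEmpty, "need at least one executor")
    preferredHosts.toSeq.sortBy(_._1).zipWithIndex.map { case ((task, hosts), i) =>
      // Prefer an Executor that sits on a host holding the data.
      val local = executorHosts.find(hosts.contains)
      // Otherwise any Executor will do; the data then travels over the network.
      task -> local.getOrElse(executorHosts(i % executorHosts.length))
    }.toMap
  }
}
```

With Executors on node2 and node3, a task whose block lives on node1 and node2 runs on node2, while a task whose block lives only on a host without an Executor is placed by the round-robin fallback.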