2025-04-03 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article introduces DAG task decomposition in Spark and how Shuffle RDDs are used. Many people have questions about both topics in day-to-day work, so the editor has gone through the available material and put together a simple walkthrough of the relevant code. Let's go through it step by step.
1. DAGScheduler analysis
The DAGScheduler is mainly responsible for splitting an RDD job into stages and for submitting the tasks of each stage. Stage decomposition starts from the finalStage that triggered the scheduling process and walks backwards to find its parent stages; if a parent stage has not been submitted yet, the missing parent stages are submitted first. Each stage is built around an RDD and creates multiple tasks (Task), one per partition.
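The backward walk from finalStage can be sketched with a toy model. This is only an illustration under simplifying assumptions, not Spark's code: ToyStage, submitStage and submitted are hypothetical names, and parent stages are treated as finishing instantly, whereas the real scheduler parks a stage until its parents complete.

```scala
import scala.collection.mutable

object StageWalkSketch {
  // Hypothetical stand-in for a stage: an id plus the stages it depends on.
  final case class ToyStage(id: Int, parents: List[ToyStage])

  // Records the order in which stages get submitted.
  val submitted = mutable.ArrayBuffer.empty[Int]

  // Submit a stage only after all of its missing parents have been submitted.
  def submitStage(stage: ToyStage): Unit = {
    if (!submitted.contains(stage.id)) {
      val missing = stage.parents.filterNot(p => submitted.contains(p.id))
      missing.foreach(submitStage) // walk backwards into the parents first
      submitted += stage.id        // no missing parents remain, submit this stage
    }
  }

  def run(): List[Int] = {
    submitted.clear()
    val s0 = ToyStage(0, Nil)
    val s1 = ToyStage(1, Nil)
    val finalStage = ToyStage(2, List(s0, s1)) // depends on two parent stages
    submitStage(finalStage)
    submitted.toList
  }
}
```

Calling StageWalkSketch.run() starts at the final stage, yet the two parent stages end up submitted before it.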
Task scheduling itself is carried out by TaskSchedulerImpl, which uses a different Backend depending on how the environment is deployed; for example, a Yarn cluster and a standalone cluster use different Backends. We only note the concept here and will not go into Backend.
Let's first take a look at the core logic of DAGScheduler. The first method to study is:
    def submitMissingTasks(stage: Stage, jobId: Int)
This method submits the tasks of a stage for execution. Why the name? It is called only when the stage no longer depends on any stage that has not been executed, so what is still "missing" is the stage's own tasks.
Depending on the type of the stage, submitMissingTasks creates one of two kinds of task: ResultTask or ShuffleMapTask.
We go through it step by step and look only at the key code, because the full method is long and obscures the key logic.
1.1 Generate the serialized taskBinary
    taskBinaryBytes = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }
taskBinaryBytes is later wrapped and distributed to the remote Executors for execution, so everything in it must be serializable.
The main difference between the two cases is what is serialized together with the RDD: for a ShuffleMapStage it is the shuffle dependency shuffleDep, and for a ResultStage it is the function func to apply.
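Why the pair must be serializable can be seen in a small round trip through plain Java serialization. This is a sketch under assumptions: an Array[Int] stands in for the RDD, SumFunc is a hypothetical stand-in for func, and the helpers serialize and deserialize are illustrative, not Spark's closureSerializer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TaskBinarySketch {
  // Hypothetical stand-in for a stage's func: a named, serializable function.
  final class SumFunc extends (Iterator[Int] => Int) with Serializable {
    def apply(it: Iterator[Int]): Int = it.sum
  }

  // Turn any serializable object into bytes (illustrative helper).
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Rebuild the object from bytes, as the receiving side would.
  def deserialize[A](data: Array[Byte]): A = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def roundTrip(): Int = {
    // Same shape as (stage.rdd, stage.func): data plus the function to run on it.
    val taskBinaryBytes = serialize((Array(1, 2, 3), new SumFunc): AnyRef)
    val (data, func) = deserialize[(Array[Int], SumFunc)](taskBinaryBytes)
    func(data.iterator)
  }
}
```

If SumFunc did not extend Serializable, writeObject would throw a NotSerializableException, which is the failure the serializability requirement guards against.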
1.2 Generate the tasks
Now that we have taskBinaryBytes, the next step is to generate the tasks.
    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary,
              part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary,
              part, locs, id, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
There are two Task types: ShuffleMapTask and ResultTask. The key point is that one Task is generated for each partition to compute. A Task works at partition granularity, not RDD granularity, and this concept must be clear.
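The one-task-per-partition rule can be sketched as follows. ToyTask and makeTasks are hypothetical names, and the set of already computed partitions is an invented example value.

```scala
object TasksPerPartitionSketch {
  // Hypothetical stand-in for a task: it belongs to a stage and handles one partition.
  final case class ToyTask(stageId: Int, partitionId: Int)

  // Build exactly one task for every partition that still needs computing.
  def makeTasks(stageId: Int, partitionsToCompute: Seq[Int]): Seq[ToyTask] =
    partitionsToCompute.map(id => ToyTask(stageId, id))

  def run(): Seq[ToyTask] = {
    val numPartitions = 4
    val alreadyComputed = Set(1) // assumed: this partition's output already exists
    val partitionsToCompute = (0 until numPartitions).filterNot(alreadyComputed)
    makeTasks(7, partitionsToCompute)
  }
}
```

The number of tasks follows the number of partitions to compute, regardless of how many RDDs the stage chains together.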
1.3 Submit the tasks
The final step is to submit the tasks for execution. This goes through taskScheduler, which at present means TaskSchedulerImpl.
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
Another DAGScheduler method worth mentioning here is:
    submitWaitingChildStages(stage)
This method submits the stages that were waiting for the current stage to finish executing. With that, the scheduling process of the whole DAG is complete.
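A toy version of this hand-off might look like the sketch below. It is a simplification under assumptions: ToyStage, waitingStages, finished and submitted are hypothetical names, and a child is released only once all of its parents are done, which condenses the re-check that the real scheduler performs when it resubmits a child stage.

```scala
import scala.collection.mutable

object WaitingStagesSketch {
  // Hypothetical stand-in for a stage: an id plus the ids of its parent stages.
  final case class ToyStage(id: Int, parentIds: Set[Int])

  val waitingStages = mutable.LinkedHashSet.empty[ToyStage]
  val finished = mutable.Set.empty[Int]
  val submitted = mutable.ArrayBuffer.empty[Int]

  // After `parent` finishes, submit its waiting children whose parents are all done.
  def submitWaitingChildStages(parent: ToyStage): Unit = {
    finished += parent.id
    val ready = waitingStages.filter { s =>
      s.parentIds.contains(parent.id) && s.parentIds.forall(finished.contains)
    }.toList
    ready.foreach { s =>
      waitingStages -= s
      submitted += s.id
    }
  }

  // Returns what had been submitted after the first and after the second parent finished.
  def run(): (List[Int], List[Int]) = {
    waitingStages.clear(); finished.clear(); submitted.clear()
    val s0 = ToyStage(0, Set.empty)
    val s1 = ToyStage(1, Set.empty)
    val s2 = ToyStage(2, Set(0, 1)) // waits for both parents
    waitingStages += s2
    submitWaitingChildStages(s0) // stage 1 is still missing, so stage 2 keeps waiting
    val afterFirst = submitted.toList
    submitWaitingChildStages(s1) // both parents are done, stage 2 is submitted
    (afterFirst, submitted.toList)
  }
}
```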
2. Task execution
There are two Task types: ShuffleMapTask and ResultTask.
2.1 ResultTask
Let's first look at the execution of ResultTask, which is relatively simple. The core method is runTask, and the core code is:
    override def runTask(context: TaskContext): U = {
      val ser = SparkEnv.get.closureSerializer.newInstance()
      val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
      func(context, rdd.iterator(partition, context))
    }
The task deserializes the RDD and func, calls the RDD's iterator method to obtain the partition's data, and applies func to that data. Note that the data is consumed through an iterator in a single pass rather than being traversed several times.
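The single-pass behaviour can be illustrated with a plain Scala iterator. This is a sketch only: ToyContext, partitionIterator and the elementsRead counter are hypothetical and exist just to make the pass observable.

```scala
object ResultTaskSketch {
  // Hypothetical stand-in for TaskContext; it only carries the partition id here.
  final case class ToyContext(partitionId: Int)

  // Counts how many elements have been pulled from the iterator.
  var elementsRead = 0

  // Stand-in for rdd.iterator(partition, context): yields the partition's data lazily.
  def partitionIterator(data: Seq[Int]): Iterator[Int] =
    data.iterator.map { x => elementsRead += 1; x }

  // Same shape as func(context, rdd.iterator(partition, context)).
  def runTask[U](context: ToyContext, data: Seq[Int],
                 func: (ToyContext, Iterator[Int]) => U): U =
    func(context, partitionIterator(data))

  def run(): (Int, Int) = {
    elementsRead = 0
    val sum = runTask(ToyContext(0), Seq(1, 2, 3, 4),
      (ctx: ToyContext, it: Iterator[Int]) => it.sum)
    (sum, elementsRead)
  }
}
```

Each element is pulled from the iterator exactly once while func runs, so the partition is never materialized and then traversed again.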
2.2 ShuffleMapTask
The execution of a ShuffleMapTask is relatively complex.
The core method is again runTask, and the core code is:
    override def runTask(context: TaskContext): MapStatus = {
      val ser = SparkEnv.get.closureSerializer.newInstance()
      val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
      val rdd = rddAndDep._1
      val dep = rddAndDep._2
      dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
    }
First the RDD and its ShuffleDependency are deserialized. Then the RDD's data is written out using ShuffleWriterProcessor.
The dep argument matters little for the overall picture: it mainly decides details such as whether records are merged before being written. It does not affect the understanding of the shuffle process as a whole, so we can set dep aside:
    dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
The rdd here is the dataset that the ShuffleMapTask produces. What does this line of code actually do? ShuffleWriterProcessor writes that dataset to BlockManager, so let's take a closer look at ShuffleWriterProcessor.
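Conceptually, a shuffle write routes each record of the task's partition to the reduce partition that will read it. The toy model below sketches that idea under assumptions: ToyMapStatus and write are hypothetical names, records are bucketed by key hash as an assumed stand-in for the partitioner, and everything is kept in memory instead of going through a ShuffleManager or BlockManager.

```scala
import scala.collection.mutable

object ShuffleWriteSketch {
  // Hypothetical stand-in for MapStatus: records written per reduce partition.
  final case class ToyMapStatus(recordsPerReducer: Vector[Int])

  // Route every (key, value) record to a reduce partition chosen by key hash.
  def write(records: Iterator[(String, Int)],
            numReducers: Int): (Vector[Vector[(String, Int)]], ToyMapStatus) = {
    val buckets = Vector.fill(numReducers)(mutable.ArrayBuffer.empty[(String, Int)])
    records.foreach { case (k, v) =>
      val reducer = Math.floorMod(k.hashCode, numReducers) // non-negative bucket index
      buckets(reducer) += ((k, v))
    }
    val output = buckets.map(_.toVector)
    (output, ToyMapStatus(output.map(_.size)))
  }
}
```

All records with the same key land in the same bucket, which is what lets a downstream task read one bucket and see every value for its keys.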
2.3 ShuffleWriterProcessor
Here is the definition of the key method of ShuffleWriterProcessor.
    def write(
        rdd: RDD[_],
        dep: ShuffleDependency[_, _, _],
        partitionId: Int,
        context: TaskContext,
        partition: Partition): MapStatus = {
      var writer: ShuffleWriter[Any, Any] = null
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](
        dep.shuffleHandle, partitionId, context, createMetricsReporter(context))
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_