What exactly is RDD in Spark?

2025-02-23 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 06/01

This article explains what an RDD is in Spark. The content is kept simple so that it is easy to follow.

Spark is an open-source distributed computing engine. It builds data-processing pipelines out of RDDs, schedules tasks across the cluster, derives the parallelism of those tasks from how the data is partitioned, and exchanges partition data between tasks to achieve distributed data processing.

RDD is the most important concept in Spark. Once you understand what an RDD is, you understand roughly half of how Spark works internally.

1. RDD base class

RDD is the base class that represents a dataset in Spark. It is serializable, so an RDD can be shipped between Spark nodes. RDD defines an iterator for looping over its data, as well as the transformation operations that produce a new RDD from an existing one.

Each RDD operator produces a new RDD. For example:

The map operation generates a MapPartitionsRDD.

The filter operation also generates a MapPartitionsRDD. It wraps the parent RDD's iterator in a filtering layer: the underlying iterator is still the parent's, but records that do not satisfy the predicate are discarded.
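To make the idea of "an iterator wrapped around the parent iterator" concrete, here is a minimal sketch in plain Scala. It does not use Spark at all; `FilterIteratorSketch` and `filtered` are hypothetical names chosen for this illustration, and Spark's own implementation may differ in detail.

```scala
// Toy illustration only: these names are hypothetical, not Spark classes.
object FilterIteratorSketch {
  // Wrap a parent iterator so that only records satisfying `keep` come out.
  // Nothing is read from `parent` until the wrapper itself is consumed.
  def filtered[T](parent: Iterator[T], keep: T => Boolean): Iterator[T] =
    new Iterator[T] {
      // Holds the next matching record once it has been found.
      private var buffered: Option[T] = None

      def hasNext: Boolean = {
        // Pull from the parent until a matching record is found or it runs out.
        while (buffered.isEmpty && parent.hasNext) {
          val candidate = parent.next()
          if (keep(candidate)) buffered = Some(candidate)
        }
        buffered.isDefined
      }

      def next(): T = {
        if (!hasNext) throw new NoSuchElementException("no more records")
        val result = buffered.get
        buffered = None
        result
      }
    }
}
```

The wrapper stores no data of its own: every record still comes from the parent iterator, and the ones that fail the predicate are simply skipped.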

The data of each RDD partition is computed by the compute method.

1.1 RDD trigger task

Submitting a Spark program to the cluster triggers application events, driver events, and so on. The master node selects a node on which to create the app and the driver, and the main method in the Spark jar is executed on that node. The actual execution of tasks, however, is triggered by an action on an RDD.

A job is triggered by an RDD action, which submits the FinalStage to the DAG scheduler for execution. Methods such as collect() and count() are actions: they submit the job, and the compute method of each RDD is then called inside the resulting tasks to produce the partition data.

    RDD action (e.g. collect()) => finalStage => dag.submitJob() => submitMissingStage()

    dag.submitJob() => scheduleImpl.launchTask() => scheduleBackend => executorBackend => executor.launchTask() => executorBackend.taskComplete msg => scheduleBackend.taskCompleted => dag.stageFinished() => ...

The above is the general process by which an RDD submits a job. The action is the trigger: it causes the last RDD to be executed as the finalStage, and the finalStage is submitted through the submitJob function of the DAG scheduler.

Stages are chained together from the data source to the finalStage, but the scheduler walks them in reverse: starting from the finalStage, it searches backward for the parent stages that must run first. This point is worth understanding well, because this process is really the secret of RDD.

Let's first take a look at the classic diagram of RDD. The Transformation part in the middle of the figure is the RDD calculation process, the HDFS on the left is the data source, and the HDFS on the right is the operation performed by the finalStage of the RDD (in the figure it writes to HDFS; it could equally be a print operation, depending on how the program is written).

Stage1 and stage2 contain only narrow dependencies: map and union are both narrow. Stage3 involves a wide dependency, here the join operation. A narrow dependency means that each partition of the parent RDD is used by at most one partition of the child RDD, so no data has to move between partitions. A wide dependency means that a parent partition may be read by multiple child partitions, so the data has to be shuffled across partitions.
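The difference between the two kinds of dependency can be checked mechanically. The following plain-Scala sketch assumes a simplified representation in which each child partition lists the parent partition indices it reads; `DependencySketch` and `isNarrow` are hypothetical names for this illustration, not part of Spark.

```scala
// Toy illustration only: a dependency is modeled as, for each child partition,
// the list of parent partition indices it reads.
object DependencySketch {
  // Narrow: every parent partition is read by at most one child partition.
  def isNarrow(childToParents: Seq[Seq[Int]]): Boolean = {
    // Count in how many child partitions each parent partition appears.
    val usesPerParent = childToParents.flatMap(_.distinct).groupBy(identity)
    usesPerParent.values.forall(_.size <= 1)
  }
}
```

With this model, a map-like mapping such as `Seq(Seq(0), Seq(1))` is narrow, a coalesce-like mapping such as `Seq(Seq(0, 1), Seq(2, 3))` is still narrow, and a shuffle-like mapping such as `Seq(Seq(0, 1), Seq(0, 1))` is wide because both children read every parent partition.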

1.2 Example of RDD execution

An RDD is executed through a call to runJob, as follows:

    def collect(): Array[T] = withScope {
      val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      Array.concat(results: _*)
    }

Here sc is the SparkContext.

runJob applies the given function to each partition, and the result is an Array whose length equals the number of partitions.
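The shape of this call can be imitated without a cluster. The sketch below is a toy model in plain Scala, assuming partitions are just in-memory sequences; `RunJobSketch` and its two methods are hypothetical stand-ins that only mirror the structure of collect, not Spark's scheduling.

```scala
import scala.reflect.ClassTag

// Toy illustration only: partitions are plain in-memory sequences.
object RunJobSketch {
  // Apply `func` to the iterator of every partition; one result per partition.
  def runJob[T, U: ClassTag](partitions: Seq[Seq[T]], func: Iterator[T] => U): Array[U] =
    partitions.map(p => func(p.iterator)).toArray

  // Same structure as collect: turn each partition into an array, then concatenate.
  def collect[T: ClassTag](partitions: Seq[Seq[T]]): Array[T] = {
    val results: Array[Array[T]] = runJob[T, Array[T]](partitions, iter => iter.toArray)
    Array.concat(results: _*)
  }
}
```

In this model the per-partition results come back in partition order, so concatenating them yields the records in the order of the partitions.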

sc.runJob in turn calls the dagScheduler.runJob method. The job execution steps of DAGScheduler are not covered here; see the author's article devoted to DAGScheduler.

1.3 Iterator

When an RDD is actually executed, its data is read through an iterator.

RDD is an abstract class that defines several interfaces: getPartitions, compute and getPreferredLocations. RDD data is stored in partitions, and each partition may reside on any node whose resources the Spark application has acquired. These three interfaces describe everything about an RDD. getPreferredLocations relates to data locality, so we ignore it here; that does not affect our understanding of how RDD works.

    override protected def getPartitions: Array[Partition] = {}

    override def compute(split: Partition, context: TaskContext): Iterator[java.lang.Integer] = new NextIterator[Integer] {}

We do not need to pay much attention to the getPartitions method. It returns the list of partitions, which tells Spark how many partitions the RDD has. Each partition is processed by its own task, and these tasks can run on different nodes, which is how distributed computing is achieved.

What we care about most is the compute method. It returns an iterator over the data of the given partition (split) of the RDD. What data this iterator yields is determined by the code written in the body of compute. We can define our own RDD simply by writing code that implements these methods!

What are the benefits of a custom RDD? The biggest advantage is that your own dataset becomes part of Spark's distributed computing system, which then handles data partitioning and task allocation for you and lets you run joins and aggregations against other RDDs.
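As a rough sketch of what "implementing these methods" looks like, here is a toy version in plain Scala. `ToyPartition` and `ToyRDD` are hypothetical stand-ins for Spark's Partition and RDD, reduced to the two methods discussed above (the TaskContext argument is left out); a real custom RDD would extend org.apache.spark.rdd.RDD and override getPartitions and compute in the same way. `RangeRDD` is an invented example dataset.

```scala
// Toy illustration only: ToyPartition and ToyRDD are hypothetical stand-ins,
// not the real org.apache.spark classes.
object CustomRddSketch {
  trait ToyPartition { def index: Int }

  abstract class ToyRDD[T] {
    protected def getPartitions: Array[ToyPartition]
    def compute(split: ToyPartition): Iterator[T]

    final def partitions: Array[ToyPartition] = getPartitions

    // Stand-in for an action: evaluate every partition in order and gather the records.
    def collect(): List[T] = partitions.toList.flatMap(p => compute(p).toList)
  }

  // A partition covering the integers in [start, end).
  final case class RangePartition(index: Int, start: Int, end: Int) extends ToyPartition

  // Custom dataset: the integers 0 until n, split into numSlices partitions.
  class RangeRDD(n: Int, numSlices: Int) extends ToyRDD[Int] {
    override protected def getPartitions: Array[ToyPartition] =
      Array.tabulate[ToyPartition](numSlices) { i =>
        RangePartition(i, i * n / numSlices, (i + 1) * n / numSlices)
      }

    // The body of compute decides what data the partition's iterator yields.
    override def compute(split: ToyPartition): Iterator[Int] = {
      val p = split.asInstanceOf[RangePartition]
      (p.start until p.end).iterator
    }
  }
}
```

For instance, `new CustomRddSketch.RangeRDD(10, 3)` describes three partitions, and calling compute on one of them yields only that partition's slice of the range.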

Let's get back to the compute method itself.

How is the Iterator[T] obtained? There are two cases. In the first, the iterator is fetched from the BlockManager as a BlockResult, which is the form in which an already computed partition is stored (in Spark this path applies to RDDs that have a storage level set, that is, persisted RDDs). In the second, the iterator is computed directly.

In the first case, Spark checks whether the BlockManager already holds a BlockResult for this partition of the RDD. Note that the getOrElseUpdate method also takes a function as its last parameter: if the requested block does not exist, that function is called to compute the iterator. The function body is defined as follows:

    () => {
      readCachedBlock = false
      computeOrReadCheckpoint(partition, context)
    }

Its main job is to call the computeOrReadCheckpoint method to compute the partition.

    def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
      if (isCheckpointedAndMaterialized) {
        firstParent[T].iterator(split, context)
      } else {
        compute(split, context)
      }
    }

computeOrReadCheckpoint returns an Iterator. If the RDD has been checkpointed, it calls the iterator method of the first parent RDD, which in that case is a CheckpointRDD; otherwise it calls the compute method.

Therefore, obtaining an RDD's iterator takes two steps:

First, check whether a BlockResult exists for the requested partition of the RDD. If it does, the BlockResult is used as the Iterator; this is the case when the partition has already been computed and stored, as for a persisted RDD.

Otherwise, there are two cases: if the RDD has been checkpointed, the iterator method of the parent RDD is called (the parent RDD is then a CheckpointRDD); if not, the compute method is called to produce the Iterator.
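The lookup order described above (stored block first, then checkpoint, then compute) can be sketched in plain Scala. This is a toy model under simplifying assumptions: a mutable map stands in for the BlockManager, partitions are identified by an Int, and every partition is stored after its first evaluation. `IteratorLookupSketch`, `ToyRDD` and `computeCalls` are hypothetical names.

```scala
import scala.collection.mutable

// Toy illustration only: a mutable map stands in for the BlockManager.
object IteratorLookupSketch {
  class ToyRDD(
      compute: Int => Iterator[Int],
      checkpointed: Option[Int => Iterator[Int]] = None
  ) {
    private val blockStore = mutable.Map.empty[Int, Vector[Int]]

    // Counts how often the compute function was actually invoked.
    var computeCalls: Int = 0

    private def computeOrReadCheckpoint(split: Int): Iterator[Int] =
      checkpointed match {
        case Some(readCheckpoint) => readCheckpoint(split) // read from the checkpoint parent
        case None =>
          computeCalls += 1
          compute(split)
      }

    // Step 1: return the stored block if present.
    // Step 2: otherwise read the checkpoint or compute, and store the result.
    def iterator(split: Int): Iterator[Int] =
      blockStore.getOrElseUpdate(split, computeOrReadCheckpoint(split).toVector).iterator
  }
}
```

Asking for the same partition twice invokes the compute function only once in this model, and a checkpointed instance never invokes it at all.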

2. Stage division

We know that an RDD job is submitted to the Spark cluster stage by stage. Starting from the last stage, the scheduler checks whether the stages it depends on have to be submitted first. Stages are divided at shuffle boundaries.

If an RDD has a ShuffleDependency, the parent RDD on the other side of that dependency is submitted as ShuffleMapTasks; the last RDD is submitted as ResultTasks.

Stages are submitted recursively. For an RDD submitted as ShuffleMapTasks, Spark again checks whether it has an upstream ShuffleDependency, and if so submits the RDDs it depends on first.

The entire Spark job is a chain of RDDs. If there is no shuffle dependency, only the last RDD is submitted. When the iterator of the last RDD is computed, it calls the iterator method of its parent RDD, which is usually a MapPartitionsRDD. See the MapPartitionsRDD section (3.1) for details.
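The recursive stage division can be illustrated with a small plain-Scala model. It assumes a lineage made of named nodes connected by either narrow or shuffle dependencies; `StageSketch`, `Node`, `Narrow` and `Shuffle` are hypothetical names, and the sketch does not deduplicate a parent stage that is reachable along two paths.

```scala
// Toy illustration only: a lineage of named nodes with narrow or shuffle edges.
object StageSketch {
  sealed trait Dep { def parent: Node }
  final case class Narrow(parent: Node) extends Dep
  final case class Shuffle(parent: Node) extends Dep
  final case class Node(name: String, deps: List[Dep] = Nil)

  // RDD names in the stage ending at `last`: follow narrow edges, stop at shuffle edges.
  private def stageMembers(last: Node): List[String] =
    last.name :: last.deps.flatMap {
      case Narrow(p)  => stageMembers(p)
      case Shuffle(_) => Nil
    }

  // RDDs sitting directly behind a shuffle boundary of the stage ending at `last`.
  private def shuffleParents(last: Node): List[Node] =
    last.deps.flatMap {
      case Narrow(p)  => shuffleParents(p)
      case Shuffle(p) => List(p)
    }

  // Stages in submission order: parent stages first, the final stage last.
  def stages(finalRdd: Node): List[List[String]] =
    shuffleParents(finalRdd).flatMap(stages) :+ stageMembers(finalRdd)
}
```

For a lineage textFile -> map -> reduceByKey -> mapValues, where only the edge into reduceByKey is a shuffle, the model produces two stages: one ending at map, and the final one containing reduceByKey and mapValues.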

3. RDD subclass

RDD has many subclasses, such as MapPartitionsRDD, HadoopRDD, CoGroupedRDD, and so on. A few examples are used here to briefly illustrate their internal logic.

3.1 MapPartitionsRDD

MapPartitionsRDD is a subclass of RDD; many of the RDD operators seen earlier generate a new MapPartitionsRDD.

The constructor of MapPartitionsRDD takes a parameter f, which is a function with the following generic type:

    f: (TaskContext, Int, Iterator[T]) => Iterator[U]

f takes three parameters:

(1) TaskContext: the task context

(2) Int: the partition index

(3) Iterator[T]: the iterator over the partition's data

The output of f is also an Iterator. In other words, f is a user-supplied function that produces one iterator from another, and the data-processing logic lives in f.

The compute method of MapPartitionsRDD, which performs the calculation, is defined as follows:

    override def compute(split: Partition, context: TaskContext): Iterator[U] =
      f(context, split.index, firstParent[T].iterator(split, context))

Here f is the argument passed to the constructor of MapPartitionsRDD; it wraps the user-defined function, for example the one passed to map. In this way, through RDD operators such as map and flatMap, each of which creates a MapPartitionsRDD, a series of operations on an RDD can be chained together.
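How the chaining works can be seen in a toy model written in plain Scala. The classes below are hypothetical stand-ins (`ToyContext` for TaskContext, an Int for the partition), and only the shape of compute follows the definition quoted above: apply f to the context, the partition index and the parent's iterator.

```scala
// Toy illustration only: hypothetical stand-ins, not the real Spark classes.
object MapPartitionsSketch {
  // Stand-in for TaskContext; it only carries a label here.
  final case class ToyContext(label: String)

  abstract class ToyRDD[T] {
    def numPartitions: Int
    def compute(split: Int, context: ToyContext): Iterator[T]

    // Each operator wraps the user function into an f and creates a new derived RDD.
    def map[U](g: T => U): ToyRDD[U] =
      new ToyMapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(g))

    def mapPartitionsWithIndex[U](g: (Int, Iterator[T]) => Iterator[U]): ToyRDD[U] =
      new ToyMapPartitionsRDD[U, T](this, (_, index, iter) => g(index, iter))
  }

  // Source dataset backed by in-memory partitions.
  class ToySource[T](parts: Vector[Vector[T]]) extends ToyRDD[T] {
    def numPartitions: Int = parts.length
    def compute(split: Int, context: ToyContext): Iterator[T] = parts(split).iterator
  }

  class ToyMapPartitionsRDD[U, T](
      prev: ToyRDD[T],
      f: (ToyContext, Int, Iterator[T]) => Iterator[U]
  ) extends ToyRDD[U] {
    def numPartitions: Int = prev.numPartitions

    // Apply f to the iterator obtained from the parent for the same partition.
    def compute(split: Int, context: ToyContext): Iterator[U] =
      f(context, split, prev.compute(split, context))
  }
}
```

Chaining `map` and `mapPartitionsWithIndex` on a `ToySource` builds nested wrappers, and nothing is evaluated until compute is called on the outermost one for a given partition.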

3.2 CoalescedRDD

CoalescedRDD regroups an RDD of M partitions into N partitions to form a new RDD. CoalescedRDD itself depends on its parent through a narrow dependency; a shuffle is involved only when coalesce is called with shuffle = true, in which case a shuffled RDD is placed in front of it.

First of all, CoalescedRDD needs a repartitioning algorithm that maps M partitions to N partitions, where M > N. As a result, each of the N partitions corresponds to several of the M partitions. These are represented as a list of Int, where each Int is the index of one of the M partitions in the parent RDD.

If no repartitioning algorithm is specified for the CoalescedRDD, DefaultPartitionCoalescer is used.
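To picture the result of such a regrouping, here is a minimal plain-Scala sketch that splits the parent partition indices into N contiguous groups. This is an assumption made for illustration, not a description of DefaultPartitionCoalescer, which also has to take things like data locality into account; `CoalesceGroupingSketch` and `group` are hypothetical names.

```scala
// Toy illustration only: contiguous grouping, not Spark's actual coalescing algorithm.
object CoalesceGroupingSketch {
  // Split parent partition indices 0 until m into n contiguous groups.
  def group(m: Int, n: Int): List[List[Int]] = {
    require(n > 0 && m >= n, "expects M >= N > 0")
    List.tabulate(n) { i =>
      val start = (i.toLong * m / n).toInt
      val end = ((i + 1).toLong * m / n).toInt
      (start until end).toList
    }
  }
}
```

Each inner list plays the role of the list of Int described above: the parent partition indices that one new partition is built from.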

The compute method of CoalescedRDD is as follows:

    override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
      partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
        firstParent[T].iterator(parentPartition, context)
      }
    }

partition.parents is the list of parent-RDD partitions that corresponds to this partition of the CoalescedRDD. For each partition in that list, the following is executed:

    firstParent[T].iterator(parentPartition, context)

The resulting iterators are concatenated by flatMap into the final Iterator[T]. This part should not be difficult to understand.
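The same flatMap pattern can be tried on plain collections. In this toy sketch the parent RDD is assumed to be a vector of in-memory partitions, and `parents` is the list of parent partition indices belonging to one coalesced partition; `CoalescedComputeSketch` is a hypothetical name.

```scala
// Toy illustration only: the parent RDD is modeled as in-memory partitions.
object CoalescedComputeSketch {
  // Chain the iterators of the listed parent partitions into one iterator.
  def compute[T](parentPartitions: Vector[Vector[T]], parents: List[Int]): Iterator[T] =
    parents.iterator.flatMap(parentIndex => parentPartitions(parentIndex).iterator)
}
```

Because flatMap on an iterator is lazy, each parent partition's iterator is only opened when the previous one has been exhausted.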

It is important to note that whether the Iterator[T] obtained here is written to shuffle output depends on the stage that the CoalescedRDD belongs to: in a ShuffleMapTask it is written out as shuffle data, while in a ResultTask it is handed to the function of the action.

When it comes to understanding how Spark computes, understanding the shuffle process resolves half of the confusion.
