2025-02-27 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how to understand Spark shuffle: what triggers it, why it is expensive, when it can be avoided, and when adding one is worthwhile.
Shuffle overview
A Spark RDD consists of a fixed set of partitions, each of which holds a series of records. For RDDs returned by narrow transformations such as map and filter, the partitioning of the parent RDD carries over and the transformations are computed as a pipeline: each record depends only on a single record in the parent RDD, so each partition depends only on a single parent partition. Operations such as coalesce can make one task process several input partitions, but the transformation is still considered narrow because each parent partition is consumed by only one child partition.
Spark also supports transformations with wide dependencies, such as groupByKey and reduceByKey. With these dependencies, the data needed to compute the records in a single partition can come from many partitions of the parent dataset. To perform these transformations, all tuples with the same key must end up in the same partition and be processed by the same task. To meet this requirement, Spark performs a shuffle, which transfers data across the cluster and produces a new stage with a new set of partitions.
See the following code snippet:
sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()
The snippet above has a single action, count, preceded by three transformations on the input text file. It runs in a single stage because none of the three transformations requires a shuffle: each output partition depends only on a single partition of its parent RDD.
However, the following word-count example is very different:
val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
charCounts.collect()
This code contains two reduceByKey operations and therefore runs in three stages.
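One way to check the stage counts discussed above is to count the shuffle dependencies in an RDD's lineage; for a linear lineage, the number of stages is that count plus one. The following is a minimal sketch, assuming a local master and small in-memory input. The object name LineageShuffles and the helper shuffleCount are illustrative and not part of the original article.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object LineageShuffles {
  // Walk the lineage and count shuffle dependencies.
  // Assumes a linear lineage: an ancestor shared by two branches would be counted twice.
  def shuffleCount(rdd: RDD[_]): Int =
    rdd.dependencies.map { dep =>
      val here = dep match {
        case _: ShuffleDependency[_, _, _] => 1
        case _ => 0
      }
      here + shuffleCount(dep.rdd)
    }.sum

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("lineage-shuffles")
    val sc = SparkContext.getOrCreate(conf)

    // Narrow transformations only: no shuffle expected.
    val tokenized = sc.parallelize(Seq("a b a", "b c")).flatMap(_.split(' '))
    // Same shape as the word/character count above: two reduceByKey calls.
    val charCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
      .flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

    println(s"tokenized:  ${shuffleCount(tokenized) + 1} stage(s)")
    println(s"charCounts: ${shuffleCount(charCounts) + 1} stage(s)")
    sc.stop()
  }
}
```

Calling toDebugString on an RDD gives a similar view of the lineage in text form.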
A job that contains a join produces a more complicated graph:
[Figure: DAG of a job containing a join; the pink boxes mark the stages.]
At the boundary of each stage, tasks in the parent stage write data to disk, and tasks in the child stage read that data over the network. Because they cause heavy disk and network I/O, shuffles are expensive and should be avoided where possible. The number of partitions in a parent stage is often different from the number of partitions in a child stage. An operator that triggers a shuffle can usually take a numPartitions argument, which sets how many partitions the next stage will have. Just as the number of reducers is an important parameter in a MapReduce job, the number of partitions chosen for a shuffle largely determines the performance of an application.
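As a small illustration of the numPartitions argument, the sketch below passes an explicit partition count to reduceByKey and compares the partition counts before and after the shuffle. The object name, the sample data, and the choice of 8 input partitions are assumptions made for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object, for illustration only.
object PartitionCountDemo {
  // Returns (partitions before the shuffle, partitions after the shuffle).
  def partitionsBeforeAndAfter(sc: SparkContext, numPartitions: Int): (Int, Int) = {
    val pairs = sc.parallelize(1 to 1000, numSlices = 8).map(i => (i % 10, 1))
    // The second argument sets the partition count of the stage after the shuffle.
    val reduced = pairs.reduceByKey(_ + _, numPartitions)
    (pairs.getNumPartitions, reduced.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("partition-count")
    val sc = SparkContext.getOrCreate(conf)
    println(partitionsBeforeAndAfter(sc, 4))
    sc.stop()
  }
}
```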
Optimize shuffle
Often several different combinations of actions and transformations produce the same result. But not all operators that produce the same result perform equally well. Avoiding common pitfalls and choosing the right operators can significantly improve application performance.
When choosing transformations, minimize both the number of shuffles and the amount of data shuffled. A shuffle is expensive because all shuffle data is written to disk and transmitted over the network. repartition, join, cogroup, and any of the *By or *ByKey transformations can trigger a shuffle. Several operator choices can be optimized:
1. groupByKey can be replaced by reduceByKey when performing an associative reduction, because reduceByKey combines values within each partition before the shuffle.
2. reduceByKey can be replaced by aggregateByKey when the input and output value types differ, which avoids creating a wrapper object for every record.
3. The flatMap-join-groupBy pattern can be replaced by cogroup when the two datasets are already grouped by key.
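The three substitutions listed above can be sketched as follows. This is a minimal illustration, not a benchmark; the object and method names and the element types are assumptions chosen for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object OperatorChoices {
  // 1. Associative reduction: reduceByKey merges values inside each partition
  //    before the shuffle, so less data crosses the network than with groupByKey.
  def sumPerKey(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
    pairs.reduceByKey(_ + _)

  // 2. Result type (Set[String]) differs from the value type (String):
  //    aggregateByKey avoids building a one-element Set per record.
  def distinctValuesPerKey(pairs: RDD[(String, String)]): RDD[(String, Set[String])] =
    pairs.aggregateByKey(Set.empty[String])(_ + _, _ ++ _)

  // 3. Two keyed datasets: cogroup groups both sides by key in one operation.
  def groupBoth(left: RDD[(String, Int)], right: RDD[(String, String)])
      : RDD[(String, (Iterable[Int], Iterable[String]))] =
    left.cogroup(right)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("operator-choices")
    val sc = SparkContext.getOrCreate(conf)
    val nums = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val tags = sc.parallelize(Seq(("a", "x"), ("a", "y"), ("b", "x")))
    sumPerKey(nums).collect().foreach(println)
    distinctValuesPerKey(tags).collect().foreach(println)
    groupBoth(nums, tags).collect().foreach(println)
    sc.stop()
  }
}
```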
No shuffle
In some cases, the transformations described above do not result in a shuffle. Spark skips the shuffle when a previous transformation has already partitioned the data with the same partitioner that the new transformation would use.
For example:
rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)
Because reduceByKey is called without a partitioner, it uses the default one, so both rdd1 and rdd2 are hash-partitioned. The two reduceByKey operations result in two shuffles. If the two datasets have the same number of partitions, no extra shuffle is required to perform the join. Because the datasets are partitioned identically, the set of keys in any single partition of rdd1 can only appear in a single partition of rdd2. Therefore, the contents of any single output partition of rdd3 depend only on the contents of a single partition in rdd1 and a single partition in rdd2, and a third shuffle is not required.
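For example, if someRdd has four partitions, someOtherRdd has two partitions, and both reduceByKey calls use three partitions, then each reduceByKey shuffles its input into three partitions and the join runs as three tasks without a further shuffle.
[Figure: the resulting set of tasks.]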
If rdd1 and rdd2 use different partitioners, or use the same partitioner with different numbers of partitions, only one of the two datasets needs to be reshuffled for the join.
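The co-partitioned case described above can be checked by comparing partitioners. The sketch below mirrors the four-, two-, and three-partition example; the object name, method name, and sample data are assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical demo object, for illustration only.
object CoPartitionedJoin {
  // Builds rdd1, rdd2 and their join, with both reduceByKey calls using 3 partitions.
  def build(sc: SparkContext)
      : (RDD[(String, Int)], RDD[(String, Int)], RDD[(String, (Int, Int))]) = {
    val someRdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)), numSlices = 4)
    val someOtherRdd = sc.parallelize(Seq(("a", 10), ("b", 20)), numSlices = 2)
    val rdd1 = someRdd.reduceByKey(_ + _, 3)      // shuffle 1, hash-partitioned into 3
    val rdd2 = someOtherRdd.reduceByKey(_ + _, 3) // shuffle 2, hash-partitioned into 3
    val rdd3 = rdd1.join(rdd2)                    // inputs share a partitioner
    (rdd1, rdd2, rdd3)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("copartitioned-join")
    val sc = SparkContext.getOrCreate(conf)
    val (rdd1, rdd2, rdd3) = build(sc)
    println(s"same partitioner: ${rdd1.partitioner == rdd2.partitioner}")
    println(s"join partitions:  ${rdd3.getNumPartitions}")
    rdd3.collect().foreach(println)
    sc.stop()
  }
}
```

When the two partitioners compare equal, the join can reuse the existing partitioning of both inputs rather than moving the data again.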
Broadcast variables can also be used to avoid a shuffle during a join. When one of the datasets is small enough to fit in the memory of a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor. A map transformation can then reference the hash table to perform lookups.
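A broadcast-based join of this kind might look like the sketch below, which performs an inner join between a large RDD and a small in-memory map. The object name, method name, and types are assumptions for illustration, and the small dataset is assumed to fit comfortably in memory.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object BroadcastJoin {
  // Inner join of `large` with `small` without shuffling `large`.
  def joinWithSmall(sc: SparkContext,
                    large: RDD[(String, Int)],
                    small: Map[String, String]): RDD[(String, (Int, String))] = {
    // Ship the hash table to every executor once.
    val lookup = sc.broadcast(small)
    // Narrow transformation: each record is matched locally against the table;
    // records whose key is missing from the table are dropped.
    large.flatMap { case (k, v) => lookup.value.get(k).map(s => (k, (v, s))) }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("broadcast-join")
    val sc = SparkContext.getOrCreate(conf)
    val large = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    joinWithSmall(sc, large, Map("a" -> "x")).collect().foreach(println)
    sc.stop()
  }
}
```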
Add shuffle
Sometimes it is necessary to break the rule of minimizing the number of shuffles.
An additional shuffle can pay off when it increases parallelism. For example, if the input consists of a few large unsplittable files, each large file ends up in a single partition with a very large number of records, instead of the data being spread across enough partitions to use all available cores. In this case, calling repartition (which triggers a shuffle) creates more partitions and provides the parallelism that the later transformations need, which can improve performance significantly.
Another case where an extra shuffle helps is when data is aggregated to the driver with the reduce or aggregate actions.
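A minimal sketch of this use of repartition follows. The single input partition stands in for one large unsplittable file, and the target of 8 partitions is an arbitrary assumption; the object and method names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical demo object, for illustration only.
object RepartitionDemo {
  // Spread records over more partitions so later transformations can run in parallel.
  def spread(rdd: RDD[String], targetPartitions: Int): RDD[String] =
    rdd.repartition(targetPartitions) // triggers a shuffle

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("repartition-demo")
    val sc = SparkContext.getOrCreate(conf)
    // One partition holding every record, as with a single unsplittable file.
    val skewed = sc.parallelize((1 to 10000).map(i => s"record-$i"), numSlices = 1)
    val balanced = spread(skewed, 8)
    println(s"before: ${skewed.getNumPartitions}, after: ${balanced.getNumPartitions}")
    sc.stop()
  }
}
```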
When aggregating over a large number of partitions, the final merge in a single driver thread can become a bottleneck. To reduce the load on the driver, first perform a round of distributed aggregation with reduceByKey or aggregateByKey that also reduces the number of partitions in the resulting dataset. The idea is to merge values within each partition first, then merge those partial results in parallel into fewer partitions, and only then send them to the driver for the final aggregation. treeReduce and treeAggregate are typical operations that work this way.
This approach is especially useful when the aggregation is already grouped by key. For example, suppose a program counts the occurrences of each word in a corpus and returns the results to the driver as a map. One approach is to compute a local map on each partition with the aggregate action and then merge the maps in the driver. The alternative is to use aggregateByKey to do the counting in a fully distributed way, and then simply call collectAsMap to return the results to the driver.
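The tree-style operations mentioned above can be used as in the following sketch, which sums a dataset and computes a (sum, count) pair. The object and method names and the depth of 2 are assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object TreeAggregation {
  // Partial sums are merged on executors in a tree of the given depth
  // before the last merge happens on the driver.
  def total(rdd: RDD[Long]): Long =
    rdd.treeReduce(_ + _, 2)

  // Same idea with an accumulator type (sum, count) that differs from the element type.
  def sumAndCount(rdd: RDD[Long]): (Long, Long) =
    rdd.treeAggregate((0L, 0L))(
      (acc, x) => (acc._1 + x, acc._2 + 1L),  // fold one element into a partial result
      (a, b) => (a._1 + b._1, a._2 + b._2),   // merge two partial results
      2)                                      // tree depth

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("tree-aggregation")
    val sc = SparkContext.getOrCreate(conf)
    val data = sc.parallelize(1L to 100L, numSlices = 16)
    println(total(data))
    println(sumAndCount(data))
    sc.stop()
  }
}
```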
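The fully distributed word count described above can be sketched as follows. The object name, method name, and whitespace tokenization are assumptions for illustration, and the result is assumed to be small enough to fit in driver memory.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object DistributedWordCount {
  // Counts are merged per key on the executors; the driver only receives
  // one (word, count) pair per distinct word.
  def wordCounts(lines: RDD[String]): scala.collection.Map[String, Long] =
    lines.flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1L))
      .aggregateByKey(0L)(_ + _, _ + _)
      .collectAsMap()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("distributed-word-count")
    val sc = SparkContext.getOrCreate(conf)
    val lines = sc.parallelize(Seq("a b a", "b c"))
    wordCounts(lines).foreach(println)
    sc.stop()
  }
}
```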