2025-03-10 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains how the shuffle process works and how its implementation in Spark has evolved.
In the MapReduce framework, shuffle is the bridge between Map and Reduce. The output of Map must go through shuffle before Reduce can use it, so the performance of shuffle directly affects the performance and throughput of the whole program. As an implementation of the MapReduce framework, Spark also implements the shuffle logic.
Shuffle
Shuffle is a specific phase in the MapReduce framework, which is between Map phase and Reduce phase. When the output of Map is to be used by Reduce, the output needs to be hashed by key and distributed to each Reducer. This process is shuffle. Because shuffle involves disk reading and writing and network transmission, the performance of shuffle directly affects the running efficiency of the whole program.
The following diagram clearly describes the entire flow of the MapReduce algorithm, where shuffle phase is between Map phase and Reduce phase.
Conceptually, shuffle is the bridge that connects the data on the two sides. The sections below describe how this part (the partitioning) is actually implemented.
1. Spark Shuffle
Take the figure as an example to briefly describe the whole process of shuffle in Spark:
First, each Mapper creates one bucket per Reducer, so the total number of buckets is M × R, where M is the number of Mappers and R is the number of Reducers.
Second, the results produced by a Mapper are placed into the buckets according to the configured partition algorithm. The partition algorithm can be customized; the default is to assign each record to a bucket by the hash of its key.
When a Reducer starts, it fetches the corresponding buckets from the remote or local block manager, based on the id of its own task and the ids of the Mappers it depends on, and uses them as its input.
Bucket here is an abstract concept. In the implementation, each bucket can correspond to a file, part of a file or something else.
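The three steps above can be sketched in a few lines of Python. This is only an illustration of the idea, not Spark's code: the function names (hash_partition, map_side_write, reduce_side_input) are hypothetical, and each bucket is modeled as a plain in-memory list.

```python
def hash_partition(key, num_reducers):
    """Default-style partitioner: pick a bucket from the hash of the key."""
    return hash(key) % num_reducers


def map_side_write(records, num_reducers, partitioner=hash_partition):
    """Distribute one Mapper's (key, value) records into R buckets."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in records:
        buckets[partitioner(key, num_reducers)].append((key, value))
    return buckets


def reduce_side_input(all_mapper_buckets, reducer_id):
    """A Reducer reads the bucket with its own id from every Mapper."""
    for buckets in all_mapper_buckets:
        yield from buckets[reducer_id]
```

With M Mappers each calling map_side_write, the job holds M × R buckets in total; a custom partitioner can be passed in place of hash_partition.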
The shuffle process of Apache Spark is similar to that of Apache Hadoop, and some concepts carry over directly. In the shuffle process, the end that provides data is called the Map side, and each task that generates data on the Map side is called a Mapper. The end that receives data is called the Reduce side, and each task that pulls data on the Reduce side is called a Reducer. The shuffle process is essentially the process of dividing the data produced on the Map side with a partitioner and sending each part to the corresponding Reducer.
2. Shuffle Write
In Spark 0.6 and 0.7, shuffle data is stored in the block manager as files, following the same strategy as rdd.persist(StorageLevel.DISK_ONLY). See:
You can see that Spark creates a bucket for each Reducer in each Mapper and puts the result of the RDD calculation into the bucket. It is important to note that each bucket is an ArrayBuffer, which means that the output of the Map is first stored in memory.
Spark then writes the Map output from the ArrayBuffer to disk managed by the block manager, where each file is named: "shuffle_" + shuffle_id + "_" + map partition id + "_" + shuffle partition id.
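The naming scheme can be written out as a small helper. This is a sketch that assumes the three ids are plain integers; the function names are hypothetical.

```python
def shuffle_file_name(shuffle_id, map_partition_id, shuffle_partition_id):
    """Build a file name following the scheme described in the text."""
    return f"shuffle_{shuffle_id}_{map_partition_id}_{shuffle_partition_id}"


def all_shuffle_file_names(shuffle_id, num_mappers, num_reducers):
    """One file per (Mapper, Reducer) pair, i.e. M x R names in total."""
    return [
        shuffle_file_name(shuffle_id, m, r)
        for m in range(num_mappers)
        for r in range(num_reducers)
    ]
```

Because every (Mapper, Reducer) pair gets its own name, the number of files grows as the product of the two counts.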
There were two big problems with early shuffle write:
The output of Map must first be stored entirely in memory and then written to disk. This is a very large memory overhead, and an OOM occurs when there is not enough memory to hold all of the Map output.
Each Mapper generates as many shuffle files as there are Reducers. If there are 1k Mappers and 1k Reducers, 1M (one million) shuffle files are generated, which is a heavy burden on the file system. In addition, when the amount of shuffle data is small but the number of shuffle files is large, random writes seriously degrade IO performance.
In version 0.8 of Spark, shuffle write takes a different approach from RDD block write and creates a separate ShuffleBlockManager for shuffle write, which partially solves the problems encountered in versions 0.6 and 0.7.
First, let's take a look at the implementation of Spark 0.8:
In this release, a new class, ShuffleBlockManager, is added for shuffle write; it allocates and manages the buckets. ShuffleBlockManager also assigns a DiskObjectWriter to each bucket, and each write handler has a buffer of 100KB by default, which is used to write the Map output to the file. The write call is now buckets.writers(bucketId).write(pair), which means that the key-value pairs of the Map output are written to disk one by one, instead of all the data being stored in memory first and then flushed to disk as a whole.
The code for ShuffleBlockManager is as follows:
Spark 0.8 significantly reduces the memory pressure of shuffle. The Map output no longer has to be held entirely in memory and then flushed to disk; instead, it is written to disk record by record. In addition, shuffle files are now managed separately by the new ShuffleBlockManager rather than together with the RDD cache files.
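The record-by-record write can be illustrated with ordinary buffered files. The sketch below is not the ShuffleBlockManager code; it is a Python analogy in which each bucket gets its own writer with a 100 KB buffer, and the function name, the tab-separated record format and the hash partitioning are all assumptions made for the example.

```python
import os
import tempfile

BUFFER_SIZE = 100 * 1024  # 100 KB per writer, the default cited in the text


def write_record_by_record(records, num_reducers, out_dir, shuffle_id=0, map_id=0):
    """Stream each (key, value) pair to its bucket's writer as it is produced."""
    paths = [
        os.path.join(out_dir, f"shuffle_{shuffle_id}_{map_id}_{r}")
        for r in range(num_reducers)
    ]
    # One buffered writer per bucket; only the buffers live in memory.
    writers = [open(p, "w", buffering=BUFFER_SIZE, encoding="utf-8") for p in paths]
    try:
        for key, value in records:
            bucket_id = hash(key) % num_reducers
            writers[bucket_id].write(f"{key}\t{value}\n")
    finally:
        for writer in writers:
            writer.close()
    return paths


# Usage: the records come from a generator, so they are never held in a list.
demo_dir = tempfile.mkdtemp()
demo_paths = write_record_by_record(((i, i * i) for i in range(5)), 2, demo_dir)
```

The memory cost moves from the data itself to the writers: one buffer per open bucket, which is the overhead discussed next.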
However, shuffle write in Spark 0.8 still leaves two big problems unsolved:
First, there are still too many shuffle files. Too many shuffle files put excessive pressure on the file system and also reduce IO throughput.
Second, although the Map output no longer needs to be held in memory in advance, which significantly reduces memory pressure, the buffers of the newly introduced DiskObjectWriter are themselves a memory overhead that cannot be ignored. Suppose there are 1k Mappers and 1k Reducers: there will then be 1M buckets and 1M write handlers, and since each write handler needs 100KB of memory by default, 100GB of memory is required in total. Needing that much memory for buffers alone is a staggering cost. Of course, in practice, if the 1k Mappers run in batches rather than all at once, the memory required is only cores × number of Reducers × 100KB. Even so, with a large number of Reducers the memory cost of these buffers is still quite high.
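The arithmetic in this paragraph can be checked directly. The sketch below uses decimal units (1 KB = 1000 bytes) to match the article's round numbers; the function names and the 8-core figure in the usage note are hypothetical.

```python
KB = 1000      # decimal units, matching the round numbers in the text
GB = 1000 ** 3


def buffer_bytes_all_at_once(num_mappers, num_reducers, buffer_kb=100):
    """Every bucket has its own write handler, each with its own buffer."""
    return num_mappers * num_reducers * buffer_kb * KB


def buffer_bytes_concurrent(num_cores, num_reducers, buffer_kb=100):
    """Only the Mappers running at the same time hold open write handlers."""
    return num_cores * num_reducers * buffer_kb * KB
```

For example, buffer_bytes_all_at_once(1000, 1000) is 100 GB, while buffer_bytes_concurrent(8, 1000) on an assumed 8-core machine is 0.8 GB, which is smaller but still grows linearly with the number of Reducers.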
To address the problem of too many shuffle files, Spark 0.8.1 introduced a new shuffle consolidation with a view to significantly reducing the number of shuffle files.
First of all, use the illustration to introduce the principle of shuffle consolidation.
Suppose the job has four Mappers, four Reducers and two cores, that is, it can run two tasks in parallel. Shuffle write then requires a total of 16 buckets, and therefore 16 write handlers. In previous versions of Spark, each bucket corresponds to a file, so 16 shuffle files are generated here.
In shuffle consolidation, each bucket no longer corresponds to a file but to a segment within a file. The number of shuffle files generated also depends on the number of Spark cores. In the illustration above, the four Mappers of the job run in two batches. When the first batch of two Mappers runs, it requests eight buckets and generates eight shuffle files. When the second batch runs, the eight buckets it requests do not generate eight new files; they are appended to the previous eight files. As a result there are only eight shuffle files in total, containing 16 different segments. In theory, therefore, the number of shuffle files generated by shuffle consolidation is C × R, where C is the number of cores in the Spark cluster and R is the number of Reducers.
Note that when M = C, shuffle consolidation produces the same number of files as the previous implementation.
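The file and segment counts in this example can be reproduced with a small simulation. It is a sketch under one explicit assumption: Mappers are assigned to core slots round-robin, so Mapper m reuses the files of slot m mod C. The function name and the dictionary layout are hypothetical.

```python
def consolidated_layout(num_mappers, num_reducers, num_cores):
    """Map every (Mapper, Reducer) bucket to a segment of a shared file.

    Returns a dict keyed by (core_slot, reducer_id); each value lists the
    Mapper ids whose buckets are appended to that file, one per segment.
    """
    files = {}
    for mapper in range(num_mappers):
        core_slot = mapper % num_cores  # assumption: round-robin batches
        for reducer in range(num_reducers):
            files.setdefault((core_slot, reducer), []).append(mapper)
    return files


layout = consolidated_layout(num_mappers=4, num_reducers=4, num_cores=2)
file_count = len(layout)                                 # C x R files
segment_count = sum(len(s) for s in layout.values())     # M x R segments
```

With four Mappers, four Reducers and two cores this gives 8 files holding 16 segments; with four cores the count goes back to 16 files, one segment each.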
Shuffle consolidation significantly reduces the number of shuffle files and solves a serious problem of the previous version, but it does not reduce the buffer overhead of the write handlers. The only way to reduce that overhead is to reduce the number of Reducers, which introduces new problems that are described in more detail below.
3. Shuffle Fetch and Aggregator
If the data written by Shuffle write is to be used by Reducer, you need shuffle fetcher to fetch the required data, where the fetch includes local and remote, because part of the shuffle data may be stored locally. Spark implements two different frameworks for shuffle fetcher: NIO to fetch data through socket connections and OIO to fetch data through netty server. The corresponding classes are BasicBlockFetcherIterator and NettyBlockFetcherIterator.
In Spark 0.7 and earlier versions, only BasicBlockFetcherIterator is supported. Its performance is not very good when the amount of shuffle data is relatively large, and it cannot make full use of the network bandwidth. To solve this problem, a new shuffle fetcher was added to try to achieve better performance.
In the shuffle process of Hadoop MapReduce, the data obtained by shuffle fetch is merge sorted, so that the different values under the same key are brought together in order for the Reducer to use. For this process, please see the following figure:
All merge sort is done on disk, effectively controlling the use of memory, but at the cost of more disk IO.
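The effect of the merge sort, values for the same key arriving together at the Reducer, can be sketched with the standard library. Note the simplification: Hadoop merges sorted runs stored on disk, whereas this example merges small in-memory lists, and the function name is hypothetical.

```python
import heapq
from itertools import groupby
from operator import itemgetter


def merge_and_group(sorted_runs):
    """Merge runs that are each sorted by key, then group values per key."""
    # heapq.merge reads the runs lazily and yields pairs in key order.
    merged = heapq.merge(*sorted_runs, key=itemgetter(0))
    for key, pairs in groupby(merged, key=itemgetter(0)):
        yield key, [value for _, value in pairs]


runs = [
    [("a", 1), ("c", 3)],   # sorted output fetched from one Mapper
    [("a", 2), ("b", 5)],   # sorted output fetched from another Mapper
]
grouped = list(merge_and_group(runs))
```

Because the merged stream is ordered by key, the Reducer only ever needs the values of one key at a time.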
So does Spark also have merge sort?
First of all, although Spark belongs to the MapReduce family, it changes the traditional MapReduce algorithm to some extent. Spark assumes that in most use cases, such as word count, sorting the shuffle data is not required, and forced sorting would only hurt performance, so Spark does not do a merge sort on the Reducer side. Since there is no merge sort, how does Spark do the reduce?
Spark uses an aggregator. The aggregator is essentially a hashmap that takes the key of the map output as its key and a combined value of any type as its value. For word count, the reduce step updates or inserts each key-value pair obtained by shuffle fetch into the hashmap (if the key is not found in the hashmap, it is inserted; if it is found, its value is updated). In this way there is no need to merge sort all the key-value pairs in advance; they are processed one by one, which saves the external sorting step. Note, however, that the memory of the Reducer must be large enough to hold all the keys and count values of the partition, so there is a certain memory requirement.
In the word count example above, memory usage is relatively small because each value is updated in place and the individual records do not need to be kept in memory. Now consider an operation like group by key, where the Reducer needs all the values corresponding to each key. In Hadoop MapReduce, because of the merge sort, the data handed to the Reducer is already grouped by key. Spark does not have this step, so it must store every key and its corresponding values in the hashmap and merge the values into an array. To hold all of that data, users have to make sure that each partition is small enough to fit in memory, which is a severe test for memory. The Spark documentation therefore advises users to increase the number of partitions as much as possible for such operations, that is, to increase the number of Mappers and Reducers.
Increasing the number of Mappers and Reducers certainly reduces the size of each partition so that it fits in memory. However, as mentioned under shuffle write, the number of buckets and their write handlers is determined by the number of Mappers and Reducers. The more tasks there are, the more buckets there are, and the more buffer memory the write handlers require. On the one hand, the number of tasks is increased to reduce memory use; on the other hand, more tasks bring more buffer overhead. The result is a dilemma over memory use.
To reduce memory usage, the aggregator's work has to be moved from memory to disk. The Spark community is aware of the problems Spark has when dealing with data that is much larger than memory, and PR303 therefore provides an implementation of external sorting.
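The update-or-insert behaviour of such an aggregator can be sketched with a Python dict. The names aggregate, create_combiner and merge_value are chosen for this example and are an assumption, not a quotation of Spark's API.

```python
def aggregate(fetched_pairs, create_combiner, merge_value):
    """Fold fetched (key, value) pairs into a hashmap, one pair at a time."""
    combiners = {}
    for key, value in fetched_pairs:
        if key in combiners:
            # Key already present: update its combined value.
            combiners[key] = merge_value(combiners[key], value)
        else:
            # Key not present: insert a new combined value.
            combiners[key] = create_combiner(value)
    return combiners


# Word count: each fetched pair is (word, 1) and the combined value is a sum.
pairs = [("spark", 1), ("shuffle", 1), ("spark", 1), ("spark", 1)]
counts = aggregate(pairs, create_combiner=lambda v: v, merge_value=lambda c, v: c + v)
```

No sorting is involved; the cost is that the dict must hold one entry for every distinct key in the partition.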
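The difference in memory behaviour between a counting reduce and a group by key can be made concrete. In this sketch (hypothetical function names, in-memory data only) the hashmap for group by key keeps every fetched value, while a count keeps a single number per key.

```python
def group_by_key(fetched_pairs):
    """Collect every value for each key; nothing can be discarded."""
    groups = {}
    for key, value in fetched_pairs:
        groups.setdefault(key, []).append(value)
    return groups


def count_by_key(fetched_pairs):
    """Keep only a running count per key."""
    counts = {}
    for key, _ in fetched_pairs:
        counts[key] = counts.get(key, 0) + 1
    return counts


pairs = [("a", 10), ("b", 20), ("a", 30), ("a", 40)]
groups = group_by_key(pairs)
values_held_by_groups = sum(len(v) for v in groups.values())  # grows with the data
values_held_by_counts = len(count_by_key(pairs))              # grows with distinct keys
```

For group by key the hashmap is as large as the partition itself, which is why the partition has to be small enough to fit in memory.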