In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)06/01 Report--
This article will explain in detail how to implement Spark in shuffle. The content of the article is of high quality, so the editor shares it for you as a reference. I hope you will have a certain understanding of the relevant knowledge after reading this article.
Background
In the MapReduce framework, shuffle is the bridge between Map and Reduce. The output of Map must go through shuffle if it is used in Reduce. The performance of shuffle directly affects the performance and throughput of the whole program. Spark, as an implementation of MapReduce framework, naturally implements the logic of shuffle. This paper makes an in-depth study of how Spark's shuffle is implemented, what are its advantages and disadvantages, and what is different from Hadoop MapReduce's shuffle.
Shuffle
Shuffle is a specific phase in the MapReduce framework, which is between Map phase and Reduce phase. When the output of Map is to be used by Reduce, the output needs to be hashed by key and distributed to each Reducer. This process is shuffle. Because shuffle involves disk reading and writing and network transmission, the performance of shuffle directly affects the running efficiency of the whole program.
The following diagram clearly describes the entire flow of the MapReduce algorithm, where shuffle phase is between Map phase and Reduce phase.
Conceptually, shuffle is a bridge to communicate data connections, so in fact, how is this part of shuffle realized? let's take Spark as an example to talk about the implementation of shuffle in Spark.
Evolutionary history of Spark Shuffle
First, take the figure as an example to briefly describe the whole process of shuffle in Spark:
First of all, each Mapper will create a corresponding number of bucket,bucket based on the number of Reducer is M × R, where M is the number of Map and R is the number of Reduce.
Secondly, the results produced by Mapper will be populated into each bucket according to the set partition algorithm. The partition algorithm here can be customized, and of course the default algorithm is to go to different bucket according to key hash.
When Reducer starts, it fetches the corresponding bucket from the remote or local block manager according to the id of its own task and the id of the Mapper it depends on as input to the Reducer.
Bucket here is an abstract concept. In the implementation, each bucket can correspond to a file, part of a file or something else.
Next, let's talk about the shuffle evolution history of Spark from shuffle write and shuffle fetch respectively.
Shuffle Write
In versions of Spark 0.6 and 0.7, the storage of shuffle data is stored in block manager as a file, following the same strategy as rdd.persist (StorageLevel.DISk_ONLY). See:
Override def run (attemptId: Long): MapStatus = {val numOutputSplits = dep.partitioner.numPartitions. / / Partition the map output. Val buckets = Array.fill (numOutputSplits) (new ArrayBuffer [(Any, Any)]) for (elem val blockId = ShuffleBlockManager.blockId (shuffleId, bucketId, mapId) blockManager.getDiskBlockWriter (blockId, serializer, bufferSize)} new ShuffleWriterGroup (mapId, writers)} override def releaseWriters (group: ShuffleWriterGroup) = {/ / Nothing really to release here. }
Spark 0.8 significantly reduces the memory pressure on shuffle. Now Map output does not need to store it all in memory and then flush it to the hard disk. Instead, record-by-record writes to disk. At the same time, the management of shuffle files is also managed independently of the new ShuffleBlockManager, rather than with rdd cache files.
But this version of Spark 0.8's shuffle write still has two big problems unsolved:
First of all, it is still the problem of too many shuffle files, too many shuffle files will cause excessive pressure on the file system, and second, it will reduce the throughput of IO.
Second, although Map output data no longer need to be pre-evaluate in memory, which significantly reduces the memory pressure, the buffer overhead caused by the newly introduced DiskObjectWriter is also a memory overhead that can not be ignored. Suppose we have 1k Mapper and 1k Reducer, then there will be 1m bucket and 1m write handler at the same time, and each write handler requires 100KB memory by default, so a total of 100GB memory is required. In this way, only buffer needs so much memory, and the memory cost is staggering. Of course, in reality, if the 1k Mapper are run in time sharing, the memory required will only be the size of cores * reducer numbers * 100KB. However, if there is a large number of reducer, the memory cost of this buffer is also quite high.
To address the problem of too many shuffle files, Spark 0.8.1 introduced a new shuffle consolidation with a view to significantly reducing the number of shuffle files.
First of all, let's introduce the principle of shuffle consolidation with an illustration.
Suppose the job has four Mapper and four Reducer and two core, that is, it can run two task in parallel. We can calculate that the shuffle write of Spark requires a total of 16 bucket, so there are 16 write handler. In previous versions of Spark, each bucket corresponds to a file, so 16 shuffle files are generated here.
In shuffle consolidation, each bucket does not correspond to a file, but to a segment in the corresponding file. At the same time, the number of shuffle files generated by shuffle consolidation is also related to the number of Spark core. In the above illustration, the four Mapper of job are run in two batches. When the first batch of two Mapper runs, it will apply for eight bucket and generate eight shuffle files; while when the second batch of Mapper runs, the eight bucket applied for will not generate eight new files, but will be appended to the previous eight files, so that there are only eight shuffle files in total, while there are 16 different segment files inside the file. Therefore, theoretically speaking, the number of shuffle files generated by shuffle consolidation is C × R, where C is the number of core number,R of Spark cluster is the number of Reducer.
It is important to note that when masking C, shuffle consolidation produces the same number of files as the previous implementation.
Shuffle consolidation significantly reduces the number of shuffle files and solves a serious problem in the previous version, but the buffer overhead of writer handler is still not reduced. If we want to reduce the buffer overhead of writer handler, we can only reduce the number of Reducer, but this will introduce new problems, which will be described in more detail below.
After talking about the evolutionary history of shuffle write, we are going to talk about shuffle fetch and Spark's aggregator, which is very important to the performance of Spark applications.
Shuffle Fetch and Aggregator
If the data written by Shuffle write is to be used by Reducer, you need shuffle fetcher to fetch the required data, where the fetch includes local and remote, because part of the shuffle data may be stored locally. Spark implements two different frameworks for shuffle fetcher: NIO to fetch data through socket connections and OIO to fetch data through netty server. The corresponding classes are BasicBlockFetcherIterator and NettyBlockFetcherIterator.
In Spark 0.7 and earlier versions, only BasicBlockFetcherIterator is supported, but BasicBlockFetcherIterator performance is not very good when the amount of shuffle data is relatively large, and can not make full use of network bandwidth. in order to solve this problem, a new shuffle fetcher is added to try to achieve better performance. For early evaluation of shuffle performance, please refer to Spark usergroup. Of course, the performance of BasicBlockFetcherIterator is much better now, and you can test and compare both implementations when you use them.
Let's talk about aggregator next. We all know that during the shuffle process of Hadoop MapReduce, the data from shuffle fetch will be merge sort, so that different value under the same key will be sequentially merged together for Reducer use. For this process, please see the following figure:
All merge sort is done on disk, effectively controlling the use of memory, but at the cost of more disk IO.
So whether Spark also has merge sort, or whether it is implemented in other ways, we will explain in detail below.
First of all, although Spark belongs to the MapReduce system, the traditional MapReduce algorithm has been changed to some extent. Spark assumes that in most users' case, the sort of shuffle data is not required, such as word count, and forced sorting will only worsen performance, so Spark does not do merge sort on the Reducer side. Since there is no merge sort, how does Spark do reduce? It's time to talk about aggregator.
Aggregator is essentially a hashmap, which takes the key of map output as key and any type of combine you want as value. When we do the word count reduce to calculate the count value, it will update or insert every key-value pair to the shuffle fetch into the hashmap (if not found in the hashmap, insert it; if found, update the value value). In this way, there is no need to merge sort all the key-value in advance, but to deal with one by one, saving the step of external sorting. But at the same time, it should be noted that the memory of the reducer must be sufficient to hold all the key and count values of the partition, so there are certain memory requirements.
In the example of word count above, memory usage is relatively small because value is constantly updated and does not need to be recorded in memory. Consider that if it is an operation like group by key, Reducer needs to get all the value corresponding to key. In Hadoop MapReduce, because of the merge sort, the data given to the Reducer is already group by key, but Spark does not have this step, so you need to store all the key and the corresponding value in hashmap, and merge the value into an array. It is conceivable that in order to hold all the data, users have to make sure that each partition is small enough for memory to hold, which is a very severe test for memory. Therefore, the Spark documentation advises users to increase partition as much as possible when it comes to such operations, that is, to increase the number of Mapper and Reducer.
Increasing the number of Mapper and Reducer certainly reduces the size of the partition so that memory can accommodate the partition. But we mentioned in shuffle write that bucket and write handler corresponding to bucket are determined by the number of Mapper and Reducer. The more task, the more bucket will increase, and as a result, more buffer will be needed for write handler. On the one hand, we adopt the strategy of increasing the number of task in order to reduce the use of memory, on the other hand, the increase of the number of task will bring more buffer overhead, so we are caught in a dilemma of memory use.
In order to reduce memory usage, aggregator operations can only be moved from memory to disk, and the Spark community is aware of the problems caused by Spark when dealing with data that is much larger than memory. Therefore, PR303 provides an implementation of external sorting, and it is believed that when Spark 0.9 release, this patch should be able to merge in, then the memory usage can be significantly reduced.
As a very important part of Spark program, shuffle directly affects the performance of Spark program. Although there are still many problems in the implementation of shuffle in today's Spark version, it has made great progress compared with the earlier version. Open source code is so constantly iterated forward, with the increasing popularity of Spark, more and more people contribute, I believe that the subsequent version will be greatly improved.
This is the end of the shuffle implementation of Spark. I hope the above content can be of some help to you and learn more knowledge. If you think the article is good, you can share it for more people to see.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.