In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-01 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
In this issue, the editor will bring you about how to analyze the principle of Spark Shuffle. The article is rich in content and analyzes and narrates it from a professional point of view. I hope you can get something after reading this article.
Spark Shuffle principle analysis Shuffle is to reorganize the data. Due to the characteristics and requirements of distributed computing, the implementation details are more complicated and complex. In the MapReduce framework, Shuffle is the bridge between Map and Reduce. The Map phase reads the data through shuffle and outputs it to the corresponding Reduce;, while the Reduce phase is responsible for pulling the data from the map and calculating. In the whole process of shuffle, it is often accompanied by a large number of disks and networks. Therefore, the performance of shuffle also directly determines the performance of the whole program. Spark will also have its own shuffle implementation process. In the process of DAG scheduling, the division of Stage phase is based on whether there is a shuffle process, that is, when there is a wide dependency on wide Dependency, shuffle is needed. At this time, the job job is divided into multiple Stage, and there are many task within each stage that can be run in parallel. The process between stage and stage is the shuffle stage. In Spark, the component responsible for the execution, calculation and processing of the shuffle process is ShuffleManager, that is, the shuffle manager. With the development of Spark, there are two ways to realize ShuffleManager, which are HashShuffleManager and SortShuffleManager, so the Shuffle of spark has two kinds of HashShuffle and SortShuffle. HashShuffle mechanism before Spark 1.2, the default shuffle computing engine was HashShuffleManager. The ShuffleManager-HashShuffleManager has a very serious drawback, that is, it will produce a large number of intermediate disk files, which will affect the performance by a large number of disk IO operations. So in versions later than Spark 1.2, the default ShuffleManager has been changed to SortShuffleManager. Compared with HashShuffleManager, SortShuffleManager has some improvements. The main reason is that each Task will generate more temporary disk files during shuffle operation, but will eventually merge all temporary files (merge) into one disk file, so there is only one disk file per Task. When the shuffle read task of the next stage pulls its own data, just read part of the data in each disk file according to the index.
Hash shuffle
One is the ordinary operating mechanism.
The other is the operating mechanism of merger.
The operation mechanism of HashShuffleManager can be divided into two kinds.
The merging mechanism is mainly to optimize the number of small files generated in the process of Shuffle by reusing buffer.
Hash shuffle is a Shuffle that does not have sorting.
The Hash shuffle diagram of the common mechanism here we first clarify the assumption that there is only one CPU core per Executor, that is, no matter how many task threads are allocated on this Executor, only one task thread can be executed at a time. There are three ReduceTask in the figure. Starting from ShuffleMapTask, they each calculate their own Hash (divider: hash/numreduce module) and classify them into three different categories. Each ShuffleMapTask is divided into three categories of data. They want to aggregate different data and then calculate the final result. So ReduceTask will collect the data belonging to its own category and aggregate it into a large collection of the same category. Each ShuffleMapTask outputs 3 local files. Here are 4 ShuffleMapTask. So a total of 4 x 3 classification files = 12 local small files were output. The main purpose of the shuffle Write phase is to "partition" the data processed by each stage according to key in order that the next stage can execute the operators of the shuffle class (such as reduceByKey,groupByKey) after the completion of the calculation of a task. The so-called "partition" is to execute the hash algorithm on the same key so that the same key is written to the same disk file, and each disk file belongs to only one task of the stage on the reduce side. Before writing the data to disk, the data is written to the memory buffer, and when the memory buffer is full, it will be overwritten to the disk file. So how many disk files do you have to create for the next stage for each task that executes shuffle write? Quite simply, how many task are there for the next stage and how many disk files will be created for each task of the current stage. For example, if the next stage has a total of 100 task, then each task of the current stage needs to create 100 disk files. If the current stage has 50 task, a total of 10 Executor, and each Executor executes 5 Task, then a total of 500 disk files will be created on each Executor and 5000 disk files will be created on all Executor. Thus, the number of disk files produced by unoptimized shuffle write operations is staggering. The shuffle Read phase shuffle read is usually what a stage does at the beginning. At this time, each task of the stage needs to pull all the same key in the calculation result of the previous stage from each node to its own node through the network, and then carry out operations such as key aggregation or connection. Because in the process of shuffle write, task creates a disk file for each task of the stage on the reduce side, so in the process of shuffle read, each task only needs to pull its own disk file from all the task nodes of the upstream stage. The pulling process of shuffle read is to aggregate at the same time. Each shuffle read task has its own buffer buffer, and each time it can only pull data of the same size as the buffer buffer, and then aggregate operations such as through a Map in memory. After aggregating one batch of data, the next batch of data is pulled and put into the buffer buffer for aggregation. And so on, until all the data is pulled and the final result is obtained. Note (1) buffer acts as a cache, which can accelerate writing to disk and improve computing efficiency. The default size of buffer is 32k. (2) Partitioner: according to the hash/numRedcue module, it is decided that the data is processed by several Reduce, and it is also decided to write into several buffer. (3) block file: small disk files. From the figure, we can know the formula for calculating the number of small disk files: block file=M*R (4) M is the number of map task, R is the number of Reduce, and generally the number of Reduce is equal to the number of buffer. The problem of the ordinary mechanism of Hash shuffle is determined by the divider (1). The Shuffle stage will produce a large number of small files on disk, and the number of establishing communication and pulling data will become more, which will result in a large number of time-consuming and inefficient IO operations (because too many small files are generated) (2). It may lead to OOM, a large number of time-consuming and inefficient IO operations, resulting in too many objects when writing to disk and too many objects when reading disk. These objects are stored in heap memory, which will lead to insufficient heap memory, which will lead to frequent GC,GC and OOM. Because a large number of file operation handles and temporary information need to be stored in memory, if the scale of data processing is relatively large, the memory is unbearable, and there will be problems such as OOM. The Hash shuffle merging mechanism of the merging mechanism is the reuse of buffer buffers, and the configuration for enabling the merging mechanism is spark.shuffle.consolidateFiles. The default value of this parameter is false. Set it to true to enable the optimization mechanism. Generally speaking, if we use HashShuffleManager, it is recommended that this option be turned on.
There are six shuffleMapTask in the diagram, and the data category is still divided into three types, because the Hash algorithm will classify according to your Key. In the same process, no matter how many Task there are, it will put the same Key in the same Buffer, and then write the data in the Buffer to the local file in terms of the number of Core (a Core has only one type of Key data). Each Task is in the process. Write to 3 local files in the common process respectively, and there are 6 shuffleMapTasks here, so the total output is 2 Cores x 3 classification files = 6 local small files. Note (1). Start the configuration of HashShuffle merge mechanism ConsolidatedShuffle spark.shuffle.consolidateFiles=true (2). Block file=Core*R Core is the number of cores of CPU and R is the number of Reduce. The problem of Hash shuffle merging mechanism is that if there are too many parallel tasks or data fragments on the Reducer side, the Core* Reducer Task is still too large, and many small files will be generated. Sort shuffle
The operation mechanism of SortShuffleManager can be divided into two kinds.
One is the ordinary operating mechanism.
The other is the bypass operation mechanism.
The general mechanism of Sort shuffle in this mode, the data is first written to a data structure, and the aggregation operator is written to Map, which is locally aggregated through Map and written to memory at the same time. The Join operator is written to ArrayList directly into memory. Then you need to determine whether the threshold (5m) is reached, and if so, the data of the memory data structure is written to disk, emptying the memory data structure. Before overwriting the disk, it is sorted according to key, and the sorted data will be written to the disk file in batches. The default batch is 10000, and the data is written to the disk file in each batch. Write disk files through buffer overwrite, each overflow will produce a disk file, that is, a task process will produce multiple temporary files. Finally, in each task, all temporary files are merged, which is the merge process, which reads all temporary files and writes them to the final file at once. It means that all the data of an task is in this file. At the same time, write a separate index file to identify the downstream task data in the file index start offset and end offset. In this way, if the first stage has 50 task and each Executor executes one task, then no matter how many task are downstream, you need 50 files on disk. Benefits 1. There are obviously fewer small files, and a task generates only one file file. 2. The overall order of file files, coupled with the assistance of index files, makes the search faster. Although sorting wastes some performance, searching is much faster than the operating conditions of the sortShufflebypass mechanism in bypass mode.
The number of shuffle map task is less than the value of the spark.shuffle.sort.bypassMergeThreshold parameter
Shuffle operators that are not aggregate classes (such as reduceByKey)
Compared with the ordinary mechanism of sortshuffle, when there is not much shuffleMapTask, the first write mechanism is different, and secondly, there is no sorting. This saves some of the performance overhead. To sum up, when the number of shuffleMapTask is less than the default value of 200, enable sortShuffle in bypass mode (because the amount of data itself is relatively small, there is no need for sort full sorting, because the query speed is fast with a small amount of data, which saves the performance overhead of sort.) This mechanism is different from the ordinary SortShuffleManager mechanism in that: first, the disk write mechanism is different; second, there is no sort sorting; Shuffle tunes the Shuffle core components.
Stage is divided when you encounter ShuffleDenpendency. ShuffleMapStage: the intermediate stage,ResultStage that provides data for the shuffle: the stage that calculates the result for an action operation.
Analysis of Shuffle principle MapOutputTracker
One problem solved is how does resut task know which Executor to pull Shuffle data from?
ShuffleWriter
(1) HashShuffleWriter
Features: according to the Hash partition, the number of partitions is m * n.
Val counts: RDD [(String, Int)] = wordCount.reduceByKey (new HashPartitioner (2), (x, y) = > x + y)
(2) SortShuffleWriter
Features:
A, the number of files is m
B, if you need sorting or combine, then each partition data sorting should be implemented on its own. (the sort in SortShuffleWriter refers to sorting the partition numbers of the partition.)
C, the data is first put in memory, if there is not enough memory, it is written to disk, and then all is written to disk.
(3) BypassMergeSortShuffleWriter
This model has the characteristics of both HashShuffleWriter and SortShuffleter. Because in fact, the performance of HashShufflerWriter is good, but if the number of task is too large, the performance will decline, so Spark automatically uses this mode when the number of task is small. At first, it still generates multiple files like HashShufflerWriter, but eventually merges multiple files into one file. Then read the file downstream. The default map partition needs to be less than spark.shuffle.sort.bypassMergeThreshold (the default is 200), because if there are too many partitions, the resulting small files will have a lot of performance degradation.
Spark Shuffle parameter tuning spark.shuffle.file.buffer
Default value: 32k
Parameter description: this parameter is used to set the buffer buffer size of the BufferedOutputStream of shuffle write task. Before writing data to a disk file, it is written to the buffer buffer, and after the buffer is full, it will be overwritten to the disk.
Tuning suggestion: if the memory resources available for the job are sufficient, you can appropriately increase the size of this parameter (for example, 64k), so as to reduce the number of overwrites to disk files during shuffle write, and thus reduce the number of disk IO, thereby improving performance. In practice, it is found that by adjusting this parameter reasonably, the performance will be improved by 1% to 5%.
Spark.reducer.maxSizeInFlight
Default value: 48m
Parameter description: this parameter is used to set the buffer buffer size of shuffle read task, and this buffer buffer determines how much data can be pulled at a time.
Tuning suggestion: if the memory resources available for the job are sufficient, you can appropriately increase the size of this parameter (for example, 96m), so as to reduce the number of times to pull data, and then reduce the number of network transmissions, thus improving performance. In practice, it is found that by adjusting this parameter reasonably, the performance will be improved by 1% to 5%.
Spark.shuffle.io.maxRetries
Default value: 3
Parameter description: when shuffle read task pulls its own data from the node where the shuffle write task resides, if the pull fails due to a network exception, it will automatically retry. This parameter represents the maximum number of times you can retry. If the pull is not successful within the specified number of times, it may cause the job execution to fail.
Tuning suggestion: for those jobs that include time-consuming shuffle operations, it is recommended to increase the maximum number of retries (such as 60) to avoid data pull failures due to factors such as JVM full gc or network instability. In practice, it is found that for shuffle processes with large amounts of data (billions ~ tens of billions), adjusting this parameter can greatly improve the stability.
Spark.shuffle.io.retryWait
Default value: 5s
Parameter description: the specific explanation is the same as above. This parameter represents the waiting interval for pulling data for each retry. The default is 5s.
Tuning suggestion: it is recommended to increase the interval (such as 60s) to increase the stability of shuffle operations.
Spark.shuffle.memoryFraction
(Spark1.6 is this parameter. It has changed since 1.6.For more information, please refer to the previous section on Spark memory model.)
Default value: 0.2
Parameter description: this parameter represents the proportion of memory allocated to shuffle read task for aggregation operations in Executor memory. The default is 20%.
Tuning suggestion: this parameter is explained in resource parameter tuning. If there is enough memory and persistence operations are rarely used, it is recommended that you increase this ratio to give more memory to shuffle read's aggregation operations to avoid frequently reading and writing disks during the aggregation process due to insufficient memory. In practice, it is found that reasonable adjustment of this parameter can improve the performance by about 10%.
Spark.shuffle.manager
Default value: sort
Parameter description: this parameter is used to set the type of ShuffleManager. After Spark 1.5, there are three options: hash, sort, and tungsten-sort. HashShuffleManager is the default option before Spark 1.2, but Spark 1.2 and later versions default to SortShuffleManager. Spark1.6 later removed the hash mode. Tungsten-sort is similar to sort, but uses the out-of-heap memory management mechanism in the tungsten plan to make memory use more efficient.
Tuning suggestion: because SortShuffleManager sorts the data by default, you can use the default SortShuffleManager if the sorting mechanism is needed in your business logic; and if your business logic does not need to sort the data, it is recommended to refer to the following parameters to avoid sorting operations through the bypass mechanism or optimized HashShuffleManager, while providing better disk read and write performance. It is important to note here that tungsten-sort should be used with caution, as some corresponding bug have been found before.
Spark.shuffle.sort.bypassMergeThreshold
Default value: 200
Parameter description: when ShuffleManager is SortShuffleManager, if the number of shuffle read task is less than this threshold (default is 200,200), the shuffle write process will not sort, but will directly write data in the way of unoptimized HashShuffleManager, but will eventually merge all temporary disk files generated by each task into a single file and create a separate index file.
Tuning suggestion: when you use SortShuffleManager, if you really do not need a sort operation, then it is recommended that this parameter be larger than the number of shuffle read task. Then the bypass mechanism is automatically enabled and map-side does not sort, reducing the performance overhead of sorting. However, in this way, a large number of disk files will still be generated, so the performance of shuffle write needs to be improved.
The above is the editor for you to share how to analyze the principle of Spark Shuffle, if you happen to have similar doubts, you might as well refer to the above analysis to understand. If you want to know more about it, you are welcome to follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.