What is the method of Spark performance optimization

This article introduces the relevant knowledge of "what is the method of Spark performance optimization". Many people run into these situations in real-world cases, so let the editor walk you through how to deal with them. I hope you read it carefully and get something out of it!

# Overview of ShuffleManager Development

In Spark's source code, the component responsible for executing, computing, and processing the shuffle process is ShuffleManager, the shuffle manager. As Spark has evolved through its versions, ShuffleManager has also iterated and become more and more capable.

Prior to Spark 1.2, the default shuffle computing engine was HashShuffleManager. HashShuffleManager has a very serious disadvantage: it produces a large number of intermediate disk files, and the resulting disk IO operations degrade performance.

So in Spark 1.2 and later, the default ShuffleManager was changed to SortShuffleManager. Compared with HashShuffleManager, SortShuffleManager is an improvement: although each task may still generate many temporary disk files during the shuffle operation, it eventually merges all of them into a single disk file, so each task ends up with only one disk file. When a shuffle read task of the next stage pulls its data, it only reads its part of each disk file according to an index.

Let's analyze the principles of HashShuffleManager and SortShuffleManager in detail.

# How HashShuffleManager Works

## Unoptimized HashShuffleManager

The unoptimized HashShuffleManager works as follows. Let's make a simplifying assumption here: there is only one CPU core per Executor, that is, no matter how many task threads are allocated to an Executor, only one task thread can execute at a time.

Let's start with shuffle write. In the shuffle write phase, after a stage finishes its computation, the data processed by each task is "classified" by key so that the next stage can run shuffle operators (such as reduceByKey). "Classification" means applying a hash function to the key so that identical keys are written to the same disk file, and each disk file belongs to exactly one task of the downstream stage. Before being written to disk, the data is first written to a memory buffer; when the buffer fills up, it is spilled to the disk file.
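To make the "classification" step concrete, here is a minimal Scala sketch of how a key can be mapped to a downstream task (and hence to one of the map task's disk files). The helper names are illustrative; they mirror the behaviour of Spark's hash partitioning rather than its internal API.

```scala
object HashWriteSketch extends App {
  // Keep the hash result in [0, mod), even for negative hashCodes.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Which downstream (reduce) task, and therefore which disk file,
  // a record with this key is routed to.
  def targetReduceTask(key: Any, numReduceTasks: Int): Int =
    nonNegativeMod(key.hashCode, numReduceTasks)

  // Records sharing a key always land in the same downstream task's file.
  assert(targetReduceTask("user_42", 100) == targetReduceTask("user_42", 100))
  println(targetReduceTask("user_42", 100))
}
```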

So how many disk files does each shuffle write task have to create for the next stage? Quite simply, one for every task of the next stage. For example, if the next stage has 100 tasks in total, then each task of the current stage needs to create 100 disk files. If the current stage has 50 tasks spread over 10 Executors, with each Executor running 5 tasks, then 500 disk files are created on each Executor and 5,000 disk files across all Executors. The number of disk files produced by unoptimized shuffle write is therefore staggering.
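As a quick sanity check of the numbers above (a Scala REPL snippet; the values come straight from the example):

```scala
val reduceTasks      = 100 // tasks in the next stage
val tasksPerExecutor = 5
val executors        = 10

val filesPerExecutor = tasksPerExecutor * reduceTasks // 500
val totalFiles       = executors * filesPerExecutor   // 5000
```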

Now let's talk about shuffle read. Shuffle read is usually the first thing a stage does. Each task of that stage needs to pull all occurrences of its keys from the previous stage's output, across the network to its own node, and then perform aggregation or join operations by key. Because during shuffle write each map task created one disk file per downstream task, during shuffle read each task only needs to pull its own disk file from every node that ran tasks of the upstream stage.

Shuffle read pulls and aggregates at the same time. Each shuffle read task has its own buffer, and each time it can only pull as much data as fits in that buffer, which is then aggregated through an in-memory map. After one batch of data has been aggregated, the next batch is pulled into the buffer and aggregated, and so on, until all the data has been pulled and the final result is obtained.
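A minimal sketch of this pull-and-aggregate loop, assuming a hypothetical `pullNextBatch` helper that stands in for the buffered network fetch (this is not Spark's internal API):

```scala
import scala.collection.mutable

object ShuffleReadSketch {
  // Hypothetical stand-in for fetching at most one buffer's worth of
  // (key, value) pairs from the upstream map outputs; returns empty when done.
  def pullNextBatch(): Seq[(String, Int)] = Seq.empty

  def readAndAggregate(): mutable.Map[String, Int] = {
    val aggregated = mutable.Map.empty[String, Int]
    var batch = pullNextBatch()
    while (batch.nonEmpty) {
      // Aggregate this batch into the in-memory map (a sum, as reduceByKey(_ + _) would),
      // then pull the next batch into the buffer.
      for ((k, v) <- batch) aggregated(k) = aggregated.getOrElse(k, 0) + v
      batch = pullNextBatch()
    }
    aggregated
  }
}
```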

## Optimized HashShuffleManager

The optimized HashShuffleManager works as follows. By "optimized" we mean that a parameter, spark.shuffle.consolidateFiles, can be set. Its default value is false; setting it to true enables the optimization mechanism. Generally speaking, if you use HashShuffleManager, it is recommended to turn this option on.
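For reference, this is how the flag can be set on a SparkConf in the driver. This is a sketch that only applies to the old releases this section describes, since HashShuffleManager and this flag were removed in Spark 2.0; the values are illustrative.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("hash-shuffle-consolidate")
  .set("spark.shuffle.manager", "hash")          // use HashShuffleManager
  .set("spark.shuffle.consolidateFiles", "true") // enable the consolidate mechanism
```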

When the consolidate mechanism is turned on, a task does not create one disk file for every task of the downstream stage during shuffle write. Instead, the concept of a shuffleFileGroup appears: each shuffleFileGroup corresponds to a batch of disk files, and the number of disk files equals the number of tasks in the downstream stage. An Executor can execute as many tasks in parallel as it has CPU cores. Each task in the first batch of parallel tasks creates a shuffleFileGroup and writes its data to the corresponding disk files.

When the Executor's CPU cores finish one batch of tasks and start the next, the next batch of tasks reuses the existing shuffleFileGroups, including the disk files in them. That is, a task writes its data to existing disk files rather than to new ones. The consolidate mechanism therefore lets different tasks reuse the same batch of disk files, which effectively merges the disk files of multiple tasks, greatly reducing the number of disk files and improving shuffle write performance.
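A toy model of this reuse, purely illustrative and not Spark's data structures: one "file" per (CPU core, downstream partition) pair, and every later task on the same core appends to the same files instead of creating new ones.

```scala
import scala.collection.mutable

object ConsolidateSketch {
  // One "file" per (core, downstream partition); later tasks on the same
  // core reuse these instead of creating new files.
  private val files = mutable.Map.empty[(Int, Int), mutable.ArrayBuffer[String]]

  def write(coreId: Int, reducePartition: Int, record: String): Unit =
    files.getOrElseUpdate((coreId, reducePartition), mutable.ArrayBuffer.empty) += record

  // Bounded by (cores per Executor) * (tasks in the next stage),
  // no matter how many map tasks have run.
  def fileCount: Int = files.size
}
```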

Suppose the second stage has 100 tasks and the first stage has 50 tasks, again with 10 Executors in total and 5 tasks per Executor. With the unoptimized HashShuffleManager, each Executor produced 500 disk files and all Executors together produced 5,000. After this optimization, the number of disk files created per Executor becomes: number of CPU cores * number of tasks in the next stage. In other words, each Executor now creates only 100 disk files, and all Executors together create only 1,000.
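And the same sanity check for the consolidated case (Scala REPL snippet, with one CPU core per Executor as assumed at the start of this section):

```scala
val coresPerExecutor = 1   // the assumption made earlier
val reduceTasks      = 100
val executors        = 10

val filesPerExecutor = coresPerExecutor * reduceTasks // 100
val totalFiles       = executors * filesPerExecutor   // 1000
```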

# How SortShuffleManager Works

SortShuffleManager has two operating mechanisms: the ordinary mechanism and the bypass mechanism. When the number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (200 by default), the bypass mechanism is enabled.

## Ordinary Operating Mechanism

Under the ordinary operating mechanism, data is first written into an in-memory data structure; which structure is chosen depends on the shuffle operator. For aggregation-class shuffle operators such as reduceByKey, a Map is chosen, and records are aggregated through the Map while being written to memory; for ordinary shuffle operators such as join, an Array is chosen and records are written to memory directly. After each record is written into the in-memory data structure, a check is made on whether a critical threshold has been reached. If so, an attempt is made to spill the contents of the in-memory data structure to disk, after which the structure is cleared.

Before spilling to a disk file, the data already in the in-memory structure is sorted by key. After sorting, the data is written to the disk file in batches. The default batch size is 10,000 records, that is, the sorted data is written to disk in batches of 10,000 records each. Writing to disk goes through Java's BufferedOutputStream, Java's buffered output stream: it buffers the data in memory and writes it to the disk file only when the buffer fills up, which reduces the number of disk IO operations and improves performance.
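A minimal Scala sketch of this spill step under the stated defaults (sort by key, then write in 10,000-record batches through a BufferedOutputStream); the record type and the text file format here are simplified stand-ins, not Spark's actual serialization.

```scala
import java.io.{BufferedOutputStream, FileOutputStream}

object SpillSketch {
  // Illustrative spill: sort the in-memory records by key, then write them
  // out in fixed-size batches through a BufferedOutputStream.
  def spillToDisk(records: Seq[(String, Int)], path: String, batchSize: Int = 10000): Unit = {
    val sorted = records.sortBy(_._1)                      // sort by key before writing
    val out = new BufferedOutputStream(new FileOutputStream(path))
    try {
      sorted.grouped(batchSize).foreach { batch =>         // 10,000 records per batch
        batch.foreach { case (k, v) => out.write(s"$k\t$v\n".getBytes("UTF-8")) }
      }
    } finally {
      out.close()
    }
  }
}
```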

While a task writes all of its data into the in-memory data structure, multiple disk spills occur, producing multiple temporary files. Finally, all the earlier temporary disk files are merged; this is the merge process, in which the data from all the previous temporary files is read out and written, in turn, into a single final disk file. Because a task then corresponds to only one disk file containing all the data it has prepared for the downstream stage's tasks, a separate index file is also written, recording the start and end offsets of each downstream task's data within that file.
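The content of the index file can be thought of as cumulative offsets. A small Scala REPL sketch (the helper name is ours, not Spark's):

```scala
// Illustrative index construction: from per-partition byte lengths to the
// cumulative offsets recorded in the index file. Partition i's data then
// occupies bytes [offsets(i), offsets(i + 1)) of the merged data file.
def buildIndex(partitionLengths: Array[Long]): Array[Long] =
  partitionLengths.scanLeft(0L)(_ + _)

// e.g. partitions of 100, 250 and 40 bytes -> offsets 0, 100, 350, 390
val offsets = buildIndex(Array(100L, 250L, 40L))
```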

SortShuffleManager greatly reduces the number of files because it merges disk files. For example, if the first stage has 50 tasks across 10 Executors, with 5 tasks per Executor, and the second stage has 100 tasks, then since each task ultimately produces only one disk file, there are only 5 disk files per Executor and only 50 across all Executors.

## Bypass Operating Mechanism

The bypass variant of SortShuffleManager works as follows. Its trigger conditions are (a small sketch of the combined check follows the list):

The number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter.

The shuffle operator is not an aggregation-class operator (such as reduceByKey).
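Put together, the decision has roughly this shape. This is a hedged sketch; the function and parameter names are ours, not Spark's internals.

```scala
// Illustrative only: both conditions must hold for the bypass path to be taken.
def shouldBypassMergeSort(numReducePartitions: Int,
                          bypassThreshold: Int, // spark.shuffle.sort.bypassMergeThreshold
                          needsMapSideAggregation: Boolean): Boolean =
  !needsMapSideAggregation && numReducePartitions <= bypassThreshold
```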

In this case, each task creates a temporary disk file for every downstream task, hashes each record's key, and writes the record to the corresponding disk file according to that hash value. As before, records are first written to a memory buffer and spilled to the disk file when the buffer fills up. Finally, all the temporary disk files are merged into a single disk file and a separate index file is created.

The disk-write mechanism of this process is exactly the same as unoptimized HashShuffleManager's, in that a startling number of disk files are created; the difference is that the disk files are merged at the end. The small number of final disk files therefore makes shuffle read perform better than with unoptimized HashShuffleManager.

The difference between this mechanism and the ordinary SortShuffleManager mechanism is, first, that the disk-write mechanism is different, and second, that no sorting is performed. In other words, the biggest benefit of enabling this mechanism is that the data does not need to be sorted during shuffle write, saving that part of the performance overhead.

# Tuning Shuffle-Related Parameters

The following are the main parameters of the shuffle process, with each parameter's function, default value, and tuning suggestions based on practical experience.

spark.shuffle.file.buffer

Default value: 32k

Parameter description: this parameter sets the buffer size of the BufferedOutputStream used by shuffle write tasks. Data is written to this buffer before being written to the disk file, and is spilled to disk once the buffer is full.

Tuning suggestion: if the job has sufficient memory, you can appropriately increase this parameter (for example, to 64k) to reduce the number of spills to disk during shuffle write, and thus the number of disk IO operations, improving performance. In practice, reasonably adjusting this parameter improves performance by about 1% to 5%.

spark.reducer.maxSizeInFlight

Default value: 48m

Parameter description: this parameter sets the buffer size of shuffle read tasks; this buffer determines how much data can be pulled at a time.

Tuning suggestion: if the job has sufficient memory, you can appropriately increase this parameter (for example, to 96m) to reduce the number of pulls, and therefore the number of network transfers, improving performance. In practice, reasonably adjusting this parameter improves performance by about 1% to 5%.

spark.shuffle.io.maxRetries

Default value: 3

Parameter description: when a shuffle read task pulls its data from the node where the shuffle write task ran and the pull fails due to a network exception, it is retried automatically. This parameter is the maximum number of retries. If the pull still fails within that number of attempts, the job may fail.

Tuning suggestion: for jobs that include time-consuming shuffle operations, it is recommended to increase the maximum number of retries (for example, to 60) to avoid pull failures caused by JVM full GC or network instability. In practice, for shuffles over very large amounts of data (billions to tens of billions of records), adjusting this parameter greatly improves stability.

spark.shuffle.io.retryWait

Default value: 5s

Parameter description: the specific explanation is the same as above. This parameter represents the waiting interval for pulling data for each retry. The default is 5s.

Tuning suggestion: it is recommended to increase the interval (such as 60s) to increase the stability of shuffle operations.

spark.shuffle.memoryFraction

Default value: 0.2

Parameter description: this parameter represents the proportion of Executor memory allocated to shuffle read tasks for aggregation operations. The default is 20%.

Tuning suggestion: this parameter is also explained under resource parameter tuning. If memory is sufficient and persistence operations are rarely used, it is recommended to increase this ratio and give more memory to shuffle read's aggregation operations, avoiding frequent reads and writes to disk during aggregation due to insufficient memory. In practice, reasonably adjusting this parameter can improve performance by about 10%.

spark.shuffle.manager

Default value: sort

Parameter description: this parameter sets the type of ShuffleManager. Since Spark 1.5 there are three options: hash, sort, and tungsten-sort. HashShuffleManager was the default before Spark 1.2, and Spark 1.2 and later default to SortShuffleManager. tungsten-sort is similar to sort but uses the off-heap memory management of the Tungsten project, making memory use more efficient.

Tuning suggestion: because SortShuffleManager sorts data by default, you can use the default SortShuffleManager if your business logic needs sorted data; if it does not, it is recommended to use the parameters below to avoid sorting, via the bypass mechanism or the optimized HashShuffleManager, while still getting good disk read and write performance. Note that tungsten-sort should be used with caution, since some bugs have been found in it.

spark.shuffle.sort.bypassMergeThreshold

Default value: 200

Parameter description: when the ShuffleManager is SortShuffleManager and the number of shuffle read tasks is less than or equal to this threshold (200 by default), the shuffle write process does not sort; it writes data in the same way as unoptimized HashShuffleManager, but at the end merges all temporary disk files produced by each task into a single file and creates a separate index file.

Tuning suggestion: when you use SortShuffleManager and genuinely do not need sorting, it is recommended to set this parameter larger than the number of shuffle read tasks; the bypass mechanism is then enabled automatically and the map side does not sort, reducing the sorting overhead. Note, however, that a large number of intermediate disk files will still be produced in this mode, so shuffle write performance remains a concern.

spark.shuffle.consolidateFiles

Default value: false

Parameter description: this parameter is valid only when HashShuffleManager is used. If set to true, the consolidate mechanism is enabled and the output files of shuffle write are heavily merged. When there are especially many shuffle read tasks, this greatly reduces disk IO overhead and improves performance.

Tuning suggestion: if you really do not need SortShuffleManager's sorting mechanism, then besides using the bypass mechanism you can also try manually setting the spark.shuffle.manager parameter to hash, using HashShuffleManager with the consolidate mechanism enabled. In practice this has been found to perform 10% to 30% better than SortShuffleManager with the bypass mechanism enabled.
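To tie the parameter list together, here is a sketch of a SparkConf that applies the tuning suggestions above on a Spark 1.x driver. Every value is an illustrative starting point to validate against your own workload, and some keys (spark.shuffle.memoryFraction, spark.shuffle.consolidateFiles) no longer exist in newer releases.

```scala
import org.apache.spark.SparkConf

val tunedConf = new SparkConf()
  .setAppName("shuffle-tuning-example")
  .set("spark.shuffle.file.buffer", "64k")               // map-side write buffer
  .set("spark.reducer.maxSizeInFlight", "96m")           // read-side pull buffer
  .set("spark.shuffle.io.maxRetries", "60")              // retries for failed pulls
  .set("spark.shuffle.io.retryWait", "60s")              // wait between retries
  .set("spark.shuffle.memoryFraction", "0.3")            // memory for read-side aggregation
  .set("spark.shuffle.sort.bypassMergeThreshold", "400") // above the reduce-task count if sorting is not needed
```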

This concludes "what is the method of Spark performance optimization". Thank you for reading. If you want to learn more about the industry, keep following this site; the editor will keep producing practical articles for you!
