Shulou (Shulou.com), 05/31 report. Updated 2025-01-17. From: SLTechnology News&Howtos
This article covers Spark shuffle tuning: when a shuffle occurs, how it works, and how to tune it by merging map-side output files and by adjusting the map-side memory buffer and the reduce-side aggregation memory fraction.
When does a shuffle occur, and how does a shuffle work?
In Spark, operators such as groupByKey, reduceByKey, countByKey, and join trigger a shuffle.
groupByKey gathers each key and all of its values, which are scattered across the nodes of the cluster, onto a single node, or more precisely into a single task of one executor on that node, and then hands the grouped values to our code. reduceByKey applies an operator function that reduces the set of values for each key to a single value. countByKey needs all the values for a key in one task so that it can count them. join works on two RDDs: whenever a key has values in both RDDs, those values must all reach the same task of one executor so they can be processed together.
The problem is that records for the same word, such as (hello, 1), may be scattered across different nodes. To sum the counts for each word, all records for that word must be sent to one task on one node and processed there.
In the first half of each shuffle, every task of that stage creates as many files as there are tasks in the next stage. For example, if the next stage has 100 tasks, each task of the current stage creates 100 files. The values for a given key must all be written to the same file.
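The rule that all values for one key land in the same file can be sketched in plain Scala. This is only a model, not Spark's internal code: the object and method names are hypothetical, and choosing the file by hashing the key is an assumption of the sketch (the text above only says that the same key must go to the same file).

```scala
// Sketch only: a plain-Scala model of map-side bucketing, with hypothetical names.
object MapSideBuckets {
  // Map a key to a bucket index in [0, numReducers).
  // The adjustment keeps the index non-negative when hashCode is negative.
  def bucketFor(key: String, numReducers: Int): Int = {
    val raw = key.hashCode % numReducers
    if (raw < 0) raw + numReducers else raw
  }

  // Group one map task's records by destination bucket
  // (one bucket, i.e. one file, per task of the next stage).
  def writeBuckets(records: Seq[(String, Int)], numReducers: Int): Map[Int, Seq[(String, Int)]] =
    records.groupBy { case (key, _) => bucketFor(key, numReducers) }
}
```

With 100 downstream tasks, every (hello, 1) record produced by a map task ends up in the same one of that task's 100 buckets.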
In the second half of the shuffle, each task of that stage pulls the key-value pairs in its own files, which were written by the upstream tasks on every node. The task has a memory buffer and uses a HashMap to aggregate keys and values into (key, values).
The task then applies the aggregate function we defined, such as reduceByKey(_ + _), to combine all the values for a key one by one into the final value. At that point the shuffle is complete.
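The reduce-side step described above can be sketched as follows. It is a minimal model in plain Scala, not Spark's implementation; `ReduceSideAggregation` and `aggregate` are hypothetical names, and the pulled records are assumed to fit in memory.

```scala
import scala.collection.mutable

// Sketch only: models HashMap-based aggregation of pulled records.
object ReduceSideAggregation {
  // Fold pulled (key, value) pairs into a HashMap,
  // combining the values of each key with `combine`.
  def aggregate(pulled: Seq[(String, Int)], combine: (Int, Int) => Int): Map[String, Int] = {
    val buffer = mutable.HashMap.empty[String, Int]
    for ((key, value) <- pulled) {
      buffer(key) = buffer.get(key) match {
        case Some(current) => combine(current, value) // key seen before: merge
        case None          => value                   // first value for this key
      }
    }
    buffer.toMap
  }
}
```

Calling `ReduceSideAggregation.aggregate(Seq(("hello", 1), ("hello", 1), ("spark", 1)), _ + _)` sums the values per key, in the same way that `_ + _` is used with reduceByKey.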
A shuffle always spans two stages. The dependency runs in reverse: it is not the stage that decides the shuffle, but the shuffle that decides the stage.
Take reduceByKey(_ + _) as an example. When an action triggers a job, the DAGScheduler divides the job into multiple stages. The rule for the split is this: when it finds an operator that triggers a shuffle, such as reduceByKey, the first half of that shuffle operation, together with all preceding RDDs and transformations, goes into one stage; the second half of the shuffle operation, together with the subsequent RDDs and transformations up to the action, goes into another stage.
Before writing data to the disk file, a task in the first half of the shuffle writes records one by one into a memory buffer. When the buffer is full, its contents are spilled to the disk file.
What happens if you don't merge the map output files?
The goals of merging are to reduce network transfer, disk I/O, and memory buffer usage on the reduce side.
Conditions of the actual production environment:
100 nodes (one executor per node): 100 executors
CPU cores per executor: 2
1000 tasks in total: an average of 10 tasks per executor
1000 tasks upstream and 1000 tasks downstream
With 10 tasks per node, how many map-side files does each node (each executor) output? 10 * 1000 = 10,000 files
How many map output files are there in total? 100 * 10,000 = 1 million files
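The arithmetic above can be checked with a small sketch. The object and method names are hypothetical; the only rule encoded is the one from the text, that every map task writes one file per downstream task.

```scala
// Sketch only: file counts when map output files are not merged.
object UnmergedShuffleFiles {
  // Every map task on a node writes one file per downstream (reduce) task.
  def filesPerNode(mapTasksPerNode: Int, reduceTasks: Int): Long =
    mapTasksPerNode.toLong * reduceTasks

  // Total across the cluster, assuming every node runs the same number of map tasks.
  def totalFiles(nodes: Int, mapTasksPerNode: Int, reduceTasks: Int): Long =
    nodes * filesPerNode(mapTasksPerNode, reduceTasks)
}
```

For the environment above, `filesPerNode(10, 1000)` is 10,000 and `totalFiles(100, 10, 1000)` is 1,000,000.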
The question arises: how does the default shuffle behavior hurt performance?
Writing to disk is essentially the most expensive part of a shuffle.
By the analysis above, a single shuffle in a Spark job in an ordinary production environment writes 1 million files to disk.
The impact of that disk I/O on performance and on job execution speed is severe.
Much of a Spark job's running time goes to the shuffle. The map-side output files are not the only cost in a shuffle, but they are a very large one.
new SparkConf().set("spark.shuffle.consolidateFiles", "true")
This enables merging of output files on the shuffle map side. It applies when spark.shuffle.manager is hash. By default it is not enabled, so the large number of map-side output files described above is produced, which seriously hurts performance.
After enabling the merge mechanism for map-side output files:
In the first stage, as many tasks run at once as there are CPU cores. For example, with 2 CPU cores, 2 tasks run in parallel.
Each task creates as many files as there are tasks in the next stage.
In the first stage, after those two parallel tasks finish, the next two tasks run.
The next two tasks do not create new output files. Instead, they reuse the map output files created by the previous tasks and write their data into the output files of the previous batch.
In the second stage, when a task pulls its data, it no longer pulls a separate output file created for it by every task of the previous stage. Instead, it pulls from the smaller set of merged files, each of which may hold output from several map tasks.
A reminder (merging of output files on the map side):
Only tasks that run in parallel at the same time create new output files.
The next batch of tasks run in parallel reuses the existing output files.
There is one exception: suppose 2 tasks are running in parallel, and 2 more tasks start running at the same time (not as a later batch).
In that case, the output files created by the first 2 tasks cannot be reused.
The new tasks can only create new output files.
To get the benefit of merging, one batch of tasks must finish before the next batch starts, so that the earlier output files can be reused. Otherwise, if several batches of tasks run at the same time, reuse is still impossible.
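The reuse rule and its exception can be modeled with a toy simulation. This is a sketch under simplifying assumptions, not Spark's bookkeeping: a "file group" stands for the set of output files one running task writes to, a task takes a free group if one exists and creates a new one otherwise, and all names are hypothetical. The event sequence is assumed to be well formed (no Finish without a matching Start).

```scala
// Sketch only: counts how many file groups get created for a sequence of task events.
object FileGroupReuse {
  sealed trait Event
  case object Start extends Event  // a map task starts and needs a file group
  case object Finish extends Event // a map task finishes and releases its file group

  def groupsCreated(events: Seq[Event]): Int = {
    var free = 0    // released groups available for reuse
    var created = 0 // groups created so far
    events.foreach {
      case Start  => if (free > 0) free -= 1 else created += 1
      case Finish => free += 1
    }
    created
  }
}
```

Running two tasks, letting both finish, and then running two more creates 2 groups; starting all four tasks before any has finished creates 4, which is the exception described above.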
What happens to the production example once map output file merging is enabled?
Conditions of the actual production environment:
100 nodes (one executor per node): 100 executors
CPU cores per executor: 2
1000 tasks in total: an average of 10 tasks per executor
1000 tasks upstream and 1000 tasks downstream
With 2 CPU cores per node, how many output files does each node create? 2 * 1000 = 2000 files
With 100 nodes in total, how many output files are created? 100 * 2000 = 200,000 files
Compared with the 1 million files before the merge mechanism was enabled, the number of map output files in this production environment drops by a factor of 5!
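The merged case can be checked the same way. In this sketch (hypothetical names again), the number of files depends on the number of CPU cores per node rather than the number of map tasks per node, which is the rule stated above.

```scala
// Sketch only: file counts when map output files are merged.
object MergedShuffleFiles {
  // Files are created per concurrently running task (one per CPU core),
  // not per map task.
  def filesPerNode(coresPerNode: Int, reduceTasks: Int): Long =
    coresPerNode.toLong * reduceTasks

  def totalFiles(nodes: Int, coresPerNode: Int, reduceTasks: Int): Long =
    nodes * filesPerNode(coresPerNode, reduceTasks)

  // How many times fewer files than the unmerged case,
  // where every map task creates its own set of files.
  def reductionFactor(mapTasksPerNode: Int, coresPerNode: Int): Double =
    mapTasksPerNode.toDouble / coresPerNode
}
```

For this environment, `totalFiles(100, 2, 1000)` is 200,000 and `reductionFactor(10, 2)` is 5.0.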
How does merging map output files affect the performance of our Spark jobs?
Map tasks write fewer files to disk: 1 million files -> 200,000 files.
In the second stage, each task originally pulled as many files as there were tasks in the first stage: with 1000 upstream tasks, each second-stage task pulled 1000 files over the network. After merging, with 100 nodes and 2 CPU cores per node, each second-stage task pulls only 100 * 2 = 200 files, so the cost of network transfer is also greatly reduced. In practice, in a production environment with the configuration above, enabling spark.shuffle.consolidateFiles gives a considerable performance improvement.
Spark job: 5 hours -> 2-3 hours.
Don't underestimate the merging of map-side output files. When the data volume is fairly large and you have already done the earlier performance tuning (executors -> CPU cores -> parallelism, that is, the number of tasks), an untuned shuffle still performs very badly.
It generates a large number of map output files, which hurts performance.
Enabling this mechanism at that point can improve performance effectively.
spark.shuffle.manager: hash (many small files)
spark.shuffle.manager: sort (fewer small files; the default shuffle management mechanism)
spark.shuffle.file.buffer: default 32k
spark.shuffle.memoryFraction: default 0.2
By default, when a shuffle's map task writes output to a disk file, it first writes to the task's own memory buffer. The buffer size defaults to 32 KB. Each time the buffer fills up, a spill is performed and the buffered data is written to the disk file.
On the reduce side, after a task pulls its data, it aggregates the values for each key in a HashMap and runs our custom aggregate function, such as _ + _ (summing all the values), over the values for each key. A reduce task does this in the memory of its own executor (a JVM process, on the heap). By default, the fraction of executor memory allocated to reduce tasks for aggregation is 0.2. Here is the problem: with a fraction of only 0.2, a task may well pull more data than fits in that memory. The default behavior then is to spill everything that does not fit to disk files.
With the principle covered, let's look at what problems can occur if you leave the defaults untuned.
By default, the memory buffer on the map side is 32 KB per task.
By default, the aggregation memory fraction on the reduce side is 0.2, that is, 20%.
Suppose a map-side task processes a fairly large amount of data while the memory buffer size stays fixed.
What might happen?
If each task processes 320 KB with a 32 KB buffer, it spills to disk 320 / 32 = 10 times in total.
If each task processes 32000 KB with a 32 KB buffer, it spills to disk 32000 / 32 = 1000 times in total.
When a map task processes a fairly large amount of data, the default 32 KB task buffer is relatively small. This can cause many spills to disk files on the map side, which means heavy disk I/O and lower performance.
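The spill counts above follow from one division. A minimal sketch, assuming the buffer is flushed every time it fills and that a final partial buffer counts as one more spill (`SpillEstimate` and `spills` are hypothetical names):

```scala
// Sketch only: estimated number of map-side spills for a given buffer size.
object SpillEstimate {
  // Data and buffer sizes are in KB; the division rounds up.
  def spills(dataKb: Long, bufferKb: Long): Long = {
    require(bufferKb > 0, "buffer size must be positive")
    (dataKb + bufferKb - 1) / bufferKb
  }
}
```

`spills(320, 32)` gives 10 and `spills(32000, 32)` gives 1000, matching the figures above; doubling the buffer to 64 KB halves the second figure to 500.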
The reduce-side aggregation memory takes a fraction of executor memory, 0.2 by default. If the data volume is large and reduce tasks pull a lot of data, the aggregation memory will often be insufficient, so spills to disk happen frequently. Worst of all, the more data is spilled to disk, the more likely it is that the data on disk must be read back and aggregated several times during the aggregation.
With the defaults left untuned and a large data volume, the reduce side may read and write disk files frequently.
These two points are covered together because they are related. As the data volume grows, problems appear on the map side,
and they will certainly appear on the reduce side too. The problem is the same in both cases: frequent, heavy disk I/O that hurts performance.
Tuning:
Adjust the map task memory buffer: spark.shuffle.file.buffer, default 32k. (In Spark 1.3.x the parameter name has an extra suffix, kb, that is, spark.shuffle.file.buffer.kb; from Spark 1.5.x onward it is the parameter shown here.)
Adjust the aggregation memory fraction on the reduce side: spark.shuffle.memoryFraction, default 0.2
In an actual production environment, when should we adjust these two parameters?
Look at the Spark UI. If your company uses standalone mode, this is simple: when your Spark job runs, it shows the address of the Spark UI on port 4040; open it and click through. You can see the details of each stage: which executors and which tasks it ran, the shuffle write and shuffle read volume of each task, and how much shuffle data was read and written on disk and in memory. If you submit jobs in yarn mode, go in through the YARN interface, click the corresponding application, enter the Spark UI, and view the details.
If you find that the shuffle disk write and read volumes are very large, it is time to adjust these two parameters.
How to adjust the two parameters above: double spark.shuffle.file.buffer each time (64, 128) and check the effect; increase spark.shuffle.memoryFraction by 0.1 each time and check the effect. Do not set either one too high. Memory is limited, and if you allocate too much here, other parts of the job will run short of memory.
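The step-by-step schedule can be written down as a small helper. This is only a sketch with hypothetical names; it generates the candidate values to try, and deciding when to stop is left to what the Spark UI shows after each run.

```scala
// Sketch only: candidate values for successive tuning rounds.
object TuningSteps {
  // Buffer sizes in KB: start at the given size and double each round.
  def bufferSizesKb(startKb: Int, rounds: Int): Seq[Int] =
    Seq.iterate(startKb, rounds)(_ * 2)

  // Memory fractions: start at the given fraction and add 0.1 each round.
  // BigDecimal avoids floating-point drift when adding 0.1 repeatedly.
  def memoryFractions(start: BigDecimal, rounds: Int): Seq[BigDecimal] =
    Seq.iterate(start, rounds)(_ + BigDecimal("0.1"))
}
```

Starting from the defaults, `bufferSizesKb(32, 3)` yields 32, 64, 128 and `memoryFractions(BigDecimal("0.2"), 3)` yields 0.2, 0.3, 0.4.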
The effect after tuning: the larger map task memory buffer reduces the number of spills to disk files, and the larger reduce-side aggregation memory
reduces the number of spills to disk and the number of disk files that must be aggregated afterward.
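As a configuration sketch, the two parameters can be set on a SparkConf in the same way as spark.shuffle.consolidateFiles. The values 64k and 0.3 are illustrative only (one doubling of the buffer and one 0.1 step of the fraction), and the object name and application name are hypothetical. The parameter names are the ones used in this article, so they apply to the Spark 1.x releases it describes.

```scala
import org.apache.spark.SparkConf

// Sketch only: illustrative values, not recommended settings.
object ShuffleTuningConf {
  val conf: SparkConf = new SparkConf()
    .setAppName("shuffle-tuning-example")       // hypothetical application name
    .set("spark.shuffle.file.buffer", "64k")    // map-side buffer, doubled from the 32k default
    .set("spark.shuffle.memoryFraction", "0.3") // reduce-side aggregation fraction, up from 0.2
}
```

After each change, rerun the job and compare the shuffle read and write figures in the Spark UI before stepping further.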
That concludes this look at Spark shuffle tuning. Theory works best alongside practice, so go and try it!