What are the parameters and principles of Hadoop tuning?

2025-03-31 Update From: SLTechnology News&Howtos


Many people who are new to Hadoop are unsure which tuning parameters matter and why. This article summarizes the parameters and the principles behind them, and I hope it helps you solve this problem.

Question guide:

1. Map first writes some of the results it has generated to a buffer. Which parameter sets the buffer size?

2. How can the number of map splits be reduced?

3. When is map data written to disk? What is a spill?

4. Map actually starts to spill when the buffer is full to a certain extent (say, 80%). Which parameter determines this threshold?

5. Which parameter controls whether the map intermediate result is compressed?

6. How many phases does reduce contain, and which of them does every reduce go through?

7. What does a reduce task do during shuffle? How can the number of parallel downloads from multiple maps be adjusted?

8. Does all the data in reduce come from disk, and how can the use of memory be adjusted?

1 Map side tuning parameters

1.1 MapTask operation internal principle

1. When a map task starts running and produces intermediate data, the intermediate results are not simply written straight to disk. The process is more involved: a memory buffer caches part of the results already produced, and some pre-sorting is done in that buffer to improve the performance of the whole map. Each map has its own memory buffer (MapOutputBuffer), and map first writes the results it has generated into this buffer. The buffer is 100 MB by default, but its size can be adjusted through a parameter set when the job is submitted.

This parameter is mapreduce.task.io.sort.mb.

When a map produces a very large amount of data, enlarging mapreduce.task.io.sort.mb reduces the number of spills during the whole computation, and with it the map task's disk operations.

If the bottleneck of the map tasks is disk, this adjustment can greatly improve map performance.

[Figure: memory structure used by map for sort and spill]

2. While running, map keeps writing its results into the buffer, but the buffer may not be able to hold all of the map output. When the map output exceeds a certain threshold (for example, 100 MB), map must write the data in the buffer to disk; MapReduce calls this process a spill. Map does not wait until the buffer is completely full before spilling, because that would force the computing part of the map to wait for the buffer to free up space. So map actually starts to spill when the buffer is full to a certain extent (say, 80%).

This threshold is also controlled by a job configuration parameter: mapreduce.map.sort.spill.percent, which defaults to 0.80 (80%).

This parameter also affects how often spills happen, and therefore how often the map task reads and writes disk over its run. Outside of special cases it usually does not need manual adjustment; adjusting mapreduce.task.io.sort.mb is more convenient for users.
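To make the relationship between buffer size, spill threshold, and spill count concrete, here is a minimal Python sketch. It assumes a simplified model in which the whole buffer holds record data and a spill is triggered each time buffer size × spill percent has accumulated; the function name `estimate_spills` is hypothetical, and real spill counts also depend on record metadata and other factors.

```python
import math

def estimate_spills(map_output_mb, sort_mb=100, spill_percent=0.80):
    """Rough number of spill files one map task produces.

    Simplifying assumption (not Hadoop's exact accounting): the whole
    buffer holds record data, so a spill happens each time
    sort_mb * spill_percent MB has accumulated.
    """
    if map_output_mb <= 0:
        return 0  # a map with no output writes no spill file
    spill_threshold_mb = sort_mb * spill_percent
    return math.ceil(map_output_mb / spill_threshold_mb)

# 1000 MB of map output: default 100 MB buffer vs. a 400 MB buffer
print(estimate_spills(1000))
print(estimate_spills(1000, sort_mb=400))
```

Under this model, enlarging the buffer lowers the spill count roughly in proportion, which is the effect the text attributes to mapreduce.task.io.sort.mb.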

3. When the computing part of the map task is complete, a map that has output will have generated one or more spill files, which are the map's output. Before the map exits normally (cleanup), these spill files must be merged into one, so a map always goes through a merge before it ends. One parameter adjusts the behavior of this merge: mapreduce.task.io.sort.factor, which defaults to 10. It is the maximum number of streams that can be merged at the same time when merging spill files. For example, if the map produces so much data that there are more than 10 spill files and mapreduce.task.io.sort.factor is left at the default of 10, the final merge cannot combine all spill files in one pass; it is split into several passes, each merging at most 10 streams. In other words, when the intermediate result of the map is very large, increasing mapreduce.task.io.sort.factor helps reduce the number of merge passes and therefore how often the map reads and writes disk, which may optimize the job.
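The effect of the merge factor can be sketched with a simplified model: in each round, every group of up to `factor` files is merged into one, until a single file remains. This is an illustrative assumption, not Hadoop's exact merge scheduling, and the function name `merge_rounds` is hypothetical.

```python
def merge_rounds(num_spills, factor=10):
    """Count merge rounds under a simplified model where each round
    merges groups of at most `factor` files into one file each."""
    if factor < 2:
        raise ValueError("factor must be at least 2")
    rounds = 0
    files = num_spills
    while files > 1:
        files = -(-files // factor)  # ceiling division
        rounds += 1
    return rounds

# 25 spill files: default factor of 10 vs. a factor of 100
print(merge_rounds(25, factor=10))
print(merge_rounds(25, factor=100))
```

With 25 spill files, the model needs two rounds at the default factor and one round once the factor covers all files, which means fewer intermediate files rewritten to disk.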

4. When the job specifies a combiner, the map results are merged on the map side using the function the combiner defines. The combiner function may run before or after the merge completes, and the timing is controlled by one parameter, mapreduce.map.combine.minspills (default 3). When a combiner is set in the job and the number of spills is greater than or equal to 3, the combiner function runs before the merge produces the result file. When many spills need merging and a lot of data can be combined, this reduces the amount of data written to the disk file, again cutting disk reads and writes and possibly optimizing the job.
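The rule described above is a simple threshold check. The following Python sketch only restates that rule; the function name `combiner_runs_during_merge` is hypothetical and is not part of any Hadoop API.

```python
def combiner_runs_during_merge(has_combiner, num_spills, min_spills=3):
    """Return True when, per the rule in the text, the combiner runs
    before the merge writes the final result file.

    min_spills stands in for mapreduce.map.combine.minspills (default 3).
    """
    return has_combiner and num_spills >= min_spills

print(combiner_runs_during_merge(True, 5))   # enough spills
print(combiner_runs_during_merge(True, 2))   # too few spills
print(combiner_runs_during_merge(False, 5))  # no combiner configured
```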

5. These are not the only ways to reduce the disk traffic of intermediate results; compression is another. Map's intermediate output, whether written at spill time or as the result file produced by merge, can be compressed. Compression reduces the amount of data written to and read from disk, which is especially useful for jobs whose intermediate results are very large and whose map execution is bottlenecked by disk speed. The parameter that controls whether map intermediate results are compressed is mapreduce.map.output.compress (true/false). When it is set to true, map compresses the data before writing intermediate results to disk and decompresses it when reading the results back. Less intermediate data is written to disk, but some CPU is spent on compressing and decompressing. So this method usually suits jobs with very large intermediate results whose bottleneck is disk reads and writes rather than CPU. Put bluntly, it trades CPU for IO. In the author's observation, CPU is not the bottleneck for most jobs unless the computation logic is extremely complex, so compressing intermediate results usually pays off. The following compares the amount of data read from and written to local disk for the wordcount intermediate result with and without compression:

[Figure: map intermediate results without compression]

[Figure: map intermediate results with compression]

As you can see, for the same job and the same data, compression shrinks the map intermediate result by nearly a factor of 10. If the map's bottleneck is disk, job performance improves greatly.

When compressing map intermediate results, users can also choose the compression format. Hadoop currently supports GzipCodec, LzoCodec, BZip2Codec, LzmaCodec, and other formats. Generally speaking, LzoCodec strikes a more balanced trade-off between CPU cost and compression ratio, but the right choice depends on the specific job. To select the compression algorithm for intermediate results, set the configuration parameter mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec, or another codec of your choice.
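As an illustration of how the two compression settings might be passed at job submission, the Python sketch below builds a list of `-D key=value` arguments. It assumes the job's driver parses Hadoop's generic options (for example through ToolRunner); the helper name `compression_args` is hypothetical, and the codec shown is simply the default named in the text.

```python
def compression_args(codec="org.apache.hadoop.io.compress.DefaultCodec"):
    """Build -D arguments that enable map output compression.

    Assumption: the job driver accepts Hadoop generic options, so each
    property can be supplied as '-D key=value' on the command line.
    """
    props = {
        "mapreduce.map.output.compress": "true",
        "mapreduce.map.output.compress.codec": codec,
    }
    args = []
    for key, value in props.items():
        args += ["-D", f"{key}={value}"]
    return args

print(" ".join(compression_args()))
```

The same two properties could instead be set in the job configuration; the command-line form is shown only because it makes a per-job experiment easy to repeat with and without compression.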

1.2 Tuning of map side related parameters

Option | Type | Default value | Description
--- | --- | --- | ---
io.sort.mb / mapreduce.task.io.sort.mb | int | 100 | Size (in MB) of the buffer that caches map intermediate results
io.sort.record.percent | float | 0.05 | Percentage of io.sort.mb used to hold map output record boundaries; the rest of the buffer holds data
io.sort.spill.percent / mapreduce.map.sort.spill.percent | float | 0.80 | Threshold at which map starts the spill operation
io.sort.factor / mapreduce.task.io.sort.factor | int | 10 | Upper limit on the number of streams operated on at the same time during a merge
min.num.spills.for.combine / mapreduce.map.combine.minspills | int | 3 | Minimum number of spills for the combiner function to run
mapred.compress.map.output / mapreduce.map.output.compress | boolean | false | Whether map intermediate results are compressed
mapred.map.output.compression.codec / mapreduce.map.output.compress.codec | class name | org.apache.hadoop.io.compress.DefaultCodec | Compression format of map intermediate results

2 Reduce side tuning parameters

2.1 ReduceTask operation internal principle

1. A reduce runs in three phases, in the order copy -> sort -> reduce. Every map of the job partitions its output into n partitions according to the number of reduces (n), so the intermediate result of any map may contain data that each reduce needs to process. Therefore, to shorten reduce execution time, as soon as the first map of the job finishes, every reduce starts trying to download its own partition of the data from the completed maps. This process is commonly known as shuffle, that is, the copy phase.

2. During shuffle, a reduce task is downloading its own portion of the data from different completed maps. Because there are usually many maps, a reduce can download from several maps in parallel, and this parallelism is adjustable through mapreduce.reduce.shuffle.parallelcopies (default 5). By default each reduce has only 5 parallel download threads fetching data from maps. If 100 or more maps of the job complete within a short period, the reduce can still download from at most 5 maps at a time. Raising this parameter therefore suits jobs with many maps that finish quickly, letting each reduce obtain its data faster.
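A back-of-the-envelope way to see the impact of this setting is to count how many rounds of fetches one reduce needs. The Python sketch below assumes, purely for illustration, that every fetch takes the same time and all maps are already complete; the function name `fetch_waves` is hypothetical.

```python
import math

def fetch_waves(completed_maps, parallel_copies=5):
    """Rounds of downloads one reduce needs when it can fetch from at
    most `parallel_copies` maps at a time (equal fetch times assumed)."""
    if parallel_copies < 1:
        raise ValueError("parallel_copies must be at least 1")
    return math.ceil(completed_maps / parallel_copies)

# 100 completed maps: default of 5 threads vs. 20 threads
print(fetch_waves(100))
print(fetch_waves(100, parallel_copies=20))
```

In this model, raising the thread count from 5 to 20 cuts the number of rounds by a factor of four, provided the network and the serving nodes can sustain the extra concurrent transfers.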

3. When a reduce download thread fetches map data, the download can fail: the machine holding the map's intermediate result may have a fault, the intermediate result file may be lost, the network may drop, and so on. The download thread therefore does not wait forever; if the download still fails after a certain period, the thread gives up and then tries to download from another location (because the map may have been rerun in the meantime). This maximum download period can be adjusted with mapred.reduce.copy.backoff (default 300 s). If the cluster network is itself a bottleneck, users can increase this parameter so that reduce downloads are not misjudged as failures. With a good network there is no need to adjust it. Professionally run cluster networks should not have many problems, so this parameter rarely needs tuning.

4. When a reduce has downloaded map results to local storage, they also need to be merged, so mapreduce.task.io.sort.factor affects the reduce's merge behavior as well. As described above for the map side, if a reduce shows very high iowait during the shuffle phase, increasing this parameter may raise merge concurrency and improve reduce efficiency.

5. During the shuffle phase, a reduce does not write downloaded map data to disk immediately. It caches the data in memory first and flushes it to disk once memory usage reaches a certain amount. Unlike on the map side, this memory size is not set by mapreduce.task.io.sort.mb but by another parameter, mapreduce.reduce.shuffle.input.buffer.percent (default 0.7). It is a percentage: the maximum memory shuffle may use in a reduce is 0.7 × the max heap of the reduce task. That is, a fixed share of the reduce task's maximum heap (usually set through mapreduce.reduce.java.opts, for example -Xmx1024m) is used to cache data. By default a reduce uses 70% of its heap size to cache data in memory. If the reduce heap is enlarged for business reasons, the cache grows with it, which is why this parameter is a percentage rather than a fixed value.

6. Assume mapreduce.reduce.shuffle.input.buffer.percent is 0.7 and the max heap size of the reduce task is 1 GB; the memory used to cache downloaded data is then about 700 MB. As on the map side, this 700 MB is not flushed to disk only once it is completely full; flushing starts when usage reaches a certain limit (again a percentage). This threshold can also be set through a job parameter: mapreduce.reduce.shuffle.merge.percent (default 0.66). If downloads are fast and easily fill the memory cache, adjusting this parameter may help reduce performance.
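The arithmetic in the last two points can be written out as a short Python sketch. It assumes the -Xmx1024m heap from the example above and treats the two percentages as plain multipliers; the function name `shuffle_memory_mb` is hypothetical, and the JVM's usable heap may differ slightly from the -Xmx value.

```python
def shuffle_memory_mb(max_heap_mb, input_buffer_percent=0.7, merge_percent=0.66):
    """Return (shuffle buffer size, usage level that triggers flushing), in MB.

    input_buffer_percent stands in for
    mapreduce.reduce.shuffle.input.buffer.percent and merge_percent for
    mapreduce.reduce.shuffle.merge.percent.
    """
    buffer_mb = max_heap_mb * input_buffer_percent
    merge_trigger_mb = buffer_mb * merge_percent
    return buffer_mb, merge_trigger_mb

buffer_mb, trigger_mb = shuffle_memory_mb(1024)
print(f"shuffle buffer: {buffer_mb:.1f} MB, flush starts near {trigger_mb:.1f} MB")
```

Because both values scale with the heap, doubling the reduce heap doubles the in-memory shuffle cache without touching either percentage.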

7. Once a reduce has downloaded all the data for its partition from every map, it begins the real reduce computing phase. (The sort phase in between is usually very short, finishing in a few seconds, because data is already being sorted and merged while it is downloaded.) When the reduce task enters the evaluation of the reduce function, one more parameter adjusts its behavior: mapreduce.reduce.input.buffer.percent (default 0.0). The reduce computation itself consumes memory, and reading the data the reduce needs also requires memory as a buffer. This parameter sets the percentage of memory used to buffer the sorted data that the reduce reads. The default is 0, meaning that by default the reduce reads and processes all of its data from disk. If the parameter is greater than 0, a certain amount of data is kept in memory and fed to the reduce. When the reduce computing logic uses very little memory, part of the otherwise idle memory can be set aside to cache data.

2.2 Tuning of reduce side related parameters

Option | Type | Default value | Description
--- | --- | --- | ---
mapred.reduce.parallel.copies / mapreduce.reduce.shuffle.parallelcopies | int | 5 | Maximum number of threads each reduce uses to download map results in parallel
mapred.reduce.copy.backoff | int | 300 | Maximum waiting time of a reduce download thread (in seconds)
io.sort.factor / mapreduce.task.io.sort.factor | int | 10 | Same as on the map side
mapred.job.shuffle.input.buffer.percent / mapreduce.reduce.shuffle.input.buffer.percent | float | 0.7 | Percentage of the reduce task heap used to cache shuffle data
mapred.job.shuffle.merge.percent / mapreduce.reduce.shuffle.merge.percent | float | 0.66 | Percentage of the cache memory in use at which the merge operation starts
mapred.job.reduce.input.buffer.percent / mapreduce.reduce.input.buffer.percent | float | 0.0 | Percentage of memory used to cache data in the reduce computing phase after sort completes
