
What are the ways to improve the performance of MapReduce


This article introduces the relevant knowledge of "What are the ways to improve the performance of MapReduce". Many people run into these problems in real work, so let's walk through how to deal with them. I hope you read it carefully and get something useful out of it!

One of the services Cloudera provides to customers is tuning and optimizing the execution performance of MapReduce jobs. MapReduce and HDFS form a complex distributed system that runs arbitrary user code, so there is no single, quick rule for optimizing performance. In my view, tuning a cluster or a job is more like a doctor treating a patient: you identify the key "symptoms" and apply a different diagnosis and treatment to each one.

In medicine there is no substitute for an experienced doctor, and the same holds in complex distributed systems: experienced users and operators develop a "sixth sense" for many common problems. Having solved problems for Cloudera customers across different industries, with very different workloads, data sets, and cluster hardware, I have accumulated a good deal of experience in this area and would like to share it with you.

In this post I will highlight suggestions for improving MapReduce performance. The first few apply to the entire cluster and should be useful to operators as well as developers. The later ones are aimed at developers writing MapReduce jobs in Java. For each suggestion I list some "symptoms" or "diagnostic tests" that indicate when the improvement may help you.

Please note that these suggestions are intuitions I have built up from many past scenarios. They may not suit your particular workload, data set, or cluster, and you should test their effect in your own environment before and after applying them. For comparison, I will show benchmark numbers generated on a 4-node cluster running a Wordcount job over 40GB of data. With the suggestions below applied, each map task in this job runs for about 33 seconds and the whole job completes in about 8 minutes 30 seconds.

First, configure your cluster correctly.

Diagnostic results / symptoms:

1. The Linux top command shows that the slave nodes are still fairly idle even when every map and reduce slot has a task running.

2. The top command shows kernel processes, such as RAID (mdX_raid*) or pdflush, taking up a large amount of CPU time.

3. The Linux load average is often more than twice the number of CPUs in the system.

4. Even when jobs are running, the Linux load average stays below half the number of CPUs in the system.

5. Swap usage on some nodes exceeds a few MB.

The first step in optimizing MapReduce performance is to make sure your cluster-wide configuration has been tuned. For newcomers, see the earlier blog post on configuration parameters. Beyond those parameters, here are some things to check when you want to change settings to improve job performance:

1. Make sure the storage mounts used by DFS and MapReduce are mounted with the noatime option. This disables recording of disk access times and significantly improves IO performance.

2. Avoid using RAID and LVM on the disks of TaskTracker and DataNode machines; they usually degrade performance.

3. The values of mapred.local.dir and dfs.data.dir should point to directories spread across every disk on the node, so that you can make full use of the node's IO read and write capacity. Running iostat -dx 5 (from the Linux sysstat package) shows the utilization of each disk.

4. You should have good monitoring of disk health. MapReduce jobs are designed to tolerate disk failures, but disk errors cause tasks to be retried, which degrades performance. If you find that a particular TaskTracker is blacklisted by many jobs, it may have a failing disk.

5. Use a tool such as Ganglia to monitor and graph swap and network usage. If the graphs show that machines are dipping into swap, reduce the memory allocated to each task via the mapred.child.java.opts property (a sketch follows this list).
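
As a minimal sketch of point 5, a job driver can shrink the task heap by overriding mapred.child.java.opts at submission time. The class name and the 512MB value below are illustrative assumptions, not recommendations; pick a value based on what your monitoring shows.

Java code

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class LowerTaskHeap {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical value: shrink each child JVM heap if monitoring shows swapping.
        conf.set("mapred.child.java.opts", "-Xmx512m");
        Job job = new Job(conf, "wordcount");
        // ... set mapper, reducer, input and output paths as usual, then submit ...
    }
}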

Benchmark:

Unfortunately I cannot generate benchmark numbers for this suggestion, since it would require rebuilding the entire cluster. If you have relevant experience, please share your settings and results in the comments below.

Second, use LZO compression.

Diagnostic results / symptoms:

1. It is almost always a good idea to compress the intermediate output of a job.

2. The MapReduce job outputs a large amount of data.

3. While the job is running, the Linux top and iostat commands show high iowait on the slave nodes.

Almost every Hadoop job benefits from LZO-compressing the intermediate data written by its map tasks. Although LZO compression adds some CPU load, it reduces the amount of data written to and read from disk during the shuffle, which usually saves time overall.

When a job writes a large amount of output, LZO compression also improves output performance. By default every output file is stored with 3 replicas, so 1GB of output actually writes 3GB to disk; compressing the output therefore saves space and improves performance.

To enable compression of map output, set the parameter mapred.compress.map.output to true.
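
As a hedged sketch, here is how map-output compression might be enabled in a job driver. It assumes the hadoop-lzo package is installed on the cluster and that its codec class is com.hadoop.compression.lzo.LzoCodec; the exact class name and property names can vary between Hadoop versions.

Java code

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CompressedMapOutputJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Compress the intermediate map output before it is spilled and shuffled.
        conf.setBoolean("mapred.compress.map.output", true);
        // Assumes hadoop-lzo is installed; swap in another codec if it is not.
        conf.set("mapred.map.output.compression.codec",
                 "com.hadoop.compression.lzo.LzoCodec");
        Job job = new Job(conf, "wordcount");
        // ... configure mapper, reducer, input and output paths as usual ...
    }
}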

Benchmark:

In my cluster, turning LZO compression off for the Wordcount example increased the job's run time only slightly. However, the FILE_BYTES_WRITTEN counter grew from 3.5GB to 9.2GB, which means compression was reducing disk IO by 62%. In my cluster the ratio of disks to tasks per node is high, and Wordcount was not sharing the cluster with other jobs, so IO was not the bottleneck and the extra disk IO did not hurt much. In an environment where the disks are busy with many concurrent jobs, though, a 60% reduction in disk IO can speed up execution considerably.

Third, adjust the number of map and reduce tasks to appropriate values.

In my experience, you generally cannot change the HDFS block size of the entire cluster just to run a single job; these parameters are usually set when the job is submitted, for example:

job.setMapOutputValueClass(IntWritable.class);
job.setNumReduceTasks(1);
// Set the minimum input split size to 512MB.
FileInputFormat.setMinInputSplitSize(job, 512L * 1024 * 1024);
FileInputFormat.addInputPath(job, new Path("/usr/keyword/input"));

Diagnostic results / symptoms:

1. Each map or reduce task completes in less than 30 to 40 seconds.

2. A large job cannot take full advantage of all the free slots in the cluster.

3. Most map or reduce tasks are scheduled and run, but one or two tasks remain pending and run on their own after all the others have finished.

Adjusting the number of map and reduce tasks in a job is important and often overlooked. Here are some rules of thumb for setting these parameters:

1. If each task runs for less than 30 to 40 seconds, reduce the number of tasks. Creating and scheduling a task takes a few seconds of overhead, so if tasks finish very quickly that overhead dominates. Enabling JVM reuse can also mitigate this problem.

2. If a job's input data is larger than 1TB, increase the block size to 256MB or 512MB to reduce the number of tasks. You can rewrite existing files with a larger block size using: hadoop distcp -Ddfs.block.size=$[256*1024*1024] /path/to/inputdata /path/to/inputdata-with-largeblocks. After the command completes, you can delete the original input files (/path/to/inputdata).

3. As long as each task runs for at least 30 to 40 seconds, increase the number of map tasks to some multiple of the total number of map slots in the cluster. If your cluster has 100 map slots, avoid a job with 101 map tasks: the first 100 maps run in parallel and the 101st runs by itself before the reduces can start. This matters most for small clusters and small jobs.

4. Don't schedule too many reduce tasks. For most jobs the number of reduce tasks should be equal to or slightly smaller than the number of reduce slots in the cluster (see the sketch after this list).
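
As a rough sketch of point 4 (using the old mapred API's ClusterStatus to discover slot counts; the class name and the 0.95 factor are my own illustrative choices), a driver could size the reduce phase like this:

Java code

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Ask the JobTracker how many reduce slots the cluster has.
        ClusterStatus status = new JobClient(new JobConf(conf)).getClusterStatus();
        int reduceSlots = status.getMaxReduceTasks();
        Job job = new Job(conf, "wordcount");
        // Slightly fewer reduces than slots, so the reduce phase finishes in a single wave.
        job.setNumReduceTasks(Math.max(1, (int) (reduceSlots * 0.95)));
        // ... configure mapper, reducer, input and output paths as usual ...
    }
}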

Benchmark:

To make the Wordcount job run with a very large number of tasks, I set the parameter -Dmapred.max.split.size=$[16*1024*1024]. Instead of the 360 map tasks generated by default, the job now had 2,640. Each task finished in about 9 seconds, and in the JobTracker's Cluster Summary view the number of running map tasks fluctuated between 0 and 24. The job finished in 17 minutes 52 seconds, more than twice as slow as the original run.

Fourth, add a Combiner to your job.

Diagnostic results / symptoms:

1. The job performs some kind of aggregation, and the REDUCE_INPUT_GROUPS counter is much smaller than the REDUCE_INPUT_RECORDS counter.

2. The job performs a large shuffle (for example, the map output is several GB per node).

3. The job counters show SPILLED_RECORDS much larger than MAP_OUTPUT_RECORDS.

If your algorithm involves some kind of aggregation, you can use a Combiner to perform an initial aggregation before the data reaches the reduce side. The MapReduce framework runs the Combiner judiciously to reduce the amount of data written to disk and transferred over the network to the reducers.
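
As a sketch of how this looks in the Wordcount example: a sum-style reducer can safely be reused as the combiner, because summing partial sums gives the same final result. The class below mirrors the IntSumReducer from the standard example.

Java code

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

In the driver, register the same class for both roles:

Java code

job.setReducerClass(IntSumReducer.class);
job.setCombinerClass(IntSumReducer.class);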

Benchmark:

I removed the call to setCombinerClass from the Wordcount example. That change alone increased the average map task run time from 33 seconds to 48 seconds and the shuffled data volume from 1GB to 1.4GB. Total job run time nearly doubled, from 8 minutes 30 seconds to 15 minutes 42 seconds. Map output compression was enabled during this test; without it, the effect of the Combiner would be even more pronounced.

Fifth, use the most appropriate and concise Writable type for your data.

Diagnosis / symptoms:

1. Text objects are used for non-text or mixed numeric data.

2. IntWritable or LongWritable objects are used even though most of the output values are small.

Developers writing MapReduce for the first time, or moving from Hadoop Streaming to Java MapReduce, often use Text objects where they are not necessary. Text is convenient, but converting numeric values to and from its UTF8 representation is inefficient and can consume a significant share of CPU time. When handling non-text data, use the binary Writable types such as IntWritable, FloatWritable, and so on.

Besides avoiding the cost of text conversion, the binary Writable types also take up less space as intermediate results. When disk IO and network transfer are the bottleneck for a large job, shrinking the intermediate data improves performance. For integer values, it can sometimes be faster to use VIntWritable or VLongWritable: these use a variable-length encoding that saves space when serializing small values. For example, the integer 4 is serialized in a single byte, while the integer 10000 is serialized in two bytes. Variable-length types are especially effective for counting tasks, where most records hold a small value that fits in one or two bytes.

If the Writable types that ship with Hadoop do not meet your needs, consider writing your own. Doing so is fairly simple and can be faster than parsing text. If you do write your own Writable, be sure to also provide a RawComparator; the built-in Writable types are good examples to follow.
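
For illustration only, here is a minimal custom Writable holding a pair of ints; the class name IntPairWritable is a placeholder of mine, and a production version would typically also implement WritableComparable and register a RawComparator, as suggested above.

Java code

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class IntPairWritable implements Writable {
    private int first;
    private int second;

    public void set(int first, int second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Fixed 4-byte encoding per field; compact and binary compared to Text.
        out.writeInt(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }
}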

Benchmark:

For the Wordcount example, I changed the intermediate count emitted by the map from IntWritable to Text, and used Integer.parseInt(value.toString()) to convert it back when summing in the reduce. This version was nearly 10% slower than the original: the whole job took almost 9 minutes, and each map task ran in 36 seconds instead of the previous 33. Integer parsing looks cheap on its own, but the cost adds up. In other cases I have seen choosing the right Writable type deliver a 2- to 3-fold performance improvement.

Sixth, reuse Writable types

Diagnosis / symptoms:

1. Add -verbose:gc -XX:+PrintGCDetails to the mapred.child.java.opts parameter and inspect some task logs. If garbage collection runs frequently and takes significant time, you are probably allocating too many throwaway objects.

2. Search your code for "new Text" or "new IntWritable". This advice applies if such calls appear inside an inner loop or inside the map/reduce methods.

3. This advice is especially useful when task memory is constrained.

A common mistake among MapReduce users is to allocate a new Writable object for every record emitted from a map/reduce method. For example, your Wordcount mapper might be written as:

Java code

public void map(...) {
    ...
    for (String word : words) {
        output.collect(new Text(word), new IntWritable(1));
    }
}

This causes the program to allocate thousands of short-lived objects, which the Java garbage collector then has to clean up. A more efficient way to write it is:

Java code

class MyMapper ... {
    Text wordText = new Text();
    IntWritable one = new IntWritable(1);

    public void map(...) {
        for (String word : words) {
            wordText.set(word);
            output.collect(wordText, one);
        }
    }
}

Benchmark:

When I modified the Wordcount example as described above, I initially found no difference in job run time. That is because my cluster allocates a 1GB heap to each task by default, so garbage collection never kicked in. When I reconfigured the cluster to give each task only a 200MB heap, the version that did not reuse Writable objects slowed down severely: job run time went from about 8 minutes 30 seconds to more than 17 minutes. The version that reuses Writables ran at the same speed with the smaller heap. Reusing Writables is a simple fix, and I recommend doing it every time. It may not improve every job, but it makes a big difference when tasks are memory-constrained.

Seventh, use a simple profiling method to see what your tasks are doing.

This is a trick I often use when looking into MapReduce performance problems. Profiling purists may object that it is not rigorous, but the results speak for themselves.

To do this lightweight profiling, ssh into the TaskTracker machine running one of the slow tasks while the job is in progress. Run the command sudo killall -QUIT java 5 to 10 times, a few seconds apart. Don't worry, despite the name the command does not cause anything to quit: the SIGQUIT signal simply makes each JVM dump a stack trace to its stdout. Then use the JobTracker web interface to navigate to the stdout file of a task on that machine, or look at the task's stdout file under /var/log/hadoop/userlogs/ on the machine itself. (In the JobTracker UI, go from the Cluster Summary table to the Nodes link, select the server where you ran the command, click LOG under Local Logs, and open the userlogs directory. There you will see directories named after the job IDs running on that server, each containing the tasks' log directories; each task's log directory has a stdout file, and if it is not empty, that file holds the stack dumps.)

It takes a little experience to interpret this output. Here is how I go about it:

For each thread in the stack dump, scan quickly for the name of your Java package (say, com.mycompany.mrjobs). If the current thread's stack contains nothing from your code, skip to the next thread.

If a stack does contain your code, quickly note roughly what it was doing. For example, if you see something related to NumberFormat, write that down; you don't need to note the exact lines of code.

Move on to the next dump in the log, repeat the exercise, and jot down anything that stands out.

After looking at four or five dumps, you may notice the same thing showing up each time. If what keeps showing up is something that could plausibly be holding your program back, you have probably found the real problem. For instance, if you take 10 stack dumps and 5 of them show NumberFormat, it likely means you are spending about 50% of your CPU on data format conversion.

Of course, this is not as rigorous as using a real profiler, but I have found it an effective way to spot obvious CPU bottlenecks without introducing extra tooling. More importantly, it is a skill that improves with practice: over time you learn what a normal dump looks like and what a problematic one looks like.

Using this technique, I have repeatedly found a few common performance culprits, listed below.

1. NumberFormat is quite slow, so try to avoid using it.

2. String.split, and UTF8 string encoding and decoding in general, are slower than you might expect; as advised above, use appropriate Writable types instead.

3. Concatenating strings with StringBuffer.append.

That concludes "What are the ways to improve the performance of MapReduce". Thank you for reading. If you want to learn more, keep following the site for more practical articles.
