Updated 2025-03-01. Source: SLTechnology News&Howtos (shulou), Internet Technology
With an in-depth understanding of Hadoop's components, users can readily optimize job efficiency by adjusting a few key parameters. This article describes how to tune Hadoop performance from the perspectives of Hadoop administrators and users.
1 Overview
Hadoop performance tuning is a large engineering effort. It involves not only tuning Hadoop itself but also tuning the underlying hardware, operating system and Java virtual machine; proper tuning of each of these layers may improve Hadoop performance. From top to bottom, the layers are:
Hadoop (JobTracker, TaskTracker)
JVM
OS
Hardware (CPU, Memory, Network)
Tuning of the non-Hadoop layers, such as hardware, the operating system (I/O scheduler, pre-read mechanism, turning off swap, and so on) and the Java virtual machine, is only outlined here. This article focuses on how to optimize job efficiency by adjusting parameters that ship with Hadoop. In general, improving job efficiency requires the joint efforts of Hadoop administrators and job owners: the administrator is responsible for providing users with an efficient job environment, while users are responsible for making their jobs run as efficiently as possible according to the characteristics of those jobs.
At the time of writing, Apache Hadoop is mainly divided into four series: 0.20.x, 0.21.x, 0.22.x and 0.23.x, and Cloudera Hadoop is mainly divided into two series: CDH3 and CDH4, of which 0.23.x and CDH4 belong to the next generation MapReduce.
2 Tuning from the administrator's point of view
2.1 Hardware selection
Hadoop's architecture largely determines its hardware requirements. Hadoop uses a master/slave architecture in which the master (JobTracker and NameNode) maintains global metadata and is therefore far more important than any slave (TaskTracker or DataNode). In older versions of Hadoop the master is a single point of failure, so its hardware configuration should be much better than that of each slave.
2.2 Tuning operating system parameters
In practice, Hadoop is suited only to Linux as a production operating system. Appropriate tuning of Linux kernel parameters by the administrator can improve job efficiency to a certain extent. The more useful adjustments are as follows.
1. Increase the limits on simultaneously open file descriptors and network connections
A Hadoop cluster runs a large number of jobs and tasks, so on any one node the kernel's limits on file descriptors and network connections can cause jobs to fail when many files are read and written at once. When starting the Hadoop cluster, administrators can therefore use the ulimit command to raise the maximum number of simultaneously open file descriptors to an appropriate value, and set the kernel parameter net.core.somaxconn to a sufficiently large value.
In addition, Hadoop RPC uses epoll for high-concurrency I/O. If you are using a Linux kernel version above 2.6.28, you need to adjust the upper limit on epoll file descriptors appropriately.
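As a rough illustration of checking the limits just mentioned, the following Python sketch reads the file descriptor limit of the current process and, on Linux, the value of net.core.somaxconn. The helper names and the thresholds 65536 and 1024 are placeholders chosen for this example, not values recommended by this article.

```python
import resource


def read_int(path):
    """Return the integer stored in a /proc file, or None if it cannot be read."""
    try:
        with open(path) as f:
            return int(f.read().split()[0])
    except (OSError, ValueError, IndexError):
        return None


def current_limits():
    """Collect the limits discussed above for this process and kernel."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return {
        "nofile_soft": soft,
        "nofile_hard": hard,
        "somaxconn": read_int("/proc/sys/net/core/somaxconn"),
    }


def too_low(value, wanted):
    """True when a known, finite limit is below the wanted value."""
    if value is None or value == resource.RLIM_INFINITY:
        return False
    return value < wanted


if __name__ == "__main__":
    limits = current_limits()
    # 65536 and 1024 are example thresholds only (assumptions).
    for name, wanted in (("nofile_soft", 65536), ("somaxconn", 1024)):
        flag = "LOW" if too_low(limits[name], wanted) else "ok"
        print(f"{name}: {limits[name]} ({flag})")
```

The script only reports values; raising them is still done with ulimit and sysctl as described above.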
2. Turn off the swap partition
In Linux, when a process runs short of memory, part of its in-memory data is temporarily written to disk and swapped back into memory when needed, which greatly reduces the process's execution efficiency. In a MapReduce distributed computing environment, users can avoid swapping by controlling the amount of data each job processes and the sizes of the buffers each task uses. The specific way to control swapping is to adjust vm.swappiness in the /etc/sysctl.conf file.
3. Set a reasonable pre-read buffer size
Disk I/O performance has lagged far behind that of CPU and memory, making it a major bottleneck in modern computer systems. Pre-reading effectively reduces the number of disk seeks and the time applications spend waiting for I/O, and it is one of the important ways to improve disk read performance. Administrators can use the Linux blockdev command to set the pre-read buffer size and so improve sequential reads of large files in Hadoop. The pre-read buffer size of Hadoop itself can also be increased.
4. File system selection and configuration
The performance of Hadoop depends largely on the read and write performance of the Linux local file system. There are many file systems to choose from in Linux, such as ext3 and ext4, and the performance of different file systems can vary.
5. I/O scheduler selection
The administrator can enable the I/O scheduler best suited to the characteristics of the applications being run.
2.3 JVM parameter tuning
Because each Hadoop service and task runs in its own JVM, some important JVM parameters also affect Hadoop performance. Administrators can improve Hadoop performance by tuning JVM flags and the JVM garbage collection mechanism.
2.4 Hadoop parameter tuning
1. Rational planning of resources
A. Set a reasonable number of slots
In Hadoop, computing resources are represented by slots. There are two types of slot: Map slots and Reduce slots. Each slot represents a certain amount of resources, and slots of the same type are homogeneous, that is, they represent the same amount of resources. Administrators configure a number of Map slots and Reduce slots for each TaskTracker according to actual needs, thereby limiting the number of Map Tasks and Reduce Tasks that run concurrently on that TaskTracker.
The number of slots is configured in mapred-site.xml on each TaskTracker. The specific parameters are mapreduce.tasktracker.map.tasks.maximum and mapreduce.tasktracker.reduce.tasks.maximum.
B. Write a health monitoring script
Hadoop allows administrators to configure a node health monitoring script for each TaskTracker. TaskTracker has a dedicated thread that periodically runs the script and reports the result to JobTracker through the heartbeat mechanism. Once JobTracker finds that a TaskTracker is in an unhealthy state (for example, memory or CPU usage is too high), it blacklists that TaskTracker and assigns it no new tasks (tasks already running still finish normally) until the script reports it healthy again.
Note that this mechanism is only available in Hadoop versions above 0.20.2.
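A health monitoring script can be very small. The Python sketch below flags a node whose 1-minute load average is too high. It assumes the convention that a node is treated as unhealthy when the script prints a line beginning with ERROR; check this against the documentation for your Hadoop version. The function name and the limit of 2.0 load per CPU are hypothetical choices for the example.

```python
import os


def health_line(load_1min, cpu_count, max_load_per_cpu=2.0):
    """Return the line a health script would print for the given load.

    max_load_per_cpu is an example threshold (assumption), not a Hadoop default.
    """
    limit = cpu_count * max_load_per_cpu
    if load_1min > limit:
        return f"ERROR load average {load_1min:.2f} exceeds {limit:.2f}"
    return "OK"


if __name__ == "__main__":
    # os.getloadavg() returns the 1, 5 and 15 minute load averages on Unix.
    load_1min = os.getloadavg()[0]
    print(health_line(load_1min, os.cpu_count() or 1))
```

Keeping the decision in a pure function makes the threshold easy to test before the script is deployed to every TaskTracker.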
2. Adjust the heartbeat configuration
A. Adjust the heartbeat interval
The heartbeat interval between TaskTracker and JobTracker should be moderate.
If it is too small, JobTracker has to handle a high rate of concurrent heartbeats, which puts it under heavy pressure.
If it is too large, a TaskTracker cannot report its idle resources to JobTracker in time (and so receive new tasks), leaving resources idle and reducing system throughput.
For small and medium-sized Hadoop clusters (fewer than 300 nodes), shortening the heartbeat interval between TaskTracker and JobTracker can significantly improve system throughput.
In Hadoop 1.0 and below, when the cluster has fewer than 300 nodes the heartbeat interval is always 3 seconds and cannot be modified. This means that with 10 nodes JobTracker processes only about 3.3 heartbeats per second on average, and with 100 nodes only about 33 heartbeats per second.
For an ordinary server this load is too low to make full use of its resources. To sum up, for small and medium-sized Hadoop clusters a heartbeat interval of 3 seconds is too large, and the administrator can reduce it as needed where the Hadoop version allows it.
Specific configuration: mapreduce.jobtracker.heartbeat.interval.min, mapreduce.heartbeats.in.second and mapreduce.jobtracker.heartbeats.scaling.factor.
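The arithmetic behind these figures can be sketched in Python. The rule below, a scaled interval with a lower bound, is an assumption about how the three parameters combine. The defaults used (100 heartbeats per second, scaling factor 1.0, minimum 3000 ms) are chosen because they reproduce the "3 seconds below 300 nodes" behaviour described above; they are not taken from this article.

```python
import math


def heartbeat_interval_ms(cluster_size, heartbeats_in_second=100,
                          scaling_factor=1.0, interval_min_ms=3000):
    """Heartbeat interval in milliseconds for a cluster of the given size.

    Assumed rule: scale with cluster size, but never go below the minimum.
    """
    scaled = int(1000 * scaling_factor
                 * math.ceil(cluster_size / heartbeats_in_second))
    return max(scaled, interval_min_ms)


def heartbeats_per_second(cluster_size, interval_ms):
    """Average number of heartbeats the JobTracker receives per second."""
    return cluster_size * 1000.0 / interval_ms


if __name__ == "__main__":
    for nodes in (10, 100, 300, 600):
        interval = heartbeat_interval_ms(nodes)
        rate = heartbeats_per_second(nodes, interval)
        print(f"{nodes} nodes: {interval} ms, {rate:.1f} heartbeats/s")
```

Lowering interval_min_ms is what shortens the interval on a small cluster; the scaled term only dominates once the cluster is large.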
B. Enable out-of-band heartbeat
Normally, each TaskTracker sends a heartbeat to JobTracker at a fixed interval. The heartbeat carries information such as node resource usage and the running status of each task. The heartbeat mechanism is a typical pull-based model: through it, TaskTracker periodically reports to JobTracker and obtains newly assigned tasks. This model introduces a large delay into task allocation, because when a TaskTracker has free resources it can only tell JobTracker at its next heartbeat.
An out-of-band heartbeat differs from a regular heartbeat in that it is triggered when a task finishes or fails. It notifies JobTracker as soon as resources become idle, so that JobTracker can quickly assign new tasks to them.
Specific configuration: mapreduce.tasktracker.outofband.heartbeat=true
3. Disk block configuration
The intermediate results of a Map Task are written to local disk. For I/O-intensive tasks this data puts considerable pressure on the local disk. Administrators can ease the write pressure by configuring multiple disks: when several disks are available, Hadoop writes the intermediate results of different Map Tasks to them in round-robin fashion, thus sharing the load.
4. Set a reasonable number of RPC Handler and HTTP threads
A. Configure the number of RPC Handler
JobTracker needs to process RPC requests from various TaskTracker concurrently, and administrators can adjust the number of RPC Handler according to the size of the cluster and the concurrent processing of the server to optimize JobTracker service capacity.
Specific configuration:
mapred.job.tracker.handler.count
mapreduce.jobtracker.handler.count
B. Configure the number of HTTP threads
In the Shuffle phase, Reduce Tasks read Map Task intermediate results from each TaskTracker through HTTP requests, and each TaskTracker serves these requests through its Jetty Server. The administrator can adjust the number of Jetty Server worker threads to improve its concurrent processing capacity.
Specific configuration:
tasktracker.http.threads
mapreduce.tasktracker.http.threads
5. Be cautious in using the blacklist mechanism
When a job finishes running, it counts the number of tasks that failed on each TaskTracker.
If a TaskTracker fails more than a certain number of tasks, the job adds it to its own blacklist.
If a TaskTracker is blacklisted by a certain number of jobs, JobTracker adds the TaskTracker to the system blacklist, and JobTracker does not assign any new tasks to it until there are no failed tasks for a certain period of time.
When the size of the Hadoop cluster is small, if a certain number of nodes are frequently added to the system blacklist, the cluster throughput and computing power will be greatly reduced, so it is recommended to turn off this feature.
6. Enable batch task scheduling
In Hadoop, the scheduler is one of the core components; it is responsible for allocating idle resources in the system to tasks. Hadoop currently provides several schedulers, including the default FIFO scheduler, the Fair Scheduler and the Capacity Scheduler. The scheduler's efficiency directly determines system throughput. In general, Hadoop schedulers support batch task scheduling so that tasks are assigned to as many free resources as possible: all free resources are assigned tasks in one pass, rather than one task per pass.
7. Choose the appropriate compression algorithm
Hadoop is usually used for I/O-intensive applications. In such applications, Map Tasks output a large amount of intermediate data, and how that data is read and written is transparent to users. Compressing the intermediate data before storing it can significantly improve system performance.
When choosing a compression algorithm, two factors need to be considered: compression ratio and compression efficiency.
Some compression algorithms achieve a good compression ratio but compress and decompress slowly; others compress and decompress quickly but achieve a poor compression ratio. A good compression algorithm therefore needs to strike a balance between compression ratio and compression efficiency.
Specific configuration:
mapred.compress.map.output indicates whether to compress the Map Task intermediate output.
mapred.map.output.compression.codec indicates the codec (compressor/decompressor) used.
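The trade-off between compression ratio and compression efficiency can be measured directly. The Python sketch below times three codecs from the standard library on the same input. These codecs are stand-ins for illustration only and are not the codec classes Hadoop ships; the sample data in the main block is synthetic.

```python
import bz2
import time
import zlib


def measure(name, compress, data):
    """Compress data once and report the ratio achieved and the time taken."""
    start = time.perf_counter()
    packed = compress(data)
    elapsed = time.perf_counter() - start
    return {"codec": name, "ratio": len(data) / len(packed), "seconds": elapsed}


def compare(data):
    """Run each stand-in codec over the same data."""
    codecs = [
        ("zlib level 1", lambda d: zlib.compress(d, 1)),
        ("zlib level 9", lambda d: zlib.compress(d, 9)),
        ("bz2 level 9", lambda d: bz2.compress(d, 9)),
    ]
    return [measure(name, fn, data) for name, fn in codecs]


if __name__ == "__main__":
    # Synthetic key/value records standing in for Map Task output.
    sample = b"".join(b"user%05d\tclick\t%d\n" % (i % 500, i)
                      for i in range(200000))
    for row in compare(sample):
        print(f"{row['codec']}: ratio {row['ratio']:.1f}, "
              f"{row['seconds']:.3f} s")
```

Running a comparison like this on a sample of real intermediate data is a simple way to see where each candidate sits between the two extremes described above.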
8. Enable the pre-read mechanism
As mentioned earlier, the pre-read mechanism can effectively improve disk I/O read performance. Because Hadoop is a typical sequential-read system, pre-reading can significantly improve HDFS read performance and MapReduce job efficiency. Administrators can enable pre-reading for MapReduce data copy and IFile reading.
Specific configuration:
Hadoop version           Configuration parameter                      Meaning                                        Default value
Apache, and below CDH3   (mechanism not yet introduced)               -                                              -
Above CDH3               mapred.tasktracker.shuffle.fadvise           Whether to enable the Shuffle pre-read mechanism   true
Above CDH3               mapred.tasktracker.shuffle.readahead.bytes   Shuffle pre-read buffer size                   4MB
Above CDH3               mapred.ifile.readahead                       Whether to enable the IFile pre-read mechanism     true
Above CDH3               mapred.ifile.readahead.bytes                 IFile pre-read buffer size                     4MB
3 Tuning from the user's point of view
3.1 Application programming specification
From the user's point of view, in addition to the job configuration parameters, the way the application is written also has a great impact on performance. When writing an application, keep in mind the following rules, which help improve job performance.
1. Set Combiner
For a large number of MapReduce applications, it is very helpful to improve job performance if you can set up a Combiner. Combiner reduces the intermediate output of Map Task, thereby reducing the amount of remote copy data for each Reduce Task, resulting in a reduction in Map Task and Reduce Task execution time.
2. Choose a reasonable type of Writable
In the MR model, the input and output data types of Map Task and Reduce Task are of type Writable. Hadoop itself already provides many Writable implementations, including base types and object types. Choosing the appropriate Writable type for the data type processed by the application can greatly improve performance.
For example, when dealing with integer data, using IntWritable directly is more efficient than reading it with a Text type and then converting it to an integer.
If most of the output integers fit in one or two bytes, you can use VIntWritable or VLongWritable directly. They use variable-length integer encoding, which can greatly reduce the amount of output data.
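To see why variable-length encoding helps mainly with small values, the Python sketch below estimates the encoded size of an integer. The size rule is modelled on the variable-length format used by Hadoop's WritableUtils as best understood here; treat it as an assumption and verify it against your Hadoop version. The function names are hypothetical.

```python
def vint_size(value):
    """Estimated number of bytes a variable-length integer occupies.

    Assumed rule: values from -112 to 127 take one byte; other values take
    one marker byte plus as many bytes as the magnitude needs.
    """
    if -112 <= value <= 127:
        return 1
    if value < 0:
        value = ~value  # one's complement, so the magnitude is non-negative
    data_bits = value.bit_length()
    return (data_bits + 7) // 8 + 1


def variable_total(values):
    """Total bytes when every value is written with variable-length encoding."""
    return sum(vint_size(v) for v in values)


def fixed_total(values, width=4):
    """Total bytes when every value is written as a fixed-width integer."""
    return width * len(values)


if __name__ == "__main__":
    counts = [3, 17, 120, 500, 70000]
    print("variable:", variable_total(counts), "bytes")
    print("fixed:   ", fixed_total(counts), "bytes")
```

Under this rule a value close to the 32-bit maximum takes five bytes, one more than a fixed-width integer, so the variable-length types pay off only when small values dominate.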
3.2 Tuning of job-level parameters
1. Plan a reasonable number of tasks
The number of tasks in a job has an important impact on its running time. If a job has too many tasks (so that each task processes very little data and runs only briefly), task startup time takes up a much larger share of the total. If a job has too few tasks (so that each task processes a lot of data and runs for a long time), tasks may spill too much data to disk, which hurts their performance, and the cost of recomputation after a task failure is too high.
In Hadoop, each Map Task processes one Input Split. The division of Input Split is determined by the user-defined InputFormat and, by default, by the following three configuration parameters:
mapred.min.split.size: minimum size of an Input Split (configured in mapred-site.xml)
mapred.max.split.size: maximum size of an Input Split (configured in mapred-site.xml)
dfs.block.size: size of one block in HDFS
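How these three parameters interact can be sketched in Python. The rule used here, max(min size, min(max size, block size)), is the commonly cited split-size computation for file-based input formats and is stated as an assumption; the task estimate ignores the small slack Hadoop allows on the last split and assumes a single splittable file.

```python
import math

MB = 1024 * 1024


def compute_split_size(min_size, max_size, block_size):
    """Assumed split-size rule: clamp the block size between min and max."""
    return max(min_size, min(max_size, block_size))


def estimate_map_tasks(file_size, split_size):
    """Rough number of Map Tasks for one splittable file."""
    return math.ceil(file_size / split_size)


if __name__ == "__main__":
    file_size = 1024 * MB
    block = 64 * MB
    for label, lo, hi in (("no limits", 1, 2**63 - 1),
                          ("min 128MB", 128 * MB, 2**63 - 1),
                          ("max 32MB", 1, 32 * MB)):
        split = compute_split_size(lo, hi, block)
        print(f"{label}: split {split // MB}MB, "
              f"{estimate_map_tasks(file_size, split)} map tasks")
```

Raising the minimum above the block size yields fewer, larger Map Tasks; lowering the maximum below it yields more, smaller ones.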
For Reduce Task, the number of Reduce Task per job is usually determined by the user. Users can set the number of Reduce Task according to the estimated amount of Map Task output data to prevent the excessive amount of data in each Reduce Task from causing a large number of write disk operations.
2. Increase the number of replicas of input files
If a job runs a large number of tasks in parallel, the input files those tasks share may become a bottleneck. To keep parallel reads of the same file by many tasks from becoming a bottleneck, users can increase the number of replicas of the input file as needed.
Users can set the dfs.replication option in the client configuration file hdfs-site.xml to change the number of file replicas.
3. Enable speculative execution mechanism
Speculative execution is Hadoop's optimization for slow tasks. When some tasks of a job run significantly slower than the job's other tasks, Hadoop starts a backup task for each slow task on another node so that both process the same data at the same time. Hadoop takes the result of whichever finishes first as the final result and kills the other.
4. Set the tolerance for failure
Hadoop allows you to set job- and task-level tolerance for failure.
Job-level failure tolerance means that Hadoop allows a certain percentage of tasks to fail in each job, and the input data corresponding to these tasks will be ignored.
Task-level failure tolerance means that Hadoop allows a task to try to run again on another node after it fails. If a task fails after several attempts, then Hadoop will finally assume that the task failed.
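The two levels of tolerance can be modelled in a few lines of Python. This is a simplified sketch and not Hadoop's implementation: the percentage check and the retry loop are assumptions about how the rules described above behave, and the default of 4 attempts is an assumed value. In Hadoop 1.x the corresponding settings are understood to be mapred.max.map.failures.percent and mapred.map.max.attempts, with reduce-side equivalents.

```python
def job_fails(total_tasks, failed_tasks, max_failures_percent):
    """Job-level rule: fail once more than the allowed percentage has failed."""
    return failed_tasks * 100 > max_failures_percent * total_tasks


def run_with_retries(attempt, max_attempts=4):
    """Task-level rule: retry until one attempt succeeds or attempts run out.

    attempt is a callable taking the attempt number and returning True on
    success. Returns the successful attempt number, or None if all failed.
    """
    for number in range(1, max_attempts + 1):
        if attempt(number):
            return number
    return None


if __name__ == "__main__":
    print(job_fails(total_tasks=100, failed_tasks=5, max_failures_percent=5))
    print(job_fails(total_tasks=100, failed_tasks=6, max_failures_percent=5))
    print(run_with_retries(lambda n: n == 3))
```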
5. Enable JVM reuse where appropriate
To isolate tasks from one another, Hadoop runs each task in a separate JVM. For tasks with short execution times, JVM startup and shutdown account for a large share of the running time. Users can therefore enable JVM reuse so that one JVM runs several tasks of the same type in succession.
6. Set the task timeout
In some cases a task may block for some reason, which slows down the whole job and may even prevent it from ever finishing. To handle this, Hadoop provides a task timeout mechanism: if a task does not report progress within a certain interval, TaskTracker kills it and the task is restarted on another node.
7. Rational use of DistributedCache
When an application needs external files (such as a data dictionary or configuration file), DistributedCache is usually used to distribute them to each node. There are generally two ways to supply such files. In the first, the files are kept on the client together with the application jar; when the job is submitted, the client uploads them to a directory in HDFS, and DistributedCache then distributes them to each node. In the second, the files are placed on HDFS in advance. The second method is the more efficient: it saves the time the client would spend uploading the files, and it implicitly tells DistributedCache to download them into the public shared directory on each node, so that subsequent jobs can reuse the downloaded files instead of downloading them again. In other words, the files are downloaded once and reused thereafter.
8. Reasonably control the starting time of Reduce Task.
In the MR computing model, because Reduce Task depends on the execution result of Map Task, logically, Reduce Task should start later than Map Task.
In Hadoop, reasonable control of Reduce Task startup time can both speed up jobs and improve the utilization of system resources. If Reduce Tasks start too early, they occupy Reduce slots for a long time while waiting for Map output (the "slot hoarding" phenomenon), which reduces resource utilization; if they start too late, they obtain resources late and the job's running time increases.
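One common way to control this is a threshold on the fraction of Map Tasks that must finish before any Reduce Task is scheduled. The Python sketch below models such a threshold. It assumes the behaviour of the Hadoop 1.x setting mapred.reduce.slowstart.completed.maps, which this article does not name, and the rounding rule and the default of 0.05 are likewise assumptions.

```python
import math


def maps_needed_before_reduces(num_maps, slowstart=0.05):
    """Number of Map Tasks that must finish before reduces may be scheduled."""
    return math.ceil(slowstart * num_maps)


def can_schedule_reduces(finished_maps, num_maps, slowstart=0.05):
    """True once enough Map Tasks have finished for reduces to start."""
    return finished_maps >= maps_needed_before_reduces(num_maps, slowstart)


if __name__ == "__main__":
    for fraction in (0.25, 0.5, 1.0):
        needed = maps_needed_before_reduces(200, fraction)
        print(f"threshold {fraction}: reduces may start after {needed} maps")
```

A threshold near 1.0 avoids slot hoarding at the cost of a later shuffle start; a low threshold does the opposite.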
9. Skip bad records
Hadoop is used to deal with large amounts of data. For most data-intensive applications, discarding one or more pieces of data has little impact on the final result. Because of this, Hadoop provides users with the ability to skip bad records. When one or more pieces of data cause a task to fail, Hadoop can automatically identify and skip these bad records.
10. Increase the priority of jobs
All Hadoop job schedulers take job priority into account when scheduling tasks. The higher a job's priority, the more resources (slots) it can obtain. Note that in a production environment administrators have usually graded jobs by importance and assigned priorities accordingly, and users cannot adjust them without authorization.
Hadoop provides five job priorities, namely VERY_HIGH, HIGH, NORMAL, LOW, and VERY_LOW.
3.3 Tuning of task-level parameters
1. Map Task tuning
The output of Map Task is temporarily stored in a ring buffer, the size of which is specified by the parameter "io.sort.mb" and the default is 100MB. The buffer consists of two main parts: the index and the actual data.
By default, the index occupies io.sort.record.percent (5%) of the whole buffer, and the remaining space stores the data. A flush, which produces one temporary file, is triggered if and only if either of the following conditions is met:
A. Index space utilization reaches io.sort.spill.percent (0.8 by default)
B. Data space utilization reaches io.sort.spill.percent (0.8 by default)
A reasonable io.sort.record.percent value reduces the number of intermediate files and improves task efficiency. For example, if your key/value pairs are very small, you can increase io.sort.record.percent so that the index space does not reach its limit first and trigger a flush. Since each record occupies 16 bytes of index space, the recommended setting is io.sort.record.percent = 16 / (16 + R), where R is the average length of a record in bytes.
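The recommendation can be worked through numerically. The Python sketch below applies the 16 / (16 + R) rule and estimates how many records fit in the buffer before a flush, treating the buffer as a simple two-part budget of 16 index bytes plus R data bytes per record. That model and the function names are simplifying assumptions made for illustration.

```python
INDEX_BYTES_PER_RECORD = 16


def recommended_record_percent(avg_record_bytes):
    """Share of the buffer to give the index: 16 / (16 + R)."""
    return INDEX_BYTES_PER_RECORD / (INDEX_BYTES_PER_RECORD + avg_record_bytes)


def records_before_spill(sort_mb, record_percent, spill_percent,
                         avg_record_bytes):
    """Approximate record count at which the first flush is triggered.

    Whichever of the index space or the data space fills to spill_percent
    first triggers the flush.
    """
    buffer_bytes = sort_mb * 1024 * 1024
    index_capacity = buffer_bytes * record_percent / INDEX_BYTES_PER_RECORD
    data_capacity = buffer_bytes * (1 - record_percent) / avg_record_bytes
    return int(spill_percent * min(index_capacity, data_capacity))


if __name__ == "__main__":
    r = 16  # small records: 16 bytes of key/value data on average
    default = records_before_spill(100, 0.05, 0.8, r)
    tuned_percent = recommended_record_percent(r)
    tuned = records_before_spill(100, tuned_percent, 0.8, r)
    print(f"default 0.05: about {default} records per flush")
    print(f"tuned {tuned_percent:.2f}: about {tuned} records per flush")
```

With 16-byte records the default setting lets the index fill long before the data space does, so the tuned value allows roughly ten times as many records per flush in this model.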
2. Reduce Task tuning
Reduce Task starts multiple copy threads to read the corresponding intermediate results from each Map Task, and the specific number of threads is specified by "mapred.reduce.parallel.copies" (the default is 5).
For each file to be copied, if the file size is less than a threshold A, it is kept in memory; otherwise it is stored on disk as a file.
If the in-memory files meet condition D, their data is written to disk, and when the number of files on disk reaches io.sort.factor (10 by default), a merge is performed.
The threshold A is: heapsize * {mapred.job.shuffle.input.buffer.percent} * 0.25
where heapsize is specified by the parameter "mapred.child.java.opts" (200MB by default) and mapred.job.shuffle.input.buffer.percent defaults to 0.7.
Condition D is either of the following two conditions:
Memory utilization reaches mapred.job.shuffle.merge.percent (0.66 by default)
The number of files in memory exceeds mapred.inmem.merge.threshold (1000 by default)
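These thresholds are easier to reason about with concrete numbers. The Python sketch below computes threshold A and the in-memory merge trigger for a given heap size. The defaults used (0.70 for the input buffer share, 0.66 for the merge share, 1000 files) are assumed Hadoop 1.x defaults and should be checked against your version's mapred-default.xml; the function names are hypothetical.

```python
MB = 1024 * 1024


def in_memory_file_limit(heap_bytes, input_buffer_percent=0.70):
    """Threshold A: the largest map output that is kept in memory."""
    return heap_bytes * input_buffer_percent * 0.25


def merge_trigger_bytes(heap_bytes, input_buffer_percent=0.70,
                        merge_percent=0.66):
    """Bytes held in memory at which the in-memory merge starts."""
    return heap_bytes * input_buffer_percent * merge_percent


def should_merge(bytes_in_memory, files_in_memory, heap_bytes,
                 inmem_threshold=1000):
    """Condition D: either the memory share or the file count is exceeded."""
    return (bytes_in_memory >= merge_trigger_bytes(heap_bytes)
            or files_in_memory > inmem_threshold)


if __name__ == "__main__":
    heap = 200 * MB
    print(f"threshold A: {in_memory_file_limit(heap) / MB:.1f} MB")
    print(f"merge starts at: {merge_trigger_bytes(heap) / MB:.1f} MB")
```

With a 200MB heap this gives roughly 35MB for threshold A and a merge trigger of about 92MB, which shows how directly the child JVM heap size bounds the shuffle's in-memory capacity.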