1. What is MapReduce?
MapReduce is a programming model: a programming approach and an abstraction for distributed computation.
Distributed Hadoop consists of two core parts: the distributed file system HDFS and the distributed computing framework MapReduce. Both are indispensable. MapReduce makes it easy to write distributed programs on the Hadoop platform: an MR job consists of two phases, map and reduce, and users only need to implement the map() and reduce() functions to build a distributed computation.
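As a minimal sketch of what "implement map() and reduce()" means, here is a WordCount-style mapper and reducer written against the standard org.apache.hadoop.mapreduce API; the class and field names are illustrative, and this is the same WordCount example used throughout the rest of the article.

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

    // map(): emit (word, 1) for every word in the input line.
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // reduce(): sum all the 1s collected for the same word.
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}
```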
2. The working principle and implementation of MapReduce: the Shuffle process
"Shuffle" literally means to shuffle or mix up, as in Java's Collections.shuffle(List) method, which randomly reorders the elements of the list passed to it. The Shuffle in MapReduce is something quite different.
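For contrast, here is the Collections.shuffle mentioned above in a tiny runnable form; it randomizes order, whereas the MapReduce Shuffle moves and groups data deterministically by key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ShuffleWordDemo {
    public static void main(String[] args) {
        List<String> words = new ArrayList<>(Arrays.asList("aaa", "bbb", "ccc"));
        Collections.shuffle(words); // random reordering, unlike the key-based MapReduce Shuffle
        System.out.println(words);
    }
}
```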
The Shuffle process can be roughly understood as the question of how to efficiently transfer the output of the map tasks to the input of the reduce tasks; in other words, Shuffle describes the path data takes from map task output to reduce task input.
(Figure: the overall Shuffle process in MapReduce.)
The figure above shows the whole Shuffle process. In a Hadoop cluster, map tasks and reduce tasks mostly run on different nodes, so in many cases a reduce task must read map task output across the network from other nodes and store it locally. When many jobs run in the cluster, the normal execution of tasks therefore consumes a lot of network bandwidth inside the cluster. This consumption cannot be eliminated; what we can do is minimize the unnecessary part. Within a node, disk I/O has a far larger impact on job completion time than memory access. Spark improves on Hadoop on exactly this point by keeping intermediate map and reduce data in memory, which for some workloads is claimed to be up to 100 times faster than Hadoop MapReduce. At the most basic level, we would like the Shuffle process to:
Transfer data completely from the map task side to the reduce side.
When reading data across nodes, consume as little unnecessary bandwidth as possible.
Reduce the impact of disk I/O on task execution.
Shuffle actually spans two stages, one on the map side and one on the reduce side; the map-side part is called the first half and the reduce-side part the second half.
The first half of the Shuffle process mainly includes:
1. The Split process
2. The Partition process: each map output record is mapped to a reduce task according to its key; this mapping can be customized.
3. The Spill (overflow write) process
4. The Merge process
Each map task has a memory buffer that stores the map output. When the buffer is nearly full, its contents are written to disk as a temporary file. When the whole map task ends, all the temporary files it produced on disk are merged into the final output file, which then waits for the reduce tasks to read it. The map-side Shuffle process can be divided into four steps (illustrated with the WordCount example):
1. Split process: when a map task executes, its input data comes from HDFS blocks; in MapReduce terms, however, a map task reads a split. The correspondence between splits and blocks may be many-to-one, and is one-to-one by default. In the WordCount example, assume the map input contains a string such as "aaa".
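As a small illustration (not Hadoop source code), the split size is commonly described as max(minSize, min(maxSize, blockSize)), which with default settings yields exactly one split per block; the parameter names in the comments are the usual Hadoop 2.x keys and should be treated as assumptions to verify against your version.

```java
public class SplitSizeDemo {
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        // The commonly documented rule used by FileInputFormat-style inputs.
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long minSize = 1L;                   // mapreduce.input.fileinputformat.split.minsize
        long maxSize = Long.MAX_VALUE;       // mapreduce.input.fileinputformat.split.maxsize
        long blockSize = 128L * 1024 * 1024; // HDFS block size, e.g. 128 MB

        // With the defaults above, splitSize == blockSize: one split per block,
        // the one-to-one case described in the text.
        System.out.println(computeSplitSize(blockSize, minSize, maxSize));
    }
}
```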
2. Partition process: after the mapper runs, its output is a key/value pair such as key "aaa" with value 1. Because the map side only adds 1 for each occurrence, the results are merged later in the reduce task. Suppose this job has three reduce tasks: which of them should handle the current "aaa"? That is decided by the partition step, as follows.
MapReduce provides the Partitioner interface. Its job is to decide, from the key (or value) and the number of reduce tasks, which reduce task should handle the current output pair. The default is to hash the key and take the result modulo the number of reduce tasks; this default only spreads load evenly across the reducers. If you need different behaviour, you can implement the Partitioner interface yourself and set it on the job (a hedged sketch follows at the end of this step).
In our example, the Partitioner returns 0 for "aaa", meaning this pair should be handled by the first reducer. Next, the data is written to the memory buffer, which collects map results in batches to reduce the impact of disk I/O. Both the key/value pair and the partition result are written to the buffer; the key and value are serialized into byte arrays before being written.
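Here is the hedged sketch promised above: a custom Partitioner that simply reproduces the default hash-and-modulo behaviour. The Partitioner class and job.setPartitionerClass are standard Hadoop API; the class itself and its wiring are illustrative.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Same idea as the default HashPartitioner: hash the key, mask off the
        // sign bit, then take it modulo the number of reduce tasks. "aaa" in
        // the example lands in some partition in [0, numReduceTasks).
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
// Wired into the job (assumed org.apache.hadoop.mapreduce.Job variable `job`):
// job.setPartitionerClass(WordPartitioner.class);
// job.setNumReduceTasks(3);
```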
3. Spill process: the memory buffer is limited in size, 100 MB by default; it can be changed with the mapreduce.task.io.sort.mb parameter in the configuration. When a map task produces a lot of output, memory could overflow, so under certain conditions the buffered data must be temporarily written to disk and the buffer reused. This process of writing data from memory to disk is called Spill, which translates as "overflow write", and the literal meaning is quite intuitive. Spilling is done by a separate thread and does not block the thread that writes map results into the buffer: the buffer has a spill ratio, spill.percent, which defaults to 0.8. When the buffered data reaches the threshold (buffer size * spill percent = 100 MB * 0.8 = 80 MB), the spill thread starts, locks those 80 MB, and writes them out, while the map task keeps writing its output into the remaining 20 MB; the two do not interfere with each other.
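As a concrete illustration of these knobs, here is a minimal configuration sketch. mapreduce.task.io.sort.mb is named in the text; mapreduce.map.sort.spill.percent is, to the best of my knowledge, the usual Hadoop 2.x key behind the 0.8 spill ratio, so verify both names against your version's mapred-default.xml.

```java
import org.apache.hadoop.conf.Configuration;

public class SpillTuning {
    public static Configuration tunedConf() {
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.task.io.sort.mb", 100);            // map output buffer size in MB (default 100)
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f); // spill threshold ratio (default 0.8)
        return conf;
    }
}
```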
When the spill thread starts, it sorts the keys within this 80 MB of data. Sorting is the default behaviour of the MapReduce model, and the sort here operates on the serialized bytes of the keys.
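To make "sorting serialized bytes" concrete, here is a hedged sketch of a raw comparator in the style of Hadoop's built-in Text comparator: it compares key bytes without deserializing them. WritableComparator and Job.setSortComparatorClass are standard Hadoop API; the class itself is illustrative.

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

public class RawTextComparator extends WritableComparator {

    public RawTextComparator() {
        super(Text.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Text serializes as a vint length prefix followed by UTF-8 bytes:
        // skip the prefix and compare the remaining serialized bytes directly,
        // never materializing Text objects.
        int n1 = WritableUtils.decodeVIntSize(b1[s1]);
        int n2 = WritableUtils.decodeVIntSize(b2[s2]);
        return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
    }
}
// Registered on the job (assumed variable `job`):
// job.setSortComparatorClass(RawTextComparator.class);
```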
Here it is worth pausing to think: the map task's output must be sent to different reduce sides, yet the memory buffer does not group together the data headed for the same reduce side, so this grouping must appear in the disk file. The official diagram also shows that a spill file written to disk is organized by target reduce side. An important detail of the spill process is therefore that if many key/value pairs are headed for the same reduce task, they are written out contiguously per partition, which reduces the number of index records associated with each partition.
When writing out the data for each reduce side, some records may look like "aaa"/1, "aaa"/1. For WordCount we simply count word occurrences, so if a key such as "aaa" appears many times within the same map task, its values should be merged together. This process is called combine (sometimes loosely called reduce). In MapReduce terminology, however, reduce refers only to the process in which the reduce side fetches data from multiple map tasks and computes on it; merging data anywhere else can informally only be called combine. In fact, MapReduce equates the Combiner with a Reducer: the combiner class you set is itself a Reducer.
If the client has set a Combiner, now is when it is used: key/value pairs with the same key are added up, reducing the amount of data spilled to disk. The Combiner optimizes the intermediate results of MapReduce and is applied multiple times throughout the model. So in which scenarios can a Combiner be used? From this analysis, the Combiner's output becomes the Reducer's input, and the Combiner must never change the final result. In my view, a Combiner should only be used when the reduce input key/value types match its output key/value types and applying it cannot affect the final result, for example summation or taking a maximum. Use Combiners carefully: used well they improve job efficiency; used badly they corrupt the final reduce result.
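For the WordCount case, enabling the Combiner is a one-line change on the job. The sketch below reuses the illustrative mapper and reducer classes from the earlier sketch; summation is associative and commutative, exactly the kind of operation the paragraph above allows.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class CombinerSetup {
    public static Job buildJob() throws Exception {
        Job job = Job.getInstance(new Configuration(), "wordcount-with-combiner");
        job.setJarByClass(CombinerSetup.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        // The reducer can safely double as the combiner: partial sums are
        // computed on the map side, shrinking the data spilled to disk and
        // shuffled across the network, without changing the final counts.
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job;
    }
}
```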
4. Merge process: Merge combines multiple spill files into one file. Every spill produces a spill file on disk, so if the map output is large there will be several of them. When the map task finishes, any data remaining in the memory buffer is also spilled to disk, so in the end there is at least one spill file on disk (if the map output is small, only one spill file is produced when the map finishes). Because the final output must be a single file, these spill files need to be merged together; this process is called Merge. What does Merge look like? Continuing the example, the value of "aaa" might be 5 in one spill file and 8 in another. Because the key is the same, the values must be merged into a group. What is a group? For "aaa" it looks like {"aaa", [5, 8, 2, ...]}, where the values in the array were read from different spill files and are then added up. Note that because Merge combines multiple spill files into one, the same key may appear several times; if the client has set a Combiner, it is used here as well to merge identical keys.
At this point, all work on the map side is done, and the resulting file sits in a local directory that the TaskTracker can access. Each reduce task continuously asks the JobTracker over RPC whether map tasks have completed; once a reduce task learns that the map task on some TaskTracker has finished, the second half of the Shuffle process starts.
Simply put, before a reduce task runs, its work is to keep pulling the final output of every map task in the current job, merging the data pulled from different places over and over, and finally forming one file that becomes the reduce task's input.
The Shuffle process on the reduce side can also be summarized in three points. The precondition for the reduce side to copy data is that it has learned from the JobTracker which map tasks have finished. Before the Reducer actually runs, all its time is spent pulling data and merging it, again and again. As before, I describe the reduce-side Shuffle details in stages below:
1. Copy process: simply pulling data. The reduce process starts several data-copy threads (Fetchers), which request the map task output files over HTTP from the TaskTrackers where the map tasks ran. Since the map tasks have already finished, these files are managed by the TaskTracker on its local disk.
2. Merge stage. The merge here is like the map-side merge, except that the grouped values now come from the copies pulled from different map tasks. The copied data is first placed in a memory buffer; the buffer size here is more flexible than on the map side, since it is based on the JVM heap size, and because the Reducer does not run during the Shuffle phase, most of the heap can be given to Shuffle. It should be emphasized that merge comes in three forms: 1) memory to memory, 2) memory to disk, 3) disk to disk. The first form is not enabled by default. When the amount of data in memory reaches a threshold, the memory-to-disk merge starts; like the map side, this is a spill process, and if a Combiner is set it is applied here too, producing many spill files on disk. This second merge form runs until no more map-side data is arriving, and then the third, disk-to-disk merge starts to produce the final file.
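For reference, here is a hedged sketch of the reduce-side shuffle parameters behind this behaviour. The property names are the usual Hadoop 2.x keys and the values are the commonly cited defaults; treat both as assumptions to verify against your version's mapred-default.xml.

```java
import org.apache.hadoop.conf.Configuration;

public class ReduceShuffleTuning {
    public static Configuration tunedConf() {
        Configuration conf = new Configuration();
        // Number of parallel Fetcher (copy) threads pulling map output over HTTP.
        conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 5);
        // Fraction of the reducer JVM heap used to buffer copied map output.
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
        // Usage threshold of that buffer at which the memory-to-disk merge starts.
        conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
        // Fraction of heap allowed to keep map outputs in memory while the
        // reducer runs; 0 means everything is flushed to disk before it starts.
        conf.setFloat("mapreduce.reduce.input.buffer.percent", 0.0f);
        return conf;
    }
}
```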
3. The Reducer's input file. After continuous merging, a "final file" is produced. Why the quotation marks? Because this file may sit on disk or in memory. We would of course prefer it to stay in memory as the Reducer's input, but by default it is on disk. Once the Reducer's input file is decided, the whole Shuffle process is finally over. Then the Reducer runs and writes its results to HDFS.
3. Summary
What do we expect from the Shuffle process?
The expectations of the Shuffle process are:
Pull data completely from the map task side to the reduce side.
When pulling data across nodes, consume as little unnecessary bandwidth as possible.
Reduce the impact of disk I/O on task execution.
Each map task has a memory buffer that stores the map output. What happens to the buffered data when the buffer is nearly full?
When the buffer is nearly full, its contents are written to disk as a temporary (spill) file. When the whole map task ends, all the temporary files it produced on disk are merged into the final output file, which then waits for the reduce tasks to pull it.
MapReduce provides a Partitioner interface. What is its purpose?
The Partitioner decides, from the key (or value) and the number of reduce tasks, which reduce task should handle the current output pair. By default the key is hashed and taken modulo the number of reduce tasks, which only spreads load evenly across the reducers. If you need different behaviour, you can supply a custom Partitioner and set it on the job.
What is spilling (overflow writing)?
Under certain conditions, the data in the buffer is temporarily written to disk and the buffer is then reused. This process of writing data from memory to disk is called Spill, or overflow writing.
Why does spilling not block the thread that writes map results into the buffer?
The buffer has a spill ratio, spill.percent, which defaults to 0.8. When the buffered data reaches the threshold (buffer size * spill percent = 100 MB * 0.8 = 80 MB), the spill thread starts and locks those 80 MB while the map task keeps writing its output into the remaining 20 MB; the two do not interfere with each other.
When the spill thread starts, the keys in this 80 MB of data need to be sorted. Sorting is the default behaviour of the MapReduce model, but what exactly is sorted here?
The sort here operates on the serialized bytes of the keys.
During spilling, how are key/value pairs handled when many of them are destined for the same reduce task?
Key/value pairs headed for the same reduce task are written out contiguously per partition, which reduces the number of index records associated with each partition.
In which scenarios can a Combiner be used?
The Combiner's output becomes the Reducer's input, and the Combiner must never change the final result. So a Combiner should only be used when the reduce input key/value types match its output key/value types and applying it cannot affect the final result, for example summation or taking a maximum. Use Combiners carefully: used well they improve job efficiency; used badly they corrupt the final reduce result.
What is the purpose of Merge?
In the end there is at least one spill file on disk (if the map output is small, only one is produced when the map finishes). Because the final output must be a single file, the spill files need to be merged together; this process is called Merge.
Over what protocol does each reduce task learn from the JobTracker whether map tasks have completed?
Each reduce task continuously obtains this information from the JobTracker via RPC.
What protocol does the Copy process on the reduce side use?
The Copy process simply pulls data: the reduce process starts several copy threads (Fetchers), which request the map task output files over HTTP from the TaskTrackers where the map tasks ran.
How many forms does the merge process on the reduce side take?
Merge comes in three forms: 1) memory to memory, 2) memory to disk, 3) disk to disk. The first form is not enabled by default. When the amount of data in memory reaches a threshold, the memory-to-disk merge starts; like the map side, this is a spill process, and if a Combiner is set it is applied here too, producing many spill files on disk. This second form runs until no more map-side data is arriving, and then the third, disk-to-disk merge starts to produce the final file.