2025-04-10 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
Shuffle normally means to mix things up or randomize their order. You may be more familiar with the Collections.shuffle(List) method in the Java API, which randomly permutes the elements of the list passed to it. If you don't know what Shuffle means in MapReduce, take a look at this picture:
This is the official description of the Shuffle process. But I am fairly sure you cannot understand the Shuffle process from this picture alone, because it differs considerably from what actually happens and its details are muddled. I will describe what Shuffle really does in detail later; for now you only need to know its approximate scope: how to efficiently transfer the output of the map tasks to the reduce side. Put another way, Shuffle describes the path data takes from map task output to reduce task input.
In a clustered environment like Hadoop, map tasks and reduce tasks mostly run on different nodes, so in many cases a reduce task has to pull map task results from other nodes. If many jobs are running in the cluster, the normal execution of tasks consumes a lot of the cluster's internal network resources. This consumption is normal and we cannot restrict it; what we can do is minimize unnecessary consumption. Within a node, disk IO also has a considerable impact on job completion time compared with memory. At the most basic level, then, our expectations of the Shuffle process are: pull data completely from the map task side to the reduce side; when pulling data across nodes, keep unnecessary bandwidth consumption as low as possible; and reduce the impact of disk IO on task execution.
OK, at this point you can stop and think: if you designed the Shuffle process yourself, what would your design goals be? I think the main things that can be optimized are reducing the amount of data pulled and using memory instead of disk wherever possible.
My analysis is based on the source code of Hadoop 0.21.0. If it differs from the Shuffle process you know, please do not hesitate to point it out. I'll take WordCount as the example and assume it has eight map tasks and three reduce tasks. As the figure above shows, the Shuffle process spans both the map side and the reduce side, so I will cover it in two parts below.
First, take a look at the situation on the map side, as shown below:
The figure above shows how a single map task might run. Compare it with the left half of the official diagram and you will find many inconsistencies. The official diagram does not make clear at which stage partition, sort and combiner take effect. I drew this figure hoping to give you a clear understanding of the whole process, from map input to the point where the map-side data is ready.
I divided the whole process into four steps. Put simply, each map task has a memory buffer that stores the map output. When the buffer is almost full, its data is written to disk as a temporary file. When the whole map task ends, all the temporary files this map task produced on disk are merged into the final output file, which then waits for the reduce tasks to pull the data.
Of course, each step here may contain several sub-steps and details. Let me explain them one by one:
When a map task executes, its input data comes from HDFS blocks, although in MapReduce terms a map task only reads a split. The correspondence between splits and blocks may be many-to-one; the default is one-to-one. In the WordCount example, assume the input to map is a string like "aaa".
After the mapper runs, we know its output is a key/value pair: the key is "aaa" and the value is the number 1. The map side only emits 1 for each occurrence; the results are merged later in the reduce task. We already know this job has three reduce tasks, so which reduce task should receive the current "aaa" has to be decided now. MapReduce provides the Partitioner interface, whose job is to decide, based on the key or value and the number of reduce tasks, which reduce task should process the current output pair. By default, the key is hashed and the hash is taken modulo the number of reduce tasks. The default only aims to spread the load evenly across the reducers. If users need different partitioning, they can write their own Partitioner and set it on the job.

In our example, the Partitioner returns 0 for "aaa", which means the pair should be handled by the first reducer. Next, the data is written to the memory buffer, which collects map results in batches to reduce the impact of disk IO. Both the key/value pair and the partition result are written to the buffer. Before writing, of course, the key and the value are each serialized into a byte array.

The whole memory buffer is a byte array. I have not studied its byte indexing or how key/value pairs are laid out in it. If you have studied it, please give a general description.
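The default partitioning rule described above (hash the key, then take it modulo the number of reduce tasks) can be sketched in plain Java. This is only an illustration: the class name `HashPartitionSketch` is hypothetical, and it uses `String.hashCode()`, whereas a real job hashes Hadoop's own key types, so the partition numbers in an actual cluster may differ.

```java
final class HashPartitionSketch {
    /**
     * Hash the key, clear the sign bit so the result is never negative,
     * then take the remainder by the number of reduce tasks.
     */
    static int partition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // With String.hashCode(), "aaa" hashes to 96321, and 96321 % 3 == 0.
        System.out.println(partition("aaa", 3)); // prints 0
    }
}
```

Clearing the sign bit matters because `hashCode()` can be negative, and a negative remainder would not be a valid partition number.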
This memory buffer is limited in size; the default is 100MB. When a map task produces a lot of output, the buffer could fill up, so under certain conditions the buffered data is temporarily written to disk and the buffer is then reused. This process of writing data from memory to disk is called Spill (literally, an overflow write), which is a very intuitive name. The spill is performed by a separate thread and does not affect the thread that writes map results into the buffer. Map output should not be blocked when the spill thread starts, so the buffer has a spill ratio, spill.percent. This ratio defaults to 0.8: when the data in the buffer reaches the threshold (buffer size * spill percent = 100MB * 0.8 = 80MB), the spill thread starts, locks that 80MB of memory and carries out the spill. Meanwhile the map task can keep writing its output into the remaining 20MB, and the two do not interfere with each other.

When the spill thread starts, it needs to sort the keys in this 80MB space. Sorting is the default behavior of the MapReduce model, and the sort here operates on the serialized bytes.

Let's think about this for a moment. The output of a map task has to be sent to different reduce sides, and the memory buffer does not group together the data destined for the same reduce side, so that grouping has to show up in the disk file. The official diagram also shows that in the spill file written to disk, the values destined for each reduce side are placed together. So an important detail of the spill process is that if many key/value pairs are to be sent to the same reduce side, they are concatenated, which reduces the number of partition-related index records.

When the data for each reduce side is grouped, some of it may look like this: "aaa"/1, "aaa"/1. For the WordCount example, we are simply counting word occurrences.
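The spill threshold arithmetic and the ordering of a spill can be sketched as follows. This is a simplified illustration under stated assumptions, not Hadoop's implementation: `SpillSketch` and `Rec` are hypothetical names, records are kept as Java objects rather than serialized bytes, and keys are compared as strings instead of byte-wise.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SpillSketch {
    /** Hypothetical stand-in for one serialized record held in the buffer. */
    static final class Rec {
        final int partition;
        final String key;
        final int value;

        Rec(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    /** Number of buffered bytes at which the spill thread would start. */
    static long spillThresholdBytes(long bufferBytes, double spillPercent) {
        return (long) (bufferBytes * spillPercent);
    }

    /** Orders records by partition, then by key, without modifying the input list. */
    static List<Rec> sortForSpill(List<Rec> buffered) {
        List<Rec> sorted = new ArrayList<>(buffered);
        sorted.sort(Comparator.comparingInt((Rec r) -> r.partition)
                .thenComparing(r -> r.key));
        return sorted;
    }

    public static void main(String[] args) {
        long bufferBytes = 100L * 1024 * 1024; // the 100MB default from the text
        long threshold = spillThresholdBytes(bufferBytes, 0.8);
        System.out.println(threshold / (1024 * 1024) + "MB"); // prints 80MB

        List<Rec> buffered = new ArrayList<>();
        buffered.add(new Rec(1, "bbb", 1));
        buffered.add(new Rec(0, "aaa", 1));
        buffered.add(new Rec(1, "abc", 1));
        buffered.add(new Rec(0, "aaa", 1));
        for (Rec r : sortForSpill(buffered)) {
            System.out.println(r.partition + " " + r.key + " " + r.value);
        }
    }
}
```

Sorting on the partition number first is what makes all records for one reduce side contiguous in the spill file, so a single index entry per partition is enough to locate them.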
If the same map task produces many keys that appear multiple times, like "aaa", we should merge their values. This process is called reduce or combine. In MapReduce terminology, however, reduce refers only to the computation the reduce side performs on data fetched from multiple map tasks. Any other, informal merging of data can only be called combine. In fact, as you know, MapReduce treats a Combiner as a Reducer. If the client has set a Combiner, now is the time it is used: the values of key/value pairs with the same key are added up to reduce the amount of data spilled to disk. The Combiner optimizes the intermediate results of MapReduce, so it is used several times throughout the model.

In which scenarios can a Combiner be used? From the analysis above, the output of the Combiner is the input of the Reducer, and the Combiner must never change the final result. So in my view a Combiner should only be used where the Reducer's input key/value types are exactly the same as its output key/value types and where applying it does not affect the final result, for example summation or maximum. Use the Combiner with care: used well, it makes the job more efficient; used badly, it changes the final reduce result.
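As a rough illustration of what a Combiner does to one spill, the sketch below collapses records that share a key by applying an operation such as sum or maximum. The class `CombinerSketch` and its array-based signature are assumptions made for this example only; a real Combiner is written against the Hadoop Reducer API.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntBinaryOperator;

final class CombinerSketch {
    /**
     * Collapses records with the same key into one record by applying op
     * pairwise. keys[i] and values[i] together form one map output record.
     */
    static Map<String, Integer> combine(String[] keys, int[] values, IntBinaryOperator op) {
        Map<String, Integer> combined = new TreeMap<>(); // TreeMap keeps keys sorted
        for (int i = 0; i < keys.length; i++) {
            combined.merge(keys[i], values[i], (a, b) -> op.applyAsInt(a, b));
        }
        return combined;
    }

    public static void main(String[] args) {
        String[] keys = {"aaa", "aaa", "bbb"};
        // WordCount-style accumulation: two "aaa"/1 records become one "aaa"/2.
        System.out.println(combine(keys, new int[] {1, 1, 1}, Integer::sum)); // {aaa=2, bbb=1}
        // Maximum can also be applied early without changing the final answer.
        System.out.println(combine(keys, new int[] {5, 8, 2}, Math::max)); // {aaa=8, bbb=2}
    }
}
```

Sum and maximum work here because applying them to partial groups and then again to the partial results gives the same answer as applying them once to everything. An operation such as the arithmetic mean does not have this property, which is why it would corrupt the final result if used as a Combiner unchanged.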
Each spill produces one spill file on disk. If the map output is really large, there will be several spills and therefore several spill files. When the map task finally completes, everything left in the memory buffer is also spilled to disk as a spill file. As noted earlier, all of these spill files are then merged into a single final output file, and this step is the Merge.

What does Merge look like? Continuing the earlier example, the value for "aaa" may be 5 in one spill file and 8 in another. Because they share the same key, they have to be merged into a group. What is a group? For "aaa" it looks like this: {"aaa", [5, 8, 2, ...]}. The values in the array are read from the different spill files and are then added up. Note that because Merge combines several spill files into one, the same key may occur more than once, and if the client has set a Combiner, it is also used in this process to merge identical keys.

At this point all the work on the map side is done, and the final file is stored in a local directory that the TaskTracker can reach. Each reduce task keeps asking the JobTracker over RPC whether map tasks have completed. Once a reduce task is notified that a map task on some TaskTracker has finished, the second half of the Shuffle process starts.

Put simply, what a reduce task does before it executes is keep pulling the final output of every map task in the current job, keep merging the data pulled from different places, and finally form one file that serves as the input of the reduce task. See the following figure:

![](https://static.oschina.net/uploads/img/201609/09132813_1biy.jpg)

As with the detailed diagram of the map side, the Shuffle process on the reduce side can be summarized by the three points marked on the diagram. Before a reduce task can copy data, it has to learn from the JobTracker which map tasks have finished. That process is not shown here; interested readers can trace it themselves.
Before the Reducer actually runs, all of its time goes into pulling data and merging, over and over. As before, I describe the Shuffle details on the reduce side in sections below:
The Copy phase simply pulls data. The reduce process starts several data copy threads (Fetcher) and requests the map task's output file over HTTP from the TaskTracker where the map task ran. Because the map task has already finished, these files are managed by the TaskTracker on its local disk.
The Merge phase. The merge here is like the merge on the map side, except that the arrays now hold values copied from different map tasks. Copied data is first placed in a memory buffer. The buffer size here is more flexible than on the map side: it is based on the JVM heap size setting. Since the Reducer does not run during the Shuffle phase, most of the memory should be given to Shuffle. It should be emphasized that merge comes in three forms: 1) memory to memory, 2) memory to disk, 3) disk to disk. The first form is not enabled by default, which is confusing, isn't it? When the amount of data in memory reaches a certain threshold, the memory-to-disk merge starts. As on the map side, this is also a spill; if you have set a Combiner, it is applied here too, and many spill files are then produced on disk. The second form of merge keeps running until there is no more data from the map side, and then the third form, the disk-to-disk merge, starts and produces the final file.
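The grouping that a merge produces, such as {"aaa", [5, 8, 2, ...]}, can be sketched like this. It is a deliberately simplified illustration: `MergeGroupSketch` is a hypothetical name, each segment is modeled as an in-memory map holding at most one value per key (as if a Combiner had already run on it), and a TreeMap stands in for the streaming merge of sorted files that a real implementation would use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class MergeGroupSketch {
    /** Groups the values of each key across all segments, with keys in sorted order. */
    static TreeMap<String, List<Integer>> group(List<Map<String, Integer>> segments) {
        TreeMap<String, List<Integer>> groups = new TreeMap<>();
        for (Map<String, Integer> segment : segments) {
            for (Map.Entry<String, Integer> e : segment.entrySet()) {
                groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            }
        }
        return groups;
    }

    /** WordCount-style reduction of one group: add up all of its values. */
    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Integer> first = new TreeMap<>();
        first.put("aaa", 5);
        first.put("bbb", 1);
        Map<String, Integer> second = new TreeMap<>();
        second.put("aaa", 8);
        Map<String, Integer> third = new TreeMap<>();
        third.put("aaa", 2);

        List<Map<String, Integer>> segments = new ArrayList<>();
        segments.add(first);
        segments.add(second);
        segments.add(third);

        TreeMap<String, List<Integer>> groups = group(segments);
        System.out.println(groups);                 // {aaa=[5, 8, 2], bbb=[1]}
        System.out.println(sum(groups.get("aaa"))); // 15
    }
}
```

The same grouping idea applies whether the segments are spill files on the map side or copied map outputs on the reduce side; only the source of the segments differs.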
The input file for the Reducer. After repeated merging, a "final file" is eventually produced. Why the quotation marks? Because this file may live on disk or in memory. We would of course prefer it to be in memory and fed directly to the Reducer, but by default it is stored on disk. How to keep this file in memory is something I'll cover later under performance optimization. Once the Reducer's input file has been determined, the whole Shuffle is finally over. Then the Reducer executes and writes its results to HDFS.

That is the whole Shuffle process. There are many details, and I have skipped a lot of them; I have only tried to make the key points clear. I may well have misunderstood or poorly expressed some things, so do not hesitate to comment. I hope to keep improving and revising this article so that it is easy to follow and leaves the reader understanding every aspect of Shuffle. As for the specific implementation, explore it yourself if you are interested; if that is inconvenient, leave me a message and I will look into it and report back.