2025-01-19 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains how the secondary sorting process in Hadoop MapReduce works. The content is simple and clear, and easy to learn and understand. Follow along step by step to study the secondary sorting process in Hadoop MapReduce.
1. Data flow in MapReduce
(1) The simplest process: map - reduce
(2) With a custom partitioner that sends the map results to a specified reducer: map - partition - reduce
(3) With an added local reduce (an optimization): map - combine (local reduce) - partition - reduce
2. The concept and use of Partition in MapReduce
(1) The principle and function of Partition
Once map has emitted its records, which reducer should process each of them? By default Hadoop dispatches records based on the hash value of the key, but in practice this is not always efficient, and it does not always match what the task requires. For example, after partitioning, the reducer on one node may be assigned 20 records while another is assigned 100,000, which badly hurts efficiency. Or we may want the output files to follow a certain rule: suppose there are two reducers, and we want the records whose key begins with "h" to end up in part-00000 and all other records in part-00001. The default partitioner cannot do this. So we need a custom partitioner that selects the reducer for each record according to our own requirements. Writing one is simple: define a class that extends the Partitioner class and overrides its getPartition method, then register it by calling Job's setPartitionerClass.
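The "h" rule from the example above can be sketched in plain Java. This only models the decision logic and has no Hadoop dependency. The class name FirstLetterPartitionSketch and the String key type are assumptions made for illustration; in a real job the same body would sit in a class extending Partitioner.

```java
// Plain-Java model of a custom partitioning rule (hypothetical names, no Hadoop dependency).
class FirstLetterPartitionSketch {

    // Mirrors the role of getPartition: returns a reducer index in [0, numPartitions).
    static int getPartition(String key, int numPartitions) {
        if (numPartitions < 2) {
            return 0; // with a single reducer there is nothing to choose
        }
        // Keys starting with "h" go to reducer 0 (part-00000), all others to reducer 1 (part-00001).
        return key.startsWith("h") ? 0 : 1;
    }

    public static void main(String[] args) {
        String[] keys = {"hadoop", "hive", "spark", "hbase", "kafka"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + getPartition(key, 2));
        }
    }
}
```

The guard for fewer than two reducers matters because a partitioner must never return an index outside the number of reducers that are actually running.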
The results of map are distributed to the reducers through the partitioner. The Mapper's results may first be sent to a Combiner for merging. Combiner does not have its own base class in the system; it uses Reducer as its base class. The two look the same from the outside, but they run in different places and in different contexts. The key-value pairs produced by the Mapper have to be sent to a Reducer for merging, and pairs with the same key are sent to the same Reducer. Which key goes to which Reducer is decided by the Partitioner, which has only one method:
getPartition(Text key, Text value, int numPartitions)
The input is a key-value pair produced by map together with the number of reducers, and the output is the index of the assigned reducer (an integer). In other words, it specifies which reducer each key-value pair output by the Mapper goes to. The system default is HashPartitioner, which takes the hash value of the key modulo the number of reducers to get the target reducer. This guarantees that records with the same key are always assigned to the same reducer. If there are N reducers, they are numbered 0, 1, 2, ..., N-1.
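The default rule can be written out in a few lines. The sketch below is a plain-Java model with a hypothetical class name. It uses the same arithmetic as Hadoop's HashPartitioner: the sign bit of the hash code is cleared, then the remainder by the number of reducers is taken.

```java
// Plain-Java model of the default hash partitioning rule (hypothetical class name, no Hadoop dependency).
class HashPartitionSketch {

    // Clear the sign bit so the result is never negative, then take the remainder.
    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        String[] keys = {"Aaa", "Bbb", "Ccc", "Ddd", "Aaa"};
        for (String key : keys) {
            // Equal keys always print the same reducer number.
            System.out.println(key + " -> reducer " + getPartition(key, reducers));
        }
    }
}
```

Masking with Integer.MAX_VALUE is needed because hashCode() can be negative, and a negative remainder would not be a valid reducer index.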
(2) The use of Partition
Given that partitioning is necessary, how can Hadoop produce a globally sorted file? The easiest way is to use a single partition, but this is extremely inefficient for large files, because one machine must process all of the output, which throws away the parallelism that MapReduce provides. Instead, we can first create a series of sorted files, then concatenate them (similar to merge sort), and so obtain one globally sorted file. The main idea is to use the partitioner to lay out the global order of the output. For example, suppose we have 1000 values in the range 1-10000 and run 10 reduce tasks. When partitioning, we can send the values in 1-1000 to the first reduce, the values in 1001-2000 to the second reduce, and so on. That is, every value assigned to the nth reduce is greater than every value assigned to the (n-1)th reduce. Each reduce then produces sorted output, and simply concatenating all the output files with cat yields one large file that is fully sorted.
This is the basic idea, but it raises a problem: how do we choose the interval boundaries when the data set is large and we do not know its distribution? A relatively simple method is sampling. If there are 100 million records, we can take a sample of, say, 10,000 records and derive the partition boundaries from the sample. In Hadoop, we can replace the default partitioner with TotalOrderPartitioner and pass it the result of the sampling to get the partitioning we want. For the sampling itself, we can use the samplers that ship with Hadoop's InputSampler class, such as RandomSampler and IntervalSampler.
In this way, we can sort a large amount of data in a distributed fashion. We can also define our own comparison rules for the keys, for example with a custom key comparator, so that strings or other non-numeric types can be sorted, or even sorted on several fields.
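The sampling idea can be modelled without Hadoop. The following plain-Java sketch, with hypothetical names, derives split points from a sample and assigns each key to a range by binary search. Each range is then sorted independently and the results are concatenated. It is an illustration of the technique under these assumptions, not the TotalOrderPartitioner implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Plain-Java model of range partitioning driven by a sample (hypothetical names, no Hadoop dependency).
class RangePartitionSketch {

    // Pick numPartitions-1 boundaries at evenly spaced positions of the sorted sample.
    static int[] splitPoints(int[] sample, int numPartitions) {
        int[] sorted = sample.clone();
        Arrays.sort(sorted);
        int[] splits = new int[numPartitions - 1];
        for (int i = 1; i < numPartitions; i++) {
            splits[i - 1] = sorted[i * sorted.length / numPartitions];
        }
        return splits;
    }

    // Keys below splits[0] go to partition 0, keys in [splits[0], splits[1]) to partition 1, and so on.
    static int getPartition(int key, int[] splits) {
        int pos = Arrays.binarySearch(splits, key);
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    // Sort each partition on its own, then concatenate the partitions in order (the "cat" step).
    static int[] sortViaPartitions(int[] data, int[] splits) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i <= splits.length; i++) {
            parts.add(new ArrayList<>());
        }
        for (int value : data) {
            parts.get(getPartition(value, splits)).add(value);
        }
        int[] out = new int[data.length];
        int k = 0;
        for (List<Integer> part : parts) {
            Collections.sort(part); // each "reducer" sorts only its own range
            for (int value : part) {
                out[k++] = value;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        int[] data = new int[1000];
        for (int i = 0; i < data.length; i++) {
            data[i] = 1 + random.nextInt(10000); // 1000 values in the range 1-10000
        }
        int[] sample = Arrays.copyOf(data, 100); // crude sample: the first 100 values
        int[] splits = splitPoints(sample, 10);  // boundaries for 10 "reducers"
        int[] result = sortViaPartitions(data, splits);

        int[] expected = data.clone();
        Arrays.sort(expected);
        System.out.println("globally sorted: " + Arrays.equals(result, expected));
    }
}
```

Because the split points come from a sample rather than from fixed intervals, the partitions stay roughly balanced even when the data is skewed, which is the point of sampling first.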
2. The concept and use of grouping in MapReduce
The purpose of partitioning is to decide, based on the key, which Reducer each output record of the Mapper is sent to. Grouping is easier to understand: it is based on the key of the record. Within the same partition, records with the same key belong to the same group.
3. Use of Combiner in MapReduce
Many MapReduce programs are limited by the bandwidth available on the cluster, so it pays to minimize the intermediate data transferred between map and reduce tasks. Hadoop allows the user to declare a combiner function that processes the output of map, and the combiner's output then becomes the input to reduce. Because the combiner function is only an optimization, Hadoop does not guarantee how many times it will be called for a given map output. In other words, no matter how many times the combiner function is called, the reduce output must be the same.
Let's take the example from Hadoop: The Definitive Guide. Assume the weather readings for 1950 were processed by two maps, and the output of the first map is as follows:
(1950, 0)
(1950, 20)
(1950, 10)
The output of the second map is:
(1950, 25)
(1950, 15)
The input of reduce is (1950, [0, 20, 10, 25, 15]), and the output is (1950, 25).
Since 25 is the maximum value in the collection, we can use a combiner function that, like the reduce function, finds the maximum value in each map's output, so that the input to reduce becomes:
(1950, [20,25])
The way each function processes the temperature values can be expressed as follows: max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
Note: not all functions have this property (functions that have it are called commutative and associative). For example, if we want to calculate the average temperature, we cannot use the combiner function this way, because mean(0, 20, 10, 25, 15) = 14 while mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15.
The combiner function does not replace the reduce function, because the reduce function is still required to process records with the same key coming from different maps. But it can reduce the amount of data that has to be transferred between map and reduce, so a combiner function is worth considering.
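The arithmetic above is easy to check in code. This small plain-Java sketch uses hypothetical names and no Hadoop dependency. It applies max and mean once to all values and once to the per-map partial results.

```java
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;

// Checks whether applying a function per map and then again gives the same answer as applying it once.
class CombinerSketch {

    static int max(int... values) {
        return IntStream.of(values).max().getAsInt();
    }

    static double mean(double... values) {
        return DoubleStream.of(values).average().getAsDouble();
    }

    public static void main(String[] args) {
        // max: combining per map first does not change the result.
        int directMax = max(0, 20, 10, 25, 15);
        int combinedMax = max(max(0, 20, 10), max(25, 15));
        System.out.println(directMax + " " + combinedMax); // prints "25 25"

        // mean: combining per map first changes the result, so mean is not a valid combiner.
        double directMean = mean(0, 20, 10, 25, 15);
        double combinedMean = mean(mean(0, 20, 10), mean(25, 15));
        System.out.println(directMean + " " + combinedMean); // prints "14.0 15.0"
    }
}
```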
4. Detailed explanation of Shuffle stage sorting process
Let's first take a look at the overall flow of sorting in MapReduce.
The MapReduce framework ensures that the input to each Reducer is sorted by key. In general, the process of sorting the map output and transferring it to reduce is called the shuffle. Each map has a circular buffer, 100 MB by default, and the map first writes its output to this buffer. When the contents of the buffer reach a threshold (by default 80% of the buffer), a background thread writes the contents to disk, a process called a "spill". During a spill the map can keep writing to the buffer, and if the buffer fills up, the map waits.
The spill proceeds as follows. First, the background thread divides the output into partitions according to the number of reducers, one partition per reducer. Second, within each partition the background thread sorts the output by key. During sorting, if a Combiner function is defined, it is called on the sorted result. Each spill creates one spill file on disk, so a map task may produce several spill files. When the map has written its last output, all spill files are merged and sorted into the final result file, and the Combiner function is still called during this step. Looking at the whole process, the number of calls to the Combiner function is not fixed. Let's focus on the sorting process of the shuffle phase:
The sorting in the shuffle phase can be understood in two parts. The first happens during a spill: because a partition contains many keys, the records in each partition are sorted by key, so that records with the same key are stored together and the partition as a whole is ordered by key.
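The spill-time ordering can be sketched as a sort by partition number first and by key second. The plain-Java model below uses hypothetical names, an in-memory list instead of a buffer and a spill file, and the hash rule as the assumed partitioner. It shows only the ordering, not Hadoop's actual buffer handling.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of how one spill is ordered (hypothetical names, no Hadoop dependency).
class SpillSortSketch {

    static final class Record {
        final int partition;
        final String key;
        final int value;

        Record(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "p" + partition + " " + key + " " + value;
        }
    }

    // Assumed partitioner: the default hash rule.
    static int partitionOf(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Tag every buffered {key, value} pair with its partition, then sort by (partition, key).
    static List<Record> spill(String[][] buffered, int numReducers) {
        List<Record> records = new ArrayList<>();
        for (String[] kv : buffered) {
            records.add(new Record(partitionOf(kv[0], numReducers), kv[0], Integer.parseInt(kv[1])));
        }
        // The sort is stable and ignores the value, so values of one key keep their arrival order.
        records.sort(Comparator.comparingInt((Record r) -> r.partition).thenComparing(r -> r.key));
        return records;
    }

    public static void main(String[] args) {
        String[][] buffered = {{"Ddd", "423"}, {"Aaa", "754"}, {"Bbb", "842"}, {"Aaa", "344"}, {"Ccc", "120"}};
        for (Record record : spill(buffered, 2)) {
            System.out.println(record);
        }
    }
}
```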
The second part is not a sort but a merge, and it happens twice. The first merge is on the map side, where the spill files are merged by partition, and by key within each partition, into one large file. The second merge is on the reduce side, where the outputs of the many maps that feed the same reduce are merged together. This merge is a little harder to follow: it does not necessarily end in one large file, and during the merge the data may reside both in memory and on disk. So the merge in the shuffle phase is not a sort in the strict sense. It only combines several files that are each already ordered into one larger ordered whole. Because map tasks finish in a different order on each run, the merged result is not identical every time, but the partition boundaries are still strictly respected, and within each partition the pairs with the same key sit next to each other.
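Merging runs that are already sorted is a k-way merge, which can be sketched with a priority queue. The plain-Java model below uses hypothetical names, works on keys only, and covers a single partition. It illustrates the idea and is not Hadoop's merge code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model of merging several sorted spill runs (hypothetical names, no Hadoop dependency).
class SpillMergeSketch {

    // Merge individually sorted runs into one sorted list without re-sorting everything.
    static List<String> merge(List<List<String>> sortedRuns) {
        // Each queue entry is {runIndex, positionInRun}, ordered by the key it points at.
        PriorityQueue<int[]> heads = new PriorityQueue<>(
            (a, b) -> sortedRuns.get(a[0]).get(a[1]).compareTo(sortedRuns.get(b[0]).get(b[1])));
        for (int i = 0; i < sortedRuns.size(); i++) {
            if (!sortedRuns.get(i).isEmpty()) {
                heads.add(new int[]{i, 0});
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll(); // smallest key among the current heads of all runs
            List<String> run = sortedRuns.get(head[0]);
            merged.add(run.get(head[1]));
            if (head[1] + 1 < run.size()) {
                heads.add(new int[]{head[0], head[1] + 1}); // advance within the same run
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> runs = Arrays.asList(
            Arrays.asList("Aaa", "Ccc"),
            Arrays.asList("Bbb", "Ddd"),
            Arrays.asList("Aaa", "Bbb"));
        System.out.println(merge(runs));
    }
}
```

Only the head of each run is compared at any moment, which is why a merge is cheaper than sorting the combined data from scratch and why equal keys end up adjacent.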
Summary of shuffle sorting: if only a map function is defined and no reduce function, then after the shuffle has sorted the data, output records with the same key sit next to each other and records with smaller keys come before records with larger keys, so each output is ordered by key. (Across partitions the output is not necessarily ordered from smallest to largest, because with the default HashPartitioner the partition of a key is decided by its hash value, for example when the key is an IntWritable.) The keys within each partition are sorted, but the values belonging to each key are not ordered.
5. The principle and implementation of secondary sorting in MapReduce
(1) Task
We need to process the sample.txt file shown below into the target file shown after it, which keeps the minimum value for each key:
Source file: Sample.txt
Bbb 654
Ccc 534
Ddd 423
Aaa 754
Bbb 842
Ccc 120
Ddd 219
Aaa 344
Bbb 214
Ccc 547
Ddd 654
Aaa 122
Bbb 102
Ccc 479
Ddd 742
Aaa 146
Target: part-r-00000
Aaa 122
Bbb 102
Ccc 120
Ddd 219
(2) Working principle
Process overview:
1. Define a composite key that contains the natural key and the natural value, in this case MyPariWritable.
2. Define a custom key comparator that sorts records by the composite key, that is, by the natural key and the natural value together (for example, Aaa 122 is combined into one key).
3. The partitioner for the composite key (the default HashPartitioner is used in this example) and the grouping comparator consider only the natural key when partitioning and grouping.
Detailed process:
First, in the map phase, the input data set is divided into small blocks (splits) by the InputFormat set with job.setInputFormatClass, and the InputFormat also provides a RecordReader implementation. TextInputFormat is used in this example, and the RecordReader it provides uses the byte offset of each line of text as the key and the text of the line as the value. This is why the input of the custom Map is a pair of that form. The map method of the custom Map is then called once for each input pair. Note that the output must match the output types declared in the custom Map, < MyPariWritable, NullWritable>. The end result is a List of < MyPariWritable, NullWritable> pairs.

At the end of the map phase, the class set with job.setPartitionerClass is first used to partition the List, with each partition mapped to one reducer. Within each partition, the records are sorted with the key comparator class set by job.setSortComparatorClass. As you can see, this is in itself a secondary sort.

In the reduce phase, after the reducer has received all of the map output mapped to it, it again uses the key comparator class set by job.setSortComparatorClass to sort all of the data pairs. It then starts building the value iterator for each key. This is where grouping comes in, using the grouping comparator class set by job.setGroupingComparatorClass. As long as the comparator considers two keys equal, they belong to the same group, their values are placed in one value iterator, and the key of that iterator is the first key among all the keys in the group. In this example the minimum value of each group is required, so when comparing keys of type MyPariWritable only the natural keys need to be compared. This guarantees that two MyPariWritable keys with the same natural key are treated as one group when they reach the reduce side. Because the group takes only its first key, and the data has already been sorted by the custom MyPariWritable comparator, that first key contains exactly the minimum value for each natural key.

Finally, the reduce method of the Reducer is called, and its input is each key together with its value iterator. Note again that the input and output types must be the same as those declared in the custom Reducer.
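The whole flow can be modelled in plain Java for a single reducer. In this sketch the class Pair stands in for MyPariWritable, and SORT and GROUP play the roles of the classes passed to job.setSortComparatorClass and job.setGroupingComparatorClass. All of these names are hypothetical, partitioning is left out, and no Hadoop classes are used.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java, single-reducer model of secondary sort (hypothetical names, no Hadoop dependency).
class SecondarySortSketch {

    // Composite key: natural key plus natural value (stands in for MyPariWritable).
    static final class Pair {
        final String first;
        final int second;

        Pair(String first, int second) {
            this.first = first;
            this.second = second;
        }
    }

    // Sort comparator: natural key first, then natural value in ascending order.
    static final Comparator<Pair> SORT =
        Comparator.comparing((Pair p) -> p.first).thenComparingInt(p -> p.second);

    // Grouping comparator: natural key only.
    static final Comparator<Pair> GROUP = Comparator.comparing((Pair p) -> p.first);

    static List<String> minPerKey(String[] lines) {
        List<Pair> keys = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            keys.add(new Pair(fields[0], Integer.parseInt(fields[1]))); // "map": emit the composite key
        }
        keys.sort(SORT); // "shuffle": sort by the composite key

        List<String> output = new ArrayList<>();
        Pair previous = null;
        for (Pair current : keys) {
            // A new group starts whenever the grouping comparator sees a different natural key.
            if (previous == null || GROUP.compare(previous, current) != 0) {
                // "reduce": the first key of each group already carries the minimum value.
                output.add(current.first + " " + current.second);
            }
            previous = current;
        }
        return output;
    }

    public static void main(String[] args) {
        String[] sample = {
            "Bbb 654", "Ccc 534", "Ddd 423", "Aaa 754", "Bbb 842", "Ccc 120", "Ddd 219", "Aaa 344",
            "Bbb 214", "Ccc 547", "Ddd 654", "Aaa 122", "Bbb 102", "Ccc 479", "Ddd 742", "Aaa 146"};
        for (String line : minPerKey(sample)) {
            System.out.println(line);
        }
    }
}
```

Run on the sample data from the task, this prints Aaa 122, Bbb 102, Ccc 120 and Ddd 219, matching the target file. The reduce step never scans the values of a group, because the sort has already placed the minimum first.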
Thank you for reading. That is the secondary sorting process of Hadoop MapReduce. After working through this article you should have a deeper understanding of how it works, although the specifics still need to be verified in practice.