Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How does MapReduce work?

2025-02-25 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article shows you how MapReduce works. It is concise and easy to understand. It will definitely brighten your eyes. I hope you can get something through the detailed introduction of this article.

I started to talk about mapreduce,mapreduce as the computing framework of hadoop. I started learning hadoop from hive, and then to hdfs. When I learned hdfs, I felt the close relationship between hdfs and mapreduce. This may be related to the way I do technical research. When I start learning a certain set of technologies, I always think about what this technology can do. Only when I really understand what problems this technology solves, my subsequent learning can be gradually accelerated. While learning hdfs, I found that to understand the meaning of the hadoop framework, hdfs and mapreduce are inseparable, so when I write distributed file systems. I always feel that my understanding is superficial. Today I began to write mapreduce.

Preliminary Analysis of Mapreduce

Mapreduce is a computing framework, since it is a computing framework, then the form of expression is to have an input (input), mapreduce operation of this input (input), through its own defined computing model, get an output (output), this output is the result we need.

What we need to learn is the running rules of this computing model. When running a mapreduce computing task, the task process is divided into two phases: the map phase and the reduce phase, each using key-value pairs (key/value) as input (input) and output (output). What the programmer needs to do is to define the functions of these two stages: the map function and the reduce function.

Basic example of Mapreduce

Before explaining how mapreduce works, let's first take a look at the hello world instance WordCount in mapreduce. This instance can be found in any version of hadoop installer, and it is easy to find. Here, I will post the code for my later explanation. The code is as follows:

/ * *

* Licensed to the Apache Software Foundation (ASF) under one

* or more contributor license agreements. See the NOTICE file

* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file

* to you under the Apache License, Version 2.0 (the

* "License"); you may not use this file except in compliance

* with the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "ASIS" BASIS

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

, /

Package org.apache.hadoop.examples

Import java.io.IOException

Import java.util.StringTokenizer

Import org.apache.hadoop.conf.Configuration

Import org.apache.hadoop.fs.Path

Import org.apache.hadoop.io.IntWritable

Import org.apache.hadoop.io.Text

Import org.apache.hadoop.mapreduce.Job

Import org.apache.hadoop.mapreduce.Mapper

Import org.apache.hadoop.mapreduce.Reducer

Import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

Import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

Import org.apache.hadoop.util.GenericOptionsParser

Public class WordCount {

Public static class TokenizerMapper

Extends Mapper {

Private final static IntWritable one = new IntWritable (1)

Private Text word = new Text ()

Public void map (Object key, Text value, Context context

) throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer (value.toString ())

While (itr.hasMoreTokens ()) {

Word.set (itr.nextToken ())

Context.write (word, one)

}

}

}

Public static class IntSumReducer

Extends Reducer {

Private IntWritable result = new IntWritable ()

Public void reduce (Text key, Iterable values

Context context

) throws IOException, InterruptedException {

Int sum = 0

For (IntWritableval: values) {

Sum + = val.get ()

}

Result.set (sum)

Context.write (key, result)

}

}

Public static void main (String [] args) throws Exception {

Configuration conf = new Configuration ()

String [] otherArgs = new GenericOptionsParser (conf, args) .getRemainingArgs

If (otherArgs.length! = 2) {

System.err.println ("Usage: wordcount")

System.exit (2)

}

Job job = new Job (conf, "word count")

Job.setJarByClass (WordCount.class)

Job.setMapperClass (TokenizerMapper.class)

Job.setCombinerClass (IntSumReducer.class)

Job.setReducerClass (IntSumReducer.class)

Job.setOutputKeyClass (Text.class)

Job.setOutputValueClass (IntWritable.class)

FileInputFormat.addInputPath (job, new Path (otherArgs [0]))

FileOutputFormat.setOutputPath (job, new Path (otherArgs [1]))

System.exit (job.waitForCompletion (true)? 0: 1)

}

}

How to run it, here do not repeat, we can Baidu, a lot of information in this area on the Internet. The example code here is to use the new api. You may see in many books that the WordCount instances that explain mapreduce are the old version of api. Here, I will not give the old version of api, because the old version of api is not recommended. It is best to use the new version of api for development. What is the difference between the new version of api and the old version of api:

The new api is on: org.apache.hadoop.mapreduce, and the old version of api is on: org.apache.hadoop.mapred

The new version of api uses virtual classes, while the old version uses interfaces, and virtual classes are more conducive to expansion. This is an experience, and you can learn from this experience of hadoop.

There are a lot of other differences, all of which illustrate the advantages of the new version of api, because I advocate using the new version of api, and I won't talk about it here, because there's no need to use the old version, so this comparison doesn't make much sense.

I will briefly explain the code below. You can see that we are going to write a mapreduce program, and we are implementing a map function and a reduce function. Let's take a look at map's approach:

Public void map (Object key, Text value, Context context) throws IOException, InterruptedException {… }

There are three parameters, the first two Object key, Text value is the input key and value, and the third parameter Context context is the input key and value, for example: context.write (word, one). In addition, context also records the status of the map operation.

For the reduce function:

Public void reduce (Text key, Iterable values, Context context) throws IOException, InterruptedException {… }

The input of the reduce function is also in the form of a key/value, but its value is the form of an iterator Iterable values, that is to say, the input of reduce is a value,reduce corresponding to a set of values of key, and the context of context and map is the same.

As for the logic of computing, it is the programmer's own implementation.

Here's the call to the main function, which I'm going to talk about in detail, first of all:

Configuration conf = new Configuration ()

Initialize Configuration before running the mapreduce program. This class mainly reads the mapreduce system configuration information, including hdfs and mapreduce, that is, the information in the configuration files when installing hadoop, such as core-site.xml, hdfs-site.xml, mapred-site.xml, and so on. Some children's shoes do not understand why they want to do this. This is because we programmers are just filling in the blanks when developing mapreduce. Write the actual business logic in the map function and the reduce function, and the rest of the work is left to the mapreduce framework to operate, but at least we have to tell it how to operate, such as where hdfs is, where mapreduce's jobstracker is, and the information is in the configuration file under the conf package.

The next code is:

String [] otherArgs = new GenericOptionsParser (conf, args). GetRemainingArgs (); if (otherArgs.length! = 2) {System.err.println ("Usage: wordcount"); System.exit (2);}

The sentence of If is easy to understand, that is, when running the WordCount program, there must be two parameters, if not, it will report an error to exit. As for the GenericOptionsParser class in the first sentence, it is used to explain commonly used hadoop commands and set the corresponding value for the Configuration object as needed, in fact, we do not usually use it in development, but let the class achieve the Tool interface, and then use ToolRunner to run the program in the main function, while GenericOptionsParser will be called inside ToolRunner.

The next code is:

Job job = new Job (conf, "word count"); job.setJarByClass (WordCount.class); job.setMapperClass (TokenizerMapper.class); job.setCombinerClass (IntSumReducer.class); job.setReducerClass (IntSumReducer.class)

The first line is to build a job. In the mapreduce framework, a mapreduce task is also called a mapreduce job is also called a mapreduce job, and the specific map and reduce operation is task. Here we build a job, which has two parameters, one is conf, which is not repeated, and the other is the name of the job.

The second line is to load the calculation program written by the programmer, for example, our program class name is WordCount. Here I want to correct, although we write mapreduce programs only need to implement map functions and reduce functions, but the actual development we need to implement three classes, the third class is to configure how mapreduce runs map and reduce functions, to be exact, is to build a job that mapreduce can execute, such as the WordCount class.

The third and fifth lines are loaded with the map function and the reduce function implementation class, and there is a fourth line here. This is loading the Combiner class. I will talk about this later when I talk about the mapreduce operating mechanism. In fact, it doesn't matter to remove the fourth line in this example, but using the fourth line will be more efficient in theory.

The following code:

Job.setOutputKeyClass (Text.class); job.setOutputValueClass (IntWritable.class)

This is the type of key/value that defines the output, that is, the type of key/value that ultimately stores the result file on the hdfs.

The final code is:

FileInputFormat.addInputPath (job, new Path (otherArgs [0])); FileOutputFormat.setOutputPath (job, new Path (otherArgs [1])); System.exit (job.waitForCompletion (true)? 0: 1)

The first line is to build the input data file, the second line is to build the output data file, and the last line is if job runs successfully, our program will exit normally. FileInputFormat and FileOutputFormat are very knowledgeable, and I'll explain them in the mapreduce operating mechanism below.

All right, the hello word program in mapreduce has been explained, and my explanation is carried out from the new api. This set of explanation is still relatively few on the network and should be very representative.

Mapreduce operation mechanism

Now I would like to talk about the operating mechanism of mapreduce. Not long ago, I issued a set of hadoop interview questions for the company, in which I asked about the operating mechanism of mapreduce. When I issued the question, I found that I myself would not be very clear about this problem, so I have made it up in recent days. I hope I can explain this problem clearly in this article.

Below I post some pictures, these are the better pictures I found in Baidu pictures:

Picture 1:

Picture 2:

Picture 3:

Picture 4:

Picture 5:

Picture 6:

I like to look at pictures very much when I study technology now. Every time I have a new understanding, I will look at the pictures, and every time I will make new discoveries.

The running mechanism of mapreduce can be described from many different angles, for example, from the running process of mapreduce, or from the logical flow of the computing model. Some people may have a deep understanding of the running mechanism of mapreduce and will describe it from a better point of view, but there are some things that cannot be avoided, that is, the instance objects involved one by one, and one is the logical definition stage of the computing model. I will not start from the process here, but from the objects involved, whether they are physical or logical entities.

First, let's talk about physical entities. Participating in the execution of mapreduce jobs involves four separate entities:

Client (client): write mapreduce programs, configure jobs, submit jobs, this is what programmers do

JobTracker: initialize jobs, assign jobs, communicate with TaskTracker, coordinate the execution of the whole job

TaskTracker: maintain communication with JobTracker and execute Map or Reduce tasks on assigned data fragments. A very important aspect of the difference between TaskTracker and JobTracker is that when executing tasks, there can be more than n TaskTracker and only one JobTracker. (there can be only one JobTracker, which has a single point of failure like the namenode in hdfs, which I will talk about in the following questions about mapreduce.)

Hdfs: save job data, configuration information, etc., and the final result is also saved on hdfs

So how exactly does mapreduce work?

First of all, the client needs to write a mapreduce program, configure the mapreduce job, that is, job, and then submit the job. The submission job is submitted to the JobTracker. At this time, JobTracker will build the job, specifically assigning the ID value of a new job task, and then it will do a check operation, which is to determine whether the output directory exists. If it exists, then job will not run normally. JobTracker will throw an error to the client, and then check whether the input directory exists. If there is no error, JobTracker will calculate the input shard (Input Split) based on the input, and if the shard cannot be calculated, it will throw an error. As for the input shard, I will explain later. When all these are done, JobTracker will configure the resources needed by Job. After allocating resources, JobTracker initializes the job. The main task of initialization is to put Job into an internal queue so that the configured job scheduler can schedule the job, and the job scheduler initializes the job. Initialization is to create a running job object (encapsulating tasks and recording information), so that JobTracker can track the status and process of job. After initialization, the job scheduler gets the input shard information (input split), and each shard creates a map task. Next is the task assignment. At this time, tasktracker will run a simple loop mechanism to send the heartbeat to jobtracker periodically. The heartbeat interval is 5 seconds. Programmers can configure this time. Heartbeat is the bridge between jobtracker and tasktracker. Through heartbeat, jobtracker can monitor whether tasktracker is alive, and can also obtain the status and problems processed by tasktracker. At the same time, tasktracker can also obtain the operation instructions given by jobtracker through the return value in heartbeat. After the task is assigned, the task will be carried out. During the execution of a task, jobtracker can monitor the status and progress of tasktracker through the heartbeat mechanism, and it can also calculate the status and progress of the entire job, while tasktracker can also monitor its own status and progress locally. When jobtracker is notified of the success of the last tasktracker operation to complete the specified task, jobtracker will set the entire job status to successful, and then when the client queries the running status of job (note: this is an asynchronous operation), the client will find the notification of job completion. If job fails halfway, mapreduce will also have a corresponding mechanism to deal with it. Generally speaking, if not the programmer program itself has an bug,mapreduce error handling mechanism, it can ensure that the submitted job can be completed normally.

Let me explain the mapreduce operation mechanism from the perspective of logical entities, which in chronological order include: input shard (input split), map phase, combiner phase, shuffle phase, and reduce phase.

Input shard (input split): before map calculation, mapreduce calculates the input shard (input split) according to the input file. Each input shard (input split) is for a map task. The input shard (input split) stores not the data itself, but an array of shard length and a location where the data is recorded. The input shard (input split) is often closely related to the block (block) of the hdfs. If we set the block size of hdfs to 64mb, and if we input three files, the sizes are 3mb, 65mb and 127mb, then mapreduce will divide the 3mb file into one input shard (input split), 65mb is two input shards (input split), and 127mb is also two input shards (input split). In other words, if we make input shard adjustment before map calculation, such as merging small files, then five map tasks will be performed. And the data size of each map is uneven, which is also a key point of mapreduce optimization calculation.

Map phase: the programmer wrote the map function, so the efficiency of the map function is relatively easy to control, and generally the map operation is localized, that is, it is carried out on the data storage node.

Combiner phase: the combiner phase is optional for programmers, and combiner is actually a reduce operation, so we see that the WordCount class is loaded with reduce. Combiner is a localized reduce operation, which is a follow-up operation of map operation, mainly to do a simple operation of merging and repeating key values before map calculates the intermediate files. For example, we do statistics on the frequency of words in the file. If we encounter a hadoop word in map calculation, it will be recorded as 1, but in this article, hadoop may appear many times, so the map output file will be redundant a lot. Therefore, do a merge operation on the same key before reduce calculation, then the file will become smaller, thus improving the transmission efficiency of broadband. After all, the broadband resource of hadoop computing power is often the bottleneck of computing and the most valuable resource, but the combiner operation is risky. The principle of using it is that the input of combiner will not affect the final input of reduce calculation. For example, if the calculation is just to calculate the total, the maximum value, the minimum value can be used combiner. But if you use combiner to do the average calculation, the final reduce calculation will go wrong.

Shuffle phase: the process of taking the output of map as the input of reduce is shuffle, which is the focus of mapreduce optimization. I'm not going to talk about how to optimize the shuffle phase here, but I'm going to talk about the principle of the shuffle phase, because the shuffle phase is not clear in most books. Shuffle starts with the output operation in the map stage. Generally speaking, mapreduce calculates huge amounts of data, so it is impossible for map to put all files into memory, so the process of writing map to disk is very complicated, not to mention the map output has to sort the results, memory overhead is very large, map will open a ring memory buffer in memory when doing output, this buffer is specially used for output. The default size is 100mb, and a threshold is set for the buffer in the configuration file. (both the size and threshold can be configured in the configuration file). At the same time, map also starts a daemon thread for the output operation. If the memory of the buffer reaches 80% of the threshold, the daemon thread will write to disk, a process called spill. The other 20% of the memory can continue to write data to be written to disk. Writing to disk and writing to memory do not interfere with each other. If the cache is full, map will block the operation of writing to memory and let the operation of writing to disk be completed before continuing to write to memory. As I mentioned earlier, there will be a sort operation before writing to disk, which is done when writing to disk. Not when writing to memory, if we define the combiner function, then the combiner operation will be performed before sorting. Every time the spill operation is written to disk, an overflow file will be written, that is, how many overflow files will be generated by spill after several times of map output, and when the map output is all done, map will merge these output files. In this process, there will also be a Partitioner operation, for which many people are very confused. In fact, the Partitioner operation is very similar to the input slicing (Input split) in the map stage. A Partitioner corresponds to a reduce job. If our mapreduce operation has only one reduce operation, then there is only one Partitioner. If we have multiple reduce operations, then there will be multiple Partitioner corresponding to it. Partitioner is therefore the input shard of reduce, which the programmer can program to control. It is mainly based on the actual values of key and value, according to the actual business type or for better reduce load balancing requirements, which is a key to improve the efficiency of reduce. When it comes to the reduce stage, the map output file is merged. Partitioner will find the corresponding map output file, and then carry out the copy operation. During the copy operation, reduce will start several replication threads. The default number of these threads is 5. Programmers can also change the number of replication threads in the configuration file. This replication process is similar to the process of writing map to disk. It also has a threshold and memory size. The threshold can also be configured in the configuration file. The memory size is the memory size of reduce's tasktracker directly. When copying, reduce will also sort and merge files. After these operations, reduce calculation will be carried out.

The reduce phase: like the map function, it is written by the programmer, and the final result is stored on the hdfs.

Problems related to Mapreduce

Here I would like to talk about some of the problems I think about in learning mapreduce, which are all problems that I have come up with to explain, but whether some problems are right in the end needs to be confirmed by the majority of children's shoes.

Jobtracker's single point of failure: jobtracker and hdfs's namenode also have a single point of failure, single point of failure has always been criticized by hadoop, why hadoop's file system and mapreduce computing framework are highly fault-tolerant, but the most important management node fault mechanism is so bad, I think the main reason is that namenode and jobtracker operate in memory in actual operation, and memory fault tolerance is more complex. Fault tolerance is easy to do only when memory data is persisted. Both namenode and jobtracker can back up their persistent files, but this persistence will be delayed, so it really fails, but it still can not be recovered as a whole. In addition, hadoop framework contains zookeeper framework, zookeeper can be combined with jobtracker, and jobtracker can be deployed with several machines at the same time to ensure that one machine fails, and one can be added immediately, but this way can not restore the running mapreduce task.

When doing mapreduce calculation, the output is usually a folder, and the folder cannot exist. I mentioned this problem when I presented the test question, and this check was done very early, when we submitted the job. The reason why mapreduce is so designed is to ensure data reliability. If there is a reduce in the output directory, it is not clear whether you want to append or overwrite it. Both append and overwrite operations may lead to problems in the final result. Mapreduce does massive data calculations, and the cost of a production calculation is very high. For example, a job may take several hours to complete, so mapreduce is zero-tolerant for all cases that affect errors.

Mapreduce also has an InputFormat and OutputFormat. When we write the map function, we find that the parameters of the map method operate between rows of data and do not involve InputFormat. These things are done by the mapreduce computing framework for us when we new Path, and OutputFormat is also done for us by reduce. What kind of input file we use, we have to call what kind of InputFormat,InputFormat is related to the type of file we enter. The InputFormat commonly used in mapreduce has FileInputFormat plain text files. SequenceFileInputFormat refers to the serialization file of hadoop, as well as KeyValueTextInputFormat. OutputFormat is the file format we want to finally store on the hdfs system, which is defined according to your needs. Hadoop supports many file formats, which are not listed here. If you want to know Baidu, you will see it.

The above is how MapReduce works. Have you learned any knowledge or skills? If you want to learn more skills or enrich your knowledge reserve, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report