Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to analyze the principle of MapReduce

2025-04-01 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

In this issue, the editor will bring you how to analyze the principle of MapReduce. The article is rich in content and analyzes and narrates it from a professional point of view. I hope you can get something after reading this article.

1. The logical process of Map-Reduce

Suppose we need to process a batch of weather data in the following format:

Stored according to ASCII code, one record per line

Each line of characters is counted from 0, and the 15th to 18th characters are years.

The 25th to 29th characters are temperature, of which the 25th is the symbol + /-

00670119909999919500515070000+

0043011990999991950051512120022 +

0043011990999991950051518-0011 +

00430126509999919490324120111+

00430126509999919490324180078 +

00670119909999919370515070001 +

0043011990999991937051512-0002+

0043011990999991945051518,0001 +

00430126509999919450324120002 +

00430126509999919450324180078 +

Now we need to figure out the highest temperature of the year.

Map-Reduce mainly includes two steps: Map and Reduce.

At each step, there are key-value pairs as input and output:

The format of the key-value pair in the map phase is determined by the input format. If it is the default TextInputFormat, each line is processed as a recording process, where key is the beginning of the line relative to the beginning of the file, and value is the character text of the line.

The format of the output key-value pair of the map phase must correspond to the format of the input key-value pair of the reduce phase

For the above example, in the map process, the key-value pairs entered are as follows:

(000670119909999919500515070000+)

(33,0043011990999991950051512000022 +)

(66,0043011990999991950051518-0011 +)

(9900430126509999919490324120111 +)

(132,004301265099999194903241800078 +)

(165,00670119909999919370515070001 +)

(1980043011990999991937051512-0002 +)

(231,0043011990999991945051518,0001 +)

(264,00430126509999919450324120002 +)

(29700430126509999919450324180078+)

In the map process, through the parsing of each line of string, the year-temperature key-value pair is obtained as output:

(1950, 0)

(1950, 22)

(1950,-11)

(1949, 111)

(1949, 78)

(1937, 1)

(1937,-2)

(1945, 1)

(1945, 2)

(1945, 78)

In the reduce process, the output from the map process is put into the same list of value according to the same key as the input of the reduce

(1950, [0,22,-11])

(1949, [111,78])

(1937, [1,-2])

(1945, [1,2,78])

During the reduce process, select the maximum temperature in the list and take the year-maximum temperature key-value as the output:

(1950, 22)

(1949, 111)

(1937, 1)

(1945, 78)

The logical process can be shown in the following figure:

2. Write Map-Reduce program

To write a Map-Reduce program, you generally need to implement two functions: the map function in mapper and the reduce function in reducer.

Generally follow the following format:

Map: (K1, V1)-> list (K2, V2)

Public interface Mapper extends JobConfigurable, Closeable {

Void map (K1 key, V1 value, OutputCollector output, Reporter reporter)

Throws IOException

}

Reduce: (K2, list (V))-> list (K3, V3)

Public interface Reducer extends JobConfigurable, Closeable {

Void reduce (K2 key, Iterator values

OutputCollector output, Reporter reporter)

Throws IOException

}

For the above example, the mapper implemented is as follows:

Public class MaxTemperatureMapper extends MapReduceBase implements Mapper {

@ Override

Public void map (LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {

String line = value.toString ()

String year = line.substring (15,19)

Int airTemperature

If (line.charAt (25) = ='+') {

AirTemperature = Integer.parseInt (line.substring (26,30))

} else {

AirTemperature = Integer.parseInt (line.substring (25,30))

}

Output.collect (new Text (year), new IntWritable (airTemperature))

}

}

The reducer implemented is as follows:

Public class MaxTemperatureReducer extends MapReduceBase implements Reducer {

Public void reduce (Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {

Int maxValue = Integer.MIN_VALUE

While (values.hasNext ()) {

MaxValue = Math.max (maxValue, values.next (). Get ())

}

Output.collect (key, new IntWritable (maxValue))

}

}

To run the Mapper and Reduce implemented above, you need to generate a Map-Reduce task (Job), which basically consists of the following three parts:

Input data, that is, data that needs to be processed

Map-Reduce program, that is, Mapper and Reducer implemented above

Configuration item JobConf for this task

To configure JobConf, you need to have an overview of how Hadoop runs job:

Hadoop divides Job into task for processing, there are two kinds of task:map task and reduce task

Hadoop has two types of nodes that control the operation of job: JobTracker and TaskTracker

JobTracker coordinates the operation of the entire job, assigning task to different TaskTracker

TaskTracker is responsible for running task and returning the results to JobTracker

Hadoop divides the input data into fixed-size blocks, which we call input split

Hadoop creates a task for each input split, in which the records in this split are processed in turn (record).

Hadoop will try its best to make the DataNode where the input data block is located and the DataNode executed by task (there is a TaskTracker on each DataNode) the same, which can improve the operation efficiency, so the size of input split is generally the size of the block of HDFS.

The input of Reduce task is generally the output of Map Task, and the output of Reduce Task is the output of the whole job, which is saved on HDFS.

In reduce, all records of the same key must run on the same TaskTracker, but different key can run on different TaskTracker, which we call partition.

The rule of partition is: (K2, V2)-> Integer, that is, according to K2, an id of partition is generated, and K2 with the same id enters the same partition and is processed by the same Reducer on the same TaskTracker.

Public interface Partitioner extends JobConfigurable {

Int getPartition (K2 key, V2 value, int numPartitions)

}

The following figure roughly describes the fundamentals of Map-Reduce 's Job operation:

Let's talk about JobConf, which has many items that can be configured:

SetInputFormat: sets the input format of map. Default is TextInputFormat,key: LongWritable, value is Text.

SetNumMapTasks: sets the number of map tasks. This setting usually does not work. The number of map tasks depends on the number of input split that the input data can be divided into.

SetMapperClass: sets Mapper. Default is IdentityMapper.

SetMapRunnerClass: set MapRunner. Map task is run by MapRunner and defaults to MapRunnable. Its function is to read the record of input split and call the map function of Mapper in turn.

SetMapOutputKeyClass and setMapOutputValueClass: format key-value pairs for the output of Mapper

SetOutputKeyClass and setOutputValueClass: format key-value pairs for the output of Reducer

SetPartitionerClass and setNumReduceTasks: set Partitioner. The default is HashPartitioner, which determines which partition to enter according to the hash value of key. Each partition is processed by a reduce task, so the number of partition is equal to the number of reduce task.

SetReducerClass: sets Reducer. Default is IdentityReducer.

SetOutputFormat: sets the output format of the task. The default is TextOutputFormat.

FileInputFormat.addInputPath: set the path of the input file, so that a file, a path, a wildcard. Can be called multiple times to add multiple paths

FileOutputFormat.setOutputPath: sets the path to the output file, which should not exist until job runs

Of course, not all of them are set up. From the above example, you can write a Map-Reduce program as follows:

Public class MaxTemperature {

Public static void main (String [] args) throws IOException {

If (args.length! = 2) {

System.err.println ("Usage: MaxTemperature")

System.exit (- 1)

}

JobConf conf = new JobConf (MaxTemperature.class)

Conf.setJobName ("Max temperature")

FileInputFormat.addInputPath (conf, new Path (args [0]))

FileOutputFormat.setOutputPath (conf, new Path (args [1]))

Conf.setMapperClass (MaxTemperatureMapper.class)

Conf.setReducerClass (MaxTemperatureReducer.class)

Conf.setOutputKeyClass (Text.class)

Conf.setOutputValueClass (IntWritable.class)

JobClient.runJob (conf)

}

}

3. Map-Reduce data flow (data flow)

The processing of Map-Reduce mainly involves the following four parts:

Client Client: used to submit Map-reduce task job

JobTracker: coordinates the operation of the entire job, which is a Java process whose main class is JobTracker

TaskTracker: the task running this job, processing input split, which is a Java process whose main class is TaskTracker

HDFS:hadoop distributed file system for sharing Job-related files among processes

3.1. Task submission

JobClient.runJob () creates a new JobClient instance and calls its submitJob () function.

Request a new job ID from JobTracker

Detect the output configuration for this job

Calculate the input splits for this job

Copy the resources needed for Job to run into a folder in the file system of JobTracker, including job jar files, job.xml configuration files, input splits

Notify JobTracker that this Job is ready to run

After the task is submitted, runJob polls the progress of the job every second and returns the progress to the command line until the task is finished.

3.2. Task initialization

When JobTracker receives a submitJob call, it puts the task in a queue, and the job scheduler fetches the task from the queue and initializes the task.

Initialization first creates an object to encapsulate the tasks, status, and progress that job is running.

Before creating a task, the job scheduler first obtains the input splits calculated by JobClient from the shared file system.

It creates a map task for each input split.

Each task is assigned an ID.

3.3. Task assignment

TaskTracker periodically sends heartbeat to JobTracker.

In heartbeat, TaskTracker tells JobTracker that it is ready to run a new task,JobTracker that will be assigned to a task.

Before JobTracker can select a task for TaskTracker, JobTracker must first select a Job according to priority and a task among the highest priority Job.

TaskTracker has a fixed number of locations to run map task or reduce task.

The default scheduler treats map task over reduce task

When choosing reduce task, JobTracker does not choose between multiple task, but directly takes the next one, because reduce task has no concept of data localization.

3.4. Task execution

TaskTracker is assigned a task, and this task will be run next.

First, TaskTracker copies the jar of this job from the shared file system to the file system of TaskTracker.

TaskTracker copies the files needed for job to run from distributed cache to the local disk.

Second, it creates a local working directory for each task and unzips the jar into a file directory.

Third, it creates a TaskRunner to run task.

TaskRunner creates a new JVM to run task.

The created child JVM communicates with the TaskTracker to report the progress of the run.

3.4.1, process of Map

MapRunnable reads the record from input split, and then calls the map function of Mapper in turn to output the result.

The output of map is not written directly to the hard disk, but to the cache memory buffer.

When the data in the buffer reaches a certain size, a background thread starts writing the data to the hard disk.

Before being written to the hard disk, the data in memory is divided into multiple partition via partitioner.

In the same partition, the background thread sorts the data in memory by key.

Each time you flush data from memory to your hard disk, a new spill file is generated.

Before the end of this task, all spill files are merged into one whole partition and ordered file.

Reducer can request the output file of map through http protocol, and tracker.http.threads can set the number of http service threads.

3.4.2. The process of Reduce

When the map task ends, it notifies the TaskTracker,TaskTracker to notify the JobTracker.

For a job,JobTracker, know the correspondence between TaskTracer and map output.

A thread in reducer periodically requests the location of the map output to the JobTracker until it gets all the map output.

Reduce task requires all the map output of its corresponding partition.

The copy process in reduce task begins to copy the output when each map task ends, because the completion time of different map task varies.

There are multiple copythreads in reduce task that can copy map output in parallel.

When many of the map output is copied to reduce task, a background thread merges it into a large, ordered file.

After all the map output is copied to reduce task, enter the sort process and merge all the map output into a large ordered file.

Finally, enter the reduce process, call the reduce function of reducer, process each key of the sorted output, and write the final result to HDFS.

3.5. End of mission

When JobTracker gets the last report on the success of task, change the status of job to success.

When JobClient polls from JobTracker and finds that the job has ended successfully, it prints a message to the user and returns from the runJob function.

This is how the MapReduce principle shared by the editor is analyzed. If you happen to have similar doubts, you might as well refer to the above analysis to understand. If you want to know more about it, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report