MapReduce of Hadoop

2025-01-15 Update From: SLTechnology News&Howtos

1 Overview of MapReduce

MapReduce is a programming framework for distributed computing programs and the core framework for users to develop data analysis applications based on Hadoop.

The core function of MapReduce is to integrate the business logic code written by users and its own default components into a complete distributed operation program, which runs concurrently on a Hadoop cluster.

1.1 Advantages and disadvantages of MapReduce

Advantages:

MapReduce is easy to program

By implementing a few interfaces, a user can complete a distributed program that runs across a large number of cheap PC machines. In other words, writing a distributed program feels much like writing a simple serial program. This feature is what made MapReduce programming so popular.

Good scalability

When computing resources run short, computing power can be expanded simply by adding machines.

High fault tolerance

MapReduce was designed so that programs can be deployed on cheap PC machines, which requires high fault tolerance. For example, if one machine goes down, the computing tasks on it are transferred to another node so that the job does not fail. This process requires no human intervention and is handled entirely by Hadoop.

Suitable for offline processing of massive data above PB level

Clusters of thousands of servers can work concurrently to provide the data processing capacity.

Disadvantages:

Not good at real-time computing

MapReduce cannot return results in milliseconds or seconds as MySQL does.

Not good at streaming computing

The input data of streaming computing is dynamic, while the input data set of MapReduce is static and cannot change dynamically. The design of MapReduce requires the data source to be static.

Not good at DAG (directed acyclic graph) computing

In a DAG, multiple applications depend on each other, and the input of each application is the output of the previous one. MapReduce can do this, but the output of every MapReduce job is written to disk, which causes a large amount of disk IO and very poor performance.

1.2 MapReduce core idea

A distributed computing program usually needs to be divided into at least two phases.

The MapTask concurrent instances of the first phase run completely in parallel and are independent of each other.

The ReduceTask concurrent instances of the second phase are also independent of each other, but their data depends on the output of all MapTask concurrent instances of the previous phase.

The MapReduce programming model can contain only one Map phase and one Reduce phase. If the user's business logic is very complex, the only option is to run multiple MapReduce programs serially.
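The two phases can be imitated in plain Java without a cluster. The following is only a sketch under stated assumptions: LocalWordCount and its methods are hypothetical names, no Hadoop classes are involved, and an in-memory sorted map stands in for the shuffle that the framework performs between the phases.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java imitation of the Map and Reduce phases (hypothetical class, no Hadoop involved).
class LocalWordCount {

    // "Map" phase: each line is processed independently and emits (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split(" ")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // "Reduce" phase: called once per key with all values emitted for that key.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static Map<String, Integer> run(List<String> lines) {
        // Grouping by key stands in for the shuffle; TreeMap keeps the keys sorted.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(line)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {hadoop=1, hello=2, mapreduce=1}
        System.out.println(run(Arrays.asList("hello hadoop", "hello mapreduce")));
    }
}
```

Each call to map() depends only on its own line, which is why the first phase can run fully in parallel, while reduce() cannot start for a key until every map() output for that key has been collected.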

1.3 MapReduce process

A complete MapReduce program has three types of instance processes when it runs in distributed mode:

MrAppMaster is responsible for the process scheduling and state coordination of the whole program.

MapTask is responsible for the entire data processing process of the Map phase.

ReduceTask is responsible for the entire data processing process of the Reduce phase.

1.4 Common data serialization types

Java type    Hadoop Writable type
boolean      BooleanWritable
byte         ByteWritable
int          IntWritable
float        FloatWritable
long         LongWritable
double       DoubleWritable
String       Text
Map          MapWritable
Array        ArrayWritable

1.5 MapReduce programming specification

The program written by the user is divided into three parts:

Mapper stage

A user-defined Mapper extends the parent class Mapper. The input data of the Mapper is in the form of KV pairs (the KV types are customizable). The business logic of the Mapper is written in the map() method. The output data of the Mapper is also in the form of KV pairs (the KV types are customizable). The MapTask process calls the map() method once for each input KV pair.

Reduce stage

A user-defined Reducer extends the parent class Reducer. The input data type of the Reducer corresponds to the output data type of the Mapper, which is also KV. The business logic of the Reducer is written in the reduce() method. The ReduceTask process calls the reduce() method once for each group of KV pairs with the same key.

Driver stage

The Driver is equivalent to a client of the YARN cluster. It submits the whole program to the YARN cluster, and what it submits is a Job object that encapsulates the running parameters of the MapReduce program.

1.6 WordCount case operation

Import dependency

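    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>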

log4j.properties

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

WcMapper

    package com.djm.mapreduce;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Output objects are reused so that no new object is allocated per word
        private Text key = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Split the line into words on single spaces
            String[] words = line.split(" ");
            for (String word : words) {
                // Emit (word, 1) for every word
                this.key.set(word);
                context.write(this.key, this.one);
            }
        }
    }

WcReduce

    package com.djm.mapreduce;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class WcReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable total = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : values) {
                // Add the carried count rather than 1, so the result stays correct
                // even if the values have already been pre-aggregated
                sum += count.get();
            }
            this.total.set(sum);
            context.write(key, this.total);
        }
    }

WcDriver

    package com.djm.mapreduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class WcDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Get the job
            Job job = Job.getInstance(new Configuration());
            // Set the jar by the driver class
            job.setJarByClass(WcDriver.class);
            // Set the Mapper
            job.setMapperClass(WcMapper.class);
            // Set the Reducer
            job.setReducerClass(WcReduce.class);
            // Set the types of the Mapper's output key and value
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // Set the types of the Reducer's output key and value
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Submit the job and wait for it to finish
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }

2 Hadoop serialization

2.1 Why not serialize using the Java serialization framework

Serializable is Java's heavyweight serialization framework. After an object is serialized, it carries a lot of additional information (various check information, headers, the inheritance hierarchy, and so on). This generates a lot of IO and makes it unsuitable for efficient transmission over the network, so Hadoop developed its own lightweight serialization mechanism (Writable).
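The overhead can be observed with the JDK alone. The sketch below is an illustration under stated assumptions: SerializationSizeDemo and its Flow class are hypothetical, and writing the raw fields with DataOutputStream only approximates what a Writable does. It serializes the same three long values both ways and compares the byte counts.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSizeDemo {

    // Hypothetical record with three long fields, used only for this comparison.
    static class Flow implements Serializable {
        private static final long serialVersionUID = 1L;
        long upFlow = 100L;
        long downFlow = 200L;
        long sumFlow = 300L;
    }

    // Standard Java serialization: also writes a stream header and class metadata.
    static int javaSerializedSize(Flow flow) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(flow);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Writable-style encoding: only the field values, 8 bytes per long.
    static int rawFieldSize(Flow flow) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(flow.upFlow);
            out.writeLong(flow.downFlow);
            out.writeLong(flow.sumFlow);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        Flow flow = new Flow();
        System.out.println("java.io.Serializable: " + javaSerializedSize(flow) + " bytes");
        System.out.println("raw fields: " + rawFieldSize(flow) + " bytes");
    }
}
```

The raw encoding is exactly 24 bytes (three 8-byte longs), while the Serializable form is larger because it also describes the class. Multiplied over billions of shuffled records, that difference is the motivation given above.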

Hadoop serialization features:

1. Compact: efficient use of storage space.

2. Fast: the extra overhead of reading and writing data is small.

3. Scalable: it can evolve as the communication protocol is upgraded.

4. Interoperability: supports multi-language interaction.

2.2 Custom bean objects implement serialization interfaces

The basic serialization types provided by Hadoop often do not meet the requirements during development, so it is usually necessary to create a bean that implements the Writable interface.

The specific steps to implement bean object serialization are as follows:

1. Implement the Writable interface.

2. Provide a no-argument constructor, because deserialization calls it through reflection.

3. Override the serialization method write().

4. Override the deserialization method readFields().

5. The order of deserialization must be exactly the same as the order of serialization.

6. To display the results in a file, override toString().

7. If the custom bean is transferred as a key, it must also implement the Comparable interface, because the Shuffle process in the MapReduce framework requires keys to be sortable.
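The steps above can be tried without a Hadoop installation. In the sketch below, SimpleWritable is a hypothetical stand-in that mirrors the two method names of Hadoop's Writable, and PhoneFlow is an illustrative bean; the point is that readFields() must read the fields in the same order in which write() wrote them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WritableRoundTrip {

    // Hypothetical stand-in for Hadoop's Writable, so that no Hadoop jar is needed here.
    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    static class PhoneFlow implements SimpleWritable {
        String phone = "";
        long upFlow;
        long downFlow;

        // Step 2: no-argument constructor, needed to create an empty instance before reading
        PhoneFlow() {
        }

        PhoneFlow(String phone, long upFlow, long downFlow) {
            this.phone = phone;
            this.upFlow = upFlow;
            this.downFlow = downFlow;
        }

        // Step 3: serialization writes phone, upFlow, downFlow in that order
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(phone);
            out.writeLong(upFlow);
            out.writeLong(downFlow);
        }

        // Steps 4 and 5: deserialization reads the fields back in exactly the same order
        @Override
        public void readFields(DataInput in) throws IOException {
            this.phone = in.readUTF();
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
        }

        // Step 6: tab-separated text form for output files
        @Override
        public String toString() {
            return phone + "\t" + upFlow + "\t" + downFlow;
        }
    }

    // Serializes the bean to bytes and rebuilds a fresh copy from those bytes.
    static PhoneFlow roundTrip(PhoneFlow original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            PhoneFlow copy = new PhoneFlow();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new PhoneFlow("13700000000", 2481L, 24681L)));
    }
}
```

If readFields() read the two longs in the opposite order, the copy would silently swap upFlow and downFlow, because the byte stream carries no field names.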

2.3 Serialization case practice

Count the total uplink traffic, downlink traffic and total traffic consumed by each mobile phone number

Input data format: id mobile number network ip uplink traffic downstream traffic network status code

Output data format: mobile phone number uplink traffic downlink traffic total traffic

FlowBean

    package com.djm.mapreduce.flow;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class FlowBean implements Writable {

        private long upFlow;
        private long downFlow;
        private long sumFlow;

        // No-argument constructor, required for deserialization through reflection
        public FlowBean() {
        }

        public void set(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = this.upFlow + this.downFlow;
        }

        public long getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }

        public long getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }

        public long getSumFlow() {
            return sumFlow;
        }

        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }

        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }

        // Serialization: the field order here must match readFields()
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        // Deserialization: reads the fields in the same order as write()
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }
    }

FlowMapper

    package com.djm.mapreduce.flow;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        private FlowBean flowBean = new FlowBean();
        private Text phone = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Fields are tab-separated
            String[] words = line.split("\t");
            // The second field is the mobile phone number
            phone.set(words[1]);
            // Uplink and downlink traffic are counted from the end of the line
            long upFlow = Long.parseLong(words[words.length - 3]);
            long downFlow = Long.parseLong(words[words.length - 2]);
            flowBean.set(upFlow, downFlow);
            context.write(phone, flowBean);
        }
    }

FlowReduce

    package com.djm.mapreduce.flow;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class FlowReduce extends Reducer<Text, FlowBean, Text, FlowBean> {

        private FlowBean totalFlow = new FlowBean();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long sumUpFlow = 0;
            long sumDownFlow = 0;
            // Accumulate the traffic of all records for this phone number
            for (FlowBean value : values) {
                long upFlow = value.getUpFlow();
                long downFlow = value.getDownFlow();
                sumUpFlow += upFlow;
                sumDownFlow += downFlow;
            }
            totalFlow.set(sumUpFlow, sumDownFlow);
            context.write(key, totalFlow);
        }
    }

FlowDriver

    package com.djm.mapreduce.flow;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class FlowDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(FlowDriver.class);
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }

3 MapReduce Framework principle

3.1 InputFormat data input

3.1.1 Slice and MapTask parallelism determination mechanism

The parallelism of the Map phase of a Job is determined by the number of slices computed when the client submits the Job.

Each Split slice is assigned one MapTask parallel instance for processing.

By default, slice size = BlockSize.

Slicing does not consider the data set as a whole; each file is sliced individually.

3.1.2 FileInputFormat slicing mechanism

Slicing mechanism:

Slices are cut simply according to the content length of the file.

By default, the slice size equals the Block size.

Slicing does not consider the data set as a whole; each file is sliced individually.

How to calculate the slice size in the source code?

    Math.max(minSize, Math.min(maxSize, blockSize));

    mapreduce.input.fileinputformat.split.minsize=1                 (default value: 1)
    mapreduce.input.fileinputformat.split.maxsize=Long.MAX_VALUE    (default value: Long.MAX_VALUE)
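The formula above can be exercised directly. The sketch below is a plain-Java illustration: SplitSizeDemo is a hypothetical class, and the 128 MB block size is an assumption chosen for the example rather than a value taken from this article.

```java
class SplitSizeDemo {

    // Same expression as the formula above: clamp blockSize between minSize and maxSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 128 * mb; // assumed block size, for illustration only

        // Defaults (minSize = 1, maxSize = Long.MAX_VALUE): slice size equals the block size
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE) / mb);

        // maxSize below the block size: the slice shrinks to maxSize
        System.out.println(computeSplitSize(blockSize, 1L, 64 * mb) / mb);

        // minSize above the block size: the slice grows to minSize
        System.out.println(computeSplitSize(blockSize, 256 * mb, Long.MAX_VALUE) / mb);
    }
}
```

With the default parameters the min/max clamp has no effect, which is why the slice size equals the block size unless one of the two parameters is changed.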

How do I customize the slice size?

maxsize (slice maximum): if this parameter is set smaller than blockSize, the slice becomes smaller and equals the configured value.

minsize (slice minimum): if this parameter is set larger than blockSize, the slice becomes larger than blockSize.

3.1.3 CombineTextInputFormat slicing mechanism

CombineTextInputFormat is used in scenarios where there are too many small files. It can logically plan multiple small files into a single slice so that multiple small files can be handed over to one MapTask for processing.

3.1.4 Other implementation classes of FileInputFormat

TextInputFormat:

TextInputFormat is the default FileInputFormat implementation class. It reads records line by line. The key is the starting byte offset of the line within the whole file, of type LongWritable. The value is the content of the line, excluding any line terminators (newline and carriage return), of type Text.

KeyValueTextInputFormat:

Each line is a record, split into a key and a value by a delimiter. You can set the delimiter in the driver class with conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t"). The default delimiter is tab.

NLineInputFormat:

If you use NLineInputFormat, the InputSplit processed by each map process is no longer divided by Block, but by the number of lines N specified for NLineInputFormat. That is, number of slices = total number of lines in the input file / N; if the division is not exact, number of slices = quotient + 1.
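The slice count rule for NLineInputFormat is a ceiling division. A minimal sketch, with NLineSplitCount as a hypothetical helper that is not part of Hadoop:

```java
class NLineSplitCount {

    // Number of slices = total lines / N, plus one more slice if there is a remainder.
    static long splitCount(long totalLines, long linesPerSplit) {
        if (linesPerSplit <= 0) {
            throw new IllegalArgumentException("linesPerSplit must be positive");
        }
        long quotient = totalLines / linesPerSplit;
        return totalLines % linesPerSplit == 0 ? quotient : quotient + 1;
    }

    public static void main(String[] args) {
        // 11 lines with N = 3: three full slices plus one slice holding the last 2 lines
        System.out.println(splitCount(11, 3)); // prints 4
        // 12 lines with N = 3 divide exactly
        System.out.println(splitCount(12, 3)); // prints 4
    }
}
```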

3.1.5 Custom InputFormat

Both HDFS and MapReduce are very inefficient when dealing with small files, but they are inevitably faced with the scenario of dealing with a large number of small files, so a corresponding solution is needed. You can customize InputFormat to merge small files.

Program implementation:

WholeFileInputformat

    package com.djm.mapreduce.inputformat;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import java.io.IOException;

    public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {

        // Each file is read as a whole, so it must not be split
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }

        @Override
        public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            return new WholeRecordReader();
        }
    }

WholeRecordReader

    package com.djm.mapreduce.inputformat;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    import java.io.IOException;

    public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

        private boolean notRead = true;
        private Text key = new Text();
        private BytesWritable value = new BytesWritable();
        private FSDataInputStream fis;
        private FileSplit fs;

        /**
         * Initialization method; the framework calls it once at the beginning.
         */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            // Cast the split to a file split
            fs = (FileSplit) split;
            // Get the file path from the split
            Path path = fs.getPath();
            // Get the file system from the path
            FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
            // Open the stream
            fis = fileSystem.open(path);
        }

        /**
         * Read the next KV pair.
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (notRead) {
                // Read K: the file path
                key.set(fs.getPath().toString());
                // Read V: the whole file content
                byte[] buf = new byte[(int) fs.getLength()];
                // readFully keeps reading until the buffer is full; a single read() may return fewer bytes
                fis.readFully(buf);
                value.set(buf, 0, buf.length);
                notRead = false;
                return true;
            } else {
                return false;
            }
        }

        /**
         * Get the current key.
         */
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return this.key;
        }

        /**
         * Get the current value.
         */
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return this.value;
        }

        /**
         * Current data read progress.
         */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return notRead ? 0 : 1;
        }

        /**
         * Close resources.
         */
        @Override
        public void close() throws IOException {
            if (fis != null) {
                fis.close();
            }
        }
    }

WholeFileDriver

    package com.djm.mapreduce.inputformat;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    import java.io.IOException;

    public class WholeFileDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(WholeFileDriver.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BytesWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
            // Read each file as one record and write the merged result as a SequenceFile
            job.setInputFormatClass(WholeFileInputformat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }

3.2 MapReduce workflow

The above process is the complete workflow of MapReduce, but the Shuffle process only runs from step 7 to the end of step 16. The Shuffle process in detail is as follows:

1) MapTask collects the KV pairs output by the map() method and puts them in the memory buffer.

2) Data is continually spilled from the memory buffer to local disk files; multiple spill files may be produced.

3) The multiple spill files are merged into one large spill file.

4) During both the spill and the merge, the Partitioner is called to partition the data, and the data is sorted by key.

5) Each ReduceTask fetches the data of its own partition, identified by its partition number, from each MapTask machine.

6) A ReduceTask receives result files for the same partition from different MapTasks and merges them (merge sort).

7) After the files are merged into one large file, the Shuffle process ends and the logical operation of the ReduceTask begins (taking one group of key-value pairs at a time from the file and calling the user-defined reduce() method).

3.3 The Shuffle mechanism

3.3.1 Partition

Partitioning makes it possible to output statistical results to different files according to conditions.

Default Partitioner:

    public class HashPartitioner<K, V> extends Partitioner<K, V> {
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

The default partition is obtained by taking the hashCode of the key modulo the number of ReduceTasks.
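The bit mask in that expression matters because hashCode() can be negative, and in Java a negative number modulo a positive one stays negative. The sketch below uses a hypothetical HashPartitionDemo class in plain Java to compare the masked expression with a naive modulo.

```java
class HashPartitionDemo {

    // Same expression as the default partitioner shown above.
    static int partitionFor(Object key, int numReduceTasks) {
        // Clearing the sign bit makes the value non-negative before the modulo
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Naive variant without the mask, shown only to illustrate the problem.
    static int naivePartitionFor(Object key, int numReduceTasks) {
        return key.hashCode() % numReduceTasks;
    }

    public static void main(String[] args) {
        Integer key = -7; // Integer.hashCode() returns the value itself, so the hash is -7
        System.out.println(naivePartitionFor(key, 5)); // prints -2, not a valid partition number
        System.out.println(partitionFor(key, 5));      // prints 1
    }
}
```

Because the result depends only on the key's hash, the user has no control over which key lands in which partition, which is the reason for writing a custom Partitioner.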

Steps to customize a Partitioner:

1. Define a class that extends Partitioner and overrides the getPartition() method.

    public class CustomPartitioner extends Partitioner<Text, FlowBean> {
        @Override
        public int getPartition(Text key, FlowBean value, int numPartitions) {
            int partition = 0;
            // Partitioning logic goes here
            return partition;
        }
    }

2. Register the custom Partitioner in the driver class with job.setPartitionerClass().

3. After customizing the partitioning, set the number of ReduceTasks with job.setNumReduceTasks() to match the logic of the custom Partitioner.

Note:

If the number of ReduceTasks is greater than the number of results of getPartition, several extra empty output files part-r-000xx are generated.

If 1 < number of ReduceTasks < number of results of getPartition, some partition data has nowhere to go and an Exception is thrown.

If the number of ReduceTasks = 1, then no matter how many partition files the MapTask side outputs, all results are handed to this single ReduceTask, and only one result file, part-r-00000, is produced.

Partition numbers must start from zero and increase one by one.

Requirements Analysis:

Code implementation:

ProvincePartitioner

    package com.djm.mapreduce.partitioner;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

        @Override
        public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
            // The value is the phone number; partition by its first three digits
            switch (text.toString().substring(0, 3)) {
                case "136":
                    return 0;
                case "137":
                    return 1;
                case "138":
                    return 2;
                case "139":
                    return 3;
                default:
                    return 4;
            }
        }
    }

PartitionerFlowDriver

    package com.djm.mapreduce.partitioner;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class PartitionerFlowDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(PartitionerFlowDriver.class);
            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReduce.class);
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            // Use the custom Partitioner; five ReduceTasks match its five partitions
            job.setPartitionerClass(ProvincePartitioner.class);
            job.setNumReduceTasks(5);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }

3.3.2 WritableComparable sorting

Sorting is one of the most important operations in the MapReduce framework. Both MapTask and ReduceTask sort data by key. This is Hadoop's default behavior: the data of every application is sorted, whether or not the logic requires it.

The default sort is in dictionary order, and it is implemented with quick sort:

For MapTask, the processing results are placed temporarily in the ring buffer. When the usage of the ring buffer reaches a certain threshold, the data in the buffer is quick-sorted and the sorted data is spilled to disk. When all data has been processed, MapTask merge-sorts all the files on disk.

For ReduceTask, the corresponding data files are copied remotely from each MapTask. If a file exceeds a certain size threshold it is spilled to disk; otherwise it is stored in memory. If the number of files on disk reaches a certain threshold, a merge sort is performed to produce a larger file. If the size or number of files in memory exceeds a certain threshold, they are merged and the result is spilled to disk. When all data has been copied, ReduceTask performs one merge sort over all data in memory and on disk.

Sort classification:

Requirements Analysis:

Code implementation:

    package com.djm.mapreduce.partitioner;

    import lombok.Data;
    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    @Data
    public class FlowBean implements WritableComparable<FlowBean> {

        private long upFlow;
        private long downFlow;
        private long sumFlow;

        public void set(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = this.upFlow + this.downFlow;
        }

        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }

        @Override
        public int compareTo(FlowBean o) {
            // Descending order by total traffic. Long.compare returns 0 for equal totals,
            // so such beans fall into the same reduce group and the reducer
            // should iterate over all values of a group.
            return Long.compare(o.sumFlow, this.sumFlow);
        }
    }

3.3.3 GroupingComparator

The data of the Reduce phase is grouped according to one or more fields.

Group sorting steps:

1. Define a class that extends WritableComparator.

2. Override the compare() method.

3. Create a constructor that passes the class of the objects being compared to the parent class.

    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }
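The effect of grouping with a different comparator than the one used for sorting can be imitated in plain Java. This is a sketch under stated assumptions: GroupingDemo, its Order class, and the sample data are hypothetical. Records are sorted by order id and then by price in descending order, but grouped by order id only, so the first record of each group is the most expensive item of that order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class GroupingDemo {

    // Hypothetical record type used only for this illustration.
    static class Order {
        final String orderId;
        final String productId;
        final double price;

        Order(String orderId, String productId, double price) {
            this.orderId = orderId;
            this.productId = productId;
            this.price = price;
        }
    }

    // Sort comparator: order id ascending, then price descending.
    static final Comparator<Order> SORT = (a, b) -> {
        int byId = a.orderId.compareTo(b.orderId);
        return byId != 0 ? byId : Double.compare(b.price, a.price);
    };

    // Grouping comparator: only the order id decides whether two records share a group.
    static final Comparator<Order> GROUP = (a, b) -> a.orderId.compareTo(b.orderId);

    // Returns the first record of each group, which is the highest price per order.
    static List<Order> firstOfEachGroup(List<Order> input) {
        List<Order> sorted = new ArrayList<>(input);
        sorted.sort(SORT);
        List<Order> firsts = new ArrayList<>();
        Order previous = null;
        for (Order current : sorted) {
            // A new group starts whenever the grouping comparator reports a difference
            if (previous == null || GROUP.compare(previous, current) != 0) {
                firsts.add(current);
            }
            previous = current;
        }
        return firsts;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order("o1", "p1", 10.0),
                new Order("o1", "p2", 30.0),
                new Order("o2", "p3", 5.0),
                new Order("o2", "p4", 2.0));
        for (Order o : firstOfEachGroup(orders)) {
            System.out.println(o.orderId + "\t" + o.productId + "\t" + o.price);
        }
    }
}
```

In a real job the sorting is done by the key's compareTo() and the grouping by the class registered with job.setGroupingComparatorClass(), as in the driver of this section.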

Requirements Analysis:

Code implementation:

OrderBean

    package com.djm.mapreduce.order;

    import lombok.Data;
    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    @Data
    public class OrderBean implements WritableComparable<OrderBean> {

        private String orderId;
        private String productId;
        private double price;

        @Override
        public int compareTo(OrderBean o) {
            // Order id ascending; within the same order, price descending
            int compare = this.orderId.compareTo(o.orderId);
            if (compare == 0) {
                return Double.compare(o.price, this.price);
            } else {
                return compare;
            }
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(orderId);
            out.writeUTF(productId);
            out.writeDouble(price);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.orderId = in.readUTF();
            this.productId = in.readUTF();
            this.price = in.readDouble();
        }
    }

OrderSortGroupingComparator

    package com.djm.mapreduce.order;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class OrderSortGroupingComparator extends WritableComparator {

        public OrderSortGroupingComparator() {
            super(OrderBean.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // Records with the same order id belong to the same group
            OrderBean oa = (OrderBean) a;
            OrderBean ob = (OrderBean) b;
            return oa.getOrderId().compareTo(ob.getOrderId());
        }
    }

OrderSortDriver

    package com.djm.mapreduce.order;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class OrderSortDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(OrderSortDriver.class);
            job.setMapperClass(OrderSortMapper.class);
            job.setReducerClass(OrderSortReduce.class);
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setGroupingComparatorClass(OrderSortGroupingComparator.class);
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }

3.4 MapTask working mechanism

1) Read phase: MapTask parses key/value pairs one by one from the input InputSplit through the RecordReader supplied by the InputFormat.

2) Map phase: the parsed key/value pairs are passed to the user-written map() function, which produces a series of new key/value pairs.

3) Collect phase: in the user-written map() function, once a record has been processed, OutputCollector.collect() is usually called to output the result. Inside that function, the generated key/value pair is partitioned (by calling the Partitioner) and written to a ring memory buffer.

4) Spill phase: when the ring buffer is full, MapReduce writes the data to the local disk and generates a temporary file. Before the data is written to disk it is sorted locally, and merged and compressed if necessary.

The data in the buffer is sorted with the quick sort algorithm, first by partition number and then by key. After sorting, the data is gathered by partition, and within each partition it is ordered by key. The data of each partition is then written, in partition order, to the temporary file output/spillN.out in the task working directory (N is the current spill count). If the user has set a Combiner, the data in each partition is aggregated before it is written to the file. The meta-information of the partition data is written to the in-memory index structure SpillRecord. For each partition it includes the offset in the temporary file, the data size before compression, and the data size after compression. If the in-memory index exceeds 1 MB, it is written to the file output/spillN.out.index.

5) Combine phase: when all data has been processed, MapTask merges all temporary files so that only one data file is finally generated.

6) The merged data is saved to the file output/file.out, and the corresponding index file output/file.out.index is generated.

7) During merging, MapTask works partition by partition. For each partition it merges recursively in rounds: each round merges io.sort.factor (default 10) files and adds the resulting file back to the list of files to be merged. The process repeats until a single large file remains.

8) Having each MapTask produce only one data file avoids the overhead of opening many files at once and of the random reads caused by reading many small files at the same time.
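The sort order used for a spill and the later merge of sorted spill files can be sketched in plain Java. Everything here is an assumption made for illustration: SpillMergeDemo and Record are hypothetical, in-memory lists stand in for spill files, and List.sort is used in place of the quick sort mentioned above. Each spill is ordered by partition and then by key, and the merge combines the already sorted spills with a priority queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class SpillMergeDemo {

    // Hypothetical buffer record: a partition number and a key.
    static class Record {
        final int partition;
        final String key;

        Record(int partition, String key) {
            this.partition = partition;
            this.key = key;
        }

        @Override
        public String toString() {
            return partition + ":" + key;
        }
    }

    // Spill order: by partition number first, then by key.
    static final Comparator<Record> ORDER = (a, b) -> {
        int byPartition = Integer.compare(a.partition, b.partition);
        return byPartition != 0 ? byPartition : a.key.compareTo(b.key);
    };

    // Sorts one buffer's worth of records, as is done before each spill.
    static List<Record> spill(List<Record> buffer) {
        List<Record> sorted = new ArrayList<>(buffer);
        sorted.sort(ORDER);
        return sorted;
    }

    // Merges sorted spills into one sorted list without re-sorting everything.
    static List<Record> merge(List<List<Record>> spills) {
        // Each heap entry is {spill index, position inside that spill}
        Comparator<int[]> byHead =
                (x, y) -> ORDER.compare(spills.get(x[0]).get(x[1]), spills.get(y[0]).get(y[1]));
        PriorityQueue<int[]> heap = new PriorityQueue<>(byHead);
        for (int i = 0; i < spills.size(); i++) {
            if (!spills.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<Record> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            List<Record> source = spills.get(head[0]);
            merged.add(source.get(head[1]));
            // Advance within the spill that the smallest record came from
            if (head[1] + 1 < source.size()) {
                heap.add(new int[] {head[0], head[1] + 1});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Record> first = spill(Arrays.asList(new Record(1, "b"), new Record(0, "c"), new Record(0, "a")));
        List<Record> second = spill(Arrays.asList(new Record(0, "b"), new Record(1, "a")));
        // prints [0:a, 0:b, 0:c, 1:a, 1:b]
        System.out.println(merge(Arrays.asList(first, second)));
    }
}
```

Because every spill is already sorted, the merge only ever compares the current head of each spill, which is why the final file can be produced with sequential reads.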

3.5 ReduceTask working mechanism

1) Copy phase: ReduceTask remotely copies a piece of data from each MapTask. If a piece exceeds a certain size threshold it is written to disk; otherwise it is kept in memory.

2) Merge phase: while copying data remotely, ReduceTask starts two background threads that merge the files in memory and on disk, to prevent excessive memory usage or too many files on disk.

3) Sort phase: by MapReduce semantics, the input of the user-written reduce() function is a group of data gathered by key. To bring records with the same key together, Hadoop uses a sort-based strategy. Because each MapTask has already sorted its own output locally, ReduceTask only needs to merge-sort all the data once.

4) Reduce phase: the reduce() function writes the calculation results to HDFS.

The parallelism of ReduceTask also affects the concurrency and efficiency of the whole Job. Unlike the number of MapTasks, which is determined by the number of slices, the number of ReduceTasks can be set manually:

job.setNumReduceTasks(4);

Note:

1) ReduceTask=0 means that there is no Reduce phase, and the number of output files equals the number of Maps.

2) The default number of ReduceTasks is 1, so by default there is one output file.

3) If the data is not evenly distributed, data skew may occur in the Reduce phase.

4) The number of ReduceTasks cannot be set arbitrarily; the business logic must also be taken into account. In some cases, such as computing a global summary, there can be only 1 ReduceTask.

5) How many ReduceTasks to use depends on the performance of the cluster.

6) If the number of partitions is not 1 but the number of ReduceTasks is 1, the partitioning process is not performed.

3.6 OutputFormat data output

3.6.1 OutputFormat interface implementation class

3.6.2 Custom OutputFormat

Requirements Analysis:

Code implementation:

# FilterOutputFormat

package com.djm.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new FilterRecordWriter(job);
    }
}

# FilterRecordWriter

package com.djm.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream atguiguOut = null;
    private FSDataOutputStream otherOut = null;

    public FilterRecordWriter() {
    }

    public FilterRecordWriter(TaskAttemptContext job) {
        FileSystem fs;
        try {
            fs = FileSystem.get(job.getConfiguration());
            // Two fixed output files: one for matching records, one for the rest.
            Path atguigu = new Path("C:\\Application\\Apache\\hadoop-2.7.2\\djm.log");
            Path other = new Path("C:\\Application\\Apache\\hadoop-2.7.2\\other.log");
            atguiguOut = fs.create(atguigu);
            otherOut = fs.create(other);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // Route records containing "atguigu" to one file and everything else to the other.
        if (key.toString().contains("atguigu")) {
            atguiguOut.write(key.toString().getBytes());
        } else {
            otherOut.write(key.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    }
}

# FilterDriver

package com.djm.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FilterDriver.class);
        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Replace the default TextOutputFormat with the custom implementation.
        job.setOutputFormatClass(FilterOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.7 Join

3.7.1 Reduce Join

How it works:

Map side

Tag the key/value pairs from different tables or files so that records from different sources can be told apart, then output them with the join field as the key and the remaining fields plus the new tag as the value.

Reduce side

By the time the data reaches the Reduce side, records have already been grouped by the join field. Within each group we only need to separate the records that came from different files and then merge them.

Requirements Analysis:

Code implementation:

# TableBean

package com.djm.mapreduce.table;

import lombok.Data;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// @Data (Lombok) generates the getters, setters and toString() used below.
@Data
public class TableBean implements Writable {

    private String orderId;
    private String productId;
    private int amount;
    private String pname;
    private String flag;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(productId);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in the same order in which write() emits them.
        this.orderId = in.readUTF();
        this.productId = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }
}

# TableMapper

package com.djm.mapreduce.table;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    String name;
    TableBean bean = new TableBean();
    Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The file name of the current split tells us which table a record comes from.
        FileSplit split = (FileSplit) context.getInputSplit();
        name = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (name.startsWith("order")) {
            // Order table processing
            String[] fields = line.split("\t");
            bean.setOrderId(fields[0]);
            bean.setProductId(fields[1]);
            bean.setAmount(Integer.parseInt(fields[2]));
            bean.setPname("");
            bean.setFlag("order");
            k.set(fields[1]);
        } else {
            // Product table processing
            String[] fields = line.split("\t");
            bean.setProductId(fields[0]);
            bean.setPname(fields[1]);
            bean.setFlag("pd");
            bean.setAmount(0);
            bean.setOrderId("");
            k.set(fields[0]);
        }
        // The product id is the join key for both tables.
        context.write(k, bean);
    }
}

# TableReducer

package com.djm.mapreduce.table;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();
        for (TableBean bean : values) {
            // Hadoop reuses the value object while iterating, so each record is copied.
            if ("order".equals(bean.getFlag())) {
                TableBean orderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(orderBean);
            } else {
                try {
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        // Fill the product name into every order record of this product id.
        for (TableBean bean : orderBeans) {
            bean.setPname(pdBean.getPname());
            context.write(bean, NullWritable.get());
        }
    }
}

# TableDriver

package com.djm.mapreduce.table;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class TableDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        // The reducer emits TableBean keys, so the final output key class is TableBean.
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Not read by the reduce-side join; only the Map Join in 3.7.2 uses the cache file.
        job.addCacheFile(new URI("file:///C:/Application/Apache/hadoop-2.7.2/input/pd.txt"));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.7.2 Map Join

Map Join is suitable for scenarios where one table is very small and the other is large.

Advantages:

The small table is cached on the Map side and the join logic is processed there in advance. This moves work to the Map side and reduces the amount of data reaching the Reduce side, which keeps data skew as low as possible.

Requirements Analysis:

Code implementation:

# TableMapper

package com.djm.mapreduce.table;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TableMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Text k = new Text();
    private Map<String, String> pdMap = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the small product table from the cache file into memory once per task.
        URI[] cacheFiles = context.getCacheFiles();
        String path = cacheFiles[0].getPath();
        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] fields = line.split("\t");
            pdMap.put(fields[0], fields[1]);
        }
        reader.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // Look up the product name for this order record in the cached table.
        String pId = fields[1];
        String pdName = pdMap.get(pId);
        k.set(fields[0] + "\t" + pdName + "\t" + fields[2]);
        context.write(k, NullWritable.get());
    }
}

# TableDriver

package com.djm.mapreduce.table;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class TableDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Distribute the small table to every MapTask.
        job.addCacheFile(new URI("file:///C:/Application/Apache/hadoop-2.7.2/input/pd.txt"));
        // The join is finished on the Map side, so no Reduce phase is needed.
        job.setNumReduceTasks(0);
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.8 ETL

Before running the core business MapReduce program, it is often necessary to clean the data first and remove records that do not meet the requirements. The cleaning process usually needs only a Mapper, not a Reducer.

Requirements Analysis:

The input data needs to be filtered and cleaned according to the rules in the Map phase.

Code implementation:

# LogBean

package com.djm.mapreduce.etl;

import lombok.Data;

// @Data (Lombok) generates the getters, setters and toString() used by LogMapper.
@Data
public class LogBean {

    private String remoteAddr;
    private String remoteUser;
    private String timeLocal;
    private String request;
    private String status;
    private String bodyBytesSent;
    private String httpReferer;
    private String httpUserAgent;
    private boolean valid = true;
}

# LogMapper

package com.djm.mapreduce.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        LogBean bean = parseLog(line);
        // Drop records that failed the cleaning rules.
        if (!bean.isValid()) {
            return;
        }
        k.set(bean.toString());
        context.write(k, NullWritable.get());
    }

    private LogBean parseLog(String line) {
        LogBean logBean = new LogBean();
        String[] fields = line.split(" ");
        // A usable log line has more than 11 space-separated fields.
        if (fields.length > 11) {
            logBean.setRemoteAddr(fields[0]);
            logBean.setRemoteUser(fields[1]);
            logBean.setTimeLocal(fields[3].substring(1));
            logBean.setRequest(fields[6]);
            logBean.setStatus(fields[8]);
            logBean.setBodyBytesSent(fields[9]);
            logBean.setHttpReferer(fields[10]);
            if (fields.length > 12) {
                logBean.setHttpUserAgent(fields[11] + " " + fields[12]);
            } else {
                logBean.setHttpUserAgent(fields[11]);
            }
            // Status codes of 400 and above are treated as invalid requests.
            if (Integer.parseInt(logBean.getStatus()) >= 400) {
                logBean.setValid(false);
            }
        } else {
            logBean.setValid(false);
        }
        return logBean;
    }
}

# LogDriver

package com.djm.mapreduce.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        // Cleaning needs only the Map phase.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

3.9 MapReduce Development Summary

When writing MapReduce programs, you need to consider the following aspects:

Mapper

Depending on business requirements, users implement three methods: map(), setup(), and cleanup().

Partitioner partition

The default implementation is HashPartitioner, which returns a partition number based on the hash value of the key and the number of ReduceTasks:

(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks

If there are special business needs, you can define a custom partitioner.
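The arithmetic of the default partitioner can be tried outside Hadoop with a few lines of Java. The class HashPartitionSketch is a hypothetical stand-in, and plain String and Integer keys are used here in place of Hadoop's Writable key types.

```java
final class HashPartitionSketch {

    // Clears the sign bit so the value is non-negative, then takes the remainder.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // "a".hashCode() is 97, and 97 % 4 is 1.
        System.out.println(partitionFor("a", 4));
        // A negative hash code still maps into the range [0, numReduceTasks).
        System.out.println(partitionFor(Integer.valueOf(-7), 3));
    }
}
```

Masking with Integer.MAX_VALUE matters because the % operator in Java keeps the sign of the dividend, so a negative hash code would otherwise produce a negative partition number.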

Comparable

When a custom object is used as the output key, it must implement the WritableComparable interface and override the compareTo() method.

Partial sort: each final output file is sorted internally.

Total sort: all data is sorted, usually with only one Reduce.

Secondary sort: the sort has two conditions.
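A secondary sort comes down to a compareTo() with two conditions. The sketch below is an assumption-laden illustration: OrderKey is a hypothetical key with made-up fields (orderId, price), and it implements plain java.lang.Comparable so that it runs without Hadoop. A real job key would implement WritableComparable and also provide write() and readFields().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SecondarySortSketch {

    // Hypothetical composite key; a real job key would implement WritableComparable.
    static final class OrderKey implements Comparable<OrderKey> {
        final int orderId;
        final double price;

        OrderKey(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }

        @Override
        public int compareTo(OrderKey other) {
            // First condition: order id ascending.
            int byOrder = Integer.compare(orderId, other.orderId);
            if (byOrder != 0) {
                return byOrder;
            }
            // Second condition: price descending within the same order.
            return Double.compare(other.price, price);
        }

        @Override
        public String toString() {
            return orderId + ":" + price;
        }
    }

    // Returns a sorted copy of the given keys.
    static List<OrderKey> sorted(List<OrderKey> keys) {
        List<OrderKey> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<OrderKey> keys = Arrays.asList(
                new OrderKey(2, 10.0), new OrderKey(1, 5.0), new OrderKey(1, 9.5));
        System.out.println(sorted(keys));
    }
}
```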

Combiner

Combiner merging can improve program execution efficiency and reduce IO transmission, but it must not affect the original business processing results.
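Whether a Combiner changes the result depends on the operation. The following sketch, with hypothetical helper names and made-up numbers, compares combining per MapTask with computing over all values at once: a sum survives the pre-aggregation, while an average does not.

```java
final class CombinerSafetySketch {

    static int sum(int... values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    static double average(double... values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total / values.length;
    }

    public static void main(String[] args) {
        // Assume MapTask A emitted 1, 2, 3 and MapTask B emitted 10.
        int combinedSum = sum(sum(1, 2, 3), sum(10));
        System.out.println(combinedSum == sum(1, 2, 3, 10)); // sums agree

        // Averaging the per-task averages gives a different answer than averaging everything.
        double combinedAvg = average(average(1, 2, 3), average(10));
        System.out.println(combinedAvg + " vs " + average(1, 2, 3, 10));
    }
}
```

For an average, one common workaround is to let the Combiner pass along partial sums and counts and to divide only in the Reducer.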

GroupingComparator

Groups keys on the Reduce side.
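The difference between sorting and grouping can be sketched without Hadoop. In this hypothetical example each key is an int pair {orderId, priceInCents}; the sort comparator looks at both fields, while the grouping comparator looks only at orderId, so keys that sort differently still land in the same group. In a real job the grouping rule would be a WritableComparator subclass registered on the Job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class GroupingSketch {

    // Sort rule: orderId ascending, then price descending.
    static final Comparator<int[]> SORT = (a, b) ->
            a[0] != b[0] ? Integer.compare(a[0], b[0]) : Integer.compare(b[1], a[1]);

    // Grouping rule: keys with the same orderId belong to one reduce() call.
    static final Comparator<int[]> GROUP = (a, b) -> Integer.compare(a[0], b[0]);

    // Returns the first key of each group, which is the most expensive item per order.
    static List<int[]> firstOfEachGroup(List<int[]> keys) {
        List<int[]> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        List<int[]> firsts = new ArrayList<>();
        int[] previous = null;
        for (int[] key : sorted) {
            // A new group starts whenever the grouping comparator sees a difference.
            if (previous == null || GROUP.compare(previous, key) != 0) {
                firsts.add(key);
            }
            previous = key;
        }
        return firsts;
    }

    public static void main(String[] args) {
        List<int[]> keys = Arrays.asList(
                new int[] {1, 500}, new int[] {2, 300}, new int[] {1, 900}, new int[] {2, 100});
        for (int[] first : firstOfEachGroup(keys)) {
            System.out.println(first[0] + " -> " + first[1]);
        }
    }
}
```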

Reducer

Depending on business requirements, users implement three methods: reduce(), setup(), and cleanup().

OutputFormat

The default implementation class is TextOutputFormat, which writes each key-value pair as one line of the target text file.

SequenceFileOutputFormat is a good choice when the output serves as the input of a subsequent MapReduce task, because its format is compact and easy to compress.

Users can also define a custom OutputFormat.
