
Serialization principle of MapReduce programs and a Writable case


[TOC]

Preface

When writing MapReduce programs, you will find that the input and output data (key-value pairs) of MapReduce can only use the data types provided by Hadoop, not Java's own primitive types. For example, for a long value, the corresponding Hadoop type used in an MR program is LongWritable. The reasons are briefly explained below:

Communication between Hadoop nodes goes through RPC. The RPC protocol serializes a message into a binary byte stream and sends it to the remote node, and the remote node then deserializes the binary stream back into the original message. In other words, every message passed between nodes must go through Hadoop-specific serialization and deserialization, which is why you need to use the data types Hadoop provides. Of course, if you want a custom key-value data type in an MR program, you need to implement the corresponding interfaces, namely Writable and WritableComparable.
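To make that round trip concrete, here is a minimal sketch (the test class and the value 42 are mine for illustration; the write/readFields calls are the real Writable API) showing a Hadoop LongWritable being serialized to raw bytes and restored:

import org.apache.hadoop.io.LongWritable;
import java.io.*;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        LongWritable original = new LongWritable(42L);

        // serialize: Writable.write(DataOutput) emits the raw bytes Hadoop
        // would put on the wire between nodes
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(baos));

        // deserialize: Writable.readFields(DataInput) rebuilds the value
        // from those bytes, exactly what the remote node does
        LongWritable restored = new LongWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));

        System.out.println(restored.get());  // prints 42
    }
}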

That is, a custom key-value data type must follow these principles:

/*
 * Any key or value of MapReduce must implement the Writable interface.
 * Any key of MapReduce must implement the WritableComparable interface;
 * WritableComparable is an enhanced version of Writable.
 * The reason a key must also be comparable is that sorting keys is a
 * basic function of the MapReduce model.
 */

In fact, the codec work in my earlier Netty articles serves the same purpose as what Hadoop does here, because my eventual goal is to write an RPC framework of my own (modeled on Alibaba's Dubbo).

Writable interface

The explanation in the Writable source code itself puts it well:

/**
 * A serializable object which implements a simple, efficient, serialization
 * protocol, based on {@link DataInput} and {@link DataOutput}.
 *
 * Any key or value type in the Hadoop Map-Reduce framework implements this
 * interface.
 *
 * Implementations typically implement a static read(DataInput) method which
 * constructs a new instance, calls {@link #readFields(DataInput)} and
 * returns the instance.
 *
 * Example:
 *
 *     public class MyWritable implements Writable {
 *       // Some data
 *       private int counter;
 *       private long timestamp;
 *
 *       public void write(DataOutput out) throws IOException {
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *
 *       public void readFields(DataInput in) throws IOException {
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *
 *       public static MyWritable read(DataInput in) throws IOException {
 *         MyWritable w = new MyWritable();
 *         w.readFields(in);
 *         return w;
 *       }
 *     }
 */

WritableComparable interface

Here, quoted directly, is the explanation from the official source code:

/**
 * A {@link Writable} which is also {@link Comparable}.
 *
 * WritableComparables can be compared to each other, typically via
 * Comparators. Any type which is to be used as a key in the Hadoop
 * Map-Reduce framework should implement this interface.
 *
 * Note that hashCode() is frequently used in Hadoop to partition keys.
 * It's important that your implementation of hashCode() returns the same
 * result across different instances of the JVM. Note also that the default
 * hashCode() implementation in Object does not satisfy this property.
 *
 * Example:
 *
 *     public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
 *       // Some data
 *       private int counter;
 *       private long timestamp;
 *
 *       public void write(DataOutput out) throws IOException {
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *
 *       public void readFields(DataInput in) throws IOException {
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *
 *       public int compareTo(MyWritableComparable o) {
 *         int thisValue = this.counter;
 *         int thatValue = o.counter;
 *         return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
 *       }
 *
 *       public int hashCode() {
 *         final int prime = 31;
 *         int result = 1;
 *         result = prime * result + counter;
 *         result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
 *         return result;
 *       }
 *     }
 */
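The hashCode() warning is worth pausing on: the default HashPartitioner derives a key's reducer directly from hashCode(), along the lines of the sketch below (the standalone harness is mine; the masking-and-modulo expression mirrors Hadoop's stock HashPartitioner):

import org.apache.hadoop.io.Text;

public class PartitionSketch {
    // mirrors the expression in Hadoop's stock HashPartitioner:
    // mask off the sign bit, then modulo by the number of reducers
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // a given key must always land on the same reducer, which is why
        // hashCode() has to be stable across JVM instances
        System.out.println(partitionFor(new Text("13726230503"), 3));
    }
}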

Writable case

The original article showed the table structure of the telecom log records as a figure; each record carries a mobile phone number plus the four traffic counters upPackNum, downPackNum, upPayLoad and downPayLoad. The task is to compute the sum of upPackNum, downPackNum, upPayLoad and downPayLoad for each mobile phone number.

Requirement: implement this with a custom Writable.

Data preparation

The text data provided is as follows:

(The sample records are garbled beyond recovery in this copy of the article. Each record of HTTP_20160415143750.dat is a comma-separated line containing, among other fields, the mobile phone number in field 1 and the four counters upPackNum, downPackNum, upPayLoad and downPayLoad in fields 6 through 9; recognizable phone numbers in the residue include 13726230503, 13826544101, 13560439658 and 13926435656.)

Let's write an HttpDataWritable class that implements the Writable interface. The code is as follows:

package com.uplooking.bigdata.mr.http;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Any key and value of MapReduce must implement the Writable interface.
 * Any key of MapReduce must implement the WritableComparable interface;
 * WritableComparable is an enhanced version of Writable.
 */
public class HttpDataWritable implements Writable {

    private long upPackNum;
    private long downPackNum;
    private long upPayLoad;
    private long downPayLoad;

    public HttpDataWritable() {
    }

    public HttpDataWritable(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) {
        this.upPackNum = upPackNum;
        this.downPackNum = downPackNum;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }

    public void readFields(DataInput in) throws IOException {
        this.upPackNum = in.readLong();
        this.downPackNum = in.readLong();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
    }

    public long getUpPackNum() { return upPackNum; }
    public void setUpPackNum(long upPackNum) { this.upPackNum = upPackNum; }
    public long getDownPackNum() { return downPackNum; }
    public void setDownPackNum(long downPackNum) { this.downPackNum = downPackNum; }
    public long getUpPayLoad() { return upPayLoad; }
    public void setUpPayLoad(long upPayLoad) { this.upPayLoad = upPayLoad; }
    public long getDownPayLoad() { return downPayLoad; }
    public void setDownPayLoad(long downPayLoad) { this.downPayLoad = downPayLoad; }

    @Override
    public String toString() {
        return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
    }
}
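Note that HttpDataWritable is used here only as a value, so Writable alone is enough. As a sketch of my own (not part of the original article), this is roughly what it would take to use such a type as a key, following the WritableComparable contract quoted above:

package com.uplooking.bigdata.mr.http;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical extension: a key type needs compareTo for sorting and a
// JVM-stable hashCode for partitioning (a full version would override
// equals as well).
public class HttpDataKey extends HttpDataWritable implements WritableComparable<HttpDataKey> {

    @Override
    public int compareTo(HttpDataKey o) {
        // example ordering: by total payload, descending
        long thisTotal = getUpPayLoad() + getDownPayLoad();
        long thatTotal = o.getUpPayLoad() + o.getDownPayLoad();
        return Long.compare(thatTotal, thisTotal);
    }

    @Override
    public int hashCode() {
        // derived only from the record's fields, so it is identical across JVMs
        return Long.hashCode(getUpPackNum() ^ getDownPackNum() ^ getUpPayLoad() ^ getDownPayLoad());
    }
}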

MapReduce program

The program code is as follows:

package com.uplooking.bigdata.mr.http;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class HttpDataJob {

    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usage: <inputpath> <outputpath>");
            System.exit(-1);
        }
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration conf = new Configuration();
        String jobName = HttpDataJob.class.getSimpleName();
        Job job = Job.getInstance(conf, jobName);
        // set the jar under which the job runs
        job.setJarByClass(HttpDataJob.class);

        // set the input of the whole program
        FileInputFormat.setInputPaths(job, inputPath);
        // set the class that parses the input file into lines of content
        job.setInputFormatClass(TextInputFormat.class);

        // set mapper
        job.setMapperClass(HttpDataMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(HttpDataWritable.class);

        // set the output of the entire program
        // outputPath.getFileSystem(conf).delete(outputPath, true); // delete the output directory if it exists, to avoid FileAlreadyExistsException
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        // set reducer
        job.setReducerClass(HttpDataReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(HttpDataWritable.class);

        // specify how many reducers to run
        job.setNumReduceTasks(1);

        // submit the job
        job.waitForCompletion(true);
    }

    public static class HttpDataMapper extends Mapper<LongWritable, Text, Text, HttpDataWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            String line = v1.toString();
            String[] items = line.split(",");
            // get the mobile phone number
            String phoneNum = items[1];
            // get upPackNum, downPackNum, upPayLoad, downPayLoad
            long upPackNum = Long.parseLong(items[6]);
            long downPackNum = Long.parseLong(items[7]);
            long upPayLoad = Long.parseLong(items[8]);
            long downPayLoad = Long.parseLong(items[9]);
            // build the HttpDataWritable object
            HttpDataWritable httpData = new HttpDataWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
            // write the data to the context
            context.write(new Text(phoneNum), httpData);
        }
    }

    public static class HttpDataReducer extends Reducer<Text, HttpDataWritable, Text, HttpDataWritable> {
        @Override
        protected void reduce(Text k2, Iterable<HttpDataWritable> v2s, Context context)
                throws IOException, InterruptedException {
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;
            // traverse v2s and compute the sum of each field
            for (HttpDataWritable htd : v2s) {
                upPackNum += htd.getUpPackNum();
                downPackNum += htd.getDownPackNum();
                upPayLoad += htd.getUpPayLoad();
                downPayLoad += htd.getDownPayLoad();
            }
            // build the HttpDataWritable object
            HttpDataWritable httpData = new HttpDataWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
            // write the data to the context
            context.write(k2, httpData);
        }
    }
}
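One optional improvement (my suggestion, not in the original article): since the reduce logic is a pure sum and its input and output types match the map output types, the same class can also be registered as a combiner to pre-aggregate map output locally and shrink shuffle traffic:

// Hypothetical addition alongside the other job.set* calls in main():
// safe here because summation is commutative and associative.
job.setCombinerClass(HttpDataReducer.class);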

Test

Note that the program above reads its parameters from the command line. It can be executed in a local environment, or packaged as a jar and uploaded to a Linux server with a Hadoop environment. Here I run it locally (my operating system is macOS). The input parameters are as follows:

/Users/yeyonghao/data/input/HTTP_20160415143750.dat /Users/yeyonghao/data/output/mr/http/h-1
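Alternatively, if the job is packaged as a jar and submitted to a Hadoop cluster, the invocation would look roughly like this (the jar file name is hypothetical, and on a cluster the paths would be HDFS locations):

hadoop jar http-data.jar com.uplooking.bigdata.mr.http.HttpDataJob \
    /Users/yeyonghao/data/input/HTTP_20160415143750.dat \
    /Users/yeyonghao/data/output/mr/http/h-1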

After running the program, the output is as follows:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/http/h-1$ cat part-r-00000
(The result lines are run together in this copy of the article. Each line of part-r-00000 holds a phone number followed by the tab-separated sums of upPackNum, downPackNum, upPayLoad and downPayLoad, one line per phone number, e.g. for 13480253104, 13502468823, 13560439658 and so on.)

This shows that our MapReduce program is correct and that the HttpDataWritable class we wrote works as intended.
