Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to write different MapReudce programs

2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly explains "how to write different MapReudce programs". The content in the article is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "how to write different MapReudce programs".

1. Serialization mechanism of slave Hadoop

Serialization is to convert the state information of objects in memory into a sequence of bytes for storage (persistence) and network transmission. Deserialization is to convert the received byte sequence or persistent data of the hard disk into objects in memory.

In fact, in the Java specification, there is already a set of serialization mechanism. Some object-oriented class can implement serialization and deserialization by implementing the Serializable interface, but be sure to add the serialization version ID serialVersionUID. But why did Hadoop develop its own serialization mechanism? What are the characteristics and differences between it and the original ecology?

When JDK serializes, the algorithm takes these things into account:

Outputs the class metadata related to the object instance.

Recursively outputs the superclass description of the class until there is no superclass.

When the class metadata is finished, it begins to output the actual data values of the object instance from the top-level superclass.

Recursively output the data of the instance from top to bottom

Pros: from the above, java serialization is really powerful, and the information after serialization is very detailed, so deserialization becomes very simple.

So as long as we have the implements Serializable interface, JDK will automatically handle everything, and the serialization mechanism of Java is quite complex and can handle all kinds of object relationships.

Disadvantages: the serialization mechanism of Java requires a large amount of computation, and the size of the serialization result is too large, sometimes several times the size of the object. The referencing mechanism can also cause large files to be inseparable.

These disadvantages are very fatal for Hadoop, because if communication or RPC calls are needed between Hadoop clusters, serialization is required, and serialization is fast, small in size, and takes up less bandwidth. So Hadoop played a game on his own.

The characteristics of Hadoop serialization are:

1. Compact: since bandwidth is the most valuable resource for information transmission in a cluster, we must find ways to reduce the size of the message, and hadoop serialization is designed to better sit at this point.

2. Object reusability: the deserialization of JDK will create objects constantly, which will certainly cause some overhead, but in the deserialization of hadoop, the readField method of one object can be repeatedly used to regenerate different objects.

3. Extensibility: there are many options for serialization of Hadoop

a. You can take advantage of the Writable interface in the implementation hadoop framework. (native)

b. Use frameworks such as the open source serialization framework protocol Buffers,Avro.

PS (network source): hadoop2.X is followed by the implementation of a called YARN, all applications (such as mapreduce, or other spark real-time or offline computing frameworks can run on YARN), YARN is also responsible for resource scheduling and so on. The serialization of YARN is the serialization framework protocol Buffers developed in Google. Currently, it supports three languages, Cobalt, Java, and Python. So at the RPC level, we can use other languages to meet the needs of other language developers.

The next word is how to use the serialization mechanism, which is described below by Writable.

2.Writable interface and other implementation classes

Hadoop native serialization, hadoop native serialization classes need to implement an interface called Writeable, similar to the Serializable interface.

Hadoop also provides us with several serialization classes, all of which implement the Writable interface directly or indirectly. Such as: IntWritable,LongWritable,Text,org.apache.hadoop.io.WritableComparable and so on.

To implement the Writable interface, you must implement two methods:

Public void write (DataOutput out) throws IOException; public void readFields (DataInput in) throws IOException

To achieve the WritableComparable interface must implement three methods, look at the source code of the interface, have been given demo. Because of the space, go and see for yourself.

Case 1: the data is shown in the following figure, counting the upload and download traffic and total traffic with the same phone number. Phone number, upload traffic, download traffic, total traffic.

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 2001363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 40 264 02001363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 24 132 1512 2001363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 40 240 0 2001363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com video website 15 12 1527 2106 2001363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 2001363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116954 2001363157995033 15920133257 5C-0E-8B-C7 -BA-20:CMCC 120.197.40.4 sug.so.360.cn Information Security 20 20 3156 2936 2001363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 40 240 02001363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40. 4 s19.cnzz.com site Statistics 24 9 6960 690 2001363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com search engine 28 27 3659 3538 2001363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com site Statistics 3 3 1938 180 2001363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 918 4938 2001363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 180 2001363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash3-http.qq.com Integrated Portal 15 12 1938 2910 2001363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 123008 3720 2001363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196. 100.99 y0.ifengimg.com Integrated Portal 57 102 7335 110349 2001363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com search engine 21 18 9531 2412 2001363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com search engine 69 63 11058 48243 2001363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 120 2001363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 2481 24681 2001363157993055 13560436666 C4-17 FEBA color de- D9:CMCC 120.196.100.99 18 15 1116 954 200

Define serializable JavaBean.com.codewatching.fluxcount.bean.FlowBean

Package com.codewatching.fluxcount.bean;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable {private String phoneNum; private long upFlow; private long downFlow; private long sumFlow; public FlowBean () {} public FlowBean (String phoneNum, long upFlow, long downFlow) {super () This.phoneNum = phoneNum; this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow+downFlow;} public String getPhoneNum () {return phoneNum;} public void setPhoneNum (String phoneNum) {this.phoneNum = phoneNum } public long getUpFlow () {return upFlow;} public void setUpFlow (long upFlow) {this.upFlow = upFlow;} public long getDownFlow () {return downFlow;} public void setDownFlow (long downFlow) {this.downFlow = downFlow } public long getSumFlow () {return sumFlow;} public void setSumFlow (long sumFlow) {this.sumFlow = sumFlow;} @ Override public void write (DataOutput out) throws IOException {out.writeUTF (phoneNum); out.writeLong (downFlow); out.writeLong (upFlow) Out.writeLong (sumFlow);} @ Override public void readFields (DataInput in) throws IOException {phoneNum = in.readUTF (); downFlow = in.readLong (); upFlow = in.readLong (); sumFlow = in.readLong () } @ Override public String toString () {return upFlow+ "\ t" + downFlow+ "\ t" + sumFlow;}}

two。 Write Mapper,Reducer,Runner.

Package com.codewatching.fluxcount.hadoop;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import com.codewatching.fluxcount.bean.FlowBean;public class FlowSumMapper extends Mapper {@ Override protected void map (LongWritable key, Text value,Context context) throws IOException, InterruptedException {String line = value.toString () String [] fileds = line.split ("\ t"); int length = fileds.length; String phoneNum = fileds [1]; long upFlow = Long.parseLong (fileds [length-3]); long downFlow = Long.parseLong (fileds [length-2]); FlowBean flowBean = new FlowBean (phoneNum, upFlow, downFlow) / / use flowBean as value for reducer to process context.write (new Text (phoneNum), flowBean);}} package com.codewatching.fluxcount.hadoop;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import com.codewatching.fluxcount.bean.FlowBean Public class FlowSumReducer extends Reducer {@ Override protected void reduce (Text key, Iterable values,Context context) throws IOException, InterruptedException {long _ downFlow = 0; long _ upFlow = 0; for (FlowBean flowBean: values) {_ downFlow + = flowBean.getDownFlow (); _ upFlow + = flowBean.getUpFlow () } FlowBean bean = new FlowBean (key.toString (), _ upFlow, _ downFlow); context.write (key, bean);}} package com.codewatching.fluxcount.hadoop;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text Import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import com.codewatching.fluxcount.bean.FlowBean;public class FlowSumRunner extends Configured implements Tool {@ Override public int run (String [] args) throws Exception {Configuration configuration = new Configuration () Job job = Job.getInstance (configuration); configuration.set ("mapreduce.job.jar", "fluxcount.jar"); job.setJarByClass (FlowSumRunner.class); job.setMapperClass (FlowSumMapper.class); job.setReducerClass (FlowSumReducer.class); job.setOutputKeyClass (Text.class); job.setOutputValueClass (FlowBean.class) FileInputFormat.setInputPaths (job, new Path (args [0])); FileSystem fileSystem = FileSystem.get (configuration); Path path = new Path (args [1]); if (fileSystem.exists (path)) {fileSystem.delete (path, true);} FileOutputFormat.setOutputPath (job, path) Return job.waitForCompletion (true)? 0args 1;} public static void main (String [] args) throws Exception {ToolRunner.run (new Configuration (), new FlowSumRunner (), args);}} programming of 3.Partitioner class

Hadoop's map/reduce supports partitioning key, so that the data from map is evenly distributed on reduce. Map results will be distributed to Reducer through partition. After Reducer completes the Reduce operation, it will output the results through OutputFormat. Mapper results may be sent to Combiner (discussed below) for merging. The key-value pairs finally processed by Mapper need to be sent to Reducer for merging. When merging, keys / value pairs with the same key will be sent to the same Reducer. The process of assigning which key to which Reducer is specified by Partitioner. What a troublesome thing to say. If we look at the source code of the Partitioner class, we know that it is an abstract class with an abstract method:

/ * Get the partition number for a given key (hence record) given the total * number of partitions i.e. Number of reduce-tasks for the job. * *

Typically a hash function on an all or a subset of the key.

* * @ param key the key to be partioned. * @ param value the entry value. * @ param numPartitions the total number of partitions. * @ return the partition number for the key. * / public abstract int getPartition (KEY key, VALUE value, int numPartitions)

And the comments in the class are also very comprehensive, do not complain. If foreign languages were better, it would be much easier to learn. Alas, it is always difficult.

Partitionercontrols the partitioning of the keys of the intermediate map-outputs. Omit; omit.

Case 2: on the basis of case 1, then partition the number, assuming that 135 is Beijing and 139 is Jiangxi. Count out the statistics of each region, and each region stores documents separately. The effect picture is as follows:

Based on the case, write a Partitionner implementation class.

Package com.codewatching.fluxcount.hadoop;import java.util.HashMap;import java.util.Map;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;import com.codewatching.fluxcount.bean.FlowBean;public class AreaPartitioner extends Partitioner {private static Map areaMap; static {areaMap = new HashMap (); areaMap.put ("135s", 0); / / simulated partition, stored in memory. AreaMap.put ("137", 1); areaMap.put (" 138", 2); areaMap.put ("139", 3);} @ Override public int getPartition (Text key, FlowBean value, int numPartitions) {int area = 4; / / the default is 4 String prefix = key.toString (). Substring (0Pol 3) / / determine whether it is in a partition Integer index = areaMap.get (prefix); if (indexable empty null) {area = index; / / if it exists, take the corresponding number 0Jing 1JI 2je 3} return area;}}

two。 Add two lines of code to Runner.

3. The result of running in Hadoop.

In fact, Hadoop has provided a default implementation class called HashPartitioner. See how it is partitioned by key.

Key is evenly distributed on ReduceTasks. For example, if Key is Text, the hashcode method of Text is basically the same as that of String, which is calculated by using the Horner formula. If the int,string is too large, the int value may overflow into a negative number, so it is equal to the Integer.MAX_VALUE (that is, 01111111111111), and then take the remainder on the number of reduce, so that key can be evenly distributed on the reduce.

PS: the result of this simple algorithm may not be uniform, because key is not so linearly continuous after all.

Output processing class and input processing class of 4.MapReduce

Input processing class: the role of InputFormat is responsible for the input part of MR

1. Verify whether the input of the job is standardized.

2. Cut the input file into InputSplit.

3. Provide the implementation class of RecordReader, read the InputSplit into Mapper for processing.

The optimal shard size should be the same as the block size: because it is the size of the largest input block that ensures that it can be stored on a single node. If the sharding spans two data blocks, it is almost impossible for any HDFS node to store two data blocks at the same time, so part of the data in the shard needs to be transmitted to the Map task node through the network, which is obviously less efficient than running the entire Map task with local data.

PS: you can also write custom input handling classes, inherit InputFormat, and rewrite the corresponding methods. Of course, you first need to know the role of the methods.

Output processing class: OutputFormat, after Ruduce processing.

When programming, the output input processing class specifies where to use it:

Thank you for your reading, the above is the content of "how to write different MapReudce programs". After the study of this article, I believe you have a deeper understanding of how to write different MapReudce programs, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report