2025-01-15 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article walks through a practical case to introduce partitioning, sorting, and the Combiner in MapReduce (MR) programming.
Traffic Statistics Project case
Data samples:
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash3-http.qq.com Integrated Portal 15 12 1938 2910
Field description:
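The fields this case relies on are the mobile phone number, the upstream traffic, and the downlink traffic. The following is a minimal Java sketch of extracting them from one record, assuming the raw file is tab-separated so that a category such as "Integrated Portal" stays a single field. `FlowRecord` and `parse` are hypothetical names used only for illustration, and the positions 1, 8, and 9 are inferred from the sample record above.

```java
final class FlowRecord {
    final String phone;
    final long upFlow;
    final long downFlow;

    FlowRecord(String phone, long upFlow, long downFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    // Assumption: fields are separated by tabs, so a category that
    // contains spaces does not shift the later field positions.
    static FlowRecord parse(String line) {
        String[] f = line.split("\t");
        if (f.length < 10) {
            throw new IllegalArgumentException("expected at least 10 fields, got " + f.length);
        }
        // [1] phone number, [8] upstream traffic, [9] downlink traffic
        return new FlowRecord(f[1], Long.parseLong(f[8]), Long.parseLong(f[9]));
    }

    public static void main(String[] args) {
        String line = String.join("\t",
                "1363157984040", "13602846565", "5C-0E-8B-8B-B6-00:CMCC",
                "120.197.40.4", "2052.flash3-http.qq.com", "Integrated Portal",
                "15", "12", "1938", "2910");
        FlowRecord r = parse(line);
        System.out.println(r.phone + " up=" + r.upFlow + " down=" + r.downFlow);
    }
}
```

If the real data is separated by arbitrary whitespace instead, the split pattern and possibly the indexes would need to change.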
Requirements:
1. Compute the total uplink traffic, total downlink traffic, and total traffic consumed by each user (mobile phone number).
2. Building on the result above, sort the statistics in descending order of total traffic.
3. Write the traffic summary to different files according to the province the mobile phone number belongs to.
Requirement 1: compute the total uplink traffic, total downlink traffic, and total traffic consumed by each user (mobile phone number)
Analysis of this requirement shows that a Combiner can be used as an optimization component here. It locally aggregates the output of each MapTask right after the map phase, which reduces both the computing load on the ReduceTask and the amount of data sent over the network.
Usage: a Combiner is written the same way as a Reducer. Write a class that extends Reducer, put the combining logic in its reduce method, and then register it on the job: job.setCombinerClass(FlowSumCombine.class)
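The effect of local aggregation can be illustrated without a cluster. The sketch below is plain Java, not Hadoop code: it merges the "up_down" values that one map task would emit for each phone number, the way a summing combiner would before the shuffle. `LocalCombineDemo` and the second phone number are hypothetical and chosen only for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LocalCombineDemo {
    // Merges "up_down" values per phone number, as a summing combiner
    // would do for the output of a single map task.
    static Map<String, String> combine(List<String[]> mapOutput) {
        Map<String, long[]> sums = new LinkedHashMap<>();
        for (String[] kv : mapOutput) {
            String[] parts = kv[1].split("_");
            long[] s = sums.computeIfAbsent(kv[0], k -> new long[2]);
            s[0] += Long.parseLong(parts[0]);
            s[1] += Long.parseLong(parts[1]);
        }
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, long[]> e : sums.entrySet()) {
            out.put(e.getKey(), e.getValue()[0] + "_" + e.getValue()[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> mapOutput = new ArrayList<>();
        mapOutput.add(new String[] {"13602846565", "1938_2910"});
        mapOutput.add(new String[] {"13602846565", "100_200"});
        mapOutput.add(new String[] {"13700000000", "5_7"}); // hypothetical number
        Map<String, String> combined = combine(mapOutput);
        System.out.println(mapOutput.size() + " records before, " + combined.size() + " after");
        System.out.println(combined);
    }
}
```

Three map output records shrink to two before anything crosses the network, and the reducer still receives values in the same "up_down" format.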
Code implementation:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSum {

    // job
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJobName("FlowSum");
            // set the task class
            job.setJarByClass(FlowSum.class);
            // set Mapper, Reducer, and Combiner
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setCombinerClass(FlowSumCombine.class);
            // set the output types of map and reduce
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // specify the input and output paths of the job
            Path input = new Path("/data/input");
            Path output = new Path("/data/output");
            // make sure the output path does not exist
            if (output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true); // recursive delete
            }
            FileInputFormat.addInputPath(job, input);
            FileOutputFormat.setOutputPath(job, output);
            // finally submit the job
            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper (static, so that Hadoop can instantiate it by reflection)
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        // Count the total uplink, downlink, and overall traffic per user (phone number).
        // Sample record fields, by index:
        // [0] 1363157984040
        // [1] 13602846565 (mobile phone number)
        // [2] 5C-0E-8B-8B-B6-00:CMCC
        // [3] 120.197.40.4
        // [4] 2052.flash3-http.qq.com
        // [5] Integrated Portal (assumed to be a single token in the raw data)
        // [6] 15
        // [7] 12
        // [8] 1938 (upstream traffic)
        // [9] 2910 (downlink traffic)
        // [10] 200
        Text mk = new Text();
        Text mv = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\\s+");
            String phone = fields[1]; // fields[0] is the first column, not the phone number
            String upFlow = fields[8];
            String downFlow = fields[9];
            mk.set(phone);
            mv.set(upFlow + "_" + downFlow);
            context.write(mk, mv);
        }
    }

    // Combiner: same "up_down" format in and out, so it can run zero or more times
    public static class FlowSumCombine extends Reducer<Text, Text, Text, Text> {
        Text rv = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int upFlowSum = 0;
            int downFlowSum = 0;
            for (Text value : values) {
                String[] fields = value.toString().split("_");
                upFlowSum += Integer.parseInt(fields[0]);
                downFlowSum += Integer.parseInt(fields[1]);
            }
            rv.set(upFlowSum + "_" + downFlowSum);
            context.write(key, rv);
        }
    }

    // Reducer
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text rv = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int upFlowSum = 0;
            int downFlowSum = 0;
            for (Text value : values) {
                String[] fields = value.toString().split("_");
                upFlowSum += Integer.parseInt(fields[0]);
                downFlowSum += Integer.parseInt(fields[1]);
            }
            rv.set(upFlowSum + "\t" + downFlowSum);
            context.write(key, rv);
        }
    }
}
Considerations for using a Combiner:
- The output key/value types of the Combiner must match the input key/value types of the Reducer.
- The input key/value types of the Combiner must match the output key/value types of the Mapper.
- Use a Combiner with great care: during a MapReduce job the Combiner may not be called at all, or it may be called once or several times.
- The rule for using a Combiner is that its presence or absence must not affect the business logic or the final result.
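The last rule can be checked with a small plain-Java experiment (not Hadoop code). Summing partial sums gives the same result as summing everything at once, so a sum is safe to use as a Combiner. Averaging partial averages does not, so an average is not. `CombinerSafetyDemo` and the sample values are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

final class CombinerSafetyDemo {
    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    static double average(List<Long> values) {
        return (double) sum(values) / values.size();
    }

    // What a reducer would compute if a summing combiner ran once per split.
    static long sumOfSums(List<List<Long>> splits) {
        long total = 0;
        for (List<Long> split : splits) {
            total += sum(split);
        }
        return total;
    }

    // What a reducer would compute if an averaging combiner ran once per split.
    static double averageOfAverages(List<List<Long>> splits) {
        double total = 0;
        for (List<Long> split : splits) {
            total += average(split);
        }
        return total / splits.size();
    }

    public static void main(String[] args) {
        List<Long> split1 = Arrays.asList(10L, 20L, 30L);
        List<Long> split2 = Arrays.asList(100L);
        List<Long> all = Arrays.asList(10L, 20L, 30L, 100L);
        List<List<Long>> splits = Arrays.asList(split1, split2);

        System.out.println("sum direct / combined: " + sum(all) + " / " + sumOfSums(splits));
        System.out.println("avg direct / combined: " + average(all) + " / " + averageOfAverages(splits));
    }
}
```

With these values both sums are 160, while the direct average is 40.0 and the average of averages is 60.0. That difference is why only operations whose result does not depend on how the input is grouped should be placed in a Combiner.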
Requirement 2: building on the results above, sort the statistics in descending order of total traffic.
Analysis: the descending sort runs after the uplink and downlink totals have been computed. In Java, sorting objects by a custom rule requires a custom class that implements a comparison method. MR works the same way: whenever a job has a Reducer phase, the keys are always sorted using the key type's comparison method, and keys that compare as equal are grouped into the same reduce call.
Implementation: here we define a custom key class that implements Hadoop's WritableComparable interface and put the ordering rule in its compareTo method.
Code implementation (note: this job processes the output of Requirement 1):
Custom object that implements the comparator:
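The ordering rule itself can be tried in plain Java before it is wired into Hadoop. The sketch below uses a hypothetical `FlowTotal` class (a stand-in, not the article's bean) that sorts by total traffic in descending order with `Long.compare`. It also shows why a subtraction cast to `int`, a common way to write such a comparator, is risky for `long` values: a difference of exactly 2^32 is truncated to 0 and the two objects look equal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class FlowTotal implements Comparable<FlowTotal> {
    final String phone;
    final long sumFlow;

    FlowTotal(String phone, long upFlow, long downFlow) {
        this.phone = phone;
        this.sumFlow = upFlow + downFlow;
    }

    // Descending order: compare the other object's total against this one.
    @Override
    public int compareTo(FlowTotal o) {
        return Long.compare(o.sumFlow, this.sumFlow);
    }

    // The subtraction-and-cast idiom, kept only to illustrate its overflow risk.
    static int castCompare(long thisSum, long otherSum) {
        return (int) (otherSum - thisSum);
    }

    public static void main(String[] args) {
        List<FlowTotal> list = new ArrayList<>();
        list.add(new FlowTotal("13600000001", 10, 20));   // hypothetical numbers
        list.add(new FlowTotal("13600000002", 300, 400));
        list.add(new FlowTotal("13600000003", 50, 60));
        Collections.sort(list);
        for (FlowTotal f : list) {
            System.out.println(f.phone + "\t" + f.sumFlow);
        }
        // A difference of 2^32 is truncated to 0 by the int cast.
        System.out.println("cast: " + castCompare(0L, 1L << 32)
                + ", Long.compare: " + Long.compare(1L << 32, 0L));
    }
}
```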
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {
    private String phone;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // The serialization framework calls this no-arg constructor
    // when deserialization creates an object instance.
    public FlowBean() {
    }

    public void set(String phone, long upFlow, long downFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }
    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    // Serialization method
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.phone);
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
        out.writeLong(this.sumFlow);
    }

    /*
     * Deserialization method. Fields must be read in the same order
     * and with the same types as they were written in write().
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // Comparison method: 0 means equal, a positive number means greater,
    // a negative number means less.
    @Override
    public int compareTo(FlowBean o) {
        // Descending order: compare the parameter's total against this object's.
        // Long.compare avoids the overflow of casting a long difference to int.
        return Long.compare(o.sumFlow, this.sumFlow);
    }

    // The text output format writes the key with toString().
    @Override
    public String toString() {
        return phone + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
MR program:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumSort {

    // job
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(FlowSumSort.class);
            job.setJobName("FlowSumSort");
            // use the classes defined below, not the base Mapper and Reducer
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(FlowBean.class);
            job.setOutputValueClass(NullWritable.class);
            // the input is the output directory of the Requirement 1 job
            Path input = new Path("/data/output");
            Path output = new Path("/data/output1");
            // make sure the output path does not exist
            if (output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true); // recursive delete
            }
            FileInputFormat.addInputPath(job, input);
            FileOutputFormat.setOutputPath(job, output);
            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper (static, so that Hadoop can instantiate it by reflection)
    public static class MyMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
        FlowBean bean = new FlowBean();
        NullWritable mv = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\\s+");
            String phone = fields[0];
            long upflow = Long.parseLong(fields[1]);
            long downflow = Long.parseLong(fields[2]);
            bean.set(phone, upflow, downflow);
            context.write(bean, mv);
        }
    }

    // Reducer
    public static class MyReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {
        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // write once per value, so beans with equal totals are all kept
            for (NullWritable value : values) {
                context.write(key, value);
            }
        }
    }
}
Note: although the reduce phase here only writes its input back out, the Reducer cannot be dropped just because it contains no business logic. MR sorts the data only when the MapTask output is transferred to the ReduceTask side, so a job without a Reduce phase does no sorting.
Requirement 3: write the traffic statistics to different files according to the province the number belongs to, so that the results can be looked up at the provincial level.
Analysis: by default an MR job runs only one ReduceTask, and each ReduceTask produces one output file. So defining a custom partition rule and setting the number of ReduceTasks is enough to meet this requirement. The default MR partition rule is the key's hashCode modulo the number of partitions.
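The default rule can be sketched in a few lines of plain Java. The formula below mirrors the one used by Hadoop's default HashPartitioner: clear the sign bit of the hash code, then take the remainder. `HashPartitionDemo` is a hypothetical name, and because `String.hashCode()` differs from `Text.hashCode()`, the partition numbers printed here will not match those of a real job.

```java
final class HashPartitionDemo {
    // Clearing the sign bit keeps the result non-negative even when
    // hashCode() is negative; the remainder is then in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        String[] phones = {"13602846565", "13700000000", "13800000000"}; // partly hypothetical
        int numPartitions = 3;
        for (String phone : phones) {
            System.out.println(phone + " -> partition " + partitionFor(phone, numPartitions));
        }
    }
}
```

The same key always lands in the same partition, but which phone numbers share a partition depends only on their hash codes, not on anything meaningful such as the province. That is why this requirement needs a custom rule.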
Code implementation:
Custom partitioner:
// requires java.util.HashMap and org.apache.hadoop.mapreduce.Partitioner
// the two generics here are the map output key and value types
private static class MyPartitioner extends Partitioner<Text, FlowBean> {
    // custom partition rules: phone number prefix -> partition number
    private static HashMap<String, Integer> provincMap = new HashMap<>();
    static {
        provincMap.put("138", 0);
        provincMap.put("139", 1);
        provincMap.put("136", 2);
        provincMap.put("137", 3);
        provincMap.put("135", 4);
    }

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        // Important: the return value must be less than numPartitions,
        // otherwise an error is reported.
        Integer code = provincMap.get(text.toString().substring(0, 3));
        if (code != null) {
            return code;
        }
        return 5; // all other prefixes; needs at least 6 reduce tasks
    }
}
Job:
// specify the partition rule and the number of partitions
job.setPartitionerClass(MyPartitioner.class);
// 6 tasks: partitions 0-4 plus 5 for other prefixes
job.setNumReduceTasks(6);
Note:
When using a custom partitioner, the value it returns must be less than the configured number of ReduceTasks. Although more ReduceTasks increase parallelism, there is no need to set too many: a ReduceTask that receives no data runs empty and wastes resources. Partition numbers should be consecutive integers starting from 0.
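The relationship between the partition numbers and the ReduceTask count can be checked in plain Java before submitting a job. The sketch below reuses the prefix table from the partitioner above. `PrefixPartitionDemo` and its `check` method are hypothetical helpers for illustration, not part of Hadoop, and the second phone number is made up.

```java
import java.util.HashMap;
import java.util.Map;

final class PrefixPartitionDemo {
    static final Map<String, Integer> PROVINCE = new HashMap<>();
    static {
        PROVINCE.put("138", 0);
        PROVINCE.put("139", 1);
        PROVINCE.put("136", 2);
        PROVINCE.put("137", 3);
        PROVINCE.put("135", 4);
    }

    // Known prefixes map to partitions 0-4; everything else goes to partition 5.
    static int partitionFor(String phone) {
        Integer code = PROVINCE.get(phone.substring(0, 3));
        return code != null ? code : 5;
    }

    // Valid partition numbers are 0 .. numReduceTasks - 1.
    static void check(int partition, int numReduceTasks) {
        if (partition < 0 || partition >= numReduceTasks) {
            throw new IllegalArgumentException(
                    "illegal partition " + partition + " for " + numReduceTasks + " reduce tasks");
        }
    }

    public static void main(String[] args) {
        int numReduceTasks = 6; // five known prefixes plus one catch-all
        String[] phones = {"13602846565", "18800000000"}; // second number is hypothetical
        for (String phone : phones) {
            int p = partitionFor(phone);
            check(p, numReduceTasks);
            System.out.println(phone + " -> partition " + p);
        }
    }
}
```

With this table, six ReduceTasks are the minimum: five would leave the catch-all partition 5 without a task to receive it.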