Shulou (Shulou.com), 06/03 report. Updated 2025-01-19. From: SLTechnology News&Howtos.
1. Requirements
For each mobile phone number, compute the uplink traffic, the downlink traffic, and the total traffic (uplink + downlink), then sort the phone numbers by total traffic.
2. Data input and output format
The source data is sensitive, so it is not shown here.
The input format is:
Timestamp, phone number, physical address of the base station, IP of the visited URL, domain name of the website, packets, number of packets received, uplink traffic, downlink traffic, response code. The delimiter is "\t".
The output format is:
Phone number, uplink traffic, downlink traffic, total traffic, sorted by total traffic.
3. Approach
Map phase:
Split each line into fields. Use the phone number as the key and a bean object as the value. The bean stores the uplink traffic, downlink traffic, and total traffic for that phone number, so each map output is a pair of the form <phone number, FlowBean>.
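The field splitting in the map phase can be tried without a Hadoop cluster. The following is a minimal plain-Java sketch, not the article's mapper: the class name MapStepSketch, the helper names, and the sample record are all made up for illustration (the real data is not shown). It assumes, as the mapper in section 4 does, that the phone number is the second field and that the uplink and downlink traffic are the third-to-last and second-to-last fields.

```java
public class MapStepSketch {

    /** Hypothetical stand-in for the value bean: uplink, downlink, and total traffic. */
    static final class Flow {
        final int up;
        final int down;
        final int sum;

        Flow(int up, int down) {
            this.up = up;
            this.down = down;
            this.sum = up + down;
        }
    }

    /** Extracts the phone number (second field) from a tab-separated record. */
    static String phoneOf(String line) {
        return line.split("\t")[1];
    }

    /** Reads uplink and downlink traffic, indexed from the end of the record. */
    static Flow flowOf(String line) {
        String[] fields = line.split("\t");
        int up = Integer.parseInt(fields[fields.length - 3]);
        int down = Integer.parseInt(fields[fields.length - 2]);
        return new Flow(up, down);
    }

    public static void main(String[] args) {
        // Made-up record that follows the field order described in section 2.
        String line = "1363157985066\t13700000001\t00-FD-07-A4-72-B8:CMCC\t120.196.100.82"
                + "\texample.com\t24\t27\t2481\t24681\t200";
        Flow f = flowOf(line);
        System.out.println(phoneOf(line) + " -> " + f.up + "\t" + f.down + "\t" + f.sum);
    }
}
```

Indexing the traffic fields from the end of the array keeps the lookup stable even if a record has a different number of fields in the middle, which may be why the mapper does it that way.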
Reduce phase:
Accumulate the uplink and downlink traffic of all records that share the same key (that is, the same phone number) to obtain the total uplink traffic, total downlink traffic, and total traffic for that number.
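The accumulation step can be illustrated in plain Java with an in-memory map in place of the shuffle. This is only a sketch under stated assumptions: ReduceStepSketch, aggregate, and the sample triples are hypothetical names and data, and the counters are kept as long here (the article's bean uses int) to leave headroom for large totals.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceStepSketch {

    /**
     * Sums (phone, upFlow, downFlow) triples per phone number and returns
     * phone -> {upFlow, downFlow, sumFlow}.
     */
    static Map<String, long[]> aggregate(List<String[]> records) {
        Map<String, long[]> totals = new LinkedHashMap<>();
        for (String[] r : records) {
            long up = Long.parseLong(r[1]);
            long down = Long.parseLong(r[2]);
            // One accumulator array per phone number, created on first sight.
            long[] t = totals.computeIfAbsent(r[0], k -> new long[3]);
            t[0] += up;
            t[1] += down;
            t[2] += up + down;
        }
        return totals;
    }

    public static void main(String[] args) {
        // Made-up triples standing in for the mapper output.
        List<String[]> records = Arrays.asList(
                new String[] {"13700000001", "100", "200"},
                new String[] {"13700000002", "50", "70"},
                new String[] {"13700000001", "10", "20"});
        for (Map.Entry<String, long[]> e : aggregate(records).entrySet()) {
            long[] t = e.getValue();
            System.out.println(e.getKey() + "\t" + t[0] + "\t" + t[1] + "\t" + t[2]);
        }
    }
}
```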
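Finally, the results need to be sorted by total traffic, so the reducer emits the whole bean as its output key, with the phone number as the value. Note that MapReduce sorts records by the map output key during the shuffle, so the output of this job is ordered by phone number; ordering by total traffic takes a follow-up job that uses the bean as its map output key, which is what the bean's compareTo method is for.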
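Sorting by total traffic comes down to a comparison method that puts larger totals first. The sketch below shows that ordering in plain Java; Row, phonesByTotal, and the sample totals are hypothetical and are not part of the article's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SortByTotalSketch {

    /** Hypothetical row: a phone number plus its aggregated total traffic. */
    static final class Row implements Comparable<Row> {
        final String phone;
        final long sumFlow;

        Row(String phone, long sumFlow) {
            this.phone = phone;
            this.sumFlow = sumFlow;
        }

        /** Larger totals sort first; equal totals compare as 0. */
        @Override
        public int compareTo(Row other) {
            return Long.compare(other.sumFlow, this.sumFlow);
        }
    }

    /** Returns the phone numbers ordered by descending total traffic. */
    static List<String> phonesByTotal(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        Collections.sort(sorted);
        List<String> phones = new ArrayList<>();
        for (Row r : sorted) {
            phones.add(r.phone);
        }
        return phones;
    }

    public static void main(String[] args) {
        // Made-up totals for illustration only.
        List<Row> rows = Arrays.asList(
                new Row("13700000001", 330),
                new Row("13700000002", 120),
                new Row("13700000003", 900));
        System.out.println(phonesByTotal(rows));
    }
}
```

Returning 0 for equal totals keeps the comparison consistent with the Comparable contract. In Hadoop, keys that compare as equal are grouped into a single reduce call, so a sort job keyed on the bean would need to iterate over every value in that group.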
4. Implementation
FlowBean.java
/**
 * Custom serializable class for saving traffic data.
 */
package PhoneData;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Getter
@Setter
@NoArgsConstructor
public class FlowBean implements WritableComparable<FlowBean> {
    // This class must be serializable and comparable, so it implements WritableComparable.
    // Uplink, downlink, and total traffic.
    private int upFlow;
    private int downFlow;
    private int sumFlow;

    public FlowBean(int upFlow, int downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /**
     * Serialization: writes the fields in a fixed order.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.upFlow);
        dataOutput.writeInt(this.downFlow);
        dataOutput.writeInt(this.sumFlow);
    }

    /**
     * Deserialization: reads the fields in the same order they were written.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readInt();
        this.downFlow = dataInput.readInt();
        this.sumFlow = dataInput.readInt();
    }

    /**
     * Formats the bean as upFlow, downFlow, sumFlow.
     * The separator was lost in the original listing; a tab is assumed here.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.upFlow);
        sb.append("\t");
        sb.append(this.downFlow);
        sb.append("\t");
        sb.append(this.sumFlow);
        return sb.toString();
    }

    /**
     * Comparison used for sorting: larger total traffic sorts first.
     * Returns 0 for equal totals, as the Comparable contract requires.
     */
    @Override
    public int compareTo(FlowBean o) {
        return Integer.compare(o.getSumFlow(), this.getSumFlow());
    }
}
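The write and readFields methods must handle the fields in the same order, otherwise the values come back swapped. The following plain-Java sketch checks that symmetry with java.io streams only, so it runs without Hadoop; RoundTripSketch and its helper methods are hypothetical and merely mirror the shape of the two FlowBean methods.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class RoundTripSketch {

    /** Writes the three counters in a fixed order, like FlowBean.write. */
    static void write(DataOutput out, int up, int down, int sum) throws IOException {
        out.writeInt(up);
        out.writeInt(down);
        out.writeInt(sum);
    }

    /** Reads the counters back in the same order, like FlowBean.readFields. */
    static int[] read(DataInput in) throws IOException {
        int up = in.readInt();
        int down = in.readInt();
        int sum = in.readInt();
        return new int[] {up, down, sum};
    }

    /** Serializes to a byte array and deserializes again. */
    static int[] roundTrip(int up, int down) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            write(new DataOutputStream(bytes), up, down, up + down);
            return read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            // In-memory streams should not fail; rethrow unchecked to keep callers simple.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Made-up traffic values for illustration only.
        int[] back = roundTrip(2481, 24681);
        System.out.println(back[0] + "\t" + back[1] + "\t" + back[2]);
    }
}
```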
Mapper:
package PhoneData;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PhoneMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    // Reused across map() calls to avoid allocating new objects per record.
    Text k = new Text();
    FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");

        // Parse the split record: the phone number is the second field,
        // and the traffic fields are indexed from the end of the record.
        k.set(fields[1]);
        int downFlow = Integer.parseInt(fields[fields.length - 2]);
        int upFlow = Integer.parseInt(fields[fields.length - 3]);

        v.setDownFlow(downFlow);
        v.setUpFlow(upFlow);
        v.setSumFlow(upFlow + downFlow);

        context.write(k, v);
    }
}
Reducer:
package PhoneData;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PhoneReducer extends Reducer<Text, FlowBean, FlowBean, Text> {
    FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        int upFlow = 0;
        int downFlow = 0;
        int sumFlow = 0;

        // Accumulate the traffic of every record for this phone number.
        for (FlowBean f : values) {
            upFlow += f.getUpFlow();
            downFlow += f.getDownFlow();
            sumFlow += f.getSumFlow();
        }

        // Write the aggregated data to the reused bean, then output it
        // with the bean as key and the phone number as value.
        v.setUpFlow(upFlow);
        v.setDownFlow(downFlow);
        v.setSumFlow(sumFlow);

        context.write(v, key);
    }
}
Driver:
package PhoneData;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PhoneDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hard-coded local paths for testing; these override any command-line arguments.
        args = new String[]{"G:\\test\\A\\phone_data.txt", "G:\\test\\A\\phonetest5\\"};

        Configuration conf = new Configuration();

        // Get the job object.
        Job job = Job.getInstance(conf);

        // Configure the driver, map, and reduce classes.
        job.setJarByClass(PhoneDriver.class);
        job.setMapperClass(PhoneMapper.class);
        job.setReducerClass(PhoneReducer.class);

        // Specify the output key and value classes for map and reduce.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(Text.class);

        // Specify the input data and the output path.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish.
        job.waitForCompletion(true);
    }
}