In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-09 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
Today, I will talk to you about how to understand the TopK algorithm and its implementation. Many people may not know much about it. In order to make you understand better, the editor has summarized the following content for you. I hope you can get something according to this article.
1. Problem description
In the scale of big data, we often encounter a kind of K which requires the highest frequency, which is called "TOPK" problem. For example: statistics of the top 10 most popular songs, statistics of the top five websites with the highest traffic, etc.
2. For example, the top 5 websites with the highest traffic:
Data test.data file:
Data format interpretation: domain name uplink traffic downlink traffic
Train of thought:
1. For each row parsed by Mapper, get each field according to "\ t".
2. Because URL has many duplicate records, put URL on key (by analyzing the principle of MapReduce) and traffic on value
3. Calculate the total traffic in reduce, cache the data through TreeMap, and finally output it at the same time (it is worth noting that the cleanup method of Reduce class must be used for one-time output)
The procedure is as follows:
Mapper class:
Package com.itheima.hadoop.mapreduce.mapper;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Counter;import com.itheima.hadoop.mapreduce.bean.FlowBean Public class TopKURLMapper extends Mapper {/ * * @ param key *: offset per line * @ param value *: content of each line * @ param context *: environment context * / @ Override public void map (LongWritable key, Text value, Context context) throws IOException InterruptedException {/ * this counter is org.apache.hadoop.mapreduce.Counter * / Counter counter = context .getCounter ("ExistProblem", "ExistProblemLine") / / customize the defective row error counter String line = value.toString (); / / read a row of data String [] fields = line.split ("\ t"); / / get each field and divide it into try {String url = fields [0] by\ t; / / get the URL field long upFlow = Long.parseLong (fields [1]) / / get uplink traffic (upFlow) field long downFlow = Long.parseLong (fields [2]); / / get downlink traffic (downFlow) field FlowBean bean = new FlowBean (upFlow, downFlow); / / encapsulate uplink and downlink traffic into bean Text tUrl = new Text (url); / / convert java data type to hadoop data type context.write (tUrl, bean) / / A large amount of data is passed. It is encapsulated into bean for transmission (serialization should be paid attention to when tips:bean transfers)} catch (Exception e) {e.printStackTrace (); counter.increment (1); / / record the number of error rows}
Reduce class:
Package com.itheima.hadoop.mapreduce.reducer;import java.io.IOException;import java.util.Map.Entry;import java.util.TreeMap;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import com.itheima.hadoop.mapreduce.bean.FlowBean;public class TopKURLReducer extends Reducer {private TreeMap treeMap = new TreeMap () / * * @ param key *: same URL * @ param values * on every line: total traffic bean * / @ Override public void reduce (Text key, Iterable values, Context context) throws IOException, InterruptedException {long countUpFlow = 0; long countDownFlow = 0 / * * 1. Take out the total traffic of each bean 2, count the total traffic of multiple bean 3, cache it to treeMap * / for (FlowBean bean: values) {countUpFlow + = bean.getUpFlow (); / / Statistics upstream traffic countDownFlow + = bean.getDownFlow () / / Total downlink traffic} / / encapsulated traffic FlowBean bean = new FlowBean (countUpFlow, countDownFlow); treeMap.put (bean, new Text (key)) / caching to treeMap} @ Override public void cleanup (Context context) throws IOException, InterruptedException {/ / traversing cache for (Entry entry: treeMap.entrySet ()) {context.write (entry.getKey (), entry.getValue ());} super.cleanup (context); / / cannot move the original destroy operation}}
FlowBean class:
Package com.itheima.hadoop.mapreduce.bean;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable, Comparable {private long upFlow; private long downFlow; private long maxFlow; @ Override public String toString () {return upFlow + "\ t" + downFlow + "\ t" + maxFlow } / * 1. Serialization needs default constructor (reflection) 2. In readFields () and write () methods, write and read * / public FlowBean () {} public FlowBean (long upFlow, long downFlow) {this.upFlow = upFlow; this.downFlow = downFlow; this.maxFlow = upFlow + downFlow in sequence } public long getUpFlow () {return upFlow;} public void setUpFlow (long upFlow) {this.upFlow = upFlow;} public long getDownFlow () {return downFlow;} public void setDownFlow (long downFlow) {this.downFlow = downFlow;} public long getMaxFlow () {return maxFlow;} public void setMaxFlow (long maxFlow) {this.maxFlow = maxFlow } @ Override public void readFields (DataInput dataIn) throws IOException {upFlow = dataIn.readLong (); downFlow = dataIn.readLong (); maxFlow = dataIn.readLong ();} @ Override public void write (DataOutput dataOut) throws IOException {dataOut.writeLong (upFlow); dataOut.writeLong (downFlow); dataOut.writeLong (maxFlow) @ Override public int compareTo (FlowBean o) {return this.maxFlow > o.maxFlow?-1: this.maxFlow < o.maxFlow? 1: 0;}}
Driver class:
Package com.itheima.hadoop.drivers;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import com.itheima.hadoop.mapreduce.bean.FlowBean;import com.itheima.hadoop.mapreduce.mapper.TopKURLMapper Import com.itheima.hadoop.mapreduce.reducer.TopKURLReducer Public class TopKURLDriver extends Configured implements Tool {@ Override public int run (String [] args) throws Exception {/ * 1, create job Job * 2, set Class * 3 submitted by job, set MapperClass, set ReduceClass * 4, set OutputKey and OutputValue types * 5 of Mapper and Reduce, set the path to process files Path to output result * 6, submit job * / Configuration conf = new Configuration () Job job = Job.getInstance (conf); job.setJarByClass (TopKURLRunner.class); job.setMapperClass (TopKURLMapper.class); job.setReducerClass (TopKURLReducer.class); job.setMapOutputKeyClass (Text.class); job.setMapOutputValueClass (FlowBean.class); job.setOutputKeyClass (FlowBean.class); job.setOutputValueClass (Text.class) FileInputFormat.setInputPaths (job,new Path (args [0])); FileOutputFormat.setOutputPath (job,new Path (args [1])); / / Parameter true is the printing progress return job.waitForCompletion (true)? 0FileOutputFormat.setOutputPath 1;}} package com.itheima.hadoop.runner;import org.apache.hadoop.util.ToolRunner;import com.itheima.hadoop.runner.TopKURLRunner Public class TopKURLRunner {public static void main (String [] args) throws Exception {int res = ToolRunner.run (new TopKURLRunner (), args); System.exit (res);}}
Run the command: hadoop jar topkurl.jar com.itheima.hadoop.drives.TopKURLDriver / test/inputData / test/outputData
Running result:
After reading the above, do you have any further understanding of how to understand the TopK algorithm and its implementation? If you want to know more knowledge or related content, please follow the industry information channel, thank you for your support.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.