
How to use MapReduce to analyze star Weibo data

Updated: 2025-01-18. Source: SLTechnology News&Howtos (shulou), Internet Technology


This article shows how to use MapReduce to analyze Weibo data for a set of celebrity ("star") accounts. It covers the project requirements, the data set, the analysis approach, and the implementation.

1. Project requirements

Using a custom input format, read the star Weibo data, sort it by number of fans, number of followers, and number of microblogs, and write each of the three rankings to a different file.

2. Data set

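Fields (tab-separated in the input file): star name, Weibo name, number of fans, number of followers, number of microblogs. The rows below list the name only once.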

Yu Haoming 10591367 206558

Lee Min-ho 22898071 11 268

Lin Xinru 57488649 214 5940

Huang Xiaoming 22616497 506 2011

Zhang Liangying 27878708 238 3846

Li Na 23309493 81 631

Xu Xiaoping 11659926 1929 13795

Tang Yan 24301532 200 2391

Fei Jun 8779383 577 4251

3. Analysis

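A custom InputFormat reads the star Weibo data and turns each line into a (name, WeiBo) pair. The mapper emits each star's fan, follower, and microblog counts under three separate keys. The reducer sorts each group in descending order with the custom getSortedHashtableByValue method, then uses MultipleOutputs to write each category to its own file.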
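To make the data flow concrete, here is a minimal plain-Java sketch of the steps described above, with no Hadoop dependency. The class name WeiboPipelineSketch, the rank method, and the in-memory layout are illustrative assumptions and not part of the original project. In the real job the grouping is done by the MapReduce shuffle, and each category goes to its own file through MultipleOutputs.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the MapReduce job: everything runs in memory.
final class WeiboPipelineSketch {

    // Each input row is {name, fans, followers, microblogs}.
    static Map<String, List<Map.Entry<String, Integer>>> rank(String[][] rows) {
        String[] categories = {"fan", "followers", "microblogs"};
        Map<String, List<Map.Entry<String, Integer>>> byCategory = new LinkedHashMap<>();
        for (String category : categories) {
            byCategory.put(category, new ArrayList<Map.Entry<String, Integer>>());
        }
        // "Map" step: emit one (category, name -> count) pair per metric.
        for (String[] row : rows) {
            for (int i = 0; i < categories.length; i++) {
                Integer count = Integer.valueOf(row[i + 1].trim());
                byCategory.get(categories[i])
                        .add(new AbstractMap.SimpleEntry<String, Integer>(row[0], count));
            }
        }
        // "Reduce" step: sort every category by count, largest first.
        for (List<Map.Entry<String, Integer>> entries : byCategory.values()) {
            entries.sort((a, b) -> b.getValue().compareTo(a.getValue()));
        }
        return byCategory;
    }

    public static void main(String[] args) {
        // Three rows taken from the data set above.
        String[][] rows = {
            {"Lee Min-ho", "22898071", "11", "268"},
            {"Lin Xinru", "57488649", "214", "5940"},
            {"Li Na", "23309493", "81", "631"},
        };
        for (Map.Entry<String, List<Map.Entry<String, Integer>>> group : rank(rows).entrySet()) {
            System.out.println(group.getKey() + ":");
            for (Map.Entry<String, Integer> e : group.getValue()) {
                System.out.println("  " + e.getKey() + "\t" + e.getValue());
            }
        }
    }
}
```

Sorting inside the reduce step is only practical because each category holds a small number of stars; a large data set would need a different strategy.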

4. Implementation

1) Define the WeiBo entity class, implementing the WritableComparable interface

package com.buaa;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @ProjectName MicroblogStar
 * @PackageName com.buaa
 * @ClassName WeiBo
 * @Description TODO
 * @Author Liu Jichao
 * @Date 2016-05-07 14:54:29
 */
public class WeiBo implements WritableComparable<WeiBo> {
    // number of fans
    private int fan;
    // number of followers
    private int followers;
    // number of microblogs
    private int microblogs;

    public WeiBo() {
    }

    public WeiBo(int fan, int followers, int microblogs) {
        this.fan = fan;
        this.followers = followers;
        this.microblogs = microblogs;
    }

    public void set(int fan, int followers, int microblogs) {
        this.fan = fan;
        this.followers = followers;
        this.microblogs = microblogs;
    }

    // Deserialize the fields, in the same order in which write() emits them
    @Override
    public void readFields(DataInput in) throws IOException {
        fan = in.readInt();
        followers = in.readInt();
        microblogs = in.readInt();
    }

    // Serialize the fields for network transfer or file output
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(fan);
        out.writeInt(followers);
        out.writeInt(microblogs);
    }

    // Left as a stub: this job uses WeiBo only as the map input value,
    // never as a key, so no ordering is required
    @Override
    public int compareTo(WeiBo o) {
        return 0;
    }

    public int getFan() {
        return fan;
    }

    public void setFan(int fan) {
        this.fan = fan;
    }

    public int getFollowers() {
        return followers;
    }

    public void setFollowers(int followers) {
        this.followers = followers;
    }

    public int getMicroblogs() {
        return microblogs;
    }

    public void setMicroblogs(int microblogs) {
        this.microblogs = microblogs;
    }
}
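The write() and readFields() methods must handle the fields in the same order, otherwise the values come back swapped. The following sketch checks that round trip with the plain java.io streams, so it runs without Hadoop on the classpath. WeiBoRoundTripSketch is a hypothetical, simplified stand-in for the WeiBo class (same three int fields, no Hadoop interface), not the class itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Simplified stand-in for the WeiBo entity: same field order, no Hadoop types.
final class WeiBoRoundTripSketch {
    int fan;
    int followers;
    int microblogs;

    // Mirrors WeiBo.write(): three ints, in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeInt(fan);
        out.writeInt(followers);
        out.writeInt(microblogs);
    }

    // Mirrors WeiBo.readFields(): reads the ints back in the same order.
    void readFields(DataInput in) throws IOException {
        fan = in.readInt();
        followers = in.readInt();
        microblogs = in.readInt();
    }

    // Serializes one object to a byte array and reads it into a fresh object.
    static WeiBoRoundTripSketch roundTrip(int fan, int followers, int microblogs) {
        try {
            WeiBoRoundTripSketch original = new WeiBoRoundTripSketch();
            original.fan = fan;
            original.followers = followers;
            original.microblogs = microblogs;

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            WeiBoRoundTripSketch copy = new WeiBoRoundTripSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WeiBoRoundTripSketch copy = roundTrip(23309493, 81, 631);
        System.out.println(copy.fan + "\t" + copy.followers + "\t" + copy.microblogs);
    }
}
```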

2) Define a custom WeiboInputFormat that extends the FileInputFormat abstract class

package com.buaa;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * @ProjectName MicroblogStar
 * @PackageName com.buaa
 * @ClassName WeiboInputFormat
 * @Description TODO
 * @Author Liu Jichao
 * @Date 2016-05-07 10:23:28
 */
public class WeiboInputFormat extends FileInputFormat<Text, WeiBo> {

    @Override
    public RecordReader<Text, WeiBo> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Use the custom WeiboRecordReader, which reads the input line by line
        return new WeiboRecordReader();
    }

    // Added fix: the reader below always starts at the beginning of the file
    // and ignores split offsets, so each file must stay in a single split
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    public class WeiboRecordReader extends RecordReader<Text, WeiBo> {
        public LineReader in;
        // key: star name
        public Text lineKey = new Text();
        // value: the three counters
        public WeiBo lineValue = new WeiBo();

        @Override
        public void initialize(InputSplit input, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // get the split
            FileSplit split = (FileSplit) input;
            // get the configuration
            Configuration job = context.getConfiguration();
            // path of the file behind the split
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job);
            // open the file
            FSDataInputStream filein = fs.open(file);
            in = new LineReader(filein, job);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // one line of data
            Text line = new Text();
            int linesize = in.readLine(line);
            if (linesize == 0) {
                return false;
            }
            // split the line into its tab-separated fields
            String[] pieces = line.toString().split("\t");
            if (pieces.length != 5) {
                throw new IOException("Invalid record received");
            }
            int a, b, c;
            try {
                // fans
                a = Integer.parseInt(pieces[2].trim());
                // followers
                b = Integer.parseInt(pieces[3].trim());
                // microblogs
                c = Integer.parseInt(pieces[4].trim());
            } catch (NumberFormatException nfe) {
                throw new IOException("Error parsing integer value in record");
            }
            // set the custom key and value
            lineKey.set(pieces[0]);
            lineValue.set(a, b, c);
            return true;
        }

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return lineKey;
        }

        @Override
        public WeiBo getCurrentValue() throws IOException, InterruptedException {
            return lineValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }
    }
}
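The record parsing done in nextKeyValue() can be tried in isolation. The sketch below applies the same rules (five tab-separated fields, counters taken from positions 2, 3 and 4) to a single line in plain Java. The class name WeiboLineParserSketch and the sample line are illustrative assumptions; in particular, the second field of the sample repeats the star name as a placeholder for the Weibo name.

```java
// Hypothetical helper that mirrors the parsing rules of WeiboRecordReader.nextKeyValue().
final class WeiboLineParserSketch {

    // Returns {fans, followers, microblogs} for one tab-separated record.
    static int[] parseCounts(String line) {
        String[] pieces = line.split("\t");
        if (pieces.length != 5) {
            throw new IllegalArgumentException(
                    "Expected 5 tab-separated fields but found " + pieces.length);
        }
        try {
            return new int[] {
                Integer.parseInt(pieces[2].trim()),  // fans
                Integer.parseInt(pieces[3].trim()),  // followers
                Integer.parseInt(pieces[4].trim()),  // microblogs
            };
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Error parsing integer value in record", e);
        }
    }

    public static void main(String[] args) {
        // Sample line; the second field is a placeholder for the Weibo name.
        int[] counts = parseCounts("Li Na\tLi Na\t23309493\t81\t631");
        System.out.println(counts[0] + "\t" + counts[1] + "\t" + counts[2]);
    }
}
```

A line with the wrong number of fields or a non-numeric counter is rejected, which matches the reader's behaviour of failing the task instead of silently skipping bad records.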

3) Write the MapReduce program

package com.buaa;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @ProjectName MicroblogStar
 * @PackageName com.buaa
 * @ClassName WeiboCount
 * @Description TODO
 * @Author Liu Jichao
 * @Date 2016-05-07 09:07:36
 */
public class WeiboCount extends Configured implements Tool {
    // tab separator
    private static String TAB_SEPARATOR = "\t";
    // fans
    private static String FAN = "fan";
    // followers
    private static String FOLLOWERS = "followers";
    // microblogs
    private static String MICROBLOGS = "microblogs";

    public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> {
        @Override
        protected void map(Text key, WeiBo value, Context context)
                throws IOException, InterruptedException {
            // fans
            context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan()));
            // followers
            context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers()));
            // microblogs
            context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs()));
        }
    }

    public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {
        private MultipleOutputs<Text, IntWritable> mos;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Map<String, Integer> map = new HashMap<String, Integer>();
            for (Text value : values) {
                // value = name + tab + (number of fans, followers, or microblogs)
                String[] records = value.toString().split(TAB_SEPARATOR);
                map.put(records[0], Integer.parseInt(records[1]));
            }
            // sort the entries of the map by value
            Entry<String, Integer>[] entries = getSortedHashtableByValue(map);
            for (int i = 0; i < entries.length; i++) {
                // the reduce key (fan, followers, microblogs) is also the named output
                mos.write(key.toString(), new Text(entries[i].getKey()),
                        new IntWritable(entries[i].getValue()));
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // configuration supplied by ToolRunner
        Configuration conf = getConf();
        // delete the output path if it already exists
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.exists(mypath)) {
            hdfs.delete(mypath, true);
        }
        // build the job
        Job job = Job.getInstance(conf, "weibo");
        // main class
        job.setJarByClass(WeiboCount.class);
        // Mapper
        job.setMapperClass(WeiBoMapper.class);
        // Mapper key output type
        job.setMapOutputKeyClass(Text.class);
        // Mapper value output type
        job.setMapOutputValueClass(Text.class);
        // Reducer
        job.setReducerClass(WeiBoReducer.class);
        // Reducer key output type
        job.setOutputKeyClass(Text.class);
        // Reducer value output type
        job.setOutputValueClass(IntWritable.class);
        // input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // custom input format
        job.setInputFormatClass(WeiboInputFormat.class);
        // one named output per category
        MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class);
        // do not call job.setOutputFormatClass; set the output format through LazyOutputFormat instead
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        // submit the job
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // sort the entries of a Map by value, largest first (only suitable for small amounts of data)
    @SuppressWarnings("unchecked")
    public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {
        Entry<String, Integer>[] entries =
                (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);
        // sort
        Arrays.sort(entries, new Comparator<Entry<String, Integer>>() {
            public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {
                return entry2.getValue().compareTo(entry1.getValue());
            }
        });
        return entries;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {
            "hdfs://ljc:9000/buaa/microblog/weibo.txt",
            "hdfs://ljc:9000/buaa/microblog/out/"
        };
        int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0);
        System.exit(ec);
    }
}
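The getSortedHashtableByValue helper sorts an array of map entries with an anonymous Comparator. As an alternative sketch for Java 8 or later, the same descending order can be produced with Map.Entry.comparingByValue(). The class name SortByValueSketch and the method sortDescending are hypothetical and not part of the original program; the sample counts come from the data set above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Java 8+ alternative to the array-based getSortedHashtableByValue helper.
final class SortByValueSketch {

    // Returns the entries of the map ordered by value, largest first.
    static List<Map.Entry<String, Integer>> sortDescending(Map<String, Integer> counts) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return entries;
    }

    public static void main(String[] args) {
        Map<String, Integer> fans = new HashMap<>();
        fans.put("Li Na", 23309493);
        fans.put("Lin Xinru", 57488649);
        fans.put("Fei Jun", 8779383);
        for (Map.Entry<String, Integer> e : sortDescending(fans)) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

Like the original helper, this holds all entries of one category in memory, so it is only suitable for small amounts of data.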

5. Running result
