
Typical programming scenarios for MapReduce 2

2025-01-15 Update From: SLTechnology News&Howtos


1. MapReduce multi-job chaining

Introduction: processing logic of even moderate complexity often needs several MapReduce programs run in series. Chaining multiple jobs can be done with the JobControl class of the MapReduce framework.

Requirement:

There are two MapReduce jobs from the flow-statistics example, SumMR and SortMR (FlowSum and FlowSumSort in the code), and one depends on the other: the output of SumMR is the input of SortMR, so SortMR must start only after SumMR has completed.
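The idea of "start a job only after everything it depends on has finished" can be illustrated without Hadoop. The following is a minimal plain-Java sketch, not the JobControl implementation: the class name JobChainSketch and its methods are hypothetical, and jobs are represented only by their names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Plain-Java sketch of dependency-ordered job execution (hypothetical, no Hadoop involved). */
final class JobChainSketch {
    // job name -> names of the jobs it depends on
    private final Map<String, Set<String>> dependencies = new LinkedHashMap<>();

    void addJob(String name, String... dependsOn) {
        Set<String> deps = new LinkedHashSet<>();
        for (String d : dependsOn) {
            deps.add(d);
        }
        dependencies.put(name, deps);
    }

    /** Returns the order in which jobs become runnable; fails if some job can never run. */
    List<String> runOrder() {
        List<String> finished = new ArrayList<>();
        Set<String> pending = new LinkedHashSet<>(dependencies.keySet());
        while (!pending.isEmpty()) {
            String ready = null;
            for (String job : pending) {
                // A job is ready once all of its dependencies have finished
                if (finished.containsAll(dependencies.get(job))) {
                    ready = job;
                    break;
                }
            }
            if (ready == null) {
                throw new IllegalStateException("Unsatisfiable dependencies: " + pending);
            }
            pending.remove(ready);
            finished.add(ready); // a real scheduler would run the job here
        }
        return finished;
    }

    public static void main(String[] args) {
        JobChainSketch chain = new JobChainSketch();
        chain.addJob("SortMR", "SumMR"); // SortMR depends on SumMR
        chain.addJob("SumMR");
        System.out.println(chain.runOrder());
    }
}
```

In this sketch a job that is declared as depending on itself never becomes ready, which is why the dependency must point from the second job to the first.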

These two programs are available at: https://blog.51cto.com/14048416/2342024

How can the dependency between the two jobs be expressed?

Code implementation (only the job-chaining code is shown here):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobDecy {
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            // job1: FlowSum
            Job job1 = Job.getInstance(conf);
            job1.setJobName("FlowSum");
            // Set the job class
            job1.setJarByClass(FlowSum.class);
            // Set the Mapper, Reducer and Combiner
            job1.setMapperClass(FlowSum.MyMapper.class);
            job1.setReducerClass(FlowSum.MyReducer.class);
            job1.setCombinerClass(FlowSum.FlowSumCombine.class);
            // Set the output types of map and reduce
            job1.setMapOutputKeyClass(Text.class);
            job1.setMapOutputValueClass(Text.class);
            job1.setOutputKeyClass(Text.class);
            job1.setOutputValueClass(Text.class);
            // Input and output paths of the first job
            Path input1 = new Path("/data/input");
            Path output1 = new Path("/data/output");
            // Make sure the output directory does not exist
            if (output1.getFileSystem(conf).exists(output1)) {
                output1.getFileSystem(conf).delete(output1, true); // recursive delete
            }
            FileInputFormat.addInputPath(job1, input1);
            FileOutputFormat.setOutputPath(job1, output1);

            // job2: FlowSumSort
            Job job2 = Job.getInstance(conf);
            job2.setJarByClass(FlowSumSort.class);
            job2.setJobName("FlowSumSort");
            // The source lists the base Mapper and Reducer classes here;
            // substitute the mapper and reducer of FlowSumSort from the linked post
            job2.setMapperClass(Mapper.class);
            job2.setReducerClass(Reducer.class);
            job2.setOutputKeyClass(FlowBean.class);
            job2.setOutputValueClass(NullWritable.class);
            // The input of job2 is the output of job1
            Path input2 = new Path("/data/output");
            Path output2 = new Path("/data/output1");
            // Make sure the output directory does not exist
            if (output2.getFileSystem(conf).exists(output2)) {
                output2.getFileSystem(conf).delete(output2, true); // recursive delete
            }
            FileInputFormat.addInputPath(job2, input2);
            FileOutputFormat.setOutputPath(job2, output2);

            // Create a ControlledJob for each job
            ControlledJob job1_cj = new ControlledJob(job1.getConfiguration());
            ControlledJob job2_cj = new ControlledJob(job2.getConfiguration());
            // Bind each ControlledJob to its job
            job1_cj.setJob(job1);
            job2_cj.setJob(job2);
            // Set the dependency: job2 depends on job1
            job2_cj.addDependingJob(job1_cj);
            // Create the JobControl and register both jobs
            JobControl jc = new JobControl("sum and sort");
            jc.addJob(job1_cj);
            jc.addJob(job2_cj);
            // JobControl is a Runnable, so run it in its own thread
            Thread jobThread = new Thread(jc);
            jobThread.start();
            // Keep the main program alive until all jobs have finished
            while (!jc.allFinished()) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // After all jobs have finished, stop the thread and release resources
            jc.stop();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2. TopN algorithm implementation (secondary sort)

Requirement: find the three students with the highest total score in each class.

Fields: class, name, and the math, Chinese, and English scores (tab-separated).

Analysis:

 - Use "class and total score" as the key, so that the student records read in the map phase reach the reduce side sorted by class and then by total score in descending order.

 - Use a GroupingComparator on the reduce side to put the key-value pairs of the same class into one group, then take the first three records of each group as the top three.
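The two analysis steps above (sort by class and descending total, then keep the first three records of each class) can be sketched in plain Java without Hadoop. This is only an illustration under assumed names: TopNSketch, Row, and topPerClass are hypothetical, and the sample data in main is made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of "sort by composite key, then take the first n per group". */
final class TopNSketch {
    /** One student record: class name, student name, total score. */
    static final class Row {
        final String clazz;
        final String name;
        final int total;

        Row(String clazz, String name, int total) {
            this.clazz = clazz;
            this.name = name;
            this.total = total;
        }

        @Override
        public String toString() {
            return clazz + ":" + name + ":" + total;
        }
    }

    /** Sorts by class ascending, then total descending, and keeps the first n rows of each class. */
    static List<Row> topPerClass(List<Row> rows, int n) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort((a, b) -> {
            int byClass = a.clazz.compareTo(b.clazz);
            return byClass != 0 ? byClass : Integer.compare(b.total, a.total);
        });
        List<Row> result = new ArrayList<>();
        String currentClass = null;
        int taken = 0;
        for (Row r : sorted) {
            if (!r.clazz.equals(currentClass)) { // a new group starts here
                currentClass = r.clazz;
                taken = 0;
            }
            if (taken < n) {
                result.add(r);
                taken++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("c1", "a", 250), new Row("c1", "b", 270),
                new Row("c1", "c", 200), new Row("c1", "d", 290),
                new Row("c2", "e", 100));
        System.out.println(topPerClass(rows, 3));
    }
}
```

In the MapReduce version the framework's shuffle does the sorting and the GroupingComparator defines where one group ends and the next begins; the loop here plays the role of the reducer.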

Code implementation:

Custom student class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Student implements WritableComparable<Student> {
    private String t_class;
    private String t_name;
    private int t_sumSource;

    public Student() {
    }

    public void set(String t_class, String t_name, int chinese, int math, int english) {
        this.t_class = t_class;
        this.t_name = t_name;
        this.t_sumSource = chinese + math + english;
    }

    public String getT_class() {
        return t_class;
    }

    public void setT_class(String t_class) {
        this.t_class = t_class;
    }

    public String getT_name() {
        return t_name;
    }

    public void setT_name(String t_name) {
        this.t_name = t_name;
    }

    public int getT_sumSource() {
        return t_sumSource;
    }

    public void setT_sumSource(int t_sumSource) {
        this.t_sumSource = t_sumSource;
    }

    // Comparison rule: by class first, then by total score in descending order
    @Override
    public int compareTo(Student stu) {
        int result1 = this.t_class.compareTo(stu.t_class);
        if (result1 == 0) {
            return Integer.compare(stu.t_sumSource, this.t_sumSource);
        }
        return result1;
    }

    // Serialize
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.t_class);
        out.writeUTF(this.t_name);
        out.writeInt(this.t_sumSource);
    }

    // Deserialize (fields must be read in the order they were written)
    @Override
    public void readFields(DataInput in) throws IOException {
        this.t_class = in.readUTF();
        this.t_name = in.readUTF();
        this.t_sumSource = in.readInt();
    }

    // Not in the original listing: added so that the text output is readable
    @Override
    public String toString() {
        return t_class + "\t" + t_name + "\t" + t_sumSource;
    }
}

Custom grouping comparator:

// Custom grouping rule
private static class MyGroupComparator extends WritableComparator {
    // This constructor is required: it calls the parent constructor
    // so that Student instances are created for comparison
    public MyGroupComparator() {
        super(Student.class, true);
    }

    // Decides how the records entering reduce are grouped: here, by class
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Student stu1 = (Student) a;
        Student stu2 = (Student) b;
        return stu1.getT_class().compareTo(stu2.getT_class());
    }
}

MR program:

// Mapper
private static class MyMapper extends Mapper<LongWritable, Text, Student, NullWritable> {
    Student bean = new Student();
    NullWritable mv = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\s+");
        // class, name, then the three subject scores; only their sum is used,
        // so the order of the score columns does not matter
        String t_class = fields[0];
        String t_name = fields[1];
        int chinese = Integer.parseInt(fields[2]);
        int math = Integer.parseInt(fields[3]);
        int english = Integer.parseInt(fields[4]);
        bean.set(t_class, t_name, chinese, math, english);
        context.write(bean, mv);
    }
}

// Reducer
private static class MyReducer extends Reducer<Student, NullWritable, Student, NullWritable> {
    @Override
    protected void reduce(Student key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        // The key object is updated as the values are iterated,
        // so each write emits the current student of the group
        for (NullWritable value : values) {
            if (count > 2) {
                break;
            }
            context.write(key, value);
            count++;
        }
    }
}

Job:

public class ClazzScoreGroupComparator {
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(ClazzScoreGroupComparator.class);
            job.setJobName("ClazzScoreGroupComparator");
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            // Specify the custom grouping comparator
            job.setGroupingComparatorClass(MyGroupComparator.class);
            job.setOutputKeyClass(Student.class);
            job.setOutputValueClass(NullWritable.class);
            // Input and output paths of the job
            Path input = new Path("/data/student.txt");
            Path output = new Path("/data/output2");
            // Make sure the output directory does not exist
            if (output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true); // recursive delete
            }
            FileInputFormat.addInputPath(job, input);
            FileOutputFormat.setOutputPath(job, output);
            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. MapReduce global counters

Introduction: counters record the progress and status of a job as it runs, much like a log. A counter can be inserted anywhere in the program to track changes in the data or in progress. MapReduce also ships with many built-in counters, and knowing what they mean makes job results easier to read: input and output bytes, map input/output bytes and records, reduce input/output bytes and records, and so on.
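As a rough illustration of how per-task counts turn into job-wide totals, here is a plain-Java sketch with no Hadoop dependency. The names CounterSketch, WordCounters, countTask, and mergeInto are hypothetical; real counters are incremented through the task context and summed by the framework.

```java
import java.util.EnumMap;
import java.util.Map;

/** Plain-Java sketch of enum-keyed counters that are summed across tasks. */
final class CounterSketch {
    enum WordCounters { COUNT_WORDS, COUNT_LINES }

    /** Counts lines and whitespace-separated words for one "task" (one chunk of lines). */
    static Map<WordCounters, Long> countTask(String[] lines) {
        Map<WordCounters, Long> counters = new EnumMap<>(WordCounters.class);
        for (String line : lines) {
            counters.merge(WordCounters.COUNT_LINES, 1L, Long::sum);
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                long words = trimmed.split("\\s+").length;
                counters.merge(WordCounters.COUNT_WORDS, words, Long::sum);
            }
        }
        return counters;
    }

    /** Adds one task's counters into the running totals. */
    static void mergeInto(Map<WordCounters, Long> totals, Map<WordCounters, Long> task) {
        for (Map.Entry<WordCounters, Long> e : task.entrySet()) {
            totals.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    public static void main(String[] args) {
        Map<WordCounters, Long> totals = new EnumMap<>(WordCounters.class);
        mergeInto(totals, countTask(new String[] {"a b c", "d"}));
        mergeInto(totals, countTask(new String[] {"e f", ""}));
        System.out.println(totals);
    }
}
```

The point of the sketch is that each task only ever sees its own counts; the totals exist only after the per-task values have been merged.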

Requirement: use global counters to count the total number of words and lines across all files in a directory.

Code implementation:

public class CounterWordCount {
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(CounterWordCount.class);
            job.setJobName("CounterWordCount");
            job.setMapperClass(MyMapper.class);
            // Map-only job: set the number of reduce tasks to 0
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // Input and output paths of the job
            Path input = new Path("/data/");
            Path output = new Path("/data/output3");
            // Make sure the output directory does not exist
            if (output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true); // recursive delete
            }
            FileInputFormat.addInputPath(job, input);
            FileOutputFormat.setOutputPath(job, output);
            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Enumeration that names the counters
    enum CouterWordsCounts {
        COUNT_WORDS, COUNT_LINES
    }

    // Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text mk = new Text();
        LongWritable mv = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Text input is read line by line by default, so each call to map is one line
            context.getCounter(CouterWordsCounts.COUNT_LINES).increment(1);
            String[] words = value.toString().split("\\s+");
            for (String word : words) {
                context.getCounter(CouterWordsCounts.COUNT_WORDS).increment(1);
            }
        }

        // Runs once at the end of each map task.
        // The values read here are this task's own counts; the job-wide totals are
        // aggregated by the framework and can be read from job.getCounters() after completion.
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mk.set("number of lines:");
            mv.set(context.getCounter(CouterWordsCounts.COUNT_LINES).getValue());
            context.write(mk, mv);
            mk.set("number of words:");
            mv.set(context.getCounter(CouterWordsCounts.COUNT_WORDS).getValue());
            context.write(mk, mv);
        }
    }
}

4. MapReduce Join

Introduction: joining two data sets on a key is very common in real-world business scenarios. If both data sets are small, the join can be done directly in memory. With large data sets an in-memory join will run out of memory (OOM), and MapReduce can be used instead. There are two kinds of MapReduce join: map join and reduce join.

Map join

Introduction: a map join suits joins in which one of the data sets is small. The small data set is loaded entirely into memory and indexed by the join key. The large data set is then used as the input of the MapTask, and each call to map() looks up the matching record in memory and writes the joined result by key.
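The lookup-based join described above can be sketched in plain Java. This is an illustration only, assuming "::"-separated lines; MapJoinSketch, buildIndex, and join are hypothetical names, and no Hadoop API is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of a map-side (hash) join: index the small table, stream the large one. */
final class MapJoinSketch {
    /** Indexes the small table by its first "::"-separated field. */
    static Map<String, String> buildIndex(List<String> smallTable) {
        Map<String, String> index = new HashMap<>();
        for (String line : smallTable) {
            String[] f = line.split("::", 2); // key, rest of the record
            if (f.length == 2) {
                index.put(f[0], f[1]);
            }
        }
        return index;
    }

    /** Joins every large-table record whose field at keyIndex is present in the index. */
    static List<String> join(Map<String, String> index, List<String> largeTable, int keyIndex) {
        List<String> out = new ArrayList<>();
        for (String line : largeTable) {
            String[] f = line.split("::");
            if (f.length <= keyIndex) {
                continue; // malformed record, skip it
            }
            String match = index.get(f[keyIndex]);
            if (match != null) {
                out.add(line + "::" + match);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> small = new ArrayList<>();
        small.add("1::Toy Story (1995)::Animation|Children's|Comedy");
        List<String> large = new ArrayList<>();
        large.add("1::1193::5::978300760");
        large.add("7::1::4::978300000"); // made-up rating for movie 1
        System.out.println(join(buildIndex(small), large, 1));
    }
}
```

Only the small table is ever held in memory; the large table is processed one record at a time, which is what makes this approach usable when one side is big.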

Data description:

movies.dat: 1::Toy Story (1995)::Animation|Children's|Comedy

Fields: movieid, moviename, movietype

ratings.dat: 1::1193::5::978300760

Fields: userid, movieid, rate, timestamp
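To make the two record layouts concrete, here is a small plain-Java sketch that splits each line on "::" and checks the field count. RecordParseSketch, Movie, Rating, and the two parse methods are hypothetical names used only for illustration.

```java
/** Plain-Java sketch that parses the two "::"-separated record formats. */
final class RecordParseSketch {
    static final class Movie {
        final String movieId;
        final String name;
        final String type;

        Movie(String movieId, String name, String type) {
            this.movieId = movieId;
            this.name = name;
            this.type = type;
        }
    }

    static final class Rating {
        final String userId;
        final String movieId;
        final int rate;
        final long timestamp;

        Rating(String userId, String movieId, int rate, long timestamp) {
            this.userId = userId;
            this.movieId = movieId;
            this.rate = rate;
            this.timestamp = timestamp;
        }
    }

    /** movieid::moviename::movietype */
    static Movie parseMovie(String line) {
        String[] f = line.split("::");
        if (f.length != 3) {
            throw new IllegalArgumentException("Expected 3 fields: " + line);
        }
        return new Movie(f[0], f[1], f[2]);
    }

    /** userid::movieid::rate::timestamp */
    static Rating parseRating(String line) {
        String[] f = line.split("::");
        if (f.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields: " + line);
        }
        return new Rating(f[0], f[1], Integer.parseInt(f[2]), Long.parseLong(f[3]));
    }

    public static void main(String[] args) {
        Movie m = parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy");
        Rating r = parseRating("1::1193::5::978300760");
        System.out.println(m.movieId + " " + m.name + " / " + r.movieId + " " + r.rate);
    }
}
```

The join key is movieid, which is the first field of a movie record but the second field of a rating record.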

Code implementation:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MovieRatingMapJoinMR {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(MovieRatingMapJoinMR.class);
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(0);
            String minInput = args[0]; // small table (movies.dat)
            String maxInput = args[1]; // large table (ratings.dat)
            String output = args[2];
            FileInputFormat.setInputPaths(job, new Path(maxInput));
            Path outputPath = new Path(output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);
            // Ship the small table to every task through the distributed cache
            URI uri = new Path(minInput).toUri();
            job.addCacheFile(uri);
            boolean status = job.waitForCompletion(true);
            System.exit(status ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text mk = new Text();
        Text mv = new Text();
        // Holds the parsed key-value pairs of the small table
        private static Map<String, String> movieMap = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Read the cached table and store it in movieMap
            URI[] cacheFiles = context.getCacheFiles();
            // On a cluster the cached file is normally linked into the task's
            // working directory under its base name, so open it by that name
            String myfilePath = new Path(cacheFiles[0].getPath()).getName();
            BufferedReader br = new BufferedReader(new FileReader(myfilePath));
            // Each line is one movie record
            String line;
            while ((line = br.readLine()) != null) {
                // movieid::moviename::movietype
                String[] fields = line.split("::");
                if (fields.length >= 3) {
                    movieMap.put(fields[0], fields[1] + "\t" + fields[2]);
                }
            }
            IOUtils.closeStream(br);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // userid::movieid::rate::timestamp
            String[] fields = value.toString().split("::");
            String userid = fields[0];
            String movieid = fields[1];
            String rate = fields[2];
            String timestamp = fields[3];
            // The join key is movieid, so look the movie up by movieid
            if (movieMap.containsKey(movieid)) {
                String movieFields = movieMap.get(movieid);
                mk.set(userid);
                mv.set(movieFields + "\t" + movieid + "\t" + rate + "\t" + timestamp);
                context.write(mk, mv);
            }
        }
    }
}

Reduce join

Introduction:

 - Map phase: the two data sets, data1 and data2, are read by map and parsed into key-value pairs, with the join field as the key and the queried fields as the value. Each value is also tagged to show whether it came from data1 or data2.

 - Reduce phase: each reduce task receives the records from data1 and data2 that share the same key and joins them by taking their Cartesian product on the reduce side. The main cost is memory: the records of a key are buffered in memory, which can lead to OOM.
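The two phases above can be sketched in plain Java, with a sorted map standing in for the shuffle. This is an illustration under assumed names (ReduceJoinSketch, emit, reduce) and made-up rating values, not Hadoop code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of a reduce-side join: tag, group by key, then cross the two sides. */
final class ReduceJoinSketch {
    /** "Map" step: store value + "_" + tag under the join key (the map stands in for the shuffle). */
    static void emit(Map<String, List<String>> shuffle, String key, String value, String tag) {
        shuffle.computeIfAbsent(key, k -> new ArrayList<>()).add(value + "_" + tag);
    }

    /** "Reduce" step for one key: split the values by tag and output the product of both sides. */
    static List<String> reduce(String key, List<String> values) {
        List<String> movies = new ArrayList<>();
        List<String> ratings = new ArrayList<>();
        for (String v : values) {
            int cut = v.lastIndexOf('_');
            if (cut < 0) {
                continue; // untagged value, ignore it
            }
            String body = v.substring(0, cut);
            String tag = v.substring(cut + 1);
            if (tag.equals("movies")) {
                movies.add(body);
            } else if (tag.equals("ratings")) {
                ratings.add(body);
            }
        }
        List<String> out = new ArrayList<>();
        for (String movie : movies) {
            for (String rating : ratings) {
                out.add(key + "\t" + movie + "\t" + rating);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> shuffle = new TreeMap<>();
        emit(shuffle, "1", "Toy Story (1995)", "movies");
        emit(shuffle, "1", "7\t4", "ratings");
        emit(shuffle, "1", "8\t5", "ratings");
        emit(shuffle, "2", "9\t3", "ratings"); // no matching movie, produces nothing
        for (Map.Entry<String, List<String>> e : shuffle.entrySet()) {
            System.out.println(reduce(e.getKey(), e.getValue()));
        }
    }
}
```

Both lists for a key are held in memory before the product is written, which is the memory cost the reduce phase description refers to.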

Data description:

movies.dat: 1::Toy Story (1995)::Animation|Children's|Comedy

Fields: movieid, moviename, movietype

ratings.dat: 1::1193::5::978300760

Fields: userid, movieid, rate, timestamp

Code implementation:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MovieRatingReduceJoinMR {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(MovieRatingReduceJoinMR.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            String input = args[0];
            String output = args[1];
            FileInputFormat.setInputPaths(job, new Path(input));
            Path outputPath = new Path(output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);
            boolean status = job.waitForCompletion(true);
            System.exit(status ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String name;
        Text mk = new Text();
        Text mv = new Text();

        // Get the name of the file this split belongs to
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // InputSplit is an abstract class; file input uses its subclass FileSplit
            FileSplit is = (FileSplit) context.getInputSplit();
            name = is.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // movies.dat  movieid::moviename::movietype
            // ratings.dat userid::movieid::rate::timestamp
            String outputKey;
            String outputValue;
            String[] fields = value.toString().split("::");
            if (name.endsWith("movies.dat")) {
                outputKey = fields[0];
                outputValue = fields[1] + "\t" + fields[2] + "_" + "movies";
            } else if (name.endsWith("ratings.dat")) {
                outputKey = fields[1];
                outputValue = fields[0] + "\t" + fields[2] + "\t" + fields[3] + "_" + "ratings";
            } else {
                return; // not one of the two tables, skip the record
            }
            mk.set(outputKey);
            mv.set(outputValue);
            context.write(mk, mv);
        }
    }

    // Reducer
    private static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text rv = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> movies = new ArrayList<>();
            List<String> ratings = new ArrayList<>();
            // Put each value into the list of the table it came from
            for (Text value : values) {
                String[] fields = value.toString().split("_");
                if (fields[1].equals("movies")) {
                    movies.add(fields[0]);
                } else if (fields[1].equals("ratings")) {
                    ratings.add(fields[0]);
                }
            }
            // Join only when both tables have data for this key
            if (ratings.size() > 0 && movies.size() > 0) {
                for (String movie : movies) {
                    for (String rate : ratings) {
                        rv.set(movie + "\t" + rate);
                        context.write(key, rv);
                    }
                }
            }
        }
    }
}
