Typical programming scenarios for MapReduce 3


1. Custom InputFormat: merging small files

  Requirement: merge small files.

  Analysis:

    - When collecting the data, combine small files or small batches of data into large files before uploading them to HDFS.

    - Before business processing, use a MapReduce program to merge the small files that are already on HDFS.

    - CombineFileInputFormat can also be used during MapReduce processing to improve efficiency (a minimal driver sketch follows this list).
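
The third option relies on the framework's CombineFileInputFormat family. Below is a minimal driver sketch assuming the built-in CombineTextInputFormat; the class name, input/output paths and the 4 MB split cap are illustrative assumptions, not part of the original article:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineSmallFilesDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "combine small files with CombineTextInputFormat");
        job.setJarByClass(CombineSmallFilesDriver.class);

        // pack many small files into a few splits instead of one split per file
        job.setInputFormatClass(CombineTextInputFormat.class);
        // cap each combined split at 4 MB (tune to the cluster and file sizes)
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);

        // no mapper/reducer set: the identity defaults are enough to demonstrate the wiring;
        // a real merge job would typically drop the LongWritable offset key from the output
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("/hadoop/input/small_files"));
        FileOutputFormat.setOutputPath(job, new Path("/hadoop/output/combine_output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}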

Implementation approach:

    - Write a custom InputFormat.

    - Override RecordReader so that the complete contents of one small file read by a map task are packed into a single KV pair.

    - Be sure to register the custom InputFormat in the Driver class: job.setInputFormatClass(WholeFileInputFormat.class) (in the code below the class is named MyMyFileInputForamt).

Code implementation:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergeDriver {

    // Driver (job)
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = null;
        try {
            job = Job.getInstance(conf, "combine small files to bigfile");
            job.setJarByClass(MergeDriver.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            // set the custom input class
            job.setInputFormatClass(MyMyFileInputForamt.class);

            Path input = new Path("/hadoop/input/num_add");
            Path output = new Path("/hadoop/output/merge_output1");

            // use the custom FileInputFormat to add the input path
            MyMyFileInputForamt.addInputPath(job, input);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper
    private static class MyMapper extends Mapper<NullWritable, Text, NullWritable, Text> {
        // the map method is called once per small file: the whole file arrives as one value
        @Override
        protected void map(NullWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    // Reducer
    private static class MyReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text v : values) {
                context.write(key, v);
            }
        }
    }

    // RecordReader; the two generics are the input key and value types
    private static class MyRecordReader extends RecordReader<NullWritable, Text> {
        // output value object
        Text map_value = new Text();
        // file system object, used to open the input stream of the file
        FileSystem fs;
        // whether the current file has already been read
        boolean isReader = false;
        // the split (one small file) this reader is responsible for
        FileSplit fileSplit;

        // initialization method, similar to setup() in Mapper; runs once before reading starts
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // initialize the file system object
            fs = FileSystem.get(context.getConfiguration());
            // get the file split
            fileSplit = (FileSplit) split;
        }

        // the K-V passed into each call of map() is assigned in this method
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!isReader) {
                FSDataInputStream input = fs.open(fileSplit.getPath());
                // read the contents of the entire small file at once
                byte[] flush = new byte[(int) fileSplit.getLength()];
                /*
                 * parameter 1: byte array to read into
                 * parameter 2: offset to start reading at
                 * parameter 3: number of bytes to read
                 */
                input.readFully(flush, 0, (int) fileSplit.getLength());
                input.close();
                isReader = true;
                // put the read content into the value passed to map
                map_value.set(flush);
                // make sure the file is read exactly once: nextKeyValue() returns true only
                // the first time (so map() is called once) and false on the second call
                return isReader;
            }
            return false;
        }

        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return map_value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        @Override
        public void close() throws IOException {
            fs.close();
        }
    }

    // FileInputFormat
    private static class MyMyFileInputForamt extends FileInputFormat<NullWritable, Text> {
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            MyRecordReader mr = new MyRecordReader();
            // call the initialization method first
            mr.initialize(split, context);
            return mr;
        }
    }
}

2. Custom OutputFormat: data classification output

  Requirement: some raw logs need enhancement (enrichment) processing. The processing flow:

    - Read the data from the original log file.

    - Fetch the related data from the business database according to the business key.

    - Produce the enriched result by joining the two on a join condition (a minimal map-side sketch follows this list).
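
As a rough illustration of this flow, here is a minimal map-side sketch. Everything in it (class name, JDBC URL, table and field names, the tab-delimited log layout) is an assumption made for illustration; the original article does not show this code:

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // in-memory copy of the (small) business dictionary: url -> extra info
    private final Map<String, String> ruleMap = new HashMap<>();
    private final Text outKey = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // load the business dictionary from the database once per map task
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://hadoop01:3306/business", "user", "password");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT url, content FROM url_rule")) {
            while (rs.next()) {
                ruleMap.put(rs.getString("url"), rs.getString("content"));
            }
        } catch (Exception e) {
            throw new IOException("failed to load business dictionary", e);
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        String url = fields[fields.length - 1];        // join condition: the url field
        String extra = ruleMap.getOrDefault(url, "");  // look up the enrichment data
        outKey.set(line + "\t" + extra);
        context.write(outKey, NullWritable.get());
    }
}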

  Analysis:

    - External resources (here, the business database) have to be accessed from inside the MapReduce program.

    - Custom OutputFormat: override RecordWriter, and in particular its write() method, which performs the actual output of each record.

Code implementation:

// A simple example: output records to different files according to their score level.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Score_DiffDic {

    // Driver (job)
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = null;
        try {
            job = Job.getInstance(conf, "Score_DiffDic");
            job.setJarByClass(Score_DiffDic.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            // set the custom output type
            job.setOutputFormatClass(MyOutputFormat.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            Path input = new Path("/hadoop/input/num_add");
            FileInputFormat.addInputPath(job, input);

            Path output = new Path("/hadoop/output/merge_output1");
            // set the output path on the custom output type
            MyOutputFormat.setOutputPath(job, output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        Text mk = new Text();
        DoubleWritable mv = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // input line, e.g. computer,huangxiaoming,85
            String[] fields = value.toString().split("\\s+");
            if (fields.length == 3) {
                mk.set(fields[1]);
                mv.set(Double.parseDouble(fields[2]));
                context.write(mk, mv);
            }
        }
    }

    // Reducer: computes the average score per student
    private static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        DoubleWritable mv = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            for (DoubleWritable value : values) {
                sum += value.get();
                count++;
            }
            mv.set(sum / count);
            context.write(key, mv);
        }
    }

    // FileOutputFormat
    private static class MyOutputFormat extends FileOutputFormat<Text, DoubleWritable> {
        @Override
        public RecordWriter<Text, DoubleWritable> getRecordWriter(TaskAttemptContext job)
                throws IOException, InterruptedException {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            return new MyRecordWrite(fs);
        }
    }

    // RecordWriter; the two generics are the Reducer's output K-V types
    private static class MyRecordWrite extends RecordWriter<Text, DoubleWritable> {
        FileSystem fs;
        // paths of the output files
        Path path2 = new Path("/hadoop/output/score_out1");
        Path path3 = new Path("/hadoop/output/score_out2");
        FSDataOutputStream output1;
        FSDataOutputStream output2;

        public MyRecordWrite() {
        }

        // initialization: keep the file system and open the two output streams
        public MyRecordWrite(FileSystem fs) {
            this.fs = fs;
            try {
                output1 = fs.create(path2);
                output2 = fs.create(path3);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void write(Text key, DoubleWritable value) throws IOException, InterruptedException {
            // business logic: averages greater than 80 go to path2, all others to path3
            if (value.get() > 80) {
                output1.write((key.toString() + ":" + value.get() + "\n").getBytes());
            } else {
                output2.write((key.toString() + ":" + value.get() + "\n").getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            output1.close();
            output2.close();
            fs.close();
        }
    }
}
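
For comparison, a similar "different files by level" effect can often be achieved with Hadoop's built-in MultipleOutputs instead of a full custom OutputFormat. The sketch below is an illustration under assumptions (the class name and the "high"/"low" named outputs are made up here), not code from the original article; the driver would additionally need MultipleOutputs.addNamedOutput(job, "high", TextOutputFormat.class, Text.class, DoubleWritable.class) and the same for "low":

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class ScoreReducerWithMultipleOutputs
        extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    private MultipleOutputs<Text, DoubleWritable> mos;
    private final DoubleWritable avg = new DoubleWritable();

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        for (DoubleWritable v : values) {
            sum += v.get();
            count++;
        }
        avg.set(sum / count);
        // route by average score, mirroring the custom RecordWriter above
        if (avg.get() > 80) {
            mos.write("high", key, avg);
        } else {
            mos.write("low", key, avg);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}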
