
11. MapReduce: Custom Input (InputFormat)

Shulou (Shulou.com), 06/03 report, updated 2025-04-05

Customizing MapReduce input comes down to subclassing InputFormat (here through FileInputFormat) and RecordReader. The InputFormat decides how the input files are divided into splits and which RecordReader to use. The RecordReader turns each split into the key/value pairs that the mapper receives. The following example walks through the process.

1. Requirement

Merge multiple small files into one large file (somewhat similar to CombineFileInputFormat) and write it out. For each small file, the large file stores the file's path together with the file's contents. The output is a SequenceFile whose key is the path and whose value is the file's bytes.
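The sketch below computes the same result locally in plain Java, without Hadoop, as a point of reference. It maps each regular file in one directory from its path to its raw bytes and orders the entries by path, the way a single reducer emits sorted keys. The class `SmallFileMerger`, its `merge` method, and the non-recursive directory walk are illustrative assumptions and are not part of the original example.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local analogue of the MapReduce job: file path -> whole-file bytes.
class SmallFileMerger {

    // Reads every regular file directly under dir (non-recursive, an assumption).
    // A TreeMap keeps the entries sorted by path string, mirroring the sorted
    // keys that a single reducer would write.
    static Map<String, byte[]> merge(Path dir) throws IOException {
        Map<String, byte[]> merged = new TreeMap<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                if (Files.isRegularFile(file)) {
                    merged.put(file.toString(), Files.readAllBytes(file));
                }
            }
        }
        return merged;
    }
}
```

The local version only needs to read the whole file into memory. The Hadoop version below does the same thing, but inside a RecordReader, so that each map task handles exactly one file.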

2. Source code

InputFormat:

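import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // Whether to split the file: false keeps each small file whole, in one split
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    // Return the reader that turns a split into key/value pairs
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit,
            TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        SRecordReader sRecordReader = new SRecordReader();
        // The framework also calls initialize() before reading; calling it here is harmless
        sRecordReader.initialize(inputSplit, taskAttemptContext);
        return sRecordReader;
    }
}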
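Returning false from `isSplitable()` is what guarantees that one map task sees one whole file. In Hadoop's `FileInputFormat`, a splittable file is cut into pieces of `computeSplitSize(blockSize, minSize, maxSize)` bytes, that is, `max(minSize, min(maxSize, blockSize))`. A non-splittable file always becomes exactly one split. The sketch below is a simplified re-implementation of that arithmetic only. It includes the 1.1 "slop" factor, which lets a slightly oversized tail stay in the last split. `SplitMath` and `countSplits` are hypothetical names, and block locations and hosts are ignored.

```java
// Simplified model of how FileInputFormat turns one file into splits.
// Hypothetical helper: it mirrors the arithmetic only, not Hadoop's API.
class SplitMath {

    // Full-size splits are cut while the remainder is more than 10% larger
    // than one split (FileInputFormat's SPLIT_SLOP).
    static final double SPLIT_SLOP = 1.1;

    static int countSplits(long fileLength, long splitSize, boolean splittable) {
        // Non-splittable files (isSplitable() == false) and empty files
        // always produce exactly one split.
        if (!splittable || fileLength == 0) {
            return 1;
        }
        int splits = 0;
        long remaining = fileLength;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        // The tail (at most 1.1 x splitSize) becomes the last split.
        if (remaining != 0) {
            splits++;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println(countSplits(300 * mb, 128 * mb, true));  // 3
        System.out.println(countSplits(140 * mb, 128 * mb, true));  // 1
        System.out.println(countSplits(300 * mb, 128 * mb, false)); // 1
    }
}
```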

RecordReader:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private Configuration conf;
    private FileSplit split;
    // Flag: whether the current split has already been read
    private boolean process = false;
    private BytesWritable value = new BytesWritable();

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        split = (FileSplit) inputSplit;
        conf = taskAttemptContext.getConfiguration();
    }

    // Read the next KV pair from the split: here, the whole file, exactly once
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!process) {
            // Assumes each small file fits in one byte array (under 2 GB)
            byte[] buffer = new byte[(int) split.getLength()];
            // Get the file system that owns the file
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(conf);
            FSDataInputStream fis = null;
            try {
                // Open the input stream and read the whole file into the buffer
                fis = fs.open(path);
                IOUtils.readFully(fis, buffer, 0, buffer.length);
                // Load the data into value
                value.set(buffer, 0, buffer.length);
            } finally {
                // Close the stream even if the read fails
                IOUtils.closeStream(fis);
            }
            // Mark the split as read, so the next call returns false
            process = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return process ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
    }
}
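You can check the control flow of `SRecordReader` without a cluster. The Hadoop-free sketch below assumes a hypothetical `SimpleRecordReader` interface that mirrors the four reader methods used above. Its `WholeFileReader` receives the file contents as an in-memory byte array instead of opening a `FileSystem` stream. The `drain` loop stands in for the map task, which keeps calling `nextKeyValue()` until it returns false. Because of the processed flag, a whole-file reader yields exactly one record per split.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hadoop-free stand-in for the RecordReader contract.
interface SimpleRecordReader<K, V> {
    boolean nextKeyValue();
    K getCurrentKey();
    V getCurrentValue();
    float getProgress();
}

// Emits the whole "file" (here an in-memory byte array) as one record,
// using the same processed-flag idea as SRecordReader.
class WholeFileReader implements SimpleRecordReader<Void, byte[]> {
    private final byte[] contents;
    private boolean processed = false;

    WholeFileReader(byte[] contents) {
        this.contents = contents;
    }

    @Override
    public boolean nextKeyValue() {
        if (processed) {
            return false; // second call: no more records in this split
        }
        processed = true;
        return true;
    }

    @Override
    public Void getCurrentKey() {
        return null; // plays the role of NullWritable
    }

    @Override
    public byte[] getCurrentValue() {
        return contents;
    }

    @Override
    public float getProgress() {
        return processed ? 1.0f : 0.0f;
    }
}

class ReaderDemo {
    // Drives a reader the way a map task does: loop until nextKeyValue() is false.
    static List<byte[]> drain(SimpleRecordReader<?, byte[]> reader) {
        List<byte[]> values = new ArrayList<>();
        while (reader.nextKeyValue()) {
            values.add(reader.getCurrentValue());
        }
        return values;
    }

    public static void main(String[] args) {
        byte[] data = "file body".getBytes(StandardCharsets.UTF_8);
        System.out.println(drain(new WholeFileReader(data)).size()); // 1
    }
}
```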

Mapper:

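import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    private Text k = new Text();

    // Called once per split: use the file's path as the output key
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().toString();
        k.set(name);
    }

    // Emit (file path, file contents)
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(k, value);
    }
}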
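The key is set in `setup()` rather than in `map()` because of the order in which Hadoop's `Mapper.run()` calls these methods. It calls `setup()` once per task (one split), then `map()` once per record, then `cleanup()` in a `finally` block. The class below is a hypothetical plain-Java model of that lifecycle, not Hadoop's `Mapper`. It shows that every record from one split gets tagged with the same path, even if the split yielded several records.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of Mapper.run(): setup() once, map() per record, cleanup() last.
class PathTaggingMapper {
    private String splitPath;  // set once in setup(), like k in SFileMapper
    int setupCalls = 0;
    final List<Map.Entry<String, byte[]>> output = new ArrayList<>();

    void setup(String pathOfSplit) {
        setupCalls++;
        splitPath = pathOfSplit;
    }

    void map(byte[] value) {
        output.add(new SimpleEntry<>(splitPath, value));
    }

    void cleanup() {
        // nothing to release in this sketch
    }

    void run(String pathOfSplit, Iterator<byte[]> records) {
        setup(pathOfSplit);
        try {
            while (records.hasNext()) {
                map(records.next());
            }
        } finally {
            cleanup();
        }
    }

    public static void main(String[] args) {
        PathTaggingMapper mapper = new PathTaggingMapper();
        byte[] body = "file body".getBytes(StandardCharsets.UTF_8);
        mapper.run("file:/tmp/a.txt", Collections.singletonList(body).iterator());
        System.out.println(mapper.output.get(0).getKey()); // file:/tmp/a.txt
    }
}
```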

Reducer:

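import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    // Each key is a unique file path, so each group holds one value
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, values.iterator().next());
    }
}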
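Taking only `values.iterator().next()` is safe here. The shuffle groups map output by key, and every key is a distinct file path, so each group holds a single value. The sketch below is a hypothetical plain-Java model of that grouping (`FirstValueReducer` is not a Hadoop class). It collects values per key in sorted key order and keeps only the first value of each group. It also shows the caveat: if two records ever shared a key, every value after the first would be silently dropped. In real Hadoop, the order of values within a group is not guaranteed unless a secondary sort is configured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of shuffle + SFileReducer: group by key, keep the first value.
class FirstValueReducer {

    static Map<String, byte[]> reduce(List<Map.Entry<String, byte[]>> mapOutput) {
        // Shuffle/sort: collect all values per key, with keys in sorted order.
        Map<String, List<byte[]>> groups = new TreeMap<>();
        for (Map.Entry<String, byte[]> e : mapOutput) {
            groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        // Reduce: like values.iterator().next(), keep only the first value per key.
        Map<String, byte[]> result = new TreeMap<>();
        for (Map.Entry<String, List<byte[]>> g : groups.entrySet()) {
            result.put(g.getKey(), g.getValue().get(0));
        }
        return result;
    }
}
```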

Driver:

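import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SFileDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hard-coded local paths for testing: input directory, output directory (must not exist yet)
        args = new String[] {"G:\\test\\date\\A\\order\\", "G:\\test\\date\\A\\order2\\"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SFileDriver.class);
        job.setMapperClass(SFileMapper.class);
        job.setReducerClass(SFileReducer.class);

        // Set the input and output format classes (the default input format is TextInputFormat)
        job.setInputFormatClass(SFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}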

A custom InputFormat must be registered on the job with job.setInputFormatClass(); otherwise, the job falls back to the default TextInputFormat.
