2025-04-05 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
Defining a custom input in MapReduce comes down to subclassing InputFormat and RecordReader. The following example walks through the implementation.

1. Requirement

Merge multiple small files into one large file (similar in spirit to CombineFileInputFormat) and write it out. The large file contains each small file's path together with that file's contents.
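Before wiring this into MapReduce, the intended result can be illustrated with a plain-Java sketch (class and file names here are hypothetical, for illustration only): each small file's path becomes a key, and the file's full contents become the value in the merged output, mirroring what the job below writes into its SequenceFile.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergeSketch {
    // Merge small files into one (path -> contents) map, mirroring
    // the (Text path, BytesWritable body) pairs the job emits.
    static Map<String, String> merge(List<Path> smallFiles) throws IOException {
        Map<String, String> merged = new LinkedHashMap<>();
        for (Path p : smallFiles) {
            merged.put(p.toString(),
                       new String(Files.readAllBytes(p), StandardCharsets.UTF_8));
        }
        return merged;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical sample input: two small files in a temp directory
        Path dir = Files.createTempDirectory("order");
        Path a = Files.write(dir.resolve("order1.txt"),
                             "first".getBytes(StandardCharsets.UTF_8));
        Path b = Files.write(dir.resolve("order2.txt"),
                             "second".getBytes(StandardCharsets.UTF_8));
        Map<String, String> merged = merge(List.of(a, b));
        System.out.println(merged.size());            // 2
        System.out.println(merged.get(a.toString())); // first
    }
}
```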
2. Source code
InputFormat:

public class SFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    /**
     * Whether the file can be split: always false, so each file is read whole.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /**
     * Return the record reader for a split.
     */
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        SRecordReader sRecordReader = new SRecordReader();
        sRecordReader.initialize(inputSplit, taskAttemptContext);
        return sRecordReader;
    }
}
RecordReader:
public class SRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private Configuration conf;
    private FileSplit split;
    // Flag marking whether the current split has already been read
    private boolean process = false;
    private BytesWritable value = new BytesWritable();

    /**
     * Initialize with the split and task context.
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        split = (FileSplit) inputSplit;
        conf = taskAttemptContext.getConfiguration();
    }

    /**
     * Read the next key/value pair from the split.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!process) {
            byte[] buffer = new byte[(int) split.getLength()];
            // Get the file system
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(conf);
            // Open an input stream
            FSDataInputStream fis = fs.open(path);
            // Read the whole file into the buffer
            IOUtils.readFully(fis, buffer, 0, buffer.length);
            // Load the data into value
            value.set(buffer, 0, buffer.length);
            // Close the stream
            IOUtils.closeStream(fis);
            // Set the flag to true: the split has been read
            process = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return process ? 1 : 0;
    }

    @Override
    public void close() throws IOException {
    }
}
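The reader above emits exactly one key/value pair per split: the process flag flips to true after the whole file has been loaded, so the second call to nextKeyValue() returns false. Stripped of the Hadoop types, the same one-shot state machine can be sketched in plain java.nio (class name hypothetical):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class WholeFileReader {
    private final Path path;
    private boolean processed = false;  // mirrors the 'process' flag in SRecordReader
    private byte[] value;

    WholeFileReader(Path path) { this.path = path; }

    // Returns true exactly once: the whole file is a single record.
    boolean nextKeyValue() throws IOException {
        if (!processed) {
            value = Files.readAllBytes(path);  // one read for the entire file
            processed = true;
            return true;
        }
        return false;
    }

    byte[] getCurrentValue() { return value; }

    float getProgress() { return processed ? 1f : 0f; }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("demo", ".txt");
        Files.write(p, "hello".getBytes());
        WholeFileReader r = new WholeFileReader(p);
        System.out.println(r.nextKeyValue());                // true
        System.out.println(new String(r.getCurrentValue())); // hello
        System.out.println(r.nextKeyValue());                // false
    }
}
```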
Mapper:
public class SFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().toString();
        k.set(name);
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(k, value);
    }
}
Reducer:
public class SFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, values.iterator().next());
    }
}
Driver:
public class SFileDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"G:\\test\\date\\A\\order", "G:\\test\\date\\A\\order2"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SFileDriver.class);
        job.setMapperClass(SFileMapper.class);
        job.setReducerClass(SFileReducer.class);

        // Set the input and output format classes; the default is TextInputFormat
        job.setInputFormatClass(SFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
Note that a custom InputFormat must be registered on the job via job.setInputFormatClass(), as the driver does above.
© 2024 shulou.com SLNews company. All rights reserved.