Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Hadoop2.6.0 Learning Notes (6) Analysis of TextOutputFormat and RecordWriter

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

Lu Chunli's work notes, who said that programmers should not have literary style?

MapReduce provides many default output formats, such as TextOutputFormat, KeyValueOutputFormat, and so on. The number of output files in MapReduce is the same as the number of Reduce. By default, there is one Reduce, the output has only one file, the file name is part-r-00000, and the number of lines in the file content is the same as the number of different key in the map output. If you have two Reduce, the output will have two files, the first is part-r-00000, the second is part-r-00001, and so on.

The class that implements the output function by default in MapReduce is TextOutputFormat, which is mainly used to output text data to HDFS.

Public class TextOutputFormat extends FileOutputFormat {public static String SEPERATOR = "mapreduce.output.textoutputformat.separator"; / / defines an inner class to implement the output, with a newline character\ n and a delimiter\ t (which can be modified by parameters) protected static class LineRecordWriter extends RecordWriter {public LineRecordWriter (DataOutputStream out) {/ / is actually FSDataOutputStream this (out, "\ t") The main structure is two methods: write and close * * / public synchronized void write (K key, V value) throws IOException {boolean nullKey = key = = null | | key instanceof NullWritable; boolean nullValue = value = = null | | value instanceof NullWritable; if (nullKey & & nullValue) {return;} if (! nullKey) {writeObject (key) / / process Text type data into byte arrays} if (! (nullKey | | nullValue)) {out.write (keyValueSeparator);} if (! nullValue) {writeObject (value);} out.write (newline); / / Line feeds (newline = "\ n" .getBytes (utf8);)} public synchronized void close (TaskAttemptContext context) throws IOException {out.close () }} / / Internal class definition ends, the following is TextOutputFormat's only key method public RecordWriter getRecordWriter (TaskAttemptContext job) throws IOException, InterruptedException {/ / 1, determine whether compression is required according to Configuration, and obtain compression format and suffix if compression is required / / 2. Get the file path to be generated, getDefaultWorkFile (job, extension) / / 3. Generate a FSDataOutputStream object from the file and return new LineRecordWriter it. Configuration conf = job.getConfiguration (); boolean isCompressed = getCompressOutput (job); String keyValueSeparator= conf.get (SEPERATOR, "\ t"); CompressionCodec codec = null; String extension = ""; if (isCompressed) {/ / if it is compressed, the extension Class is obtained according to compression, V extends Writable > extends TextOutputFormat {/ * OutputFormat outputs the data to a file with a specific name of the specified directory by obtaining the Writer object. * / private MultipleRecordWriter writer = null; / / when TextOutputFormat is implemented, there is a unique identity for each map or task task, which is controlled by TaskID. / / the file name is fixed at the time of output, each output file corresponds to a LineRecordWriter, and its output stream object (FSDataOutputStream) is taken. / / data output is realized through the output stream object during output. / but when implemented here, it is actually required that for a task task, the data it needs to output is written to multiple files, which is not fixed; / / so determine whether the corresponding file already has a Writer object on each output, and if so, continue to output through this object, otherwise create a new one. @ Override public RecordWriter getRecordWriter (TaskAttemptContext context) throws IOException, InterruptedException {if (null = = writer) {writer = new MultipleRecordWriter (context, this.getTaskOutputPath (context));} return writer;} / / get the output path of the task, which is still obtained from committer. TaskAttemptContext encapsulates the context of task for later analysis. / / it is implemented by calling the getDefaultWorkFile of the parent class (FileOutputFormat) in TextOutputFormat. / / the default file name defined by MapReduce is obtained in getDefaultWorkFile. If you need to customize the file name, you need to implement private Path getTaskOutputPath (TaskAttemptContext context) throws IOException {Path workPath = null; OutputCommitter committer = super.getOutputCommitter (context); if (committer instanceof FileOutputCommitter) {/ / Get the directory that the task should write results into. WorkPath = (FileOutputCommitter) committer). GetWorkPath ();} else {/ / Get the {@ link Path} to the output directory for the map-reduce job. / / context.getConfiguration () .get (FileOutputFormat.OUTDIR); Path outputPath = super.getOutputPath (context); if (null = = outputPath) {throw new IOException ("Undefined job output-path.");} workPath = outputPath;} return workPath } / * * @ author luchunli * @ description custom RecordWriter. The LineRecordWriter of MapReduce's TextOutputFormat is also an inner class. Refer to the cache of * / public class MultipleRecordWriter extends RecordWriter {/ * * RecordWriter cache * * / private HashMap recordWriters = null; private TaskAttemptContext context; / * * output directory * * / private Path workPath = null Public MultipleRecordWriter () {} public MultipleRecordWriter (TaskAttemptContext context, Path path) {super (); this.context = context; this.workPath = path; this.recordWriters = new HashMap () } @ Override public void write (K key, V value) throws IOException, InterruptedException {String baseName = generateFileNameForKeyValue (key, value, this.context.getConfiguration ()); RecordWriter rw = this.recordWriters.get (baseName); if (null = = rw) {rw = this.getBaseRecordWriter (context, baseName); this.recordWriters.put (baseName, rw) } / / this is actually still the rw.write (key, value) implemented through LineRecordWriter } / / LineRecordWriter is encapsulated through MultipleRecordWriter, and the same task is split on output / / in the MapReduce implementation, there is only one reduce by default (quantity partition partial analysis of Reduce). According to the previous example, all the output will be written to the part-r-00000 file. / / what is done here is to mask the output to part-r-00000 and split the data of the same reduce into multiple files. Private RecordWriter getBaseRecordWriter (TaskAttemptContext context, String baseName) throws IOException {Configuration conf = context.getConfiguration (); boolean isCompressed = getCompressOutput (context); / / in the implementation of LineRecordWriter, the delimiter is specified by the variable: / / public static String SEPERATOR = "mapreduce.output.textoutputformat.separator"; / / String keyValueSeparator= conf.get (SEPERATOR, "\ t") / / A comma is given as the partition String keyValueSeparator = ","; RecordWriter rw = null; if (isCompressed) {Class

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report