2025-01-28 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article mainly introduces how to customize an InputFormat in Hadoop MapReduce. It has a certain reference value; interested friends can refer to it, and I hope you will learn a lot after reading it. Below, let Xiaobian take you through it.
First we define a class that extends FileInputFormat and override its createRecordReader method. That method must return a RecordReader, so we also define a second class that extends RecordReader; createRecordReader then returns an instance of that custom subclass.
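Before the real Hadoop code, the shape of this factory pattern can be sketched in plain Java. The names below (TinyInputFormat, TinyRecordReader) are our own illustrations of the contract, not Hadoop's API:

```java
import java.util.Iterator;
import java.util.List;

// Toy model of the pattern: the "input format" is just a factory whose
// createRecordReader() hands back a reader that iterates the split's records.
public class TinyInputFormat {
    public TinyRecordReader createRecordReader(List<String> split) {
        return new TinyRecordReader(split);
    }

    public static class TinyRecordReader {
        private final Iterator<String> records;
        private String current;

        TinyRecordReader(List<String> split) {
            this.records = split.iterator();
        }

        // mirrors RecordReader.nextKeyValue(): advance and report availability
        public boolean nextKeyValue() {
            if (!records.hasNext()) {
                return false;
            }
            current = records.next();
            return true;
        }

        public String getCurrentValue() {
            return current;
        }
    }

    public static void main(String[] args) {
        TinyRecordReader reader =
                new TinyInputFormat().createRecordReader(List.of("rec1", "rec2"));
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentValue());
        }
    }
}
```

Hadoop's real classes add splits, keys, and I/O on top, but the division of labor is the same: the format only decides which reader to build, and the reader does the actual parsing.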
The code is as follows
package input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new TrackRecordReader();
    }
}
package input;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;

/**
 * Treats keys as offsets in the file and values as records
 * terminated by a custom delimiter.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {

    Logger logger = Logger.getLogger(TrackRecordReader.class.getName());
    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private NewLineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;
    // the line delimiter, i.e. the byte sequence that ends one record
    private byte[] separator = "]@\n".getBytes();

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        boolean skipFirstLine = false;
        if (codec != null) {
            in = new NewLineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                this.start -= separator.length;
                fileIn.seek(start);
            }
            in = new NewLineReader(fileIn, job);
        }
        if (skipFirstLine) {
            // skip the first partial record and re-establish "start"
            start += in.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (pos < end) {
            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }
        }
        if (newSize == 0) {
            // no more records in this split
            key = null;
            value = null;
            return false;
        } else {
            // a record was read from the current buffer
            return true;
        }
    }

    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    /** Get the progress within the split. */
    @Override
    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    public class NewLineReader {

        private static final int DEFAULT_BUFFER_SIZE = 256 * 1024 * 1024;
        private int bufferSize = DEFAULT_BUFFER_SIZE;
        private InputStream in;
        private byte[] buffer;
        private int bufferLength = 0;
        private int bufferPosn = 0;

        public NewLineReader(InputStream in) {
            this(in, DEFAULT_BUFFER_SIZE);
        }

        public NewLineReader(InputStream in, int bufferSize) {
            this.in = in;
            this.bufferSize = bufferSize;
            this.buffer = new byte[this.bufferSize];
        }

        public NewLineReader(InputStream in, Configuration conf)
                throws IOException {
            this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
        }

        public void close() throws IOException {
            in.close();
        }

        public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
                throws IOException {
            str.clear();
            Text record = new Text();
            int txtLength = 0;
            long bytesConsumed = 0L;
            boolean newline = false;
            int sepPosn = 0;
            do {
                // reached the end of the buffer; read the next buffer
                if (this.bufferPosn >= this.bufferLength) {
                    bufferPosn = 0;
                    bufferLength = in.read(buffer);
                    // reached the end of the file; stop
                    if (bufferLength <= 0) {
                        break;
                    }
                }
                int startPosn = this.bufferPosn;
                for (; bufferPosn < bufferLength; bufferPosn++) {
                    // a partial separator match was broken; reset it
                    if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {
                        sepPosn = 0;
                    }
                    // hit the first character of the line delimiter
                    if (buffer[bufferPosn] == separator[sepPosn]) {
                        bufferPosn++;
                        int i = 0;
                        // check whether the following characters also match
                        for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {
                            // the delimiter straddles the end of the buffer
                            // and was unluckily cut in half
                            if (bufferPosn + i >= bufferLength) {
                                bufferPosn += i - 1;
                                break;
                            }
                            // one character differs, so this is not the delimiter
                            if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {
                                sepPosn = 0;
                                break;
                            }
                        }
                        // the full line delimiter was indeed found
                        if (sepPosn == separator.length) {
                            bufferPosn += i;
                            newline = true;
                            sepPosn = 0;
                            break;
                        }
                    }
                }
                int readLength = this.bufferPosn - startPosn;
                bytesConsumed += readLength;
                // do not let the record exceed maxLineLength
                if (readLength > maxLineLength - txtLength) {
                    readLength = maxLineLength - txtLength;
                }
                if (readLength > 0) {
                    record.append(this.buffer, startPosn, readLength);
                    txtLength += readLength;
                    // strip the record delimiter from the value
                    if (newline) {
                        str.set(record.getBytes(), 0,
                                record.getLength() - separator.length);
                    }
                }
            } while (!newline && (bytesConsumed < maxBytesToConsume));
            if (bytesConsumed > (long) Integer.MAX_VALUE) {
                throw new IOException("Too many bytes before newline: "
                        + bytesConsumed);
            }
            return (int) bytesConsumed;
        }

        public int readLine(Text str, int maxLineLength) throws IOException {
            return readLine(str, maxLineLength, Integer.MAX_VALUE);
        }

        public int readLine(Text str) throws IOException {
            return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
        }
    }
}
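To make the delimiter-scanning logic in readLine concrete, here is a small plain-Java sketch (our own illustrative class, not part of the code above) that splits a byte array on the same "]@\n" delimiter, including the reset when a partial match breaks off. Unlike the real reader, it keeps everything in memory and drops a trailing record that has no closing delimiter:

```java
import java.util.ArrayList;
import java.util.List;

// Toy re-implementation of the scanning idea used by NewLineReader:
// walk the bytes and cut a record each time the full multi-byte
// separator "]@\n" has been matched.
public class SeparatorSplitDemo {
    static final byte[] SEPARATOR = "]@\n".getBytes();

    public static List<String> split(byte[] data) {
        List<String> records = new ArrayList<>();
        int recordStart = 0; // first byte of the current record
        int sepPosn = 0;     // how many separator bytes matched so far
        for (int i = 0; i < data.length; i++) {
            if (data[i] == SEPARATOR[sepPosn]) {
                sepPosn++;
                if (sepPosn == SEPARATOR.length) {
                    // full separator matched: emit record without the separator
                    int len = i + 1 - SEPARATOR.length - recordStart;
                    records.add(new String(data, recordStart, len));
                    recordStart = i + 1;
                    sepPosn = 0;
                }
            } else {
                // partial match broken: re-check this byte from the start
                // (exact here because ']' occurs only at position 0)
                sepPosn = (data[i] == SEPARATOR[0]) ? 1 : 0;
            }
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] input = "first record]@\nsecond record]@\n".getBytes();
        System.out.println(split(input)); // prints [first record, second record]
    }
}
```

The subtle cases the real reader handles, such as a delimiter cut in half at a buffer boundary, reduce to the same idea: remember how far into the separator the match has progressed and reset when it breaks.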
The key customization point is the record delimiter:

private byte[] separator = "]@\n".getBytes();

Change this byte sequence and the reader will split records on any delimiter your data uses instead of a plain newline. To plug the format into a job, set it in the driver with job.setInputFormatClass(TrackInputFormat.class).

Thank you for reading this article carefully. I hope this article on "how to customize InputFormat in hadoop mapreduce" is helpful to everyone, and I also hope you will continue to support and follow this channel. More related knowledge is waiting for you to learn!
© 2024 shulou.com SLNews company. All rights reserved.