2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains how to write a custom Hadoop MapReduce InputFormat that controls how the input file is split. I hope you will gain something from it; let's work through it together.
We have already implemented secondary sorting by cookieId and time. Now there is a new question: what if I need to analyze the data both by cookieId and by the combination of cookieId and time? The best approach is to write a custom InputFormat so that MapReduce reads all the records under one cookieId at a time, and then to split those records into sessions by time. The logic, in pseudocode, is as follows:
for OneSplit in MyInputFormat.getSplit()   // OneSplit holds all records under one cookieId
    for session in OneSplit                // session is a second-level split of OneSplit by time
        for line in session                // line is one record in the session, i.e. one record in the original log
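The nested loops above can be sketched in plain Java, without Hadoop. This is only an illustration under stated assumptions: the `Rec` class, the numeric timestamps, and the rule "a new session starts when the gap between two consecutive records exceeds `maxGap`" are all hypothetical, since the article does not say how sessions are delimited in time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SessionSplitSketch {
    /** One log record; the field names are illustrative, not from the article. */
    static final class Rec {
        final String cookieId;
        final long time;
        final String url;

        Rec(String cookieId, long time, String url) {
            this.cookieId = cookieId;
            this.time = time;
            this.url = url;
        }
    }

    /** Groups records by cookieId, keeping the order in which cookieIds first appear. */
    static Map<String, List<Rec>> groupByCookie(List<Rec> records) {
        Map<String, List<Rec>> groups = new LinkedHashMap<>();
        for (Rec r : records) {
            groups.computeIfAbsent(r.cookieId, k -> new ArrayList<>()).add(r);
        }
        return groups;
    }

    /**
     * Splits the time-ordered records of one cookieId into sessions.
     * Assumption: a gap larger than maxGap between neighbours starts a new session.
     */
    static List<List<Rec>> splitSessions(List<Rec> oneSplit, long maxGap) {
        List<List<Rec>> sessions = new ArrayList<>();
        List<Rec> current = new ArrayList<>();
        for (Rec r : oneSplit) {
            if (!current.isEmpty() && r.time - current.get(current.size() - 1).time > maxGap) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(r);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        List<Rec> records = new ArrayList<>();
        records.add(new Rec("1", 0L, "1_hao123"));
        records.add(new Rec("1", 10L, "1_baidu"));
        records.add(new Rec("1", 5000L, "1_google"));
        records.add(new Rec("2", 0L, "2_google"));
        for (Map.Entry<String, List<Rec>> oneSplit : groupByCookie(records).entrySet()) {
            for (List<Rec> session : splitSessions(oneSplit.getValue(), 1800L)) {
                for (Rec line : session) {
                    System.out.println(oneSplit.getKey() + "\t" + line.time + "\t" + line.url);
                }
                System.out.println("-");
            }
        }
    }
}
```

In the real job, the outer grouping is what the custom InputFormat is meant to provide; only the inner session split would run inside the mapper.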
1. Principle:
InputFormat is a very common concept in MapReduce. What role does it play in the running of the program?
InputFormat is actually an interface that contains two methods:
public interface InputFormat<K, V> {
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
}
These two methods accomplish the following tasks respectively:
The getSplits method divides the input data into splits. The number of splits is the number of map tasks, and the split size defaults to the block size, that is, 64 MB.
The getRecordReader method parses each split into records, and then parses each record into a <key, value> pair in turn.
That is, InputFormat does the following:
InputFile --> splits --> <key, value> pairs
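To make the split step concrete, here is a small plain-Java sketch of how a file length turns into a list of splits. It is modelled on the way FileInputFormat is generally understood to size splits (clamp the block size between a minimum and a maximum, and fold a small tail into the last split); the class name, the method names, and the slack factor of 1.1 are assumptions of this sketch, and it only covers non-empty files.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSizeSketch {
    // Assumed slack factor: a tail of up to 10% of a split is merged into the last split.
    static final double SPLIT_SLOP = 1.1;

    /** Clamps the block size between the configured minimum and maximum split size. */
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /** Returns {offset, length} pairs covering a non-empty file of the given length. */
    static List<long[]> getSplits(long fileLength, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long remaining = fileLength;
        // Cut full-size splits while more than SPLIT_SLOP splits' worth of data remains.
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {fileLength - remaining, splitSize});
            remaining -= splitSize;
        }
        // Whatever is left becomes the final (possibly slightly oversized) split.
        if (remaining != 0) {
            splits.add(new long[] {fileLength - remaining, remaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(64 * mb, 1L, Long.MAX_VALUE);
        for (long[] split : getSplits(200 * mb, splitSize)) {
            System.out.println("offset=" + split[0] + " length=" + split[1]);
        }
    }
}
```

With a 64 MB split size, a 200 MB file yields three 64 MB splits plus one 8 MB split, so four map tasks, while a 70 MB file stays in a single split because its tail is within the slack.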
Which InputFormat implementations does the system commonly provide?
Among them, TextInputFormat is the most commonly used.
However, the fixed ways the system provides for converting an InputFile into <key, value> pairs sometimes do not meet our needs.
At that point, we need a custom InputFormat so that the Hadoop framework parses the InputFile into the <key, value> pairs we specify.
Before you understand custom InputFormat, you need to understand several abstract classes, interfaces, and their relationships:
InputFormat (interface), FileInputFormat (abstract class), TextInputFormat (class)
RecordReader (interface), LineRecordReader (class)
FileInputFormat implements InputFormat
TextInputFormat extends FileInputFormat
TextInputFormat.getRecordReader creates a LineRecordReader
LineRecordReader implements RecordReader
The InputFormat interface has already been described in detail above.
Now look at FileInputFormat. It implements the getSplits method of the InputFormat interface, while leaving getRecordReader and isSplitable to a concrete class (such as TextInputFormat). The isSplitable method usually does not need to be changed, so a custom InputFormat only needs to implement getRecordReader. (In the newer org.apache.hadoop.mapreduce API, which the code in section 2 uses, the equivalent method is createRecordReader.)
The core of getRecordReader is to create a LineRecordReader. It is the LineRecordReader class that does the work of "parsing each split into records, and then parsing each record into a <key, value> pair", and it implements the RecordReader interface:
public interface RecordReader<K, V> {
    boolean next(K key, V value) throws IOException;
    K createKey();
    V createValue();
    long getPos() throws IOException;
    void close() throws IOException;
    float getProgress() throws IOException;
}
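The shape of this interface is easier to follow once you see how a caller drives it: one key object and one value object are created up front, and each call to next refills them. The sketch below uses a hypothetical stand-in interface (`SimpleRecordReader`) and an in-memory reader rather than the Hadoop types, so every name in it is an assumption made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class RecordReaderLoopSketch {
    /** Hypothetical stand-in that mirrors part of the RecordReader interface shown above. */
    interface SimpleRecordReader<K, V> {
        boolean next(K key, V value) throws IOException;
        K createKey();
        V createValue();
        void close() throws IOException;
    }

    /** Reads lines from an in-memory array: key = line index, value = line text. */
    static final class ArrayLineReader implements SimpleRecordReader<AtomicLong, StringBuilder> {
        private final String[] lines;
        private int index = 0;

        ArrayLineReader(String[] lines) {
            this.lines = lines;
        }

        public AtomicLong createKey() {
            return new AtomicLong();
        }

        public StringBuilder createValue() {
            return new StringBuilder();
        }

        public boolean next(AtomicLong key, StringBuilder value) {
            if (index >= lines.length) {
                return false; // no more records
            }
            key.set(index);          // refill the caller's key object
            value.setLength(0);      // refill the caller's value object
            value.append(lines[index]);
            index++;
            return true;
        }

        public void close() {
        }
    }

    /** Drives a reader the way a caller would: one key and one value object, reused per record. */
    static <K, V> int drain(SimpleRecordReader<K, V> reader, List<String> out) {
        try {
            K key = reader.createKey();
            V value = reader.createValue();
            int count = 0;
            while (reader.next(key, value)) {
                out.add(key + ":" + value);
                count++;
            }
            reader.close();
            return count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        drain(new ArrayLineReader(new String[] {"x", "y"}), out);
        System.out.println(out);
    }
}
```

Because the key and value objects are reused, anything that needs to keep a record beyond the current iteration has to copy it, which is why `drain` stores a string rather than the objects themselves.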
Therefore, the core of a custom InputFormat is to write your own class that, like LineRecordReader, implements the RecordReader interface, and the core of that class is its implementation of the interface's methods.
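As an illustration of what such a reader has to do, here is a plain-Java sketch that cuts a byte stream into records at a custom delimiter and reports each record's starting byte offset as its key. It does not use Hadoop: the class `DelimitedRecordSketch` and its method names are hypothetical and only echo the style of a record reader. It also leaves out what a real reader must handle, such as a split that starts in the middle of a record, compression, and a maximum record length.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class DelimitedRecordSketch {
    private final InputStream in;
    private final byte[] separator;
    private long pos = 0;   // byte offset of the next unread byte
    private long key;       // start offset of the current record
    private String value;   // current record, without its separator

    DelimitedRecordSketch(InputStream in, String separator) {
        if (separator.isEmpty()) {
            throw new IllegalArgumentException("separator must not be empty");
        }
        this.in = in;
        this.separator = separator.getBytes(StandardCharsets.UTF_8);
    }

    /** Advances to the next record; returns false once the input is exhausted. */
    boolean nextKeyValue() {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        long recordStart = pos;
        try {
            int b;
            while ((b = in.read()) != -1) {
                pos++;
                buf.write(b);
                if (endsWithSeparator(buf)) {
                    byte[] all = buf.toByteArray();
                    key = recordStart;
                    value = new String(all, 0, all.length - separator.length, StandardCharsets.UTF_8);
                    return true;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (buf.size() == 0) {
            return false; // nothing left to read
        }
        // Trailing data without a final separator is returned as the last record.
        key = recordStart;
        value = new String(buf.toByteArray(), StandardCharsets.UTF_8);
        return true;
    }

    /** Simple tail comparison; copies the buffer each time, which is fine for a sketch. */
    private boolean endsWithSeparator(ByteArrayOutputStream buf) {
        if (buf.size() < separator.length) {
            return false;
        }
        byte[] all = buf.toByteArray();
        for (int i = 0; i < separator.length; i++) {
            if (all[all.length - separator.length + i] != separator[i]) {
                return false;
            }
        }
        return true;
    }

    long getCurrentKey() {
        return key;
    }

    String getCurrentValue() {
        return value;
    }

    public static void main(String[] args) {
        byte[] data = "a\nbEND\ncEND\n".getBytes(StandardCharsets.UTF_8);
        DelimitedRecordSketch reader = new DelimitedRecordSketch(new ByteArrayInputStream(data), "END\n");
        while (reader.nextKeyValue()) {
            System.out.println("key: " + reader.getCurrentKey());
            System.out.println("value: " + reader.getCurrentValue());
        }
    }
}
```

Note that a record may itself contain newlines: only the full delimiter ends it, which is the behaviour needed to return all lines of one cookieId as a single value.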
2. Code:
package MyInputFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {

    // Hand every split to our own record reader instead of LineRecordReader
    @SuppressWarnings("deprecation")
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new TrackRecordReader();
    }

    // Only uncompressed files can be split
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        return codec == null;
    }
}

package MyInputFormat;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Treats keys as offset in file and value as line.
 *
 * @deprecated Use
 *             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}
 *             instead.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {
    private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);
    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private NewLineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;

    // Record delimiter: each record ends with "END\n"
    private byte[] separator = "END\n".getBytes();

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        boolean skipFirstLine = false;
        if (codec != null) {
            // Compressed input cannot be split, so read to the end of the stream
            in = new NewLineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                // Back up by one delimiter so a record starting exactly at "start" is not lost
                skipFirstLine = true;
                this.start -= separator.length;
                fileIn.seek(start);
            }
            in = new NewLineReader(fileIn, job);
        }
        if (skipFirstLine) { // skip first line and re-establish "start".
            start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (pos < end) {
            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }
            LOG.info("Skipped line of size " ...
            // The source is cut off here: the rest of nextKeyValue(), the remaining
            // RecordReader methods, and the NewLineReader class are missing.

package MyInputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TestMyInputFormat {

    // Map-only job: print every key/value pair produced by TrackInputFormat
    public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key:\t" + key);
            System.out.println("value:\t" + value);
            System.out.println("-");
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Path outPath = new Path("/hive/11");
        // Remove the output directory left by a previous run
        FileSystem.get(conf).delete(outPath, true);
        Job job = new Job(conf, "TestMyInputFormat");
        job.setInputFormatClass(TrackInputFormat.class);
        job.setJarByClass(TestMyInputFormat.class);
        job.setMapperClass(TestMyInputFormat.MapperClass.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3. Test data:
cookieId  time  url       cookieOverFlag
1         a     1_hao123
1         a     1_baidu
1         b     1_google  2END
2         c     2_google
2         c     2_hao123
2         c     2_google  1END
3         a     3_baidu
3         a     3_sougou
3         b     3_soso    2END
4. Results:
key: 0
value: 1 a 1_hao123
1 a 1_baidu
1 b 1_google 2
-
key: 47
value: 2 c 2_google
2 c 2_hao123
2 c 2_google 1
-
key: 96
value: 3 a 3_baidu
3 a 3_sougou
3 b 3_soso 2
-