In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-01 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains "Mapper output buffer MapOutputBuffer how to understand", the article explains the content is simple and clear, easy to learn and understand, the following please follow the editor's ideas slowly in depth, together to study and learn "Mapper output buffer MapOutputBuffer how to understand" it!
Output buffer MapOutputBuffer of Mapper
Now that we know the input of Map, then we look at the output of map, and the focus here is on the meaning of the statement context.write.
Find out that Mapper is passed as a parameter to map's context. Here we see that Mapper's run is passed as a parameter when it is called. The one who calls Mapper.run is MapTask. RunNewMapper . Let's take a closer look at runNewMapper here. Let's look at MapTask's run method: let's focus on runNewMapper
Public void run (final JobConf job, final TaskUmbilicalProtocol umbilical)
Throws IOException, ClassNotFoundException, InterruptedException {this.umbilical = umbilical;if (isMapTask ()) {/ / If there are no reducers then there won't be any sort. Hence the map / / phase will govern the entire attempt's progress. If (conf.getNumReduceTasks () = 0) {mapPhase = getProgress (). AddPhase ("map", 1.0f);} else {/ / If there are reducers then the entire attempt's progress will be / / split between the map phase (67%) and the sort phase (33%). MapPhase = getProgress (). AddPhase ("map", 0.667f); sortPhase = getProgress (). AddPhase ("sort", 0.333f);}} TaskReporter reporter = startReporter (umbilical); for partners who get Chinese video files and complete videos, please add QQ group: 947967114boolean useNewApi = job.getUseNewMapper (); initialize (job, getJobID (), reporter, useNewApi); / / check if it is a cleanupJobTaskif (jobCleanup) {runJobCleanupTask (umbilical, reporter); return;} if (jobSetup) {runJobSetupTask (umbilical, reporter) Return;} if (taskCleanup) {runTaskCleanupTask (umbilical, reporter); return;} if (useNewApi) {runNewMapper (job, splitMetaInfo, umbilical, reporter);} else {runOldMapper (job, splitMetaInfo, umbilical, reporter);} done (umbilical, reporter)
}
When we click runNewMapper, we can get into the real implementation:
Private
Void runNewMapper (final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter) throws IOException, ClassNotFoundException
InterruptedException {
/ / make a task context so we can get the classesorg.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl (job, getTaskID (), reporter); / / make a mapper org.apache.hadoop.mapreduce.Mapper mapper = (org.apache.hadoop.mapreduce.Mapper)
ReflectionUtils.newInstance (taskContext.getMapperClass (), job)
/ / determine which specific Mapper to use, and then create it. For partners who get Chinese files and complete videos, please add QQ group: 947967114
Org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat)
ReflectionUtils.newInstance (taskContext.getInputFormatClass (), job)
/ / determine the input file format
/ / rebuild the input splitorg.apache.hadoop.mapreduce.InputSplit split = null
Split = getSplitDetails (new Path (splitIndex.getSplitLocation ()), splitIndex.getStartOffset ()); / / determine which split is the input used by this Mapper
LOG.info ("Processing split:" + split); org.apache.hadoop.mapreduce.RecordReader input = new NewTrackingRecordReader (split, inputFormat, reporter, taskContext)
/ / create a RecordReader commensurate with InputFormat
Job.setBoolean (JobContext.SKIP_RECORDS, isSkipping ()); org.apache.hadoop.mapreduce.RecordWriter output = null;// get an output object
If (job.getNumReduceTasks () = = 0) {
/ / if the number of reduce set is 0, it will be output directly.
Output = new NewDirectOutputCollector (taskContext, job, umbilical, reporter);} else {output = new NewOutputCollector (taskContext, job, umbilical, reporter)
}
Next, let's take a look at the NewOutputCollector source code to obtain Chinese video documents and complete videos. Please add QQ group: 947967114.
NewOutputCollector (org.apache.hadoop.mapreduce.JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException {collector = createSortingCollector (job, reporter)
/ / create a collecter that leads to the sorting phase
Partitions = jobContext.getNumReduceTasks ()
/ / obtain the number of Reduce by obtaining the number of partitions. The two quantities correspond to each other.
If (partitions > 1) {
/ / the number of partitions obtained is greater than 1
Partitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils.newInstance (jobContext.getPartitionerClass (), job)
/ / ReflectionUtils.newInstance creates the Partitioner set by the user, and the parameter jobContext.getPartitionerClass () in it is an extension of the abstract class, indicating that you can write a Partitioner class, which is obtained by this method. If you don't write it yourself, you use the default HashPartitioner.
} else {partitioner = new org.apache.hadoop.mapreduce.Partitioner () {@ Override public int getPartition (K key, V value, int numPartitions) {return partitions-1;} / / only one partition dynamically extends the abstract class Partitioner class};}
}
Back to the runNewMapper source code:
Org.apache.hadoop.mapreduce.MapContextmapContext = new MapContextImpl (job, getTaskID (), input, output, committer, reporter, split)
/ / create a Context for Mapper.
Org.apache.hadoop.mapreduce.Mapper.Context
MapperContext = new WrappedMapper () .getMapContext (mapContext)
/ / obtain the mapContext created above through getMapContext and finally pass it to mapperContext. Let's continue to look at the getMapContext source code.
Public Mapper.Context
GetMapContext (MapContext mapContext) {
Return new Context (mapContext)
}
/ / the Context object is returned here, and the Context object is being viewed. For partners who get Chinese files and complete videos, please add QQ group: 947967114
Public Context (MapContext mapContext) {this.mapContext = mapContext
}
/ / We see that the value of mapContext has been obtained. So we know that WrappedMapper-- > Context-- > mapContext is a MapContextImpl.
Try {input.initialize (split, mapperContext)
/ / initialize input,input is a recordReader object, with split and mapperContext as parameters
Mapper.run (mapperContext)
/ / We know that this run method runs Mapper's run method, so take a look at this run
Public void run (Context context) throws IOException, InterruptedException {
Setup (context)
/ / obtain context
Try {while (context.nextKeyValue ()) {
/ / use nextKeyValue to control the operation
Map (context.getCurrentKey (), context.getCurrentValue (), context)
/ / the map method is run and the key-value pair provided by recordReader is given.
}} finally {cleanup (context);}
}
Back to the MapTask source code
MapPhase.complete ()
/ / Lock
SetPhase (TaskStatus.Phase.SORT)
/ / sort all task results
StatusUpdate (umbilical)
/ / update runNewMapper status.
Input.close ()
/ / close the input stream
Input = null; output.close (mapperContext)
/ / close the output stream
Output = null;} finally {closeQuietly (input); closeQuietly (output, mapperContext);}
}
For the input format and shards have been discussed in detail before, you need to pay attention to NewTrackingRecordReader. We know that with InputFormat, we need to create a corresponding RecordReader. But on RecordReader, you use NewTrackingRecordReader. The difference is that Tracking is a trace, a trace of RecordReader. He has a parameter reporter here, which is used to report the tracking results, while RecordReader does not have this function.
What is related to the output is collecter, which is the collector of the output data, and context.write is finally implemented on collector.collect through RecodWriter. RecordWriter and RecordReader are at the same level. RecodWriter is an abstract class defined by hadoop, and the concrete RecodWriter is an extension of this abstract class. The ones used for maptask are NewDrictDoutputCollecter and NewOutputCollecter.
These two classes are called OutputCollecter, and both are actually RecordWriter. Collecter is just a semantic description. From Mapper's point of view, it's Writer, it's output. From a framework or downstream point of view, it is Collect, which is a collection.
If the number of reducer is 0, the output without reducer,Mapper is the output of the whole MR. In this case, the NewDrictDoutputCollecter of RecordWriter is used to output directly. Instead, there is at least one Reducer, so you are using RecordWriter's NewOutputCollecter. This is the key content we pay attention to. Let's look at the NewOutputCollecter source code. Several things are defined:
Collector = createSortingCollector (job, reporter)
/ / implement MapOutputCollector
Partitions = jobContext.getNumReduceTasks ()
/ / Partition responsible for Mapper output
Partitioner = (org.apache.hadoop.mapreduce.Partitioner)
/ / the number of distribution targets, that is, the number of Reducer.
@ Overridepublic void write (K key, V value) throws IOException, InterruptedException {collector.collect (key, value, partitioner.getPartition (key, value, partitions))
}
/ / write only writes but does not read.
Overridepublic void close (TaskAttemptContext context) throws IOException,InterruptedException {try {collector.flush ();} catch (ClassNotFoundException cnf) {throw new IOException ("can't find class", cnf);} collector.close ();}
}
NewOutputCollector is divided into two parts, one is collecter and the other is partitioner. Collecter is responsible for actually collecting Mapper output and delivering it to Reducer, and partitioner is responsible for deciding which Reducer to hand over the specific output to.
There are multiple Reducer, and the MR framework needs to put each output of each Mapper, that is, all the KV pairs collected. Output to a different Reducer under certain conditions (that is, how Partioner is implemented, the default is HashPartitioner). In this way, the output of Mapper is divided into multiple Partition, and several Reducer also divides each Mapper into several Partition,Partitioner. The hash way.
So in the constructor that creates the NewOutputCollector, you need to create the specific collector and partitioner.
MapOutputCollector is defined in the source code of hadoop. Whenever you implement this class, in addition to the init and close methods, you must also provide two functions, collect and flush. You know from NewOutputCollector that the caller of these two functions is collector, and the way to create collector is done through createSortingCollector. And the sorting of KV pairs is realized. The dependencies are as follows:
YarnChild.main- > PrivilegeExceptionAction.run- > Maptask.run-- > RunNewMapper- > NewOutputCollector- > MapTask.createSortingCollector
So let's take a look at the createSortingCollector source code. For partners who get Chinese files and complete videos, please add QQ group: 947967114
Private MapOutputCollector
CreateSortingCollector (JobConf job, TaskReporter reporter) throws IOException, ClassNotFoundException {MapOutputCollector.Context context = new MapOutputCollector.Context (this, job, reporter); Class [] collectorClasses = job.getClasses (JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class)
/ / if no settings are added, MapOutputBuffer.class is used by default.
Int remainingCollectors = collectorClasses.length
For (Class clazz: collectorClasses) {
/ / collectorClasses set one by one in the experiment
Try {if (! MapOutputCollector.class.isAssignableFrom (clazz)) {throw new IOException ("Invalid output collector class:" + clazz.getName () + "(does not implement MapOutputCollector)")
/ / it tells us that we must implement MapOutputCollector.class.
} Class
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.