In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
Because of the existence of sort in Mapreduce, MapTask and ReduceTask are directly the architecture of workflow. Not the architecture of the data flow. Before the MapTask is finished, and before its output is sorted and merged, the ReduceTask has data input again, so even if the ReduceTask has been created, it can only sleep and wait for the MapTask to complete. Thus, the data can be obtained from the MapTask node. The final data output of a MapTask is a merged spill file that can be accessed through the Web address. So reduceTask usually starts when MapTask is almost finished. It is a waste of container resources to start early.
ReduceTask is a thread that runs on YarnChild's Java virtual machine. Let's start with ReduceTask.run and look at the Reduce phase. For more video information of big data, please add QQ group: 947967114
Public void run (JobConf job, final TaskUmbilicalProtocol umbilical)
Throws IOException, InterruptedException, ClassNotFoundException {
Job.setBoolean (JobContext.SKIP_RECORDS, isSkipping ())
If (isMapOrReduce ()) {
Several stages of the / add reduce process. In order to inform TaskTracker of the current status of the operator /
CopyPhase = getProgress () .addPhase ("copy")
SortPhase = getProgress () .addPhase ("sort")
ReducePhase = getProgress () .addPhase ("reduce")
}
/ / start thread that will handle communication with parent
TaskReporter reporter = startReporter (umbilical)
/ / set up and start the reporter process to communicate with TaskTracker
Boolean useNewApi = job.getUseNewReducer ()
/ / when initializing job in job client, the default is to use the new API. For more information, please see Job.setUseNewAPI () method.
Initialize (job, getJobID (), reporter, useNewApi)
/ used to initialize the task, mainly to make some settings related to the task output, such as creating commiter, setting working directory, etc. /
/ / check if it is a cleanupJobTask
/ the following four if statements all operate according to the type of task. These methods are methods of class Task, so it has nothing to do with whether the task is MapTask or ReduceTask.
If (jobCleanup) {
RunJobCleanupTask (umbilical, reporter)
Return;// is just for JobCleanup. Stop as soon as you finish it.
}
If () {
RunJobSetupTask (umbilical, reporter)
Return
/ / mainly to create FileSystem objects in the working directory
}
If (taskCleanup) {
RunTaskCleanupTask (umbilical, reporter)
Return
/ / set the current stage of the task as the end phase, and delete the working directory
}
Here is what you really want to be reducer.
/ / Initialize the codec
Codec = initCodec ()
RawKeyValueIterator rIter = null
ShuffleConsumerPlugin shuffleConsumerPlugin = null
Class combinerClass = conf.getCombinerClass ()
CombineOutputCollector combineCollector =
(null! = combinerClass)?
New CombineOutputCollector (reduceCombineOutputCounter, reporter, conf): null
/ / create a combineCollector if necessary
Classextends ShuffleConsumerPlugin > clazz =
Job.getClass (MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class)
/ / find mapreduce.job.reduce.shuffle.consumer.plugin.class for configuration file. Default is shuffle.class.
ShuffleConsumerPlugin = ReflectionUtils.newInstance (clazz, job)
/ / create a shuffle class object
LOG.info ("Using ShuffleConsumerPlugin:" + shuffleConsumerPlugin)
ShuffleConsumerPlugin.Context shuffleContext =
New ShuffleConsumerPlugin.Context (getTaskID (), job, FileSystem.getLocal (job), umbilical
Super.lDirAlloc, reporter, codec
CombinerClass, combineCollector
SpilledRecordsCounter, reduceCombineInputCounter
ShuffledMapsCounter
ReduceShuffleBytes, failedShuffleCounter
MergedMapOutputsCounter
TaskStatus, copyPhase, sortPhase, this
MapOutputFile, localMapFiles)
/ / create context object, ShuffleConsumerPlugin.Context
ShuffleConsumerPlugin.init (shuffleContext)
/ / the call starts with the init function of shuffle. The key points are summarized below.
This.localMapFiles = context.getLocalMapFiles ()
Scheduler = new ShuffleSchedulerImpl (jobConf, taskStatus, reduceId
This, copyPhase, context.getShuffledMapsCounter ()
Context.getReduceShuffleBytes (), context.getFailedShuffleCounter ()
/ / create a scheduler required for shuffle
Merger = createMergeManager (context)
/ / create the source code in the merge,createMergeManager inside the shuffle:
Return new MergeManagerImpl (reduceId, jobConf, context.getLocalFS ()
Context.getLocalDirAllocator (), reporter, context.getCodec ()
Context.getCombinerClass (), context.getCombineCollector ()
Context.getSpilledRecordsCounter ()
Context.getReduceCombineInputCounter ()
Context.getMergedMapOutputsCounter (), this, context.getMergePhase ()
Context.getMapOutputFile ()
/ / create MergeMnagerImpl objects and merge threads
RIter = shuffleConsumerPlugin.run ()
/ / copy the output files from each Mapper, merge them and sort them, and wait until they are finished.
/ / free up the data structures
MapOutputFilesOnDisk.clear ()
SortPhase.complete ()
/ / sorting phase completed
SetPhase (TaskStatus.Phase.REDUCE)
/ / enter the reduce phase
StatusUpdate (umbilical)
Class keyClass = job.getMapOutputKeyClass ()
Class valueClass = job.getMapOutputValueClass ()
RawComparator comparator = job.getOutputValueGroupingComparator ()
/ / the last phase of the 3.Reduce 1.Reduce task. It will have Map's keyClass ("mapred.output.key.class", "mapred.mapoutput.key.class"), valueClass ("mapred.mapoutput.value.class" or "mapred.output.value.class") and Comparator ("mapred.output.value.groupfn.class" or "mapred.output.key.comparator.class") ready.
If (useNewApi) {
/ / 2. Determine whether to execute runNewReduce or runOldReduce according to the parameter useNewAPI. Analytical moisturizing runNewReduce
RunNewReducer (job, umbilical, reporter, rIter, comparator
KeyClass, valueClass)
/ / 0. Like the reporting process to write some information, 1. Get a TaskAttemptContext object. Create reduce, output and RecordWrit for statistical output for tracking through this object, and finally create Context,2.reducer.run (reducerContext) for collecting reduce results to execute reduce
} else {/ / Old APIs
RunOldReducer (job, umbilical, reporter, rIter, comparator
KeyClass, valueClass)
}
ShuffleConsumerPlugin.close ()
Done (umbilical, reporter)
}
(1) reduce is divided into three stages (copy is to remotely copy the output data of Map, sort is to sort all the data, and reduce to aggregate is our own reducer). Set Progress for these three stages to communicate and report status with TaskTracker.
(2) the corresponding part of the running source code level analysis of the 15-40 lines of the above code and the MapTask task of MapReduce is basically the same, which can be referenced.
(3) the sentence codec = initCodec () checks whether the output of map is compressed. Compressed codec instance is returned if compressed, otherwise null is returned. Uncompressed is discussed here.
(4) We discuss the fully distributed hadoop, that is, isLocal==false, then construct a ReduceCopier object reduceCopier, and call the reduceCopier.fetchOutputs () method to copy the output of each Mapper to the local
(5) then the copy phase is completed, and the next stage is set to the sort phase, and the status information is updated.
(6) KV iterator is selected according to isLocal. Fully distributed ones will use reduceCopier.createKVIterator (job, rfs, reporter) as KV iterator.
(7) the sort phase is completed, and the next stage is the reduce phase. Update the status information.
(8) then get some configuration information and choose different processing methods according to whether to use the new API. Here is the new API. Calling runNewReducer (job, umbilical, reporter, rIter, comparator, keyClass, valueClass) will execute the reducer.
(9) done (umbilical, reporter) this method is used to do some cleanup work to finish the task: update the counter updateCounters (); if the task needs to be submitted, set the Taks status to COMMIT_PENDING, and use TaskUmbilicalProtocol to report Task completion, wait for submission, and then call commit to submit the task; set the task end flag; end the Reporter communication thread; send the last statistical report (through the sendLastUpdate method) Report the end status with TaskUmbilicalProtocol (through the sendDone method).
Some people divide Reduce Task into five stages: first, shuffle stage: also known as Copy stage, which is to remotely copy a piece of data from each MapTask, and if the size exceeds a certain threshold, write it to disk, otherwise put it into memory; second, Merge phase: while copying data remotely, Reduce Task starts two background threads to merge files on memory and disk to prevent excessive memory use and too many disk files. Third, the sort phase: the input data of the reduce method written by the user is aggregated by key, and the data from copy needs to be sorted, which is sorted here by merging, because the results of Map Task are in order; fourth, the Reduce phase: each group of data is handed over to the user-written Reduce method in turn to deal with; fifth, the write phase: the result is written into HDFS.
The above five stages are divided into three stages: copy, sort, and reduce. When we run the MR program in eclipse, the percentage of reduce phases seen in the console is divided into three stages, each accounting for 33.3%.
ShuffleConsumerPlugin here is a class object that implements ShuffleConsumerPlugin. You can set it through the configuration file mapreduce.job.reduce.shuffle.consumer.plugin.class option, which is shuffle by default. We have analyzed the completion of the shuffleConsumerPlugin.run, usually shuffle.run, in the code, because only with this process can the synthesized spill file of the Mapper be transferred to the Reducer side through the HTTP protocol. Only with the data can we do runNewReducer or runOldReducer. It can be said that the shuffle object is the porter of MapTask. And the way of handling shuffle is not to transport Reducer once, but to move all the data from MapTask, and to merge and sort them before providing them to the corresponding Reducer.
Generally speaking, MapTask and ReduceTask have a many-to-many relationship if there are M Mapper and N Reducer. We know that N Reducer corresponds to N partition, so each Mapper is divided into N Partition, and each Reducer undertakes the operation of a Partition part. In this way, each Reducer takes its own part of the data from each different Mapper, so that each Reducer has M pieces of different Mapper data. Merging M pieces of data together is a final complete Partition, and it is necessary to sort it, which becomes the specific input data of Reducer. This process of data handling and reorganization is called the shuffle process. The process of shuffle is expensive and will take up a lot of network traffic, because it involves the transmission of a large amount of data, and there will also be delays in the shuffle process, because the calculation of M Mapper is fast and slow, but the shuffle needs all the Mapper to complete before it can start, and the Reduce must wait for the shuffle to complete before it can start. Of course, this delay is not caused by shuffle, if the Reducer does not need all the Partition data in place and sorted, you do not have to synchronize with the slowest Mapper. This is the price of sorting.
So shuffle plays a very important role in the MapReduce framework. Let's first take a look at the summary of shuffle: for more video materials of big data, please add QQ group: 947967114
Public class Shuffle implements ShuffleConsumerPlugin, ExceptionReporter
Private ShuffleConsumerPlugin.Context context
Private TaskAttemptID reduceId
Private JobConf jobConf
Private TaskUmbilicalProtocol umbilical
Private ShuffleSchedulerImpl scheduler
Private MergeManager merger
Private Task reduceTask; / / Used for status updates
Private Map localMapFiles
Public void init (ShuffleConsumerPlugin.Context context)
Public RawKeyValueIterator run () throws IOException, InterruptedException
You can see that shuffle.init is called in ReduceTask.run, and ShuffleSchedulerImpl and MergeManagerImpl objects are created in Runli. Later, I will explain what it is for.
Then comes the call to shuffle.run. Shuffle has a run, but it's not a thread, it just uses that name.
Let's see: ReduceTask.run- > Shuffle.run
Public RawKeyValueIterator run () throws IOException, InterruptedException {
Int eventsPerReducer = Math.max (MIN_EVENTS_TO_FETCH
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks ()
Int maxEventsToFetch = Math.min (MAX_EVENTS_TO_FETCH, eventsPerReducer)
/ / Start the map-completion events fetcher thread
Final EventFetcher eventFetcher =
New EventFetcher (reduceId, umbilical, scheduler, this
MaxEventsToFetch)
EventFetcher.start ()
/ / by looking at EventFetcher, we see that he inherits Thread, so he is a thread.
/ / Start the map-output fetcher threads
Boolean isLocal = localMapFiles! = null
Final int numFetchers = isLocal? 1:
JobConf.getInt (MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5)
Fetcher [] fetchers = new Fetcher [numFetchers]
/ / A thread pool has been created
If (isLocal) {
/ / if Mapper and Reducer are on the same machine, it is on the local fetche
Fetchers [0] = new LocalFetcher (jobConf, reduceId, scheduler
Merger, reporter, metrics, this, reduceTask.getShuffleSecret ()
LocalMapFiles)
/ / LocalFetcher is an extension of Fetcher and is also a thread.
Fetchers [0] .start (); / / there is only one local Fecher
} else {
/ / Mapper collection Reducer is not on the same machine and needs to be Fecher across multiple nodes
For (int item0; I
< numFetchers; ++i) { //启动所有的Fecher fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret()); //创建Fecher线程 fetchers[i].start(); //跨节点的Fecher需要好多个,都需要开启 } } // Wait for shuffle to complete successfully while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) { reporter.progress(); //等待所有的Fecher都完成,如果有超时情况就报告进度 synchronized (this) { if (throwable != null) { throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable); } } } // Stop the event-fetcher thread eventFetcher.shutDown(); //关闭eventFetcher,代表shuffle操作完成,所有的MapTask的数据都拷贝过来了 // Stop the map-output fetcher threads for (Fetcher fetcher : fetchers) { fetcher.shutDown();//关闭所有的fetcher。 } // stop the scheduler scheduler.close(); //也不需要shuffle的调度,所以关闭 copyPhase.complete(); // copy is already complete //文件复制阶段结束 以下就是Reduce阶段的MergeSort了 taskStatus.setPhase(TaskStatus.Phase.SORT); //完成排序 reduceTask.statusUpdate(umbilical); //通过umbilical向MRAppMaster汇报,更新状态 // Finish the on-going merges... RawKeyValueIterator kvIter = null; try { kvIter = merger.close(); //合并和排序,完成后返回一个队列kvIter 。 } catch (Throwable e) { throw new ShuffleError("Error while doing final merge " , e); } // Sanity check synchronized (this) { if (throwable != null) { throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable); } } return kvIter; } 数据从MapTask转移到ReduceTask就两种方式,一MapTask送,二ReduceTask取,hadoop采用的是第二种方式,就是文件的复制。在Shuffle进入run之前,RduceTask.run调用过他的init函数shuffleConsumerPlugin.init(shuffleContext),在init里创建了scheduler和用于合并排序的merge,进入run后又创建了EventFetcher线程和若干个Fetcher线程。Fetcher的作用就是拿取,向MapTask节点提取数据。但是我们要清楚EventFetcher虽然也是Fetcher,但是提取的是event,不是数据本身。我们可以认为它只是对Fetcher过程的一个事件的控制。 Fetcher线程的数量也不一定,Uber模式下,MapTask和ReduceTask在同一个节点上,并且只有一个MapTask,所以只有一个Fetcher就能够完成,而且这个Fetcher是localFetcher。如果不是Uber模式可能会有很多MapTask并且一般和ReduceTask不在同一个节点。这时Fetcher的数量可以进行配置,默认有5个。数组fetchers就相当于Fetcher的线程池。 创建了EventFetcher和Fetcher线程池后,进入了while循环,但是while循环什么都不做,一直等待,所以实际的操作都是在线程完成的,也就是通过EventFetcher和若干的Fetcher完成。EventFetcher起到了非常关键的枢纽的作用。 我们查看EventFetcher的源代码摘要,我们提取关键的东西: class EventFetcher extends Thread { private final TaskAttemptID reduce; private final TaskUmbilicalProtocol umbilical; private final ShuffleScheduler scheduler; private final int maxEventsToFetch; public void run() { int failures = 0; LOG.info(reduce + " Thread started: " + getName()); try { while (!stopped && !Thread.currentThread().isInterrupted()) {//线程没有被打断 try { int numNewMaps = getMapCompletionEvents(); //获取Map的完成的事件,接着我们看getMapCompletionEvents源代码: protected int getMapCompletionEvents() throws IOException, InterruptedException { int numNewMaps = 0; TaskCompletionEvent events[] = null; do { MapTaskCompletionEventsUpdate update = umbilical.getMapCompletionEvents( (org.apache.hadoop.mapred.JobID)reduce.getJobID(), fromEventIdx, maxEventsToFetch, (org.apache.hadoop.mapred.TaskAttemptID)reduce); //汇报umbilical从MRAppMaster获取Map完成的时间的报告 events = update.getMapTaskCompletionEvents(); //获取有关具体的MapTask结束运行的情况 LOG.debug("Got " + events.length + " map completion events from " + fromEventIdx); assert !update.shouldReset() : "Unexpected legacy state"; //做了一个断言 获取更多大数据视频资料请加QQ群:947967114 // Update the last seen event ID fromEventIdx += events.length; // Process the TaskCompletionEvents: // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs. // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop // fetching from those maps. // 3. Remove TIPFAILED maps from neededOutputs since we don't need their // outputs at all. for (TaskCompletionEvent event : events) { //对于获取的每个事件的报告 scheduler.resolve(event); //这里使用了ShuffleSchedullerImpl.resolve函数,源代码如下: public void resolve(TaskCompletionEvent event) { switch (event.getTaskStatus()) { case SUCCEEDED://如果成功 URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());//获取其URI addKnownMapOutput(u.getHost() + ":" + u.getPort(), u.toString(), event.getTaskAttemptId()); //记录这个MapTask的节点主机记录下来,供Fetcher使用,getBaseURI的源代码: static URI getBaseURI(TaskAttemptID reduceId, String url) { StringBuffer baseUrl = new StringBuffer(url); if (!url.endsWith("/")) { baseUrl.append("/"); } baseUrl.append("mapOutput?job="); baseUrl.append(reduceId.getJobID()); baseUrl.append("&reduce="); baseUrl.append(reduceId.getTaskID().getId()); baseUrl.append("&map="); URI u = URI.create(baseUrl.toString()); return u; 获取各种信息,然后添加都URI对象中。 } 回到源代码 maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime()); //最大的尝试时间 break; case FAILED: case KILLED: case OBSOLETE://如果MapTask运行失败 obsoleteMapOutput(event.getTaskAttemptId());//获取TaskId LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + " map-task: '" + event.getTaskAttemptId() + "'");//写日志 break; case TIPFAILED://如果失败 tipFailed(event.getTaskAttemptId().getTaskID()); LOG.info("Ignoring output of failed map TIP: '" + event.getTaskAttemptId() + "'");//写日志 break; } } 回到源代码 if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {//如果事件成功 ++numNewMaps;//增加map数量 } } } while (events.length == maxEventsToFetch); return numNewMaps; } 回到源代码 failures = 0; if (numNewMaps >0) {
LOG.info (reduce + ":" + "Got" + numNewMaps + "new map-outputs")
}
LOG.debug ("GetMapEventsThread about to sleep for" + SLEEP_TIME)
If (! Thread.currentThread (). IsInterrupted ()) {
Thread.sleep (SLEEP_TIME)
}
} catch (InterruptedException e) {
LOG.info ("EventFetcher is interrupted.. Returning")
Return
} catch (IOException ie) {
LOG.info ("Exception in getting events", ie)
/ / check to see whether to abort
If (+ + failures > = MAX_RETRIES) {
Throw new IOException ("too many failures downloading events", ie); / / the number of failures is greater than the number of retries
}
/ / sleep for a bit
If (! Thread.currentThread (). IsInterrupted ()) {
Thread.sleep (RETRY_PERIOD)
}
}
}
} catch (InterruptedException e) {
Return
} catch (Throwable t) {
ExceptionReporter.reportException (t)
Return
}
}
There is no direct relationship between MapTask and ReduceTask. MapTask does not know which nodes ReduceTask is on, it just reports the time of progress to MRAppMaster. ReduceTask performs the getMapCompletionEvents operation through the "umbilical cord" to MRAppMaster to obtain the time report of the end of the MapTask. There are individual MapTask may fail, but the vast majority will succeed, as long as the success of the Fetcher to obtain the output data, this information is completed through shcheduler, that is, ShuffleSchedulerImpl object, ShuffleSchedulerImpl object is not many, just an ordinary object.
Fetchers is like a thread pool with several threads (5 by default) that wait for notification from EventFetcher and fetch data as soon as MapTask is complete.
For more video information of big data, please add QQ group: 947967114
Let's look at the run method of the fetch thread class:
Public void run () {
Try {
While (! stopped &! Thread.currentThread () .isInterrupted ()) {
MapHost host = null
Try {
/ / If merge is on, block
Merger.waitForResource ()
/ / Get a host to shuffle from
Host = scheduler.getHost ()
/ / get a node of a successfully completed MapTask from scheduler.
Metrics.threadBusy ()
/ / Thread becomes busy
/ / Shuffle
CopyFromHost (host)
/ / start copying the data of this node
} finally {
If (host! = null) {/ / maphost and running
Scheduler.freeHost (host)
/ / the status is set to idle, waiting for it to complete.
Metrics.threadFree ()
}
}
}
} catch (InterruptedException ie) {
Return
} catch (Throwable t) {
ExceptionReporter.reportException (t)
}
}
The focus here is on the functions that copyFromHost uses to get the data.
Protected void copyFromHost (MapHost host) throws IOException {
/ / reset retryStartTime for a new host
/ / this is running on the node of ReduceTask
RetryStartTime = 0
/ / Get completed maps on 'host'
List maps = scheduler.getMapsForHost (host)
/ / get the MapTask collection on the target node.
/ / Sanity check to catch hosts with only 'OBSOLETE' maps
/ / especially at the tail of large jobs
If (maps.size () = = 0) {
Direct return of return;// that is not completed
}
If (LOG.isDebugEnabled ()) {
LOG.debug ("Fetcher" + id + "going to fetch from" + host + "for:"
Maps)
}
/ / List of maps to be fetched yet
Set remaining = new HashSet (maps)
/ / completed, waiting for the MapTask collection of shuffle.
/ / Construct the url and connect
DataInputStream input = null
URL url = getMapOutputURL (host, maps)
/ / generate the URL of the node where the MapTask resides. Here's the getMapOutputURL source code:
Private URL getMapOutputURL (MapHost host, Collection maps
) throws MalformedURLException {
/ / Get the base url
StringBuffer url = new StringBuffer (host.getBaseUrl ())
Boolean first = true
For (TaskAttemptID mapId: maps) {
If (! first) {
Url.append (,)
}
Url.append (mapId); / / add mapid after URL
First = false
}
LOG.debug ("MapOutput URL for" + host + "- >" + url.toString ())
/ / write a log
Return new URL (url.toString ())
/ / return URL
}
Go back to the main code:
Try {
SetupConnectionsWithRetry (host, remaining, url)
/ / establish a HTTP connection with the other host, and setupConnectionsWithRetry uses the openConnectionWithRetry function to open the link.
OpenConnectionWithRetry (host, remaining, url)
This source code has the use of openConnection (url); way, continue to check.
The following is the main process of linking:
Protected synchronized void openConnection (URL url)
Throws IOException {
HttpURLConnection conn = (HttpURLConnection) url.openConnection ()
/ / HTTPURL is used to connect
If (sslShuffle) {/ / if there is a trust certificate
HttpsURLConnection httpsConn = (HttpsURLConnection) conn
/ / strongly convert conn type
Try {
HttpsConn.setSSLSocketFactory (sslFactory.createSSLSocketFactory ()); / / add a factory for certificate socket
} catch (GeneralSecurityException ex) {
Throw new IOException (ex)
}
HttpsConn.setHostnameVerifier (sslFactory.getHostnameVerifier ())
}
Connection = conn
}
Continue to write in setupConnectionsWithRetry:
SetupShuffleConnection (encHash)
/ / Shuffle link has been established
Connect (connection, connectionTimeout)
/ / verify that the thread wasn't stopped during calls to connect
If (stopped) {
Return
}
VerifyConnection (url, msgToEncode, encHash)
}
/ / so far the connection has passed.
If (stopped) {
AbortConnect (host, remaining)
/ / this is to close the connection. You can click in to have a look and meet the two conditions of list and waiting.
Return
}
} catch (IOException ie) {
Boolean connectExcpt = ie instanceof ConnectException
IoErrs.increment (1)
LOG.warn ("Failed to connect to" + host + "with" + remaining.size () +
"map outputs", ie)
Go back to the main code
Input = new DataInputStream (connection.getInputStream ())
/ / instance an input stream object.
Try {
/ / Loop through available map-outputs and fetch them
/ / On any error, faildTasks is not null and we exit
/ / after putting back the remaining maps to the
/ / yet_to_be_fetched list and marking the failed tasks.
TaskAttemptID [] failedTasks = null
While (! remaining.isEmpty () & & failedTasks = = null) {
/ / if the list of required fetcher is not empty and the number of failed task is not
Try {
FailedTasks = copyMapOutput (host, input, remaining, fetchRetryEnabled)
/ / the source code of copyMapOutput for copying data is as follows:
Try {
ShuffleHeader header = new ShuffleHeader ()
Header.readFields (input)
MapId = TaskAttemptID.forName (header.mapId)
/ / obtain mapID
CompressedLength = header.compressedLength
DecompressedLength = header.uncompressedLength
ForReduce = header.forReduce
} catch (IllegalArgumentException e) {
BadIdErrs.increment (1)
LOG.warn ("Invalid map id", e)
/ / Don't know which one was bad, so consider all of them as bad
Return remaining.toArray (new TaskAttemptID [remaining.size ()])
}
InputStream is = input
Is = CryptoUtils.wrapIfNecessary (jobConf, is, compressedLength)
CompressedLength-= CryptoUtils.cryptoPadding (jobConf)
DecompressedLength-= CryptoUtils.cryptoPadding (jobConf)
/ / if you need to decompress or decrypt
/ / Do some basic sanity verification
If (! verifySanity (compressedLength, decompressedLength, forReduce)
Remaining, mapId)) {
Return new TaskAttemptID [] {mapId}
}
If (LOG.isDebugEnabled ()) {
LOG.debug ("header:" + mapId + ", len:" + compressedLength + "
", decomp len:" + decompressedLength)
}
Try {
MapOutput = merger.reserve (mapId, decompressedLength, id)
/ / Reserve a MapOutput for merge: memory or disk.
} catch (IOException ioe) {
/ / kill this reduce attempt
IoErrs.increment (1)
Scheduler.reportLocalError (ioe)
/ / report errors
Return EMPTY_ATTEMPT_ID_ARRAY
}
/ / Check if we can shuffle now...
If (mapOutput = = null) {
LOG.info ("fetcher#" + id + "- MergeManager returned status WAIT...")
/ / Not an error but wait to process data.
Return EMPTY_ATTEMPT_ID_ARRAY
}
/ / The codec for lz0,lz4,snappy,bz2,etc. Throw java.lang.InternalError
/ / on decompression failures. Catching and re-throwing as IOException
/ / to allow fetch failure logic to be processed
Try {
/ / Go!
LOG.info ("fetcher#" + id + "about to shuffle output of map"
MapOutput.getMapId () + "decomp:" + decompressedLength
"len:" + compressedLength + "to" + mapOutput.getDescription ()
MapOutput.shuffle (host, is, compressedLength, decompressedLength
Metrics, reporter)
/ / copy the file contents of Mapper to reduce memory or disk across nodes.
} catch (java.lang.InternalError e) {
LOG.warn ("Failed to shuffle for fetcher#" + id, e)
Throw new IOException (e)
}
/ / Inform the shuffle scheduler
Long endTime = Time.monotonicNow ()
/ / Reset retryStartTime as map task make progress if retried before.
RetryStartTime = 0
Scheduler.copySucceeded (mapId, host, compressedLength
StartTime, endTime, mapOutput)
/ / tells the scheduler that the file copy of the Map output of a node has been completed.
Remaining.remove (mapId)
/ / the output of this MapTask has been shuffle completed
Metrics.successFetch ()
We don't care about the exception failure message after return null;.
The mapOutput here is used to hold the storage space of the MapTask output file, which can be the Output of memory or DiskOutput depending on the content size and memory of the output file. If it is memory, you need to make a reservation because there is more than one Fetcher. Let's take InMemoryMapOutput as an example.
Code structure
Fetcher.run-- > copyFromHost-- > copyMapOutput-- > merger.reserve (MergeManagerImpl.reserve)-- > InmemoryMapOutput.shuffle
Public void shuffle (MapHost host, InputStream input
Long compressedLength, long decompressedLength
ShuffleClientMetrics metrics
Reporter reporter) throws IOException {
/ / copy spill files from Mapper across nodes
IFileInputStream checksumIn =
New IFileInputStream (input, compressedLength, conf)
/ / input stream of checksum
Input = checksumIn
/ / Are map-outputs compressed?
If (codec! = null) {
/ / if compression is involved
Decompressor.reset ()
/ / restart the decompressor
Input = codec.createInputStream (input, decompressor)
/ / add the input stream of the decompressor
}
Try {
IOUtils.readFully (input, memory, 0, memory.length)
/ / read specific Partition data into the memory buffer of Reducer from Mapper.
Metrics.inputBytes (memory.length)
Reporter.progress (); / / report progress
LOG.info ("Read" + memory.length + "bytes from map-output for" +
GetMapId ()
/ * *
We've gotten the amount of data we were expecting. Verify the
Decompressor has nothing more to offer. This action also forces the
Decompressor to read any trailing bytes that weren't critical
For decompression, which is necessary to keep the stream
In sync.
, /
If (input.read () > = 0) {
Throw new IOException ("Unexpected extra bytes from input stream for" +
GetMapId ()
}
} catch (IOException ioe) {
/ / Close the streams
IOUtils.cleanup (LOG, input)
/ / Re-throw
Throw ioe
} finally {
CodecPool.returnDecompressor (decompressor)
/ / release decompressor
}
}
Copy the data belonging to this partition in the spill file from the other party, go back to copyFromHost, inform scheduler through scheduler.copySuccessed, and delete the ID of this MapTask from the remaining collection, enter the next loop, and copy the next MapTask data. Until all the data belonging to this Partition is copied.
This is the process of Fetcher on the Reducer side, which sends a HTTP GET request to the Mapper side to download the file. There is a corresponding Server in MapTask. The source code of this network protocol is not studied in depth. I am interested in studying it myself after class. For more video information of big data, please add QQ group: 947967114
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.