Big data: Map termination and Spill file merging

2025-03-04 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/03 Report--

When the Mapper has no more input, the call to context.nextKeyValue in the while loop of Mapper.run returns false, so control returns to runNewMapper, which closes the input channel and the output channel. Closing the output channel does not simply close the collector: the collector must be flushed first.
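The control flow described above can be sketched as a toy model. This is not Hadoop code: ToyCollector, ToyMapRun, the record type (plain strings) and the way the 80% threshold is checked are all assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Toy stand-in for a map output collector (hypothetical, not a Hadoop class). */
class ToyCollector {
    private final int capacity;
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> spills = new ArrayList<>();

    ToyCollector(int capacity) {
        this.capacity = capacity;
    }

    void collect(String record) {
        buffer.add(record);
        // Spill once the buffer is at least 80% full.
        if (buffer.size() * 10 >= capacity * 8) {
            spill();
        }
    }

    private void spill() {
        List<String> sorted = new ArrayList<>(buffer);
        Collections.sort(sorted);   // each spill is written out sorted
        spills.add(sorted);
        buffer.clear();
    }

    /** Spill whatever is left, even though the threshold was never reached. */
    void flush() {
        if (!buffer.isEmpty()) {
            spill();
        }
    }

    /** Closing must flush first, otherwise the buffered records would be lost. */
    void close() {
        flush();
    }
}

class ToyMapRun {
    /** Same shape as the run loop: consume input until it is exhausted, then close. */
    static ToyCollector run(Iterator<String> input, int capacity) {
        ToyCollector out = new ToyCollector(capacity);
        while (input.hasNext()) {        // plays the role of context.nextKeyValue()
            out.collect(input.next());   // plays the role of map() emitting output
        }
        out.close();                     // no more input: close, which flushes
        return out;
    }

    public static void main(String[] args) {
        ToyCollector c = run(List.of("d", "a", "c", "b", "f", "e").iterator(), 5);
        System.out.println(c.spills);
    }
}
```

With a capacity of 5, the first four records trigger a threshold spill, and the last two only reach a spill file because close calls flush.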

Code structure:

MapTask.runNewMapper -> NewOutputCollector.close -> MapOutputBuffer.flush

Let's see what flush does and why it is needed.

public void flush() throws IOException, ClassNotFoundException,
        InterruptedException {
    LOG.info("Starting flush of map output");
    spillLock.lock();
    try {
        // Check spillInProgress: if a spill is running, report progress
        // and wait for it to complete.
        while (spillInProgress) {
            reporter.progress();
            spillDone.await();
        }
        checkSpillException();

        // kvend is the end of the metadata block; the metadata grows downward.
        // kvend is an array index counted in ints, kvbend the same index
        // counted in bytes.
        final int kvbend = 4 * kvend;
        if ((kvbend + METASIZE) % kvbuffer.length !=
                equator - (equator % METASIZE)) {
            // This condition means the buffer held data whose spill has now
            // completed, so the space needs to be freed. After each spill a
            // few positions must be adjusted to free the space; resetSpill
            // (shown after this method) does that.
            // spill finished
            resetSpill();
        }
        if (kvindex != kvend) {
            // The buffer is not empty: the data never reached the spill
            // threshold (80%), but the map has no more input, so it has to
            // be spilled now.
            kvend = (kvindex + NMETA) % kvmeta.capacity();
            bufend = bufmark;
            LOG.info("Spilling map output");
            LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
                     "; bufvoid = " + bufvoid);
            LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
                     "); kvend = " + kvend + "(" + (kvend * 4) +
                     "); length = " + (distanceTo(kvend, kvstart,
                           kvmeta.capacity()) + 1) + "/" + maxRec);
            sortAndSpill();  // run the sortAndSpill procedure one more time
        }
    } catch (InterruptedException e) {
        throw new IOException("Interrupted while waiting for the writer", e);
    } finally {
        spillLock.unlock();
    }
    // At this point all data has been spilled: the buffer is empty and
    // every record is in a spill file.
    assert !spillLock.isHeldByCurrentThread();
    // shut down spill thread and wait for it to exit. Since the preceding
    // ensures that it is finished with its work (and sortAndSpill did not
    // throw), we elect to use an interrupt instead of setting a flag.
    // Spilling simultaneously from this thread while the spill thread
    // finishes its work might be both a useful way to extend this and also
    // sufficient motivation for the latter approach.
    try {
        spillThread.interrupt();  // stop the spill thread
        spillThread.join();       // wait for the spill thread to end
    } catch (InterruptedException e) {
        throw new IOException("Spill failed", e);
    }
    // release sort buffer before the merge
    kvbuffer = null;
    mergeParts();  // merge the spill files
    Path outputPath = mapOutputFile.getOutputFile();
    fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}

resetSpill, called from flush above, is implemented as follows:

private void resetSpill() {
    final int e = equator;
    bufstart = bufend = e;
    final int aligned = e - (e % METASIZE);
    // set start/end to point to first meta record
    // Cast one of the operands to long to avoid integer overflow
    kvstart = kvend = (int)
        (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
    LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
        (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
}

resetSpill simply repositions the bookkeeping values, such as the equator and kvstart.

The purpose of flush is, first of all, to get every KV pair in the buffer into a spill file. Each spill produces one spill file, so there may be more than one; flush therefore also merges the spill files into a single file, which is what gets distributed to the reducers.

So if a spill is in progress, flush has to wait for it to complete. If no spill is running but the buffer is not empty, flush has to call sortAndSpill once more. In short, it continues until the buffer is empty. Only after all the data has been spilled can mergeParts run.
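The index arithmetic used by flush and resetSpill is easier to follow on small numbers. The sketch below is a standalone re-statement of the two expressions, not Hadoop code: the class and method names are made up, and NMETA = 4 and METASIZE = 16 are assumed values for the constants that the listing uses without defining.

```java
/** Worked example of the index arithmetic in flush() and resetSpill() (illustrative only). */
class SpillIndexMath {
    static final int NMETA = 4;             // ints per metadata record (assumed value)
    static final int METASIZE = NMETA * 4;  // bytes per metadata record (assumed value)

    /** The int index that resetSpill assigns to kvstart and kvend for a given equator. */
    static int resetKvIndex(int equator, int bufferLength) {
        int aligned = equator - (equator % METASIZE);
        // Widen to long before adding so the sum cannot overflow an int.
        return (int) (((long) aligned - METASIZE + bufferLength) % bufferLength) / 4;
    }

    /** The condition flush() evaluates before it calls resetSpill(). */
    static boolean needsReset(int kvend, int equator, int bufferLength) {
        int kvbend = 4 * kvend;  // kvend counts ints, kvbend counts bytes
        return (kvbend + METASIZE) % bufferLength != equator - (equator % METASIZE);
    }

    public static void main(String[] args) {
        int bufferLength = 1024;
        int equator = 100;
        int kv = resetKvIndex(equator, bufferLength);
        System.out.println("kvstart = kvend = " + kv);
        System.out.println("needs reset afterwards: " + needsReset(kv, equator, bufferLength));
        System.out.println("needs reset with kvend = 10: " + needsReset(10, equator, bufferLength));
    }
}
```

For an equator of 100 the aligned position is 96, one record below it is byte 80, and byte 80 is int index 20. With kvend at that index the flush condition is false, which is why the check is no longer satisfied once resetSpill has run.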

Code structure:

MapTask.runNewMapper -> NewOutputCollector.close -> MapOutputBuffer.flush -> MapOutputBuffer.mergeParts

The source code is as follows:

private void mergeParts() throws IOException, InterruptedException,
        ClassNotFoundException {
    // get the approximate size of the final output/index files
    long finalOutFileSize = 0;
    long finalIndexFileSize = 0;
    // Each spill produced one file, so the array has numSpills entries.
    final Path[] filename = new Path[numSpills];
    final TaskAttemptID mapId = getTaskID();

    // Add up the sizes of all the files that are going to be merged.
    for (int i = 0; i < numSpills; i++) {
        filename[i] = mapOutputFile.getSpillFile(i);  // spill file path, looked up by spill number
        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();  // file size
    }
    if (numSpills == 1) {
        // The merged output consists of two files: output/file.out and
        // output/file.out.index.
        sameVolRename(filename[0],
            mapOutputFile.getOutputFileForWriteInVolume(filename[0]));  // rename the spill file to file.out
        if (indexCacheList.size() == 0) {
            // The index cache indexCacheList is empty: rename the spill index file.
            sameVolRename(mapOutputFile.getSpillIndexFile(0),
                mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
        } else {
            // indexCacheList still holds index records: write them to the index file.
            indexCacheList.get(0).writeToFile(
                mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
        }
        sortPhase.complete();
        return;  // with a single spill, the merge is already done
    }

    // read in paged indices
    // There is more than one spill file, so they need to be merged.
    for (int i = indexCacheList.size(); i < numSpills; ++i) {
        Path indexFileName = mapOutputFile.getSpillIndexFile(i);
        indexCacheList.add(new SpillRecord(indexFileName, job));  // first collect all the spill index files
    }

    //make correction in the length to include the sequence file header
    //lengths for each partition
    finalOutFileSize += partitions * APPROX_HEADER_LENGTH;  // every partition has a header
    finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;  // index file: one record per partition
    Path finalOutputFile =
        mapOutputFile.getOutputFileForWrite(finalOutFileSize);
    Path finalIndexFile =
        mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);

    //The output stream for the final single output file
    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);  // create the merged, final output

    if (numSpills == 0) {
        // Even if no spill file was produced, an empty output file is created.
        //create dummy files
        IndexRecord rec = new IndexRecord();           // index record
        SpillRecord sr = new SpillRecord(partitions);  // spill record
        try {
            for (int i = 0; i < partitions; i++) {
                long segmentStart = finalOut.getPos();
                FSDataOutputStream finalPartitionOut =
                    CryptoUtils.wrapIfNecessary(job, finalOut);
                Writer<K, V> writer =
                    new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
                writer.close();  // closed right after creation, leaving an empty segment
                rec.startOffset = segmentStart;
                rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
                rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
                sr.putIndex(rec, i);
            }
            sr.writeToFile(finalIndexFile, job);  // write all records to the index file
        } finally {
            finalOut.close();
        }
        sortPhase.complete();
        return;
    }
    {
        sortPhase.addPhases(partitions); // Divide sort phase into sub-phases

        IndexRecord rec = new IndexRecord();
        final SpillRecord spillRec = new SpillRecord(partitions);
        // finalOut is the final output file. Loop over the partitions; for each
        // one, fetch that partition's data from every spill file, merge it and
        // write it to finalOut.
        for (int parts = 0; parts < partitions; parts++) {
            //create the segments to be merged
            List<Segment<K, V>> segmentList =
                new ArrayList<Segment<K, V>>(numSpills);  // the data segments
            for (int i = 0; i < numSpills; i++) {  // prepare to merge all the spill files
                IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

                Segment<K, V> s =
                    new Segment<K, V>(job, rfs, filename[i], indexRecord.startOffset,
                                      indexRecord.partLength, codec, true);
                segmentList.add(i, s);  // collect the segment for this partition from every spill file

                if (LOG.isDebugEnabled()) {
                    LOG.debug("MapId=" + mapId + " Reducer=" + parts +
                        "Spill =" + i + "(" + indexRecord.startOffset + "," +
                        indexRecord.rawLength + ", " + indexRecord.partLength + ")");
                }
            }

            // Upper limit on the number of streams merged at the same time.
            int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);
            // sort the segments
            boolean sortSegments = segmentList.size() > mergeFactor;
            @SuppressWarnings("unchecked")
            RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                           keyClass, valClass, codec,
                           segmentList, mergeFactor,
                           new Path(mapId.toString()),
                           job.getOutputKeyComparator(), reporter, sortSegments,
                           null, spilledRecordsCounter, sortPhase.phase(),
                           TaskType.MAP);
            // Merging the same partition across all spill files may also
            // require a sort; the merged result is a sequence.

            //write merged output to disk
            long segmentStart = finalOut.getPos();
            FSDataOutputStream finalPartitionOut =
                CryptoUtils.wrapIfNecessary(job, finalOut);
            Writer<K, V> writer =
                new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
                                 spilledRecordsCounter);
            if (combinerRunner == null || numSpills < minSpillsForCombine) {
                // minSpillsForCombine is initialized in the MapOutputBuffer
                // constructor; numSpills is the number of spill files this
                // MapTask has written to disk.
                Merger.writeFile(kvIter, writer, reporter, job);
                // The merged result is written directly to the file.

Let's take a look at the source code of writeFile:

public static <K extends Object, V extends Object>
void writeFile(RawKeyValueIterator records, Writer<K, V> writer,
               Progressable progressable, Configuration conf)
throws IOException {
    long progressBar = conf.getLong(JobContext.RECORDS_BEFORE_PROGRESS, 10000);
    long recordCtr = 0;
    while (records.next()) {
        writer.append(records.getKey(), records.getValue());  // output goes to the writer via append

        if (((recordCtr++) % progressBar) == 0) {
            progressable.progress();
        }
    }
}

Go back to the main code:

            } else {
                // A combiner is configured.
                combineCollector.setWriter(writer);
                // This simply inserts the combiner step into the pipeline.
                combinerRunner.combine(kvIter, combineCollector);
                // The merged result is written to the file after passing through the combiner.
            }

            //close
            writer.close();  // close the writer channel

            sortPhase.startNextPhase();

            // record offsets
            rec.startOffset = segmentStart;  // the start of the current segment
            rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
            rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
            spillRec.putIndex(rec, parts);
        }
        spillRec.writeToFile(finalIndexFile, job);  // write the spill records to the merged index file
        finalOut.close();  // close the final output stream
        for (int i = 0; i < numSpills; i++) {
            rfs.delete(filename[i], true);  // delete all the spill files
        }
    }
}

This method merges all the temporary files into one large file, saved as output/file.out, and generates the corresponding index file output/file.out.index. The Map Task merges partition by partition. For each partition it uses multiple rounds of recursive merging: each round merges io.sort.factor files (the code above falls back to 100 when the value is not set), the resulting file is added back to the list of files to be merged, the list is sorted again, and the process repeats until only one file remains. Producing a single file avoids the overhead of opening a large number of files at once and the random reads caused by reading many small files at the same time. Finally, all spill files are deleted.
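The multi-round merge can be illustrated with in-memory lists standing in for files. This is a simplified sketch, not the logic of Hadoop's Merger class: the class name MultiRoundMerge is made up, runs are lists of integers, and each round simply takes the smallest runs first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy multi-round merge: at most `factor` sorted runs are merged per round. */
class MultiRoundMerge {
    /** Merge several sorted runs into one sorted run using a heap (k-way merge). */
    static List<Integer> mergeOnce(List<List<Integer>> runs) {
        // Heap entries are {value, run index, position inside that run}.
        PriorityQueue<int[]> heap =
            new PriorityQueue<>(Comparator.comparingInt((int[] e) -> e[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) {
                heap.add(new int[] {runs.get(r).get(0), r, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] e = heap.poll();
            out.add(e[0]);
            List<Integer> run = runs.get(e[1]);
            int next = e[2] + 1;
            if (next < run.size()) {
                heap.add(new int[] {run.get(next), e[1], next});
            }
        }
        return out;
    }

    /** Repeat rounds of at most `factor` runs until a single run remains. */
    static List<Integer> merge(List<List<Integer>> input, int factor) {
        if (factor < 2) {
            throw new IllegalArgumentException("factor must be at least 2");
        }
        List<List<Integer>> pending = new ArrayList<>(input);
        if (pending.isEmpty()) {
            return new ArrayList<>();
        }
        while (pending.size() > 1) {
            // Sort by size so the smallest runs are merged first.
            pending.sort(Comparator.comparingInt(List::size));
            int n = Math.min(factor, pending.size());
            List<List<Integer>> batch = new ArrayList<>(pending.subList(0, n));
            pending.subList(0, n).clear();
            pending.add(mergeOnce(batch));  // the merged run rejoins the pending list
        }
        return pending.get(0);
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = List.of(
            List.of(1, 4, 7), List.of(2, 5, 8), List.of(3, 6, 9), List.of(0, 10));
        System.out.println(merge(runs, 2));
    }
}
```

With four runs and a factor of 2, three rounds are needed; with a factor of 4 or more, a single round merges everything at once.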

Note also that mergeParts() can run the combiner, but only when two conditions hold: (1) the user has set a combiner; (2) the number of spill files is at least minSpillsForCombine (the code skips the combiner when numSpills < minSpillsForCombine). The corresponding configuration item is "min.num.spills.for.combine", which users can set themselves; the default is 3. Both conditions must hold at the same time for the combiner's local aggregation to run here. The combiner may therefore be executed twice in the Map phase, once at spill time and once during the merge, so a combiner whose result changes when it is applied repeatedly can produce output that does not meet expectations.
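A small example makes the risk of a twice-applied combiner concrete. The class CombinerTwice and its methods are hypothetical illustrations, not Hadoop APIs; shouldCombine only restates the condition from the mergeParts listing.

```java
import java.util.List;

/** Why a combiner must give the same result when it is applied more than once. */
class CombinerTwice {
    /** Restates the mergeParts condition: combine only if both requirements hold. */
    static boolean shouldCombine(boolean hasCombiner, int numSpills, int minSpillsForCombine) {
        return hasCombiner && numSpills >= minSpillsForCombine;
    }

    /** Sum: combining partial sums gives the same total, so re-applying it is safe. */
    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    /** Naive mean: not safe to re-apply, because it forgets how many values it saw. */
    static double mean(List<Double> values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total / values.size();
    }

    public static void main(String[] args) {
        // The same key has the values 1, 2, 3 in one spill and 10 in another.
        long sumOnce = sum(List.of(1L, 2L, 3L, 10L));
        long sumTwice = sum(List.of(sum(List.of(1L, 2L, 3L)), sum(List.of(10L))));
        System.out.println(sumOnce + " vs " + sumTwice);

        double meanOnce = mean(List.of(1.0, 2.0, 3.0, 10.0));
        double meanTwice = mean(List.of(mean(List.of(1.0, 2.0, 3.0)), mean(List.of(10.0))));
        System.out.println(meanOnce + " vs " + meanTwice);
    }
}
```

The sum is 16 either way, while the mean of the per-spill means is 6.0 instead of 4.0. A mean is usually made safe by having the combiner emit a (sum, count) pair and dividing only in the reducer.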

That completes the work of the Map phase. In summary: the task reads the data and writes it to the in-memory buffer; when the buffer reaches the threshold, the data is quick-sorted and partitioned, then spilled to a local file and index file. If there is a combiner, each spill also performs an aggregation. Once all the data has been processed, all spill files and index files are merged; if there is a combiner and the conditions are met, another aggregation runs as part of the merge. The results of the map phase are stored on local disk (if there is a reducer), not in HDFS.

After the Mapper has processed all of its input and the buffered data has been written to spill files, there are only three possibilities: no spill file, one spill file, or multiple spill files. In all three cases a final output file is required, whether or not it has any content. The final file has the same layout as a single spill file: it is divided into segments by partition, and each segment holds sorted KV data. Combined with the sort performed at each spill, this merge makes up a merge sort. That merge sort covers only the spill files of a single Mapper; later, the Reducer performs a merge across the files of different Mappers.
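The relationship between the final output file and its index can be pictured with a byte array and two offset arrays. This is only a sketch of the idea of per-partition segments: PartitionedOutputSketch is a made-up class, the payloads are plain strings, and the real index records also carry a raw length that is left out here.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** One output blob divided into per-partition segments, plus an index to find them. */
class PartitionedOutputSketch {
    final byte[] data;         // plays the role of file.out
    final long[] startOffset;  // plays the role of file.out.index: one entry per partition
    final long[] partLength;

    PartitionedOutputSketch(List<String> partitionPayloads) {
        int n = partitionPayloads.size();
        startOffset = new long[n];
        partLength = new long[n];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int p = 0; p < n; p++) {
            byte[] bytes = partitionPayloads.get(p).getBytes(StandardCharsets.UTF_8);
            startOffset[p] = out.size();   // where this partition's segment begins
            out.write(bytes, 0, bytes.length);
            partLength[p] = bytes.length;  // an empty partition still gets an index entry
        }
        data = out.toByteArray();
    }

    /** Read back one partition's segment using only the index. */
    String readPartition(int p) {
        int from = (int) startOffset[p];
        int to = from + (int) partLength[p];
        return new String(Arrays.copyOfRange(data, from, to), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        PartitionedOutputSketch out =
            new PartitionedOutputSketch(List.of("a=1,b=2", "", "x=9"));
        System.out.println(out.readPartition(2) + " starts at " + out.startOffset[2]);
    }
}
```

A consumer interested in partition 2 needs only the index entry for that partition to locate its segment, which is why an index record is written even for empty partitions.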

When the map work is complete, control returns from runNewMapper and the next call is done(), which handles the finishing work of the MapTask. Finishing a MapTask involves making the generated data available to the ReduceTasks. Both MapTask and ReduceTask extend Task, but neither defines a done function of its own, so both call Task's done.

This is where the program returns from runNewMapper:

if (useNewApi) {
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);

Following this done call, we find that it is Task.done. The source code is as follows:

public void done(TaskUmbilicalProtocol umbilical,
                 TaskReporter reporter
                 ) throws IOException, InterruptedException {
    LOG.info("Task:" + taskId + " is done."
             + " And is in the process of committing");
    updateCounters();  // update the counters

    boolean commitRequired = isCommitRequired();
    if (commitRequired) {
        int retries = MAX_RETRIES;
        setState(TaskStatus.State.COMMIT_PENDING);
        // say the task tracker that task is commit pending
        while (true) {
            try {
                umbilical.commitPending(taskId, taskStatus);
                break;  // commitPending raised no exception: leave the loop; otherwise retry
            } catch (InterruptedException ie) {
                // ignore
            } catch (IOException ie) {
                LOG.warn("Failure sending commit pending: " +
                         StringUtils.stringifyException(ie));
                if (--retries == 0) {
                    System.exit(67);
                }
            }
        }
        //wait for commit approval and commit
        commit(umbilical, reporter, committer);
    }
    taskDone.set(true);
    reporter.stopCommunicationThread();
    // Make sure we send at least one set of counter increments. It's
    // ok to call updateCounters() in this thread after comm thread stopped.
    updateCounters();
    sendLastUpdate(umbilical);
    //signal the tasktracker that we are done
    sendDone(umbilical);
}

The source code to implement sendDone:

private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
    int retries = MAX_RETRIES;
    while (true) {
        try {
            umbilical.done(getTaskID());
            // In effect, this sends the TA_DONE event to the TaskAttemptImpl on MRAppMaster.
            LOG.info("Task '" + taskId + "' done.");
            return;
        } catch (IOException ie) {
            LOG.warn("Failure signalling completion: " +
                     StringUtils.stringifyException(ie));
            if (--retries == 0) {
                throw ie;
            }
        }
    }
}
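The retry pattern in sendDone can be isolated into a small helper. This is a sketch under assumptions: RetrySketch, RemoteCall and demo are hypothetical names, and the remote call is simulated rather than a real umbilical RPC.

```java
import java.io.IOException;

/** The bounded-retry shape used by sendDone, reduced to a standalone helper. */
class RetrySketch {
    /** Hypothetical stand-in for a remote call that may fail with an IOException. */
    interface RemoteCall {
        void invoke() throws IOException;
    }

    /**
     * Invoke the call until it succeeds, making at most maxRetries attempts.
     * Returns the number of attempts used; rethrows the last IOException if
     * every attempt fails.
     */
    static int callWithRetries(RemoteCall call, int maxRetries) throws IOException {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be at least 1");
        }
        int retries = maxRetries;
        int attempts = 0;
        while (true) {
            try {
                attempts++;
                call.invoke();
                return attempts;
            } catch (IOException ie) {
                if (--retries == 0) {
                    throw ie;  // out of retries: give up and propagate the failure
                }
            }
        }
    }

    /** Simulate a call that fails a fixed number of times; returns -1 if retries run out. */
    static int demo(int failuresBeforeSuccess, int maxRetries) {
        int[] failuresLeft = {failuresBeforeSuccess};
        try {
            return callWithRetries(() -> {
                if (failuresLeft[0] > 0) {
                    failuresLeft[0]--;
                    throw new IOException("simulated RPC failure");
                }
            }, maxRetries);
        } catch (IOException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(2, 10));
        System.out.println(demo(5, 3));
    }
}
```

Note the difference between the two listings above: sendDone rethrows the exception once the retries are used up, whereas the commitPending loop in done exits the process instead.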

umbilical.done(getTaskID());

In effect, this call sends the TA_DONE event to the TaskAttemptImpl on MRAppMaster. Driven by the TA_DONE event, the state machine of the corresponding TaskAttemptImpl object executes CleanupContainerTransition.transition and then moves to the SUCCESS_CONTAINER_CLEANUP state. Note that the TaskAttemptEventType.TA_DONE event originates on the node where the MapTask runs, while the state transition it triggers takes place on the MRAppMaster node. Every MapTask has an umbilical, which is the object that represents MRAppMaster to the task.

MRAppMaster receives the CONTAINER_REMOTE_CLEANUP event, and ContainerLauncher uses RPC to invoke ContainerManagerImpl.stopContainers on the node where the MapTask runs. This puts the MapTask's container into the KILLED_BY_APPMASTER state, and the container becomes inactive. After the operation succeeds, a TA_CONTAINER_CLEANED event is sent to the corresponding TaskAttemptImpl. If a TaskAttempt succeeds, the task it was attempting has succeeded as well, so the TaskAttempt's state change is passed on to its TaskImpl object, which takes care of the follow-up work, including sending a TaskState.SUCCEEDED event to the JobImpl object one level up. The success event sent to the attempt's own TaskImpl triggers the TaskImpl.handleTaskAttemptCompletion operation.
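The sequence of transitions described above can be summarized as a tiny state machine. This is a toy model, not YARN's state machine implementation: AttemptStateSketch is a made-up class, and the RUNNING and SUCCEEDED state names are assumptions added to give the two transitions a start and an end.

```java
/** Toy model of a task attempt finishing: done, then container cleaned, then success. */
class AttemptStateSketch {
    enum State { RUNNING, SUCCESS_CONTAINER_CLEANUP, SUCCEEDED }  // RUNNING and SUCCEEDED are assumed names

    enum Event { TA_DONE, TA_CONTAINER_CLEANED }

    /** Return the next state, or fail if the event is not valid in the current state. */
    static State next(State current, Event event) {
        if (current == State.RUNNING && event == Event.TA_DONE) {
            // The task reported completion: its container must be cleaned up next.
            return State.SUCCESS_CONTAINER_CLEANUP;
        }
        if (current == State.SUCCESS_CONTAINER_CLEANUP && event == Event.TA_CONTAINER_CLEANED) {
            // The container has been stopped: the attempt is now successful.
            return State.SUCCEEDED;
        }
        throw new IllegalStateException("no transition from " + current + " on " + event);
    }

    public static void main(String[] args) {
        State s = State.RUNNING;
        s = next(s, Event.TA_DONE);
        s = next(s, Event.TA_CONTAINER_CLEANED);
        System.out.println(s);
    }
}
```

Events that arrive in the wrong state are rejected in this sketch, which mirrors the idea that each event is only meaningful at a particular point in the attempt's life cycle.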

During this processing, a call to setMapOutputServerAddress sets the MapOutputServer address of the Mapper's node to a Web address, which means that the output left behind by the MapTask (the merged spill file) can be fetched over an HTTP connection. At this point, the whole Mapper process is complete.
