
How to analyze the concrete implementation of SortShuffleWriter in Spark 2.x from the source code



This article walks through the source code to analyze the specific implementation of SortShuffleWriter in Spark 2.x. The content is fairly detailed; interested readers can use it as a reference, and I hope it is helpful to you.

I. Overview

Here we look at the third ShuffleWriter implementation in Spark, SortShuffleWriter. In the shuffle write phase, if the conditions for neither UnsafeShuffleWriter nor BypassMergeSortShuffleWriter are met, the code falls back to SortShuffleWriter. Let's take a look at its implementation.
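For context, the choice between the three writers is made when the shuffle is registered in SortShuffleManager. Below is a simplified, standalone sketch of those selection rules, not the actual Spark source: the function name chooseWriter and its Boolean parameters are illustrative, while the numeric defaults (spark.shuffle.sort.bypassMergeThreshold = 200 and the 2^24 partition cap for the serialized path) follow the Spark 2.x configuration.

object WriterSelectionSketch {
  // Returns the name of the ShuffleWriter that would be picked for a map task
  def chooseWriter(
      mapSideCombine: Boolean,
      serializerSupportsRelocation: Boolean,
      hasAggregator: Boolean,
      numPartitions: Int,
      bypassMergeThreshold: Int = 200): String = {
    if (!mapSideCombine && numPartitions <= bypassMergeThreshold) {
      // Few partitions and no map-side combine: write one file per reducer directly
      "BypassMergeSortShuffleWriter"
    } else if (serializerSupportsRelocation && !hasAggregator && numPartitions <= (1 << 24)) {
      // Serialized (tungsten) sort path
      "UnsafeShuffleWriter"
    } else {
      // Neither fast path applies, so SortShuffleWriter is used
      "SortShuffleWriter"
    }
  }

  def main(args: Array[String]): Unit = {
    // e.g. a reduceByKey with map-side combine and 1000 partitions
    println(chooseWriter(mapSideCombine = true, serializerSupportsRelocation = false,
      hasAggregator = true, numPartitions = 1000))  // prints SortShuffleWriter
  }
}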

II. Concrete implementation

1. Look directly at the write() function; the code is as follows:

/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  // Initialize the ExternalSorter; its constructor arguments depend on whether
  // map-side combine is performed:
  //   aggregator:  the map/reduce-side aggregator used in the RDD shuffle
  //   partitioner: which partitioner is used to partition the shuffle output,
  //                e.g. HashPartitioner or RangePartitioner
  //   ordering:    which key ordering to sort by
  //   serializer:  which serializer to use; if not specified, the value of the
  //                spark.serializer parameter is used by default
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // Without map-side aggregation, neither an aggregator nor an ordering is
    // passed in when creating the sorter
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  // First write the data into the sorter's in-memory structures via insertAll
  sorter.insertAll(records)

  // Construct the final output file; the file name is
  // "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId (reduceId is 0)
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  // Append a UUID to the output file name to indicate that the file is being
  // written; it is renamed once writing finishes
  val tmp = Utils.tempFileWith(output)

  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    // Write the sorted records to the output file
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    // Generate an index file, so that each reduce task knows which part of the
    // data belongs to it
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    // Construct the MapStatus to return; it contains the location information
    // of the ShuffleWriter output
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
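The index file written by writeIndexFileAndCommit conceptually contains the cumulative byte offsets derived from partitionLengths, so that reduce task i can locate its range in the single data file. Below is a minimal standalone sketch of that idea, not the Spark source; the sample lengths are made up.

object IndexOffsetsSketch {
  // offsets(i) .. offsets(i + 1) is the byte range of reduce partition i
  def offsets(partitionLengths: Array[Long]): Array[Long] =
    partitionLengths.scanLeft(0L)(_ + _)

  def main(args: Array[String]): Unit = {
    val lengths = Array(120L, 0L, 300L, 45L)   // bytes written per reduce partition
    println(offsets(lengths).mkString(", "))   // 0, 120, 120, 420, 465
  }
}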

ExternalSorter is the sorting class used by SortShuffleWriter. It sorts a number of (K, V) key-value pairs, merging them if necessary, and produces (K, C) key-combiner pairs, where a combiner is the result of merging all the values of the same key. It first uses a Partitioner to divide the keys into different partitions and then, if necessary, sorts the keys within each partition according to a specific Comparator. It can output a single partitioned file in which different partitions occupy different regions of the file (each partition is contiguous at the byte level), which makes it well suited for fetching data during shuffle.
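The partition that each key is bucketed into comes from the shuffle dependency's partitioner. As a small illustration (assuming spark-core is on the classpath), the snippet below uses a HashPartitioner with four partitions to show the partition IDs that ExternalSorter would prepend to the keys:

import org.apache.spark.HashPartitioner

object PartitionIdSketch {
  def main(args: Array[String]): Unit = {
    val partitioner = new HashPartitioner(4)
    Seq("spark", "shuffle", "sort", "writer").foreach { key =>
      // Records are later ordered by this partition ID (and, if required, by key)
      println(s"$key -> partition ${partitioner.getPartition(key)}")
    }
  }
}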

2. Now let's take a look at the sorter.insertAll(records) call in the write() function above. It actually does a lot of work. The code is as follows:

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // Whether map-side aggregation is performed
  val shouldCombine = aggregator.isDefined

  // Depending on whether map-side aggregation is needed, either a map or a buffer is used:
  // if aggregation by key is required on the map side, use PartitionedAppendOnlyMap;
  // otherwise, use PartitionedPairBuffer
  if (shouldCombine) {
    // Combine values in memory using the AppendOnlyMap.
    // mergeValue merges a new value into the aggregated record
    val mergeValue = aggregator.get.mergeValue
    // createCombiner creates the initial value for the aggregation
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    // The update function: merge with mergeValue if a value already exists,
    // otherwise create the initial value with createCombiner
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      // The partition ID is computed from the key, and the data is sorted by
      // partition ID; the partition ID here is in effect the reduce task number.
      // Compute the partition for the key, then start merging.
      map.changeValue((getPartition(kv._1), kv._1), update)
      // Spill the in-memory data to disk if necessary
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      // The partition ID is computed from the key, and the data is sorted by
      // partition ID; the partition ID here is in effect the reduce task number
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      // When the in-memory collection grows too large, its contents are spilled to disk
      maybeSpillCollection(usingMap = false)
    }
  }
}
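To make the createCombiner / mergeValue pattern concrete, here is a standalone sketch in plain Scala (no Spark classes): a mutable.HashMap keyed by (partitionId, key) stands in for PartitionedAppendOnlyMap, and a word-count style aggregation plays the role of the aggregator.

import scala.collection.mutable

object MapSideCombineSketch {
  def main(args: Array[String]): Unit = {
    val createCombiner: Int => Int = v => v              // initial aggregate for a new key
    val mergeValue: (Int, Int) => Int = (c, v) => c + v  // fold a further value in
    val numPartitions = 2
    def getPartition(key: String): Int = math.abs(key.hashCode) % numPartitions

    val records = Seq("a" -> 1, "b" -> 1, "a" -> 1, "c" -> 1, "b" -> 1)
    val map = mutable.HashMap.empty[(Int, String), Int]

    records.foreach { case (k, v) =>
      val mapKey = (getPartition(k), k)
      // Equivalent to the update closure above: mergeValue if the key already
      // has a value, createCombiner otherwise
      map(mapKey) = map.get(mapKey) match {
        case Some(oldValue) => mergeValue(oldValue, v)
        case None           => createCombiner(v)
      }
    }
    map.toSeq.sortBy(_._1).foreach(println)  // aggregated ((partition, key), count) pairs
  }
}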

3. Let's continue and trace how the maybeSpillCollection() function spills the in-memory data to disk. The code is as follows:

private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  // If a map is used, i.e. aggregation is required on the map side
  if (usingMap) {
    // Estimate the collection's size to decide whether a spill is required;
    // if it is, re-initialize a new map once the spill has completed
    estimatedSize = map.estimateSize()
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    // The path executed here is the one without map-side aggregation.
    // Estimate the collection's size to decide whether a spill is required;
    // if it is, re-initialize a new buffer once the spill has completed
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}

4. The code above involves maybeSpill, the function that decides whether to spill. Let's take a look at how it makes that decision. The code is as follows:

// The maybeSpill check is roughly divided into three steps:
// 1. Try to acquire amountToRequest bytes of memory for the current task
//    (amountToRequest = 2 * currentMemory - myMemoryThreshold).
// 2. If the memory obtained is still insufficient (currentMemory is still greater
//    than or equal to myMemoryThreshold), mark the collection for spilling.
// 3. A spill is also forced once the number of elements read exceeds
//    numElementsForceSpillThreshold; if spilling is needed, actually spill and
//    release the memory.
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    // Under the hood this calls the acquireExecutionMemory method of
    // TaskMemoryManager to allocate memory
    val granted = acquireMemory(amountToRequest)
    // Update the current memory threshold
    myMemoryThreshold += granted
    // Check again whether the current memory is greater than the threshold;
    // if it still is, spill
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    // During the spill the data is first written to a buffer and then to disk;
    // an important parameter here is spark.shuffle.file.buffer (default 32k),
    // which is often tuned for optimization
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
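As a quick worked example of the threshold logic, assume (hypothetically) that the collection currently uses 7 MB while the task's threshold is 5 MB; the sketch below only simulates the check and is not Spark code:

object MaybeSpillSketch {
  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    val currentMemory = 7 * mb          // estimated size of the in-memory collection
    var myMemoryThreshold = 5 * mb      // memory currently granted to this task

    // Step 1: ask for enough memory to double the current usage
    val amountToRequest = 2 * currentMemory - myMemoryThreshold   // 9 MB
    // Step 2: pretend the memory manager could only grant 1 MB
    val granted = 1 * mb
    myMemoryThreshold += granted                                  // threshold is now 6 MB
    // Step 3: usage still exceeds the threshold, so the collection must be spilled
    val shouldSpill = currentMemory >= myMemoryThreshold
    println(s"requested=${amountToRequest / mb}MB, threshold=${myMemoryThreshold / mb}MB, spill=$shouldSpill")
  }
}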

There is deeper code below this level that we will not trace here; as long as you understand the overall flow, you can follow it yourself if you are interested.

To aid understanding, the following is a flow chart of the SortShuffleWriter implementation. The processing flow of BypassMergeSortShuffleWriter and UnsafeShuffleWriter is basically the same, although the specific implementations differ slightly. My understanding is limited, so take it as a reference only:

That is all on using the source code to analyze the specific implementation of SortShuffleWriter in Spark 2.x. I hope the above content is of some help to you and that you have learned something from it. If you think the article is good, feel free to share it so more people can see it.
