Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to analyze the concrete implementation

2025-02-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article will explain in detail how to analyze the specific implementation, the content of the article is of high quality, so the editor will share it for you as a reference. I hope you will have a certain understanding of the relevant knowledge after reading this article.

I. Overview

Here we analyze the principle and specific implementation details of BypassMergeSortShuffleWriter implementation strategy from the point of view of source code.

The specific implementation of BypassMergeSortShuffleWriter is in the write () function of the corresponding class. Let's directly look at the source code for analysis.

1. Let's first look at constructor initialization.

BypassMergeSortShuffleWriter (BlockManager blockManager, IndexShuffleBlockResolver shuffleBlockResolver, BypassMergeSortShuffleHandle handle, int mapId, TaskContext taskContext, SparkConf conf) {/ / gets the spark.shuffle.file.buffer parameter value. The default is 32k. Here is an important parameter. / / this parameter is used to set the buffer buffer size of the BufferedOutputStream of shuffle write task. / / the data will be written to the buffer buffer before writing to the disk file, and will not be overwritten to the disk until the buffer is full / / if the available memory resources for the job are sufficient, you can appropriately increase the size of this parameter (such as 64k), so as to reduce the number of overwriting disk files during shuffle write, and / / you can also reduce the number of disk IO. Further improve the performance this.fileBufferSize = (int) conf.getSizeAsKb ("spark.shuffle.file.buffer", "32k") * 1024 / / whether to use NIO's copy method from file to file stream. The default value is that true generally does not need to modify this.transferToEnabled = conf.getBoolean ("spark.file.transferTo", true); this.blockManager = blockManager; / / gets the ShuffleDependency object in shufflehandle, through which data such as divider and number of partitions are obtained. Final ShuffleDependency dep = handle.dependency (); this.mapId = mapId; this.shuffleId = dep.shuffleId (); this.partitioner = dep.partitioner (); this.numPartitions = partitioner.numPartitions (); this.writeMetrics = taskContext.taskMetrics (). ShuffleWriteMetrics () / / set the serialization tool object, and the shuffleBlockResolver object, / / the object used to create and maintain the mapping between the logical block of shuffle data and the physical file location this.serializer = dep.serializer (); this.shuffleBlockResolver = shuffleBlockResolver;}

two。 If you look at the write () function, the source code is as follows:

/ / the general idea here is to create temporary files on disk for each partition and give each writer

The general idea of the above code is as follows:

a. Determine the number of partitions, and then create DiskBlockObjectWriter and temporary files for each partition

b. Loop partitions record through Partitioner and writes to the corresponding partition temporary file

c. Brush partition data to disk

d. Based on shuffleId and mapId, build ShuffleDataBlockId and create a temporary file that merges the file data and the merged file in the following format:

Shuffle_ {shuffleId} _ {mapId} _ {reduceId} .data

e. Merge the partition file into a total temporary file, which is renamed to the final output file name and returns an array of corresponding partition file lengths

f. Create index files index and index temporary files, write the length and offset of each partition to the index file, etc., and rename temporary data files and temporary index files

g. Encapsulate some information into MapStatus to return

There is a problem:

This Writer creates a temporary file for each partition, and if there are too many partitions, it creates a lot of output output streams and temporary file objects, which takes up too much resources and degrades performance.

Focus on:

Parameter: spark.shuffle.file.buffer default 32k

By default, when the map task of shuffle is output to a disk file, Uni-President will first write to a memory buffer associated with each task, and each time when the memory buffer overflows, it will be overwritten to disk. If memory conflicts can be appropriately increased to reduce the number of overwrites to disk files during the shuffle write process, the number of disk IO can be reduced, thus improving performance. In practice, it is found that by adjusting this parameter reasonably, the performance will be improved by 1% to 5%.

On how to analyze the specific implementation to share here, I hope that the above content can be of some help to you, can learn more knowledge. If you think the article is good, you can share it for more people to see.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report