Source: Shulou (Shulou.com), SLTechnology News&Howtos, Internet Technology. Updated 2025-01-16.
This article explains how Spark uses memory during Shuffle, and where that usage can lead to out-of-memory (OOM) errors.
1 Spark memory management and consumption model
Before analyzing Spark Shuffle memory usage, let's start with the following question: what is the general model of Spark's memory management and consumption when a Spark Task is assigned to run on an Executor? (Note: since OOM mainly occurs on the Executor side, the following discussion focuses on memory management and usage on the Executor side.)
1. In Spark, the abstract class MemoryConsumer represents a consumer of memory. It defines the methods for allocating memory, freeing it, and spilling in-memory data to disk. Concrete consumers extend MemoryConsumer to implement their own behavior, so a running Spark Task may contain many consumers of various types, for example the ExternalAppendOnlyMap and ExternalSorter used in Spark Shuffle (analyzed in detail later).
2. MemoryConsumer delegates the work of requesting and releasing memory to TaskMemoryManager. A TaskMemoryManager is created when a Spark Task is assigned to run on an Executor. Before allocating memory, TaskMemoryManager first makes a request to MemoryManager and then performs the actual allocation through MemoryAllocator.
3. The MemoryManager in an Executor manages memory usage centrally. Because every TaskMemoryManager must request memory from MemoryManager before performing the actual allocation, MemoryManager has a global view of the memory used by the current process.
MemoryManager, TaskMemoryManager and MemoryConsumer relate to each other as shown in the figure below. In general, one MemoryManager corresponds to at least one TaskMemoryManager (the number is determined by the executor-cores parameter), and one TaskMemoryManager corresponds to multiple MemoryConsumers (depending on the task).
With this overall process of memory consumption in mind, two issues deserve attention:
1. When multiple Tasks run simultaneously on an Executor, multiple TaskMemoryManagers share the memory managed by MemoryManager. How does MemoryManager divide it? The memory available to each task lies in the range [1 / (2 * n), 1 / n] of the shared memory, where n is the number of currently running tasks. Running more Tasks concurrently therefore reduces the memory available to each one.
2. As mentioned earlier, MemoryConsumer has a spill method. When a MemoryConsumer cannot obtain enough memory, it can spill its current in-memory data to disk, which keeps memory use under control. However, allocation and release of on-heap memory are actually managed by the JVM. For performance and other reasons, Spark estimates the on-heap memory used by a MemoryConsumer through sampling, so the estimate is not fully accurate. As a result, a spill may not happen in time, which can cause OOM.
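The [1 / (2 * n), 1 / n] range can be made concrete with a small calculation. The following is a minimal Python sketch, not Spark code: the function name task_memory_bounds and the 4 GiB pool size are assumptions chosen for illustration.

```python
def task_memory_bounds(pool_bytes, active_tasks):
    """Return (min_bytes, max_bytes) that one task may hold in a shared pool.

    Models the [1 / (2 * n), 1 / n] rule described in the text, where n is
    the number of tasks currently running on the executor.
    """
    if active_tasks < 1:
        raise ValueError("active_tasks must be at least 1")
    min_share = pool_bytes // (2 * active_tasks)
    max_share = pool_bytes // active_tasks
    return min_share, max_share


MIB = 1024 ** 2
pool = 4 * 1024 ** 3  # hypothetical 4 GiB shared pool

for n in (1, 2, 4, 8):
    low, high = task_memory_bounds(pool, n)
    print(f"{n} running tasks: {low // MIB} MiB to {high // MIB} MiB per task")
```

Going from 1 to 8 concurrent tasks shrinks the upper bound of a single task from the whole pool to one eighth of it, which is why task concurrency matters for OOM analysis.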
2 The Spark Shuffle Process
Spark Shuffle is divided into two stages: Shuffle Write and Shuffle Read.
The Write phase generally involves sorting (at minimum, sorting by partition), possibly combining, and merging (when data has been spilled to multiple files on disk). Each write Task eventually produces two files, a data file and an index file. The data file is organized by partition, i.e., the data of one partition is contiguous in the file, and the index file records the start and end position of each partition in the data file.
In Shuffle Read, a task may first need to fetch the data of a given partition from the node of each Write task over the network, that is, one contiguous region of each data file. The data then goes through sorting, merging and other steps to form the final result.
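To illustrate how an index file lets a reader locate one partition inside the data file, here is a minimal Python sketch. It assumes the index is a list of cumulative byte offsets (entry i is the start of partition i, entry i + 1 is its end); the helper names build_index and read_partition are hypothetical and the in-memory bytes object stands in for the data file.

```python
from itertools import accumulate


def build_index(partition_lengths):
    """Cumulative offsets: index[i] is the start of partition i, index[i + 1] its end."""
    return [0] + list(accumulate(partition_lengths))


def read_partition(data, index, partition_id):
    """Return the contiguous byte range that belongs to one partition."""
    start, end = index[partition_id], index[partition_id + 1]
    return data[start:end]


# Four partitions, one of them empty, laid out back to back in the data file.
partitions = [b"aaaa", b"", b"bbbbbb", b"cc"]
data_file = b"".join(partitions)
index_file = build_index(len(p) for p in partitions)

print(index_file)                                # [0, 4, 4, 10, 12]
print(read_partition(data_file, index_file, 2))  # b'bbbbbb'
```

A reader that wants partition 2 only needs the two offsets 4 and 10, which is what makes fetching "a contiguous region of the data file" cheap.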
For Shuffle Write, Spark currently has three implementations: BypassMergeSortShuffleWriter, UnsafeShuffleWriter and SortShuffleWriter (the conditions that decide which implementation is used are not shown here). Shuffle Read has only one implementation.
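The article does not spell out the selection conditions. As a rough orientation only, the following Python sketch models the rule as it is commonly described for Spark's sort-based shuffle manager. Everything in it is an assumption to be checked against your Spark version: the ShuffleDependencyInfo class is hypothetical, 200 is taken to be the default of spark.shuffle.sort.bypassMergeThreshold, and 2^24 is taken to be the partition limit of the serialized (unsafe) path.

```python
from dataclasses import dataclass

BYPASS_MERGE_THRESHOLD = 200         # assumed default of spark.shuffle.sort.bypassMergeThreshold
MAX_SERIALIZED_PARTITIONS = 1 << 24  # assumed partition limit for the serialized path


@dataclass
class ShuffleDependencyInfo:
    """Hypothetical summary of the properties the choice depends on."""
    num_partitions: int
    map_side_combine: bool
    serializer_supports_relocation: bool


def choose_writer(dep):
    """Pick a writer name; a simplified model, not Spark's actual code."""
    if not dep.map_side_combine and dep.num_partitions <= BYPASS_MERGE_THRESHOLD:
        return "BypassMergeSortShuffleWriter"
    if (dep.serializer_supports_relocation
            and not dep.map_side_combine
            and dep.num_partitions <= MAX_SERIALIZED_PARTITIONS):
        return "UnsafeShuffleWriter"
    return "SortShuffleWriter"


print(choose_writer(ShuffleDependencyInfo(100, False, True)))   # few partitions, no combine
print(choose_writer(ShuffleDependencyInfo(5000, False, True)))  # many partitions, relocatable serializer
print(choose_writer(ShuffleDependencyInfo(5000, True, True)))   # map-side combine required
```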
2.1 Shuffle Write Phase Analysis
2.1.1 BypassMergeSortShuffleWriter Analysis
BypassMergeSortShuffleWriter works roughly as follows: it first creates a temporary file for each partition, writes each record to the file of its partition, and finally concatenates all partition files into one data file and generates an index file. Because this process involves no sorting and no combine (this writer is not used when combine is required), BypassMergeSortShuffleWriter generally uses little memory.
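The flow just described (one temporary file per partition, then concatenation plus an index) can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not Spark's implementation: the function name bypass_merge_write, the file names, and the use of hash(key) % num_partitions as the partitioner are all hypothetical.

```python
import os
import tempfile


def bypass_merge_write(records, num_partitions, out_dir):
    """Write (key, payload_bytes) records; return (data_path, offsets).

    No sorting and no combining: each record goes straight to the temporary
    file of its partition, so only small file buffers are held in memory.
    """
    part_paths = [os.path.join(out_dir, f"part-{p}.tmp") for p in range(num_partitions)]
    part_files = [open(path, "wb") for path in part_paths]
    try:
        for key, payload in records:
            part_files[hash(key) % num_partitions].write(payload)
    finally:
        for f in part_files:
            f.close()

    # Concatenate the partition files and record where each partition ends.
    data_path = os.path.join(out_dir, "shuffle.data")
    offsets = [0]
    with open(data_path, "wb") as data:
        for path in part_paths:
            with open(path, "rb") as part:
                data.write(part.read())
            offsets.append(data.tell())
            os.remove(path)
    return data_path, offsets


with tempfile.TemporaryDirectory() as tmp:
    sample = [(0, b"a"), (1, b"bb"), (2, b"ccc"), (3, b"dddd")]
    path, offsets = bypass_merge_write(sample, 2, tmp)
    with open(path, "rb") as f:
        print(offsets, f.read())
```

The cost of this approach is one open file per partition, which is why it only suits shuffles with a small number of partitions.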
2.1.2 SortShuffleWriter analysis
SortShuffleWriter is the most general implementation and the most frequently used one. It delegates most of the work to ExternalSorter, which inserts, sorts, merges and combines the data and finally writes the data and index files. ExternalSorter is one of the MemoryConsumers mentioned earlier. Here is how each step uses memory:
1. On write, data is inserted into either a PartitionedAppendOnlyMap or a PartitionedPairBuffer, depending on whether combine is needed. At intervals, if not enough memory can be obtained from MemoryManager, or if the number of records exceeds the spark.shuffle.spill.numElementsForceSpillThreshold threshold (the default is the maximum value of Long, so it has no effect), the in-memory data is spilled to files. If memory requests keep succeeding, all data of the Write phase stays in memory, so PartitionedAppendOnlyMap and PartitionedPairBuffer are the memory-hungry part.
2. For both PartitionedAppendOnlyMap and PartitionedPairBuffer, the sorting algorithm is TimSort. In the normal case the temporary extra space this algorithm needs is small, but in the worst case it is n / 2, where n is the length of the array to be sorted (see the TimSort implementation for details).
3. When data is inserted and not enough memory can be obtained, the data is spilled to disk. Before the final sorted result is written to the data file, the PartitionedAppendOnlyMap or PartitionedPairBuffer in memory must be merged with the spill files already on disk. The general process of the merge is shown in the figure below.
As the figure shows, this is essentially a merge sort, so it does not consume much extra memory. Aggregation during the merge follows the same process. The one case to watch is key collision, where the keys at the head of the sorted input queues have the same hash value but are actually different keys. Extra space is then needed to hold the intermediate results for all keys that differ but share a hash value. Overall, the probability of this happening is not particularly high.
4. Writing the data file involves converting between different streams, and stream writes are generally buffered. The buffers are mainly controlled by the parameters spark.shuffle.file.buffer and spark.shuffle.spill.batchSize. Overall, this overhead is not large.
From this analysis of the main steps of the SortShuffleWriter write phase, the main memory consumption occurs while writing into PartitionedAppendOnlyMap or PartitionedPairBuffer.
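The insert, spill and merge cycle described in the steps above can be modelled with a toy external sorter. This is a minimal Python sketch under simplifying assumptions: the class SpillingSorter is hypothetical, a record count limit stands in for "not enough memory could be obtained", and sorted in-memory lists stand in for spill files.

```python
import heapq


class SpillingSorter:
    """Toy external sorter: buffer records, spill sorted runs, merge at the end."""

    def __init__(self, max_in_memory):
        self.max_in_memory = max_in_memory  # stand-in for the memory the task was granted
        self.buffer = []                    # records currently held in memory
        self.spills = []                    # each entry is one sorted run ("spill file")

    def insert(self, partition_id, key, value):
        self.buffer.append((partition_id, key, value))
        if len(self.buffer) >= self.max_in_memory:
            self.spill()

    def spill(self):
        """Sort the in-memory records and move them out of memory."""
        if self.buffer:
            self.spills.append(sorted(self.buffer))
            self.buffer = []

    def sorted_records(self):
        """Merge the spilled runs with the remaining in-memory records.

        heapq.merge keeps only one head element per run in memory, which is
        why the merge step itself needs little extra space.
        """
        return heapq.merge(*self.spills, sorted(self.buffer))


sorter = SpillingSorter(max_in_memory=2)
for record in [(1, "b", 2), (0, "a", 1), (1, "a", 3), (0, "c", 4), (0, "b", 5)]:
    sorter.insert(*record)

print(len(sorter.spills))  # number of spilled runs
for record in sorter.sorted_records():
    print(record)          # ordered by partition id first, then key
```

The sketch also shows why inaccurate memory accounting is dangerous: the spill is only triggered by the sorter's own estimate, so if that estimate lags behind real usage, the buffer keeps growing.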
2.1.3 UnsafeShuffleWriter
UnsafeShuffleWriter is an optimization of SortShuffleWriter. Its overall flow is similar, so it is not repeated here. From the perspective of memory usage, the main differences are as follows:
On the one hand, the PartitionedAppendOnlyMap or PartitionedPairBuffer of SortShuffleWriter stores keys and values as their concrete types, that is, as Java objects (deserialized data). The ShuffleExternalSorter of UnsafeShuffleWriter stores the data in serialized form in actual Pages, and writes additional length information along with it. In general, the serialized data is much smaller than the data before serialization.
On the other hand, UnsafeShuffleWriter needs an extra record array (LongArray) that holds the partition information and a pointer to the actual serialized data (an encoded Page number and Offset). This storage is extra overhead of UnsafeShuffleWriter compared with SortShuffleWriter.
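To show how partition information and a Page number plus Offset can share one 64-bit entry of such a LongArray, here is a minimal Python sketch. The bit widths (24 bits for the partition, 13 for the Page number, 27 for the Offset) are an assumption modelled on Spark's PackedRecordPointer; the function names pack and unpack are hypothetical.

```python
PARTITION_BITS = 24  # assumed width of the partition id
PAGE_BITS = 13       # assumed width of the page number
OFFSET_BITS = 27     # assumed width of the offset inside the page


def pack(partition_id, page_number, offset_in_page):
    """Encode partition id, page number and offset into one 64-bit integer."""
    if not 0 <= partition_id < (1 << PARTITION_BITS):
        raise ValueError("partition_id out of range")
    if not 0 <= page_number < (1 << PAGE_BITS):
        raise ValueError("page_number out of range")
    if not 0 <= offset_in_page < (1 << OFFSET_BITS):
        raise ValueError("offset_in_page out of range")
    return (partition_id << (PAGE_BITS + OFFSET_BITS)) | (page_number << OFFSET_BITS) | offset_in_page


def unpack(packed):
    """Decode a packed value back into (partition_id, page_number, offset_in_page)."""
    offset_in_page = packed & ((1 << OFFSET_BITS) - 1)
    page_number = (packed >> OFFSET_BITS) & ((1 << PAGE_BITS) - 1)
    partition_id = packed >> (PAGE_BITS + OFFSET_BITS)
    return partition_id, page_number, offset_in_page


pointer = pack(partition_id=7, page_number=3, offset_in_page=4096)
print(hex(pointer), unpack(pointer))
```

Because the partition id sits in the highest bits, sorting the packed values as plain integers groups records by partition without touching the serialized data, at the cost of 8 extra bytes per record.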
2.2 Shuffle Read Phase Analysis
Spark Shuffle Read mainly goes through fetching data, deserializing the streams, adding metrics, and possibly aggregation and sorting. The general process is shown in the figure below.
These computations are mostly done iterator by iterator. Among the steps above, the more complex ones are fetching data from remote nodes, aggregation, and sorting. The memory usage of these three steps is analyzed next.
1. Data fetching is divided into remote and local fetching. A local fetch reads data directly from the local BlockManager, while remote data has to come over the network. For remote fetching, parameters control the amount of data fetched concurrently, the number of requests in flight, and whether the block of a single request is held in memory. The parameters are spark.reducer.maxSizeInFlight (default 48M), spark.reducer.maxReqsInFlight, spark.reducer.maxBlocksInFlightPerAddress and spark.maxRemoteBlockSizeFetchToMem.
Consider data skew: if the Map phase produces one particularly large Block, then by default, because spark.maxRemoteBlockSizeFetchToMem imposes no limit, the entire Block has to be held in memory on the Reduce side, which is very memory-consuming. You can set spark.maxRemoteBlockSizeFetchToMem so that a block exceeding the threshold is written to disk instead, which avoids OOM in this case. In addition, the fetched data is verified by default (controlled by the parameter spark.shuffle.detectCorrupt), and this process also adds some memory consumption.
2. Aggregation and sorting are implemented with ExternalAppendOnlyMap. The whole insert, Spill and Merge process is similar to the Write phase. This part is also relatively memory-consuming, but thanks to Spill, in-memory data can be flushed to disk when memory runs short, which frees memory.
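The effect of a fetch-to-memory threshold on a skewed shuffle can be sketched as a simple planning function. This is a simplified Python model, not Spark's fetch logic: the function plan_fetch is hypothetical, the decision is made per block, and the block sizes and the 200 MiB threshold are made-up example values.

```python
MIB = 1024 ** 2


def plan_fetch(block_sizes, max_block_size_to_mem):
    """Decide per block whether it is buffered in memory or streamed to disk.

    Returns (in_memory_ids, to_disk_ids, memory_bytes), where memory_bytes is
    the total size of the blocks that would be held in memory.
    """
    in_memory, to_disk = [], []
    for block_id, size in block_sizes.items():
        if size > max_block_size_to_mem:
            to_disk.append(block_id)
        else:
            in_memory.append(block_id)
    memory_bytes = sum(block_sizes[b] for b in in_memory)
    return in_memory, to_disk, memory_bytes


# One skewed block of 3 GiB among otherwise small blocks (hypothetical sizes).
blocks = {"block_0": 10 * MIB, "block_1": 3072 * MIB, "block_2": 20 * MIB}

no_limit = plan_fetch(blocks, max_block_size_to_mem=float("inf"))
with_limit = plan_fetch(blocks, max_block_size_to_mem=200 * MIB)

print("no limit:  ", no_limit[2] // MIB, "MiB buffered in memory")
print("with limit:", with_limit[2] // MIB, "MiB buffered in memory,", with_limit[1], "go to disk")
```

Without a limit the skewed block dominates memory use on the Reduce side; with the limit only the small blocks are buffered.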
3 Spark Shuffle OOM Possibility Analysis
The previous sections analyzed Spark memory management in some detail and showed where the Shuffle process may use a lot of memory. The main points are summarized below:
1. First, pay attention to task concurrency on the Executor side. Tasks running at the same time share the Executor's memory, which reduces the memory available to a single Task.
2. In both Map and Reduce, inserting data into memory, sorting and merging are relatively memory-intensive. Because of Spill, data skew should in theory not cause OOM here. However, since allocation and release of on-heap objects are managed by the JVM and Spark measures used memory by sampling, the sample can be inaccurate, the Spill can come too late, and OOM can result.
3. When Reduce fetches data, data skew may make a single Block very large. By default, enough memory is required to hold the whole Block, so OOM caused by data skew is quite likely at this point. You can set the spark.maxRemoteBlockSizeFetchToMem parameter: once it is set, data exceeding the threshold is written to disk automatically, which avoids OOM caused by data skew in this case. Our production environment confirmed this: task OOMs dropped significantly after setting the parameter to a reasonable threshold.
4. After Reduce fetches data, the data stream is by default decompressed and checked for corruption (parameter spark.shuffle.detectCorrupt). As the code comments mention, this step has no spill-to-disk operation, so it is also quite likely to cause OOM. We have also encountered OOM caused by this check in our production environment.
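The points above map onto a handful of settings. The following Python sketch only renders them as spark-submit style --conf arguments; it is an illustration, not a recommendation. The values are placeholders to be tuned per workload, the helper to_submit_args is hypothetical, and the parameter names are the ones used in this article (check them against the Spark version you run).

```python
# Illustrative values only; tune them for your own jobs.
shuffle_oom_settings = {
    # Point 1: fewer concurrent tasks per Executor leaves more memory per Task.
    "spark.executor.cores": "2",
    # Point 3: blocks above this size are written to disk instead of held in memory.
    "spark.maxRemoteBlockSizeFetchToMem": "200m",
    # Point 4: disabling the check avoids its memory cost but also gives up
    # corruption detection, so consider it only if the check itself causes OOM.
    "spark.shuffle.detectCorrupt": "false",
}


def to_submit_args(settings):
    """Render a settings dict as a flat list of --conf key=value arguments."""
    args = []
    for key, value in sorted(settings.items()):
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_submit_args(shuffle_oom_settings)))
```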