Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Peak showdown between Cache and Persist in Spark

2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Cache generation background

Let's start with a simple test reading a local file and doing a collect operation:

val rdd=sc.textFile("file:///home/hadoop/data/input.txt")val rdd=sc.textFile("file:///home/hadoop/data/input.txt")

Submitting ResultStage 0 (file://home/hadoop/data/input.txt MapPartitionsRDD[1] at textFile at :25), which has no missing parents, each time we have to read the input.txt file locally, can you think of any problems here? If I have a large file and I do the same action every time on the same RDD, then I have to read the file locally every time and get the same result. Repeating such operations over and over again is a waste of resources and time. At this point we might wonder if we could store the RDD in memory? The answer is yes, and that's what we're going to learn about cache.

Cache's role

From the above explanation, we know that sometimes the same RDD will be used in many places, so each place will encounter the Action operation will calculate the same operator many times, which will cause inefficiency. RDD can be persisted to memory or disk through cache operations.

Now we use the example above to cache rdd

rdd.cache At this time, we open the 192.168.137.130:4040 interface to check if there is any file we just cached in the storage interface, and found that there is none. At this point we perform an action operation rdd.count. Continue to check if there is anything in storage, ha

It also lists a lot of information for us, storage level (explained later), size (you will find it larger than the source file, which is also a tuning point), etc.

Speaking of which, what can my little friend think of? Is Cacha a transformation or an Action? I'm sure everyone knows.

cache This method is also a Tranformation, when the first encounter Action operator will be persisted, so we said that the first cache operation in the ui did not see the result, the count operation only has.

Detailed analysis of source code

Spark version: 2.2.0

source code analysis

/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist()

From the source code, it is obvious that cache() calls persistent (). To know the difference between the two, you need to look at the persistent function: (here comment cache storage level)

/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

You can see that persistent () calls persistent (StorageLevel.MEMORY_ONLY) internally, which is not the same as the previous one. Here we can get the difference between cache and persistent: cache has only one default cache level MEMORY_ONLY, while persistent can set other cache levels according to the situation.

I'm sure my friends are curious about how many cache levels there are. Let's continue with the source code:

/** * :: DeveloperApi :: * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether * to replicate the RDD partitions on multiple nodes. * * The [[org.apache.spark.storage.StorageLevel]] singleton object contains some static constants * for commonly useful storage levels. To create your own storage level object, use the * factory method of the singleton object (`StorageLevel(...)`). */@DeveloperApiclass StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1) extends Externalizable

Let's first look at the storage type. In the source code, we can see that there are five parameters, which represent:

useDisk: Use hard disk (external storage);

useMemory: use memory;

useOffHeap: Use off-heap memory, which is a concept inside the Java virtual machine, off-heap memory means allocating memory objects outside the heap of the Java virtual machine, which is directly managed by the operating system (not the virtual machine). The result of this is to keep the heap smaller, reducing the impact of garbage collection on the application. This memory is also frequently used and can lead to OOM, which is referenced by DirectByteBuffer objects stored in the heap to avoid copying data back and forth between the heap and the out-of-heap;

deserialized: deserialization is a mechanism provided by java to represent objects as a series of bytes; deserialization represents the process of restoring bytes to objects. Serialization is a mechanism for object persistence, which preserves objects and their attributes and restores them directly after deserialization.

replication: Number of backups (backups on multiple nodes, default 1).

Let's look at cache levels:

/** * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating * new storage levels. */object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

You can see that there are 12 cache levels listed here, but what's the difference? You can see that each cache level is followed by a StorageLevel constructor, which contains 4 or 5 parameters corresponding to the storage type mentioned above. The four parameters are because one has a default value.

Okay, here I want to ask my friends another question. What do these storage methods mean? How should I choose?

A detailed explanation was given on the official website. Here I introduce an interested classmate can go to the official website to see ha.

MEMORY_ONLY

Store data in memory using deserialized Java object format. If there is not enough memory to hold all the data, some partitions will not be cached and will be recalculated as needed. This is the default level.

MEMORY_AND_DISK

Use deserialized Java object formats to preferentially try to keep data in memory. If there is not enough memory to store all the data, the data is written to a disk file, and the next time the operator is executed on this RDD, the data persisted in the disk file is read out for use.

MEMORY_ONLY_SER((Java and Scala))

The basic meaning is the same as MEMORY_ONLY. The only difference is that the data in the RDD is serialized, and each partition of the RDD is serialized into a byte array. This method saves more memory, but increases the CPU load.

A simple example of the difference in memory levels of perception in sensory rows:

19M page_views.datval rdd1=sc.textFile("file:///home/hadoop/data/page_views.dat")rdd1.persist().count

ui View cache size:

Is it obviously bigger? Let's delete the cache rdd1.unpersist() first.

Use MEMORY_ONLY_SER level

import org.apache.spark.storage.StorageLevelrdd1.persist(StorageLevel.MEMORY_ONLY_SER)rdd1.count

I'm going to compare these two ways here, and you can try something else.

How do you choose? The official website also said.

You can make different choices between memory usage and CPU efficiency.

By default, the highest performance is MEMORY_ONLY, of course, but only if your memory is large enough to store all the data of the entire RDD. Because no serialization and deserialization operations are performed, this part of the performance overhead is avoided; the subsequent operator operations on this RDD are based on pure memory data, and there is no need to read data from disk files, and the performance is also very high; and there is no need to copy a copy of the data and transmit it remotely to other nodes. However, it must be noted here that in the actual production environment, I am afraid that the scenarios that can directly use this strategy are still limited. If there is a lot of data in the RDD (such as billions), directly using this persistence level will cause the JVM OOM memory overflow exception.

If a memory overflow occurs when using the MEMORY_ONLY level, it is recommended that you try using the MEMORY_ONLY_SER level. This level serializes the RDD data and stores it in memory, where each partition is just an array of bytes, greatly reducing the number of objects and memory footprint. This level of performance overhead is more than MEMORY_ONLY, mainly serialization and deserialization overhead. However, subsequent operators can operate on pure memory, so the overall performance is still relatively high. In addition, the same problem may occur, if the amount of data in the RDD is too large, it may also cause OOM memory overflow exceptions.

Don't leak to disk unless you're spending a lot of money on in-memory calculations, or you can filter a lot of data and keep some of the more important ones in memory. Otherwise, storage on disk will be slow to compute and performance will decrease dramatically.

Level with suffix_2, all data must be copied and sent to other nodes. Data replication and network transmission cause large performance overhead and are not recommended unless high availability of jobs is required.

Delete data from cache

Spark automatically monitors cache usage on each node and discards old data partitions in a least-recently-used (LRU) fashion. If you want to delete the RDD manually, rather than waiting for it to fall out of cache, use the RDD.unpersist() method.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report