Uncovering Spark application performance tuning

2025-01-18 Update From: SLTechnology News&Howtos

Introduction: Distributing and processing data across multiple machines is Spark's core capability; this is what we mean by large-scale data set processing. To take full advantage of Spark, several tuning techniques are worth considering. Each section of this article covers one tuning technique and gives the steps needed to apply it.

This article is excerpted from "Spark GraphX in Action".

1 Accelerate Spark with caching and persistence

As we know, Spark evaluates RDD chains lazily: transformations are recorded in the RDD chain, but evaluation is triggered only when an action is called, at which point the distributed computation runs and the result is returned. What happens if an action is called repeatedly on the same RDD?

RDD persistence

By default, an RDD does not retain its computed results; if an action is called again, the entire RDD chain is recomputed. In some cases this is not a problem, but for many machine learning and graph processing tasks it is a serious one. Iterative algorithms run many times over the same RDD, and repeatedly reloading and recomputing the data wastes time. To make matters worse, these algorithms usually involve long RDD chains.

What we need is a way to use the cluster's available memory to keep the results of RDD computations. This is Spark caching (caching is one of the persistence types Spark supports).

To cache an RDD in memory, call the cache function on the RDD object. The following code, run in spark-shell, counts the lines in a file and then outputs the file's contents:

    val filename = "..."
    val rdd1 = sc.textFile(filename).cache
    rdd1.count
    rdd1.collect

Without the call to cache, the two actions, count and collect, would each read the file from the storage system. With cache, the first action (count) leaves the RDD's contents in memory, and the second action (collect) operates directly on the cached data without recomputing the entire RDD chain. The cached data is also used when a new RDD is derived from the cached RDD by a transformation. The following code finds all the comment lines (lines that start with #):

    val rdd2 = rdd1.filter(_.startsWith("#"))
    rdd2.collect

Because rdd2 is derived from rdd1, which is cached and already holds its contents in memory, computing rdd2 does not re-read the data from the storage system.

Note: the cache method only flags the RDD as one that should be cached; nothing is cached immediately. Caching happens the next time the RDD is computed.
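The effect of cache can be observed with a small experiment. The following is a minimal sketch, assuming a local-mode SparkContext; the object name CacheDemo, the helper evaluations, and the use of an accumulator to count how often the map function runs are illustrative choices rather than anything taken from the book.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CacheDemo {
  /** Runs two actions on a mapped RDD and returns how many times the
    * map function was evaluated in total (hypothetical helper). */
  def evaluations(sc: SparkContext, useCache: Boolean): Long = {
    val counter = sc.longAccumulator("map evaluations")
    val mapped = sc.parallelize(1 to 1000, 4).map { x => counter.add(1L); x * 2 }
    // cache() only marks the RDD; nothing is stored until the first action runs.
    val rdd = if (useCache) mapped.cache() else mapped
    rdd.count()   // first action: computes the chain (and fills the cache if marked)
    rdd.collect() // second action: reads the cache, or recomputes the whole chain
    val total = counter.sum
    if (useCache) rdd.unpersist()
    total
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("cache-demo")
    val sc = new SparkContext(conf)
    try {
      println(s"with cache:    ${evaluations(sc, useCache = true)} evaluations")
      println(s"without cache: ${evaluations(sc, useCache = false)} evaluations")
    } finally sc.stop()
  }
}
```

When the partitions fit in memory, the cached run should evaluate each of the 1000 elements once, while the uncached run evaluates them once per action.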

Persistence level

As mentioned above, caching is one of the persistence types. The following table lists all persistence levels supported by Spark.

Each persistence level is defined in the singleton object StorageLevel. For example, calling rdd.persist(StorageLevel.MEMORY_AND_DISK) sets the RDD to be cached in memory and on disk. The cache method itself simply calls rdd.persist(StorageLevel.MEMORY_ONLY).

Note: other persistence levels, such as MEMORY_ONLY_2 and MEMORY_AND_DISK_2, are also available. They replicate the RDD to other nodes in the cluster to provide fault tolerance. These topics are beyond the scope of this book. Interested readers can consult Spark in Action by Petar Zečević and Marko Bonaći (Manning, 2016), which covers Spark fault tolerance in more depth.
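As a minimal sketch of the two calls just described, the following compares the storage level assigned by persist with the one assigned by cache. The object name PersistLevels and the helper levels are hypothetical, and a local-mode SparkContext is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistLevels {
  /** Returns the storage levels assigned by persist(...) and by cache(). */
  def levels(sc: SparkContext): (StorageLevel, StorageLevel) = {
    val memoryAndDisk = sc.parallelize(1 to 100).persist(StorageLevel.MEMORY_AND_DISK)
    val memoryOnly = sc.parallelize(1 to 100).cache()
    (memoryAndDisk.getStorageLevel, memoryOnly.getStorageLevel)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("persist-levels")
    val sc = new SparkContext(conf)
    try {
      val (persisted, cached) = levels(sc)
      println(s"persist(MEMORY_AND_DISK): ${persisted.description}")
      println(s"cache():                  ${cached.description}")
    } finally sc.stop()
  }
}
```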

Persistence of graphs

Whenever a function such as mapVertices or aggregateMessages is called on a Graph object, the operation runs on the underlying RDDs.

Graph objects provide convenient caching and persistence methods that act on the vertex RDD and the edge RDD.
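A minimal sketch of graph caching follows, assuming a local-mode SparkContext and a tiny three-edge graph; the object and method names are hypothetical. The second method passes the storage levels when the graph is built, on the assumption that Graph.fromEdgeTuples already assigns a level to the edge RDD and that an RDD's storage level cannot be changed once assigned.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.Graph
import org.apache.spark.storage.StorageLevel

object GraphCaching {
  /** Builds a tiny graph and caches it with the default MEMORY_ONLY level. */
  def cachedGraph(sc: SparkContext): Graph[Int, Int] = {
    val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))
    // cache() marks both the vertex RDD and the edge RDD.
    Graph.fromEdgeTuples(rawEdges, defaultValue = 0).cache()
  }

  /** Same graph, but kept in memory and on disk; the levels are chosen
    * at construction time rather than through a later persist call. */
  def memoryAndDiskGraph(sc: SparkContext): Graph[Int, Int] = {
    val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))
    Graph.fromEdgeTuples(
      rawEdges,
      defaultValue = 0,
      edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
      vertexStorageLevel = StorageLevel.MEMORY_AND_DISK
    ).cache()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("graph-caching")
    val sc = new SparkContext(conf)
    try {
      val g = cachedGraph(sc)
      println(s"vertices: ${g.vertices.count()}, edges: ${g.edges.count()}")
      println(s"vertex level: ${g.vertices.getStorageLevel.description}")
    } finally sc.stop()
  }
}
```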

Unpersist at the right time

Although caching may seem like a good thing that should be used everywhere, it is possible to have too much of a good thing.

As more and more RDDs are cached, the available memory shrinks. Eventually Spark evicts partitions from memory (using a least recently used, LRU, policy). In addition, caching a large number of Java objects makes costly JVM garbage collection unavoidable. This is why you should call the unpersist method when cached data is no longer needed. For iterative algorithms, the following pattern of method calls is commonly used inside the loop:

Call the cache or persist method of the Graph.

Call an action on the Graph to trigger caching of the underlying RDDs.

Execute the rest of the algorithm body.

At the end of the loop body, unpersist, that is, release the cache.
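The steps above can be sketched as follows. This is only an illustration under stated assumptions: the object name UnpersistLoop is hypothetical, and the loop body (incrementing every vertex attribute) stands in for a real algorithm.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.Graph

object UnpersistLoop {
  /** Increments every vertex attribute once per iteration while keeping
    * only the newest graph cached. */
  def run(sc: SparkContext, iterations: Int): Graph[Int, Int] = {
    val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))
    var g = Graph.fromEdgeTuples(rawEdges, defaultValue = 0).cache()
    g.vertices.count()
    for (_ <- 1 to iterations) {
      val prev = g
      // Step 1: mark the new graph to be cached.
      g = prev.mapVertices((_, attr) => attr + 1).cache()
      // Step 2: run actions so the underlying RDDs are actually cached.
      g.vertices.count()
      g.edges.count()
      // Step 3: the rest of the algorithm body would go here.
      // Step 4: release the previous iteration's cached data.
      prev.unpersist(blocking = false)
    }
    g
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("unpersist-loop")
    val sc = new SparkContext(conf)
    try run(sc, iterations = 5).vertices.collect().foreach(println)
    finally sc.stop()
  }
}
```

Unpersisting the previous graph only after the new one has been materialized avoids recomputing the old graph while the new one is being built.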

Tip: an advantage of using the Pregel API is that it already caches and unpersists internally.

When not to cache

RDDs should not be cached in memory blindly. Consider how many times the dataset is accessed, and weigh the cost of recomputing it on each access against the cost of caching it; recomputing may cost less than the added memory pressure.

Caching an RDD clearly makes no sense if the dataset is read only once; it also makes the job run more slowly, especially with serialized persistence levels.

2 Checkpointing

A common pattern in graph algorithms is to update the graph with new data after each iteration. This means that the chain of vertex RDDs or edge RDDs that makes up the graph grows longer and longer.

Definition: when an RDD is derived step by step from a chain of ancestor RDDs, the path from the RDD back to the root RDD is called its lineage.

The example shown in the following listing is a simple algorithm that generates a new set of vertices and updates the graph. The number of iterations of this algorithm is controlled by the variable iterations.

Each time the above code calls joinVertices, a new RDD is added to the vertex RDD chain. Clearly we need caching so that the RDD chain is not recomputed on every iteration, but that does not change the fact that the list of object references from child RDD to parent RDD keeps growing.

As a result, if you run too many iterations, the running code will eventually fail with a StackOverflowError. Stack overflows usually occur after about 500 iterations.

Checkpointing, a feature provided by RDD and inherited by Graph, solves the problem of long RDD lineage. The code in the following listing demonstrates how to use checkpointing so that you can keep outputting vertices and updating the resulting graph.

An RDD marked for checkpointing is saved to a checkpoint directory, and its link to the parent RDD is severed, cutting off the lineage. Marking a Graph for checkpointing causes its underlying vertex RDD and edge RDD to be checkpointed.

Call SparkContext.setCheckpointDir to set the checkpoint directory, specifying a path on a shared storage system such as HDFS.

Note that checkpoint must be called before any job has been executed on the RDD, that is, before any action is called on it. Because checkpointing is time-consuming (after all, the graph has to be written to disk), you should not checkpoint on every iteration, but you do need to checkpoint periodically to avoid stack overflow errors; checkpointing every 100 iterations is generally sufficient.
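A periodic checkpoint inside an iterative loop can be sketched as follows. This is not the book's listing: the object name CheckpointLoop, the toy update (adding 1 to every vertex with joinVertices), and the temporary local checkpoint directory are assumptions made for illustration. On a real cluster the directory would be on shared storage such as HDFS.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.Graph

object CheckpointLoop {
  /** Updates the graph `iterations` times and checkpoints every `interval`
    * iterations. The caller must have set the checkpoint directory. */
  def run(sc: SparkContext, iterations: Int, interval: Int): Graph[Int, Int] = {
    val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))
    var g = Graph.fromEdgeTuples(rawEdges, defaultValue = 0).cache()
    for (i <- 1 to iterations) {
      val prev = g
      val updates = prev.vertices.map { case (id, attr) => (id, attr + 1) }
      // Each joinVertices call adds another RDD to the lineage.
      g = prev.joinVertices(updates)((_, _, updated) => updated).cache()
      // Mark for checkpointing before the first job runs on the new graph.
      if (i % interval == 0) g.checkpoint()
      g.vertices.count() // materializes (and checkpoints) the vertex RDD
      g.edges.count()    // materializes (and checkpoints) the edge RDD
      prev.unpersist(blocking = false)
    }
    g
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-loop")
    val sc = new SparkContext(conf)
    try {
      // Assumption: a local temporary directory stands in for HDFS.
      sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoints").toString)
      val g = run(sc, iterations = 200, interval = 100)
      println(s"checkpointed: ${g.isCheckpointed}")
    } finally sc.stop()
  }
}
```

Caching the graph before checkpointing it means the checkpoint is written from the cached data instead of recomputing the lineage a second time.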

Note: one way to speed up checkpointing is to checkpoint to Tachyon (since renamed Alluxio) rather than to a standard file system. Alluxio, from AMPLab, is a "memory-centric, fault-tolerant, distributed file system that enables cluster frameworks such as Spark to accelerate access to files shared in memory."

3 Reduce memory pressure through serialization

Memory pressure (insufficient memory) is often one of the main causes of poor performance and failures in Spark applications. It typically shows up as frequent, time-consuming JVM garbage collection and "out of memory" errors. Checkpointing does not relieve memory pressure either. When you hit this problem, first consider serializing the Graph object.

Definition: data serialization converts (serializes) an object instance in the JVM into a byte stream, which can be transferred over the network to another JVM process and "deserialized" there back into an object instance. Spark uses serialization both to transfer objects over the network and to cache them in memory as serialized byte streams.

To use serialization, you can choose the following StorageLevels in persist:

StorageLevel.MEMORY_ONLY_SER

StorageLevel.MEMORY_AND_DISK_SER

Serialization saves space, but serializing and deserializing add CPU overhead.

Serialization using Kryo

By default Spark serializes objects with JavaSerializer, which relies on the inefficient built-in Java serialization framework; Kryo is a better choice. Kryo is an open source Java serialization framework that provides fast and efficient serialization.

To use Kryo serialization in Spark, set the spark.serializer parameter to org.apache.spark.serializer.KryoSerializer. On the command line, set it like this:

    spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"

Setting the parameter this way every time is cumbersome. Instead, you can write parameters such as spark.serializer and their values to the $SPARK_HOME/conf/spark-defaults.conf configuration file, using the standard properties file syntax (one setting per line, with the key and value separated by a tab), as shown below:

    spark.serializer	org.apache.spark.serializer.KryoSerializer

To get the best performance, Kryo requires you to register the classes to be serialized. Otherwise the full class name is stored with each serialized object, which hurts performance considerably. Fortunately, Spark automatically registers the classes used in its own framework; however, if your application code has custom classes that will be serialized with Kryo, you need to register them manually by calling SparkConf.registerKryoClasses. The following listing shows how to register the custom class Person.
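Registration can be sketched as follows. This is not the book's listing: the fields of Person, the object name KryoRegistration, and the local-mode settings are assumptions made for illustration. The example also persists the RDD with a serialized storage level so that the Kryo configuration is actually exercised.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical custom class; the fields are illustrative.
case class Person(name: String, age: Int)

object KryoRegistration {
  /** Builds a configuration that uses Kryo and registers Person with it. */
  def kryoConf(): SparkConf =
    new SparkConf()
      .setMaster("local[2]")
      .setAppName("kryo-registration")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Person]))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(kryoConf())
    try {
      // A serialized storage level stores the cached partitions as bytes.
      val people = sc
        .parallelize(Seq(Person("Ann", 41), Person("Bob", 29)))
        .persist(StorageLevel.MEMORY_ONLY_SER)
      println(s"people: ${people.count()}")
    } finally sc.stop()
  }
}
```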

Check RDD size

When tuning your application, you often need to know the size of an RDD. This is tricky because the size of the objects in a file or database has little to do with how much memory those objects occupy in the JVM.

A handy trick is to cache the RDD in memory and then open the Storage tab in the Spark UI, which records the size of the RDD. You can use the same method to measure the effect of a serialization configuration.
