What is development tuning in Spark performance optimization?

2025-04-05 Update From: SLTechnology News&Howtos

This article describes in detail what development tuning means in Spark performance optimization. Interested readers can use it as a reference; we hope it is helpful.

1. Preface

In the field of big data computing, Spark has become an increasingly popular computing platform. Its functionality covers many types of big data workloads, such as offline batch processing, SQL processing, streaming / real-time computing, machine learning, and graph computing. At Meituan-Dianping, many engineers have tried Spark in various projects. The reason most of them (including the author) first tried Spark is simple: to make big data computing jobs run faster and perform better.

However, developing a high-performance big data job with Spark is not that simple. If a Spark job is not properly tuned, it may run very slowly, which completely fails to reflect Spark's advantage as a fast big data computing engine. Therefore, to make good use of Spark, you must tune its performance sensibly.

Spark performance tuning actually consists of many parts; job performance cannot be improved immediately by adjusting a few parameters. We need to analyze Spark jobs comprehensively according to the business scenario and the data, and then adjust and optimize several aspects in order to achieve the best performance.

Based on previous Spark development experience and practice, the author has summarized a set of performance optimization schemes for Spark jobs. The whole scheme is divided into several parts: development tuning, resource tuning, data skew tuning, and shuffle tuning. Development tuning and resource tuning are basic principles that all Spark jobs need to follow, and they are the foundation of high-performance Spark jobs. Data skew tuning explains a complete set of solutions to data skew in Spark jobs. Shuffle tuning is for readers who have studied Spark's internals in depth, and explains how to tune the shuffle process and its details in Spark jobs.

As the foundation of the Spark performance optimization guide, the basic part focuses on development tuning and resource tuning; this article covers development tuning.

2. Development tuning

The first step of Spark performance optimization is to pay attention to and apply some basic performance principles while developing Spark jobs. Development tuning means understanding the following basic Spark development principles, including RDD lineage design, reasonable use of operators, and optimization of special operations. During development, keep these principles in mind at all times, and apply them flexibly to your own Spark jobs according to the specific business and application scenario.

Principle 1: avoid creating duplicate RDDs

Generally speaking, when we develop a Spark job, we first create an initial RDD from a data source (such as a Hive table or an HDFS file), then apply an operator to this RDD to get the next RDD, and so on, until we compute the final result we need. In this process, multiple RDDs are strung together by different operators (such as map and reduce). This "RDD string" is the RDD lineage, that is, the chain of dependencies between RDDs.

During development, note the following: for the same data, create only one RDD; do not create multiple RDDs to represent the same data.

When Spark beginners start to develop jobs, or when experienced engineers develop jobs with an extremely long RDD lineage, they may forget that they have already created an RDD for a piece of data, and end up creating multiple RDDs for the same data. This means the Spark job repeats computation to create multiple RDDs that represent the same data, which increases the job's performance overhead.

A simple example.

    // That is, you need to perform two operator operations on one piece of data.

    // Wrong: create multiple RDDs when performing multiple operator operations on the same data.
    // The textFile method is executed twice, so two RDDs are created for the same HDFS file,
    // and one operator operation is performed on each RDD.
    // In this case, Spark has to load the contents of hello.txt from HDFS twice and create two separate RDDs;
    // the second load of the HDFS file and the creation of the second RDD are obviously wasted overhead.
    val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
    rdd1.map(...)
    val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
    rdd2.reduce(...)

    // Correct: when performing multiple operator operations on one piece of data, use only one RDD.
    // This is clearly better than the previous version, because only one RDD is created for the data,
    // and the operator operations are all performed on that RDD.
    // Note, however, that the optimization is not finished. Two operator operations are performed on rdd1,
    // and when the second one (reduce) runs, the rdd1 data is recomputed from the source,
    // so there is still the overhead of computing it twice.
    // To solve this thoroughly, combine it with "Principle 3: persist RDDs that are used many times",
    // which ensures that an RDD used multiple times is computed only once.
    val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
    rdd1.map(...)
    rdd1.reduce(...)

Principle 2: reuse the same RDD as much as possible

Besides avoiding multiple RDDs for exactly the same data, also reuse one RDD as much as possible when performing operator operations on different but overlapping data. For example, suppose one RDD holds key-value data and another holds single values, and the values of the two RDDs are exactly the same. In that case we can use only the key-value RDD, because it already contains the data of the other one. Whenever the data of several RDDs overlaps or one contains another, reuse a single RDD as much as possible. This reduces the number of RDDs and therefore the number of operator executions.

A simple example.

    // Wrong.

    // There is a key-value RDD, rdd1.
    // Then, due to business needs, a map operation is performed on rdd1 to create rdd2.
    // The data in rdd2 is only the values of rdd1; that is, rdd2 is a subset of rdd1.
    JavaPairRDD rdd1 = ...
    JavaRDD rdd2 = rdd1.map(...)

    // Different operator operations are performed on rdd1 and rdd2.
    rdd1.reduceByKey(...)
    rdd2.map(...)

    // Right.

    // In the case above, the only difference between rdd1 and rdd2 is the data format.
    // The data of rdd2 is a subset of rdd1, yet two RDDs are created and an operator operation is performed on each.
    // Because the map operator is executed on rdd1 just to create rdd2, one extra operator is executed,
    // which increases the performance overhead.

    // In fact, the same RDD can be reused here.
    // We can use rdd1 for both the reduceByKey and the map operations.
    // In the second (map) operation, use only tuple._2 of each record, that is, the value in rdd1.
    JavaPairRDD rdd1 = ...
    rdd1.reduceByKey(...)
    rdd1.map(tuple._2...)

    // Compared with the first approach, the second one saves the cost of computing rdd2.
    // But the optimization is still not finished: two operator operations are performed on rdd1,
    // so rdd1 is actually computed twice.
    // It therefore also needs to be combined with "Principle 3: persist RDDs that are used many times",
    // which ensures that an RDD used multiple times is computed only once.
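The same idea can be sketched in Scala as a small local program. This is only an illustration under assumptions: the object name ReuseRddExample, the (userId, amount) sample data, and the local[2] master are hypothetical and not part of the original article.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReuseRddExample {
  // Runs two different computations on the SAME key-value RDD instead of
  // deriving a second, value-only RDD first.
  def run(sc: SparkContext): (Map[Long, Int], Int) = {
    // Hypothetical sample data: (userId, amount) pairs.
    val rdd1: RDD[(Long, Int)] = sc.parallelize(Seq((1L, 10), (2L, 20), (1L, 5)))

    // First operation: sum the amounts per key.
    val totalPerKey = rdd1.reduceByKey(_ + _).collect().toMap

    // Second operation: read only the value (tuple._2) of each record of rdd1.
    val maxAmount = rdd1.map(_._2).max()

    (totalPerKey, maxAmount)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("reuse-rdd"))
    try println(run(sc)) finally sc.stop()
  }
}
```

Without persistence, rdd1 is still computed once per action here, which is the remaining cost that Principle 3 addresses.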

Principle 3: persist RDDs that are used many times

If you perform operator operations on one RDD many times in your Spark code, congratulations: you have completed the first step of Spark job optimization, which is to reuse RDDs as much as possible. On this basis, it is time for the second step: ensuring that when multiple operator operations are performed on an RDD, the RDD itself is computed only once.

Spark's default behavior when operators are executed on one RDD several times is this: every time you perform an operator operation on the RDD, Spark recomputes the RDD from the source and then applies your operator to it. This performs very poorly.

So in this case, our advice is to persist RDDs that are used many times. Spark then saves the data of the RDD to memory or disk according to your persistence strategy. Each later operator operation on this RDD reads the persisted data directly from memory or disk and then executes the operator, without recomputing the RDD from the source first.

Code example: persisting an RDD that is used many times

    // To persist an RDD, simply call cache() or persist() on it.

    // Right.
    // cache() attempts to persist all the data of the RDD in memory in non-serialized form.
    // When two operator operations are then performed on rdd1, rdd1 is computed from the source
    // only when the first one (map) is executed.
    // When the second one (reduce) is executed, the data is read directly from memory and the RDD is not computed again.
    val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
    rdd1.map(...)
    rdd1.reduce(...)

    // persist() lets you choose the persistence level manually and persists the data in the specified way.
    // For example, StorageLevel.MEMORY_AND_DISK_SER means: persist to memory first when there is enough memory,
    // and persist to disk files when there is not.
    // The _SER suffix means the RDD data is stored in serialized form: each partition of the RDD is serialized
    // into one large byte array and then persisted to memory or disk.
    // Serialization reduces the memory / disk footprint of the persisted data, which keeps it from occupying
    // too much memory and causing frequent GC.
    val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
    rdd1.map(...)
    rdd1.reduce(...)
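A minimal runnable sketch of the effect of persistence, assuming a local run. The accumulator named sourceEvaluations is an illustrative device (not from the original article) that counts how often the upstream map function is evaluated; the object name and sample data are also hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistExample {
  // Returns (count, sum, number of times the upstream function was evaluated).
  def run(sc: SparkContext): (Long, Int, Long) = {
    // Illustrative counter: incremented each time an element of rdd1 is computed.
    val sourceEvaluations = sc.longAccumulator("sourceEvaluations")

    val rdd1 = sc.parallelize(1 to 100, 4)
      .map { x => sourceEvaluations.add(1L); x }
      .persist(StorageLevel.MEMORY_AND_DISK_SER)

    // First action: computes rdd1 and stores its partitions.
    val count = rdd1.map(_ * 2).count()
    // Second action: reads the persisted partitions instead of recomputing them.
    val sum = rdd1.reduce(_ + _)

    // Release the cached data once it is no longer needed.
    rdd1.unpersist()

    (count, sum, sourceEvaluations.sum)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("persist"))
    try println(run(sc)) finally sc.stop()
  }
}
```

Removing the persist call would make both actions evaluate the upstream map, so the counter would roughly double. Accumulators updated inside transformations can over-count when tasks are retried, so treat the counter as a teaching aid rather than a production metric.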

For the persist() method, we can choose different persistence levels according to the business scenario.

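Persistence levels of Spark: MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, and replicated variants with a _2 suffix (such as MEMORY_ONLY_2 and MEMORY_AND_DISK_2).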

How to choose the most appropriate persistence strategy

By default, the best-performing level is MEMORY_ONLY, but only if your memory is large enough to hold all the data of the entire RDD. Because there is no serialization or deserialization, that overhead is avoided. Subsequent operator operations on this RDD work on data held purely in memory, without reading from disk files, so performance is high. There is also no need to make a copy of the data and send it to other nodes. Note, however, that in a real production environment the scenarios where this strategy can be used directly are limited. If the RDD holds a lot of data (for example, billions of records), using this level directly causes an OOM (out of memory) exception in the JVM.

If a memory overflow occurs at the MEMORY_ONLY level, try the MEMORY_ONLY_SER level. This level serializes the RDD data and then keeps it in memory, where each partition is just one byte array, which greatly reduces the number of objects and the memory footprint. The extra cost of this level compared with MEMORY_ONLY is serialization and deserialization. Subsequent operators still work purely in memory, so overall performance remains relatively high. The same problem can occur as above: if the RDD holds too much data, an OOM exception is still possible.

If no pure-memory level works, use the MEMORY_AND_DISK_SER strategy rather than MEMORY_AND_DISK. Reaching this point means the RDD is so large that memory cannot hold it completely. Serialized data is smaller, which saves both memory and disk space. This strategy also tries to cache as much data in memory as possible, and writes to disk only what does not fit in memory.

DISK_ONLY and the levels with a _2 suffix are generally not recommended. Reading and writing data entirely through disk files causes a sharp drop in performance, and sometimes recomputing the whole RDD is faster. At the levels with a _2 suffix, all data must be copied and sent to other nodes; replication and network transfer add considerable overhead, so these levels are not recommended unless the job requires high availability.

Principle 4: avoid shuffle operators as much as possible

If possible, avoid using shuffle operators, because the shuffle process is the most performance-consuming part of a Spark job. Put simply, the shuffle process pulls records with the same key, distributed over multiple nodes in the cluster, to the same node for operations such as aggregation or join. Operators such as reduceByKey and join trigger shuffle operations.

During the shuffle, records with the same key on each node are first written to local disk files, and other nodes then pull those records from the disk files of each node over the network. Moreover, when records with the same key are pulled to one node for aggregation, that node may have to process too many keys, run short of memory, and spill to disk files. A shuffle can therefore involve a large number of disk read and write operations as well as network transfers. Disk IO and network transfer are the main reasons for the poor performance of shuffle.

Therefore, during development, avoid shuffle operators such as reduceByKey, join, distinct, and repartition where possible, and prefer non-shuffle operators of the map family. Spark jobs with no shuffle operations, or with fewer of them, have much lower performance overhead.

Code example: join with Broadcast and map

    // A traditional join causes a shuffle,
    // because records with the same key in both RDDs must be pulled over the network to one node,
    // where a task performs the join.
    val rdd3 = rdd1.join(rdd2)

    // A Broadcast + map join does not cause a shuffle.
    // Use Broadcast to turn the RDD with the smaller amount of data into a broadcast variable.
    val rdd2Data = rdd2.collect()
    val rdd2DataBroadcast = sc.broadcast(rdd2Data)

    // In the rdd1.map operator, all the data of rdd2 is available from rdd2DataBroadcast.
    // Traverse it, and if the key of a record in rdd2 equals the key of the current record of rdd1,
    // the two records can be joined.
    // At that point, combine the current record of rdd1 with the matching record of rdd2
    // in whatever form you need (String or Tuple).
    val rdd3 = rdd1.map(rdd2DataBroadcast...)

    // Note: this approach is recommended only when rdd2 is relatively small
    // (for example, a few hundred megabytes, or one or two gigabytes),
    // because a full copy of rdd2's data resides in the memory of every Executor.
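A runnable sketch of the Broadcast + map join, written as an inner join. It makes assumptions that are not in the original: the sample data and object name are hypothetical, and keys in the small RDD are assumed to be unique so that it can be collected into a map with collectAsMap (a lookup by key replaces the linear traversal described above).

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastJoinExample {
  def run(sc: SparkContext): Seq[(Int, (String, String))] = {
    // Hypothetical data: rdd1 is the large side, rdd2 the small side.
    val rdd1 = sc.parallelize(Seq((1, "order-a"), (2, "order-b"), (3, "order-c")))
    val rdd2 = sc.parallelize(Seq((1, "alice"), (2, "bob")))

    // Collect the small side to the driver as a map (assumes unique keys in rdd2)
    // and broadcast it so that each Executor holds one copy.
    val rdd2Broadcast = sc.broadcast(rdd2.collectAsMap())

    // Map-side inner join: look up each key of rdd1 in the broadcast map.
    // Records without a match (key 3 here) are dropped; no shuffle is needed.
    val rdd3 = rdd1.flatMap { case (key, left) =>
      rdd2Broadcast.value.get(key).map(right => (key, (left, right)))
    }

    rdd3.collect().sortBy(_._1).toSeq
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("broadcast-join"))
    try run(sc).foreach(println) finally sc.stop()
  }
}
```

If the small side can contain duplicate keys, grouping its values per key before broadcasting would be needed; that variant is not shown here.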

Principle 5: use shuffle operations with map-side pre-aggregation

If you must use a shuffle operation because of business needs and it cannot be replaced by operators of the map family, try to use operators that support map-side pre-aggregation.

Map-side pre-aggregation means aggregating records with the same key locally on each node, similar to the local combiner in MapReduce. After map-side pre-aggregation, each node holds only one record per key locally, because the records with the same key have been merged. When other nodes pull the records for a key from all nodes, the amount of data to pull is greatly reduced, which cuts disk IO and network transfer overhead. Generally speaking, where possible, use the reduceByKey or aggregateByKey operators instead of groupByKey, because both reduceByKey and aggregateByKey pre-aggregate records with the same key locally on each node using a user-defined function. The groupByKey operator does no pre-aggregation; the full data set is distributed and transferred between the nodes of the cluster, so performance is relatively poor.

For example, the following two figures show typical word counts based on groupByKey and reduceByKey, respectively. The first figure is the schematic diagram of groupByKey: without local aggregation, all data is transferred between cluster nodes. The second figure is the schematic diagram of reduceByKey: records with the same key are pre-aggregated locally on each node before being transferred to other nodes for global aggregation.
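The word count contrast can be sketched as follows. Both variants produce the same counts; the difference is how much data crosses the shuffle. The object name and the sample words are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCountExample {
  // Returns the counts computed with groupByKey and with reduceByKey.
  def run(sc: SparkContext): (Map[String, Int], Map[String, Int]) = {
    val words = sc.parallelize(Seq("hello", "world", "hello", "spark", "hello"), 2)
    val pairs = words.map(word => (word, 1))

    // groupByKey: every (word, 1) pair is shuffled, then summed after the shuffle.
    val viaGroupByKey = pairs.groupByKey().mapValues(_.sum).collect().toMap

    // reduceByKey: pairs are summed per key inside each partition first (map-side
    // pre-aggregation), so at most one record per key and partition is shuffled.
    val viaReduceByKey = pairs.reduceByKey(_ + _).collect().toMap

    (viaGroupByKey, viaReduceByKey)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("word-count"))
    try println(run(sc)) finally sc.stop()
  }
}
```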

Principle 6: use high-performance operators

Besides the optimization principles for shuffle-related operators, other operators have their own optimization principles.

Use reduceByKey/aggregateByKey instead of groupByKey

For details, see "Principle 5: use shuffle operations with map-side pre-aggregation".

Use mapPartitions instead of normal map

With operators of the mapPartitions family, one function call handles all the data of a partition rather than a single record, so performance is relatively higher. However, mapPartitions sometimes causes an OOM (out of memory) problem. Because a single function call processes all the data of a partition, if memory runs short, garbage collection cannot reclaim enough objects and an OOM exception is likely. So be careful when using this kind of operator.
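A small sketch of mapPartitions, assuming the per-record work needs a helper object that is costly to create (here a SimpleDateFormat, chosen only for illustration). The object name and sample timestamps are hypothetical.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsExample {
  def run(sc: SparkContext): Array[String] = {
    // Hypothetical input: epoch milliseconds, one record per partition.
    val timestamps = sc.parallelize(Seq(0L, 86400000L), 2)

    val formatted = timestamps.mapPartitions { records =>
      // Created once per partition instead of once per record.
      val format = new SimpleDateFormat("yyyy-MM-dd")
      format.setTimeZone(TimeZone.getTimeZone("UTC"))
      // Transforming the iterator lazily avoids materializing the whole
      // partition in memory, which limits the OOM risk.
      records.map(ts => format.format(new Date(ts)))
    }

    formatted.collect()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("map-partitions"))
    try run(sc).foreach(println) finally sc.stop()
  }
}
```

Collecting the iterator into a list inside the function would hold the entire partition in memory at once, which is the situation in which the OOM risk is highest.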

Use foreachPartition instead of foreach

The principle is similar to "Use mapPartitions instead of normal map": one function call processes all the data of a partition rather than a single record. In practice, foreachPartition operators help performance considerably. For example, suppose all the data of an RDD is written to MySQL inside the function. With the ordinary foreach operator, the data is written record by record, and each function call may create a database connection. Connections are then created and destroyed frequently, and performance is very low. With foreachPartition, which processes the data of one partition at a time, you create one database connection per partition and perform one batch insert, and performance is considerably higher. In practice, when writing about 10,000 records to MySQL, performance improved by more than 30%.
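The connection-per-partition pattern can be sketched without a real database. Everything below is an assumption for illustration: FakeConnection is a hypothetical stand-in for a java.sql.Connection with a batched PreparedStatement, and the two accumulators only exist to make the number of connections and written rows visible.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkConf, SparkContext}

object ForeachPartitionExample {
  // Hypothetical stand-in for a database connection; a real job would open a
  // java.sql.Connection and use PreparedStatement.addBatch / executeBatch.
  final class FakeConnection {
    private val buffer = ArrayBuffer.empty[Int]
    def addBatch(row: Int): Unit = buffer += row
    def executeBatch(): Int = { val written = buffer.size; buffer.clear(); written }
    def close(): Unit = ()
  }

  // Returns (connections opened, rows written).
  def run(sc: SparkContext): (Long, Long) = {
    val connectionsOpened = sc.longAccumulator("connectionsOpened")
    val rowsWritten = sc.longAccumulator("rowsWritten")

    sc.parallelize(1 to 1000, 4).foreachPartition { rows =>
      // One connection per partition, not one per record.
      val connection = new FakeConnection
      connectionsOpened.add(1L)
      try {
        rows.foreach(row => connection.addBatch(row))
        // One batch write for the whole partition.
        rowsWritten.add(connection.executeBatch().toLong)
      } finally connection.close()
    }

    (connectionsOpened.sum, rowsWritten.sum)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("foreach-partition"))
    try println(run(sc)) finally sc.stop()
  }
}
```

With foreach, the same job would open a connection for each of the 1000 records; here the number of connections equals the number of partitions. For very large partitions, flushing the batch every few thousand rows would bound memory use.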

Use coalesce after filter

Usually, after a filter operator removes a large share of the data in an RDD (for example, more than 30%), it is advisable to use the coalesce operator to manually reduce the number of partitions and compress the remaining data into fewer partitions. After the filter, much of the data in each partition is gone. If later computation proceeds as usual, each task processes a partition with little data in it, which wastes resources, and the more tasks there are, the slower the job may be. Reducing the number of partitions with coalesce compresses the data into fewer partitions, so that fewer tasks can process all of them. In some scenarios this helps performance.
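A minimal sketch of filter followed by coalesce. The partition counts, the filter condition, and the object name are hypothetical; a suitable target partition count depends on the data volume left after filtering.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FilterCoalesceExample {
  // Returns (partitions after filter, partitions after coalesce, remaining records).
  def run(sc: SparkContext): (Int, Int, Long) = {
    val rdd = sc.parallelize(1 to 10000, 100)

    // The filter drops 90% of the records but keeps all 100 partitions.
    val filtered = rdd.filter(_ % 10 == 0)

    // coalesce merges existing partitions into fewer ones; with the default
    // shuffle = false this does not trigger a shuffle.
    val compacted = filtered.coalesce(10)

    (filtered.getNumPartitions, compacted.getNumPartitions, compacted.count())
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("filter-coalesce"))
    try println(run(sc)) finally sc.stop()
  }
}
```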

Use repartitionAndSortWithinPartitions instead of repartition followed by sort

repartitionAndSortWithinPartitions is an operator recommended by the official Spark documentation. The documentation suggests that if you need to sort after repartitioning, you should use repartitionAndSortWithinPartitions directly, because this operator sorts while performing the repartition shuffle. Doing the shuffle and the sort together may perform better than shuffling first and sorting afterwards.
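A short sketch of repartitionAndSortWithinPartitions on a key-value RDD. The HashPartitioner with two partitions and the sample keys are assumptions chosen so that the result is easy to read: even keys land in partition 0 and odd keys in partition 1, each sorted by key.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object RepartitionSortExample {
  // Returns the keys of each output partition, in partition order.
  def run(sc: SparkContext): Array[Array[Int]] = {
    val pairs = sc.parallelize(Seq(5, 3, 8, 1, 4, 2, 7, 6)).map(key => (key, s"value-$key"))

    // Repartition by key and sort by key inside each partition during the
    // same shuffle, instead of repartitioning and then sorting separately.
    val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))

    // glom() turns each partition into one array so the layout is visible.
    sorted.keys.glom().collect()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("repartition-sort"))
    try run(sc).foreach(partition => println(partition.mkString(", "))) finally sc.stop()
  }
}
```

The data is sorted within each partition only; there is no global order across partitions unless the partitioner itself is range-based.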

Principle 7: broadcast large variables

Sometimes during development you need to use external variables inside operator functions (especially large variables, such as collections larger than 100 MB). In that case, use Spark's Broadcast feature to improve performance.

When an external variable is used in an operator function, by default Spark makes multiple copies of the variable and sends them over the network to the tasks, so each task has its own copy. If the variable is relatively large (such as 100 MB or even 1 GB), the cost of transferring many copies over the network, plus the frequent GC caused by excessive memory use in the Executor on each node, greatly affects performance.

Therefore, if you use a large external variable, broadcast it with Spark's broadcast feature. A broadcast variable guarantees that only one copy of the variable resides in the memory of each Executor, and the tasks running in that Executor share the copy. This greatly reduces the number of copies, which reduces network transfer overhead, Executor memory use, and GC frequency.

Code example: broadcasting a large variable

    // The following code uses an external variable in the operator function.
    // Nothing special is done here, so each task has its own copy of list1.
    val list1 = ...
    rdd1.map(list1...)

    // The following code wraps list1 in a broadcast variable of type Broadcast.
    // When the operator function uses the broadcast variable, it first checks whether the memory of the Executor
    // where the current task runs already holds a copy of the variable.
    // If so, the copy is used directly; if not, a copy is pulled remotely from the Driver or from another Executor
    // and placed in the local Executor's memory.
    // Only one copy of the broadcast variable resides in the memory of each Executor.
    val list1 = ...
    val list1Broadcast = sc.broadcast(list1)
    rdd1.map(list1Broadcast...)
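A runnable version of the broadcast pattern, under assumptions: the lookup set named list1 below is a hypothetical stand-in for a large external collection, and the filter is just one example of an operator function that reads it.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastVariableExample {
  def run(sc: SparkContext): Array[Int] = {
    // Hypothetical stand-in for a large external collection (all even numbers below 100000).
    val list1: Set[Int] = (0 until 100000 by 2).toSet

    // Wrap it in a broadcast variable: one copy per Executor instead of one per task.
    val list1Broadcast = sc.broadcast(list1)

    val rdd1 = sc.parallelize(1 to 10)

    // Inside the operator function, read the data through .value.
    rdd1.filter(x => list1Broadcast.value.contains(x)).collect()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("broadcast-variable"))
    try println(run(sc).mkString(", ")) finally sc.stop()
  }
}
```

Referencing list1 directly inside the filter would also work, but the set would then be serialized into every task's closure.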

Principle 8: use Kryo to optimize serialization performance

In Spark, serialization is involved in three main places:

When an external variable is used in an operator function, the variable is serialized and sent over the network (see "Principle 7: broadcast large variables").

When a custom type is used as the generic type of an RDD (for example, JavaRDD<Student>, where Student is a custom type), all objects of the custom type are serialized. In this case, the custom class must therefore implement the Serializable interface.

When a serialized persistence strategy (such as MEMORY_ONLY_SER) is used, Spark serializes each partition of the RDD into one large byte array.

In all three places, we can improve serialization and deserialization performance by using the Kryo serialization library. By default, Spark uses Java's serialization mechanism, that is, the ObjectOutputStream/ObjectInputStream API. Spark also supports the Kryo serialization library, whose performance is much higher than that of Java serialization. According to the official documentation, Kryo serialization is about 10 times faster than Java serialization. The reason Spark does not use Kryo by default is that, for best results, Kryo requires you to register all custom types that need to be serialized, which is troublesome for developers.

The following code example uses Kryo. We only need to set the serializer class and register the custom types to be serialized (for example, the types of external variables used in operator functions, custom types used as RDD generic types, and so on):

    // Create a SparkConf object.
    val conf = new SparkConf().setMaster(...).setAppName(...)
    // Set the serializer to KryoSerializer.
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Register the custom types to be serialized.
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
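To show the configuration in use, here is a small sketch that combines Kryo with a serialized persistence level. The Student case class, the object name, and the local[2] master are hypothetical and serve only as an illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical custom type used as the element type of an RDD.
case class Student(id: Int, name: String)

object KryoExample {
  // Builds a configuration that uses Kryo and registers the custom type.
  def buildConf(): SparkConf =
    new SparkConf()
      .setMaster("local[2]")
      .setAppName("kryo-example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Student]))

  def run(sc: SparkContext): Long = {
    // With a _SER storage level, each partition is stored as serialized bytes,
    // so the configured serializer (Kryo here) is what encodes the Student objects.
    val students = sc.parallelize(Seq(Student(1, "alice"), Student(2, "bob")))
      .persist(StorageLevel.MEMORY_ONLY_SER)
    students.count()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(buildConf())
    try println(run(sc)) finally sc.stop()
  }
}
```

Unregistered classes still work with Kryo by default, but the full class name is then written with each object, which costs space.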

Principle 9: optimize data structures

In Java, three kinds of types consume relatively more memory:

Objects: each Java object carries additional information such as an object header and references, which takes up memory.

Strings: each string holds a character array plus additional information such as its length.

Collection types, such as HashMap and LinkedList: collections usually use inner classes to wrap their elements, such as Map.Entry.

Therefore, Spark officially recommends that in Spark code, especially code inside operator functions, you avoid these three kinds of data structures where you can: use strings instead of objects, primitive types (such as Int and Long) instead of strings, and arrays instead of collection types. This reduces the memory footprint as much as possible, which lowers GC frequency and improves performance.

In the author's coding practice, however, this principle is not easy to follow, because code maintainability must be considered as well. If the code has no object abstractions and everything is done by string concatenation, later maintenance and modification become a disaster. Similarly, if all operations are based on arrays instead of collection types such as HashMap and LinkedList, the code becomes much harder to write and maintain. The author therefore suggests using less memory-intensive data structures where possible and appropriate, provided that the code remains maintainable.

That concludes this introduction to development tuning in Spark performance optimization. I hope the content above is helpful and that you have learned something from it. If you think the article is good, feel free to share it so that more people can see it.
