Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to tune RDD operator in Spark performance tuning

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article will explain in detail how to tune the RDD operator in Spark performance tuning, the content of the article is of high quality, so the editor shares it for you to do a reference. I hope you will have some understanding of the relevant knowledge after reading this article.

Tuning of RDD operator with Spark tuning

No nonsense, just get to the point!

1. RDD reuse

When performing an operator on RDD, avoid repeated calculations of RDD under the same operator and computational logic, as shown in the following figure:

Double calculation of RDD

Modify the RDD computing architecture in the above figure to get the optimization results shown in the following figure:

RDD Architecture Optimization II. Filter as soon as possible

After getting the initial RDD, you should consider filtering out the unwanted data as soon as possible, so as to reduce the footprint of memory and improve the efficiency of Spark jobs.

3. Read a large number of small files-use wholeTextFiles

When we read a text file as RDD, each line we enter becomes an element of RDD.

You can also read multiple complete text files into a pairRDD at once, where the key is the file name and the value is the file content.

Val input:RDD [String] = sc.textFile ("dir/*.log")

If you pass a directory, all files in the directory are read as RDD. The file path supports wildcards.

However, it is not efficient to read a large number of small files, so wholeTextFiles should be used.

The return value is RDD [(String, String)], where Key is the name of the file and Value is the content of the file.

Def wholeTextFiles (path: String, minPartitions: Int = defaultMinPartitions): RDD [(String, String)])

WholeTextFiles reads small files:

Val filesRDD: RDD [(String, String)] =

Sc.wholeTextFiles ("D:\\ data\\ files", minPartitions = 3)

Val linesRDD: RDD [String] = filesRDD.flatMap (_. _ 2.split ("\\ r\\ n"))

Val wordsRDD: RDD [String] = linesRDD.flatMap (_ .split (""))

WordsRDD.map ((_, 1)). ReduceByKey (_ + _). Collect (). Foreach (println)

4. MapPartition and foreachPartition

MapPartitions

Map (_... (.) Represents each element

MapPartitions (_... (.) An iterator that represents the data of each partition

The ordinary map operator operates on every element in RDD, while the mapPartitions operator operates on every partition in RDD.

If it is an ordinary map operator, assuming that a partition has 10, 000 pieces of data, then the function in the map operator has to execute 10, 000 times, that is, to operate on each element.

Map operator

If it is a mapPartition operator, because a task processes the partition of a RDD, then a task will only execute function,function once to receive all partition data, which is more efficient.

MapPartition operator

For example, when you want to write all the data in RDD through JDBC, if you use the map operator, you need to create a database connection for each element in RDD, which consumes a lot of resources. if you use the mapPartitions operator, you only need to establish a database connection for the data in a partition.

The mapPartitions operator also has some shortcomings: for ordinary map operations, if you deal with one piece of data at a time, if there is not enough memory after processing 2000 pieces of data, then the 2000 pieces of data that have been processed can be garbage collected from memory; but if you use the mapPartitions operator, but when the amount of data is very large, function processes the data of one partition at a time, and if there is not enough memory, it may cause OOM, that is, memory overflow.

Therefore, the mapPartitions operator is suitable for when the amount of data is not particularly large, at this time the use of mapPartitions operator to improve the performance is still good. (when the amount of data is large, once the mapPartitions operator is used, it will be directly OOM.)

In the project, you should first estimate the amount of data in RDD, the amount of data per partition, and the memory resources allocated to each Executor, and if resources permit, consider using the mapPartitions operator instead of map.

ForeachPartition

Rrd.foreache (_... (.) Represents each element

Rrd.forPartitions (_... (.) An iterator that represents the data of each partition

In the production environment, the foreachPartition operator is usually used to write the database. Through the characteristics of the foreachPartition operator, the performance of writing the database can be optimized.

If you use the foreach operator to complete the operation of the database, because the foreach operator traverses every piece of data in RDD, therefore, each piece of data will establish a database connection, which is a great waste of resources. Therefore, we should use the foreachPartition operator for writing to the database.

Very similar to the mapPartitions operator, foreachPartition takes each partition of RDD as a traversal object and processes the data of one partition at a time, that is, if operations related to the database are involved, the data of a partition only needs to create a database connection, as shown in the following figure:

ForeachPartition operator

After using the foreachPartition operator, the following performance improvements can be achieved:

For the function function we wrote, we process the data of an entire partition one at a time

Create a unique database connection for data within a partition

You only need to send the SQL statement and multiple sets of parameters to the database once.

In a production environment, all use the foreachPartition operator to complete database operations. There is a problem with the foreachPartition operator, which is similar to the mapPartitions operator. If the amount of data in a partition is particularly large, it may cause OOM, that is, memory overflow.

5. Filter+coalesce/repartition (reduce partitions)

In the Spark task, we often use the filter operator to filter the data in RDD. In the initial phase of the task, the amount of data loaded from each partition is similar, but once it has been filtered by filter, the amount of data in each partition may vary greatly, as shown in the following figure:

Partition data filtering results

Based on the above picture, we can find two problems:

The amount of data per partition becomes smaller. If the current data is processed according to the previous number of task equal to partition, it will be a waste of task computing resources.

The amount of data in each partition is different, which will lead to different amounts of data to be processed by each task when dealing with each task, which is likely to lead to data skew.

As shown in the figure above, there are only 100 pieces of data left after filtering in the second partition and 800pieces after filtering in the third partition. Under the same processing logic, the data volume handled by the task corresponding to the second partition and the task corresponding to the third partition has reached 8 times the difference, which may also lead to a gap of several times the running speed, that is, the problem of data skew.

In view of the above two problems, we analyze them respectively:

For the first problem, since the amount of data in the partition has become smaller, we hope to redistribute the partition data, for example, to convert the data from the original four partitions to two partitions, so that only the latter two task can be used for processing, thus avoiding the waste of resources.

For the second problem, the solution is very similar to the first problem, redistributing the partitioned data so that the amount of data in each partition is about the same, which avoids the problem of data skew.

So how should we achieve the above solution? We need the coalesce operator.

Both repartition and coalesce can be used for repartitioning. Repartition is only a simple implementation of shuffle as true in the coalesce API. Coalesce does not perform shuffle by default, but can be set by parameters.

Suppose we want to change the original number of partitions A to B by repartitioning, then there are several situations:

A > B (most partitions are merged into a few)

There is little difference between An and B

You can use coalesce at this point without the need for a shuffle process.

There is a big difference between An and B.

At this point, you can use coalesce and do not enable the shuffle process, but it will cause poor performance of the merge process, so it is recommended that you set the second parameter of coalesce to true, which is to start the shuffle process.

You can use repartition at this time. If you use coalesce, you need to set shuffle to true, otherwise coalesce is invalid.

After the filter operation, we can use the coalesce operator to compress the number of partition according to the different amount of data in each partition, and make the amount of data in each partition as uniform and compact as possible, so that the later task can calculate, which can improve the performance to some extent.

Note: local mode is an in-process simulation of cluster operation, which has been internally optimized for parallelism and the number of partitions, so there is no need to set parallelism and number of partitions.

6. Parallelism setting

Parallelism in Spark jobs refers to the number of task for each stage.

If the parallelism setting is unreasonable and the parallelism is too low, it will lead to a great waste of resources, for example, 20 Executor, each Executor allocates 3 CPU core, while the Spark job has 40 task, so the number of task allocated to each Executor is 2, which makes each Executor have one CPU core idle, resulting in a waste of resources.

The ideal parallelism setting should be to match the parallelism with the resources. To put it simply, if the resources permit, the parallelism should be set as large as possible so that the cluster resources can be fully utilized. Reasonable setting of parallelism can improve the performance and running speed of the whole Spark job.

Spark officially recommends that the number of task should be set to 2-3 times the total number of CPU core of Spark jobs. The reason why it is not recommended that the number of task is equal to the total number of CPU core is that the execution time of task is different, some task execution speed is fast and some task execution speed is slow. If the number of task is equal to the total number of CPU core, then when the fast task execution is completed, the CPU core will be idle. If the number of task is set to 2-3 times the total number of CPU core, then after the execution of one task, CPU core will immediately execute the next task, which reduces the waste of resources and improves the efficiency of Spark job operation.

The setting of Spark job parallelism is as follows:

Val conf = new SparkConf () .set ("spark.default.parallelism", "500")

Principle: make full use of cpu's core (number of cpu cores). If there are 100 core, then the parallelism can be set to 200. 300.

7. Repartition/coalesce adjusts parallelism

Although the adjustment policy of parallelism can be set in Spark, the setting of parallelism is not effective for Spark SQL, and the parallelism set by users is only valid for stage of all Spark except Spark SQL.

Users are not allowed to specify the parallelism of Spark SQL. By default, Spark SQL automatically sets the parallelism of the stage in which the Spark SQL resides based on the number of split in the HDFS file corresponding to the hive table. The parallelism specified by the user through the spark.default.parallelism parameter will only take effect in the stage without Spark SQL.

Since the parallelism of the stage where the Spark SQL is located cannot be set manually, if the amount of data is large, and the subsequent transformation operations in this stage have complex business logic, and the number of task automatically set by Spark SQL is very small, this means that each task has to deal with a large amount of data, and then have to execute very complex processing logic, which may show that the first stage with Spark SQL is very slow. The subsequent stage without Spark SQL runs very fast.

To solve the problem that Spark SQL cannot set the degree of parallelism and the number of task, we can use the repartition operator.

The comparison figure before and after using the repartition operator is as follows:

Comparison figure before and after using repartition operator

There is certainly no way to change the parallelism and the number of task in this step of Spark SQL. However, for the RDD queried by Spark SQL, immediately use the repartition operator to re-partition, so that you can re-partition to multiple partition. From the RDD operation after repartition, since Spark SQL is no longer involved, the parallelism of stage will be equal to the value you manually set. This avoids that the stage where Spark SQL is located can only use a small amount of task to process large amounts of data and execute complex algorithmic logic. The front and back pairs using the repartition operator are shown in the figure above.

8. ReduceByKey local prepolymerization

Compared with ordinary shuffle operations, a remarkable feature of reduceByKey is that it aggregates locally on the map side. The map side will first combine the local data, and then write the data to the file created by each task of the next stage, that is, on the map side, execute the reduceByKey operator function for the value corresponding to each key.

The execution of the reduceByKey operator is shown in the following figure:

ReduceByKey operator execution process

The performance gains with reduceByKey are as follows:

After local aggregation, the amount of data on the map side is reduced, the disk IO is reduced, and the disk space is also reduced.

After local aggregation, the amount of data pulled by the next stage becomes less, which reduces the amount of data transmitted by the network.

After local aggregation, the memory footprint of data caching on the reduce side is reduced.

After local aggregation, the amount of data aggregated on the reduce side is reduced.

Based on the local aggregation characteristics of reduceByKey, we should consider using reduceByKey instead of other shuffle operators, such as groupByKey.

The operation of groupByKey and reduceByKey is shown in figures 1 and 2:

Diagram 1:groupByKey schematic diagram 2:reduceByKey principle

According to the figure above, groupByKey does not aggregate the data on the map side. Instead, it shuffle all the data on the map side to the reduce side, and then aggregates the data on the reduce side. Because reduceByKey has the characteristic of aggregation at the map end, the amount of data transmitted by the network is reduced, so the efficiency is obviously higher than that of groupByKey.

9. Use persistence + checkpoint

Spark persistence is fine in most cases, but sometimes the data may be lost. If the data is lost, the lost data needs to be recalculated, cached and used after calculation. In order to avoid data loss, you can choose to checkpoint the RDD, that is, persist a copy of the data to a fault-tolerant file system (such as HDFS).

After a RDD cache is checkpoint, if it is found that the cache is missing, it will first check whether the checkpoint data exists, and if so, the checkpoint data will be used instead of recalculating. In other words, checkpoint can be regarded as the guarantee mechanism of cache, and if cache fails, the data of checkpoint is used.

The advantage of using checkpoint is to improve the reliability of the Spark job. Once there is a problem with the cache, there is no need to recalculate the data. The disadvantage is that the checkpoint needs to write the data to the file system such as HDFS, which consumes a lot of performance.

The persistence settings are as follows:

Sc.setCheckpointDir ('HDFS')

Rdd.cache/persist (memory_and_disk)

Rdd.checkpoint

10. Use broadcast variabl

By default, if an external variable is used in the operator in task, each task gets a copy of the variable, which consumes a lot of memory. On the one hand, if RDD is persisted later, RDD data may not be stored in memory, but can only be written to disk, and disk IO will seriously consume performance; on the other hand, when creating objects, task may find that heap memory cannot hold newly created objects, which will cause frequent GC,GC to stop worker threads, which will cause Spark to suspend work for a period of time, seriously affecting Spark performance.

Suppose the current task is configured with 20 Executor, specified 500m task, and a 20m variable is shared by all task. In this case, 500replicas are produced in 500task, which consumes 10GB of memory in the cluster. If broadcast variables are used, each Executor saves a copy, consuming a total of 400MB of memory, which reduces memory consumption by five times.

The broadcast variable holds a copy of each Executor, and all task of this Executor share this broadcast variable, which greatly reduces the number of copies produced by the variable.

In the initial phase, the broadcast variable has only one copy in Driver. When task is running, if you want to use the data in the broadcast variable, you will first try to obtain the variable in the BlockManager corresponding to your local Executor. If it is not available locally, BlockManager will remotely pull a copy of the variable from the BlockManager of Driver or other nodes and be managed by the local BlockManager. Then all task of this Executor will obtain the variable directly from the local BlockManager.

Data that may be shared by multiple Task can be broadcast to each Executor:

Val broadcast variable name = sc.broadcast (variables that will be used by each Task, that is, variables that need to be broadcast)

Broadcast variable name .value / / get broadcast variable

11. Serialization using Kryo

By default, Spark uses Java's serialization mechanism. The serialization mechanism of Java is easy to use and does not need additional configuration, and the variables used in the operator can implement the Serializable interface. However, the efficiency of Java serialization mechanism is not high, the serialization speed is slow and the serialized data still takes up a large amount of space.

Spark officially claims that the performance of the Kryo serialization mechanism is about 10 times higher than that of the Java serialization mechanism. Spark does not use Kryo as the serialization class library by default because it does not support the serialization of all objects. At the same time, Kryo requires users to register the types that need to be serialized before use, which is not convenient, but since version 2.0.0 of Spark Shuffling RDDs of simple types, arrays of simple types, and string types already use Kryo serialization by default.

The code of Kryo serialization registration method is as follows:

Public class MyKryoRegistrator implements KryoRegistrator {

@ Override

Public void registerClasses (Kryo kryo) {

Kryo.register (StartupReportLogs.class)

}

}

The code to configure the Kryo serialization method is as follows:

/ / create a SparkConf object

Val conf = new SparkConf () .setMaster (…) .setAppName (...)

/ / use the Kryo serialization library

Conf.set ("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

/ / register a custom collection of classes in the Kryo serialization library

Conf.set ("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator"); on how to tune the RDD operator in Spark performance tuning is shared here, I hope the above content can be helpful to you, can learn more knowledge. If you think the article is good, you can share it for more people to see.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report