How to Approach RDD Operator Tuning in Spark Performance Tuning


Many newcomers are unclear about how to approach RDD operator tuning in Spark performance tuning. To help you solve this problem, the following walks through it in detail; those who need it are welcome to read on, and hopefully you will get something out of it.

Spark Tuning: RDD Operator Tuning

No nonsense, just get to the point!

1. RDD reuse

When applying operators to an RDD, avoid recomputing the same RDD under the same operator and computation logic, as shown in the following figure:

Double calculation of RDD

Modifying the RDD computing architecture in the figure above yields the optimized result shown in the following figure:

RDD architecture optimization
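As a minimal sketch of the idea (the input path and RDD names here are made up): instead of rebuilding the same RDD separately for each downstream computation, build it once, cache it, and reuse it for both actions:

val linesRDD = sc.textFile("hdfs://nn:8020/data/input.txt")    // hypothetical input path
val wordsRDD = linesRDD.flatMap(_.split(" ")).cache()          // computed once, reused below
val wordCount = wordsRDD.count()                               // first action reuses the cached RDD
val distinctWords = wordsRDD.distinct().count()                // second action does not recompute wordsRDD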

2. Filter as early as possible

After obtaining the initial RDD, consider filtering out unneeded data as early as possible, so as to reduce memory usage and improve the efficiency of the Spark job.
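A minimal sketch (the path and filter condition are made up): filter right after loading, before any expensive transformations:

val rawRDD = sc.textFile("hdfs://nn:8020/data/events.log")                      // hypothetical input
val usefulRDD = rawRDD.filter(line => line.nonEmpty && !line.startsWith("#"))   // drop unneeded lines early
// subsequent transformations now work on a smaller dataset
val result = usefulRDD.map(_.toUpperCase).count()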

3. Reading a large number of small files: use wholeTextFiles

When we read a text file as an RDD, each line of the input becomes an element of the RDD.

You can also read multiple complete text files into a pairRDD at once, where the key is the file name and the value is the file content.

val input: RDD[String] = sc.textFile("dir/*.log")

If you pass a directory, all files in the directory are read as RDD. The file path supports wildcards.

However, it is not efficient to read a large number of small files, so wholeTextFiles should be used.

The return value is RDD [(String, String)], where Key is the name of the file and Value is the content of the file.

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

wholeTextFiles reading small files:

val filesRDD: RDD[(String, String)] = sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

4. mapPartitions and foreachPartition

mapPartitions

map(_....): the function argument represents each element

mapPartitions(_....): the function argument is an iterator over the data of one partition

The ordinary map operator operates on every element of the RDD, while the mapPartitions operator operates on every partition of the RDD.

With the ordinary map operator, assuming a partition has 10,000 records, the function in the map operator has to execute 10,000 times, that is, once per element.

Map operator

With the mapPartitions operator, since one task processes one partition of the RDD, the function executes only once per partition and receives all of the partition's data at once, which is more efficient.

mapPartitions operator

For example, when writing all the data in an RDD to a database through JDBC: with the map operator, a database connection has to be created for every element in the RDD, which consumes a lot of resources; with the mapPartitions operator, only one database connection needs to be established per partition.

The mapPartitions operator also has shortcomings. With an ordinary map operation, data is processed one record at a time; if memory runs low after, say, 2,000 records have been processed, those already-processed records can be garbage collected. With the mapPartitions operator, however, the function processes an entire partition's data at once; when the data volume is very large and memory is insufficient, this may cause an OOM, that is, a memory overflow.

Therefore, the mapPartitions operator is suitable when the data volume is not particularly large; in that case it delivers a decent performance improvement. (When the data volume is very large, using the mapPartitions operator can lead directly to OOM.)

In a project, you should first estimate the amount of data in the RDD, the amount of data per partition, and the memory resources allocated to each Executor; if resources permit, consider using the mapPartitions operator instead of map.
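A minimal sketch of the difference (the RDD contents here are made up): the map function runs once per element, while the mapPartitions function runs once per partition and works on an iterator:

val numbersRDD = sc.parallelize(1 to 100000, 4)

// map: the function runs once per element
val doubledByMap = numbersRDD.map(_ * 2)

// mapPartitions: the function runs once per partition and receives an iterator over its data
val doubledByPartitions = numbersRDD.mapPartitions { iter =>
  // per-partition setup (e.g., an expensive resource) would go here, once per partition
  iter.map(_ * 2)
}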

foreachPartition

rdd.foreach(_....): operates on each element

rdd.foreachPartition(_....): operates on an iterator over the data of each partition

In production environments, the foreachPartition operator is usually used to write to the database; its characteristics can be exploited to optimize database-writing performance.

If the foreach operator is used to write to the database, then because foreach traverses every record of the RDD one by one, a database connection is established for every record, which is a great waste of resources. Therefore, the foreachPartition operator should be used for writing to the database.

Very similar to mapPartitions, foreachPartition treats each partition of the RDD as the unit of traversal and processes one partition's data at a time; so, for database-related operations, only one database connection needs to be created per partition, as shown in the following figure:

foreachPartition operator

After using the foreachPartition operator, the following performance improvements can be achieved:

The function we write processes the data of an entire partition at a time;

Only one database connection is created for the data within a partition;

The SQL statement and its multiple sets of parameters only need to be sent to the database once.

In production environments, database operations are therefore done with the foreachPartition operator. But the foreachPartition operator has a problem similar to mapPartitions: if the amount of data in a single partition is particularly large, it may cause an OOM, that is, a memory overflow.
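A minimal sketch under assumed details (the JDBC URL, credentials, table, record type, and sample data are all made up): one connection and one batched statement per partition instead of one connection per record:

import java.sql.DriverManager

case class WordCount(word: String, count: Int)                     // hypothetical record type

val wordCountsRDD = sc.parallelize(Seq(WordCount("spark", 10), WordCount("rdd", 5)))

wordCountsRDD.foreachPartition { partition =>
  // one connection per partition, not per record
  val conn = DriverManager.getConnection("jdbc:mysql://db-host:3306/stats", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO word_counts(word, cnt) VALUES (?, ?)")
  partition.foreach { wc =>
    stmt.setString(1, wc.word)
    stmt.setInt(2, wc.count)
    stmt.addBatch()                                                 // accumulate the partition's rows
  }
  stmt.executeBatch()                                               // send them to the database in one go
  stmt.close()
  conn.close()
}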

5. filter + coalesce/repartition (reducing the number of partitions)

In Spark jobs, we often use the filter operator to filter out data in an RDD. At the start of the job, the amount of data loaded into each partition is roughly similar, but after filtering, the amount of data in each partition may differ greatly, as shown in the following figure:

Partition data filtering results

Based on the above picture, we can find two problems:

The amount of data in each partition becomes smaller. If the reduced data is still processed with the original one-task-per-partition arrangement, each task's computing resources are somewhat wasted.

The amount of data in each partition differs, so each task has a different amount of data to process, which can easily lead to data skew.

As shown in the figure above, only 100 records remain in the second partition after filtering, while 800 remain in the third. Under the same processing logic, the task for the third partition handles 8 times as much data as the task for the second, which may also mean a several-fold difference in running time; this is the data skew problem.

In view of the above two problems, we analyze them respectively:

For the first problem, since the amount of data per partition has become smaller, we want to redistribute the partitioned data, for example compressing the original four partitions into two, so that only two tasks are needed for processing and no resources are wasted.

For the second problem, the solution is very similar: redistribute the partitioned data so that each partition holds roughly the same amount of data, which avoids data skew.

So how should we achieve the above solution? We need the coalesce operator.

Both repartition and coalesce can be used for repartitioning. repartition is simply coalesce with shuffle set to true; coalesce does not shuffle by default, but shuffling can be enabled through its parameter.
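A minimal sketch (the input path and partition counts are made up): compress partitions without a shuffle via coalesce, or enable the shuffle with its second parameter, which is what repartition does:

val filteredRDD = sc.textFile("hdfs://nn:8020/data/big.log", 1000).filter(_.contains("ERROR"))   // hypothetical input

val coalesced = filteredRDD.coalesce(200)                              // merge 1000 partitions into 200, no shuffle
val coalescedWithShuffle = filteredRDD.coalesce(200, shuffle = true)   // same effect as repartition(200)
val repartitioned = filteredRDD.repartition(200)                       // always shuffles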

Suppose we want to change the original number of partitions A to B by repartitioning. There are the following cases:

1. A > B (merging many partitions into fewer)

A and B are close in value

In this case coalesce can be used, with no need for a shuffle.

A and B differ greatly

In this case coalesce can still be used without enabling the shuffle, but the merge may perform poorly, so it is recommended to set the second parameter of coalesce to true, that is, to enable the shuffle.

2. A < B (splitting a few partitions into many)

In this case repartition should be used; if coalesce is used, shuffle must be set to true, otherwise coalesce has no effect.

After a filter operation, we can use the coalesce operator to handle the situation where each partition's data volume differs: compress the number of partitions and make the data in each partition as even and compact as possible, so that subsequent tasks can compute efficiently. This can improve performance to some extent.

Note: local mode simulates a cluster inside a single process and already applies some internal optimization to parallelism and the number of partitions, so there is no need to set them there.

6. Setting parallelism

Parallelism in a Spark job refers to the number of tasks in each stage.

If parallelism is set unreasonably low, resources are greatly wasted. For example, with 20 Executors, each allocated 3 CPU cores, and a Spark job with 40 tasks, each Executor gets 2 tasks, leaving one CPU core idle on every Executor and wasting resources.

The ideal parallelism setting matches the parallelism to the resources: simply put, within the limits of the available resources, set the parallelism as high as possible so that cluster resources are fully used. A reasonable parallelism setting can improve the performance and running speed of the whole Spark job.

Spark officially recommends setting the number of tasks to 2-3 times the total number of CPU cores of the Spark job. The reason the task count is not recommended to equal the total core count is that task execution times differ: some tasks run fast and some run slow. If the task count equals the core count, cores sit idle once the fast tasks finish. If the task count is 2-3 times the core count, a core immediately starts the next task as soon as one finishes, which reduces wasted resources and improves the efficiency of the Spark job.

The parallelism of a Spark job is set as follows:

val conf = new SparkConf().set("spark.default.parallelism", "500")

Principle: keep the CPU cores fully utilized. For example, with 100 cores, parallelism can be set to 200-300.

7. Adjusting parallelism with repartition/coalesce

Although a parallelism policy can be set in Spark, the parallelism setting does not take effect for Spark SQL: the user-set parallelism only applies to stages other than Spark SQL stages.

The parallelism of a Spark SQL stage cannot be specified by the user. Spark SQL automatically sets the parallelism of its own stage based on the number of splits of the HDFS files backing the corresponding hive table; the parallelism the user specifies through the spark.default.parallelism parameter only takes effect in stages that do not contain Spark SQL.

Because the parallelism of the Spark SQL stage cannot be set manually, if the data volume is large and the subsequent transformations in that stage have complex business logic, while Spark SQL automatically sets only a small number of tasks, each task has to process a large amount of data and then run very complex logic. This can show up as the first stage (the one containing Spark SQL) being very slow while subsequent stages without Spark SQL run very fast.

To work around the fact that Spark SQL's parallelism and task count cannot be set, we can use the repartition operator.

A comparison of before and after using the repartition operator is shown below:

repartition operator before/after comparison

The parallelism and task count of the Spark SQL step itself certainly cannot be changed. But for the RDD queried out of Spark SQL, immediately applying the repartition operator repartitions it into more partitions. From the RDD after repartition onwards, Spark SQL is no longer involved, so the stage's parallelism equals the value you set manually. This avoids the Spark SQL stage having to process a large amount of data and execute complex algorithmic logic with only a few tasks. The before/after comparison of using the repartition operator is shown in the figure above.

8. reduceByKey local pre-aggregation

Compared with an ordinary shuffle operation, a notable feature of reduceByKey is that it performs map-side local aggregation: the map side first combines the local data and then writes the data into the file created for each task of the next stage. That is, on the map side, the reduceByKey function is already applied to the values of each key.

The execution process of the reduceByKey operator is shown in the following figure:

reduceByKey operator execution process

The performance improvements from using reduceByKey are:

After local aggregation, there is less data on the map side, which reduces disk IO and disk space usage;

After local aggregation, the next stage pulls less data, which reduces the amount of data transferred over the network;

After local aggregation, less memory is used for data caching on the reduce side;

After local aggregation, there is less data to aggregate on the reduce side.

Given reduceByKey's local-aggregation characteristic, we should consider using reduceByKey in place of other shuffle operators such as groupByKey. The operating principles of groupByKey and reduceByKey are shown in Figure 1 and Figure 2 below:

Figure 1: groupByKey principle

Figure 2: reduceByKey principle

As the figures show, groupByKey does not aggregate data on the map side; instead it shuffles all map-side data to the reduce side and aggregates there. Because reduceByKey aggregates on the map side, the amount of data transferred over the network is smaller, so its efficiency is clearly higher than that of groupByKey.
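A minimal word-count sketch (the input is made up) contrasting the two: reduceByKey combines counts on the map side before the shuffle, whereas the groupByKey version shuffles every (word, 1) pair and sums only afterwards:

val pairsRDD = sc.parallelize(Seq("a", "b", "a", "c", "a", "b")).map((_, 1))

// preferred: map-side combine, then shuffle only the partial sums
val byReduce = pairsRDD.reduceByKey(_ + _)

// same result, but every (word, 1) record is shuffled before summing
val byGroup = pairsRDD.groupByKey().mapValues(_.sum)

byReduce.collect().foreach(println)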

9. Use persistence + checkpoint

Spark persistence is fine in most cases, but the cached data can sometimes be lost. If it is lost, the missing data has to be recomputed and then cached again before use. To guard against data loss, you can checkpoint the RDD, that is, persist a copy of the data to a fault-tolerant file system (such as HDFS).

After an RDD is both cached and checkpointed, if the cache is found to be missing, Spark first checks whether the checkpoint data exists; if it does, the checkpoint data is used and nothing needs to be recomputed. In other words, the checkpoint can be regarded as a safety net for the cache: if the cache fails, the checkpoint data is used.

The advantage of checkpointing is higher reliability of the Spark job: if something goes wrong with the cache, the data does not need to be recomputed. The disadvantage is that checkpointing writes the data to a file system such as HDFS, which costs a lot of performance.

The persistence settings are as follows:

sc.setCheckpointDir("HDFS")   // checkpoint directory, typically an HDFS path
rdd.cache() / rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.checkpoint()
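A slightly more concrete sketch under assumed paths (the checkpoint directory and input path are made up):

import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs://nn:8020/checkpoints")                  // fault-tolerant checkpoint directory
val parsedRDD = sc.textFile("hdfs://nn:8020/data/input.txt").map(_.trim)
parsedRDD.persist(StorageLevel.MEMORY_AND_DISK)                    // cache for fast reuse
parsedRDD.checkpoint()                                             // durable copy in case the cache is lost
parsedRDD.count()                                                  // an action triggers both caching and the checkpoint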

10. Use broadcast variables

By default, if an external variable is used inside an operator in a task, each task gets its own copy of the variable, which consumes a lot of memory. On the one hand, if the RDD is later persisted, its data may not all fit in memory and can only be written to disk, and the disk IO severely hurts performance. On the other hand, when a task creates objects, it may find that heap memory cannot hold the new objects, which causes frequent GC; GC pauses the worker threads, which in turn makes Spark stop working for a while and seriously affects performance.

Suppose the current job is configured with 20 Executors and 500 tasks, and a 20MB variable is shared by all tasks. In that case 500 copies are produced across the 500 tasks, consuming about 10GB of cluster memory. If a broadcast variable is used instead, each Executor keeps just one copy, about 400MB in total, which cuts memory consumption by roughly 25 times.

A broadcast variable keeps one copy per Executor, and all tasks of that Executor share this one copy, which greatly reduces the number of copies of the variable.

Initially, the broadcast variable has only one copy, in the Driver. When a task runs and wants to use the broadcast data, it first tries to get the variable from the BlockManager of its own local Executor. If it is not available locally, the BlockManager pulls a copy remotely from the Driver's BlockManager or from another node's BlockManager and then manages it locally; after that, all tasks of this Executor get the variable directly from the local BlockManager.

Data that may be shared by multiple Task can be broadcast to each Executor:

val broadcastVar = sc.broadcast(/* the variable each Task will use, i.e., the variable to broadcast */)
broadcastVar.value   // get the broadcast value
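A minimal sketch with made-up data: broadcast a lookup map once and read it inside the operator via .value:

val countryNames = Map("CN" -> "China", "US" -> "United States")   // hypothetical lookup table
val countryNamesBC = sc.broadcast(countryNames)                    // one copy per Executor

val codesRDD = sc.parallelize(Seq("CN", "US", "CN"))
val namesRDD = codesRDD.map(code => countryNamesBC.value.getOrElse(code, "unknown"))
namesRDD.collect().foreach(println)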

11. Serialization using Kryo

By default, Spark uses Java's serialization mechanism. Java serialization is easy to use and requires no extra configuration; the variables used in operators only need to implement the Serializable interface. However, Java serialization is not efficient: serialization is slow and the serialized data still takes up a relatively large amount of space.

Spark officially states that the Kryo serialization mechanism performs about 10 times better than Java serialization. Spark does not use Kryo as the default serialization library because Kryo does not support serializing all objects, and it requires users to register the types to be serialized before use, which is inconvenient. However, since Spark 2.0.0, shuffling RDDs of simple types, arrays of simple types, and strings already uses Kryo serialization by default.

The code of Kryo serialization registration method is as follows:

public class MyKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(StartupReportLogs.class);
    }
}

The code to configure the Kryo serialization method is as follows:

// create the SparkConf object
val conf = new SparkConf().setMaster(…).setAppName(...)
// use the Kryo serialization library
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// register the custom class collection in the Kryo serialization library
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator")
