In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-23 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
What this article shares with you is about how to tune the offline performance of sparkcore. The editor thinks it is very practical, so I share it with you to learn. I hope you can get something after reading this article.
1. Conventional tuning 1.1 optimal resource allocation
The first step of Spark performance tuning is to allocate more resources to tasks. To a certain extent, increasing the allocation of resources is proportional to the improvement of performance. After achieving the optimal resource allocation, we will consider the performance tuning strategy discussed later.
Resource allocation is specified when using scripts to submit Spark tasks, and standard Spark task submission scripts such as code listings:
/ usr/opt/modules/spark/bin/spark-submit
-- class com.atguigu.spark.Analysis
-- num-executors 80
-- driver-memory 6g
-- executor-memory 6g
-- executor-cores 3
/ usr/opt/modules/spark/jar/spark.jar\
* name * * description * *-number of num-executors* configuration Executor *-driver-memory* configuration Driver memory (little impact) *-executor-memory* configuration memory size per Executor *-executor-cores* configuration number of CPU core per Executor
Adjustment principle: try to adjust the resources assigned by the task to the maximum amount of resources that can be used.
For the allocation of specific resources, we discuss the two Cluste operation modes of Spark:
The first is Spark Standalone mode. Before submitting the task, you must know or can get the resources you can use from the operation and maintenance department. When writing submit scripts, allocate resources according to the resources available. For example, the cluster has 15 machines, each machine has 8 GB of memory and 2 CPU core. Then specify 15 Executor, each Executor allocates 8 GB of memory, 2 CPU core.
The second is Spark Yarn mode, because Yarn uses resource queues for resource allocation and scheduling, when writing submit scripts, resources are allocated according to the resource queue to which the Spark job is submitted. For example, if the resource queue has 400 GB of memory and 100 CPU core, then specify 50 Executor, each Executor allocates 8 GB of memory and 2 CPU core.
Performance improvement after resource adjustment
* name * * Resolution * * increase the number of Executor * if resources permit, increasing the number of Executor can improve the parallelism of executing task. For example, if there are 4 Executor and 2 CPU core for each Executor, then 8 task can be executed in parallel. If the number of Executor is increased to 8 (resources permit), then 16 task can be executed in parallel, which doubles the parallelism capability. * increase the number of CPU core per Executor * if resources permit, increasing the number of Cpu core per Executor can improve the parallelism of executing task. For example, if there are 4 Executor and 2 CPU core for each Executor, then 8 task can be executed in parallel. If the number of CPU core per Executor is increased to 4 (resources permit), then 16 task can be executed in parallel, which doubles the parallelism capability. * increase the amount of memory per Executor * when resources permit, after increasing the amount of memory per Executor, there are three points to improve performance: 1. More data can be cached (that is, cache the RDD), and the data written to the disk can be reduced accordingly, even without writing to the disk, reducing the possible disk IO; 2. 5. It can provide more memory for shuffle operation, that is, there is more space to store the data pulled by the reduce side, and the data written to the disk is reduced accordingly, even without writing to the disk, which reduces the possible disk IO; 3. More memory can be provided for the execution of task. Many objects may be created during the execution of task. A small amount of memory will cause frequent GC. After increasing memory, frequent GC can be avoided and the overall performance can be improved. 1.2 RDD Optimization 1.2.1 RDD Multiplexing
When performing operators on RDD, it is necessary to avoid repeated calculations of RDD under the same operator and computational logic.
1.2.2 RDD persistence
In Spark, when the operator operation is performed on the same RDD many times, the ancestor RDD of the RDD must be recalculated each time. This situation must be avoided. The repeated calculation of the same RDD is a great waste of resources. Therefore, the RDD that is used many times must be persisted, and the data of the public RDD must be cached to memory / disk through persistence. After that, the calculation of the public RDD fetches the RDD data directly from memory / disk. For RDD persistence, there are two points to note: 1. RDD persistence can be serialized, when memory can not store RDD data completely, you can consider using serialization to reduce the data volume and store the data completely in memory.
If the reliability of the data is high and there is plenty of memory, you can use the copy mechanism to persist the RDD data. When the replica mechanism is enabled for persistence, a copy is stored for each persistent data unit and placed on other nodes, thus achieving fault tolerance of the data. Once a copy of the data is lost, there is no need to recalculate. Another copy can be used.
1.2.3 do filter as early as possible for RDD
After getting the initial RDD, you should consider filtering out the unwanted data as soon as possible, so as to reduce the footprint of memory and improve the efficiency of Spark jobs.
1.3 Adjustment of parallelism
Parallelism in Spark jobs refers to the number of task for each stage.
If the parallelism setting is unreasonable and the parallelism is too low, it will lead to a great waste of resources, for example, 20 Executor, each Executor allocates 3 CPU core, while the Spark job has 40 task, so the number of task allocated to each Executor is 2, which makes each Executor have one CPU core idle, resulting in a waste of resources.
The ideal parallelism setting should be to match the parallelism with the resources. To put it simply, if the resources permit, the parallelism should be set as large as possible so that the cluster resources can be fully utilized. Reasonable setting of parallelism can improve the performance and running speed of the whole Spark job.
Spark officially recommends that the number of task should be set to 2-3 times the total number of CPU core of Spark jobs. The reason why it is not recommended that the number of task is equal to the total number of CPU core is that the execution time of task is different, some task execution speed is fast and some task execution speed is slow. If the number of task is equal to the total number of CPU core, then when the fast task execution is completed, the CPU core will be idle. * if the number of task is set to * * 2 to 3 times the total number of CPU core, then after one task is executed, CPU core will execute the next task immediately, reducing the waste of resources and improving the efficiency of Spark job operation. *
The setting of Spark job parallelism is shown in the code:
New SparkConf () .set ("spark.default.parallelism", "500") 1.4 broadcast large variables
By default, if an external variable is used in the operator in task, each task gets a copy of the variable, which consumes a lot of memory. -on the one hand, if RDD is persisted later, RDD data may not be stored in memory, but can only be written to disk, and disk IO will seriously consume performance;-on the other hand, when creating objects, task may find that heap memory cannot store newly created objects, which will cause frequent GC,GC to stop worker threads, which will cause Spark to suspend work for a period of time, seriously affecting Spark performance.
Suppose the current task is configured with 20 Executor, specified 500m task, and a 20m variable is shared by all task. In this case, 500replicas are produced in 500task, which consumes 10GB of memory in the cluster. If broadcast variables are used, each Executor saves a copy, consuming a total of 400MB of memory, which reduces memory consumption by five times.
The broadcast variable holds a copy of each Executor, and all task of this Executor share this broadcast variable, which greatly reduces the number of copies produced by the variable.
1.5 modify serialization mode
By default, Spark uses Java's serialization mechanism. The serialization mechanism of Java is easy to use and does not need additional configuration, and the variables used in the operator can implement the Serializable interface. However, the efficiency of Java serialization mechanism is not high, the serialization speed is slow and the serialized data still takes up a large amount of space.
The performance of Kryo serialization mechanism is about 10 times higher than that of Java serialization mechanism. Spark does not use Kryo as the serialization class library by default because it does not support the serialization of all objects. At the same time, Kryo requires users to register the types that need to be serialized before use, which is not convenient. But from the Spark 2.0.0 version, Shuffling RDDs of simple type, simple type array and string type has used Kryo serialization by default.
Public class MyKryoRegistrator implements KryoRegistrator {@ Override public void registerClasses (Kryo kryo) {kryo.register (StartupReportLogs.class);}} / / create a SparkConf object val conf = new SparkConf () .setMaster (…) .setAppName (...) / / use the Kryo serialization library. If you want to use the Java serialization library, you need to shield the line from conf.set ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); / / register the custom class collection in the Kryo serialization library; if you want to use the Java serialization library, you need to block the line from conf.set ("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator"); 1.6 adjust the localization wait time
While the Spark job is running, the Driver allocates the task for each stage. According to Spark's task allocation algorithm, Spark wants task to run on the node where the data it wants to calculate (the idea of data localization), so that the network transmission of data can be avoided.
Generally speaking, task may not be assigned to the node where the data it processes, because the resources available to these nodes may have been exhausted. In this case, Spark will wait for a period of time, default to 3s. If it still cannot run on the specified node after waiting for a specified time, it will automatically downgrade and try to assign task to the node corresponding to the worse localization level. For example, assign the task to a node that is closer to the data it wants to calculate, and then calculate it, and if the current level still fails, then continue to downgrade.
Data transfer occurs when the data to be processed by task is not on the node where the task is located. Task will obtain the data through the BlockManager of the node where it belongs. When BlockManager finds that the data is not local, it will obtain the data from the BlockManager of the node where the data is located through the network transport component.
We do not want to see the situation of network data transmission, a large number of network transmission will seriously affect the performance, therefore, we hope that by adjusting the localization waiting time, if the target node processes part of the task within the waiting time, then the current task will have a chance to be executed, which can improve the overall performance of the Spark job.
Table 2-3 Spark Localization level
* name * * Resolution * * PROCESS_LOCAL* process is localized, task and data are in the same Executor, and the performance is the best. * NODE_LOCAL* node is localized, task and data are in the same node, but task and data are not in the same Executor, and data needs to be transferred between processes. * RACK_LOCAL* rack localization, task and data are on two nodes of the same rack, and data needs to be transferred between nodes through the network. * for task, NO_PREF* is the same wherever it is obtained, and there is no difference between good and bad. * ANY*task and data can be anywhere in the cluster, and not in the same rack, with the worst performance.
During the development phase of the Spark project, you can use client mode to test the program. At this time, you can see more complete log information locally, and there is a clear level of task data localization in the log information. If most of them are PROCESS_LOCAL, then there is no need to adjust them. But if you find that many levels are NODE_LOCAL and ANY, then you need to adjust the localization waiting time by extending the localization waiting time. See if the localization level of task has improved, and see if the running time of Spark jobs has been reduced. Note that it is not enough to extend the localization wait time too long, which leads to an increase in the running time of the Spark job because of the large amount of wait time.
Val conf = new SparkConf (). Set ("spark.locality.wait", "6") / / configure this one, all have the equivalent of 2. 0. Operator tuning 2.1 mapPartitions
The ordinary map operator operates on every element in RDD, while the mapPartitions operator operates on every partition in RDD.
2.2 foreachPartition optimizes database operations
After using the foreachPartition operator, the following performance improvements can be achieved:
1. For the function function we wrote, we process the data of an entire partition one at a time
two。 Create a unique database connection for data within a partition
3. You only need to send the SQL statement and multiple sets of parameters to the database once.
In a production environment, all use the foreachPartition operator to complete database operations. There is a problem with the foreachPartition operator, which is similar to the mapPartitions operator. If the amount of data in a partition is particularly large, it may cause OOM, that is, memory overflow.
2.3 use of filter in conjunction with coalesce
In the Spark task, we often use the filter operator to filter the data in RDD. In the initial stage of the task, the amount of data loaded from each partition is similar, but once it has been filtered by filter, the amount of data in each partition may be quite different.
Both repartition and coalesce can be used for repartitioning. Repartition is only a simple implementation of shuffle as true in the coalesce API. Coalesce does not perform shuffle by default, but can be set by parameters.
2.4 repartition solves the problem of low parallelism in SparkSQL
In the general performance tuning in the first section, we explained the adjustment strategy of parallelism, but the setting of parallelism does not work for Spark SQL, and the parallelism set by users only works for stage of all Spark except Spark SQL.
Users are not allowed to specify the parallelism of Spark SQL. By default, Spark SQL automatically sets the parallelism of the stage in which the Spark SQL resides based on the number of split in the HDFS file corresponding to the hive table. The parallelism specified by the user through the spark.default.parallelism parameter will only take effect in the stage without Spark SQL.
Since the parallelism of the stage where the Spark SQL is located cannot be set manually, if the amount of data is large, and the subsequent transformation operations in this stage have complex business logic, and the number of task automatically set by Spark SQL is very small, this means that each task has to deal with a large amount of data, and then have to execute very complex processing logic, which may show that the first stage with Spark SQL is very slow. The subsequent stage without Spark SQL runs very fast.
To solve the problem that Spark SQL cannot set the degree of parallelism and the number of task, we can use the repartition operator.
2.5 reduceByKey prepolymerization
Similar to mapreduce's combiner, it can achieve local preaggregation, reduce the amount of data transmitted by shuffle, and improve performance.
3.shuffle tuning 3.1 adjusts the buffer size on the map side
During the running of the Spark task, if the amount of data processed by the map side of the shuffle is relatively large, but the size of the buffer on the map side is fixed, there may be frequent spill overwrites of the buffered data on the map side to the disk files, resulting in very poor performance. By adjusting the size of the buffer on the map side, frequent disk IO operations can be avoided, thus improving the overall performance of the Spark task.
The default configuration of buffering on the map side is 32KB. If each task processes 640KB data, 640 + 32 = 20 overwrites will occur. If each task processes 64000KB data, 64000 Universe 32,000 will occur. This will have a very serious impact on performance.
Val conf = new SparkConf () .set ("spark.shuffle.file.buffer", "64") 3.2 adjust the size of the data pull buffer on the reduce side
In the process of Spark Shuffle, the size of the buffer buffer of shuffle reduce task determines the amount of data that reduce task can buffer each time, that is, the amount of data that can be pulled each time. If the memory resources are sufficient, properly increasing the size of the pull data buffer can reduce the number of times to pull data, also reduce the number of network transmission, and then improve performance.
The size of the data pull buffer on the reduce side can be set by spark.reducer.maxSizeInFlight parameter. The default is 48MB.
Val conf = new SparkConf () .set ("spark.reducer.maxSizeInFlight", "96") 3.3 adjust the number of retries to pull data on the reduce side
During the Spark Shuffle process, when reduce task pulls its own data, if it fails due to network anomalies and other reasons, it will automatically retry. For those jobs that include particularly time-consuming shuffle operations, it is recommended to increase the maximum number of retries (such as 60) to avoid data pull failures due to factors such as JVM full gc or network instability. In practice, it is found that for shuffle processes with large amounts of data (billions ~ tens of billions), adjusting this parameter can greatly improve the stability.
The number of retries for pulling data on the reduce side can be set by the spark.shuffle.io.maxRetries parameter, which represents the maximum number of retries that can be made. If the pull is not successful within the specified number of times, it may cause the job execution to fail. The default is 3.
Val conf = new SparkConf () .set ("spark.shuffle.io.maxRetries", "6") 3.4 adjust the wait interval for pulling data on the reduce side
During the Spark Shuffle process, when reduce task pulls its own data, if it fails due to network anomalies and other reasons, it will automatically retry. After a failure, it will wait for a certain interval and then try again. You can increase the stability of the shuffle operation by increasing the interval (for example, 60s).
The waiting interval for pulling data on reduce can be set by parameter spark.shuffle.io.retryWait. The default value is 5s.
Val conf = new SparkConf () .set ("spark.shuffle.io.retryWait", "60s") 3.5 adjusts the SortShuffle sorting operation threshold
For SortShuffleManager, if the number of shuffle reduce task is less than a certain threshold, no sorting operation occurs during the shuffle write process, but the data is written directly in the way of an unoptimized HashShuffleManager, but eventually all temporary disk files generated by each task are merged into a single file and a separate index file is created.
When you use SortShuffleManager, if you really do not need sorting operation, it is recommended to increase this parameter, greater than the number of shuffle read task, then map-side will not sort, reducing the sorting performance overhead, but in this way, a large number of disk files will still be generated, so shuffle write performance needs to be improved. The threshold of SortShuffleManager sorting operation can be set through spark.shuffle.sort. The parameter bypassMergeThreshold is set. The default value is 200.
Val conf = new SparkConf () .set ("spark.shuffle.sort.bypassMergeThreshold", "400") 4.jvm tuning 4.1 to reduce the memory share of cache operations 4.1.1 static memory management mechanism
According to the Spark static memory management mechanism, heap memory is divided into two blocks, Storage and Execution.
Storage is mainly used to cache RDD data and broadcast data, and Execution is mainly used to cache intermediate data generated during shuffle. Storage accounts for 60% of system memory and 20% of system memory, and the two are completely independent. In general, the memory of Storage is provided to cache operations, but in some cases, if the memory of cache operations is not very tight, and there are many objects created in task operators, and Execution memory is relatively small, this leads to frequent minor gc, and even frequent full gc, which in turn causes Spark to stop working frequently, which will have a great impact on performance. You can check the running status of each stage in Spark UI, including the running time of each task, gc time, and so on. If GC is found to be too frequent and the time is too long, you can consider adjusting the memory share of Storage to let task execute operator functions, so that more memory can be used. Storage memory area can be specified by spark.storage.memoryFraction parameter. The default is 0.6, that is, 60%, which can be reduced step by step.
Val conf = new SparkConf () .set ("spark.storage.memoryFraction", "0.4") 4.1.2 Unified memory management mechanism
According to the Spark unified memory management mechanism, heap memory is divided into two blocks, Storage and Execution. Storage is mainly used to cache data, and Execution is mainly used to cache intermediate data generated during the shuffle process. The memory part composed of the two is called unified memory. Storage and Execution each account for 50% of the unified memory. Due to the implementation of the dynamic footprint mechanism, when the memory required by the shuffle process is too large, it will automatically occupy the memory area of the Storage, so there is no need to adjust manually.
4.2 adjust Executor out-of-stack memory
Executor's out-of-heap memory is mainly used for program shared libraries, Perm Space, thread Stack and some Memory mapping, or C-like allocate object.
Sometimes, if your Spark job handles a very large amount of data, reaching hundreds of millions of data, running Spark jobs will report errors from time to time, such as shuffle output file cannot find,executor lost,task lost,out of memory, etc., which may be due to insufficient out-of-heap memory in Executor, resulting in memory overflow in the process of Executor running.
When stage's task is running, you may have to pull shuffle map output files from some Executor, but Executor may have been hung up due to memory overflow, and its associated BlockManager is gone, which may report errors such as shuffle output file cannot find,executor lost,task lost,out of memory. At this time, you can consider adjusting Executor's out-of-heap memory to avoid errors. At the same time, when the out-of-heap memory adjustment is relatively large, As far as performance is concerned, it will also bring some improvement.
By default, the upper limit of Executor out-of-heap memory is more than 300 MB. In the actual production environment, there will be problems when dealing with huge amounts of data, resulting in repeated crashes of Spark jobs and failure to run. This parameter will be adjusted to at least 1G, or even 2G or 4G.
The configuration of Executor out-of-heap memory needs to be configured in the spark-submit script.
-- conf spark.executor.memoryOverhead=2048
After the above parameters are configured, some JVM OOM exception problems will be avoided and the performance of the overall Spark job can be improved.
4.3 adjust the connection waiting time
In the process of running a Spark job, Executor first obtains some data from its own locally associated BlockManager. If the local BlockManager does not exist, it will remotely connect to the BlockManager of Executor on other nodes through TransferService to obtain the data.
If task creates a large number of objects or creates large objects during operation, it will take up a lot of memory, which will lead to frequent garbage collection, but garbage collection will cause all the work sites to stop, that is to say, once garbage collection is carried out, the Executor process of Spark will stop working and cannot provide a response. At this time, due to no response, the network connection cannot be established, which will lead to network connection timeout.
In a production environment, errors such as file not found and file lost are sometimes encountered. In this case, it is very likely that Executor's BlockManager cannot establish a connection when pulling data, and then declares the data pull failure after exceeding the default connection waiting time of 120s. If repeated attempts fail to pull data, it may lead to the crash of the Spark job. This situation may also cause DAGScheduler to submit stage,TaskScheduler repeatedly several times and return to submit task several times, greatly increasing the running time of our Spark job.
At this point, you can consider adjusting the connection timeout. The connection wait time needs to be set in the spark-submit script.
-- conf spark.core.connection.ack.wait.timeout=300
After adjusting the waiting time of the connection, you can usually avoid some XX file pull failures, XX file lost and other errors.
The above is how to tune the offline performance of sparkcore, and the editor believes that there are some knowledge points that we may see or use in our daily work. I hope you can learn more from this article. For more details, please follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.