Spark Performance Tuning Methods

2025-01-17 Update From: SLTechnology News&Howtos

Shulou (Shulou.com), 05/31

This article explains the main methods of Spark performance tuning: allocating more resources, adjusting parallelism, and reusing and persisting RDDs.

What resources are allocated? The number of executors, the CPU cores per executor, the memory per executor, and the driver memory.

Where are these resources allocated? When we submit a Spark job in a production environment, we set the corresponding parameters on the spark-submit shell script.

/usr/local/spark/bin/spark-submit \
  --class cn.spark.sparktest.core.WordCountCluster \
  --num-executors 3 \
  --driver-memory 100m \
  --executor-memory 100m \
  --executor-cores 3 \
  /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar

--num-executors 3: the number of executors
--driver-memory 100m: the driver memory (significant impact)
--executor-memory 100m: the memory size per executor
--executor-cores 3: the number of CPU cores per executor
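When the same job is submitted with different resource settings, it can help to assemble the command from named values. The following is a minimal sketch in plain Python: the helper name `spark_submit_command` is hypothetical, and the flags and paths are the ones from the command above.

```python
import shlex


def spark_submit_command(main_class, app_jar, num_executors, driver_memory,
                         executor_memory, executor_cores,
                         spark_submit="/usr/local/spark/bin/spark-submit"):
    """Return the spark-submit invocation as a list of arguments."""
    return [
        spark_submit,
        "--class", main_class,
        "--num-executors", str(num_executors),
        "--driver-memory", driver_memory,
        "--executor-memory", executor_memory,
        "--executor-cores", str(executor_cores),
        app_jar,  # the application jar comes after all options
    ]


cmd = spark_submit_command(
    main_class="cn.spark.sparktest.core.WordCountCluster",
    app_jar="/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar",
    num_executors=3,
    driver_memory="100m",
    executor_memory="100m",
    executor_cores=3,
)
# Print a shell-safe, single-line version of the command.
print(shlex.join(cmd))
```

The list form can be handed to `subprocess.run` as-is, which avoids shell quoting problems.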

How much should you allocate, and what is the maximum?

The first case is Spark Standalone: a Spark cluster has been set up on the company's machines. You should know how much memory and how many CPU cores each machine can still give you, and set the resource allocation of each Spark job according to that. For example, with 20 machines that can each give you 4 GB of memory and 2 CPU cores, you can run 20 executors, each with 4 GB of memory and 2 CPU cores.

The second case is YARN, which schedules resources through resource queues. Check how many resources the queue you submit your Spark job to actually has. For example, with 500 GB of memory and 100 CPU cores, you can run 50 executors, each with 10 GB of memory and 2 CPU cores on average.
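The two sizing examples above are simple divisions, sketched here in plain Python. The helper name `per_executor_share` is hypothetical, and the calculation is deliberately rough: it ignores the driver's share and any per-executor memory overhead the cluster manager adds.

```python
def per_executor_share(total_memory_gb, total_cores, num_executors):
    """Split a resource pool evenly across executors (rounded down)."""
    if num_executors <= 0:
        raise ValueError("num_executors must be positive")
    return total_memory_gb // num_executors, total_cores // num_executors


# Standalone example: 20 machines, each offering 4 GB and 2 cores, one executor per machine.
standalone = per_executor_share(20 * 4, 20 * 2, 20)

# YARN queue example: 500 GB and 100 cores shared by 50 executors.
yarn = per_executor_share(500, 100, 50)

print(standalone)  # (memory GB, cores) per executor
print(yarn)
```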

Set the queue name with spark.yarn.queue (the default value is default).

As a rule, scale the resources up to the maximum you are allowed to use: the number of executors (from dozens to hundreds), the memory per executor, and the CPU cores per executor.

Why does performance improve after adjusting resources?

Increasing the number of executors:

If there are few executors, few tasks can run in parallel, so the application's parallel capacity is weak. For example, with 3 executors of 2 CPU cores each, 6 tasks can run at the same time; once those 6 finish, the next batch of 6 starts. Increasing the number of executors means more tasks can run in parallel: instead of 6, perhaps 10, 20, or even 100. Parallel capacity then grows several times to tens of times, and performance (execution speed) can improve by a similar factor.

(Sometimes the amount of data is small, and adding many tasks degrades performance instead. Also keep in mind that the more resources you take, the fewer are left for others.)

Increasing the CPU cores per executor:

This also increases parallel capacity. Suppose there were originally 20 executors with only 2 CPU cores each, so 40 tasks could run in parallel. If each executor now gets 5 CPU cores, 100 tasks can run in parallel, and execution can be up to 2.5 times as fast.
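The arithmetic behind these examples can be written down directly. This is a plain-Python sketch with hypothetical helper names; it assumes each task occupies exactly one CPU core and that all tasks take about the same time.

```python
import math


def parallel_slots(num_executors, cores_per_executor):
    """Number of tasks that can run at the same time."""
    return num_executors * cores_per_executor


def waves(num_tasks, slots):
    """Number of batches needed to run all tasks."""
    return math.ceil(num_tasks / slots)


small = parallel_slots(3, 2)      # 3 executors x 2 cores
before = parallel_slots(20, 2)    # 20 executors x 2 cores
after = parallel_slots(20, 5)     # 20 executors x 5 cores
speedup = after / before          # upper bound on the speedup

print(small, before, after, speedup)
print(waves(13, small))           # 13 tasks on 6 slots need 3 batches
```

The speedup is an upper bound: it is only reached when the stage has enough tasks to fill the extra slots.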

SparkContext, DAGScheduler, and TaskScheduler split our operators into a large number of tasks and submit them to the application's executors for execution.

Increasing the memory per executor:

More memory improves performance in three ways:

1. If you cache RDDs, more memory lets you cache more data and write less of it, or none of it, to disk. This reduces disk I/O.

2. For shuffle operations, the reduce side needs memory to hold the pulled data and aggregate it. If there is not enough memory, the data is written to disk. Giving executors more memory means less data, or none at all, has to be written to disk, which reduces disk I/O and improves performance.

3. Task execution may create many objects. If memory is small, the JVM heap fills up often, triggering frequent garbage collection (minor GC and full GC), which is very slow. With more memory, GC runs less often, so the slowdown is avoided and the job runs faster.

What does Spark parallelism mean?

A Spark job is an Application made up of jobs: each action (such as collect) triggers one job, and each job is split into multiple stages.

A new stage is split off wherever a shuffle occurs, for example at reduceByKey.

// stage 0
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCount = pairs.reduceByKey(_ + _)  // map side of the shuffle

// stage 1
// reduceByKey(_ + _) continues here: reduce side of the shuffle
wordCount.collect()

Take reduceByKey: when a stage 0 task reaches reduceByKey, it creates one file for each stage 1 task (these may also be merged into a smaller number of files). Each stage 1 task then pulls its own file from every stage 0 task on every node. The data pulled by one stage 1 task is guaranteed to contain all the values for the same keys, so our custom function (_ + _) can be applied to the values of each key.
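The file-per-reducer mechanism described above can be imitated in a few lines of plain Python. This is only a toy model under stated assumptions: in-memory lists stand in for shuffle files, `zlib.crc32` stands in for the partitioner, there is no map-side combining, and all function names are hypothetical.

```python
import zlib
from collections import defaultdict

NUM_REDUCERS = 2  # number of stage 1 tasks (chosen for illustration)


def partition_for(key, num_reducers):
    """Pick the stage 1 task responsible for a key (stable across runs)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def map_task(lines, num_reducers):
    """Stage 0: emit (word, 1) pairs into one bucket per stage 1 task."""
    buckets = [[] for _ in range(num_reducers)]
    for line in lines:
        for word in line.split(" "):
            if word:
                buckets[partition_for(word, num_reducers)].append((word, 1))
    return buckets


def reduce_task(reducer_id, all_map_outputs):
    """Stage 1: pull this task's bucket from every stage 0 task and sum per key."""
    counts = defaultdict(int)
    for buckets in all_map_outputs:
        for word, one in buckets[reducer_id]:
            counts[word] += one
    return dict(counts)


# Two input partitions, so two stage 0 tasks.
input_partitions = [["a b a", "c"], ["b a", "c c"]]
map_outputs = [map_task(p, NUM_REDUCERS) for p in input_partitions]
reduce_outputs = [reduce_task(r, map_outputs) for r in range(NUM_REDUCERS)]

# Each key lives in exactly one reducer, so merging cannot overwrite anything.
word_count = {}
for part in reduce_outputs:
    word_count.update(part)
print(word_count)
```

Because every occurrence of a key is routed to the same bucket index, each stage 1 task sees all values for its keys and can aggregate them independently.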

Parallelism refers to the number of tasks in each stage of a Spark job; it represents how parallel the job is at that stage.

What happens if parallelism is not adjusted and ends up too low?

Suppose the number of tasks is not set, or is set very low, for example 100 tasks. There are 50 executors with 3 CPU cores each, so any stage of your application has 150 CPU cores available to run tasks in parallel. With only 100 tasks spread evenly, each executor gets 2 tasks, so only 100 tasks run at the same time and each executor runs just 2 in parallel. The remaining CPU core of each executor is wasted.

You have allocated enough resources, but the parallelism does not match them, so part of what you allocated is wasted. A reasonable parallelism should be large enough to make full use of your cluster resources. In the example above, the cluster has 150 CPU cores and can run 150 tasks in parallel, so the application's parallelism should be at least 150. Raising the task count to 150 also reduces the amount of data each task processes: if 150 GB of data is processed by 100 tasks, each task handles 1.5 GB; with 150 tasks running in parallel, each handles only 1 GB.
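The numbers in this example can be checked with a short plain-Python calculation. The helper names are hypothetical, and the sketch assumes data is spread evenly across tasks and each task uses one core.

```python
def busy_cores(num_tasks, total_cores):
    """Cores that actually have a task to run in a single wave."""
    return min(num_tasks, total_cores)


def data_per_task_gb(total_data_gb, num_tasks):
    """Data each task handles if the input is split evenly."""
    return total_data_gb / num_tasks


total_cores = 50 * 3  # 50 executors x 3 cores

idle_with_100_tasks = total_cores - busy_cores(100, total_cores)
idle_with_150_tasks = total_cores - busy_cores(150, total_cores)

print(idle_with_100_tasks, data_per_task_gb(150, 100))
print(idle_with_150_tasks, data_per_task_gb(150, 150))
```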

Put simply, as long as you set the parallelism properly, you make full use of your cluster's computing resources, reduce the amount of data each task processes, and ultimately improve the performance and running speed of the entire Spark job.

The number of tasks should be at least equal to the total number of CPU cores of the Spark application (ideally, for example, 150 CPU cores with 150 tasks that start together and finish at about the same time).

The official recommendation is to set the number of tasks to 2 to 3 times the total number of CPU cores of the Spark application: for 150 CPU cores, roughly 300 to 500 tasks. Reality differs from the ideal: some tasks finish quickly, say in 50 seconds, while others take a minute and a half. If the number of tasks exactly equals the number of CPU cores, resources can still be wasted. For example, of 150 tasks, 10 finish first while the remaining 140 are still running, leaving 10 CPU cores idle. If the number of tasks is 2 to 3 times the total number of CPU cores, another task fills in as soon as one finishes, which keeps the CPU cores busy as far as possible and improves the efficiency and speed of the Spark job.
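The effect of uneven task durations can be illustrated with a small scheduling simulation in plain Python. It is a sketch, not Spark's scheduler: tasks are handed in order to whichever core frees up first, and the durations are invented for the example. Both workloads contain the same total amount of work.

```python
import heapq


def makespan(durations, num_cores):
    """Finish time when each task goes to the core that becomes free first."""
    free_at = [0] * num_cores
    heapq.heapify(free_at)
    for d in durations:
        start = heapq.heappop(free_at)   # earliest available core
        heapq.heappush(free_at, start + d)
    return max(free_at)


CORES = 4

# One task per core: three fast tasks and one slow one (seconds).
coarse = [60, 60, 60, 90]

# Same work cut into three times as many tasks.
fine = [20] * 9 + [30] * 3

print(sum(coarse), sum(fine))    # identical total work
print(makespan(coarse, CORES))   # three cores sit idle while the slow task finishes
print(makespan(fine, CORES))     # idle cores pick up remaining small tasks
```

With one task per core the job lasts as long as the slowest task; with smaller tasks the cores that finish early keep working, so the job ends sooner.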

How do you set the parallelism of a Spark application?

Set spark.default.parallelism:

SparkConf conf = new SparkConf().set("spark.default.parallelism", "500");
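Rather than hard-coding the value, it can be derived from the allocated resources using the 2 to 3 times rule described earlier. The plain-Python helper below is hypothetical; only the property name spark.default.parallelism comes from the text.

```python
def default_parallelism_conf(num_executors, cores_per_executor, factor=2):
    """Return the parallelism setting as a property name/value pair."""
    if factor < 1:
        raise ValueError("factor should be at least 1")
    total_cores = num_executors * cores_per_executor
    # Spark properties are passed as strings.
    return {"spark.default.parallelism": str(total_cores * factor)}


# 50 executors x 3 cores = 150 cores.
low = default_parallelism_conf(50, 3, factor=2)
high = default_parallelism_conf(50, 3, factor=3)
print(low, high)
```

The resulting pair can be passed to SparkConf.set as shown above, or on the command line through spark-submit's --conf option.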

By default, when you run operators on the same RDD several times to derive different RDDs, that RDD and all of its parent RDDs are recomputed each time. A situation such as HDFS -> RDD1 -> RDD2 -> RDD4, where the chain from HDFS to RDD2 is read and computed all over again, must be avoided at all costs. Repeated RDD computation causes a sharp drop in performance: if HDFS -> RDD1 -> RDD2 takes 15 minutes, going through it twice takes 30 minutes.

RDD architecture refactoring and optimization: reuse RDDs as much as possible. Similar RDDs can be extracted into one common RDD that later RDD computations reuse.

A common RDD must be persisted. Think of eating dumplings in northern China: ordering a plate means filling + wrappers + water -> wrapped dumplings -> boiled dumplings, served hot. In real life, freshly made dumplings are of course the best. In Spark, however, recomputing an RDD from scratch every time it is needed is a disaster. For a common RDD that is computed once and used multiple times, be sure to persist it. Persisting means caching the RDD's data in memory or on disk (through the BlockManager). However many times the RDD is used afterwards, its persisted data is read directly from memory or disk.
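The difference between recomputing and persisting a shared RDD can be modelled with a tiny lazy-evaluation class in plain Python. This is an illustration only: `LazyDataset` is a hypothetical stand-in, not Spark's RDD, and it simply counts how often each step is computed.

```python
class LazyDataset:
    """Toy lazily evaluated dataset that counts its own computations."""

    def __init__(self, compute, parent=None):
        self._compute = compute
        self._parent = parent
        self._persist = False
        self._cached = None
        self.times_computed = 0

    def persist(self):
        self._persist = True
        return self

    def collect(self):
        # Serve persisted data without touching the parent chain.
        if self._persist and self._cached is not None:
            return self._cached
        parent_data = self._parent.collect() if self._parent else None
        self.times_computed += 1
        result = self._compute(parent_data)
        if self._persist:
            self._cached = result
        return result


def run(persist_shared):
    source = LazyDataset(lambda _: [1, 2, 3, 4])  # stands in for reading HDFS
    shared = LazyDataset(lambda xs: [x * 10 for x in xs], parent=source)
    if persist_shared:
        shared.persist()
    # Two different results derived from the same shared dataset.
    filtered = LazyDataset(lambda xs: [x for x in xs if x > 20], parent=shared)
    total = LazyDataset(lambda xs: [sum(xs)], parent=shared)
    filtered.collect()
    total.collect()
    return source.times_computed, shared.times_computed


print(run(persist_shared=False))  # source and shared are each computed twice
print(run(persist_shared=True))   # source and shared are each computed once
```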

Persistence can be serialized. If data is persisted in memory as plain objects, it may take up too much memory and cause an OOM (out-of-memory) error. When plain in-memory storage cannot hold all of the common RDD's data, prefer serialized in-memory storage: the data of each RDD partition is serialized into one large byte array, a single object, which greatly reduces the memory footprint. The only drawback of serialization is that the data must be deserialized when it is read. If serialized in-memory storage still causes OOM, fall back to disk: first memory + disk without serialization, then memory + disk with serialization.

For high data reliability, when memory is plentiful, you can persist with the double-copy mechanism. With a single persisted copy, if the machine holding it goes down, the copy is lost and the data must be recomputed. With the double-copy mechanism, each persisted data unit gets a second copy stored on another node, which provides fault tolerance: if one copy is lost, the other is used and nothing is recomputed. Use this only when memory resources are extremely abundant.

Persisting is simple: call the persist() method on the RDD and pass in a persistence level.

persist(StorageLevel.MEMORY_ONLY()) is plain memory, unserialized; the cache() method can be used instead.

StorageLevel.MEMORY_ONLY_SER(), second choice

StorageLevel.MEMORY_AND_DISK(), third choice

StorageLevel.MEMORY_AND_DISK_SER(), fourth choice

StorageLevel.DISK_ONLY(), fifth choice

If there is enough memory and you want the double-copy high-reliability mechanism, choose a level with the _2 suffix, for example:

StorageLevel.MEMORY_ONLY_2()
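The order of preference above amounts to trying each level in turn and keeping the first one that works. The plain-Python sketch below only manipulates level names as strings; `choose_storage_level` and the `fits` callback are hypothetical, standing in for whatever test (for example, a trial run without OOM) you use to decide that a level is acceptable.

```python
PREFERENCE_ORDER = [
    "MEMORY_ONLY",
    "MEMORY_ONLY_SER",
    "MEMORY_AND_DISK",
    "MEMORY_AND_DISK_SER",
    "DISK_ONLY",
]


def choose_storage_level(fits, replicate=False):
    """Return the first level in preference order for which fits(level) is true.

    fits: callable taking a level name and returning True if that level is
    acceptable. replicate: append the _2 suffix for the double-copy variant.
    """
    for level in PREFERENCE_ORDER:
        if fits(level):
            return level + "_2" if replicate else level
    return None  # nothing acceptable; the caller has to decide what to do


# Example: plain in-memory storage ran out of memory, everything else is fine.
choice = choose_storage_level(lambda level: level != "MEMORY_ONLY")
print(choice)

# Example: memory is abundant and a second copy is wanted.
replicated = choose_storage_level(lambda level: True, replicate=True)
print(replicated)
```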
