In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains "what is the method of spark homework tuning". The content of the article is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "what is the method of spark homework tuning".
Overview of tuning
Sometimes, we may encounter one of the thorniest problems in big data's calculation-data skew, and the performance of Spark jobs will be much worse than expected. Data skew tuning is to use a variety of technical solutions to solve different types of data skew problems to ensure the performance of Spark jobs.
How painful is the tilt of data?!
If the data tilt is not resolved, performance tuning is completely impossible, and all other tuning methods are a joke. Data skew is a performance tuning problem that best reflects the level of engineer big data of spark.
If data skew can be solved, it means that you know how spark works like the back of your hand.
The tilt of the data has two direct and fatal consequences.
Data skew directly leads to a situation: OOM.
Slow, very slow, very slow, extremely slow, unacceptable slow.
We take 10 billion pieces of data as the list.
Individual Task (the Task of 8 billion pieces of data) handles excessive amounts of data. This slows down the execution time of the entire Job. This may cause the machine on which the Task is located, OOM, or run very slowly.
Reasons for skewed data:
In the Shuffle phase. Similarly, there are too many data items in Key. As a result, the amount of Task data in which a key (8 billion in the figure above) is located is too large. Far more data than other Task processes.
And such scenes are all too common. The law of twenty-eight can confirm this scenario.
To solve the data tilt, you need to:
Take care of shuffle
Fix the business scenario
Fix the usage of cpu core
The root cause of getting rid of OOM, etc.
So you need to be familiar with at least the above principles after settling the data tilt. So solving the data tilt is the key to the key.
Let me tell you a well-tried empirical conclusion: in general, the reason for OOM is data tilt. If the amount of data of a task task is too large, the pressure on GC is very great. This is not comparable to Kafka, because the memory of kafka does not pass through JVM. Is a Page based on the Linux kernel.
The principle of data skew
The principle of data skew is simple: in shuffle, the same key on each node must be pulled to a task on a node for processing, such as aggregation or join according to key. At this point, if the amount of data corresponding to a key is particularly large, data tilting will occur. For example, most key corresponds to 10 pieces of data, but individual key corresponds to 1 million pieces of data, then most task may only be allocated 10 pieces of data and run in 1 second, but individual task may be allocated 1 million data and need to run for an hour or two. Therefore, the progress of the entire Spark job is determined by the task with the longest running time.
So when there is a data skew, the Spark job seems to run very slowly, and may even cause a memory overflow due to the large amount of data processed by a task.
The following figure is a clear example: hello, the key, corresponds to a total of 7 pieces of data on three nodes, all of which are pulled into the same task for processing; while the two key of world and you each correspond to one piece of data, so the other two task only need to process one piece of data each. At this point, the running time of * * task may be 7 times longer than that of the other two task, and the running speed of the entire stage is determined by the slowest task.
How to locate the code that causes data skew
Data skew only occurs during shuffle. Here is a list of some commonly used operators that may trigger shuffle operations: distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, repartition, etc. When data skew occurs, it may be caused by the use of one of these operators in your code.
A situation in which task execution is very slow.
The first thing to look at is the number of stage in which the data tilt occurs.
If it is submitted in yarn-client mode, then the local log can be seen directly. You can find the stage; currently running in log. If you submit in yarn-cluster mode, you can use Spark Web UI to see the stage currently running. In addition, whether using yarn-client mode or yarn-cluster mode, we can take an in-depth look at the current amount of data allocated by each task of this stage on Spark Web UI to further determine whether the uneven distribution of data by task leads to data skew.
For example, in the following figure, the penultimate column shows the elapsed time of each task. It is obvious that some task runs very fast and takes only a few seconds to run, while some task runs very slowly and takes a few minutes to complete, at which point the data skew can be determined from the run time alone. In addition, the countdown * * column shows the amount of data processed by each task. It is obvious that a very short running time task only needs to process a few hundred KB of data, while a very long running time task needs to process thousands of KB of data, which is 10 times less than the amount of data processed. At this point, it is more certain that the data tilt has occurred.
Once we know which stage the data skew occurs, then we need to calculate which part of the code corresponding to the stage skew occurs according to the stage partition principle, and there must be a shuffle class operator in this part of the code. Accurate estimation of the corresponding relationship between stage and code requires an in-depth understanding of the source code of Spark. Here we can introduce a relatively simple and practical calculation method: as long as you see a shuffle class operator in the Spark code or a statement that will lead to shuffle (such as the group by statement) in the SQL statement of Spark SQL, then you can determine that the two stage are divided by that place.
Here we will take the most basic entry program of Spark-word counting as an example, how to use the simplest method to roughly calculate the corresponding code of stage. In the following example, in the entire code, only one reduceByKey is the operator that will occur shuffle, so it can be considered that, with this operator as the boundary, will be divided into the front and back two stage.
1. Stage0, mainly performing operations from textFile to map, and performing shuffle write operations. Shuffle write operation can be simply understood as partitioning the data in pairs RDD. In the data processed by each task, the same key will be written to the same disk file.
2. Stage1, which mainly executes the operation from reduceByKey to collect. As soon as each task of stage1 starts to run, it will first execute the shuffle read operation. The task that performs the shuffle read operation pulls the key that belongs to its own processing from the nodes where the task of the stage0 resides, and then performs global aggregation or join operations on the same key. In this case, it accumulates the value of the key. After stage1 executes the reduceByKey operator, it calculates the final wordCounts RDD, and then executes the collect operator and pulls all the data onto the Driver for us to traverse and print out.
Through the analysis of the word counting program, I hope to let you understand the most basic principle of stage partition and how the shuffle operation is executed at the boundary of the two stage after stage partition. Then we know how to quickly locate which part of the stage code where the data skew occurs. For example, we find in Spark Web UI or local log that some task of stage1 execute very slowly, and it is determined that stage1 has data skew, then we can go back to the code to locate that stage1 mainly includes reduceByKey, a shuffle class operator. At this time, we can basically determine that the data skew problem is caused by the educeByKey operator. For example, if a word appears 1 million times and other words appear only 10 times, then a task of stage1 will have to process 1 million data, and the whole stage will be slowed down by this task.
The solution of data skew
Solution 1: use Hive ETL to preprocess data
The solution applies to the scenario: it is the Hive table that causes the data to tilt. If the data in the Hive table is very uneven (for example, one key corresponds to 1 million data, while other key corresponds to 10 data), and the business scenario needs to frequently use Spark to perform an analysis operation on the Hive table, then this technical solution is more suitable.
Implementation idea: at this time, we can evaluate whether data preprocessing can be carried out through Hive (that is, data can be aggregated according to key in advance through Hive ETL, or join with other tables in advance), and then the data source in the Spark job is not the original Hive table, but the preprocessed Hive table. At this point, because the data has been aggregated or join in advance, there is no need to use the original shuffle class operator to perform such operations in the Spark job.
Solution implementation principle: this scheme solves the problem of data skew from the root, because it completely avoids the implementation of shuffle class operators in Spark, then there will certainly be no problem of data skew. But I would also like to remind you that this approach is a palliative rather than a permanent cure. After all, the data itself has the problem of uneven distribution, so when performing shuffle operations such as group by or join in Hive ETL, there will still be data skew, resulting in the slow speed of Hive ETL. We just advance the occurrence of data skew to Hive ETL to avoid data skew in Spark programs.
The advantages of the scheme: it is simple and convenient to implement, and the effect is very good. The data tilt is completely avoided, and the performance of the Spark job will be greatly improved.
Disadvantages of the scheme: if there is a temporary cure rather than a permanent cure, the data tilt will still occur in Hive ETL.
Practical experience of the scheme: in some projects where Java systems are used in conjunction with Spark, Java code frequently calls Spark jobs, and requires high performance of Spark jobs, so it is more suitable to use this scheme. Hive ETL, which tilts the data upstream, is executed only once a day, only once a day, and then every time Java invokes a Spark job, it executes quickly and provides a better user experience.
Practical experience of the project: this scheme is used in Meituan Dianping's interactive user behavior analysis system, which mainly allows users to submit data analysis and statistics tasks through the Java Web system, and the back end submits Spark jobs for data analysis and statistics through Java. It is required that the speed of Spark jobs must be fast, within 10 minutes as far as possible, otherwise the speed is too slow and the user experience will be very poor. Therefore, we advance the shuffle operations of some Spark jobs to Hive ETL, so that Spark can directly use the preprocessed Hive intermediate table, reduce the shuffle operations of Spark as much as possible, greatly improve the performance, and improve the performance of some jobs by more than 6 times.
Solution 2: filter a small number of key that cause tilt
The scenario applies: if it is found that there are only a few key that cause skew, and the impact on the calculation itself is not significant, then this scheme is suitable for use. For example, 99% of key corresponds to 10 pieces of data, but only one key corresponds to 1 million data, resulting in data skew.
Solution implementation idea: if we judge that the few key with a large amount of data are not particularly important to the execution of the job and the calculation results, then simply filter out the few key. For example, you can use the where clause in Spark SQL to filter out these key or execute the filter operator on RDD in Spark Core to filter out these key. If you need to dynamically determine which key has the largest amount of data and then filter it, you can use the sample operator to sample the RDD, then calculate the number of each key, and filter out the key with the largest amount of data.
The implementation principle of the scheme: after filtering out the key that leads to data skew, these key will not participate in the calculation, so it is naturally impossible to produce data tilt.
The advantages of the scheme: the implementation is simple, and the effect is very good, and the data tilt can be completely avoided.
Disadvantages of the scheme: it is not suitable for many scenarios, and in most cases, there are still many key that lead to tilt, not only a few.
Practical experience of the solution: we have also used this solution to solve the data tilt in the project. Once found that one day when the Spark job was running, it was suddenly OOM. After tracing, it was found that a certain key in the Hive table had abnormal data on that day, resulting in a surge in the amount of data. Therefore, we take sampling before each execution, calculate several key of the amount of data in the sample, and filter out those key directly in the program.
Solution 3: improve the parallelism of shuffle operations
The solution is applicable to the scenario: if we have to tilt the data to meet the difficulties, then it is recommended to give priority to using this scheme, because it is the easiest way to deal with data tilting.
The implementation idea of the scheme: when executing the shuffle operator on RDD, pass a parameter to the shuffle operator, such as reduceByKey (1000), which sets the number of shuffle read task when the shuffle operator executes. For shuffle statements in Spark SQL, such as group by, join, etc., you need to set a parameter, spark.sql.shuffle.partitions, which represents the parallelism of shuffle read task. The default value is 200, which is a bit too small for many scenarios.
The principle of solution implementation: by increasing the number of shuffle read task, multiple key originally assigned to one task can be assigned to multiple task, so that each task can process less data than before. For example, if there are five key, each key corresponds to 10 pieces of data, and each of the five key is assigned to a task, then the task needs to process 50 pieces of data. With the addition of shuffle read task, each task is assigned a key, that is, each task processes 10 pieces of data, and naturally the execution time of each task becomes shorter. The specific principle is shown in the following figure.
The advantages of the scheme: it is relatively simple to implement, and can effectively alleviate and reduce the impact of data tilt.
Disadvantages of the scheme: it only alleviates the tilt of data and does not completely eradicate the problem. According to practical experience, its effect is limited.
Practical experience of the solution: this solution usually can not completely solve the data skew, because if there are some extreme cases, such as 1 million data corresponding to a key, then no matter how much your task number increases, the key corresponding to 1 million data will definitely be allocated to a task for processing, so data skewing is bound to occur. So this scheme can only be said to try to use a simple method to alleviate the data tilt when it is found, or to combine it with other schemes.
Solution 4: two-stage aggregation (local aggregation + global aggregation)
The solution is suitable for scenarios: this scheme is more suitable when executing aggregation shuffle operators such as reduceByKey on RDD or using group by statements for grouping aggregation in Spark SQL.
The idea of realizing the scheme: the core idea of this scheme is to carry out two-stage aggregation. * each key is locally aggregated. First, a random number is assigned to each key, such as a random number within 10. Then the same key becomes different. For example, (hello, 1) (hello, 1) becomes (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1). Then perform local aggregation operations such as reduceByKey on the data marked with random numbers, and the result of local aggregation will become (1_hello, 2) (2_hello, 2). Then the prefix of each key is removed and it becomes (hello,2) (hello,2), and the final result can be obtained by performing the global aggregation operation again, such as (hello, 4).
The implementation principle of the scheme: the original same key is changed into several different key by adding random prefix, so that the data originally processed by one task can be dispersed to multiple task for local aggregation, thus solving the problem that a single task deals with too much data. Then remove the random prefix and do the global aggregation again, and the final result can be obtained. The specific principle is shown in the following figure.
The advantage of the scheme: for the data skew caused by the shuffle operation of the aggregation class, the effect is very good. It is usually possible to eliminate data skew, or at least significantly alleviate data skew, improving the performance of Spark jobs by several times.
Disadvantages of the scheme: it is only suitable for shuffle operations of aggregates, and the scope of application is relatively narrow. If it is the shuffle operation of the join class, you will have to use other solutions.
Solution 5: convert reduce join to map join
The solution is suitable for scenarios where join operations are used for RDD, or join statements are used in Spark SQL, and a RDD or table in a join operation has a small amount of data (such as several hundred megabytes or one or two gigabytes), which is more suitable for this scenario.
The realization idea of the scheme is not to use join operator for connection operation, but to use Broadcast variable and map class operator to realize join operation, so as to completely avoid the operation of shuffle class and avoid the occurrence and occurrence of data tilt completely. Pull the data in the smaller RDD directly into the memory of the driver through the collect operator, and then create a Broadcast variable to it; then execute the map class operator on another RDD, in the operator function, get the full amount of data of the smaller RDD from the Broadcast variable, and compare it with each piece of data of the current RDD according to the connection key, if the connection key is the same, then connect the data of the two RDD in the way you need.
Solution implementation principle: ordinary join will go through the shuffle process, but once shuffle, it is equivalent to pulling the data of the same key into a shuffle read task and then join, which is reduce join. However, if a RDD is relatively small, you can use the broadcast small RDD full data + map operator to achieve the same effect as join, that is, map join. At this time, there will be no shuffle operation and no data skew. The specific principle is shown in the following figure.
The advantage of the scheme: it works very well for the data tilt caused by the join operation, because there is no shuffle at all, and there is no data tilt at all.
Disadvantages of the scheme: there are fewer scenarios, because this scheme only applies to the case of a large table and a small table. After all, we need to broadcast the small table, which consumes memory resources. Driver and each Executor will host a full amount of data of the small RDD in memory. If the RDD data we broadcast is relatively large, such as more than 10 gigabytes, then a memory overflow may occur. Therefore, it is not suitable for situations where both are large tables.
Solution 6: sample tilt key and split join operation
Solution applicable scenario: when join is performed on two RDD/Hive tables, if the amount of data is too large to adopt solution 5, then you can take a look at the key distribution in the two RDD/Hive tables. If the data skew occurs because a few key in one RDD/Hive table has too much data, while all the key in the other RDD/Hive table is evenly distributed, then this solution is more appropriate.
Ideas for the implementation of the scheme:
For the RDD that contains a few key with excessive amount of data, sample a sample through the sample operator, and then count the number of each key to calculate which key is the amount of data.
Then split the data corresponding to these key from the original RDD to form a separate RDD, and prefix each key with a random number within n, without causing most of the tilted key to form another RDD.
Then another RDD that needs join is also filtered out the data corresponding to those tilted key and a separate RDD is formed. Each piece of data is expanded into n pieces of data, which are sequentially prefixed with a prefix of zero n, which will not cause most of the tilted key to form another RDD.
Then the independent RDD with random prefix is join with another independent RDD which expands n times. At this time, the same key can be broken up into n parts and dispersed into multiple task for join.
The other two ordinary RDD can just join as usual.
* combine the results of the two join using the union operator, which is the final join result.
Implementation principle: for the data tilt caused by join, if only a few key cause the tilt, a few key can be split into independent RDD, and the random prefix can be added to break up into n parts for join. At this time, the data corresponding to these key will not be concentrated on a few task, but scattered to multiple task for join. The specific principle is shown in the following figure.
The advantage of the scheme: for the data skew caused by join, if only a few key cause skew, this method can break up the key for join in the most effective way. Moreover, it only needs to expand the data corresponding to a small number of tilted key by n times, and there is no need to expand the full data. Avoid taking up too much memory.
Disadvantages of the solution: this approach is not appropriate if there are a large number of key tilts, such as thousands of key that cause data skew.
Solution 7: use random prefixes and expanded RDD for join
The solution is suitable for scenarios: if a large number of key in the RDD causes data skew during the join operation, there is no point in splitting the key. In this case, only one solution can be used to solve the problem.
Ideas for the implementation of the scheme:
The implementation idea of this scheme is basically similar to "solution 6". First, look at the data distribution in the RDD/Hive table and find the RDD/Hive table that causes the data tilt. for example, there are multiple key corresponding to more than 10, 000 pieces of data.
Each piece of data in the RDD is then prefixed with a random prefix within n.
At the same time, the capacity of another normal RDD is expanded, and each piece of data is expanded to n pieces of data, and each piece of data is prefixed with a zero n in turn.
* join the two processed RDD.
The principle of the solution: the original key is transformed into a different key by appending random prefixes, and then these processed "different key" can be dispersed into multiple task for processing, instead of having one task handle a large number of the same key. The difference between this scheme and solution 6 is that the previous scheme only carries out special processing for a small number of data corresponding to tilted key as far as possible. Because the processing process requires expansion of RDD, the previous scheme does not take up much memory after the expansion of RDD. This scheme is aimed at the situation where there are a large number of skewed key, so it is impossible to split part of the key for separate processing, so it can only expand the data capacity of the whole RDD, which requires high memory resources.
Advantages of the scheme: the tilt of data of join type can be handled basically, and the effect is relatively significant, and the performance improvement effect is very good.
Disadvantages of the scheme: this scheme is more to alleviate data tilt than to avoid data tilt completely. And the entire RDD needs to be expanded, which requires high memory resources.
Solution practice: when developing a data requirement, it was found that a join led to data skew. Before optimization, the execution time of the job is about 60 minutes; after using this scheme, the execution time is reduced to about 10 minutes, and the performance is improved 6 times.
Solution 8: a combination of multiple solutions
In practice, it has been found that in many cases, if you only deal with relatively simple data tilting scenarios, you can basically solve it by using one of the above schemes. However, if you are dealing with a more complex data skew scenario, you may need to combine multiple scenarios. For example, for Spark jobs with multiple data skew links, we can first use solution 1 and 2 to preprocess part of the data and filter some data to alleviate it. Secondly, we can improve the parallelism of some shuffle operations and optimize their performance. * * you can also choose a solution to optimize its performance for different aggregation or join operations. After we have a thorough understanding of the ideas and principles of these schemes, we need to flexibly use a variety of schemes to solve their own data tilt problem according to different situations in practice.
Thank you for your reading, the above is the content of "what is the method of spark homework tuning". After the study of this article, I believe you have a deeper understanding of what the method of spark homework tuning is, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.