2025-03-29 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article walks through eight practical methods for dealing with data skew in Spark. We found them useful in practice and hope you get something out of reading it.
What is data skew?
For distributed big data systems such as Spark and Hadoop, a large volume of data is not the real danger; data skew is.
In a distributed system, the overall run time of an application ideally decreases linearly as the system grows (as nodes are added). If one machine needs 120 minutes to process a large batch of data, the ideal time with 3 machines is 120 / 3 = 40 minutes. For each machine to finish in 1/N of the single-machine time, however, every machine must receive an equal share of the work. Unfortunately, work is often distributed unevenly, sometimes so unevenly that a few machines receive most of it while the rest handle only a small fraction. For example, one machine handles 80% of the tasks and the other two handle 10% each.
"Fear not scarcity, but uneven distribution" sums up the biggest problem in a distributed environment. Computing power does not scale linearly; instead there is a short-board (weakest-link) effect: the time a Stage takes is determined by its slowest Task.
Because all Tasks in the same Stage perform the same computation, and setting aside differences in computing power between nodes, the difference in run time between Tasks is determined mainly by the amount of data each one processes. To take full advantage of parallel computing in a distributed system, we therefore have to solve the data skew problem.
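The arithmetic behind the short-board effect can be sketched in a few lines of plain Python (no Spark needed). This is a minimal illustration that assumes run time is proportional to the share of data a machine receives; the function name stage_minutes is made up for the example.

```python
def stage_minutes(single_machine_minutes, shares):
    """Return the wall-clock minutes of a stage split across parallel machines.

    single_machine_minutes: time one machine needs to process all the data.
    shares: fraction of the data assigned to each machine (sums to 1).
    """
    # Machines work in parallel, so the stage finishes with the busiest machine.
    return max(share * single_machine_minutes for share in shares)


even = stage_minutes(120, [1 / 3, 1 / 3, 1 / 3])   # ideal split across 3 machines
skewed = stage_minutes(120, [0.8, 0.1, 0.1])       # one machine gets 80% of the data
print(f"even: {even:.0f} min, skewed: {skewed:.0f} min")
```

With the 80/10/10 split the stage takes about 96 minutes instead of 40, so three machines give roughly a 1.25x speedup rather than 3x.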
The harm of data skew
When data skew occurs, a small number of tasks take far longer than the rest, so the overall run time becomes too long and the parallelism of the distributed system goes unused.
In addition, the tasks that receive too much data may fail for lack of memory, which in turn can cause the whole application to fail.
Symptoms of data skew
In the vast majority of cases, data skew is the cause when you see one of the following:
Most tasks run very fast, but a few run so slowly that the stage as a whole cannot finish.
A Spark job that used to run normally suddenly reports an OOM (out of memory) exception one day, and the exception stack trace points to the business code we wrote. This case is relatively rare.
TIPS:
In Spark Streaming programs, data skew is more likely to occur, especially when the program contains SQL-like operations such as join and group. Because Spark Streaming programs are usually not given much memory, even a little skew during these operations can easily cause an OOM.
The cause of data skew
During a shuffle, all records with the same key, wherever they sit in the cluster, must be pulled to a single task on one node for processing, for example to aggregate or join by key. If one key has a particularly large number of records, data skew occurs. For example, if most keys have 10 records each but a few keys have 1 million, most tasks are assigned only 10 records and finish in 1 second, while a few tasks are assigned 1 million records and run for an hour or two.
So when data is skewed, the Spark job appears to run very slowly, and a task may even run out of memory because of the amount of data it has to process.
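To make the effect of one oversized key concrete, here is a small plain-Python simulation of hash partitioning. It is only a sketch: it assigns integer keys to partitions with key % num_partitions, which mimics the idea of hash partitioning rather than reproducing Spark's implementation, and the key counts are taken from the example in the text.

```python
def partition_loads(records_per_key, num_partitions):
    """Return the number of records each partition receives.

    records_per_key: dict mapping an integer key to its record count.
    All records of one key go to the same partition, as in a shuffle.
    """
    loads = [0] * num_partitions
    for key, count in records_per_key.items():
        loads[key % num_partitions] += count
    return loads


# Keys 1..99 have 10 records each; key 0 is the hot key with 1 million records.
records_per_key = {key: 10 for key in range(1, 100)}
records_per_key[0] = 1_000_000

loads = partition_loads(records_per_key, 4)
print(loads)
```

One partition ends up with the hot key plus its share of ordinary keys, while the other three each hold only a few hundred records, so the task reading that partition determines the stage time.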
Finding and locating the problem
1. Through the Spark Web UI
In the Spark Web UI, look at the amount of data (Shuffle Read Size/Records) assigned to each task of the currently running stage to determine whether an uneven distribution across tasks is causing the skew.
Once we know which stage is skewed, we work out which part of the code that stage corresponds to, based on how Spark divides a job into stages; that part of the code must contain a shuffle operator. The distribution of each key can then be checked with countByKey.
TIPS:
Data skew only occurs during a shuffle. Commonly used operators that may trigger a shuffle include distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, and repartition. When data skew occurs, one of these operators in your code is probably responsible.
2. Through key statistics
Skew can also be verified by sampling the data and counting how often each key occurs.
Because the data set is huge, we sample it, count the occurrences of each key, sort by count, and take the top few:
df.select("key").sample(false, 0.1)            // sample the data
  .rdd.map(k => (k, 1)).reduceByKey(_ + _)     // count the occurrences of each key
  .map(k => (k._2, k._1)).sortByKey(false)     // sort by occurrence count, descending
  .take(10)                                    // take the top 10
If most keys are evenly distributed but a few occur several orders of magnitude more often than the others, the data is skewed.
How to alleviate data skew?
Basic ideas:
Business logic: optimize at the business-logic level. For example, when counting orders per city, count the first-tier city separately and then merge the result with those of the other cities.
Program implementation: in Hive, for example, a count(distinct) operation often ends up with only one reduce, so we can group first and then wrap a count around the result; in Spark, use reduceByKey instead of groupByKey.
Parameter tuning: both Hadoop and Spark come with many parameters and mechanisms for dealing with data skew, and these can solve most problems.
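The advice to prefer reduceByKey over groupByKey comes down to how many records cross the shuffle. The plain-Python sketch below imitates the two behaviours: shipping every record versus pre-aggregating inside each partition first. It is a simulation under simplified assumptions, not Spark code, and the city names and counts are invented for the example.

```python
from collections import defaultdict


def group_then_sum(partitions):
    """groupByKey style: every (key, value) record is shuffled, then summed."""
    shuffled = [record for part in partitions for record in part]
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


def combine_then_sum(partitions):
    """reduceByKey style: each partition pre-aggregates before the shuffle."""
    shuffled = []
    for part in partitions:
        local = defaultdict(int)
        for key, value in part:
            local[key] += value
        shuffled.extend(local.items())   # one record per key per partition
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


# Two input partitions of order records; "beijing" is the hot key.
partitions = [
    [("beijing", 1)] * 1000 + [("lhasa", 1)] * 2,
    [("beijing", 1)] * 1000 + [("xining", 1)] * 3,
]

grouped_totals, grouped_shuffled = group_then_sum(partitions)
combined_totals, combined_shuffled = combine_then_sum(partitions)
print(grouped_shuffled, combined_shuffled)
```

Both functions produce the same totals, but the pre-aggregating version shuffles 4 records instead of 2005, so the task that receives the hot key has far less to read.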
Approach 1. Filter out abnormal data
If the keys that cause the skew are abnormal data, simply filter them out.
First, analyze the keys to determine which ones cause the skew. The methods were described above and are not repeated here.
Then analyze the records behind those keys:
Null values or outliers; most skew is caused by these.
Invalid data: large amounts of duplicated test data, or valid data that has little effect on the result.
Valid data: a normal distribution that comes from the business itself.
Solution:
For the first two cases, the data can be filtered directly.
The third case requires special treatment, which we will describe in detail below.
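For the first two cases, the filtering step can be sketched as follows in plain Python. The set of abnormal keys (None, the empty string, and "test") is a hypothetical choice for this example; in a Spark job the same idea would be a filter applied before the operator that triggers the shuffle.

```python
from collections import Counter

# Hypothetical markers for abnormal keys; a real job would define its own.
ABNORMAL_KEYS = {None, "", "test"}


def drop_abnormal(records):
    """Keep only (key, value) records whose key is not a known abnormal key."""
    return [(key, value) for key, value in records if key not in ABNORMAL_KEYS]


records = [(None, 1)] * 5000 + [("test", 1)] * 300 + [("u1", 1), ("u2", 1), ("u1", 1)]

before = Counter(key for key, _ in records)
after = Counter(key for key, _ in drop_abnormal(records))
print(before.most_common(1))
print(after.most_common(1))
```

Before filtering, the null key dominates the distribution; afterwards only the real keys remain and no single key stands out.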
Approach 2. Increase the parallelism of the shuffle
By default, Spark partitions data with HashPartitioner (not to be confused with Hash Shuffle) when it shuffles. If the parallelism is set badly, a large amount of data belonging to different keys may be assigned to the same Task, so that this Task processes far more data than the others and the data is skewed.
If the shuffle parallelism is adjusted so that the different keys originally assigned to one Task are spread across several Tasks, the original Task has less data to process, which eases the short-board effect caused by the skew.
(1) How to do it
For RDD operations, the parallelism can be set directly on the operator that triggers the shuffle, or through spark.default.parallelism. For Spark SQL, it can also be set with SET spark.sql.shuffle.partitions=[num_tasks]. The default value depends on the cluster manager.
For DataFrame and Spark SQL, the spark.sql.shuffle.partitions=[num_tasks] parameter controls the shuffle parallelism; it defaults to 200.
(2) Applicable scenarios
A large number of different keys are assigned to the same Task, leaving that Task with too much data.
(3) Solution
Adjust the parallelism. Usually this means increasing it, but sometimes reducing it achieves the same effect.
(4) Advantages
It is easy to implement and needs only parameter tuning, so the problem may be solved at minimal cost. When data skew appears, it is generally worth trying this method a few times first and moving on to other methods only if it does not help.
(5) Disadvantages
It applies to few scenarios. It only reduces the number of different keys each task handles; it cannot fix skew caused by a single very large key, because a task that processes nothing but that key is still overloaded. The method can only alleviate skew, not eliminate it, and in practice its effect is modest.
TIPS:
Data skew can be compared to hash collisions. Increasing parallelism is similar to increasing the size of the hash table.
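The hash-table analogy can be checked with a short plain-Python simulation. As before, this is a sketch that assigns integer keys with key % num_partitions instead of using Spark's partitioner, and the key sets are invented so that several medium-sized keys collide when there are 4 partitions.

```python
def max_partition_load(records_per_key, num_partitions):
    """Return the record count of the most heavily loaded partition."""
    loads = [0] * num_partitions
    for key, count in records_per_key.items():
        loads[key % num_partitions] += count
    return max(loads)


# Four medium keys that all collide in partition 0 when there are 4 partitions.
colliding = {0: 1000, 4: 1000, 8: 1000, 12: 1000, 1: 10, 2: 10, 3: 10}
# A single hot key: no partition count can split it.
hot = {0: 1_000_000, 1: 10, 2: 10}

for n in (4, 16, 3):
    print(n, max_partition_load(colliding, n))
for n in (4, 400):
    print(n, max_partition_load(hot, n))
```

For the colliding keys, the worst partition drops from 4000 records with 4 partitions to 1000 with 16, and even to 2010 with 3, which matches the remark that lowering the parallelism sometimes helps too. For the single hot key the worst partition stays at 1,000,000 records regardless of the setting.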
Approach 3. Custom Partitioner
(1) Principle
Use a custom Partitioner in place of the default HashPartitioner, so that different keys that would have been assigned to the same Task are assigned to different Tasks.
For example, we use a custom Partitioner on the groupByKey operator:
.groupByKey(new Partitioner() {
    @Override
    public int numPartitions() {
        return 12;
    }

    @Override
    public int getPartition(Object key) {
        int id = Integer.parseInt(key.toString());
        if (id >= 9500000 && id ...

... (prefix + SPLIT + t._1, 1)
    .reduceByKey((v1, v2) => v1 + v2)
    .map(t => (t._1.split(SPLIT)(1), t._2))
    .reduceByKey((v1, v2) => v1 + v2)
}
However, running map and reduce twice performs slightly worse than running them once.
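The surviving fragment above prefixes each key, aggregates, strips the prefix, and aggregates again. A minimal plain-Python sketch of that two-stage idea follows. The separator value "-", the number of prefixes, and the function name salted_count are assumptions made for this example; the original values are not recoverable from the text.

```python
import random
from collections import Counter

SPLIT = "-"  # assumed separator between the random prefix and the real key


def salted_count(words, num_prefixes, seed=0):
    """Count words in two stages, spreading each key over num_prefixes sub-keys."""
    rng = random.Random(seed)
    # Stage 1: add a random prefix to every key and aggregate the prefixed keys.
    partial = Counter(f"{rng.randrange(num_prefixes)}{SPLIT}{word}" for word in words)
    # Stage 2: strip the prefix and aggregate the partial counts per real key.
    final = Counter()
    for prefixed_key, count in partial.items():
        final[prefixed_key.split(SPLIT, 1)[1]] += count
    return partial, final


words = ["hot"] * 10000 + ["cold"] * 5
partial, final = salted_count(words, num_prefixes=10)
print(max(partial.values()), final["hot"], final["cold"])
```

After the first stage no single prefixed key carries all 10,000 records of the hot key, and the second stage only has to merge at most one partial count per prefix, which is why the extra round can pay off despite its cost.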
Data skew in Hadoop
In Hadoop, what users work with directly are MapReduce programs and Hive programs. Although Hive queries are ultimately executed as MapReduce jobs (at least for now; in-memory computing for Hive is not yet widespread), what you write is very different: one is a program and the other is SQL. So we distinguish slightly between the two here.
In Hadoop, data skew shows up mainly as the reduce phase getting stuck at 99.99% and never finishing.
Here, if you look at the log or the monitoring interface in detail, you will find:
One or more reduces are stuck.
Various container errors and OOMs.
The stuck reduces read and write a huge amount of data, far more than the normal reduces.
Along with the skew come all kinds of odd behavior, such as tasks being killed.
Experience:
Data skew in Hive generally occurs in the GROUP BY and ON clauses of the SQL and is tightly bound to the logic of the data.
Optimization method:
Here are some methods and ideas; the specific parameters and usage can be found on the official website.
Use map join.
Convert count distinct into a group by followed by a count.
Parameter tuning:
set hive.map.aggr=true
set hive.groupby.skewindata=true
Use left semi join.
Compress the map output and the intermediate results. (This does not directly solve data skew, but it reduces I/O and network transfer, which can improve efficiency considerably.)
Description:
hive.map.aggr=true: partial aggregation is done on the map side, which is more efficient but requires more memory.
hive.groupby.skewindata=true: load balancing when the data is skewed. When this option is set to true, the generated query plan has two MR jobs. In the first job, the map output is distributed randomly across the reduces, and each reduce performs a partial aggregation and outputs the result. Records with the same GROUP BY key may therefore go to different reduces, which balances the load. The second job distributes the pre-aggregated results to the reduces by GROUP BY key (which guarantees that the same key goes to the same reduce) and completes the final aggregation.
Those are the practical ways to deal with data skew in Spark and Hadoop covered in this article. Some of them are likely to come up in daily work, and we hope the article helps.