2025-04-02 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains how to tune Spark jobs that suffer from data skew. The methods introduced here are simple, fast, and practical.
# Data skew tuning
Data skew typically shows up in one of two ways:
Most tasks execute very quickly, but a few execute extremely slowly. For example, out of 1,000 tasks, 997 finish within a minute while the remaining three take an hour or two. This situation is very common.
A Spark job that used to run normally suddenly reports an OOM (out of memory) exception one day, and the exception stack shows that it originates in the business code we wrote. This situation is relatively rare.
## Principle of data skew
The principle of data skew is simple: during a shuffle, all records with the same key on every node must be pulled to a single task on one node for processing, for example to aggregate or join by key. If the amount of data for one key is particularly large, data skew occurs. For example, if most keys correspond to 10 records but a few keys correspond to 1 million records, most tasks are assigned only 10 records and finish in about a second, while a few tasks are assigned 1 million records and run for an hour or two. The progress of the entire Spark job is therefore determined by the task with the longest running time.
So when data skew occurs, the Spark job appears to run very slowly, and it may even run out of memory because a single task processes too much data.
The following figure shows a clear example. The key hello has 7 records in total across three nodes, and all of them are pulled into the same task for processing; the keys world and you have one record each, so the other two tasks process only one record each. The first task may therefore take seven times as long as the other two, and the speed of the whole stage is determined by the slowest task.
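The hello/world/you example can be reproduced without a cluster. The following is a minimal plain-Java sketch, not Spark code: the class `ShuffleSkewDemo` and the method `recordsPerTask` are hypothetical names, and hashing the key modulo the task count is only an approximation of how a hash-partitioned shuffle routes records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ShuffleSkewDemo {
    // Route each record to a task by hashing its key. Equal keys always land
    // in the same task, which is the property that makes skew possible.
    static int[] recordsPerTask(List<String> keys, int numTasks) {
        int[] counts = new int[numTasks];
        for (String key : keys) {
            counts[Math.floorMod(key.hashCode(), numTasks)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 7 records for "hello", 1 for "world", 1 for "you", as in the example.
        List<String> keys = new ArrayList<>(Collections.nCopies(7, "hello"));
        keys.add("world");
        keys.add("you");
        // Whatever the hash values are, one task receives at least 7 of the 9 records.
        System.out.println(Arrays.toString(recordsPerTask(keys, 3)));
    }
}
```

Because all seven hello records share one hash value, no choice of task count can split them; that is why the slowest task dominates the stage.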
## How to locate the code that causes data skew
Data skew only occurs during a shuffle. Commonly used operators that may trigger a shuffle include distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, and repartition. When data skew occurs, it is probably caused by one of these operators in your code.
### When a task runs particularly slowly
The first thing to determine is which stage the data skew occurs in.
If the job is submitted in yarn-client mode, the log is printed locally and you can find which stage is currently running directly in the log. If it is submitted in yarn-cluster mode, you can see the currently running stage in the Spark Web UI. In either mode, you can also use the Spark Web UI to look at the amount of data assigned to each task of that stage, and so confirm whether an uneven distribution of data across tasks is causing the skew.
For example, in the following figure, one column shows the elapsed time of each task. Some tasks clearly run very fast and finish in a few seconds, while others run very slowly and take several minutes; the run times alone are enough to indicate data skew. Another column shows the amount of data processed by each task: the fast tasks process only a few hundred KB, while the slow tasks process several thousand KB, roughly 10 times as much. At this point it is all but certain that data skew has occurred.
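The comparison of per-task data sizes can be turned into a rough numeric check. This is a sketch under assumptions: the class `TaskSkewCheck`, the method `skewRatio`, and the idea of comparing the largest task with the median task are illustrative choices, not something the Spark Web UI computes, and the input sizes are made-up values you would copy from the UI by hand.

```java
import java.util.Arrays;

class TaskSkewCheck {
    // Ratio between the largest per-task input and the median per-task input.
    // Expects a non-empty array; a ratio far above 1 suggests data skew.
    static double skewRatio(long[] bytesPerTask) {
        long[] sorted = bytesPerTask.clone();
        Arrays.sort(sorted);
        long median = sorted[sorted.length / 2];
        long max = sorted[sorted.length - 1];
        if (median == 0) {
            return max == 0 ? 1.0 : Double.POSITIVE_INFINITY;
        }
        return (double) max / median;
    }

    public static void main(String[] args) {
        // Hypothetical per-task input sizes in KB: four small tasks, one large.
        long[] kb = {300, 280, 310, 295, 3000};
        System.out.println(skewRatio(kb)); // prints 10.0
    }
}
```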
Once we know which stage the data skew occurs in, we need to work out, from the way stages are divided, which part of the code that stage corresponds to; that part of the code must contain a shuffle operator. Precisely mapping stages to code requires a deep understanding of the Spark source code, but a relatively simple and practical rule of thumb is this: wherever a shuffle operator appears in Spark code, or a statement that causes a shuffle (such as group by) appears in a Spark SQL statement, that point is the boundary between two stages.
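The rule of thumb can be written down as a small plain-Java sketch. It is a deliberate simplification, like the rule itself: `StageEstimator` and `stageOf` are hypothetical names, the operator set contains only the shuffle operators listed in this article, and real stage division in Spark depends on more than operator names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class StageEstimator {
    // Shuffle operators named in this article; the list is not exhaustive.
    static final Set<String> SHUFFLE_OPS = new HashSet<>(Arrays.asList(
            "distinct", "groupByKey", "reduceByKey", "aggregateByKey",
            "join", "cogroup", "repartition"));

    // For each operator in the chain, return the index of the stage it falls in.
    // A shuffle operator opens a new stage (its shuffle read side).
    static int[] stageOf(List<String> operators) {
        int[] stages = new int[operators.size()];
        int stage = 0;
        for (int i = 0; i < operators.size(); i++) {
            if (SHUFFLE_OPS.contains(operators.get(i))) {
                stage++;
            }
            stages[i] = stage;
        }
        return stages;
    }

    public static void main(String[] args) {
        List<String> wordCount =
                Arrays.asList("textFile", "flatMap", "map", "reduceByKey", "collect");
        System.out.println(Arrays.toString(stageOf(wordCount))); // prints [0, 0, 0, 1, 1]
    }
}
```

For a word count chain, the estimate puts textFile through map in stage 0 and reduceByKey through collect in stage 1.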
Here we take word count, the most basic introductory Spark program, as an example of how to roughly work out which code belongs to which stage. In the example below, reduceByKey is the only operator in the entire program that triggers a shuffle, so the program can be considered to be split at this operator into two stages.
Stage 0 mainly performs the operations from textFile to map, and then performs the shuffle write. The shuffle write can be thought of simply as partitioning the data of the pairs RDD: within the data processed by each task, records with the same key are written to the same disk file.
Stage 1 mainly performs the operations from reduceByKey to collect. As soon as each task of stage 1 starts, it first performs the shuffle read: it pulls the keys assigned to it from the nodes where the stage 0 tasks ran, and then performs a global aggregation or join on records with the same key; in this case, it sums the values for each key. After stage 1 executes reduceByKey, the final wordCounts RDD has been computed; the collect operator then pulls all the data to the Driver for us to traverse and print.
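```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
val sc = new SparkContext(conf)

// Stage 0: textFile through map, followed by the shuffle write.
val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))

// Stage 1: shuffle read, reduceByKey, and collect to the Driver.
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.collect().foreach(println(_))
```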
The word count analysis should convey the basic principle of stage division and how the shuffle is executed at the boundary between two stages. With that, we know how to quickly locate the code behind the stage in which data skew occurs. For example, if the Spark Web UI or the local log shows that some tasks of stage 1 run very slowly, we conclude that stage 1 is skewed; going back to the code, stage 1 mainly contains the shuffle operator reduceByKey, so we can be fairly sure the skew is caused by reduceByKey. For instance, if one word appears 1 million times and the other words appear only 10 times each, one task of stage 1 has to process 1 million records, and the whole stage is slowed down by that task.
### When a task inexplicably runs out of memory
In this case it is easier to locate the problematic code. We recommend looking directly at the exception stack in the local log in yarn-client mode, or viewing the exception stack in the logs through YARN in yarn-cluster mode. The exception stack usually shows which line of your code ran out of memory. Look around that line and you will usually find a shuffle operator, which is probably the one causing the data skew.
Note, however, that an occasional out-of-memory error is not enough to conclude that data skew has occurred. Bugs in your own code or occasional abnormal data can also cause memory overflows. So you should still follow the method described above and use the Spark Web UI to check the running time and the amount of data assigned to each task of the failing stage, to determine whether data skew is really what caused the memory overflow.
## View the distribution of the keys that cause data skew
Once you know where the data skew occurs, you usually need to analyze the RDD or Hive table on which the skewed shuffle was performed and look at the distribution of its keys. This provides the basis for choosing a solution later: different key distributions combined with different shuffle operators may call for different solutions.
There are several ways to view the key distribution, depending on the operation involved:
- If the skew is caused by group by or join statements in Spark SQL, query the key distribution of the tables used in the SQL.
- If the skew is caused by a shuffle operator on a Spark RDD, add code to the Spark job to inspect the key distribution, such as RDD.countByKey(). Then collect or take the per-key counts to the client and print them to see the distribution.
For example, in the word count program above, if we determine that the reduceByKey operator of stage 1 causes the data skew, we should look at the key distribution of the RDD on which reduceByKey is performed, which here is the pairs RDD. In the following example, we first take a 10% sample of pairs, then use countByKey to count the occurrences of each key, and finally traverse and print the counts on the client.
```scala
// Sample 10% of pairs without replacement, then count records per key on the Driver.
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
```
## Solutions to data skew
### Solution 1: Preprocess data with Hive ETL
Applicable scenarios: the data skew is caused by a Hive table. If the data in the Hive table is very uneven (for example, one key corresponds to 1 million records while other keys correspond to only 10) and the business scenario requires frequent Spark analysis of that table, this solution is a good fit.
Implementation idea: evaluate whether the data can be preprocessed in Hive, that is, aggregated by key in advance through Hive ETL, or joined with other tables in advance. The Spark job then reads the preprocessed Hive table instead of the original one. Because the data has already been aggregated or joined, the Spark job no longer needs the original shuffle operators for those operations.
Implementation principle: this solution removes data skew from the Spark job at the root, because the Spark job no longer executes the shuffle operators at all, so skew cannot occur there. Be aware, though, that it treats the symptom rather than the cause. The data itself is still unevenly distributed, so shuffle operations such as group by or join in Hive ETL are still skewed and the Hive ETL is slow. We have only moved the skew forward into Hive ETL to keep it out of the Spark program.
Advantages: simple and convenient to implement, and very effective. Data skew is avoided completely in the Spark job, and its performance improves greatly.
Disadvantages: it treats the symptom rather than the cause; data skew still occurs in Hive ETL.
Practical experience: in projects where a Java system works together with Spark, the Java code calls Spark jobs frequently and needs them to perform well, which makes this solution a good fit. The skew is moved upstream into Hive ETL, which runs only once a day, so only that run is slow; every Spark job invoked from Java afterwards runs quickly and gives a better user experience.
Project experience: this solution is used in Meituan Dianping's interactive user behavior analysis system. Users submit data analysis and statistics tasks through a Java Web system, and the back end submits Spark jobs through Java to carry them out. The Spark jobs must be fast, within 10 minutes as far as possible, otherwise the user experience is very poor. We therefore moved the shuffle operations of some Spark jobs forward into Hive ETL so that Spark reads the preprocessed Hive intermediate tables directly. This reduces Spark shuffle operations as much as possible and improved the performance of some jobs by more than 6 times.
### Solution 2: Filter out the few keys that cause skew
Applicable scenarios: only a few keys cause the skew, and they have little impact on the computation itself. For example, 99% of keys correspond to 10 records each, but a single key corresponds to 1 million records and causes the skew.
Implementation idea: if the few keys with a large amount of data are not important to the job's execution or results, simply filter them out. For example, use a where clause in Spark SQL, or the filter operator on an RDD in Spark Core. If you need to determine dynamically, on each run, which keys have the most data before filtering them, sample the RDD with the sample operator, count the records for each key, and filter out the keys with the most data.
Implementation principle: once the keys that cause skew are filtered out, they no longer take part in the computation, so they cannot cause data skew.
Advantages: simple to implement and very effective; data skew is avoided completely.
Disadvantages: few scenarios qualify. In most cases many keys cause the skew, not just a few.
Practical experience: we have used this solution in a project. One day a Spark job suddenly failed with OOM; tracing showed that one key in the Hive table had abnormal data that day, which made its data volume surge. We therefore sample before each run, compute the keys with the most data in the sample, and filter those keys out directly in the program.
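The sample, count, and filter sequence described for this solution might look like the following when reduced to plain Java collections. This is a sketch only: `HotKeyFilter`, `heaviestKey`, and `dropKey` are hypothetical names, a seeded `java.util.Random` stands in for Spark's sample operator, and only the single heaviest key is removed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;

class HotKeyFilter {
    // Count keys in a random sample and return the most frequent one
    // (empty if the sample happens to contain no records).
    static Optional<String> heaviestKey(List<String> keys, double fraction, long seed) {
        Random random = new Random(seed);
        Map<String, Integer> counts = new HashMap<>();
        for (String key : keys) {
            if (random.nextDouble() < fraction) {
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    // Remove every record that belongs to the given key.
    static List<String> dropKey(List<String> keys, String hotKey) {
        return keys.stream().filter(k -> !k.equals(hotKey)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>(Collections.nCopies(1000, "hot"));
        for (int i = 0; i < 10; i++) {
            keys.add("k" + i);
        }
        String hot = heaviestKey(keys, 0.1, 42L).orElseThrow(IllegalStateException::new);
        System.out.println(hot + " -> " + dropKey(keys, hot).size() + " records left");
    }
}
```

Working from a sample keeps the counting step cheap; the cost is that a key is only found if it is heavy enough to dominate the sample.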
### Solution 3: Increase the parallelism of shuffle operations
Applicable scenarios: if data skew has to be tackled head-on, try this solution first, because it is the simplest way to deal with it.
Implementation idea: when executing a shuffle operator on an RDD, pass it a parameter, such as reduceByKey(1000), that sets the number of shuffle read tasks for that operator (in the actual API the partition count is passed alongside the reduce function, as in reduceByKey(func, 1000)). For shuffle statements in Spark SQL, such as group by and join, set the parameter spark.sql.shuffle.partitions, which controls the parallelism of shuffle read tasks. Its default value is 200, which is too small for many scenarios.
Implementation principle: increasing the number of shuffle read tasks lets the keys that were originally assigned to one task be spread across several tasks, so each task processes less data than before. For example, if there are five keys with 10 records each and all five are assigned to one task, that task processes 50 records. With more shuffle read tasks, each task is assigned one key and processes 10 records, so each task naturally finishes sooner. The principle is shown in the following figure.
Advantages: relatively simple to implement; it can effectively alleviate the impact of data skew.
Disadvantages: it only alleviates the skew and does not eliminate it. In practice its effect is limited.
Practical experience: this solution usually cannot resolve data skew completely. In extreme cases, such as a single key with 1 million records, that key is still assigned to one task no matter how many tasks you add, so the skew is bound to remain. Treat it as the first, simplest thing to try when skew is found, or use it in combination with other solutions.
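Both the benefit and the limit of this solution can be seen in a small plain-Java simulation. It is a sketch, not Spark code: `ParallelismDemo` and `maxTaskLoad` are hypothetical names, and hashing the key modulo the task count only approximates hash partitioning.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ParallelismDemo {
    // Largest number of records any single task receives when keys are
    // hash-partitioned across numTasks shuffle read tasks.
    static long maxTaskLoad(Map<String, Long> recordsPerKey, int numTasks) {
        long[] load = new long[numTasks];
        for (Map.Entry<String, Long> e : recordsPerKey.entrySet()) {
            load[Math.floorMod(e.getKey().hashCode(), numTasks)] += e.getValue();
        }
        long max = 0;
        for (long l : load) {
            max = Math.max(max, l);
        }
        return max;
    }

    public static void main(String[] args) {
        // Five keys with 10 records each: a single task has to process all 50.
        Map<String, Long> even = new LinkedHashMap<>();
        for (int i = 0; i < 5; i++) {
            even.put("key" + i, 10L);
        }
        System.out.println(maxTaskLoad(even, 1)); // prints 50

        // One key with 1 million records: more tasks cannot split it.
        Map<String, Long> skewed = new LinkedHashMap<>(even);
        skewed.put("hot", 1_000_000L);
        System.out.println(maxTaskLoad(skewed, 200) >= 1_000_000L);  // prints true
        System.out.println(maxTaskLoad(skewed, 1000) >= 1_000_000L); // prints true
    }
}
```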
### Solution 4: Two-stage aggregation (local aggregation + global aggregation)
Applicable scenarios: aggregating shuffle operators such as reduceByKey on an RDD, or group by aggregations in Spark SQL.
Implementation idea: the core idea is to aggregate in two stages. The first stage is local aggregation: prefix each key with a random number, for example a random number below 10, so that identical keys become different. For example, (hello, 1) (hello, 1) (hello, 1) (hello, 1) becomes (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1). Then run an aggregation such as reduceByKey on the prefixed data; the local result is (1_hello, 2) (2_hello, 2). Next, strip the prefix from each key to get (hello, 2) (hello, 2), and run the global aggregation to obtain the final result, (hello, 4).
Implementation principle: adding a random prefix turns one key into several different keys, so the data originally processed by one task is spread across multiple tasks for local aggregation, which solves the problem of a single task processing too much data. Removing the prefix and aggregating again globally yields the final result. The principle is shown in the following figure.
Advantages: very effective for data skew caused by aggregating shuffle operations. It usually eliminates the skew, or at least alleviates it significantly, and can improve Spark job performance several times over.
Disadvantages: it only applies to aggregating shuffle operations, so its scope is relatively narrow. For join-type shuffle operations, other solutions are needed.
```java
import java.util.Random;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Generic type parameters were lost in the source; they are restored here
// assuming rdd is an existing JavaPairRDD<Long, Long>.

// Step 1: give each key in the RDD a random prefix.
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
    new PairFunction<Tuple2<Long, Long>, String, Long>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception {
        Random random = new Random();
        int prefix = random.nextInt(10);
        return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
      }
    });

// Step 2: locally aggregate by the prefixed key.
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
    new Function2<Long, Long, Long>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
      }
    });

// Step 3: remove the random prefix from each key.
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
    new PairFunction<Tuple2<String, Long>, Long, Long>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Tuple2<Long, Long> call(Tuple2<String, Long> tuple) throws Exception {
        long originalKey = Long.valueOf(tuple._1.split("_")[1]);
        return new Tuple2<Long, Long>(originalKey, tuple._2);
      }
    });

// Step 4: globally aggregate the RDD whose prefixes have been removed.
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
    new Function2<Long, Long, Long>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
      }
    });
```
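To check the logic of the four steps without a cluster, the same pipeline can be mimicked on plain Java collections. This is a sketch, not Spark code: `TwoStageAggregation`, `sumByKey`, and `twoStageSum` are hypothetical names, string keys are used for readability, and a seeded `Random` makes the run repeatable.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

class TwoStageAggregation {
    // Stand-in for reduceByKey(_ + _): sum the values of each key.
    static Map<String, Long> sumByKey(List<Map.Entry<String, Long>> records) {
        Map<String, Long> out = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            out.merge(r.getKey(), r.getValue(), Long::sum);
        }
        return out;
    }

    static Map<String, Long> twoStageSum(List<Map.Entry<String, Long>> records,
                                         int buckets, long seed) {
        Random random = new Random(seed);
        // Step 1: prefix every key with a random number below `buckets`.
        List<Map.Entry<String, Long>> salted = new ArrayList<>();
        for (Map.Entry<String, Long> r : records) {
            String saltedKey = random.nextInt(buckets) + "_" + r.getKey();
            salted.add(new AbstractMap.SimpleEntry<>(saltedKey, r.getValue()));
        }
        // Step 2: local aggregation on the salted keys.
        Map<String, Long> local = sumByKey(salted);
        // Step 3: strip the prefix (everything up to the first underscore).
        List<Map.Entry<String, Long>> unsalted = new ArrayList<>();
        for (Map.Entry<String, Long> e : local.entrySet()) {
            String key = e.getKey().substring(e.getKey().indexOf('_') + 1);
            unsalted.add(new AbstractMap.SimpleEntry<>(key, e.getValue()));
        }
        // Step 4: global aggregation on the original keys.
        return sumByKey(unsalted);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records =
                Collections.nCopies(4, new AbstractMap.SimpleEntry<>("hello", 1L));
        System.out.println(twoStageSum(records, 10, 7L)); // prints {hello=4}
    }
}
```

The prefix is stripped at the first underscore rather than by splitting on every underscore, so keys that themselves contain underscores survive the round trip.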
### Solution 5: Convert reduce join to map join
Applicable scenarios: a join on RDDs, or a join statement in Spark SQL, where one of the RDDs or tables is relatively small (for example a few hundred MB up to one or two GB).
Implementation idea: instead of the join operator, use a Broadcast variable together with a map-type operator to implement the join. This avoids the shuffle entirely, and with it any data skew. Pull the smaller RDD's data into the Driver's memory with the collect operator and create a Broadcast variable from it. Then run a map-type operator on the other RDD; inside the operator function, obtain the full data of the smaller RDD from the Broadcast variable and compare it with each record of the current RDD by join key. If the keys match, combine the two records in whatever way you need.
Implementation principle: an ordinary join goes through a shuffle, which pulls records with the same key into one shuffle read task before joining them; this is a reduce join. If one RDD is small, broadcasting its full data and using a map operator achieves the same result as a join; this is a map join. No shuffle takes place, so no data skew occurs. The principle is shown in the following figure.
Advantages: very effective for skew caused by joins, because no shuffle happens at all and so no skew can occur.
Disadvantages: it applies to few scenarios, because it only works for one large table joined with one small table. The small table has to be broadcast, which costs memory: the Driver and every Executor hold a full copy of the small RDD in memory. If the broadcast RDD is large, say more than 10 GB, memory overflow may occur. It is therefore unsuitable when both tables are large.
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;
import scala.Tuple2;

// Generic type parameters were lost in the source; they are restored here assuming
// rdd1 is a small JavaPairRDD<String, Row>, rdd2 is a large JavaPairRDD<String, String>,
// and sc is a JavaSparkContext.

// First, collect the data of the smaller RDD to the Driver.
List<Tuple2<String, Row>> rdd1Data = rdd1.collect();
// Then broadcast it, so that each Executor holds only one copy of the RDD data.
// This saves memory and reduces network transfer overhead.
final Broadcast<List<Tuple2<String, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// Run a map-type operation on the other RDD instead of a join.
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
    new PairFunction<Tuple2<String, String>, String, Tuple2<String, Row>>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Tuple2<String, Tuple2<String, Row>> call(Tuple2<String, String> tuple)
          throws Exception {
        // Inside the operator function, read the rdd1 data held by the local Executor.
        List<Tuple2<String, Row>> rdd1Data = rdd1DataBroadcast.value();
        // Convert the rdd1 data into a Map to make the lookup easier.
        Map<String, Row> rdd1DataMap = new HashMap<String, Row>();
        for (Tuple2<String, Row> data : rdd1Data) {
          rdd1DataMap.put(data._1, data._2);
        }
        // Get the key and value of the current record.
        String key = tuple._1;
        String value = tuple._2;
        // Look up the matching rdd1 record by key (null if there is none).
        Row rdd1Value = rdd1DataMap.get(key);
        return new Tuple2<String, Tuple2<String, Row>>(
            key, new Tuple2<String, Row>(value, rdd1Value));
      }
    });

// Note: this approach only works when the keys in rdd1 are unique.
// If rdd1 contains duplicate keys, use a flatMap-type operation instead of map and
// traverse all of the rdd1 data for each record, because one record of rdd2 may
// then produce several joined records.
```
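The duplicate-key case mentioned in the closing comments can be sketched in plain Java. This is an illustration rather than Spark code: `MapSideJoin` and `join` are hypothetical names, a `HashMap` stands in for the broadcast variable, rows are simple `{key, value}` string arrays, and, unlike the snippet above, unmatched records are dropped (inner-join semantics).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MapSideJoin {
    // Join every {key, value} row of `large` against an in-memory index of `small`.
    // Each output row is "key,largeValue,smallValue".
    static List<String> join(List<String[]> large, List<String[]> small) {
        // Index the small side once; a key may map to several values.
        Map<String, List<String>> index = new HashMap<>();
        for (String[] kv : small) {
            index.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        // Stream over the large side and emit one row per match (flatMap-style).
        List<String> joined = new ArrayList<>();
        for (String[] kv : large) {
            for (String match : index.getOrDefault(kv[0], Collections.emptyList())) {
                joined.add(kv[0] + "," + kv[1] + "," + match);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> large = Arrays.asList(
                new String[] {"a", "1"}, new String[] {"a", "2"}, new String[] {"b", "3"});
        List<String[]> small = Arrays.asList(
                new String[] {"a", "x"}, new String[] {"a", "y"}, new String[] {"c", "z"});
        System.out.println(join(large, small)); // prints [a,1,x, a,1,y, a,2,x, a,2,y]
    }
}
```

Building the index once, outside the per-record loop, also avoids rebuilding the lookup map for every record.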
### Solution 6: Sample the skewed keys and split the join
Applicable scenarios: when joining two RDDs or Hive tables that are both too large for Solution 5, look at the key distribution of the two. If the skew arises because a few keys in one RDD or Hive table have too much data while the keys in the other are evenly distributed, this solution is appropriate.
Implementation idea:
1. For the RDD that contains the few keys with too much data, take a sample with the sample operator, count the records for each key, and work out which keys have the most data.
2. Split the records for these keys out of the original RDD into a separate RDD and prefix each key with a random number below n. The remaining keys, which do not cause skew, form another RDD.
3. From the other RDD in the join, likewise filter out the records for the skewed keys into a separate RDD, and expand each record into n records, prefixed in turn with 0 to n-1. The remaining keys, which do not cause skew, form another RDD.
4. Join the separate RDD with random prefixes to the separate RDD expanded n times. The records of each skewed key are thereby split into n parts and spread across multiple tasks for the join.
5. Join the two remaining ordinary RDDs as usual.
6. Finally, combine the two join results with the union operator to obtain the final join result.
Implementation principle: when a join is skewed by only a few keys, those keys can be split into a separate RDD and broken into n parts with random prefixes for the join. Their records are then no longer concentrated in a few tasks but spread across many tasks. The principle is shown in the following figure.
Advantages: when only a few keys cause the skew, this is the most effective way to break them up for the join. Only the records for the few skewed keys need to be expanded n times, not the full data set, which avoids using too much memory.
Disadvantages: it is not suitable when a very large number of keys cause skew, for example thousands of them.
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;
import scala.Tuple2;

// Generic type parameters were lost in the source; they are restored here assuming
// rdd1 is a JavaPairRDD<Long, String> (skewed) and rdd2 is a JavaPairRDD<Long, Row>.

// First, take a 10% sample of rdd1, which contains the few keys that cause the skew.
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// Count the occurrences of each key in the sample and sort in descending order,
// then take the top 1 (or top 100) keys with the most data.
// How many keys to take is up to you; one is taken here as a demonstration.
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
    new PairFunction<Tuple2<Long, String>, Long, Long>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Tuple2<Long, Long> call(Tuple2<Long, String> tuple) throws Exception {
        return new Tuple2<Long, Long>(tuple._1, 1L);
      }
    });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
    new Function2<Long, Long, Long>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
      }
    });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
    new PairFunction<Tuple2<Long, Long>, Long, Long>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple) throws Exception {
        return new Tuple2<Long, Long>(tuple._2, tuple._1);
      }
    });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

// Split the key that causes the skew out of rdd1 into a separate RDD.
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
    new Function<Tuple2<Long, String>, Boolean>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Boolean call(Tuple2<Long, String> tuple) throws Exception {
        return tuple._1.equals(skewedUserid);
      }
    });
// Split the ordinary keys that do not cause skew out of rdd1 into another RDD.
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
    new Function<Tuple2<Long, String>, Boolean>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Boolean call(Tuple2<Long, String> tuple) throws Exception {
        return !tuple._1.equals(skewedUserid);
      }
    });

// rdd2 is the RDD whose keys are distributed relatively evenly.
// Filter the records for the skewed key out of rdd2 into a separate RDD and expand
// each record 100 times with flatMap, prefixing the copies with 0 through 99.
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
    new Function<Tuple2<Long, Row>, Boolean>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
        return tuple._1.equals(skewedUserid);
      }
    }).flatMapToPair(
    new PairFlatMapFunction<Tuple2<Long, Row>, String, Row>() {
      private static final long serialVersionUID = 1L;
      // Spark 1.x signature, as in the source; from Spark 2.0 on, call returns an Iterator.
      @Override
      public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple) throws Exception {
        List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
        for (int i = 0; i < 100; i++) {
          list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
        }
        return list;
      }
    });

// Prefix each record of the skewed RDD split from rdd1 with a random number below 100,
// join it with the expanded RDD split from rdd2, then strip the prefix again.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
    new PairFunction<Tuple2<Long, String>, String, String>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Tuple2<String, String> call(Tuple2<Long, String> tuple) throws Exception {
        Random random = new Random();
        int prefix = random.nextInt(100);
        return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
      }
    })
    .join(skewedRdd2)
    .mapToPair(
    new PairFunction<Tuple2<String, Tuple2<String, Row>>, Long, Tuple2<String, Row>>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Tuple2<Long, Tuple2<String, Row>> call(
          Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
        long key = Long.valueOf(tuple._1.split("_")[1]);
        return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
      }
    });

// Join the RDD of ordinary keys split from rdd1 directly with rdd2.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// Union the join result for the skewed key with the join result for the ordinary keys.
// This is the final join result.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);
```
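One way to convince yourself that splitting, salting, expanding, and unioning returns the same rows as an ordinary join is to run both on a small data set in plain Java. This is a sketch under assumptions: `SkewedKeyJoin`, `hashJoin`, and `splitJoin` are hypothetical names, rows are `{key, value}` string arrays, the skewed key is passed in rather than found by sampling, and a seeded `Random` keeps the run repeatable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

class SkewedKeyJoin {
    // Ordinary inner hash join; each output row is "key|leftValue|rightValue".
    static List<String> hashJoin(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> index = new HashMap<>();
        for (String[] r : right) {
            index.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            for (String rv : index.getOrDefault(l[0], Collections.emptyList())) {
                out.add(l[0] + "|" + l[1] + "|" + rv);
            }
        }
        return out;
    }

    static List<String> splitJoin(List<String[]> left, List<String[]> right,
                                  String skewedKey, int n, long seed) {
        Random random = new Random(seed);
        List<String[]> skewedLeft = new ArrayList<>();
        List<String[]> commonLeft = new ArrayList<>();
        for (String[] l : left) {
            if (l[0].equals(skewedKey)) {
                // Skewed side: random prefix below n.
                skewedLeft.add(new String[] {random.nextInt(n) + "_" + l[0], l[1]});
            } else {
                commonLeft.add(l);
            }
        }
        // Other side: expand only the skewed key's rows n times, prefixes 0..n-1.
        List<String[]> expandedRight = new ArrayList<>();
        for (String[] r : right) {
            if (r[0].equals(skewedKey)) {
                for (int i = 0; i < n; i++) {
                    expandedRight.add(new String[] {i + "_" + r[0], r[1]});
                }
            }
        }
        List<String> out = new ArrayList<>();
        for (String row : hashJoin(skewedLeft, expandedRight)) {
            out.add(row.substring(row.indexOf('_') + 1)); // strip the "i_" prefix
        }
        out.addAll(hashJoin(commonLeft, right)); // ordinary keys join as usual
        return out;
    }

    public static void main(String[] args) {
        List<String[]> left = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            left.add(new String[] {"hot", "v" + i});
        }
        left.add(new String[] {"a", "v5"});
        List<String[]> right = new ArrayList<>();
        right.add(new String[] {"hot", "H"});
        right.add(new String[] {"a", "A"});

        List<String> plain = hashJoin(left, right);
        List<String> split = splitJoin(left, right, "hot", 4, 1L);
        Collections.sort(plain);
        Collections.sort(split);
        System.out.println(plain.equals(split)); // prints true
    }
}
```

Each salted record on the skewed side matches exactly one of the n expanded copies on the other side, which is why the row count does not change.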
### Solution 7: Join using random prefixes and an expanded RDD
Applicable scenarios: if a large number of keys in an RDD cause skew during a join, splitting out individual keys is pointless, and only this last solution remains.
Implementation idea:
1. The idea is basically similar to Solution 6. First, look at the data distribution of the RDDs or Hive tables and find the one that causes the skew, for example one in which many keys each correspond to more than 10,000 records.
2. Prefix every record in that RDD with a random number below n.
3. At the same time, expand the other, normal RDD: turn each record into n records, prefixed in turn with 0 to n-1.
4. Finally, join the two processed RDDs.
Implementation principle: appending random prefixes turns identical keys into different ones, so the processed keys can be spread across multiple tasks instead of one task handling a large number of records with the same key. The difference from Solution 6 is that Solution 6 applies special handling only to the records of a few skewed keys; because the expansion affects only those records, it uses little extra memory. This solution targets the case where a large number of keys are skewed, so no subset of keys can be split out for separate handling; the entire RDD has to be expanded, which demands a lot of memory.
Advantages: it can handle basically any join-type data skew, and the performance improvement is significant.
Disadvantages: it alleviates skew rather than avoiding it completely, and it expands the entire RDD, which demands a lot of memory.
Practical experience: while developing a data requirement, we found that a join caused data skew. Before optimization the job took about 60 minutes; with this solution it took about 10 minutes, a 6x improvement.
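```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;
import scala.Tuple2;

// Generic type parameters were lost in the source; they are restored here assuming
// rdd1 is a JavaPairRDD<Long, Row> (evenly distributed keys) and
// rdd2 is a JavaPairRDD<Long, String> (skewed keys).

// First, expand the RDD whose keys are evenly distributed by a factor of 100.
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
    new PairFlatMapFunction<Tuple2<Long, Row>, String, Row>() {
      private static final long serialVersionUID = 1L;
      // Spark 1.x signature, as in the source; from Spark 2.0 on, call returns an Iterator.
      @Override
      public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple) throws Exception {
        List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
        for (int i = 0; i < 100; i++) {
          // Prefix the copies with 0 through 99 so every random prefix finds a match.
          list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
        }
        return list;
      }
    });

// Second, prefix every record of the skewed RDD with a random number below 100.
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
    new PairFunction<Tuple2<Long, String>, String, String>() {
      private static final long serialVersionUID = 1L;
      @Override
      public Tuple2<String, String> call(Tuple2<Long, String> tuple) throws Exception {
        Random random = new Random();
        int prefix = random.nextInt(100);
        return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
      }
    });

// Join the two processed RDDs.
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);
```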
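The trade-off of this solution, smaller per-key load on the skewed side against an n-fold larger data set on the other side, can be put into numbers with a small plain-Java sketch. `FullSaltingDemo`, `saltedCounts`, and `expandedRows` are hypothetical names, and the record counts are illustrative, not measurements.

```java
import java.util.Random;

class FullSaltingDemo {
    // Spread `records` records of one hot key over n salted keys
    // ("0_key" ... "(n-1)_key") and return how many each salted key receives.
    static int[] saltedCounts(int records, int n, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[n];
        for (int i = 0; i < records; i++) {
            counts[random.nextInt(n)]++;
        }
        return counts;
    }

    // Row count of the other side after every record is replicated n times.
    static long expandedRows(long rows, int n) {
        return rows * n;
    }

    public static void main(String[] args) {
        int[] counts = saltedCounts(10_000, 100, 3L);
        int max = 0;
        for (int c : counts) {
            max = Math.max(max, c);
        }
        // Each salted key now carries roughly 1/100 of the hot key's records.
        System.out.println("largest salted key: " + max + " of 10000 records");
        // The price: the evenly distributed side grows 100-fold.
        System.out.println("other side: " + expandedRows(500, 100) + " rows"); // 50000 rows
    }
}
```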
### Solution 8: Combine multiple solutions
In practice, a relatively simple data skew scenario can usually be handled with just one of the solutions above. A more complex scenario may require several of them together. For example, for a Spark job with several skewed steps, we can first use Solutions 1 and 2 to preprocess part of the data and filter out part of it; next, increase the parallelism of some shuffle operations to improve their performance; and finally choose a suitable solution for each aggregation or join. Once you thoroughly understand the ideas and principles behind these solutions, apply them flexibly according to the situation at hand to solve your own data skew problems.