How to solve data skew in Hive at the scale of hundreds of billions of rows

Updated 2025-01-19. From: SLTechnology News&Howtos (shulou)

Shulou (Shulou.com) 06/02

This article looks at how to solve data skew in Hive when a job processes hundreds of billions of rows. Many people run into this problem in day-to-day work, so the editor has gathered material from various sources and organized it into simple, practical methods. Hopefully it answers your questions. Let's get started.

Analysis of the problem of data skew

Data skew is an unavoidable problem in distributed systems: any distributed system can suffer from it, yet many people rarely notice it in their daily work. Note the title of this article: "hundreds of billions of rows". Why that scale? If a job handles only a few million rows, then even if skew sends all of the data to one machine, that machine can process it without strain, and the skew goes unnoticed. Only when the data reaches a scale that a single machine cannot handle does skew make it hard for the job to finish at all.

Therefore, we need to optimize for data skew and avoid or reduce its impact as far as possible.

Before getting to the solutions, one more point: optimizing when there is no bottleneck is asking for trouble.

Of the two stages, map and reduce, the reduce stage is the more prone to skew. Data passes through the shuffle phase on its way from map to reduce, and by default the shuffle partitions records by the hash of their key. If one key occurs far too often, all of its records hash to the same reduce task, and the result is data skew.
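The effect of hashing by key can be illustrated with a small simulation. This is only a sketch: the partitioner below (CRC32 modulo the reducer count) and the sample keys are assumptions chosen for illustration, not Hive's actual hash function.

```python
import zlib
from collections import Counter


def reducer_for(key: str, num_reducers: int) -> int:
    """Toy partitioner: deterministic hash of the key modulo the reducer count."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def reducer_loads(keys, num_reducers):
    """Count how many records each reducer would receive."""
    counts = Counter(reducer_for(k, num_reducers) for k in keys)
    return [counts.get(i, 0) for i in range(num_reducers)]


# Hypothetical workload: one hot key plus 1,000 keys that occur once each.
keys = ["hot_user"] * 9000 + [f"user_{i}" for i in range(1000)]
loads = reducer_loads(keys, num_reducers=8)
print(loads)  # one reducer receives every "hot_user" record
```

However many reducers are configured, every record with the hot key lands on the same one, so adding reducers alone does not remove the skew.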

So can data skew occur in the map phase?

Before a job enters the map phase, its input files are divided into splits; the default block size is 128 MB. However, if a file is compressed with a format that does not support splitting, such as GZIP, the MapReduce job cannot split it, and the whole file is read by a single map task. If one map task has to read a very large non-splittable compressed file, data skew occurs in the map phase.
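As a rough illustration of the difference, the sketch below estimates how many map tasks read one file. It is a simplification that assumes one task per 128 MB block and ignores any split-size tuning; the function name is hypothetical.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # 128 MB, the default block size mentioned above


def map_task_count(file_size_bytes: int, splittable: bool,
                   block_size: int = BLOCK_SIZE) -> int:
    """Estimate the number of map tasks that read a single input file."""
    if file_size_bytes <= 0:
        return 0
    if not splittable:
        # A non-splittable file (for example GZIP) is read by exactly one task.
        return 1
    return math.ceil(file_size_bytes / block_size)


ten_gib = 10 * 1024 ** 3
print(map_task_count(ten_gib, splittable=True))
print(map_task_count(ten_gib, splittable=False))
```

Under these assumptions a 10 GiB splittable file is shared among many map tasks, while the same file in a non-splittable format is handled by one.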

So, in essence, data skew has two causes: first, a task has to process a large amount of data with the same key; second, a task has to read a large non-splittable file.

Solutions to data skew

Data skew is handled in much the same way in MapReduce and Spark. The discussion below covers skew in Hive running on the MapReduce engine; the same ideas can serve as a reference for Spark.

1. Data skew caused by null values

In real business data, tables often contain a large number of null values or other meaningless values that still take part in the computation. If such tables are joined, a shuffle is triggered and all of the null keys are sent to a single reduce task, which inevitably causes data skew.

A colleague once asked: when joining tables A and B, if the join field in A is null but the join field in B is not, the rows can never match, so why would they end up in the same reduce task?

The point to make clear is that rows land in the same reduce task not because their fields can be joined, but because of the hash computed in the shuffle phase. As long as two keys hash to the same value, they are pulled into the same reduce task.

Solution:

First: keep null values out of the join altogether, so that they never go through the shuffle:

-- Note: both branches of UNION ALL must return the same columns; list them explicitly in practice.
SELECT *
FROM log a
JOIN users b
  ON a.user_id IS NOT NULL
 AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL

Second: null keys all hash to the same value during the shuffle, so we can replace each null with a random value. The hashes then differ and the rows are spread across different reduce tasks:

SELECT *
FROM log a
LEFT JOIN users b
  ON CASE WHEN a.user_id IS NULL THEN concat('hive_', rand()) ELSE a.user_id END = b.user_id
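The salting idea can be checked with a toy simulation. In this sketch `None` stands in for SQL NULL, and the partitioner (all nulls to reducer 0, otherwise CRC32 modulo the reducer count) is an assumption made for illustration rather than Hive's real behavior.

```python
import random
import zlib
from collections import Counter


def partition(key, num_reducers):
    """Toy partitioner: every null key maps to the same reducer."""
    if key is None:
        return 0
    return zlib.crc32(str(key).encode("utf-8")) % num_reducers


def salt_null(key, rng):
    """Mirror the CASE WHEN ... concat('hive_', rand()) idea for null keys."""
    return f"hive_{rng.random()}" if key is None else key


def loads(keys, num_reducers):
    """Count records per reducer."""
    counts = Counter(partition(k, num_reducers) for k in keys)
    return [counts.get(i, 0) for i in range(num_reducers)]


rng = random.Random(42)                   # fixed seed so the run is repeatable
keys = [None] * 8000 + list(range(2000))  # hypothetical: 80% null user_id values
before = loads(keys, 8)
after = loads([salt_null(k, rng) for k in keys], 8)
print("before salting:", before)
print("after salting: ", after)
```

Because the salted keys carry the `hive_` prefix, they cannot collide with a real user_id, so the salted rows simply stay unmatched in the LEFT JOIN, which is the same outcome they had as nulls.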

2. Data skew caused by mismatched data types

Suppose two tables are joined, where the join key in table a is of type int, while the key column in table b holds both string and int values. When the tables are joined on this key, the default hash is computed on the int id, so all of the string-typed records are assigned the same id and go to a single reduce task, causing data skew.

Solution:

When the key column mixes string and int values, the default hash follows the int type. We can therefore cast the int values to string, so that every key is a string and the hash is computed on the string value:

SELECT * FROM users a LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string)
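The sketch below is a toy model of the behavior described above, not a reproduction of Hive's actual type-conversion rules. It assumes that a string which cannot be read as an int collapses to a single null-like key, and compares that with hashing every key as a string. All function names are hypothetical.

```python
import zlib
from collections import Counter


def as_int_or_none(value):
    """Toy implicit conversion: non-numeric strings become None."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def partition_by_int(value, num_reducers):
    """Partition on the int interpretation; unconvertible keys share reducer 0."""
    key = as_int_or_none(value)
    return 0 if key is None else key % num_reducers


def partition_by_string(value, num_reducers):
    """Partition on the string form of the key."""
    return zlib.crc32(str(value).encode("utf-8")) % num_reducers


def loads(keys, partitioner, num_reducers=8):
    counts = Counter(partitioner(k, num_reducers) for k in keys)
    return [counts.get(i, 0) for i in range(num_reducers)]


# Hypothetical key column: 1,000 int ids and 5,000 non-numeric string ids.
keys = list(range(1000)) + [f"u_{i}" for i in range(5000)]
int_loads = loads(keys, partition_by_int)
str_loads = loads(keys, partition_by_string)
print("hash on int:   ", int_loads)
print("hash on string:", str_loads)
```

In this model the int-based partitioning piles all 5,000 string ids onto one reducer, while the string-based partitioning spreads them out.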

3. Data skew caused by large non-splittable files

When the data in a cluster grows to a certain scale and some of it has to be archived or dumped, it is usually compressed. If a format that does not support splitting, such as GZIP, is used, any later job that reads such a file can assign it to only one task. If the file is large, the map task that processes it takes far longer than the map tasks reading ordinary files and becomes the bottleneck of the whole job. This is data skew on the map side, caused by reading the file.

Solution:

There is no good fix for this kind of skew once the files exist: files in a non-splittable format such as GZIP can only be converted to a splittable compression format such as bzip2.

Therefore, to avoid read skew caused by large non-splittable files, compress data with an algorithm that supports splitting, such as bzip2, in the first place.
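For a file on a local disk, the conversion can be sketched with Python's standard library. This is a minimal example under the assumption that the file is reachable as a local path; data already stored in HDFS would normally be rewritten by a job instead. The function name is hypothetical.

```python
import bz2
import gzip
import shutil


def recompress_gzip_to_bzip2(src_path: str, dst_path: str,
                             chunk_size: int = 1024 * 1024) -> None:
    """Stream a .gz file into a .bz2 file without loading it all into memory."""
    with gzip.open(src_path, "rb") as src, bz2.open(dst_path, "wb") as dst:
        # copyfileobj reads and writes in chunks of chunk_size bytes.
        shutil.copyfileobj(src, dst, chunk_size)
```

A call such as `recompress_gzip_to_bzip2("events.gz", "events.bz2")` (file names are placeholders) leaves the original file in place and writes the bzip2 copy next to it.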

4. Data skew caused by data expansion

In multidimensional aggregation, the grouping may involve many fields, as in:

SELECT a, b, c, count(1) FROM log GROUP BY a, b, c WITH ROLLUP

Note: in case you have not used the trailing WITH ROLLUP keyword before, it adds summary rows on top of the grouped result, that is, subtotals and a grand total for the GROUP BY.

If the log table above is very large and map-side aggregation cannot compress the data effectively, the output of the map side expands rapidly, which can easily cause the job to fail with an out-of-memory error. If the log table also contains a skewed key, the skew in the shuffle becomes even worse.
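To see where the expansion comes from, the sketch below lists the grouping keys that a single input row contributes under a rollup of n columns. It is a simplified model (one emitted key per rollup level, with `None` standing in for NULL), and the function name is hypothetical.

```python
def rollup_keys(row):
    """Return the grouping keys one row contributes under WITH ROLLUP."""
    n = len(row)
    # Level n keeps every column; each lower level blanks one more trailing column.
    return [tuple(row[:i]) + (None,) * (n - i) for i in range(n, -1, -1)]


for key in rollup_keys(("x", "y", "z")):
    print(key)
```

Each row produces n + 1 keys, so with three grouping columns the map output can be up to four times the input when map-side aggregation finds few duplicate keys to merge.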

Solution:

You can split the SQL above, replacing the WITH ROLLUP with the following statements:

SELECT a, b, c, COUNT(1) FROM log GROUP BY a, b, c;
SELECT a, b, NULL, COUNT(1) FROM log GROUP BY a, b;
SELECT a, NULL, NULL, COUNT(1) FROM log GROUP BY a;
SELECT NULL, NULL, NULL, COUNT(1) FROM log;

However, this approach does not scale well. Here we group by only 3 fields; with 5 or 10 fields, there would be correspondingly more SQL statements to write.
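If the manual split is still wanted, the statements can be generated rather than typed by hand. The helper below is a hypothetical sketch that builds one statement per rollup level and pads each SELECT with NULL so that every statement returns the same number of columns; it does no quoting or validation of the names passed in.

```python
def split_rollup(table, columns, agg="COUNT(1)"):
    """Build one GROUP BY statement per rollup level for the given columns."""
    statements = []
    n = len(columns)
    for i in range(n, -1, -1):
        # Keep the first i columns and pad the rest with NULL.
        select_cols = list(columns[:i]) + ["NULL"] * (n - i)
        sql = f"SELECT {', '.join(select_cols + [agg])} FROM {table}"
        if i > 0:
            sql += f" GROUP BY {', '.join(columns[:i])}"
        statements.append(sql)
    return statements


for statement in split_rollup("log", ["a", "b", "c"]):
    print(statement + ";")
```

For n grouping columns the helper returns n + 1 statements, which makes the growth in statement count easy to see.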

In Hive, you can let the engine split the job automatically by configuring the parameter hive.new.job.grouping.set.cardinality, which defaults to 30. For multidimensional aggregations such as grouping sets, rollups, and cubes, if the number of key combinations produced by the expansion exceeds this value, Hive launches an additional job to handle them. If a grouping column is heavily skewed, lower the value as appropriate.

5. Data skew caused by table join

When two tables are joined with an ordinary repartition join and the join key is skewed, data skew in the shuffle phase is inevitable.

Solution:

A common practice is to put the skewed data in the distributed cache and distribute it to every node that runs a map task. The join is then completed in the map phase, which is known as a MapJoin; this avoids the shuffle and therefore the data skew.

MapJoin is a Hive optimization suited to joining a small table with a large table. Because the join is performed on the map side in memory, no reduce task is started and no shuffle takes place, which saves resources and improves join efficiency.
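The idea behind a map-side join can be sketched as an in-memory hash join. The example below is an illustration of the technique rather than Hive's implementation: tables are modeled as lists of dictionaries, and the function and sample data are hypothetical.

```python
def map_join(small_table, big_table, key):
    """Inner-join two tables by loading the small one into memory."""
    # Build phase: index the small table by join key.
    lookup = {}
    for row in small_table:
        lookup.setdefault(row[key], []).append(row)
    # Probe phase: stream the big table; each row is joined locally, with no shuffle.
    for big_row in big_table:
        for small_row in lookup.get(big_row[key], []):
            yield {**small_row, **big_row}


a = [{"id": 1, "name": "ann"}, {"id": 2, "name": "bob"}]         # small table
b = [{"id": 1, "age": 30}, {"id": 1, "age": 31}, {"id": 3, "age": 40}]  # big table
joined = list(map_join(a, b, "id"))
print(joined)
```

Since every map task holds its own copy of the small table, a hot key in the large table is processed wherever its rows happen to be read, instead of being funneled to one reduce task. The cost is memory: the small table must fit on each node.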

Before Hive 0.11, completing the join in the map phase required an explicit MAPJOIN hint, and you had to watch the size of the small table because it is loaded into memory.

To put table a into map-side memory before Hive 0.11, you would write:

SELECT /*+ mapjoin(a) */ a.id, a.name, b.age FROM a JOIN b ON a.id = b.id

To put several tables into map-side memory, list them in mapjoin(), separated by commas. For example, to load table a and table c, write /*+ mapjoin(a, c) */.

From Hive 0.11 onward, the optimization is enabled by default: an explicit MAPJOIN hint is no longer needed, and Hive converts an ordinary join into a MapJoin when appropriate. Two properties control when the conversion is triggered:

hive.auto.convert.join=true: the default is true, which enables automatic MAPJOIN optimization.

hive.mapjoin.smalltable.filesize=25000000: the default is 25000000 bytes (25 MB). This property sets the size threshold for the optimization; a table smaller than this value is loaded into memory.
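Read literally, the two properties amount to a simple size check. The sketch below encodes only that simplified reading, using a 25,000,000-byte (25 MB) threshold; real Hive consults additional settings and statistics, and the function name is hypothetical.

```python
SMALLTABLE_FILESIZE = 25_000_000  # bytes; assumed value of hive.mapjoin.smalltable.filesize


def would_auto_convert(table_size_bytes: int,
                       auto_convert_join: bool = True,
                       smalltable_filesize: int = SMALLTABLE_FILESIZE) -> bool:
    """Simplified rule: convert to a MapJoin when the table is under the threshold."""
    return auto_convert_join and table_size_bytes < smalltable_filesize


print(would_auto_convert(10_000_000))                           # small table
print(would_auto_convert(40_000_000))                           # too large
print(would_auto_convert(10_000_000, auto_convert_join=False))  # feature off
```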

Note: if the default behavior produces an unexplained bug (for example, MAPJOIN does not take effect), set the following two properties to false and use the MAPJOIN hint manually:

hive.auto.convert.join=false (turn off automatic MAPJOIN conversion)

hive.ignore.mapjoin.hint=false (do not ignore the MAPJOIN hint)

One more thing: if the node has plenty of memory but loading the table into map-side memory still causes an out-of-memory error, increase the map-side memory with the parameter mapreduce.map.memory.mb.

6. Data skew where the data volume genuinely cannot be reduced

Some operations offer no way to reduce the amount of data, for example the collect_list function:

SELECT s_age, collect_list(s_score) list_score FROM student GROUP BY s_age

collect_list: collects the values of a column within each group into an array.

In the SQL above, if s_age is skewed and the data volume is large enough, the reduce task that handles the skewed key fails with an out-of-memory error.

Note: collect_list outputs an array and keeps its intermediate results in memory, so aggregating too much data with collect_list causes a memory overflow.

Some readers will say that this is skew caused by GROUP BY and can be optimized by turning on the hive.groupby.skewindata parameter. Let's analyze that:

Enabling this setting splits the job into two jobs. The first distributes the map output across the reduce tasks as evenly as possible and pre-aggregates it there, reducing the amount of data the second job has to process. The second job then aggregates the first job's output into the final result.

The core benefit of hive.groupby.skewindata is that the first job effectively reduces the amount of data. For functions such as collect_list, which need every intermediate value in full, it clearly does not help. On the contrary, the extra job adds disk and network load and makes performance even worse.
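The difference between the two kinds of aggregate can be seen in a toy model of the two-job plan. The sketch assumes that the first job assigns rows to reducers at random and pre-aggregates per reducer; it illustrates the idea only and is not Hive's implementation. All names are hypothetical.

```python
import random
from collections import Counter, defaultdict


def two_stage_count(keys, num_reducers, seed=0):
    """Two-job COUNT: random spread with partial counts, then a merge by key."""
    rng = random.Random(seed)
    partials = [Counter() for _ in range(num_reducers)]
    for k in keys:                      # job 1: random reducer, local pre-aggregation
        partials[rng.randrange(num_reducers)][k] += 1
    total = Counter()
    for p in partials:                  # job 2: merge partial counts per key
        total.update(p)
    stage2_records = sum(len(p) for p in partials)
    return total, stage2_records


def two_stage_collect(rows, num_reducers, seed=0):
    """Two-job collect_list: partial lists still carry every value to job 2."""
    rng = random.Random(seed)
    partials = [defaultdict(list) for _ in range(num_reducers)]
    for key, value in rows:
        partials[rng.randrange(num_reducers)][key].append(value)
    merged = defaultdict(list)
    for p in partials:
        for key, values in p.items():
            merged[key].extend(values)
    stage2_values = sum(len(v) for p in partials for v in p.values())
    return merged, stage2_values


keys = ["hot"] * 9000 + ["cold"] * 1000
counts, count_records = two_stage_count(keys, num_reducers=8)
lists, list_values = two_stage_collect([("hot", i) for i in range(9000)], 8)
print(count_records, list_values)
```

For COUNT, the second job receives at most one small record per key per reducer. For collect_list, the second job still receives all 9,000 values of the hot key, so the reduce task that assembles the final array needs just as much memory as before.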

Solution:

The most direct way to solve this kind of problem is to increase the memory available to the reduce tasks.

Set the reduce memory size with the mapreduce.reduce.memory.mb parameter.

That concludes this look at how to solve data skew in Hive at the scale of hundreds of billions of rows. Hopefully it has answered your questions. Theory works best combined with practice, so go and try it!
