In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-06 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article introduces the knowledge of "how to avoid data skew in Hive". In the operation of actual cases, many people will encounter such a dilemma. Next, let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
1. Overview of buckets in hive
For each table (table) or partition, Hive can be further organized into buckets, that is, buckets are finer-grained data range partitions. Hive is also an organization that conducts buckets for a column. Hive uses a hash of column values, and then divides the remainder by the number of buckets to determine which bucket the record is stored in.
There are two reasons to organize tables (or partitions) into Bucket:
(1) to obtain higher query processing efficiency.
Buckets add an extra structure to the table that Hive can take advantage of when dealing with some queries. Specifically, joining two tables with buckets on the same column (containing join columns) can be efficiently implemented using Map-side joins (Map-side join). Such as JOIN operations. For JOIN operations, two tables have the same column, if a bucket operation is performed on both tables. Then the bucket that holds the same column values can be operated by JOIN, which can greatly reduce the amount of data in JOIN.
(2) make sampling more efficient.
When dealing with a large data set, it will bring a lot of convenience if the query can be run on a small part of the data in the stage of developing and modifying the query.
Create tablecreate table bucketed_user (id int,name string) clustered by (id) sorted by (name) into 4 buckets row format delimited fields terminated by'\ t 'stored as textfile with bucket
First, let's look at how to tell Hive- that tables should be divided into buckets. We use the CLUSTERED BY clause to specify the columns used to divide buckets and the number of buckets to be divided:
CREATE TABLE bucketed_user (id INT) name STRING)
CLUSTERED BY (id) INTO 4 BUCKETS
Here, we use the user ID to determine how to divide the bucket (Hive uses to hash the value and divide the result by the number of buckets and take the remainder. In this way, there will be a random set of users in any bucket (PS: it can actually be said to be random, isn't it? ).
In the case of a map-side connection, the two tables divide the buckets in the same way. The mapper that processes a bucket in the table on the left knows that the matching rows in the table on the right are in the corresponding bucket. Therefore, mapper only needs to get that bucket (which is only a small portion of the data stored in the table on the right) to join. This optimization method does not necessarily require that the number of buckets of two tables must be the same, and the number of buckets of two tables can also be multiplied. Use HiveQL to join two tables with divided buckets, see the "map connections" section (P400).
The data in the bucket can be sorted separately according to one or more columns. Because this turns the connection to each bucket into an efficient merge sort (merge-sort), it can further improve the efficiency of the map-side connection. The following syntax declares a table to use sort buckets:
CREATE TABLE bucketed_users (id INT, name STRING)
CLUSTERED BY (id) SORTED BY (id ASC) INTO 4 BUCKETS
How can we ensure that the data in the table is divided into buckets? It is certainly possible to load the data generated outside the Hive into a table divided into buckets. In fact, it is easier for Hive to divide buckets. This operation is usually for existing tables.
Hive does not check whether the buckets in the data file are consistent with the buckets in the table definition (whether for the number of buckets or the columns used to divide buckets). If the two do not match, you may encounter errors or undefined results when querying. Therefore, it is recommended that Hive be used to divide the buckets.
Join Optimization mapside join method in 2.hive 1: select / * + MAPJOIN (time_dim) * / count (*) from
Store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
Method 2: this can be done automatically by hive for map-side joinset hive.auto.convert.join=true
Select count (*) from
Store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
Executing the following code produces two map-only methods: select / * + MAPJOIN (time_dim, date_dim) * / count (*) from
Store_sales
Join time_dim on (ss_sold_time_sk = t_time_sk)
Join date_dim on (ss_sold_date_sk = d_date_sk)
Where t_hour = 8 and d_year = 2002
Setting the following two properties, hive, will automatically perform the above process. The first property defaults to true, and the second property sets the size of the map-side join that is suitable for reading memory files.
Set hive.auto.convert.join.noconditionaltask = true
Set hive.auto.convert.join.noconditionaltask.size = 10000000
Sort-Merge-Bucket (SMB) joins can be converted to SMB map joins.
We only need to set a few parameters:
Set hive.auto.convert.sortmerge.join=true
Set hive.optimize.bucketmapjoin = true
Set hive.optimize.bucketmapjoin.sortedmerge = true
The policy for large table selection has been set
Use the following properties:
Set hive.auto.convert.sortmerge.join.bigtable.selection.policy= org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ
Several policy settings
Org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ (default)
Org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ
Org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ
For more information, please refer to connection: hive for detailed explanation of connection scheme (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins))
3, data tilt
The execution of Hive is divided into stages, and the difference in the amount of data processed by map depends on the reduce output of the previous stage, so how to distribute the data evenly to each reduce is the root to solve the data tilt.
Operation
Reason
1. Data is unevenly distributed on nodes.
2Gravity key is unevenly distributed (there is a large amount of individual value data in key, such as NULL, so data skew will easily occur in join)
3 count count (disctinct key), data skew is easy to occur when the data is large, because count (distinct) is grouped according to the data field.
4the use of by group is easy to cause data skew.
5, the characteristics of the business data itself
6. The table is not considered carefully.
7. Some SQL statements have data skew.
Performance
The progress of the task has been maintained at around 99% for a long time. Looking at the task monitoring page, it is found that only a few reduce tasks have not been completed. Because the amount of data it handles is too different from that of other reduce. The difference between the number of records in a single reduce and the average number of records is too large, usually up to 3 times or more. The longest time is longer than the average time.
4. The solution of data skew adjusts set hive.map.aggr=true parameters.
Partial aggregation at the end of Map, equivalent to Combiner
Set hive.groupby.skewindata=true
Load balancing is performed when the data is skewed, and the selected item is set to true, and the generated query plan will have two MR Job. In the first MR Job, the output result set of the Map is randomly distributed to the Reduce, and each Reduce does a partial aggregation operation and outputs the result. The result is that the same Group By Key may be distributed to different Reduce, thus achieving the purpose of load balancing. The second MR Job is distributed to the Reduce according to the Group By Key according to the preprocessed data results (this process ensures that the same Group By Key is distributed to the same Reduce), and finally completes the final aggregation operation.
SQL sentence adjustment: how to Join about the selection of the driving table, choose the table with the most uniform join key distribution as the driving table to do a good job of column clipping and filter operation, so as to achieve the effect that the amount of data becomes relatively small when the two tables do join. The size table Join uses map join to advance memory for small dimension tables (the number of records less than 1000). Complete reduce on the map. Large table Join big table changes the key of null values into a string plus random numbers, and divides the skewed data into different reduce. Because the null values are not associated, the final result will not be affected after processing. A large number of the same special values of count distinct are easy to tilt, and when there are a large number of values in the xx field, NULL or empty record solutions will deal with specific values. For example, null, filter out, where case specific ways to convert specific values, so that these values are different, while these values do not affect the analysis. Group by dimension is too small: Group by performs partial data merging set hive.map.aggr on Map;-- > whether to aggregate data on Map. The default setting is true.
Set hive.groupby.mapaggr.checkinterval;-- > the number of entries that perform aggregation operations on the map side.
Load balancing set hive.groupby.skewindata
The default value is false, which needs to be set to true
When set to true, it becomes two MapReduce
In the first MR JOb, the output of map will be randomly distributed to the Reduce, and each Reduce will do part of the aggregation operation and output the result, so that the same Group By Key may be distributed to different Reduce, thus achieving the purpose of auxiliary equalization.
The second MR JOb will be distributed to the Reduce according to the key according to the results of the preprocessed data, and the aggregation operation is finally completed.
5. Data skew caused by null values in a typical business scenario
Scenario: for example, in a log, there is often a problem of information loss, such as the user_id in the log. If you associate the user_id in the log with the user_id in the user table, you will encounter the problem of data skew.
Do not participate in the association select * from log an if the solution 1:user_id is empty
Join users b
On a.user_id is not null
And a.user_id = b.user_id
Union all
Select * from log a
Where a.user_id is null
Solution 2: assign a new key value select * to a null score
From log a
Left outer join users b
On case when a.user_id is null then concat ('hive',rand ()) else a.user_id end = b.user_id
Conclusion: method 2 is more efficient than method 1, not only with less io, but also with fewer tasks. In workaround 1, log is read twice, and jobs is 2. Solution 2 the number of job is 1. This optimization is suitable for skew problems caused by invalid id (such as-99,'', null, etc.). By changing the null key into a string plus a random number, the skewed data can be divided into different reduce to solve the data skew problem.
Data skew caused by the association of different data types
Scenario: the user_id field in the user table is the user_id field in the int,log table, with both string and int types. When Join operations for two tables are performed according to user_id, the default Hash operation is allocated by int-type id, which results in all records of string type id being assigned to one Reducer.
Solution: convert the number type to the string type select * from users a
Left outer join logs b
On a.usr_id = cast (b.user_id as string)
Small table is not small or big, how to use map join to solve the tilt problem
Use map join to solve the data skew problem of small tables (with a small number of records) associated with large tables. This method is used very frequently, but if the small table is so large that there will be bug or exceptions in map join, special handling is required. The following examples are:
Select * from log a
Left outer join users b
On a.user_id = b.user_id
The users table has a record of 600w +, so distributing users to all map is a lot of overhead, and map join does not support such large small tables. If you use ordinary join, you will encounter the problem of data tilt.
Solution: select / * + mapjoin (x) * / * from log a
Left outer join (
Select / * + mapjoin (c) * / d.*
From (select distinct user_id from log) c
Join users d
On c.user_id = d.user_id
) x
On a.user_id = b.user_id
If there are millions of user _ id in log, this goes back to the original map join problem. Fortunately, there will not be too many uv members per day, not too many members with transactions, not too many members with clicks, not too many members with commission, and so on. So this method can solve the problem of data skew in many scenarios.
6. Data tilt summary
It is our ultimate goal to distribute the output data of map to reduce more evenly. Due to the limitations of Hash algorithm, pressing key Hash will cause data tilt more or less. A great deal of experience shows that the cause of data tilt is human negligence in table construction or can be avoided by business logic. The more general steps are given here:
1. Sample the log table, which user_id is tilted, and get a result table tmp1. As for the computing framework, he does not know the distribution of all the data, so sampling is indispensable.
2. The distribution of data accords with the rules of sociological statistics, and there is inequality between the rich and the poor. There will not be too many inclined key, just as there are not many rich people and not many strange people in a society. So the number of tmp1 records will be very small. Map join tmp1 and users to generate tmp2, and read tmp2 to distribute file cache. This is a map process.
3, map read in users and log, if the record comes from log, check whether user_id is in tmp2, if so, output to the local file a, otherwise the generated key,value pair, if the record comes from member, the generated key,value pair enters the reduce phase.
4. Finally, the a file is merged and the output files of Stage3 reduce phase are merged and written to hdfs.
If confirming that the business requires such skewed logic, consider the following optimization options:
1. For join, when judging that the small table is not larger than 1G, use map join
2. For group by or distinct, set the
This is the end of hive.groupby.skewindata=true 's "how to avoid data skew in Hive". Thank you for reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.