2025-02-23 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article walks through Hive data skew with worked examples. The editor found it very practical and shares it here as a reference; hopefully you will take something away from it.
I. The main challenge of offline data processing: "data skew"
First, the concept of "data skew".
The term "skew" most likely comes from skewed distributions in statistics, and skew in data processing is related to it.
For distributed data processing, we want the data to be evenly distributed across the processing nodes, but in practice, because of the business data itself or the distribution algorithm, the amount of data allocated to each node is often not what we expected.
In other words, the whole job completes only when the node with the most data has finished, so the benefit of distribution is greatly reduced. Think of a job stuck at 99%.
In fact, even if every node is assigned roughly the same amount of data, skew can still occur. Consider the extreme case of a word-frequency count: if all the words assigned to one node are the same word, that node will clearly take a long time, even though its data volume is the same as that of the other nodes.
Hive's optimizations use a variety of measures to deal with skew in the scenarios above.
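The effect described above can be illustrated with a small simulation. This is only a sketch under assumptions: the modulo partitioner, the key values, and the reducer count are hypothetical and are not taken from Hive.

```python
def reducer_loads(keys, num_reducers):
    """Count how many records each reducer receives under key-based partitioning."""
    loads = [0] * num_reducers
    for key in keys:
        # Hypothetical partitioner: integer key modulo the number of reducers.
        loads[key % num_reducers] += 1
    return loads

# Uniform keys: 1,000 records spread evenly over keys 0..99.
uniform = [i % 100 for i in range(1000)]
# Skewed keys: key 7 owns 910 of the 1,000 records.
skewed = [7] * 910 + [i % 100 for i in range(90)]

uniform_loads = reducer_loads(uniform, 4)
skewed_loads = reducer_loads(skewed, 4)

# The job finishes only when the busiest reducer finishes.
print(max(uniform_loads), max(skewed_loads))
```

Both inputs hold the same number of records, yet in the skewed case one reducer receives almost all of them, so it alone determines the running time.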
II. Optimization of Hive
In practice, only a small share of Hive SQL performance problems are related to data skew.
In many cases, slow Hive SQL is caused by developers' insufficient understanding of the data they use and by bad usage habits.
Developers should check the following points:
"Do the metrics really need to be aggregated from the common detail layer of the data warehouse?" Can the common summary layer built by the data common-layer team already meet the need? Common, KPI-related metrics are usually already covered by a well-designed common layer and can be used directly.
"Do you really need to scan so many partitions?" For example, for a sales detail fact table, the compute and I/O cost of scanning a year of partitions versus a week of partitions are on entirely different scales, and the running time differs accordingly. As developers, we need to consider the business requirement carefully and avoid wasting computing and storage resources.
"Avoid select * from your_table; specify the columns you use." For example, select col1, col2 from your_table. In addition, put filter conditions in the where clause wherever possible to remove irrelevant rows, which reduces the amount of data that the whole MapReduce job has to process and distribute.
"The input should not be a large number of small files." Hive's default input split is 128 MB (configurable), and small files can be merged into larger files first.
If, after checking the points above, the Hive SQL still runs for a long time or cannot finish at all, real Hive optimization techniques are needed.
III. Join-independent optimization
Most Hive SQL performance problems are related to join. For problems unrelated to join, the main topics are skew caused by group by and count distinct optimization.
Optimizing skew caused by group by
Skew caused by group by comes mainly from input rows being unevenly distributed over the group by columns.
For example, suppose we count orders per supplier from the sales detail fact table. A few large suppliers clearly have very many orders, while most suppliers have modest order counts. Because group by distributes rows to Reduce tasks by supplier ID, the Reduce tasks that receive the large suppliers get far more orders, which results in data skew.
For skew caused by group by, the optimization is simple. You only need to set the following parameters:
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
With these settings, Hive load-balances when the data is skewed, and the generated query plan contains two MapReduce jobs.
In the first MapReduce job, the Map output is distributed randomly to the Reduce tasks, and each Reduce task performs a partial aggregation and outputs the result. As a consequence, the same group by key may be sent to different Reduce tasks, which achieves load balancing.
The second MapReduce job takes the pre-aggregated results and distributes them to Reduce tasks by group by key (this guarantees that the same group by key goes to the same Reduce task), and then completes the final aggregation.
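The two-job plan described above can be sketched as a simulation. This is an illustration of the idea only, not Hive's implementation; the function name, the modulo routing in stage 2, and the sample data are assumptions.

```python
import random
from collections import Counter

def two_stage_count(supplier_ids, num_reducers, seed=0):
    """Count orders per supplier in two stages, as in the two-job plan."""
    rng = random.Random(seed)
    # Stage 1: send each record to a random reducer, which aggregates partially.
    partials = [Counter() for _ in range(num_reducers)]
    for sid in supplier_ids:
        partials[rng.randrange(num_reducers)][sid] += 1
    # Stage 2: route each partial result by key, so one key meets one reducer.
    finals = [Counter() for _ in range(num_reducers)]
    stage2_input = [0] * num_reducers
    for partial in partials:
        for sid, cnt in partial.items():
            target = sid % num_reducers  # hypothetical key-based routing
            finals[target][sid] += cnt
            stage2_input[target] += 1
    result = Counter()
    for final in finals:
        result.update(final)
    return result, stage2_input

# Supplier 1 is the big supplier with 9,000 of the 10,000 orders.
orders = [1] * 9000 + [2, 3, 4, 5] * 250
result, stage2_input = two_stage_count(orders, num_reducers=4)
print(result[1], stage2_input)
```

After stage 1, the big supplier is represented by at most one partial row per reducer, so stage 2 receives a handful of rows per key instead of thousands.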
Count distinct optimization
During Hive development, you should be careful with count distinct, as it can easily cause performance problems, such as the following SQL:
select count(distinct user) from some_table;
Because the values have to be deduplicated, Hive sends all the output of the Map phase to a single Reduce task, which easily becomes a bottleneck. In this case, you can optimize by doing a group by first and then a count. The optimized SQL is as follows:
select count(*) from (select user from some_table group by user) tmp;
The principle is: first use group by to remove duplicates, and then count the number of rows of group by.
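A minimal sketch of why the rewrite helps, assuming integer user IDs and a hypothetical modulo partitioner: the group by step lets several reducers deduplicate in parallel, and the count step only adds up the group counts.

```python
def count_distinct_two_step(users, num_reducers):
    """Deduplicate per reducer (group by user), then count the groups."""
    # Step 1 (group by user): rows are partitioned by user, so each reducer
    # deduplicates only its own share of the keys.
    buckets = [set() for _ in range(num_reducers)]
    for user in users:
        buckets[user % num_reducers].add(user)  # hypothetical partitioner
    # Step 2 (count(*)): only the per-reducer group counts are summed.
    return sum(len(bucket) for bucket in buckets)

users = [u % 1000 for u in range(50_000)]
distinct_users = count_distinct_two_step(users, num_reducers=8)
print(distinct_users)
```

Because each user lands in exactly one bucket, the sum of the bucket sizes equals the number of distinct users.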
IV. Join optimization for large tables and small tables
Join-related optimizations fall into two groups: those that mapjoin can solve (a large table joined with a small table) and those that mapjoin cannot solve (a large table joined with a large table). The first is relatively easy. The second is more complex and harder to solve, but it is not insurmountable, only more troublesome.
First, we cover optimizing a large table joined with a small table. The sales detail fact table is again used as the example.
Suppose suppliers are rated (for example five-star, four-star, two-star, one-star), and the business staff wants to analyze the daily sales of each supplier star level and its share.
Developers generally write the following SQL:
select b.seller_star, count(order_id) as order_cnt
from (
  select order_id, seller_id
  from dwd_sls_fact_detail_table
  where partition_value = '20170101'
) a
left outer join (
  select seller_id, seller_star
  from dim_seller
  where partition_value = '20170101'
) b
on a.seller_id = b.seller_id
group by b.seller_star;
But as mentioned above, the real-world 80/20 rule concentrates orders on a few suppliers, and good suppliers are usually rated higher, which makes the skew worse. Without optimization, the SQL above will take a very long time or may never finish.
Generally speaking, the number of suppliers is limited, such as thousands or tens of thousands, so that table is not very large, while the sales detail fact table is relatively large. This is a typical large-table-join-small-table problem, which can be optimized with mapjoin. You only need to add a mapjoin hint. The optimized SQL is as follows:
select /*+ mapjoin(b) */ b.seller_star, count(order_id) as order_cnt
from (
  select order_id, seller_id
  from dwd_sls_fact_detail_table
  where partition_value = '20170101'
) a
left outer join (
  select seller_id, seller_star
  from dim_seller
  where partition_value = '20170101'
) b
on a.seller_id = b.seller_id
group by b.seller_star;
Here /*+ mapjoin(b) */ is the mapjoin hint. If more than one table needs mapjoin, the format is /*+ mapjoin(b, c, d) */.
Hive enables mapjoin by default, and the setting is:
set hive.auto.convert.join=true;
A mapjoin performs the join in the Map phase, instead of the usual approach of distributing rows by the join column in the Reduce phase and joining on each Reduce task node. With no distribution, there is no skew. Instead, Hive copies the whole small table to every Map task node (here dim_seller, and of course only the columns selected in subquery b), and each Map task then looks up the small table locally.
"As the analysis above shows, the small table must not be too large, otherwise the cost of copying and distributing it in full outweighs the gain."
In fact, Hive decides whether the small table qualifies (the default limit is 25 MB) based on the parameter hive.mapjoin.smalltable.filesize (hive.auto.convert.join.noconditionaltask.size from 0.11.0 onward).
In practice, the maximum allowed by this parameter can be raised, but it generally should not exceed 1 GB (if it is too large, the node running the Map task runs out of memory and Hive reports an error). Also note that the file size shown by HDFS is the compressed size. Once loaded into memory, the data takes much more space, and in many scenarios it may expand about 10 times.
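The lookup that each Map task performs can be sketched as follows. This is a simplified model under assumptions: each inner list stands for the input split of one Map task, and the small table is a plain dictionary; none of these names come from Hive.

```python
def map_join(fact_splits, small_table):
    """Left outer join fact rows to a broadcast small table, counting per star."""
    star_counts = {}
    for split in fact_splits:  # each split stands for one Map task
        # Every Map task receives its own full copy of the small table.
        lookup = dict(small_table)
        for order_id, seller_id in split:
            # Left outer join: a seller missing from the small table yields None.
            star = lookup.get(seller_id)
            star_counts[star] = star_counts.get(star, 0) + 1
    return star_counts

dim_seller = {1: 5, 2: 4, 3: 5}  # seller_id -> seller_star
fact_splits = [
    [(101, 1), (102, 1), (103, 2)],
    [(104, 3), (105, 9)],
]
star_counts = map_join(fact_splits, dim_seller)
print(star_counts)
```

No row is ever routed by seller_id, so a seller with a huge number of orders is simply spread over whichever splits its rows happen to live in. The cost is one copy of the small table per Map task, which is why its size matters.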
V. Join optimization for large tables joined with large tables
What if the mapjoin small table above, dim_seller, is very large, say more than 1 GB? That is the large-table-join-large-table problem.
This kind of problem is more complex. We first describe a concrete problem scenario and then introduce several optimization schemes based on it.
Problem scenario
Let's assume a problem scenario:
Table A is a summary table that aggregates the transactions between sellers and buyers over the last N days: for each seller, how many orders each buyer completed and the total amount. Here we take N = 90 and keep only the number of completed orders. The fields of table A are: buyer_id, seller_id, and pay_cnt_90d.
Table B is the seller basic information table, which contains a level rating for each seller, for example S0, S1, S2, S3, S4, S5, S6.
The desired result is each buyer's share of transactions at each seller level, for example:
A given buyer: S0: 10%; S1: 20%; S2: 20%; S3: 10%; S4: 20%; S5: 10%; S6: 10%.
The fields of table B are: seller_id and s_level.
As with the example in mapjoin, our first reaction is to directly join the table and count:
select
  m.buyer_id,
  sum(pay_cnt_90d) as pay_cnt_90d,
  sum(case when m.s_level = 0 then pay_cnt_90d end) as pay_cnt_90d_s0,
  sum(case when m.s_level = 1 then pay_cnt_90d end) as pay_cnt_90d_s1,
  sum(case when m.s_level = 2 then pay_cnt_90d end) as pay_cnt_90d_s2,
  sum(case when m.s_level = 3 then pay_cnt_90d end) as pay_cnt_90d_s3,
  sum(case when m.s_level = 4 then pay_cnt_90d end) as pay_cnt_90d_s4,
  sum(case when m.s_level = 5 then pay_cnt_90d end) as pay_cnt_90d_s5
from (
  select a.buyer_id, a.seller_id, b.s_level, a.pay_cnt_90d
  from (
    select buyer_id, seller_id, pay_cnt_90d from table_A
  ) a
  join (
    select seller_id, s_level from table_B
  ) b
  on a.seller_id = b.seller_id
) m
group by m.buyer_id;
But this SQL causes data skew because of the 80/20 rule among sellers. Some sellers have millions or even tens of millions of buyers within 90 days, while most sellers have only a few. When table_A joins table_B, ODPS distributes rows by seller_id, and the big sellers in table_A skew the data.
"However, this skew cannot be solved by mapjoining table_B, because there are more than ten million sellers and the file is several GB in size, which exceeds the 1 GB limit for a mapjoin table."
Scheme 1: convert to mapjoin
A large table cannot be mapjoined directly, so can it be done indirectly? There are two ways: restrict rows and restrict columns.
Restrict rows: you do not need to join all of table B, only the sellers that appear in table A. In this scenario, that means filtering out sellers with no completed transactions in the last 90 days.
Restrict columns: take only the fields you need.
select
  m.buyer_id,
  sum(pay_cnt_90d) as pay_cnt_90d,
  sum(case when m.s_level = 0 then pay_cnt_90d end) as pay_cnt_90d_s0,
  sum(case when m.s_level = 1 then pay_cnt_90d end) as pay_cnt_90d_s1,
  sum(case when m.s_level = 2 then pay_cnt_90d end) as pay_cnt_90d_s2,
  sum(case when m.s_level = 3 then pay_cnt_90d end) as pay_cnt_90d_s3,
  sum(case when m.s_level = 4 then pay_cnt_90d end) as pay_cnt_90d_s4,
  sum(case when m.s_level = 5 then pay_cnt_90d end) as pay_cnt_90d_s5
from (
  select /*+ mapjoin(b) */
    a.buyer_id, a.seller_id, b.s_level, a.pay_cnt_90d
  from (
    select buyer_id, seller_id, pay_cnt_90d from table_A
  ) a
  join (
    select b0.seller_id, s_level
    from table_B b0
    join (
      select seller_id from table_A group by seller_id
    ) a0
    on b0.seller_id = a0.seller_id
  ) b
  on a.seller_id = b.seller_id
) m
group by m.buyer_id;
This scheme works in some cases, but in many cases it does not solve the problem above: although most sellers have few buyers within 90 days, they still have some, so the filtered table B remains very large.
Scheme 2: use a case when statement in the join
This applies when the skewed values are known explicitly and are very few, such as skew caused by null values.
The core logic is to distribute these skewed values randomly across the Reduce tasks: at join time, concat a random number onto these special values so that they are distributed randomly. The core logic is as follows:
Select a. Hive' _
Hive has optimized this case. You do not need to modify the SQL, only to set parameters. For example, if the values "0" and "1" of table_B cause the skew, you only need the following settings:
set hive.optimize.skewinfo=table_B:(seller_id)[("0")("1")];
set hive.optimize.skewjoin=true;
However, scheme 2 still cannot solve the problem above, because the skewed sellers are numerous and change dynamically.
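The idea of concatenating a random number onto a special value can be sketched as a simulation. Everything here is an assumption made for illustration: the "hive" prefix, the salt range of 1000, and crc32 as a stand-in for the engine's partitioner.

```python
import random
import zlib

def join_key(seller_id, rng, salt_range=1000):
    """Return the key used to distribute a row of the left table."""
    if seller_id is None:
        # Assumed convention: a prefix plus a random number, so null rows
        # spread out and can never equal a real seller_id.
        return "hive" + str(rng.randrange(salt_range))
    return str(seller_id)

def reducer_loads(seller_ids, num_reducers, salted, seed=0):
    """Count rows per reducer, with or without salting the null keys."""
    rng = random.Random(seed)
    loads = [0] * num_reducers
    for seller_id in seller_ids:
        key = join_key(seller_id, rng) if salted else str(seller_id)
        # crc32 stands in for the real partitioner (an assumption).
        loads[zlib.crc32(key.encode()) % num_reducers] += 1
    return loads

# 9,000 rows with a null seller_id plus 1,000 ordinary rows.
rows = [None] * 9000 + list(range(1, 1001))
plain = reducer_loads(rows, 4, salted=False)
salted = reducer_loads(rows, 4, salted=True)
print(max(plain), max(salted))
```

Without salting, every null row lands on the same reducer. With salting, the null rows are spread out, and since a salted key never matches a real seller_id, the join result for those rows is unchanged.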
Scheme 3: replicate table B, then join
General scheme
Create a numbers table with a single int column, with rows such as 1 to 10 (depending on the degree of skew), then replicate table B 10 times and join on the modulus.
select
  m.buyer_id,
  sum(pay_cnt_90d) as pay_cnt_90d,
  sum(case when m.s_level = 0 then pay_cnt_90d end) as pay_cnt_90d_s0,
  sum(case when m.s_level = 1 then pay_cnt_90d end) as pay_cnt_90d_s1,
  sum(case when m.s_level = 2 then pay_cnt_90d end) as pay_cnt_90d_s2,
  sum(case when m.s_level = 3 then pay_cnt_90d end) as pay_cnt_90d_s3,
  sum(case when m.s_level = 4 then pay_cnt_90d end) as pay_cnt_90d_s4,
  sum(case when m.s_level = 5 then pay_cnt_90d end) as pay_cnt_90d_s5
from (
  select a.buyer_id, a.seller_id, b.s_level, a.pay_cnt_90d
  from (
    select buyer_id, seller_id, pay_cnt_90d from table_A
  ) a
  join (
    select /*+ mapjoin(numbers) */ seller_id, s_level, number
    from table_B
    join numbers
  ) b
  on a.seller_id = b.seller_id
  and mod(a.pay_cnt_90d, 10) + 1 = b.number
) m
group by m.buyer_id;
The core idea is this: since distributing by seller_id causes skew, manually add another column to distribute on, so that the skew of a previously skewed value drops to 1/10 of the original. The skew can be reduced further by changing the replication factor through the numbers table, but the drawback is that table B grows N times.
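The general scheme can be modeled in a few lines. This is a sketch under assumptions: tables are plain Python collections, and the function and variable names are hypothetical.

```python
def replicated_join(table_a, table_b, n):
    """Join A to B on (seller_id, mod(pay_cnt_90d, n) + 1) after replicating B."""
    # Cross join with the numbers table: every row of B appears once per 1..n.
    b_expanded = {
        (seller_id, number): s_level
        for seller_id, s_level in table_b.items()
        for number in range(1, n + 1)
    }
    joined = []
    for buyer_id, seller_id, pay_cnt in table_a:
        key = (seller_id, pay_cnt % n + 1)  # mod(pay_cnt_90d, n) + 1 = number
        if key in b_expanded:
            joined.append((buyer_id, seller_id, b_expanded[key], pay_cnt))
    return joined, len(b_expanded)

table_b = {1: 0, 2: 3}  # seller_id -> s_level
# Seller 1 is the big seller; seller 3 does not exist in table B.
table_a = [(b, 1, b % 7 + 1) for b in range(100)] + [(200, 2, 5), (201, 3, 2)]
joined, b_rows = replicated_join(table_a, table_b, n=10)
big_seller_keys = {p % 10 + 1 for _, s, p in table_a if s == 1}
print(len(joined), b_rows, len(big_seller_keys))
```

The join result is the same as a plain join on seller_id, but the big seller's rows are now spread over several composite keys. How evenly they spread depends on how the pay_cnt_90d values are distributed.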
Dedicated scheme
The general scheme replicates every row of table B by the same factor, but in fact only the big sellers need to be replicated.
First, you need the list of big sellers: create a temporary table that dynamically stores each day's latest big sellers (for example dim_big_seller), and expand the big sellers in this table by a predetermined factor (for example 1000 times).
Then add a new join column to both table A and table B with this logic: for a big seller, concat a randomly assigned integer (between 0 and the predefined factor, here 0 to 1000); otherwise keep the value unchanged.
Compared with the general scheme, the dedicated scheme clearly runs much more efficiently, because only the rows of big sellers in table B are replicated 1000 times while the rows of all other sellers stay the same. At the same time, the code is considerably more complex, and you must build the big seller table first.
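The join-column logic of the dedicated scheme can be sketched as follows. This is a simplified model: the underscore separator, the function names, and the in-memory tables are assumptions made for illustration.

```python
import random

def expand_table_b(table_b, big_sellers, factor):
    """Replicate only the big sellers' rows, one copy per suffix 0..factor-1."""
    expanded = {}
    for seller_id, s_level in table_b.items():
        if seller_id in big_sellers:
            for i in range(factor):
                expanded[f"{seller_id}_{i}"] = s_level
        else:
            expanded[str(seller_id)] = s_level
    return expanded

def join_column_a(seller_id, big_sellers, factor, rng):
    """Build the join column for a row of table A."""
    if seller_id in big_sellers:
        return f"{seller_id}_{rng.randrange(factor)}"  # random suffix
    return str(seller_id)

def dedicated_join(table_a, table_b, big_sellers, factor, seed=0):
    rng = random.Random(seed)
    b_expanded = expand_table_b(table_b, big_sellers, factor)
    joined = []
    for buyer_id, seller_id, pay_cnt in table_a:
        key = join_column_a(seller_id, big_sellers, factor, rng)
        if key in b_expanded:
            joined.append((buyer_id, seller_id, b_expanded[key], pay_cnt))
    return joined, len(b_expanded)

table_b = {1: 0, 2: 3, 3: 1}  # seller_id -> s_level
table_a = [(buyer, 1, 1) for buyer in range(500)] + [(900, 2, 4), (901, 3, 2)]
joined, b_rows = dedicated_join(table_a, table_b, big_sellers={1}, factor=1000)
print(len(joined), b_rows)
```

Only the single big seller is replicated, so table B grows from 3 rows to 1002 instead of 3000, while every row of table A still finds its seller level.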
Scheme 4: dynamic split into two
In fact, schemes 2 and 3 already use the idea of splitting in two, but not thoroughly. For problems that mapjoin cannot solve, the ultimate solution is a dynamic split: handle the skewed key values and the non-skewed key values separately. Non-skewed keys go through a normal join, skewed keys are identified and handled with mapjoin, and the results are combined with union all.
But this solution is troublesome: the code becomes complex, and a temporary table is required to hold the skewed key values.
-- Map join directly for sellers with more than 10000 buyers in 90 days; use a normal join for the other sellers.
-- Assumption: tmp_table_B (seller_id, s_level) already holds the big sellers.
select
  m.buyer_id,
  sum(pay_cnt_90d) as pay_cnt_90d,
  sum(case when m.s_level = 0 then pay_cnt_90d end) as pay_cnt_90d_s0,
  sum(case when m.s_level = 1 then pay_cnt_90d end) as pay_cnt_90d_s1,
  sum(case when m.s_level = 2 then pay_cnt_90d end) as pay_cnt_90d_s2,
  sum(case when m.s_level = 3 then pay_cnt_90d end) as pay_cnt_90d_s3,
  sum(case when m.s_level = 4 then pay_cnt_90d end) as pay_cnt_90d_s4,
  sum(case when m.s_level = 5 then pay_cnt_90d end) as pay_cnt_90d_s5
from (
  -- Non-skewed sellers: normal join against table_B minus the big sellers.
  select a.buyer_id, a.seller_id, b.s_level, a.pay_cnt_90d
  from (
    select buyer_id, seller_id, pay_cnt_90d from table_A
  ) a
  join (
    select a.seller_id, a.s_level
    from table_B a
    left outer join tmp_table_B b
    on a.seller_id = b.seller_id
    where b.seller_id is null
  ) b
  on a.seller_id = b.seller_id
  union all
  -- Skewed sellers: map join against the small big-seller table.
  select /*+ mapjoin(b) */ a.buyer_id, a.seller_id, b.s_level, a.pay_cnt_90d
  from (
    select buyer_id, seller_id, pay_cnt_90d from table_A
  ) a
  join (
    select seller_id, s_level from tmp_table_B
  ) b
  on a.seller_id = b.seller_id
) m
group by m.buyer_id;
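The split can also be modeled end to end in a short simulation, including the step that finds the big sellers. This is a sketch under assumptions: the threshold, the function name, and the in-memory tables are hypothetical.

```python
def split_join(table_a, table_b, threshold):
    """Join A to B by handling skewed and non-skewed sellers separately."""
    # Find the skewed sellers: more than `threshold` distinct buyers.
    buyers = {}
    for buyer_id, seller_id, _ in table_a:
        buyers.setdefault(seller_id, set()).add(buyer_id)
    big = {s for s, ids in buyers.items() if len(ids) > threshold}
    # The big-seller slice of B is small enough to broadcast (map join).
    big_b = {s: table_b[s] for s in big if s in table_b}
    # The rest of B goes through the normal join.
    rest_b = {s: lvl for s, lvl in table_b.items() if s not in big}
    normal = [(b, s, rest_b[s], p) for b, s, p in table_a if s in rest_b]
    mapjoined = [(b, s, big_b[s], p) for b, s, p in table_a if s in big_b]
    return normal + mapjoined, big  # union all of the two branches

table_b = {1: 0, 2: 3, 3: 1}  # seller_id -> s_level
table_a = [(buyer, 1, 1) for buyer in range(50)] + [
    (900, 2, 4), (901, 3, 2), (902, 4, 1),
]
joined, big = split_join(table_a, table_b, threshold=10)
plain = [(b, s, table_b[s], p) for b, s, p in table_a if s in table_b]
print(len(joined), big)
```

The two branches cover disjoint sets of sellers, so their union contains every matching row exactly once and equals the plain join.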
To sum up, scheme 1, scheme 2, and the general scheme of scheme 3 are not guaranteed to solve the large-table-join-large-table problem, because each has its own limitations and specific use cases.
The dedicated scheme of scheme 3 and scheme 4 are the recommended optimizations, but both require a new temporary table to store the big sellers, who change every day.
Compared with scheme 4, the dedicated scheme of scheme 3 does not require changing the code structure, but table B is replicated, so it must be a dimension table, otherwise the statistics will be wrong. Scheme 4 is the most general and the most flexible, but it also requires the largest code changes, even to the code structure, and can serve as the ultimate solution.
This is the end of the article on "Example analysis of Hive data skew". I hope the content above is of some help and that you learned something from it. If you found the article useful, please share it so more people can see it.