
What does data skew in Hadoop mean?

2025-01-16 Update From: SLTechnology News&Howtos (Shulou.com)

This article explains what data skew in Hadoop means and discusses ways to deal with it.

In parallel computing we always hope that work can be split into tasks of the same granularity whose completion times differ little. In practice the hardware in a cluster varies, and so do the types of applications and the sizes of the data splits, so a few tasks can greatly delay the completion of the whole job. Leaving aside hardware differences and applications in which the cost per record varies (some PageRank or data mining computations, for example), this article discusses only data skew caused by data partitioning in relational operations, that is, operations that can generally be expressed in SQL.

One reason data skew can hurt Hadoop performance so much is that the MapReduce framework always sorts, whether or not the job needs it. In the general case, map-side sort + partition + reduce-side sort produces the result, but this process is not necessarily optimal. For relational computing, data skew has its greatest impact in the reduce-side sort. When the amount of data a reducer processes exceeds some multiple of the memory given to the reduce JVM (my guess, based on monitoring real jobs, is that the multiple is below 2), a multi-pass merge sort takes place on the reduce side. If you look at the metrics of these slow reduce tasks, you will find that their I/O-related metrics are much larger than those of the other reducers. For details, see page 26 of Todd's performance tuning slides from this year's Hadoop Summit:

http://www.slideshare.net/cloudera/mr-perf
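To give a feel for why a reducer that receives more data than fits in memory does so much more I/O, here is a rough sketch based on the textbook external merge sort model. It is not Hadoop's actual merge logic; the function name and the merge_factor parameter are assumptions made for illustration.

```python
import math


def merge_passes(data_mb, memory_mb, merge_factor=10):
    """Textbook estimate of the merge passes an external merge sort needs.

    data_mb: data one reducer must sort; memory_mb: memory available for
    sorting; merge_factor: how many sorted runs one pass can merge
    (a hypothetical knob, chosen here only for illustration).
    """
    runs = math.ceil(data_mb / memory_mb)  # sorted runs produced in memory
    passes = 0
    while runs > 1:
        runs = math.ceil(runs / merge_factor)  # each pass merges groups of runs
        passes += 1
    return passes


# A reducer whose data fits in memory needs no merge pass,
# while a skewed reducer with far more data needs several.
print(merge_passes(100, 200), merge_passes(20000, 100))
```

Every extra pass rereads and rewrites the reducer's data, which is consistent with the I/O metrics of the slow reducers being much larger than those of the others.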

This unconditional reduce-side sort is specific to Hadoop's implementation; the MapReduce model itself does not require sorting. Other MapReduce implementations and other distributed computing frameworks may have less of a bottleneck in reduce. For example, group by in Shark is based on hashing rather than sorting.
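The difference between the two strategies can be sketched in a few lines. This is only an in-memory illustration with made-up function names, not how Hadoop or Shark implement group by.

```python
from collections import defaultdict
from itertools import groupby


def sort_group_count(records):
    """Sort-based group by: all records must be ordered by key first."""
    return {key: sum(1 for _ in group) for key, group in groupby(sorted(records))}


def hash_group_count(records):
    """Hash-based group by: one pass, memory grows with the number of distinct keys."""
    counts = defaultdict(int)
    for key in records:
        counts[key] += 1
    return dict(counts)


records = ["a", "b", "a", "a"]
print(sort_group_count(records) == hash_group_count(records))
```

Both give the same counts, but the hash version never orders the records, so a key with a huge number of records costs it one counter rather than a large sort.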

The two most common sources of data skew in relational computing are group by and join. Other possible sources are:

in or exists operations, especially when in or exists is applied to a subquery (in or exists is sometimes converted to a left semi join).

union or union all over the same input source (other set operations such as intersect may also be affected).

UDTFs in Hive.

Only the most common cases, group by and join, are discussed here.

Data distribution:

Real-world data is, in theory, always skewed; this is the familiar 80-20 rule: 80% of wealth is concentrated in the hands of 20% of people, 80% of users use only 20% of the features, and 20% of users contribute 80% of visits. A data field can be skewed in roughly two ways:

In the first, the field has few distinct values (fewer than a few thousand), and a handful of those values account for a very large number of records.

In the second, the field has many distinct values. Some values have far more records than the others, yet each still accounts for less than 1% or even 0.1% of all records.
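As a rough way to tell which of the two cases a column falls into, one could profile a sample of it. The sketch below is an assumption-laden illustration: the function name is made up, and the 5000 cutoff merely stands in for "fewer than a few thousand" distinct values.

```python
from collections import Counter


def describe_skew(values, few_distinct=5000):
    """Summarize how skewed a non-empty column sample is.

    few_distinct is a hypothetical cutoff for 'few distinct values'.
    """
    counts = Counter(values)
    top_key, top_count = counts.most_common(1)[0]
    return {
        "distinct": len(counts),
        "top_key": top_key,
        "top_share": top_count / len(values),  # fraction of rows held by the hottest value
        "case": 1 if len(counts) < few_distinct else 2,
    }


print(describe_skew(["x"] * 90 + ["y"] * 10))
```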

Partitioning:

The common MapReduce partitioning methods are hash and range.

The advantage of hash partitioning is that it is flexible, independent of the data type, and easy to use: you just set the number of reducers and generally do not need to implement anything yourself.
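A minimal sketch of hash partitioning, and of how a single hot key defeats it, might look like this. The helper names are hypothetical, and zlib.crc32 is used only to get a hash that is stable across runs; Hadoop's own partitioner uses the key's Java hashCode.

```python
import zlib
from collections import Counter


def hash_partition(key, num_reducers):
    """Assign a string key to a reducer by hashing it."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def reducer_loads(keys, num_reducers):
    """Count how many records each reducer would receive."""
    loads = Counter(hash_partition(k, num_reducers) for k in keys)
    return [loads.get(i, 0) for i in range(num_reducers)]


# One hot key with 1000 records plus 100 ordinary keys.
keys = ["hot"] * 1000 + ["k%d" % i for i in range(100)]
print(reducer_loads(keys, 4))
```

All records of the hot key hash to the same reducer, so its load stays at 1000 or more however many reducers are configured.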

Range partitioning requires the implementer to understand the data distribution, and sometimes to sample the data manually. It is also inflexible, in several respects:

1. A different range partitioner has to be implemented for each field of the same table. For a time field, the appropriate range size also varies with the query type or filter conditions.

2. Sometimes the partitioning has to use a combination of several fields, in which case the earlier single-field partitioner class cannot be reused, and there may be implicit relationships between the fields, such as date of birth and zodiac sign, or merchandise and season.

3. Manual sampling is very time-consuming and requires users to have domain knowledge of the distribution of the data sets used in their queries.

4. The partitioning scheme is hard-coded and the number of reducers is fixed; once skew appears under some circumstances, the parameters have to be adjusted.

Other partitioner types include HBase's HRegionPartitioner and the TotalOrderPartitioner.
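Range partitioning by sampling can be sketched as follows. This is an illustration of the general idea only, not the code of any of the partitioners named above; the function names, the sample size and the fixed seed are assumptions.

```python
import bisect
import random


def sample_split_points(keys, num_reducers, sample_size=1000, seed=0):
    """Pick num_reducers - 1 cut points from a sorted random sample of the keys."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    return [sample[len(sample) * i // num_reducers] for i in range(1, num_reducers)]


def range_partition(key, split_points):
    """Return the index of the range (reducer) that the key falls into."""
    return bisect.bisect_right(split_points, key)


keys = list(range(1000))
splits = sample_split_points(keys, 4)
print(splits, range_partition(600, splits))
```

The cut points are only as good as the sample, which is why the approach depends on knowing (or measuring) the distribution of the field being partitioned.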

Some solutions to data skew that come to mind (additions are welcome, especially from people working on search or data mining who have run into similar problems):

1. Increase the JVM memory of the reducers

2. Increase the number of reducers

3. Custom partitioner

4. Discussion of other optimizations

5. Discussion of the reduce-side sort-merge algorithm

6. Hive skewed join

7. Pipeline

8. Distinct

9. Index, especially bitmap index

Method 1: the reduce computation itself needs adequate memory, so if the hardware allows, increasing the reducers' memory can obviously relieve data skew. This approach suits the first kind of data distribution in particular: a single value has so many records that together they exceed the memory allocated to a reducer, and no partitioning scheme will change that. The limitations are equally obvious: 1. memory is finite; 2. it may destabilize other jobs running in the cluster.

Method 2: this works for the second kind of data distribution, where there are many distinct values and the records of any single value do not exceed the memory allocated to a reducer. If skew occurs only occasionally, increasing the number of reducers relieves the cases in which some reducer happens to receive more records than the others. It does not help with the first kind of data distribution.

Method 3: sometimes domain knowledge tells you the salient shape of the data distribution. In the temperature example from Hadoop: The Definitive Guide, the distribution of a fixed combination (observation station location and temperature) is fixed. For a specific query, if the previous two methods do not help, implementing your own partitioner may be a good approach.

Method 4: some systems have optimizations aimed at data skew, such as Pig's skewed join.

http://pig.apache.org/docs/r0.7.0/piglatin_ref1.html#Skewed+Joins

The Pig documentation says that the partitioning is determined from statistics on the input data (that is, range partitioning?). It is not clear to me whether this is decided dynamically at run time, that is, whether Pig automatically runs a sampling step before the join, since Pig keeps no statistics.

Group by in Hive:

Parameter: hive.groupby.skewindata
Default: false
Description: Whether there is skew in data to optimize group by queries

Parameter: hive.optimize.groupby
Default: true
Description: Whether to enable the bucketed group by from bucketed partitions / tables.

Parameter: hive.mapjoin.followby.map.aggr.hash.percentmemory
Default: 0.3
Description: Portion of total memory to be used by map-side group aggregation hash table, when this group by is followed by map join

Parameter: hive.groupby.mapaggr.checkinterval
Default: 100000
Description: Number of rows after which size of the grouping keys/aggregation classes is performed

The idea behind the last parameter, hive.groupby.mapaggr.checkinterval, is similar to that of an in-memory combiner. An in-memory combiner runs before the mapper-side sort, whereas the ordinary combiner runs after the mapper sort, or even later, when spilled data is read back from disk and sort-merged. The in-memory combiner seems to have first been described in "Data-Intensive Text Processing with MapReduce", and MapR's introductory slides last year mentioned that they have this optimization too. Data is more likely to shrink on the mapper side than on the reduce side, so a reduce-side combiner is rarely discussed, but the idea does exist. For example, in the join discussion in Google Tenzing, a small prev-next optimization is based on a reduce-side combiner; its premise is that the data is already ordered thanks to the block shuffle implementation, so a record is very likely to have the same key as the one before it.
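The in-memory combiner idea can be sketched as a mapper that aggregates in a hash table and checks the table's size only every so many rows. This is not Hive's implementation: check_interval merely echoes the parameter name above, and max_keys is a hypothetical flush threshold.

```python
from collections import defaultdict


def map_with_in_memory_combiner(records, check_interval=100000, max_keys=1000000):
    """Yield (key, partial_count) pairs, combining in memory before any sort."""
    table = defaultdict(int)
    for row_number, key in enumerate(records, start=1):
        table[key] += 1
        # Every check_interval rows, flush the table if it has grown too large.
        if row_number % check_interval == 0 and len(table) > max_keys:
            yield from table.items()
            table.clear()
    yield from table.items()  # emit whatever is left at the end of the input


print(dict(map_with_in_memory_combiner(["a", "b", "a"])))
```

For a column with few distinct values the table stays tiny and almost all of the reduction happens on the map side, which is why this helps most with the first kind of data distribution.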

Skewed join in Hive: the previous article introduced several of Hive's optimizations for two-table joins. Skewed join follows an idea similar to Method 2 above: increase the number of reducers. Hive checks at run time whether the amount of data a reducer must process exceeds a threshold, and if so starts an extra task to process the excess. This is a remedy after the fact: the skew was introduced at the start, by an unreasonable partitioning, and having run-time monitoring discover the skewed keys and hand them to an extra task gives only mediocre results. Interested readers can refer to my discussion of this optimization with the Facebook team in HIVE-3086. Under Method 6 I discuss how my thinking differs from what Facebook is doing.

Method 5: when the memory allocated to a reducer is far smaller than the amount of data it processes, the resulting multi-pass sort is the bottleneck. So ask:

1. Is this sort necessary at all?

2. Are there other sorting algorithms or optimizations that, for a particular situation, can raise the point at which the bottleneck appears?

3. Is MapReduce suitable for this situation?

Question 1: for group by under data distribution case 1, hashing is much better than sorting. Even if one reducer handles more data than the others, the cost of hash-based computation will not differ too much.

Question 2: first, implementing block shuffle would greatly reduce the cost of the sort itself. In addition, if the reducer after partitioning does not follow copy -> sort-merge -> reduce, the header information of each block can be kept in memory after the copy, and the reduce can be computed directly without a sort-merge; the access pattern then becomes random rather than the sequential access that follows today's sort-merge. Block shuffle can be implemented in two ways. The first applies once Hadoop has a columnar data format: the data is then more likely to be already sorted and split into blocks, typically of 1 MB (see AVRO-806). The mapper then has almost nothing to do, even the cost of computing partitions is many times smaller, and the job goes straight to the final reduce step. The second works without a columnar format: the mapper sorts to obtain the minimum and maximum of each block, the reduce side keeps these minima and maxima in memory, and once the copy is finished it uses them to do random reads and then reduces. (For the implementation of block shuffle, follow MAPREDUCE-4039; for hash-based computation, follow MAPREDUCE-1639.)

Question 3: MapReduce has only two functions, one map and one reduce, so once data skew occurs the partitioning has failed. In a join, a certain key is assigned too many records, and because there is only one chance to partition, the damage from the skewed assignment is already done. This situation is hard to tune. If the computation were based on map-reduce-reduce instead, the same key would not have to be sent to the same reducer: the results of the first reduce can be merged and deduplicated in the second reduce. The second reduce needs no sort-merge step because the first reduce has already sorted; the middle reduce does not care how the data is partitioned, so every task can process the same amount of data; and because the second reduce does not sort-merge, it does not hit the memory-size problem. For cases like skewed join the bottleneck is naturally much smaller.
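The map-reduce-reduce idea can be illustrated for a simple count. The sketch below is a single-process simulation under stated assumptions: the first stage spreads records randomly (one possible way of not caring how the data is partitioned), and the function name, reducer count and seed are made up.

```python
import random
from collections import defaultdict


def two_stage_count(keys, num_reducers=4, seed=0):
    """Count records per key in two reduce stages instead of one."""
    rng = random.Random(seed)
    # Stage 1: records are spread at random, so a hot key is shared by all reducers.
    stage1 = [defaultdict(int) for _ in range(num_reducers)]
    for key in keys:
        stage1[rng.randrange(num_reducers)][key] += 1
    # Stage 2: merge the small partial results; at most one entry per key per reducer.
    final = defaultdict(int)
    for partial in stage1:
        for key, count in partial.items():
            final[key] += count
    stage1_loads = [sum(partial.values()) for partial in stage1]
    return dict(final), stage1_loads


result, loads = two_stage_count(["hot"] * 1000 + ["cold"] * 10)
print(result, loads)
```

No first-stage reducer sees all 1000 records of the hot key, and the second stage only has to combine a handful of partial counts.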

Method 6: Hive currently has several JIRA issues under development that deal with skewed joins: HIVE-3086, HIVE-3286 and HIVE-3026. In brief, Facebook wants users to enumerate the skewed values by hand in advance, handle those values as a map join at join time, and handle all other values the original way. Personally I think this scales poorly: the listed values take no account of the data size after filter conditions and optimizations are applied; the values listed in advance are based on the whole partition; composite join keys appear not to be considered; the metastore limits what can be stored; both the large and the small input tables are scanned twice (once for the non-skewed keys and once for the skewed keys as a map join), and the output table is also scanned twice (to merge the two results); and the skewed keys must be listed in advance, which adds maintenance cost. The development is not yet complete enough to go into production, so whether the approach is effective remains to be seen once all the features are done and documented. I think the approach should instead extend the idea of bucketed map join, except that the cluster key does not have to be dealt with in advance. The cluster key should be the join key plus some column that disperses the join key, which amounts to spreading the records of one key in the large table across several reducers, while the rows of the small table must be clustered to every key that the corresponding large-table rows are sent to. The second kind of data distribution is not hard for joins; increasing the number of reducers is enough. The hard case is the first kind: the join key of the large table has to be dispersed, and the small-table rows with the same join key must still be matched against all the corresponding records of the large table.

With this approach there is no need to scan the large table or the output table twice, and nothing has to be prepared by hand in advance. The data sampled dynamically is the data after the filter conditions have been applied, not an inaccurate picture based on statistics collected beforehand. The basic idea is the same as the distributed hash join described in Tenzing: find a way to cut the data to the right size, then use a hash-based map join.
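The "join key plus a dispersing column" idea can be sketched as a salted join. Note the substitution: instead of a real column that disperses the join key, this sketch uses a random salt, and it replicates every small-table row once per salt so that each bucket can still find its match. All names here are hypothetical, and the code simulates the buckets in one process.

```python
import random
from collections import defaultdict


def salted_join(big, small, num_salts=4, seed=0):
    """Inner-join two lists of (join_key, value) pairs using salted keys.

    Returns a list of (join_key, big_value, small_value) tuples.
    """
    rng = random.Random(seed)
    # Small table: copy each row into every salt bucket of its key.
    small_buckets = defaultdict(list)
    for key, value in small:
        for salt in range(num_salts):
            small_buckets[(key, salt)].append(value)
    # Large table: a random salt spreads one hot key over num_salts buckets.
    joined = []
    for key, value in big:
        salt = rng.randrange(num_salts)
        for small_value in small_buckets.get((key, salt), []):
            joined.append((key, value, small_value))
    return joined


big = [("u1", i) for i in range(100)] + [("u2", 0)]
small = [("u1", "alice"), ("u3", "carol")]
print(len(salted_join(big, small)))
```

The large table is read once, the hot key u1 is handled by several buckets instead of one, and the price is replicating the small table num_salts times.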

Method 7: when join and group by appear together, the two operations should run as a pipeline. The group operator can be applied directly during the join to reduce a large amount of data, instead of waiting for the join to finish and write to disk and then having the group by read the disk again. HIVE-2206 is working on this optimization. Hive has no concept of a pipeline; tools such as Cloudera's Crunch and Twitter's Scalding do.

Method 8: distinct is itself a shorthand for group by. I originally thought count(distinct x) would behave the same as group by, but distinct in Hive turns out to be noticeably slower, which may be related to group by using a map-side combiner. I have also observed that Hive estimates fewer reducers for count(distinct x) than for group by, so when using count(distinct x) in Hive, set the number of reducers as large as is practical, either by setting the reducer count directly or by lowering hive.exec.reducers.bytes.per.reducer; I personally prefer the latter. Hive currently estimates the number of reducers without statistics, and count(distinct x) simply gets that default estimate. When count(distinct x) comes after a join, the default is generally disastrous. If there is a where condition that filters out a fair amount of data, the default may do better. In any case it is better to waste a few reduce slots than to wait ten or even tens of minutes. Converting the query to group by is also a good option, and distribute by helps too when writing it as group by.
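Why rewriting count(distinct x) as a group by lets the work spread over many reducers can be shown with a small simulation. This is a sketch of the idea only, not of Hive's execution plan; the function name is made up and zlib.crc32 stands in for whatever hash the engine uses.

```python
import zlib


def count_distinct_via_group_by(values, num_reducers=4):
    """Count distinct values by partitioning on the value itself."""
    # Stage 1: each distinct value lands on exactly one reducer, which dedups it.
    partitions = [set() for _ in range(num_reducers)]
    for value in values:
        reducer = zlib.crc32(str(value).encode("utf-8")) % num_reducers
        partitions[reducer].add(value)
    # Stage 2: the per-reducer sets are disjoint, so their sizes can just be summed.
    return sum(len(partition) for partition in partitions)


print(count_distinct_via_group_by([1, 2, 2, 3, 3, 3]))
```

Because the deduplication is split by value, no single reducer has to hold every distinct value, at the cost of the extra summing step.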

Method 9: an index in Hive is essentially a materialized view. For group by and distinct, the computation then happens on the map side, so naturally there is no skew. A bitmap index in particular has a big advantage for columns with few distinct values. The trouble with an index is that you have to judge whether your SQL is a commonly run query. In addition, if the fields your query uses were not included when the index was built, the index cannot be used (Hive has no notion, as a DBMS does, of using an index to look up or join back to the original table).
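To illustrate why a bitmap index suits columns with few distinct values, here is a toy version that stores one bitmap per value as a Python integer. It is not Hive's bitmap index format, and the function names are hypothetical.

```python
def build_bitmap_index(column):
    """Map each distinct value to an int whose bit i is set when row i holds it."""
    index = {}
    for row, value in enumerate(column):
        index[value] = index.get(value, 0) | (1 << row)
    return index


def group_count(index):
    """Answer 'select value, count(*) ... group by value' from the index alone."""
    return {value: bin(bits).count("1") for value, bits in index.items()}


index = build_bitmap_index(["m", "f", "m", "m"])
print(group_count(index))
```

The group by is answered by counting bits, without shuffling the rows themselves to a reducer, so a value with very many rows causes no skew.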

Other suggestions:

Another good description of data skew that can be found online is

http://nuage.cs.washington.edu/pubs/opencirrus2011.pdf

Neither map-side skew nor expensive records are a problem in relational computing, so they are not the focus of this article. For relational computing, data skew has its greatest impact in the reduce-side sort. The five best practices summarized at the end of that paper are worth consulting.

The third requires you to understand how combiners are applied and the special optimizations that improve their performance. Hive's map-side aggregation works well for data distribution 1 and has little effect for data distribution 2. A combiner also consumes system resources, so check whether the cost is worthwhile rather than using a combiner in every case.

On the fourth: for relational computing, map-side skew is uncommon. One example is unreasonable partitioning, or an unreasonable choice of cluster by key in Hive (both use directories, which are the smallest unit of processing).

Best Practice 1. Use domain knowledge when choosing the map output partitioning scheme if the reduce operation is expensive: Range partition or some other form of explicit partition may be better than the default hash-partition.

Best Practice 2. Try different partitioning schemes on sample workloads or collect the data distribution at the reduce input if a MapReduce job is expected to run several times.

Best Practice 3. Implement a combiner to reduce the amount of data going into the reduce-phase and, as such, significantly dampen the effects of any type of reduce-skew.

Best Practice 4. Use a pre-processing MapReduce job that extracts properties of the input data in the case of a long-running, skew-prone map phase. Appropriately partitioning the data before the real application runs can significantly reduce skew problems in the map phase.

Best Practice 5. Design algorithms whose runtime depends only on the amount of input data and not the data distribution.

The other is Taobao's summary of data skew:

http://www.alidata.org/archives/2109

Personally, however, I do not find it very helpful. For the first problem, the impact of null values, I strongly oppose the first solution, which uses union all: the same table, especially a large one, is scanned twice, and the extra cost is out of proportion to the benefit, so I do not recommend it. As for the second solution, turning the special value into a random one: is the result correct? In particular, is the output correct in all cases? The background there seems to be that userid is the primary key of the small table users and also the join key, and is never null there? I do not recommend it; both the assumed background conditions and the correctness of the output are in doubt.

The second problem, mismatched data types, is the same as HIVE-3445; fix the types in advance.

The third arises because Taobao's Hadoop version has no map-side hash aggregation parameter. Writing it as distinct also adds an extra MR step, which I do not recommend.

Data skew is also a topic in MPP systems, where it likewise involves data redistribution, but MPP has more mature mechanisms by comparison. First, when laying out data initially, MPP always specifies segmented by or distributed by, which explicitly assigns the data to different physical machines. Second, statistics help the execution engine choose an appropriate redistribution. But statistics are not a cure-all, for example:

1. The granularity and freshness of the statistics.

2. The data left after the filter conditions are applied may not follow the distribution originally expected.

3. Statistics are based on sampling, so they always deviate from the full, real data.

4. Statistics are collected per partition. If the query does not cut along the partition fields, simply summing the per-partition statistics cannot represent the overall statistics.

5. Temporary tables and the intermediate data of multi-step queries have no statistics.

6. Various other algorithmic optimizations, such as the in-mapper combiner or Google Tenzing's prev-next combine, affect how well statistics can guide algorithm selection.
