
How to Optimize Reduce-Side Repartition Joins in MapReduce

Updated 2025-01-15. From: SLTechnology News&Howtos (shulou), Internet Technology


Shulou(Shulou.com)06/01 Report--

Many newcomers are unclear on how to optimize reduce-side repartition joins in MapReduce. This article walks through the available techniques in detail.

1. Repartition join (reduce side)

The first technique covered here is the basic repartition join, which supports both inner and outer joins. The problem to solve is joining large datasets together, and the solution chosen is a reduce-side repartition join. It is implemented on the reduce side, uses MapReduce's sort-merge to group records together, runs as a single MapReduce job, and supports N-way joins, where N is the number of datasets being joined.

The map side reads the data from the datasets, determines the join value for each record, and emits that value as the output key. The output value carries the data that the reducer combines across datasets to produce the final result.

A single reducer invocation receives all the values for a join key emitted by the map function and divides them into N partitions, where N is the number of datasets being joined. Once the reducer has read all the input values for the join key and partitioned them in memory, it performs a Cartesian product across the partitions and emits the result of each join.

Figure 6.10 A basic MapReduce implementation of a repartition join

To support this technique, the MapReduce code needs to do the following:

Support multiple map classes, each handling a different input dataset. This is done with the MultipleInputs class.

Mark the records emitted by the mappers so that they can be associated with their dataset of origin. This article uses the htuple project to work with composite data in MapReduce.

The code for the repartition Join operation is as follows:
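
As a rough, in-memory illustration of the logic described above (plain Python, not the Hadoop job itself), the sketch below tags each record with its dataset of origin in the map step, groups by join key to imitate the shuffle, and has the reducer partition the values by dataset before taking the Cartesian product. The function names, the record layout, and the sample data are assumptions made for this example.

```python
from collections import defaultdict
from itertools import product

def map_records(dataset_id, records, key_field):
    """Emit (join_key, (dataset_id, record)) pairs, tagging each record with its origin."""
    for record in records:
        yield record[key_field], (dataset_id, record)

def reduce_join(values, num_datasets):
    """Partition one key's values by dataset, then emit the Cartesian product."""
    partitions = [[] for _ in range(num_datasets)]
    for dataset_id, record in values:
        partitions[dataset_id].append(record)   # every value is held in memory
    return list(product(*partitions))           # empty if any dataset lacks the key (inner join)

def repartition_join(datasets, key_field):
    """Imitate the shuffle: group tagged records by key, then reduce each key."""
    shuffled = defaultdict(list)
    for dataset_id, records in enumerate(datasets):
        for key, tagged in map_records(dataset_id, records, key_field):
            shuffled[key].append(tagged)
    return {key: reduce_join(values, len(datasets))
            for key, values in sorted(shuffled.items())}

# Hypothetical sample data.
users = [{"name": "anne", "state": "CA"}, {"name": "bob", "state": "NY"}]
logs = [{"name": "anne", "action": "login"},
        {"name": "anne", "action": "logout"},
        {"name": "carl", "action": "login"}]
joined = repartition_join([users, logs], "name")
```

Because `reduce_join` buffers every value for a key before producing any output, its memory use grows with the number of records per join key, which is the limitation discussed in the summary below.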

You can run the job and view the output using the following command:

Summary

Hadoop bundles a hadoop-datajoin module, a framework for repartition joins that includes the plumbing for handling multiple input datasets and performing the join. The example above and the hadoop-datajoin code are the most basic form of repartition join. Both require that all the data for a join key be loaded into memory before the Cartesian product is performed, so neither is suitable when the records for a single join key do not fit in available memory. The next technique addresses this problem.

2. Optimizing the repartition join

The previous repartition join implementation is not space-efficient: it must load all the values for a given join key into memory before it can perform the multiway join. It is more efficient to load only the smaller dataset into memory and then iterate over the larger dataset, performing the join along the way.

We want to perform a repartition join in MapReduce without caching all the records in the reducer. The optimized repartition join framework caches only one of the datasets being joined, specifically the smaller of the two, which reduces the memory overhead in the reducer. Figure 6.11 shows the improved repartition join implementation.

Figure 6.11 An optimized MapReduce implementation of a repartition join

This technique differs from the previous one in that it uses secondary sort to ensure that all records from the smaller dataset reach the reducer before the records from the larger dataset, which minimizes the amount of data the reducer has to cache. In addition, the mapper emits a tuple as the output key, containing the user name being joined on and a field that identifies the originating dataset.

The following code shows a new enumeration and how the user mapper populates the tuple fields:

The MapReduce driver code needs to be updated to indicate which fields in the tuple are used for sorting, partitioning, and grouping:

The partitioner should partition on the user name only, so that all of a user's records reach the same reducer.

Sorting should use the user name and the dataset indicator, so that the smaller dataset is ordered first (because the USERS constant is smaller than the USER_LOGS constant, user records are sorted before user logs).

Grouping should be on the user name only, so that both datasets are streamed to the same reducer invocation:
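
The three roles above can be imitated in plain Python. This is a minimal sketch, not the htuple or Hadoop API: the composite key is a `(name, dataset)` tuple, the tag values `USERS = 0` and `USER_LOGS = 1` are assumed for illustration, and the toy hash in `partition` only stands in for a real partitioner.

```python
from itertools import groupby

USERS, USER_LOGS = 0, 1   # hypothetical dataset tags; USERS sorts first because it is smaller

def partition(key, num_reducers):
    """Partition on the user name only, ignoring the dataset tag."""
    name, _dataset = key
    return sum(name.encode()) % num_reducers   # toy deterministic hash, for illustration

def sort_key(pair):
    """Sort on (user name, dataset tag) so user records precede that user's logs."""
    (name, dataset), _value = pair
    return (name, dataset)

def group_key(pair):
    """Group on the user name only, so one reducer call sees both datasets."""
    (name, _dataset), _value = pair
    return name

map_output = [
    (("bob", USER_LOGS), "bob login"),
    (("anne", USER_LOGS), "anne login"),
    (("anne", USERS), "anne CA"),
    (("bob", USERS), "bob NY"),
    (("anne", USER_LOGS), "anne logout"),
]
ordered = sorted(map_output, key=sort_key)
groups = {name: [value for _key, value in pairs]
          for name, pairs in groupby(ordered, key=group_key)}
```

In each group the user record comes first and the log records follow, which is the ordering the reducer relies on.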

Finally, modify the reducer to cache the incoming user records and then join them with the user logs:
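
A minimal sketch of that reducer logic in plain Python, assuming the values for one user arrive already ordered with user records first (the `USERS` and `USER_LOGS` tags and the record strings are hypothetical):

```python
USERS, USER_LOGS = 0, 1   # hypothetical dataset tags

def streaming_join_reducer(tagged_values):
    """Join one user's records, assuming USERS records arrive before USER_LOGS records."""
    cached_users = []
    for dataset, record in tagged_values:
        if dataset == USERS:
            cached_users.append(record)    # only the smaller dataset is buffered
        else:
            for user in cached_users:      # each log record is joined, then discarded
                yield user, record

values_for_anne = iter([
    (USERS, "anne CA"),
    (USER_LOGS, "anne login"),
    (USER_LOGS, "anne logout"),
])
joined = list(streaming_join_reducer(values_for_anne))
```

The input is consumed as an iterator, so the log records never need to be held in memory together; only `cached_users` grows.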

You can use the following command to run the job and view the output:

Hive

Hive supports a similar optimization when performing repartition joins. Hive buffers the datasets for a join key except the last one, which it streams so that the large dataset does not need to be held in memory. Hive assumes that the dataset specified last in the query is the largest. Imagine that you have two tables named users and user_logs, and user_logs is much larger. To join these tables, make sure that the user_logs table is referenced last in the query:

If you don't want to rearrange the query, you can use the STREAMTABLE hint to tell Hive which table is larger:

Summary

This technique improves on the earlier one by buffering only the values of the smaller dataset, but it still transfers all the data between the map and reduce phases, which is an expensive network cost. In addition, the earlier version supports N-way joins, whereas this implementation supports only two-way joins.

3. Using Bloom filters to reduce shuffled data

Suppose you want to join only a subset of the data that matches some predicate, such as "only users living in California." With the techniques so far, the filter has to be applied in the reducer, because only one dataset (the users) carries the state; the user logs do not. This section shows how to use a Bloom filter on the map side, which can have a significant impact on job execution time. The problem to solve is filtering data in a repartition join while pushing the filter down to the mappers. One solution is to create a Bloom filter in a preprocessing job and then load it in the repartition job to filter records in the mappers.

A Bloom filter is a probabilistic data structure that uses a bit array to represent a set compactly and can test whether an element belongs to the set. It may report false positives but never false negatives. A Bloom filter requires much less memory than a Java HashSet, so it is well suited to large datasets. The solution has two steps. The first runs a job that generates the Bloom filter; it operates on the user data and is populated with the users living in California. The second uses the Bloom filter in the repartition join to discard users that are not needed. The Bloom filter is required because the mapper for the user logs has no access to the state.
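
To give a feel for the memory claim, the sketch below applies the standard Bloom filter sizing formulas, m = -n ln(p) / (ln 2)^2 bits and k = (m / n) ln 2 hash functions, for n expected elements and a target false-positive rate p. The one million users and the 1% rate are assumed figures for illustration, not values from the article.

```python
import math

def bloom_parameters(expected_items, false_positive_rate):
    """Return (bits, hash_count) from the standard Bloom filter sizing formulas."""
    bits = math.ceil(-expected_items * math.log(false_positive_rate) / (math.log(2) ** 2))
    hash_count = max(1, round(bits / expected_items * math.log(2)))
    return bits, hash_count

# Hypothetical workload: one million user names, 1% false positives.
bits, hash_count = bloom_parameters(1_000_000, 0.01)
print(f"{bits / 8 / 1024 / 1024:.2f} MiB, {hash_count} hash functions")
```

At these settings the filter needs a little under 10 bits per element regardless of how long the user names are, whereas a hash set has to store every name itself.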

Figure 6.12 The two-step process for using a Bloom filter in a repartition join

Step 1: create a Bloom filter

The first job creates a Bloom filter that contains the names of the users in California. The mappers generate intermediate Bloom filters, which the reducer combines into a single Bloom filter. The job output is an Avro file containing the serialized Bloom filter:
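
The build-and-merge pattern can be sketched in plain Python as follows. This is a toy Bloom filter written for illustration, not Hadoop's BloomFilter class, and it skips the Avro serialization; the bit count, hash count, and sample users are assumptions. The key point is that partial filters built with identical parameters can be merged with a bitwise OR.

```python
import hashlib

class BloomFilter:
    """Toy Bloom filter backed by a Python int used as a bit array."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, item):
        # Derive num_hashes bit positions by salting a SHA-256 digest.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item):
        return all((self.bits >> pos) & 1 for pos in self._positions(item))

    def union(self, other):
        """Merge another filter built with the same parameters (bitwise OR)."""
        assert (self.num_bits, self.num_hashes) == (other.num_bits, other.num_hashes)
        self.bits |= other.bits

def map_build_filter(users):
    """Mapper: build a partial filter from the California users in one input split."""
    partial = BloomFilter()
    for name, state in users:
        if state == "CA":
            partial.add(name)
    return partial

def reduce_merge_filters(partials):
    """Reducer: OR the partial filters into a single filter."""
    merged = BloomFilter()
    for partial in partials:
        merged.union(partial)
    return merged

# Hypothetical input splits.
split_1 = [("anne", "CA"), ("bob", "NY")]
split_2 = [("dave", "CA"), ("erin", "TX")]
ca_filter = reduce_merge_filters([map_build_filter(split_1), map_build_filter(split_2)])
```

Every name that was added is guaranteed to pass `might_contain`; a name that was never added usually fails it, but can occasionally pass as a false positive.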

Step 2: the repartition join

The only difference from the repartition join described earlier is that the mappers now load the Bloom filter generated in the first step and, when processing each map record, run a membership test against the filter to decide whether the record should be sent to the reducer. The following code shows two things: an abstract mapper that generalizes loading the Bloom filter, and the subclasses that support the two datasets being joined:
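
The shape of that design, a shared base mapper plus one subclass per dataset, might look like the plain-Python sketch below. To keep it short, a set-backed `ExactFilter` stands in for the loaded Bloom filter; the class names and the record layouts are assumptions for illustration.

```python
class ExactFilter:
    """Stand-in for a loaded Bloom filter; a real one may also return false positives."""

    def __init__(self, items):
        self._items = set(items)

    def might_contain(self, item):
        return item in self._items

class FilteringMapper:
    """Shared logic: emit a record only if its user name passes the filter."""

    def __init__(self, membership_filter):
        self.filter = membership_filter

    def username(self, record):
        raise NotImplementedError

    def map(self, records):
        for record in records:
            name = self.username(record)
            if self.filter.might_contain(name):
                yield name, record

class UserMapper(FilteringMapper):
    def username(self, record):
        return record[0]        # assumed layout: (name, state)

class UserLogMapper(FilteringMapper):
    def username(self, record):
        return record[1]        # assumed layout: (timestamp, name, action)

ca_filter = ExactFilter(["anne", "dave"])
users_out = list(UserMapper(ca_filter).map([("anne", "CA"), ("bob", "NY")]))
logs_out = list(UserLogMapper(ca_filter).map(
    [(1, "anne", "login"), (2, "bob", "login"), (3, "dave", "logout")]))
```

With a real Bloom filter a few non-California records can slip through as false positives, so the exact predicate still has to be checked where the state is available, for example on the user records.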

The following command runs two jobs and dumps the Join output:

Summary

This technique provides an effective way to perform map-side filtering on both datasets to minimize the network I/O between mappers and reducers. It also reduces the amount of data that mappers and reducers spill to disk as part of the shuffle. Filters are often the simplest and most effective way to speed up and optimize jobs, and they apply to many other MapReduce jobs as well as to repartition joins.

4. Data skew in reduce-side joins

Data skew is easy to run into in practice, and there are two types:

High join-key cardinality, where some join keys have a large number of records in one or both datasets. I call this join-product skew.

Poor hash partitioning, where a few reducers receive a large proportion of the total number of records. I call this hash-partitioning skew.

5. Joining large datasets with high join-key cardinality

This technique addresses join-product skew, and the next technique examines hash-partitioning skew. The problem is that some join keys have high cardinality, which can cause some reducers to run out of memory when they try to cache those keys. We can filter out these keys and join them separately, or spill them out in the reducer and schedule a follow-up job to join them.

If you know in advance which keys are high-cardinality, you can separate them into their own join job, as shown in figure 6.13. If you are not sure which keys are high-cardinality, you may need to build detection into the reducers and write those keys out to a separate file, which a follow-up job then joins, as shown in figure 6.14.
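
The separation step can be illustrated with a small plain-Python sketch. The threshold, the function names, and the sample records are assumptions; in a real job the counting would come from a sampling pass or from detection inside the reducers.

```python
from collections import Counter

def find_skewed_keys(records, threshold):
    """Return the join keys that appear more than `threshold` times."""
    counts = Counter(key for key, _value in records)
    return {key for key, count in counts.items() if count > threshold}

def split_by_skew(records, skewed_keys):
    """Route records for skewed keys to a separate job's input."""
    regular, skewed = [], []
    for key, value in records:
        (skewed if key in skewed_keys else regular).append((key, value))
    return regular, skewed

# Hypothetical log records keyed by user name.
logs = [("anne", i) for i in range(5)] + [("bob", 0), ("carl", 0)]
skewed_keys = find_skewed_keys(logs, threshold=3)
regular, skewed = split_by_skew(logs, skewed_keys)
```

The `regular` records go through the normal repartition join, while the `skewed` records are handled by a separate job.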

Figure 6.13 Handling skew when the high-cardinality keys are known in advance

Figure 6.14 Handling skew when the high-cardinality keys are not known in advance

Hive

Hive supports a skew-mitigation strategy similar to the second approach. It is enabled by setting hive.optimize.skewjoin to true before running the job; hive.skewjoin.key sets the number of rows per join key above which the key is treated as skewed.

You can optionally tune the map-side join that runs on the high-cardinality keys with hive.skewjoin.mapjoin.map.tasks and hive.skewjoin.mapjoin.min.split.

Finally, if you use GROUP BY in SQL, consider also setting hive.groupby.skewindata to true to handle skew in the grouped data.

Summary

This technique assumes that only one dataset has high cardinality for a given join key, so that a map-side join that caches the smaller dataset can be used. If both datasets are high-cardinality, you face an expensive Cartesian product that is slow to execute because it does not fit the way MapReduce works (it is not inherently splittable and parallelizable). In that case, re-examine whether techniques such as filtering or projection can reduce the time required to perform the join.

6. Handling skew generated by the hash partitioner

The default partitioner in MapReduce is a hash partitioner, which takes the hash of each map output key and computes it modulo the number of reducers to determine which reducer the key is sent to. The hash partitioner works well as a general-purpose partitioner, but with some datasets it overloads certain reducers because a disproportionate number of keys hash to the same reducer. These reducers take longer to complete than most. In addition, when you inspect the counters of the straggler reducers, you will notice that the number of groups sent to the stragglers is much higher than for the reducers that have already completed.
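
The rule can be written out in a few lines of plain Python. The sketch assumes keys whose hash follows Java's String.hashCode(), which is an assumption for illustration (Hadoop key types such as Text compute their hash differently); the sample keys are chosen so that every hash leaves the same remainder.

```python
from collections import Counter

def java_string_hash(text):
    """Python port of Java's String.hashCode() (32-bit signed), for BMP characters."""
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def hash_partition(key, num_reducers):
    """Clear the sign bit of the hash, then take the remainder."""
    return (java_string_hash(key) & 0x7FFFFFFF) % num_reducers

# Hypothetical keys whose hash codes are all congruent to 1 modulo 4.
keys = ["a", "e", "i", "m", "q"]
load = Counter(hash_partition(key, 4) for key in keys)
```

Here all five distinct keys land on reducer 1 while the other three reducers receive nothing, which is hash-partitioning skew in miniature.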

To tell skew caused by high-cardinality keys from skew caused by the hash partitioner, use the MapReduce reducer counters. Skew introduced by a poorly performing hash partitioner sends many more groups (unique keys) to the affected reducers. With high-cardinality keys, the number of groups is roughly the same across all reducers, but the skewed reducers process far more records.
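
That rule of thumb can be expressed as a small helper over per-reducer counter values (Hadoop reports these as "Reduce input groups" and "Reduce input records"). The function name, the comparison against the median, and the factor of 3 are arbitrary choices for this sketch, not Hadoop settings.

```python
from statistics import median

def classify_skew(reducer_stats, factor=3.0):
    """Classify skew from per-reducer (groups, records) counter pairs.

    `factor` is an arbitrary illustrative threshold, not a Hadoop setting.
    """
    groups = [g for g, _r in reducer_stats]
    records = [r for _g, r in reducer_stats]
    if max(groups) > factor * median(groups):
        return "hash-partitioning skew"    # one reducer sees far more unique keys
    if max(records) > factor * median(records):
        return "join-product skew"         # similar key counts, far more records
    return "no obvious skew"

# Hypothetical counter readings: (reduce input groups, reduce input records).
hash_case = classify_skew([(100, 1000), (105, 1100), (900, 9000), (98, 990)])
product_case = classify_skew([(100, 1000), (101, 50000), (99, 1100), (102, 990)])
```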

The problem is that reduce-side joins take a long time to complete, with a few straggler reducers taking much longer than the rest. The solution is to use a range partitioner, or to write a custom partitioner that funnels the skewed keys to a reserved set of reducers. The goal is to replace the default hash partitioner with something that handles the skewed data better. This article offers two options to explore:

Use the sampler and TotalOrderPartitioner that are bundled with Hadoop to replace the hash partitioner with a range partitioner.

Write a custom partitioner that routes keys with data skew to reducers reserved for skewed keys.

Range partitioner

A range partitioner distributes map outputs based on predefined ranges of values, where each range maps to a reducer that receives all outputs within that range. This is how TotalOrderPartitioner works. In fact, TeraSort uses TotalOrderPartitioner to distribute keys evenly across all reducers to minimize skew. TotalOrderPartitioner comes with a sampler that samples the input data and writes the resulting partition boundaries to HDFS, which TotalOrderPartitioner then uses when partitioning.
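
The idea can be sketched in plain Python: sample the keys, pick evenly spaced boundaries from the sorted sample, and binary-search each key against those boundaries. This only imitates the approach and does not use Hadoop's sampler or TotalOrderPartitioner; the function names and the integer keys are assumptions.

```python
import bisect
import random

def sample_split_points(keys, num_reducers, sample_size, seed=0):
    """Sample the keys and pick num_reducers - 1 evenly spaced boundaries."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    step = len(sample) / num_reducers
    return [sample[int(step * i)] for i in range(1, num_reducers)]

def range_partition(key, split_points):
    """Return the index of the range that contains the key."""
    return bisect.bisect_right(split_points, key)

# Hypothetical keys; sampling all of them makes the boundaries exact.
keys = list(range(100))
split_points = sample_split_points(keys, num_reducers=4, sample_size=100)
load = [0, 0, 0, 0]
for key in keys:
    load[range_partition(key, split_points)] += 1
```

Because the boundaries come from the data itself, each reducer receives about the same number of keys, and with a smaller sample the balance is approximate rather than exact.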

Custom partitioner

If you already know which keys exhibit data skew, and that set of keys is static, you can write a custom partitioner that pushes these high-cardinality keys to a reserved set of reducers.
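
A minimal sketch of such a partitioner in plain Python, assuming a static map from each skewed key to a reserved reducer index (the `SKEWED_KEYS` table, the reserved indexes 0 and 1, and the use of CRC32 as the fallback hash are all choices made for this example):

```python
import zlib

# Hypothetical static map: skewed key -> reserved reducer index (0 .. reserved - 1).
SKEWED_KEYS = {"anne": 0, "bob": 1}

def skew_aware_partition(key, num_reducers, skewed_keys=SKEWED_KEYS):
    """Send known skewed keys to reserved reducers and hash the rest over the remainder."""
    reserved = len(set(skewed_keys.values()))
    if num_reducers <= reserved:
        raise ValueError("need at least one reducer beyond the reserved ones")
    if key in skewed_keys:
        return skewed_keys[key]
    return reserved + zlib.crc32(key.encode()) % (num_reducers - reserved)

assignments = {key: skew_aware_partition(key, 6)
               for key in ["anne", "bob", "carl", "dave", "erin"]}
```

Each skewed key gets a reducer to itself, and the remaining keys are spread over the other reducers so they are not delayed behind the heavy ones.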
