Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Three Join implementations of SparkSQL

2025-02-21 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Introduction

Join is a common operation in SQL statements. A good table structure can distribute data in different tables, make it conform to a certain normal form, reduce table redundancy, update fault tolerance and so on. The best way to establish a relationship between a table and a table is the Join operation.

For Spark, there are 3 implementations of Join, and each Join corresponds to a different application scenario:

Broadcast Hash Join: suitable for join with a smaller table and a large table

Shuffle Hash Join: suitable for join between a small table and a large table, or join between two small tables

Sort Merge Join: suitable for join between two larger tables

Both of the first two are based on Hash Join, but you need to shuffle or broadcast before hash join. The specific principles of these three different join will be explained in detail below.

Hash Join

Let's first take a look at this SQL statement:

Select * from order,item where item.id = order.i_id

Identify Build Table and Probe Table: this concept is important. Build Table uses join key to build Hash Table, while Probe Table uses join key to probe, and if the probe succeeds, you can join together. Typically, a small table is used as a Build Table and a large table as a Probe Table. In this case, item is Build Table,order, Probe Table; is a simple Join node, and the two tables that participate in join are item and order,join key are item.id and order.i_id, respectively. Now suppose the Join uses the hash join algorithm, and the whole process goes through three steps:

Build Hash Table: read the data of Build Table (item) in turn, hash,hash each row of data to the corresponding Bucket according to join key (item.id), and generate a record in hash table. The data is cached in memory. If there is no room for internal storage, you need to dump to external memory.

Probe: scan the data of Probe Table (order) in turn, use the same hash function to map the records in Hash Table, and then check the join condition (item.id = order.i_id) after the mapping is successful. If the match is successful, the two can be join together.

For the basic process, please refer to the figure above. Here are two small issues to pay attention to:

How is the performance of hash join? Obviously, hash join basically scans two tables only once, so it can be considered that o (aqb) has lost many streets compared with the most extreme Cartesian set operation aquib.

Why did Build Table choose a small table? The reason is simple, because the constructed Hash Table is best loaded in memory, which is the most efficient. This also determines that the hash join algorithm is only suitable for join scenarios with at least one small table, but not for join scenarios with two large tables.

As mentioned above, hash join is a stand-alone join algorithm in the traditional database, which needs to undergo some distributed transformation in the distributed environment. In the final analysis, it is to use distributed computing resources for parallel computing as much as possible to improve the overall efficiency. There are generally two classic schemes for hash join distributed transformation:

Broadcast hash join: distribute one of the small table broadcasts to the partition node where the other large table is located, and hash join concurrently with the partition records on it. Broadcast is suitable for scenarios where small tables are small and can be broadcast directly.

Shuffler hash join: once a small table has a large amount of data, it is no longer suitable for broadcast distribution. In this case, according to the same principle of the same inevitable partition of join key, the partitions of the two tables can be reorganized according to join key, so that join can be divided into many small join, making full use of cluster resources parallelization.

Broadcast Hash Join

As you know, in common database models (such as star model or snowflake model), tables are generally divided into two types: fact tables and dimension tables. Dimension tables generally refer to fixed, less changing tables, such as contacts, types of items, etc., with limited data. Fact tables generally record pipelining, such as sales lists, and usually expand over time.

Because the Join operation is to join the records with the same key value in two tables, in SparkSQL, the most direct way to Join the two tables is to first partition according to key, and then take out the records with the same key value in each partition to do the join operation. But this inevitably involves shuffle, and shuffle is a time-consuming operation in Spark, so we should design Spark applications as much as possible to avoid a large number of shuffle.

When dimension tables and fact tables perform Join operations, in order to avoid shuffle, we can distribute all the data of dimension tables of limited size to each node for use by fact tables. Executor stores all the data of the dimension table, sacrificing space to a certain extent, in exchange for a lot of time-consuming shuffle operations, which is called Broadcast Join in SparkSQL, as shown in the following figure:

Table B is a smaller table, black means it is broadcast to each executor node, and each partition of Table A fetches Table A's data through block manager. According to the Join Key of each record, the corresponding record in Table B is taken, and the operation is carried out according to Join Type. The process is relatively simple and will not be repeated.

The conditions of Broadcast Join are as follows:

The table to be broadcast needs to be less than the value configured by spark.sql.autoBroadcastJoinThreshold. The default is 10m (or hint with broadcast join added).

The base table cannot be broadcast, such as left outer join, only the right table can be broadcast.

It seems that broadcasting is an ideal solution, but does it have any disadvantages? And it's obvious. This scheme can only be used to broadcast smaller tables, otherwise the redundant transmission of data will be much greater than the cost of shuffle; in addition, the broadcast performance needs to be collect to the driver side, when frequent broadcasts appear, it is also a test for the memory of driver.

As shown in the following figure, broadcast hash join can be divided into two steps:

Broadcast phase: distribute small table broadcasts to all hosts where large tables are located. There are many broadcast algorithms, the simplest one is to send it to driver,driver and then distribute it to all executor;, or it is based on the P2P idea of bittorrete

Hash join phase: execute stand-alone hash join on each executor, small table mapping, large table trial

SparkSQL stipulates that the basic condition for broadcast hash join execution is that the broadcast small table must be less than the parameter spark.sql.autoBroadcastJoinThreshold, and the default is 10m.

Shuffle Hash Join

When the table on one side is relatively small, we choose to broadcast it to avoid shuffle and improve performance. However, because the broadcast table is first collect to the driver segment, and then redundant distributed to each executor, when the table is large, the adoption of broadcast join will put more pressure on the driver side and the executor side.

However, because Spark is a distributed computing engine, large quantities of data can be divided into n smaller data sets for parallel computing. The application of this idea to Join is Shuffle Hash Join. Using the principle of the same key and the same partition, the same rows of key in the two tables will be shuffle to the same partition. SparkSQL divides the join of the larger table into n partitions, and then Hash Join the data of the corresponding partitions in the two tables, which, to a certain extent, reduces the pressure on the driver broadcast side table and reduces the memory consumption of the executor side to get the whole broadcast table. The principle is as follows:

Shuffle Hash Join is divided into two steps:

The two tables are repartitioned according to join keys, namely shuffle, in order to divide the records with the same join keys value into the corresponding partition.

Join the data in the corresponding partition. Here, the small table partition is constructed as a hash table, and then matched according to the join keys value recorded in the large table partition.

The conditions of Shuffle Hash Join are as follows:

The average size of the partition does not exceed the value configured by spark.sql.autoBroadcastJoinThreshold. The default is 10m.

The base table cannot be broadcast, such as left outer join, only the right table can be broadcast.

The table on one side should be significantly smaller than the other side, and the small side will be broadcast (significantly smaller is defined as 3 times smaller, here is the empirical value)

We can see that in a certain size table, SparkSQL completes join by repartitioning the two tables and hash the partitions in the small table from the point of view of the combination of time and space. On the basis of maintaining a certain complexity, the memory pressure of driver and executor is reduced as much as possible, and the stability of computing is improved.

Under the condition of big data, if a table is very small, there is no doubt that the best choice to perform join operation is broadcast hash join, which is the most efficient. However, once the amount of data in the small table increases, the memory, bandwidth and other resources required for broadcasting will inevitably be too large, and broadcast hash join will no longer be the optimal solution. At this time, we can partition according to join key, and according to the same principle of the same inevitable partition of key, we can divide the large table join into many small table join, and make full use of the parallelization of cluster resources. As shown in the following figure, shuffle hash join can also be divided into two steps:

Shuffle phase: partition the two tables according to join key, redistribute the records of the same join key to the same node, and the data of the two tables will be redistributed to all nodes in the cluster. This process is called shuffle.

Hash join phase: the data on each partition node executes the stand-alone hash join algorithm separately.

Seeing here, it can be preliminarily concluded that if two small tables join can directly use stand-alone hash join;, if a large table join a very small table, you can choose the broadcast hash join algorithm; if it is a large table join and a small table, you can choose the shuffle hash join algorithm; what if two large tables are used for join?

Sort Merge Join

The two implementations described above are suitable for tables of a certain size, but when both tables are very large, it is clear that either will put a lot of pressure on computational memory. This is because when join, both use hash join, which completely loads the data on one side into memory, and uses hash code to connect records with equal join Keys values.

When both tables are very large, SparkSQL adopts an entirely new scheme to Join the tables, or Sort Merge Join. In this way, you do not need to load all the data on one side and then enter the hash join, but you need to sort the data before join, as shown in the following figure:

As you can see, first, the two tables are re-shuffle according to join keys to ensure that records with the same join Keys value will be divided into corresponding partitions. Sort the data in each partition, and then connect the records in the corresponding partition, as shown below:

Does it look familiar? It is also very simple, because both sequences are ordered, traverse from scratch, and output if you encounter the same key; if different, the small on the left will continue to take the left, and vice versa.

It can be seen that no matter how large the partition is, Sort Merge Join does not have to load all the data on one side into memory, but discards it right away, thus greatly improving the stability of sql join under a large amount of data.

SparkSQL uses a new algorithm for two large table join-sort-merge join. As shown in the following figure, the whole process is divided into three steps:

Shuffle phase: two large tables are repartitioned according to join key, and the data of the two tables are distributed to the entire cluster for distributed parallel processing.

Sort phase: sort the data of two tables of a single partition node respectively

Merge phase: the join operation is performed on the ordered data of two partition tables. The operation of join is very simple. You can iterate through two ordered sequences respectively. If you encounter the same join key, you will output merge. Otherwise, take the smaller side, as shown in the following figure:

After the above analysis, it is clear that each Join algorithm has its own applicable scenarios. When designing a data warehouse, it is best to avoid large table and large table join queries. SparkSQL can also increase the parameter spark.sql.autoBroadcastJoinThreshold according to memory resources and bandwidth resources, so that more join can actually be implemented as broadcast hash join.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report