What are the Join implementation methods of Spark SQL

Updated: 2025-04-15. Source: SLTechnology News&Howtos, Shulou (Shulou.com), 05/31

This article describes how Spark SQL implements Join: the overall query flow, the three physical join implementations, and how each join type is processed.

Introduction to the overall process of SparkSQL

Before looking at how Join is implemented, we briefly introduce the overall flow of SparkSQL. There are two common ways to use SparkSQL. One is to write SQL statements directly, which requires a metastore such as Hive. The other is to write a Spark application with the Dataset/DataFrame API. As shown in the following figure, SQL statements are parsed into a syntax tree (SQL AST) and turned into a query plan, or the query plan is built through the APIs provided by Dataset/DataFrame. Query plans fall into two categories: logical plans and physical plans. At this stage the plan is a logical plan. After analysis (Analyzer) and a series of query optimizations (Optimizer), the optimized logical plan is mapped to a physical plan and finally executed as RDD operations.

Basic elements of Join

As shown in the following figure, a Join consists of roughly three elements: the join type, the join condition, and the filter condition. The filter condition can also be folded into the join condition with AND.

Spark supports the common join types, including:

Inner join

Left outer join

Right outer join

Full outer join

Left semi join

Left anti join

The implementation of each of these join types is described below.

Basic implementation process of Join

Overall, the basic implementation process of Join is shown in the following figure. Spark abstracts the two tables that participate in a Join as a streamed table (streamIter) and a lookup table (buildIter). Usually streamIter is the large table and buildIter is the small table. We do not need to decide which table is streamIter and which is buildIter; Spark chooses automatically based on the join statement.

During the actual computation, Spark iterates over streamIter. For each record rowA it takes from streamIter, it computes keyA according to the join condition, then looks up in buildIter all records rowBs that satisfy the join condition (keyB == keyA). It joins rowA with each record in rowBs to produce joined records, and finally applies the filter condition to obtain the final result.

From this process it is clear that every record from streamIter requires a lookup in buildIter, so buildIter needs a data structure with good lookup performance. Spark provides three join implementations: sort merge join, broadcast join, and hash join.
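
The stream/build lookup described above can be sketched in plain Python. This is only an illustration of the idea, not Spark's code: the function name stream_build_join, the tuple rows, and the sample data are assumptions made for this example.

```python
from collections import defaultdict

def stream_build_join(stream_rows, build_rows, stream_key, build_key, row_filter=None):
    """Scan the stream side, look up matches on the build side, then filter.

    Hypothetical helper for illustration; Spark's operators work on iterators
    of internal rows, not Python lists.
    """
    # Index the build side by join key so each lookup is a dictionary access.
    build_index = defaultdict(list)
    for row_b in build_rows:
        build_index[build_key(row_b)].append(row_b)

    joined = []
    for row_a in stream_rows:                      # traverse streamIter
        key_a = stream_key(row_a)                  # keyA from the join condition
        for row_b in build_index.get(key_a, []):   # every rowB with keyB == keyA
            if row_filter is None or row_filter(row_a, row_b):
                joined.append((row_a, row_b))      # keep rows passing the filter
    return joined

# Sample data (made up): orders are streamed, users are the lookup table.
orders = [(1, "apple"), (2, "pear"), (1, "plum"), (3, "fig")]
users = [(1, "ann"), (2, "bob")]
result = stream_build_join(orders, users, lambda r: r[0], lambda r: r[0])
```

The order with key 3 has no match in users, so it produces no output row. The three implementations discussed next differ mainly in how the build side is organized for lookup.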

Sort merge join implementation

To join two records, records with the same key must first be placed in the same partition, so a shuffle is generally required. In the map phase, the key of each record is determined from the join condition and shuffle write is done by key, so that records that may join land in the same partition. In the shuffle read phase, records with the same key from both tables are then pulled to the same partition for processing. As mentioned earlier, buildIter needs a data structure with good lookup performance. A hash table is the obvious choice, but for a large table it is not feasible to put all records into a hash table. Alternatively, we can sort buildIter first, which also gives an acceptable lookup cost. The Spark shuffle stage supports sorting natively, so this is easy to implement. The following is a schematic diagram of sort merge join.

In the shuffle read phase, streamIter and buildIter are each merge sorted. While traversing streamIter, each record is looked up sequentially in buildIter. Because both tables are sorted, after one streamIter record is processed, the lookup for the next streamIter record can start where the previous lookup in buildIter ended, so no lookup has to start from the beginning. Overall, lookup performance is good.
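
The forward-only lookup can be sketched for a single partition as follows. This is a simplified plain-Python illustration under assumptions of this example (the name sort_merge_join, rows as tuples keyed by their first field, non-null comparable keys); it is not Spark's operator.

```python
def sort_merge_join(stream_rows, build_rows, key=lambda r: r[0]):
    """Inner join of two row lists by sorting both and merging.

    Hypothetical single-partition sketch; in Spark the sorting happens
    as part of the shuffle.
    """
    stream_sorted = sorted(stream_rows, key=key)
    build_sorted = sorted(build_rows, key=key)

    joined = []
    pos = 0  # position in build_sorted; it only ever moves forward
    for row_a in stream_sorted:
        key_a = key(row_a)
        # Build rows with a smaller key cannot match this or any later
        # stream row, so they are skipped for good.
        while pos < len(build_sorted) and key(build_sorted[pos]) < key_a:
            pos += 1
        # Emit every build row with an equal key. `pos` stays at the start of
        # the group so a following stream row with the same key sees it again.
        scan = pos
        while scan < len(build_sorted) and key(build_sorted[scan]) == key_a:
            joined.append((row_a, build_sorted[scan]))
            scan += 1
    return joined

left = [(3, "c"), (1, "a"), (2, "b"), (2, "bb")]
right = [(2, "x"), (4, "w"), (2, "y"), (1, "z")]
pairs = sort_merge_join(left, right)
```

Because pos never moves backwards, the build side is scanned roughly once overall, apart from re-reading a group of equal keys for duplicate stream keys.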

Broadcast join implementation

Shuffle is normally needed to bring records with the same key into the same partition. If buildIter is a very small table, however, the shuffle is unnecessary: buildIter is broadcast directly to every compute node and loaded into a hash table there, as shown in the following figure.

As the figure shows, the join is completed within a single map stage without a shuffle, which is usually called a map join. When is broadcast join used? We do not have to decide this ourselves; Spark SQL chooses automatically. When the estimated size of buildIter does not exceed the value of the parameter spark.sql.autoBroadcastJoinThreshold (default 10 MB), broadcast join is used automatically; otherwise sort merge join is used.
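
A minimal plain-Python sketch of the map-side join idea follows. The name broadcast_hash_join and the list-of-lists representation of partitions are assumptions of this example; a real cluster would ship a copy of the hash table to each node.

```python
from collections import defaultdict

def broadcast_hash_join(stream_partitions, small_table, key=lambda r: r[0]):
    """Join every stream partition against one shared hash table, no shuffle.

    Hypothetical sketch: `stream_partitions` is a list of row lists standing
    in for the partitions of the large table.
    """
    # "Broadcast": build the hash table once from the small table.
    hash_table = defaultdict(list)
    for row_b in small_table:
        hash_table[key(row_b)].append(row_b)

    output_partitions = []
    for partition in stream_partitions:
        # Each partition is joined independently, like a map task.
        joined = [(row_a, row_b)
                  for row_a in partition
                  for row_b in hash_table.get(key(row_a), [])]
        output_partitions.append(joined)
    return output_partitions

big_table_partitions = [[(1, "a"), (2, "b")], [(3, "c"), (1, "d")]]
small_table = [(1, "x"), (3, "y")]
joined_partitions = broadcast_hash_join(big_table_partitions, small_table)
```

Note that the stream rows never move between partitions: rows with key 1 are joined in both partitions because each one has access to the full small table.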

Hash join implementation

In addition to the two implementations above, Spark provides a hash join implementation that does not sort records during the shuffle read phase. Records with the same key from the two tables still end up in the same partition, but they are not sorted within the partition. Instead, the records from buildIter are put into a hash table for lookup, as shown in the following figure.

Because the records from buildIter are put into a hash table, the buildIter records in each partition must not be too large, or they will not fit in memory. By default, hash join is turned off. To use hash join, all four of the following conditions must be met:
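
The following plain-Python sketch illustrates this shuffle-then-hash approach. The helper names shuffle and shuffled_hash_join, the partition count, and the use of Python's hash() as the partitioner are assumptions of this example, not Spark internals.

```python
from collections import defaultdict

def shuffle(rows, num_partitions, key):
    """Assign each row to a partition by the hash of its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(key(row)) % num_partitions].append(row)
    return partitions

def shuffled_hash_join(stream_rows, build_rows, num_partitions=4, key=lambda r: r[0]):
    """Hypothetical sketch: co-partition both sides, then hash-join per partition."""
    stream_parts = shuffle(stream_rows, num_partitions, key)
    build_parts = shuffle(build_rows, num_partitions, key)

    joined = []
    for stream_part, build_part in zip(stream_parts, build_parts):
        # Per-partition hash table: only this partition's build rows must fit
        # in memory, and nothing is sorted.
        table = defaultdict(list)
        for row_b in build_part:
            table[key(row_b)].append(row_b)
        for row_a in stream_part:
            for row_b in table.get(key(row_a), []):
                joined.append((row_a, row_b))
    return joined

facts = [(1, "a"), (2, "b"), (5, "e"), (6, "f"), (1, "g")]
dims = [(1, "x"), (5, "y"), (7, "z")]
matches = shuffled_hash_join(facts, dims)
```

Output order depends on the partitioning, so callers that need a stable order should sort the result.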

The overall estimated size of buildIter exceeds the value set by spark.sql.autoBroadcastJoinThreshold, that is, it does not meet the broadcast join condition.

The switch to try hash join is turned on: spark.sql.join.preferSortMergeJoin=false

The average size of each buildIter partition does not exceed the value set by spark.sql.autoBroadcastJoinThreshold, that is, the records of each buildIter partition in the shuffle read phase must fit in memory.

streamIter is at least three times the size of buildIter.

The conditions for using hash join are therefore very strict. In most practical scenarios, even when hash join is possible, sort merge join is not much worse than hash join, so the default sort merge join is usually a reasonable choice.
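
The selection rules from the broadcast join and hash join sections can be summarized in a small decision function. This is a simplified sketch of the conditions as stated in this article, not Spark's planner: the function name, the parameters, and the use of exact byte sizes instead of estimates are all assumptions of this example.

```python
MB = 1024 * 1024

def choose_join_strategy(build_size, stream_size, num_shuffle_partitions,
                         broadcast_threshold=10 * MB,
                         prefer_sort_merge_join=True):
    """Pick a join implementation from table sizes in bytes.

    Hypothetical helper. `broadcast_threshold` stands in for
    spark.sql.autoBroadcastJoinThreshold and `prefer_sort_merge_join`
    for spark.sql.join.preferSortMergeJoin.
    """
    # Small build side: broadcast it and skip the shuffle.
    if build_size <= broadcast_threshold:
        return "broadcast join"

    # Average build partition must fit under the threshold.
    fits_per_partition = build_size / num_shuffle_partitions <= broadcast_threshold
    # The stream side must be at least three times the build side.
    much_smaller = build_size * 3 <= stream_size

    if not prefer_sort_merge_join and fits_per_partition and much_smaller:
        return "hash join"
    return "sort merge join"

default_choice = choose_join_strategy(100 * MB, 1000 * MB, 200)
hash_choice = choose_join_strategy(100 * MB, 1000 * MB, 200,
                                   prefer_sort_merge_join=False)
```

With the default settings the second call's inputs still yield sort merge join; hash join is only chosen once the preference switch is turned off and both size conditions hold.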

Below we describe how each join type is processed.

Inner join

Inner join finds the records in the left and right tables that satisfy the join condition. When writing SQL statements or using DataFrame, we do not need to care which is the left table and which is the right table. In the Spark SQL query optimization phase, Spark automatically makes the large table the left table, that is, streamIter, and the small table the right table, that is, buildIter. This makes lookups in the small table cheaper. The basic implementation process is shown in the following figure. In the lookup phase, if no record in the right table satisfies the join condition, the left record is skipped.
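
A minimal plain-Python sketch of an inner join that picks the build side by size is shown below. It assumes, for illustration only, that row count stands in for Spark's size estimate; the function name inner_join and the tuple rows are likewise hypothetical.

```python
from collections import defaultdict

def inner_join(left_rows, right_rows, key=lambda r: r[0]):
    """Inner join that streams the larger input and builds on the smaller one.

    Hypothetical sketch; output pairs are always (left_row, right_row)
    regardless of which side was chosen as the build side.
    """
    # Assumption: row count approximates table size.
    swap = len(left_rows) < len(right_rows)
    stream, build = (right_rows, left_rows) if swap else (left_rows, right_rows)

    table = defaultdict(list)
    for row in build:
        table[key(row)].append(row)

    joined = []
    for row_s in stream:
        # A stream row without a match yields nothing: it is skipped.
        for row_b in table.get(key(row_s), []):
            joined.append((row_b, row_s) if swap else (row_s, row_b))
    return joined

small = [(1, "a"), (2, "b")]
large = [(2, "x"), (3, "y"), (1, "z")]
small_left = inner_join(small, large)   # large is streamed, small is built
large_left = inner_join(large, small)   # same roles, other column order
```

Either argument order produces the same matches; only the column order of the output pairs follows the caller's left and right.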

Left outer join

Left outer join is driven by the left table: it looks for matching records in the right table, and if the lookup fails it returns the left record with all right-table fields set to null. So the left table is streamIter and the right table is buildIter. When writing SQL statements or using DataFrame, we usually put the large table on the left and the small table on the right. The basic implementation process is shown in the following figure.
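
The null padding can be sketched in plain Python as follows. The function name left_outer_join, the right_width parameter (number of right-table columns), and the use of None for null are assumptions of this example.

```python
from collections import defaultdict

def left_outer_join(left_rows, right_rows, right_width, key=lambda r: r[0]):
    """Hypothetical sketch: stream the left table, build on the right table."""
    table = defaultdict(list)
    for row_b in right_rows:
        table[key(row_b)].append(row_b)

    null_row = (None,) * right_width   # stands in for the all-null right row
    joined = []
    for row_a in left_rows:
        matches = table.get(key(row_a))
        if matches:
            joined.extend(row_a + row_b for row_b in matches)
        else:
            # No match: keep the left row and pad the right columns with nulls.
            joined.append(row_a + null_row)
    return joined

left_table = [(1, "a"), (2, "b")]
right_table = [(1, "x"), (1, "y")]
left_outer = left_outer_join(left_table, right_table, right_width=2)
```

Every left row appears at least once in the output, which is why the left table has to be the streamed side.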

Right outer join

Right outer join is driven by the right table: it looks for matching records in the left table, and if the lookup fails it returns the right record with all left-table fields set to null. So the right table is streamIter and the left table is buildIter. When writing SQL statements or using DataFrame, we usually put the large table on the right and the small table on the left. The basic implementation process is shown in the following figure.
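
A short plain-Python sketch of the swapped roles follows. As before, the function name, the left_width parameter, and None as the null value are assumptions made for illustration.

```python
from collections import defaultdict

def right_outer_join(left_rows, right_rows, left_width, key=lambda r: r[0]):
    """Hypothetical sketch: stream the right table, build on the left table."""
    table = defaultdict(list)
    for row_a in left_rows:            # the left table is the build side here
        table[key(row_a)].append(row_a)

    null_row = (None,) * left_width
    joined = []
    for row_b in right_rows:           # the right table is streamed
        matches = table.get(key(row_b))
        if matches:
            joined.extend(row_a + row_b for row_a in matches)
        else:
            # Left columns still come first in the output, filled with nulls.
            joined.append(null_row + row_b)
    return joined

right_outer = right_outer_join([(1, "a")], [(1, "x"), (3, "z")], left_width=2)
```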

Full outer join

Full outer join is more complicated. In general it has to do both a left outer join and a right outer join, but we cannot simply run a left outer join, then a right outer join, and union the results, because the inner join results would then appear twice in the final result. Because both the left outer join and the right outer join must be completed, full outer join is implemented only with sort merge join, and the left and right tables each serve as both streamIter and buildIter. The basic implementation process is shown in the following figure.

Since both tables are already sorted, first take one record from the left table (rowA) and one from the right table (rowB) and compare their keys. If the keys are equal, join rowA and rowB, and advance rowA and rowB to the next record in the left and right tables respectively. If keyA < keyB, the right table has no record corresponding to rowA, so join rowA with nullRow and advance rowA to the next record in the left table. If keyA > keyB, the left table has no record corresponding to rowB, so join nullRow with rowB and advance rowB to the next record in the right table. The loop continues until all records in both tables have been processed.
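
The merge loop can be sketched in plain Python as below. This is an illustration under assumptions of this example: the function name, the width parameters, None as null, and non-null comparable keys. It also extends the one-record-at-a-time description by joining whole groups of equal keys, so that duplicate keys produce every combination.

```python
def full_outer_join(left_rows, right_rows, left_width, right_width,
                    key=lambda r: r[0]):
    """Hypothetical sketch of a full outer join over two sorted inputs."""
    left = sorted(left_rows, key=key)
    right = sorted(right_rows, key=key)
    null_left = (None,) * left_width
    null_right = (None,) * right_width

    joined = []
    i = j = 0
    while i < len(left) and j < len(right):
        key_a, key_b = key(left[i]), key(right[j])
        if key_a < key_b:
            joined.append(left[i] + null_right)   # rowA has no partner
            i += 1
        elif key_a > key_b:
            joined.append(null_left + right[j])   # rowB has no partner
            j += 1
        else:
            # Equal keys: find the end of each group, then emit all pairs.
            i_end = i
            while i_end < len(left) and key(left[i_end]) == key_a:
                i_end += 1
            j_end = j
            while j_end < len(right) and key(right[j_end]) == key_a:
                j_end += 1
            for row_a in left[i:i_end]:
                for row_b in right[j:j_end]:
                    joined.append(row_a + row_b)
            i, j = i_end, j_end

    # Whatever remains on either side has no partner on the other.
    joined.extend(row_a + null_right for row_a in left[i:])
    joined.extend(null_left + row_b for row_b in right[j:])
    return joined

full = full_outer_join([(1, "a"), (2, "b"), (4, "d")],
                       [(2, "x"), (3, "y"), (2, "z")],
                       left_width=2, right_width=2)
```

Each matched pair is emitted exactly once, which avoids the duplicated inner join rows that a union of left and right outer joins would produce.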

Left semi join

Left semi join is driven by the left table and looks for matching records in the right table. If the lookup succeeds, only the left record is returned; otherwise nothing is returned for that record. The basic implementation process is shown in the following figure.
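
A minimal plain-Python sketch of left semi join follows; the function name and tuple rows are assumptions of this example. Because only the existence of a match matters, the build side can be reduced to a set of keys.

```python
def left_semi_join(left_rows, right_rows, key=lambda r: r[0]):
    """Hypothetical sketch: keep left rows that have at least one right match."""
    right_keys = {key(row_b) for row_b in right_rows}
    # Each left row is emitted at most once, however many right rows match.
    return [row_a for row_a in left_rows if key(row_a) in right_keys]

semi = left_semi_join([(1, "a"), (2, "b"), (1, "c")], [(1, "x"), (1, "y")])
```

Unlike an inner join, the two matching right rows do not duplicate the left rows, and no right-table columns appear in the output.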

Left anti join

Left anti join is the opposite of left semi join. It is driven by the left table and looks for matching records in the right table. If the lookup succeeds, nothing is returned for that record; otherwise only the left record is returned. The basic implementation process is shown in the following figure.
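
The inverted test can be sketched in plain Python as follows; the function name and tuple rows are assumptions of this example, and null keys are not considered.

```python
def left_anti_join(left_rows, right_rows, key=lambda r: r[0]):
    """Hypothetical sketch: keep left rows that have no match on the right."""
    right_keys = {key(row_b) for row_b in right_rows}
    return [row_a for row_a in left_rows if key(row_a) not in right_keys]

anti = left_anti_join([(1, "a"), (2, "b"), (1, "c")], [(1, "x"), (1, "y")])
```

Together with the semi join result for the same inputs, this covers every left row exactly once.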

Summary

Join is a very important feature of database queries. In the database field, one could say that "he who masters join wins the world". As a distributed data warehouse system, SparkSQL provides comprehensive join support and quietly performs many optimizations in its internal implementation. Understanding how join is implemented helps us understand more deeply how our applications run.
