In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-15 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces "what are the JOIN strategies of Spark". In the daily operation, I believe many people have doubts about the JOIN strategy of Spark. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful for you to answer the doubts about "what are the JOIN strategies of Spark?" Next, please follow the editor to study!
JOIN operation is a very common data processing operation. As a unified big data processing engine, Spark provides a very rich JOIN scenario.
The size of the dataset of factors that affect the JOIN operation
The size of the dataset participating in JOIN directly affects the efficiency of Join operations. Similarly, it will also affect the choice of JOIN mechanism and the execution efficiency of JOIN.
Conditions of JOIN
The conditions of JOIN involve logical comparisons between fields. According to the conditions of JOIN, JOIN can be divided into two categories: equivalent connection and non-equivalent connection. An equivalent connection involves one or more equality conditions that need to be met at the same time. Each equivalence condition is applied between the attributes of the two input datasets. When other operators are used (the operational connector is not =), it is called a non-equivalent join.
Type of JOIN
After the join condition is applied between the records of the input dataset, the JOIN type affects the result of the JOIN operation. There are the following main types of JOIN:
Inner Join: outputs only records that match the connection conditions from the input dataset. External connection (Outer Join): it is divided into left outer link, right outer link and full outer connection. Semi Join: the right table is only used to filter the data in the left table and does not appear in the result set. Cross Join: the cross join returns all rows in the left table, and each row in the left table is combined with all rows in the right table. Cross joins are also called Cartesian products. Five policies implemented by JOIN in Spark
Spark provides five JOIN policies to perform specific JOIN operations. The five JOIN policies are as follows:
Introduction to Shuffle Hash JoinBroadcast Hash JoinSort Merge JoinCartesian JoinBroadcast Nested Loop JoinShuffle Hash Join
When the amount of table data you want to JOIN is large, you can choose Shuffle Hash Join. This allows the large table to be repartitioned according to JOIN's key, ensuring that each of the same JOIN key is sent to the same partition. The following figure is shown:
As shown in the figure above, the basic steps of Shuffle Hash Join include the following two main points:
First of all, for the two tables participating in JOIN, repartition according to join key, the process will involve Shuffle, the purpose is to send data of the same join key to the same partition to facilitate join within the partition. Second, for each partition after Shuffle, the partition data of the small table is built into a Hash table, and then matched with the partition data record of the large table according to the join key. Conditions and features only support equivalent joins, and join key does not need sorting to support all join types except all external joins (full outer joins). It is a memory-intensive operation to build Hash map on a small table. If the data on one side of the Hash table is relatively large, it may cause OOM to set the parameter spark.sql.join.prefersortmergeJoin to false (default is true) Broadcast Hash Join introduction
Also known as Map-side JOIN. When a table is small, we usually choose Broadcast Hash Join, which avoids the overhead of Shuffle and improves performance. For example, when JOIN the fact table and the dimension table, because the data of the dimension table is usually very small, you can use Broadcast Hash Join to Broadcast the dimension table. This avoids the Shuffle of the data (Shuffle operations are time-consuming in Spark), thus improving the efficiency of JOIN. Before Broadcast Join, Spark needs to send the data on the Executor side to the Driver side, and then the Driver side broadcasts the data to the Executor side. If we need to broadcast more data, it will cause OOM on the Driver side. The details are as follows:
Broadcast Hash Join mainly consists of two phases:
Broadcast phase: small tables are cached in the Hash Join phase of executor: the execution of Hash Join conditions and features in each executor only supports equivalent joins, and join key does not need to sort to support all join types of Broadcast Hash Join except all external joins (full outer joins). Compared with other JOIN mechanisms, it is more efficient. However, Broadcast Hash Join is a network-intensive operation (data redundant transmission). In addition, data needs to be cached on the driver side, so when the amount of data in a small table is large, it will occur in the case of OOM. The amount of data in the broadcast small table is less than spark.sql.autoBroadcasting join Thresold value. The default is 10MB (10485760). The size threshold of the broadcast table cannot exceed 8GB dataSize 2.4 source code: BroadcastExchangeExec.scalalongMetric ("dataSize") + = dataSize
If (dataSize > = (8L Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.
Between Cartesian and Broadcast Nested Loop Join, if it is an internal connection or a non-equivalent connection, the Broadcast Nested Loop policy is preferred, and when a non-equivalent connection and a table can be broadcast, Cartesian Join is selected.
Conditions and features support equivalent and non-equivalent connections support all JOIN types, the main optimization points are as follows: broadcast left table when right outer connection, right table when left outer connection, broadcast right table when inner join, broadcast left and right tables how to choose equivalent connection of JOIN policy with join prompt (hints), 1.Broadcast Hint in the following order: if the join type supports, select broadcast hash join
2.Sort merge hint: if join key is sorted, select sort-merge join
3.shuffle hash hint: select shuffle hash join if the join type is supported
4.shuffle replicate NL hint: if it is an internal connection, choose Cartesian product
If there is no join prompt (hints), compare the following rules one by one
1. If the join type is supported and one of the tables can be broadcast (spark.sql.autoBroadcastJoinThresold value, default is 10MB), select broadcast hash join
two。 If the parameter spark.sql.join.preferSortMergeJoin is set to false and a table is small enough (you can build a hash map), select shuffle hash join
3. If join keys is sorted, select sort-merge join
4. Select cartesian join if it is an internal connection
5. If OOM may occur or if there is no alternative enforcement policy, you will eventually choose broadcast nested loop join
For non-equivalent connections, there is a join prompt (hints), in the following order
1.broadcast hint:
Select broadcast nested loop join.
2.shuffle replicate NL hint: select cartesian product join if it is an internal connection
Without a join prompt (hints), compare the following rules one by one
1. If a table is small enough (can be broadcast), select broadcast nested loop join
two。 If it is an internal connection, select cartesian product join
3. If OOM may occur or if there is no alternative enforcement policy, you will eventually choose broadcast nested loop join
Source code snippet object JoinSelection extends Strategy selected by join policy
With PredicateHelper
With JoinSelectionHelper {
Def apply (plan: LogicalPlan): Seq [SparkPlan] = plan match {
Case j @ ExtractEquiJoinKeys (joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) = >
Def createBroadcastHashJoin (onlyLookingAtHint: Boolean) = {
GetBroadcastBuildSide (left, right, joinType, hint, onlyLookingAtHint, conf). Map {
BuildSide = >
Seq (joins.BroadcastHashJoinExec (
LeftKeys
RightKeys
JoinType
BuildSide
NonEquiCond
PlanLater (left)
PlanLater (right)
}
}
Def createShuffleHashJoin (onlyLookingAtHint: Boolean) = {
GetShuffleHashJoinBuildSide (left, right, joinType, hint, onlyLookingAtHint, conf). Map {
BuildSide = >
Seq (joins.ShuffledHashJoinExec (
LeftKeys
RightKeys
JoinType
BuildSide
NonEquiCond
PlanLater (left)
PlanLater (right)
}
}
Def createSortMergeJoin () = {
If (RowOrdering.isOrderable (leftKeys)) {
Some (Seq (joins.SortMergeJoinExec)
LeftKeys, rightKeys, joinType, nonEquiCond, planLater (left), planLater (right)
} else {
None
}
}
Def createCartesianProduct () = {
If (joinType.isInstanceOf [InnerLike]) {
Some (Seq (joins.CartesianProductExec (planLater (left), planLater (right), j.condition)
} else {
None
}
}
Def createJoinWithoutHint () = {
CreateBroadcastHashJoin (false)
.orElse {
If (! conf.preferSortMergeJoin) {
CreateShuffleHashJoin (false)
} else {
None
}
}
.orElse (createSortMergeJoin ())
.orElse (createCartesianProduct ())
.getOrElse {
Val buildSide = getSmallerSide (left, right)
Seq (joins.BroadcastNestedLoopJoinExec (
PlanLater (left), planLater (right), buildSide, joinType, nonEquiCond))
}
}
CreateBroadcastHashJoin (true)
.orElse {if (hintToSortMergeJoin (hint)) createSortMergeJoin () else None}
.orElse (createShuffleHashJoin (true))
.orElse {if (hintToShuffleReplicateNL (hint)) createCartesianProduct () else None}
.getOrElse (createJoinWithoutHint ())
If (canBuildLeft (joinType)) BuildLeft else BuildRight
}
Def createBroadcastNLJoin (buildLeft: Boolean, buildRight: Boolean) = {
Val maybeBuildSide = if (buildLeft & & buildRight) {
Some (desiredBuildSide)
} else if (buildLeft) {
Some (BuildLeft)
} else if (buildRight) {
Some (BuildRight)
} else {
None
}
MaybeBuildSide.map {buildSide = >
Seq (joins.BroadcastNestedLoopJoinExec (
PlanLater (left), planLater (right), buildSide, joinType, condition))
}
}
Def createCartesianProduct () = {
If (joinType.isInstanceOf [InnerLike]) {
Some (Seq (joins.CartesianProductExec (planLater (left), planLater (right), condition)
} else {
None
}
}
Def createJoinWithoutHint () = {
CreateBroadcastNLJoin (canBroadcastBySize (left, conf), canBroadcastBySize (right, conf))
.orElse (createCartesianProduct ())
.getOrElse {
Seq (joins.BroadcastNestedLoopJoinExec (
PlanLater (left), planLater (right), desiredBuildSide, joinType, condition))
}
}
CreateBroadcastNLJoin (hintToBroadcastLeft (hint), hintToBroadcastRight (hint))
.orElse {if (hintToShuffleReplicateNL (hint)) createCartesianProduct () else None}
.getOrElse (createJoinWithoutHint ())
Case _ = > Nil
}
At this point, the study of "what are the JOIN strategies of Spark" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.