2025-04-01 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
Analysis of Spark SQL Join Principles
1. Summary of the Join problem
There are six types of Join: inner, leftouter, rightouter, fullouter, leftsemi, and leftanti. For a single-machine Join operation, the problem can be described as follows:
IterA and IterB are two Iterators. The corresponding Rows in the two Iterators are merged according to rule A, and the merged Rows are then filtered according to rule B.
Take Inner_join as an example. Its merge rule A is: for each record in IterA, generate a key, use the key to look up the corresponding records in a Map built from IterB, and merge them. Rule B can be any filter condition, such as a comparison between any field of IterA and any field of IterB.
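The two rules can be sketched on plain Scala collections. This is only an illustration under stated assumptions, not Spark's implementation: the `Row` alias, the `InnerJoinSketch` object, and the `innerJoin` function are hypothetical names, and a row is assumed to be a simple (key, value) pair.

```scala
object InnerJoinSketch {
  // Hypothetical row type for illustration: (join key, payload).
  type Row = (Int, String)

  // Rule A: merge rows of iterA and iterB that share a key.
  // Rule B: keep only merged rows for which `condition` holds.
  def innerJoin(
      iterA: Iterator[Row],
      iterB: Iterator[Row],
      condition: (Row, Row) => Boolean
  ): List[(Row, Row)] = {
    // The "Map collection of IterB": key -> all rows of IterB with that key.
    val mapB: Map[Int, List[Row]] = iterB.toList.groupBy(_._1)
    iterA.flatMap { a =>
      mapB.getOrElse(a._1, Nil)                      // rule A: look up by key
        .map(b => (a, b))                            // rule A: merge
        .filter { case (l, r) => condition(l, r) }   // rule B: filter
    }.toList
  }
}
```

Passing `(_, _) => true` as the condition gives a plain equi-join; any other predicate plays the role of rule B.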
When we take the key of each record in IterA and match it against IterB, we call IterA the streamedIter and IterB the BuildIter or hashedIter. That is, we stream through every record in streamedIter and look up the matching records in hashedIter.
This lookup is the Build process. Each Build operation produces a JoinRow(A, B). If JoinRow(A) comes from streamedIter and JoinRow(B) comes from BuildIter, the process is BuildRight; if JoinRow(B) comes from streamedIter and JoinRow(A) comes from BuildIter, it is BuildLeft.
That is a bit of a mouthful, so why distinguish between BuildLeft and BuildRight? For leftouter, rightouter, leftsemi, and leftanti, the Build type is fixed: left* joins are BuildRight and right* joins are BuildLeft. For inner joins, either BuildLeft or BuildRight can be used, and the choice can make a significant performance difference:
BuildIter is also called hashedIter because it has to be built into an in-memory Hash table to speed up matching. If BuildIter and streamedIter differ greatly in size, building the Hash table from the smaller one takes far less memory.
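The point that an inner join gives the same rows whichever side is hashed can be sketched as follows. The names here (`BuildSideSketch`, `chooseBuildSide`, the (key, value) `Row`) are hypothetical, and the sketch assumes the sizes of both sides are known in advance, which a real planner only estimates.

```scala
object BuildSideSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Hypothetical row type for illustration: (join key, payload).
  type Row = (Int, String)

  // Assumption: sizes are known up front; hash the smaller side.
  def chooseBuildSide(leftSize: Long, rightSize: Long): BuildSide =
    if (leftSize < rightSize) BuildLeft else BuildRight

  def innerJoin(left: Seq[Row], right: Seq[Row], side: BuildSide): Set[(Row, Row)] = {
    val (build, streamed) = side match {
      case BuildLeft  => (left, right)
      case BuildRight => (right, left)
    }
    // Only the build side is materialized as a hash table.
    val hashed = build.groupBy(_._1)
    streamed.flatMap { s =>
      hashed.getOrElse(s._1, Seq.empty).map { b =>
        // Always emit (leftRow, rightRow), whichever side was hashed.
        side match {
          case BuildLeft  => (b, s)
          case BuildRight => (s, b)
        }
      }
    }.toSet
  }
}
```

The result is returned as a Set because the two build sides produce the same rows in a different order.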
To sum up, a Join consists of the following parts:

    trait Join {
      val joinType: JoinType              // Join type
      val streamedPlan: SparkPlan         // for generating streamedIter
      val buildPlan: SparkPlan            // for generating hashedIter
      val buildSide: BuildSide            // BuildLeft or BuildRight
      val buildKeys: Seq[Expression]      // expression for generating buildKey from streamedIter
      val streamedKeys: Seq[Expression]   // expression for generating streamedKey from hashedIter
      val condition: Option[Expression]   // filter applied to each joinRow
    }
Note: for fullouter, IterA and IterB each serve as both streamedIter and hashedIter. First a leftouter is performed with IterA = streamedIter and IterB = hashedIter, then a leftouter with IterB = streamedIter and IterA = hashedIter, and finally the two results are merged.
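A rough sketch of this two-pass idea on plain collections follows. The note does not say how the two results are merged, so the sketch makes an assumption: the second pass keeps only the IterB rows that found no match, since matched pairs were already emitted by the first pass. All names (`FullOuterSketch`, `Row`, `Joined`) are hypothetical.

```scala
object FullOuterSketch {
  // Hypothetical row type for illustration: (join key, payload).
  type Row = (Int, String)
  // A joined row; None stands for the null side of an outer join.
  type Joined = (Option[Row], Option[Row])

  def fullOuterJoin(iterA: Seq[Row], iterB: Seq[Row]): Seq[Joined] = {
    val hashedA = iterA.groupBy(_._1)
    val hashedB = iterB.groupBy(_._1)

    // Pass 1: IterA is streamed, IterB is hashed (a left outer join).
    val pass1: Seq[Joined] = iterA.flatMap { a =>
      hashedB.get(a._1) match {
        case Some(bs) => bs.map(b => (Option(a), Option(b)))
        case None     => Seq((Option(a), Option.empty[Row]))
      }
    }

    // Pass 2: IterB is streamed, IterA is hashed.
    // Assumption: only unmatched IterB rows are kept, to avoid duplicates.
    val pass2: Seq[Joined] = iterB.collect {
      case b if !hashedA.contains(b._1) => (Option.empty[Row], Option(b))
    }

    pass1 ++ pass2
  }
}
```

Both sides are hashed here, which is the extra cost of a hash-based full outer join.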
1.1 Implementation of the Join types
1.1.1 InnerJoin
Use each srow in streamIter to look up matches in hashedIter.
If there are matches, multiple JoinRows are built; otherwise nothing is returned.

    streamIter.flatMap { srow =>
      val joinRow = new JoinedRow
      joinRow.withLeft(srow)
      val matches = hashedIter.get(buildKeys(srow))
      if (matches != null) {
        matches.map(joinRow.withRight(_)).filter(condition)
      } else {
        Seq.empty
      }
    }

1.1.2 LeftOuterJoin
leftIter is the streamIter and rightIter is the hashedIter; these roles cannot be swapped. Use each srow in streamIter to look up matches in hashedIter.
If there are matches, multiple JoinRows are built; otherwise a JoinRow is returned whose Build part is Null.

    val nullRow = new NullRow()
    streamIter.flatMap { srow =>
      val joinRow = new JoinedRow
      joinRow.withLeft(srow)
      val matches = hashedIter.get(buildKeys(srow))
      if (matches != null) {
        matches.map(joinRow.withRight(_)).filter(condition)
      } else {
        Seq(joinRow.withRight(nullRow))
      }
    }

1.1.3 RightOuterJoin
rightIter is the streamIter and leftIter is the hashedIter; these roles cannot be swapped. Use each srow in streamIter to look up matches in hashedIter.
If there are matches, multiple JoinRows are built; otherwise a JoinRow is returned whose Build part is Null.

    val nullRow = new NullRow()
    streamIter.flatMap { srow =>
      val joinRow = new JoinedRow
      joinRow.withRight(srow) // note the difference from LeftOuterJoin
      val matches = hashedIter.get(buildKeys(srow))
      if (matches != null) {
        matches.map(joinRow.withLeft(_)).filter(condition)
      } else {
        Seq(joinRow.withLeft(nullRow))
      }
    }

1.1.4 LeftSemi
leftIter is the streamIter and rightIter is the hashedIter; these roles cannot be swapped. Use each srow in streamIter to look up matches in hashedIter. If there is a match, srow is returned; otherwise nothing is returned.
Unlike the joins above, it returns srow rather than a JoinRow.

    streamIter.filter { srow =>
      val matches = hashedIter.get(buildKeys(srow))
      if (matches == null) {
        false // no match found
      } else {
        if (condition.isEmpty == false) {
          // the condition must be evaluated on the hypothetical joined row
          val joinRow = new JoinedRow
          joinRow.withLeft(srow)
          !matches.map(joinRow.withRight(_)).filter(condition).isEmpty
        } else {
          true
        }
      }
    }
Logically, LeftSemi is equivalent to an IN test.
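The IN-style semantics can be checked with a small runnable sketch. It also includes the negated form, which corresponds to the LeftAnti join described next. The names (`SemiAntiSketch`, `leftSemi`, `leftAnti`, the (key, value) `Row`) are hypothetical and the code is an illustration, not Spark's implementation.

```scala
object SemiAntiSketch {
  // Hypothetical row type for illustration: (join key, payload).
  type Row = (Int, String)

  // Left semi: keep a streamed row if at least one hashed row
  // has the same key and passes `condition` (an IN-style test).
  def leftSemi(
      streamed: Seq[Row],
      build: Seq[Row],
      condition: (Row, Row) => Boolean = (_, _) => true
  ): Seq[Row] = {
    val hashed = build.groupBy(_._1)
    streamed.filter { s =>
      hashed.getOrElse(s._1, Seq.empty).exists(b => condition(s, b))
    }
  }

  // Left anti: the complement of left semi (a NOT IN-style test).
  def leftAnti(
      streamed: Seq[Row],
      build: Seq[Row],
      condition: (Row, Row) => Boolean = (_, _) => true
  ): Seq[Row] = {
    val hashed = build.groupBy(_._1)
    streamed.filterNot { s =>
      hashed.getOrElse(s._1, Seq.empty).exists(b => condition(s, b))
    }
  }
}
```

A streamed row is returned at most once, even when several hashed rows match it, which is what separates a semi join from an inner join.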
1.1.5 LeftAnti
leftIter is the streamIter and rightIter is the hashedIter; these roles cannot be swapped. Use each srow in streamIter to look up matches in hashedIter. The matching logic is essentially the opposite of LeftSemi, that is, it is equivalent to a NOT IN test. If there is no match, srow is returned; otherwise nothing is returned.
As with LeftSemi, it returns srow rather than a JoinRow.

    streamIter.filter { srow =>
      val matches = hashedIter.get(buildKeys(srow))
      if (matches == null) {
        true // no match found
      } else {
        if (condition.isEmpty == false) {
          // the condition must be evaluated on the hypothetical joined row
          val joinRow = new JoinedRow
          joinRow.withLeft(srow)
          matches.map(joinRow.withRight(_)).filter(condition).isEmpty
        } else {
          false
        }
      }
    }

1.2 HashJoin and SortJoin
The Joins described above require BuildIter to be built into an in-memory hashedIter to speed up matching, so we also call this kind of Join a HashJoin. But building a Hash table takes up a lot of memory.
So here is the question: what if our Iter is too big to build a Hash table? In distributed Join computing, the Join happens in the Shuffle phase. If the keys of a dataset are skewed, it is easy to end up with a BuildIter that exceeds the available memory, so the Hash table cannot be built and the HashJoin fails.
In a HashJoin, a hashedIter is built from BuildIter to speed up matching. Building a Hash table is not the only way to do this: sorting both streamedIter and BuildIter and then matching them sequentially also speeds up matching, and this is what we call SortJoin here.
Doesn't sorting need memory too? Yes, but sorting takes much less memory than building a hash table, and sorting can Spill part of the data to disk when memory runs short. A Hash table lives entirely in memory, so the whole Shuffle fails if there is not enough of it.
Let's take the SortJoin implementation of InnerJoin as an example to show how it differs from HashJoin:
Both streamIter and BuildIter must be sorted.
Use each srow in streamIter to search BuildIter sequentially. Because both sides are sorted, the search cost is very small.

    var buildIndex = 0
    streamIter.flatMap { srow =>
      val joinRow = new JoinedRow
      joinRow.withLeft(srow)
      // sequential lookup, starting from buildIndex
      val matches = BuildIter.search(buildKeys(srow), buildIndex)
      if (matches != null) {
        val joined = matches.map(joinRow.withRight(_)).filter(condition)
        buildIndex += matches.length
        joined
      } else {
        Seq.empty
      }
    }
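The sequential search can be made concrete with a small runnable sketch on in-memory sequences. It is an illustration under assumptions, not Spark's SortMergeJoinExec: the names (`SortMergeSketch`, `sortMergeInnerJoin`, the (key, value) `Row`) are hypothetical, both inputs are assumed to be sorted by key already, and no spilling is modeled. Unlike the pseudocode above, the cursor is left at the start of the matching group so that several streamed rows with the same key can reuse it.

```scala
object SortMergeSketch {
  // Hypothetical row type for illustration: (join key, payload).
  type Row = (Int, String)

  // Both inputs must already be sorted by key in ascending order.
  def sortMergeInnerJoin(
      streamed: IndexedSeq[Row],
      build: IndexedSeq[Row]
  ): Vector[(Row, Row)] = {
    val out = Vector.newBuilder[(Row, Row)]
    var buildIndex = 0 // start of the current candidate group in `build`
    for (srow <- streamed) {
      // Skip build rows whose key is smaller than the streamed key.
      while (buildIndex < build.length && build(buildIndex)._1 < srow._1) {
        buildIndex += 1
      }
      // Emit every build row with an equal key. buildIndex is not advanced
      // here, so the next streamed row with the same key sees the same group.
      var j = buildIndex
      while (j < build.length && build(j)._1 == srow._1) {
        out += ((srow, build(j)))
        j += 1
      }
    }
    out.result()
  }
}
```

Unsorted inputs can be prepared with `rows.sortBy(_._1)` before calling the function. The build cursor only moves forward, so each side is scanned once apart from re-reading groups of duplicate keys.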
Implementing FullOuterJoin with HashJoin is costly, because a Hash table has to be built in both directions. With SortJoin, its cost is not much different from that of the other Joins, so FullOuter is implemented on top of SortJoin by default.
2. Join implementation in Spark
Spark provides a distributed implementation of Join, but the Join operation itself is essentially a single-machine algorithm. To join two datasets in a distributed way, Spark first runs an Exchange on both datasets, that is, a ShuffleMap operation that sends rows with the same key to the same partition, and then joins each pair of partitions in the ShuffleFetch phase using the single-machine HashJoin or SortJoin algorithm.
In addition, if the entire dataset on the Build side (not just one iter) is small, it can be Broadcast, which saves the Shuffle overhead.
Spark therefore supports three Join algorithms: ShuffledHashJoinExec, SortMergeJoinExec, and BroadcastHashJoinExec. How does it choose among them?
If the build dataset is Broadcastable and its size is less than spark.sql.autoBroadcastJoinThreshold (10 MB by default), BroadcastHashJoinExec is chosen first. If the dataset supports Sort and spark.sql.join.preferSortMergeJoin is true, SortMergeJoinExec is chosen next. If the dataset does not support Sort, ShuffledHashJoinExec is the only option. If the Join supports both BuildRight and BuildLeft, the side with less data is chosen as the Hash side.
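The selection order described above can be written down as a small decision function. This is a simplified model of the prose only, not the logic of Spark's planner, which checks more conditions. Every name in it (`JoinSelectionSketch`, `JoinInfo`, `Conf`, `choose`) is hypothetical; only the two configuration keys in the comments come from the text.

```scala
object JoinSelectionSketch {
  sealed trait JoinAlgo
  case object BroadcastHashJoin extends JoinAlgo
  case object SortMergeJoin extends JoinAlgo
  case object ShuffledHashJoin extends JoinAlgo

  // Hypothetical summary of what is known about the join.
  final case class JoinInfo(
      buildSizeInBytes: Long,      // estimated size of the build-side dataset
      buildBroadcastable: Boolean, // whether the build side may be broadcast
      keysSortable: Boolean        // whether the join keys can be sorted
  )

  // Hypothetical holder for the two settings named in the text.
  final case class Conf(
      autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024, // spark.sql.autoBroadcastJoinThreshold
      preferSortMergeJoin: Boolean = true                   // spark.sql.join.preferSortMergeJoin
  )

  // Simplified: follows the order of the rules in the prose and nothing more.
  def choose(info: JoinInfo, conf: Conf = Conf()): JoinAlgo =
    if (info.buildBroadcastable && info.buildSizeInBytes < conf.autoBroadcastJoinThreshold)
      BroadcastHashJoin
    else if (info.keysSortable && conf.preferSortMergeJoin)
      SortMergeJoin
    else
      ShuffledHashJoin
}
```

For example, `choose(JoinInfo(1024L, true, true))` picks the broadcast variant, while a 1 GB build side with sortable keys falls through to the sort-merge variant.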
This logic lives in the JoinSelection strategy (defined in org.apache.spark.sql.execution.SparkStrategies). PS: Spark also supports Joins without joining keys, but they are outside the scope of this discussion.
BroadcastHashJoinExec
    val p = spark.read.parquet("/Users/p.parquet")
    val p1 = spark.read.parquet("/Users/p1.parquet")
    p.joinWith(p1, p("to_module") === p1("to_module"), "inner")

Since both p and p1 are small, BroadcastHashJoinExec is chosen by default:

    == Physical Plan ==
    BroadcastHashJoin [_1#269.to_module], [_2#270.to_module], Inner, BuildRight
    :- Project p
    :- Project p1
SortMergeJoinExec
    val p = spark.read.parquet("/Users/p.parquet")
    val p1 = spark.read.parquet("/Users/p1.parquet")
    p.joinWith(p1, p("to_module") === p1("to_module"), "fullouter")

A fullouter Join supports neither Broadcast nor ShuffledHashJoinExec, so SortMergeJoinExec is chosen:

    == Physical Plan ==
    SortMergeJoin [_1#273.to_module], [_2#274.to_module], FullOuter
    :- Project p
    :- Project p1
ShuffledHashJoinExec is rarely chosen in practice, because its conditions are stricter.

    // First of all, Broadcast must not be possible!
    private def canBroadcast(plan: LogicalPlan): Boolean = {
      plan.statistics.isBroadcastable || plan.statistics.sizeInBytes