In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly introduces "the usage of Join in Spark". In daily operation, I believe many people have doubts about the usage of Join in Spark. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful for you to answer the doubts about the usage of Join in Spark! Next, please follow the editor to study!
In the process of data analysis and processing, we often use Join operations to associate two data sets. As a general analysis engine, Spark can support a variety of Join application scenarios.
The input of the Join operation is two datasets, An and B. each record in dataset An is compared with each record in dataset B. when a qualified record is found, a new record is returned. The fields in the new record can only come from An or B, or can be taken from An and B respectively. Therefore, the record after Join can represent the combination of the records in the two datasets.
Three factors affecting Spark Join operation
Specific to the execution of Join operations in Spark, there are three major factors: the size of the input data set, the Join condition, and the type of Join.
Enter the size of the dataset
The size of the input dataset directly affects the efficiency and reliability of the Join operation, not only the absolute size, but also the relative size between the datasets.
Join condition
The Join condition is usually a logical comparison of fields in two datasets, which can be divided into equivalent Join and unequal Join.
An equivalent Join can contain an equality condition or multiple equality conditions that need to be satisfied at the same time, such as:
An equality condition: A.x = = B.x
Multiple equality conditions: A.x = = B.x and A.y = = B.y
Note: X and y are fields in datasets An and B.
Unequal Join uses unequal conditions or equal conditions that cannot be satisfied at the same time, such as:
Unequal condition: A.x
< B.x 不能同时满足的相等条件:A.x == B.x or A.y == B.y Join类型 Join类型影响Join操作的输出,大致包括以下几类: Inner Join:Inner Join只输出匹配的记录(满足Join条件),记录来自两个数据集 Outer Join:Outer Join除了输出匹配的记录,也输出未匹配的记录,根据如何输出未匹配的记录,outer Join可以进一步分为left out join、right out join和full outer join,记录来自两个数据集 Semi Join:Semi Join输出的记录只来自一个数据集,要么是匹配的记录,要么是未匹配的记录。如果输出的是未匹配的记录,也叫做Anti Join Cross Join:Cross Join输出两个数据集中所有记录可能的组合,例如,A集合中有m条记录,B集合中有n条记录,则结果为m*n条记录,Cross Join又称为笛卡尔积。 根据上面的三个因素,Spark会选择合适的执行机制来完成Join操作。 Spark Join的执行机制 Spark提供了五种执行Join操作的机制,分别是: Shuffle Hash Join Broadcast Hash Join Sort Merge Join Cartesian Join Broadcast Nested Join Hash Join Broadcast Hash Join和Shuffle Hash Join都基于Hash Join,Hash Join是单机上的Join操作。想象一道LeetCode算法题,数据量分别为m和n的两个数组,怎么找到两个数组的公共元素?第一种方法:对两个数组进行嵌套循环的遍历,发现相等元素则输出。第二种方法:用空间换时间,将其中一个数组转化成集合(Python的set或者Java的HashSet,实现都基于哈希表),然后遍历第二个数组中的每一个元素,判断是否包含在第一个集合中。Hash Join和第二种方法类似,将较小的数据集分区构造成哈希表,用Join的key作为哈希表的key,key所对应的记录作为哈希表的value,然后遍历较大的数据集分区,在哈希表中寻找对应的key,找到两个分区key相同的记录将其输出。因为使用了哈希表,所以叫做Hash Join。 根据进行Join的两个数据集的大小关系,Spark支持两种Hash Join。 Broadcast Hash Join 当其中一个数据集足够小时,采用Broadcast Hash Join,较小的数据集会被广播到所有Spark的executor上,并转化为一个Hash Table,之后较大数据集的各个分区会在各个executor上与Hash Table进行本地的Join,各分区Join的结果合并为最终结果。 Broadcast Hash Join 没有Shuffle阶段、效率最高。但为了保证可靠性,executor必须有足够的内存能放得下被广播的数据集,所以当进两个数据集的大小都超过一个可配置的阈值之后,Spark不会采用这种Join。控制这个阈值的参数为spark.sql.autoBroadcastJoinThreshold,最新版本(3.0.1)中默认值为10M。 Shuffle Hash Join 当两个数据集都小于可以使用Broadcast Hash Join的阈值时,采用Shuffle Join,先对两个数据集进行Shuffle,Shuffle是意思是根据key的哈希值,对两个数据集进行重新分区,使得两个数据集中key的哈希值相同的记录会被分配到同一个executor上,此时在每个executor上的分区都足够小,各个executor分别执行Hash Join即可。 Shuffle操作会带来大量的网络IO开销,因此效率会受到影响。同时,在executor的内存使用方面,如果executor的数量足够多,每个分区处理的数据量可以控制到比较小。 Sort Merge Join Sort Merge Join和Shuffle Hash Join类似,会有一个Shuffle阶段,将key相同的记录重分配同一个executor上,不同的是,在每个executor上,不再构造哈希表,而是对两个分区进行排序,然后用两个下标同时遍历两个分区,如果两个下标指向的记录key相同,则输出这两条记录,否则移动key较小的下标。 Sort Merge Join也有Shuffle阶段,因此效率同样不如Broadcast Hash Join。在内存使用方面,因为不需要构造哈希表,需要的内存比Hash Join要少。 Cartesian Join Cartesian Join机制专门用来实现cross join,结果的分区数等于输入数据集的分区数之积,结果中每一个分区的数据对应一个输入数据集的一个分区和另外一个输入数据集的一个分区。 Cartesian Join会产生非常多的分区,但如果要进行cross join,别无选择。 Broadcast Nested Loop Join Broadcast Nested Join将一个输入数据集广播到每个executor上,然后在各个executor上,另一个数据集的分区会和第一个数据集使用嵌套循环的方式进行Join输出结果。 Broadcast Nested Join需要广播数据集和嵌套循环,计算效率极低,对内存的需求也极大,因为不论数据集大小,都会有一个数据集被广播到所有executor上。 Spark如何选择Join机制 Spark根据以下的因素选择实际执行Join的机制: 参数配置 hint参数 输入数据集大小 Join类型 Join条件 其中,hint参数是一种在join时手动指定join机制的方法,例如: df1.hint("broadcast").join(df2, ...) 下面介绍在什么情况下使用何种Join机制。 何时使用Broadcast Hash Join 必需条件: 只用于等值Join 不能用于Full Outer Join 以下条件需要满足一个: 左边的数据集使用了broadcast hint,Join类型是Right Outer,Right Semi或Inner 没使用hint,但左边的数据集小于spark.sql.autoBroadcastJoinThreshold参数,Join类型是Right Outer,Right Semi或Inner 右边的数据集使用了broadcast hint,Join类型是Left Outer,Left Semi或Inner 没使用hint,但右边的数据集小于spark.sql.autoBroadcastJoinThreshold参数,Join类型是Left Outer,Left Semi或Inner 两个数据集都使用了broadcast hint,Join类型是Left Outer,Left Semi,Right Outer,Right Semi或Inner 没使用hint,但两个数据集都小于spark.sql.autoBroadcastJoinThreshold参数,Join类型是Left Outer,Left Semi,Right Outer,Right Semi或Inner 何时使用Shuffle Hash Join 必需条件: 只用于等值Join 不能用于Full Outer Join spark.sql.join.prefersortmergeJoin 参数默认值为true,设置为false 以下条件需要满足一个: 左边的数据集使用了shuffle_hash hint,Join类型是Right Outer,Right Semi或Inner 没使用hint,但左边的数据集比右边的数据集显著小,Join类型是Right Outer,Right Semi或Inner 右边的数据集使用了shuffle_hash hint,Join类型是Left Outer,Left Semi或Inner 没使用hint,但右边的数据集比左边的数据集显著小,Join类型是Left Outer,Left Semi或Inner 两边的数据集都使用了shuffle_hash hint,Join类型是Left Outer,Left Semi,Right Outer,Right Semi或Inner 没使用hint,两个数据集都比较小,Join类型是Left Outer,Left Semi,Right Outer,Right Semi或Inner 何时使用Sort Merge Join 必需条件: 只用于等值Join Join条件中的key是可排序的 spark.sql.join.prefersortmergeJoin 参数默认值为true,设置为true 以下条件需要满足一个: 有一个数据集使用了merge hint,Join类型任意 没有使用merge hint,Join类型任意 何时使用Cartesian Join 必需条件: Cross Join 以下条件需要满足一个: 使用了shuffle_replicate_nl hint,是等值或不等值Join均可 没有使用hint,等值或不等值Join均可 何时Broadcast Nested Loop Join Broadcast Nested Loop Join是默认的Join机制,当没有选用其他Join机制被选择时,用它来进行任意条件任意类型的Join。 当有多种Join机制可用时,选择的优先级为Broadcast Hash Join >Sort Merge Join > Shuffle Hash Join > Cartesian Join.
When performing Inner Join and unequal Join, if there is a dataset that can be broadcast, Broadcast Nested Loop Join has a higher priority than Cartesian Join.
At this point, the study of "the use of Join in Spark" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.