In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
Big data development from the implementation of cogroup to see whether join is a wide or narrow dependency, many novices are not very clear about this, in order to help you solve this problem, the following editor will explain in detail for you, people with this need can come to learn, I hope you can gain something.
Let's look at the join implementation of cogroup from the source code point of view.
1. Analyze the following code import org.apache.spark.rdd.RDDimport org.apache.spark. {SparkConf, SparkContext} object JoinDemo {def main (args: Array [String]): Unit = {val conf = new SparkConf (). SetAppName (this.getClass.getCanonicalName.init) .setMaster ("local [*]") val sc = new SparkContext (conf) sc.setLogLevel ("WARN") val random = scala.util.Random val col1 = Range (1,50) .map (idx = > (random.nextInt (10)) S "user$idx")) val col2 = Array ((0, "BJ"), (1, "SH"), (2, "GZ"), (3, "SZ"), (4, "TJ"), (5, "CQ"), (6, "HZ"), (7, "NJ"), (8, "WH"), (0, "CD")) val rdd1 [(Int ") String)] = sc.makeRDD (col1) val rdd2: RDD [(Int, String)] = sc.makeRDD (col2) val rdd3: RDD [(Int, (String, String))] = rdd1.join (rdd2) println (rdd3.dependencies) val rdd4: RDD [(Int, (String, String)] = rdd1.partitionBy (new HashPartitioner (3)) .join (rdd2.partitionBy (new HashPartitioner (3)) println (rdd4.dependencies) sc.stop ()}}
Analyze the above code, what is the print result, whether this join is a wide or narrow dependency, and why?
two。 View the operation from the ui interface of spark
With regard to the relationship between stage partition and wide and narrow dependencies, we can know the correspondence between stage and wide dependencies from 2.1.3 how to distinguish wide dependencies from narrow dependencies, so we can distinguish wide dependencies from the dependency graphs of rdd3 and rdd4's stage. We can see that join partitions generate wide dependencies in addition to the new stage, and rdd1.partitionBy (new HashPartitioner (3)) .join (new HashPartitioner (3)) is another dependency graph. So you can see that partitionBy has not divided a new stage after that, so it is a narrow dependency.
Source code implementation of 3.join
I know the conclusion before, which is seen in the ui diagram. Now let's see how the join source code is implemented (based on spark2.4.5).
Enter the entry method first, in which the method of withScope can be understood as a decorator, in order to display more information in sparkUI. So you can achieve your goal by wrapping all the created RDD methods and using RDDOperationScope to record the operation history and associations of the RDD.
/ * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this`and * (k, v2) is in `other`. Performs a hash join across the cluster. * / def join [W] (other: RDD [(K, W)]): RDD [(K, (V, W))] = self.withScope {join (other, defaultPartitioner (self, other))}
Let's take a look at the implementation of defaultPartitioner, which aims to take a larger one between the default value and the divider and return the divider.
Def defaultPartitioner (rdd: RDD [_], others: RDD [_] *): Partitioner = {val rdds = (Seq (rdd) + + others) / / determine whether the divider partitioner val hasPartitioner = rdds.filter (_ .partitioner.partition (_ .numPartitions > 0)) / / if partitioner is set Then take the maximum number of partitions for setting partitioner val hasMaxPartitioner: option [RDD [_]] = if (hasPartitioner.nonEmpty) {Some (hasPartitioner.maxBy (_ .partitions.length))} else {None} / / to determine whether spark.default.parallelism is set. If set, take spark.default.parallelism val defaultNumPartitions = if (rdd.context.conf.contains ("spark.default.parallelism")) {rdd.context.defaultParallelism} else {rdds.map (_ .partitions.length). Max} / / If the existing max partitioner is an eligible one, or its partitions number is larger / / than the default number of partitions, use the existing partitioner. / / mainly determine whether the default partitioner is set for the incoming rdd and whether the set partitioner is legal / / or whether the number of partitioner partitions set is greater than the default number of partitions / / if the condition is true, take the maximum number of partitions passed in rdd, otherwise the default number of partitions if (hasMaxPartitioner.nonEmpty & & (isEligiblePartitioner (hasMaxPartitioner.get, rdds) | | defaultNumPartitions)
< hasMaxPartitioner.get.getNumPartitions)) { hasMaxPartitioner.get.partitioner.get } else { new HashPartitioner(defaultNumPartitions) } } private def isEligiblePartitioner( hasMaxPartitioner: RDD[_], rdds: Seq[RDD[_]]): Boolean = { val maxPartitions = rdds.map(_.partitions.length).max log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1 }} 再进入join的重载方法,里面有个new CoGroupedRDD[K](Seq(self, other), partitioner) def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues( pair =>For (v if (rdd.partitioner = = Some (part)) {logDebug ("Adding one-to-one dependency with" + rdd) new OneToOneDependency (rdd)} else {logDebug ("Adding shuffle dependency with" + rdd) new ShuffleDependency [K, Any, CoGroupCombiner] (rdd.asInstanceOf [RDD [_)
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.