Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to analyze the source code of spark join

2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

In this issue, the editor will bring you a source code analysis on how to carry out spark join. The article is rich in content and analyzes and narrates it from a professional point of view. I hope you can get something after reading this article.

Import org.apache.spark.rdd.RDDimport org.apache.spark. {SparkConf, SparkContext} object JoinDemo {def main (args: Array [String]): Unit = {val conf = new SparkConf (). SetAppName (this.getClass.getCanonicalName.init) .setMaster ("local [*]") val sc = new SparkContext (conf) sc.setLogLevel ("WARN") val random = scala.util.Random val col1 = Range (1,50) .map (idx = > (random.nextInt (10)) S "user$idx")) val col2 = Array ((0, "BJ"), (1, "SH"), (2, "GZ"), (3, "SZ"), (4, "TJ"), (5, "CQ"), (6, "HZ"), (7, "NJ"), (8, "WH"), (0, "CD")) val rdd1 [(Int ") String)] = sc.makeRDD (col1) val rdd2: RDD [(Int, String)] = sc.makeRDD (col2) val rdd3: RDD [(Int, (String, String))] = rdd1.join (rdd2) println (rdd3.dependencies) val rdd4: RDD [(Int, (String, String)] = rdd1.partitionBy (newHashPartitioner (3)) .join (rdd2.partitionBy (newHashPartitioner (3)) println (rdd4.dependencies) sc.stop ()} 1. Two print statements: List (org.apache.spark.OneToOneDependency@63acf8f6) List (org.apache.spark.OneToOneDependency@d9a498) corresponding dependencies: rdd3 corresponds to wide dependencies, rdd4 corresponds to narrow dependencies: 1) as can be seen from the DAG diagram with reference to webUI, the first join and the previous one are clearly divided into separate Satge. We can see that this is a wide dependence. The second join,partitionBy followed by join is not divided into a separate stage, which shows that it is a narrow dependency.

Join of rdd3

Join of rdd4

2) Code parsing: a. The first is the default join method, where a default divider is used

/ * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this`and * (k, v2) is in `other`. Performs a hash join across the cluster. * / def join [W] (other: RDD [(K, W)]): RDD [(K, (V, W))] = self.withScope {join (other, defaultPartitioner (self, other))}

b. The default partition returns a HashPartitioner with the total number of computer core partitions for the first join. The second join will return the HashPartitioner we set (number of partitions 3)

Def defaultPartitioner (rdd: RDD [_] Others: RDD [_] *): Partitioner = {val rdds = (Seq (rdd) + + others) val hasPartitioner = rdds.filter (_ .partitioner.parties (_ .numPartitions > 0)) val hasMaxPartitioner: option [RDD [_]] = if (hasPartitioner.nonEmpty) {Some (hasPartitioner.maxBy (_ .partitions.length)} else {None} val defaultNumPartitions = if (rdd.context.conf.contains ("spark.default.parallelism")) {rdd.context.defaultParallelism} else {rdds.map (_ .partitions.length) .max} / / If the existing max partitioner is an eligible one Or its partitions number is larger / / than the default number of partitions, use the existing partitioner. If (hasMaxPartitioner.nonEmpty & & (isEligiblePartitioner (hasMaxPartitioner.get, rdds) | | defaultNumPartitions

< hasMaxPartitioner.get.getNumPartitions)) { hasMaxPartitioner.get.partitioner.get } else { new HashPartitioner(defaultNumPartitions) } } c.走到了实际执行的join方法,里面flatMapValues是一个窄依赖,所以说如果有宽依赖应该在cogroup算子中 /** * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues( pair =>

For (v if (rdd.partitioner = = Some (part)) {logDebug ("Adding one-to-one dependency with" + rdd) new OneToOneDependency (rdd)} else {logDebug ("Adding shuffle dependency with" + rdd) new ShuffleDependency [K, Any, CoGroupCombiner] (rdd.asInstanceOf [RDD [_)

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report