In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-07 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
1.map and mapPartitions
Map's input transformation function applies to all elements in RDD, while mapPartitions applies to all partitions. The main difference from mapPartitions lies in the call granularity.
MapPartition can be understood backwards, partition first, and then map each partition.
Applicable scenarios:
If you need to create additional objects frequently during the mapping process, using mapPartitions is much more efficient than map.
Val numbers: RDD [Int] = sc.parallelize (seqs 3) / / map numbers.map (x = > {println ("AAA") / / print 6 times x * 3}). Collect (). Foreach (println (_)) / * traversal partition (3) * / numbers.mapPartitions (par = > {println ("aaa") / / print par.map 3 times (p = > p * 3)}. Collect (). Foreach (println (_)) 2.filter
Filter operation, which satisfies that all elements in the RDD whose function function is true in filter form a new dataset
Val seqs = Seq (1, println, 2, 3, 4, 5, 5, 6) / / 4, 5, 5, 5, filter (x = > x > 3). Foreach (println (_)) 3.flatMap
Map is a functional mapping of RDD elements one by one to another RDD
The flatMap operation is to apply the function to every element in the RDD, setting the
Returns a new RDD made up of all the contents of the iterator.
The difference between flatMap and Map is that map is "mapped", while flatMap is "mapped first, then flattened".
Val seqs = Array ("aaa AAA", "bbb BBB", "ccc CCC", "ddd DDD") val numbers = sc.parallelize (seqs) scala > numbers.map (x = > x.split (")). Collect () res1: Array [Array [string]] = Array (Array (aaa, AAA), Array (bbb, BBB), Array (ccc, CCC), Array (ddd, DDD) scala > numbers.flatMap (x = > x.split (")). Collect () res2: Array [String] = Array (aaa, AAA, AAA CCC, ddd, DDD) 4.mapPartitionsWithIndex
Similar to mapPartitions, but you need to provide an integer value that represents the partition index value as a parameter, so function must be of type (int, Iterator\) = > Iterator\.
/ / elements of each partition in the statistical key-value pair val rdd = sc.parallelize (List (1)), (1)), (2), (2), (2), (3), (3) 6), (4) 7), (4) 8), (5) 9), (5) 10), 3) def mapPartIndexFunc (i1 List IntJ: Iterator [(Int,Int)]): Iterator [(Int, (Int,Int))] = {var res = List [(Int, (Int))] Int))] () while (iter.hasNext) {var next = iter.next () res=res.::} res.iterator} val mapPartIndexRDD = rdd.mapPartitionsWithIndex (mapPartIndexFunc) mapPartIndexRDD.foreach (println (_)) / / result (0, (1) (0, (1)) (0, (2)) (1, (2)) (1, (3) (1, (3) (1, (3) (2) (2) (4, (4) (8)) (2, (5), (5) 5.sample (withReplacement,fraction,seed)
The number of data randomly sampled from the specified random seeds is fraction. WithReplacement indicates whether the extracted data is put back, true is the sample with put back, and false is the sample without return.
Scala > val rdd = sc.parallelize (1 to 10) rdd: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [9] at parallelize at: 24scala > rdd.sample (true,0.57,5). Collectres10: Array [Int] = Array (8,8,8,9) 6.union (Union)
Merge, which only combines rdd1 and rdd2 logically, and does not merge data for transmission.
Scala > var rdd1 = sc.parallelize (List ("aa", "aa", "bb", "cc", "dd")); rdd1: org.apache.spark.rdd.RDD [String] = ParallelCollectionRDD [4] at parallelize at: 24scala > var rdd2 = sc.parallelize (List ("aa", "dd", "ff"); rdd2: org.apache.spark.rdd.RDD [String] = ParallelCollectionRDD [5] at parallelize at: 24scala > rdd1.union (rdd2). Res3: Array [String] = Array (aa, aa, bb, cc, dd, aa, dd, ff) 7.intersection
RDD1.intersection (RDD2), which returns the intersection of two RDD and removes duplicates
Intersection needs mixed washing data, which is a waste of performance.
Scala > var RDD1 = sc.parallelize (List ("aa", "aa", "bb", "cc", "dd")) RDD1: org.apache.spark.rdd.RDD [String] = ParallelCollectionRDD [11] at parallelize at: 24scala > var RDD2 = sc.parallelize (List ("aa", "dd", "ff") RDD2: org.apache.spark.rdd.RDD [String] = ParallelCollectionRDD [12] at parallelize at: 24scala > RDD1.intersection (RDD2). Collectres5: Array [String] = Array (aa, dd) 8.distinct
Distinct is used to remove duplicates. The RDD generated by us may have duplicate elements, which can be removed by using the distinct method. However, this method involves washing, and the operation is very expensive.
Scala > var RDD1 = sc.parallelize (List ("aa", "aa", "bb", "cc", "dd") RDD1: org.apache.spark.rdd.RDD [String] = ParallelCollectionRDD [7] at parallelize at: 24scala > RDD1.collectres4: Array [String] = Array (aa, aa, bb, cc, dd) scala > val distinctRDD = RDD1.distinct.collectdistinctRDD: Array [String] = Array (aa, dd, bb, cc) 9.groupByKey
GroupByKey groups RDD [key,value] according to the same key to form RDD [key,Iterable [value]], which is somewhat similar to groupby in sql, such as group_concat in mysql.
/ grouping student scores by name scala > val scoreDetail = sc.parallelize (List (("xiaoming", 75), ("xiaoming", 90), ("lihua", 95), ("lihua", 100), ("xiaofeng", 85)) scoreDetail: org.apache.spark.rdd.RDD [(String, Int)] = ParallelCollectionRDD [3] at parallelize at: 24scala > scoreDetail.groupByKey (). Collect (). Foreach (println (_) (lihua,CompactBuffer (95,100)) (xiaoming,CompactBuffer (75)) 90)) (xiaofeng,CompactBuffer (85)) 10.reduceByKey
Receive a function to perform the reduce operation according to the same key, similar to the reduce operation of scala
For example, RDD {(1,2), (3,4), (3,6)} reduce, key unchanged, value addition
Scala > var mapRDD = sc.parallelize (List ((Int, Int) mapRDD: org.apache.spark.rdd.RDD [(Int, Int)] = ParallelCollectionRDD [6] at parallelize at: 24scala > var reduceRDD = mapRDD.reduceByKey (_ + _) reduceRDD: org.apache.spark.rdd.RDD [(Int, Int)] = ShuffledRDD [7] at reduceByKey at: 26scala > reduceRDD.foreach (x = > println (x))
When aggregating the values of the same Key in PairRDD, a neutral initial value is also used in the aggregation process. Because aggregateByKey aggregates the values in the same Key, the final return type of the aggregateByKey function is still PairRDD, and the corresponding result is Key and the aggregated value.
Val data = sc.parallelize (List ((1Power3), (1pyr2), (1pje 4), (2pje 3), (3pr 7), (3je 8)), 1) / / println (data.partitions.size) / * * compare the same value to get the maximum value of two value, the first time is max (1Magne3), 1 is the initial value, get: 3, the second time is max (3Power2), get 3, * the third time is max (3Pie4), get: 4 So if key is 1, the result is: (1) * / def seq (a:Int, b:Int): Int = {math.max (a:Int)} / * * if there is only one partition, the value of the same key in different partitions is added. This method does not work * / def comb (a:Int, b:Int): Int = {a + b} / / aggregate print result data.aggregateByKey (1) (seq, comb) .uplot.foreach (println (_)) / / View each partition data data.mapPartitionsWithIndex {(partid, iter) = > {var part_map = scala.collection.mutable.Map [String, List [(Int) Int)]] () var part_name = "part_" + partid part_map (part_name) = List [(Int,Int)] () while (iter.hasNext) {part_map (part_name): + = iter.next () /: + = the additional element} part_map.iterator} at the end of the list. Collect (). Foreach (println (_)) 12.sortByKey
Used to sort pairRDD by key. The first parameter can be set to true or false. The default is true.
Scala > val rdd = sc.parallelize (Array ((3Power4), (1pime2), (4mem4), (2pyrr5), (6mae5), (5pyrr6) rdd: org.apache.spark.rdd.RDD [(Int, Int)] = ParallelCollectionRDD [10] at parallelize at: 24scala > rdd.sortByKey (). Collectres4: Array [(Int, Int)] = Array ((1Power2), (2p5), (3mem4), (4Quin4), (5Jing 6) (6magee 5) scala > rdd.sortByKey (true). Collectres5: Array [(Int, Int)] = Array ((1pim2), (2meme 5), (3Power4), (4collectres6 4), (5pyrrine 6), (6pyrr5)) scala > rdd.sortByKey (false). Collectres6: Array [(Int, Int)] = Array ((6Med 5), (5pr 6), (4M 4), (3J 4), (2J 5), (1J 2) 13.join
RDD1.join (RDD2), which can connect the same key in RDD1,RDD2, similar to the inner join operation in sql, and return matching data on both sides.
Scala > val RDD1 = sc.parallelize (Array (("A", "A1"), ("B", "B1"), ("C", "C1"), ("D", "D1"), ("E", "E1"), ("F", "F1")) RDD1: org.apache.spark.rdd.RDD [(String, String)] = ParallelCollectionRDD [19] at parallelize at: 24scala > val RDD2 = sc.parallelize (Array (("A", "a2"), ("B", "b2") ("C", "C1"), ("C", "c2"), ("C", "c3"), ("E", "E2") RDD2: org.apache.spark.rdd.RDD [(String, String)] = ParallelCollectionRDD [20] at parallelize at: 24scala > RDD1.join (RDD2). Collectres8: Array [(String, (String, String))] = Array ((B, (b1, String), (A, (A1), (C, (C1, C1)), (C) Scala > RDD2.join (RDD1). Collectres9: Array [(String, (String, String)] = Array ((B, (b2)), (A, (a2), (C, (C1)), (C, (c2)), (C, (c2)), (C, (c3)), (E, (e2), (E, (e2)
Other actions:
Left outer join: based on the left, records leaning to the left (left (a) must exist, records on the right (b) return Some (x), and None are not available. )
Scala > RDD1.leftOuterJoin (RDD2). Collectres11: Array [(String, (String, option [string])] = Array ((B, (b1jiSome (b2), (A, (a1jiSome (a2), (C, (c1jiSome (c2), (C, (c1PowerSome (c2), (C, (c1JSome (c3)), (E, (e1JSome (e2), (F, (f1jinone)), (D, (D1) ) scala > RDD2.leftOuterJoin (RDD1). Collectres12: Array [(String, (String, option [string])] = Array ((B, (b2jSome (b1), (A, (a2Magi Some (A1), (C, (C1), (C, (c2MagneSome (C1), (C, (c3dSome (C1), (E, (e2Magee Some (E1)
Right outer join: is based on the right, leaning to the right (the record on the right (b) must exist, the record on the left (a) returns Some (x), and there is no complement None.)
Scala > RDD1.rightOuterJoin (RDD2). Collectres13: Array [(String, (Option [String], String))] = Array ((B, (Some (b1), b2)), (A, (Some (A1), a2)), (C, (Some (C1), C1)), (C, (Some (C1), c2)), (C, (Some (C1), c3)), (E, (Some (E1), e2)) scala > RDD2.rightOuterJoin (RDD1). Collectres14: Array [(String (C1), c2)) (Option [String], String)] = Array ((B, (Some (b2), b1)), (A, (Some (a2), A1)), (C, (Some (C1), C1)), (C, (Some (c2), C1)), (C, (Some (c3), C1)), (E, (Some (e2), E1)), (F, (None,f1)), (D, (None,d1)
Full outer join: both the left and the right must exist
Scala > RDD1.fullOuterJoin (RDD2). Collectres16: Array [(String, (Option [String], option [string])] = Array ((B, (Some (b1), Some (b2), (A, (Some (A1), Some (a2)), (C, (Some (C1), Some (C1), (C, (Some (C1), Some (c2)), (C, (Some (C1), Some (c3), (E, (Some (E1)) Some (e2)), (F, (Some (F1), None), (D, (Some (D1), None)) scala > RDD2.fullOuterJoin (RDD1). Collectres17: Array [(String, (Option [String], optionString])] = Array ((B, (Some (b2), Some (b1)), (A, (Some (a2), Some (A1)), (C, (Some (C1), Some (C1), (C, (Some (C1) Some (C1)), (C, (Some (c3), Some (C1), (E, (Some (e2), Some (E1), (F, (None,Some (F1), (D, (None,Some (D1) 14.cogroup
For the KV elements in two RDD, the elements in the same key in each RDD are aggregated into a collection.
Unlike reduceByKey, you merge elements of the same key in two RDD.
In the example, the Value corresponding to the same Key in multiple RDD is grouped together. There is no element whose Key is dd in rdd1 (naturally there is no Value), and the corresponding position of rdd1 is changed in the process of combination.
Set to CompactBuffer () instead of removing it.
Scala > val rdd1 = sc.parallelize (Array (("aa", 1), ("bb", 2), ("cc", 6)) rdd1: org.apache.spark.rdd.RDD [(String, Int)] = ParallelCollectionRDD [6] at parallelize at: 24scala > val rdd2 = sc.parallelize (Array (("aa", 3), ("bb", 4), ("cc", 5), ("dd", 6), ("aa", 8)) rdd2: org.apache.spark.rdd.RDD [(String) Int)] = ParallelCollectionRDD [7] at parallelize at: 24scala > val rdd3 = rdd1.cogroup (rdd2). Collectrdd3: Array [(String, (Iterable [Int], CompactBuffer [Int]))] = Array ((aa, (CompactBuffer (1), CompactBuffer (3,8)), (dd, (CompactBuffer (), CompactBuffer (6), (bb, (CompactBuffer (2), CompactBuffer (4)), (cc, (CompactBuffer (6), CompactBuffer (5) 15.cartesian (Cartesian product)
RDD1.cartesian (RDD2) returns the Cartesian product of RDD1 and RDD2, which is very expensive
Scala > var RDD1 = sc.parallelize (List ("1", "2", "3")) RDD1: org.apache.spark.rdd.RDD [String] = ParallelCollectionRDD [0] at parallelize at: 24scala > var RDD2 = sc.parallelize (List ("a", "b", "c")) RDD2: org.apache.spark.rdd.RDD [String] = ParallelCollectionRDD [1] at parallelize at: 24scala > RDD1.cartesian (RDD2). Collectres0: Array [(String, String)] = Array (1), (1) C), (2), (2), (2), (2), (3), (3), (3)) 16.pipe
A special Rdd, pipedRdd, provides calls to external programs, such as CUDA-based C++ programs, so that they can calculate faster. Caffe on spark and tensorflow on spark are also based on this mechanism.
# prepare script #! / bin/shecho "Running shell script" while read LINE Do echo ${LINE}! done# Spark RDD calls scala > val data = sc.parallelize (List ("hi", "hello", "how", "are") "you") data: org.apache.spark.rdd.RDD [String] = ParallelCollectionRDD [52] at parallelize at: 24scala > val scriptPath = "/ home/hadoop/echo.sh" scriptPath: String = / home/hadoop/echo.shscala > val pipeRDD = data.pipe (scriptPath) pipeRDD: org.apache.spark.rdd.RDD [String] = PipedRDD [53] at pipe at: 28scala > pipeRDD.collect () res21: Array [String] = Array (Running shell script, hires, hellographies, howboys, areas, you!) 17.coalesce and repartition
Both of them are RDD partitions to be redivided, and repartition is just a simple implementation of shuffle as true in the coalesce interface.
The shuffle process must occur in repartition.
Coalesce is not necessarily.
Def repartition (numPartitions: Int) (implicit ord: Ordering [T] = null): RDD [T] = withScope {coalesce (numPartitions, shuffle = true)}
Suppose RDD has N partitions and needs to be redivided into M partitions
1) NM and there is little difference between N and M, (if N is 1000 and M is 100), then several of the N partitions can be merged into a new partition, and finally merged into M partitions, then shuff can be set to false, in the case of shuffl for false, if M > N, coalesce is invalid, no shuffle process is carried out, and there is a narrow dependency between the parent RDD and the child RDD. 3) if N > M and there is a great difference between the two, then if shuffle is set to false, father and son RDD is a narrow dependency, and they are in the same Stage, it may cause the parallelism of the spark program is not enough, thus affecting performance, if the M is 1, in order to make the operation before coalesce have a better degree of parallelism, you can say that shuffle is set to true. In short: if shuff is false, if the parameter passed in is greater than the number of existing partitions, the number of partitions in RDD remains the same, that is, it is impossible to increase the number of RDDde partitions without going through shuffle. 18.repartitionAndSortWithinPartitions
The RDD is repartitioned according to the given partition program, and the records are sorted by pressing the key within each generated partition. This is more efficient than calling repartition and then sorting within each partition because it presses the sort into the shuffle machine.
RepartitionAndSortWithinPartitions is an efficient operator because it is more efficient than using repartitionAnd sortByKey, and this is because it is sorted in the shuffle process, shuffle on the one hand and sorting on the other.
Package coreimport org.apache.spark.rdd.RDDimport org.apache.spark. {HashPartitioner, SparkConf, SparkContext} object TransformationDemo {def main (args: Array [String]): Unit = {val sparkConf: SparkConf = new SparkConf (). SetMaster ("local"). SetAppName ("test") val sc = new SparkContext (sparkConf) val rdd1: RDD [(Int, Int)] = sc.parallelize (7) / / 1 println (rdd1.partitions.size) / * * (0, (7)) * (0, (6) (5)) * (0, (5)) * (0, (4)) * (0, (3)) * (0, (2) * (0 (1new HashPartitioner 2)) * / rdd1.mapPartitionsWithIndex (mapPartIndexFunc) .foreach (println) / / repartition and sort (sort by default according to the ascending order of key) val rdd2: RDD [(Int, Int)] = rdd1.repartitionAndSortWithinPartitions (new HashPartitioner (3)) / / 3 println (rdd2.partitions.size) / * * (0, (6) * (0, (3)) * (3)) * (1, (7) * (1) (4)) * (1, (1)) * (2, (5) * (2) (2 mapPartIndexFunc 3)) * / rdd2.mapPartitionsWithIndex (mapPartIndexFunc) .foreach (println) / * * (3) (3) * (6) (5) * (1) (4) * (7) (2) * (2) * (5) * / rdd2.collect (). Foreach (println) sc.stop ()} / * * Take the data in each partition * @ param i1 * @ param iter * @ return * / def mapPartIndexFunc (i1:Int Iter: Iterator [(Int,Int)]): Iterator [(Int, (Int,Int))] = {var res= List [(Int, (Int,Int))] () while (iter.hasNext) {var next = iter.next () res=res.::} res.iterator}}
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.