SLTechnology News & Howtos (shulou.com) — updated 2025-03-26
Before studying any specific Spark topic, make sure you have a correct overall understanding of Spark; see the companion article "Correct Understanding of Spark".
This article walks through Spark's key-value (pair) RDD Java API in detail.
I. How to create a key-value RDD
1. sparkContext.parallelizePairs

```java
JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(
        Arrays.asList(new Tuple2<>("test", 3), new Tuple2<>("kkk", 3)));
// result: [(test,3), (kkk,3)]
System.out.println("javaPairRDD = " + javaPairRDD.collect());
```
2. keyBy

```java
public class User implements Serializable {
    private String userId;
    private Integer amount;

    public User(String userId, Integer amount) {
        this.userId = userId;
        this.amount = amount;
    }

    public String getUserId() {
        return userId;
    }

    @Override
    public String toString() {
        return "User{userId='" + userId + "', amount=" + amount + '}';
    }
}

JavaRDD<User> userJavaRDD = sc.parallelize(Arrays.asList(new User("U1", 20)));
JavaPairRDD<String, User> userJavaPairRDD = userJavaRDD.keyBy(new Function<User, String>() {
    @Override
    public String call(User user) throws Exception {
        return user.getUserId();
    }
});
// result: [(U1,User{userId='U1', amount=20})]
System.out.println("userJavaPairRDD = " + userJavaPairRDD.collect());
```
3. zip

```java
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
// zipping two RDDs is another way to create a key-value RDD
// (both RDDs must have the same number of partitions and the same
// number of elements in each partition)
JavaPairRDD<Integer, Integer> zipPairRDD = rdd.zip(rdd);
// result: [(1,1), (1,1), (2,2), (3,3), (5,5), (8,8), (13,13)]
System.out.println("zipPairRDD = " + zipPairRDD.collect());
```
4. groupBy

```java
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
    @Override
    public Boolean call(Integer x) throws Exception {
        return x % 2 == 0;
    }
};
// grouping the odd and even numbers produces a key-value RDD
JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isEven);
// result: [(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
// result: 1
System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());
oddsAndEvens = rdd.groupBy(isEven, 2);
// result: [(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
// result: 2
System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());
```
II. combineByKey

```java
JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("coffee", 1), new Tuple2<>("coffee", 2),
        new Tuple2<>("panda", 3), new Tuple2<>("coffee", 9)), 2);

// applied to a value the first time its key is seen within a partition:
// turn the value into an initial (sum, count) accumulator
Function<Integer, Tuple2<Integer, Integer>> createCombiner =
        new Function<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer value) throws Exception {
                return new Tuple2<>(value, 1);
            }
        };

// applied when a key already has an accumulator (from createCombiner) in the
// partition: fold the next value into that key's accumulator
Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>> mergeValue =
        new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer value)
                    throws Exception {
                return new Tuple2<>(acc._1() + value, acc._2() + 1);
            }
        };

// applied when accumulators for the same key from different partitions
// need to be aggregated
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiners =
        new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1,
                    Tuple2<Integer, Integer> acc2) throws Exception {
                return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
            }
        };

JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD =
        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
// result: [(coffee,(12,3)), (panda,(3,1))]
System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
```
The data flow of combineByKey is as follows (the original article illustrates it with a diagram at this point):
For a detailed explanation of how combineByKey works internally, see the companion article "Spark Core RDD API Principles".
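To make the three callbacks concrete, here is a minimal plain-Java simulation of the combineByKey data flow over two hypothetical partitions of the coffee/panda data. It uses only the JDK, not Spark; the class, the `Acc` accumulator type, and the method names are invented for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombineByKeySim {
    // a (sum, count) accumulator, standing in for Tuple2<Integer, Integer>
    static final class Acc {
        final int sum, count;
        Acc(int sum, int count) { this.sum = sum; this.count = count; }
        @Override public String toString() { return "(" + sum + "," + count + ")"; }
    }

    // simulate combineByKey over a list of partitions of (key, value) pairs
    static Map<String, Acc> combineByKey(List<List<Map.Entry<String, Integer>>> partitions) {
        // map side: build per-partition combiners
        List<Map<String, Acc>> perPartition = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            Map<String, Acc> combiners = new HashMap<>();
            for (Map.Entry<String, Integer> kv : partition) {
                Acc acc = combiners.get(kv.getKey());
                combiners.put(kv.getKey(), acc == null
                        ? new Acc(kv.getValue(), 1)                          // createCombiner
                        : new Acc(acc.sum + kv.getValue(), acc.count + 1));  // mergeValue
            }
            perPartition.add(combiners);
        }
        // shuffle side: merge combiners for the same key across partitions
        Map<String, Acc> merged = new HashMap<>();
        for (Map<String, Acc> combiners : perPartition) {
            for (Map.Entry<String, Acc> e : combiners.entrySet()) {
                Acc acc = merged.get(e.getKey());
                merged.put(e.getKey(), acc == null ? e.getValue()
                        : new Acc(acc.sum + e.getValue().sum,
                                  acc.count + e.getValue().count));          // mergeCombiners
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> partitions = List.of(
                List.of(Map.entry("coffee", 1), Map.entry("coffee", 2)),
                List.of(Map.entry("panda", 3), Map.entry("coffee", 9)));
        Map<String, Acc> result = combineByKey(partitions);
        // prints: coffee=(12,3) panda=(3,1)
        System.out.println("coffee=" + result.get("coffee") + " panda=" + result.get("panda"));
    }
}
```

Tracing it by hand: partition 1 creates (1,1) for coffee and folds 2 in to get (3,2); partition 2 holds coffee=(9,1) and panda=(3,1); merging the partitions yields coffee=(12,3) and panda=(3,1), matching the Spark output above.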
III. aggregateByKey

```java
JavaPairRDD<String, Tuple2<Integer, Integer>> aggregateByKeyRDD =
        javaPairRDD.aggregateByKey(new Tuple2<>(0, 0), mergeValue, mergeCombiners);
// result: [(coffee,(12,3)), (panda,(3,1))]
System.out.println("aggregateByKeyRDD = " + aggregateByKeyRDD.collect());

// aggregateByKey is implemented on top of combineByKey;
// the aggregateByKey call above is equivalent to the combineByKey call below
Function<Integer, Tuple2<Integer, Integer>> createCombinerAggregateByKey =
        new Function<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer value) throws Exception {
                // the zero value (0, 0) is merged with the first value of each key
                return mergeValue.call(new Tuple2<>(0, 0), value);
            }
        };
// result: [(coffee,(12,3)), (panda,(3,1))]
System.out.println(javaPairRDD.combineByKey(
        createCombinerAggregateByKey, mergeValue, mergeCombiners).collect());
```
IV. reduceByKey

```java
JavaPairRDD<String, Integer> reduceByKeyRDD = javaPairRDD.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        });
// result: [(coffee,12), (panda,3)]
System.out.println("reduceByKeyRDD = " + reduceByKeyRDD.collect());

// reduceByKey is also implemented on top of combineByKey;
// the reduceByKey call above is equivalent to the combineByKey call below
Function<Integer, Integer> createCombinerReduce = new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) throws Exception {
        // the first value of a key is used as-is
        return integer;
    }
};
Function2<Integer, Integer, Integer> mergeValueReduce =
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        };
// result: [(coffee,12), (panda,3)]
System.out.println(javaPairRDD.combineByKey(
        createCombinerReduce, mergeValueReduce, mergeValueReduce).collect());
```
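Outside Spark, the per-key reduction that reduceByKey performs can be pictured with `java.util.Map.merge`. This is a minimal plain-JDK sketch of the same semantics, not Spark API; the class and method names are invented for illustration:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceByKeySim {
    // fold each (key, value) pair into a map, summing values per key,
    // the same way reduceByKey folds values with the supplied Function2
    static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, Integer> kv : pairs) {
            // merge uses the value as-is for a new key (like createCombinerReduce)
            // and applies Integer::sum otherwise (like value1 + value2 above)
            result.merge(kv.getKey(), kv.getValue(), Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> totals = reduceByKey(List.of(
                Map.entry("coffee", 1), Map.entry("coffee", 2),
                Map.entry("panda", 3), Map.entry("coffee", 9)));
        // prints: coffee=12 panda=3
        System.out.println("coffee=" + totals.get("coffee") + " panda=" + totals.get("panda"));
    }
}
```

The difference in Spark is that this folding happens per partition first and the partial results are then merged across partitions, which is exactly what the combineByKey equivalence above spells out.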
V. foldByKey

```java
JavaPairRDD<String, Integer> foldByKeyRDD = javaPairRDD.foldByKey(0,
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
// result: [(coffee,12), (panda,3)]
System.out.println("foldByKeyRDD = " + foldByKeyRDD.collect());

// foldByKey is also implemented on top of combineByKey;
// the foldByKey call above is equivalent to the combineByKey call below
Function2<Integer, Integer, Integer> mergeValueFold =
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        };
Function<Integer, Integer> createCombinerFold = new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) throws Exception {
        // the zero value 0 is folded into the first value of each key
        return mergeValueFold.call(0, integer);
    }
};
// result: [(coffee,12), (panda,3)]
System.out.println(javaPairRDD.combineByKey(
        createCombinerFold, mergeValueFold, mergeValueFold).collect());
```
VI. groupByKey

```java
JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = javaPairRDD.groupByKey();
// result: [(coffee,[1, 2, 9]), (panda,[3])]
System.out.println("groupByKeyRDD = " + groupByKeyRDD.collect());

// groupByKey is also implemented on top of combineByKey;
// the groupByKey call above is equivalent to the combineByKey call below
Function<Integer, List<Integer>> createCombinerGroup = new Function<Integer, List<Integer>>() {
    @Override
    public List<Integer> call(Integer integer) throws Exception {
        List<Integer> list = new ArrayList<>();
        list.add(integer);
        return list;
    }
};
Function2<List<Integer>, Integer, List<Integer>> mergeValueGroup =
        new Function2<List<Integer>, Integer, List<Integer>>() {
            @Override
            public List<Integer> call(List<Integer> integers, Integer integer) throws Exception {
                integers.add(integer);
                return integers;
            }
        };
Function2<List<Integer>, List<Integer>, List<Integer>> mergeCombinersGroup =
        new Function2<List<Integer>, List<Integer>, List<Integer>>() {
            @Override
            public List<Integer> call(List<Integer> integers, List<Integer> integers2)
                    throws Exception {
                integers.addAll(integers2);
                return integers;
            }
        };
// result: [(coffee,[1, 2, 9]), (panda,[3])]
System.out.println(javaPairRDD.combineByKey(
        createCombinerGroup, mergeValueGroup, mergeCombinersGroup).collect());
```
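The grouping behaviour itself has a close plain-JDK analogue in `Map.computeIfAbsent`, which plays the role of createCombinerGroup, while appending to the existing list plays the role of mergeValueGroup. A minimal sketch (not Spark API; the class and method names are invented for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByKeySim {
    // collect each key's values into a list, mirroring the
    // createCombiner/mergeValue pair from the groupByKey example above
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> result = new HashMap<>();
        for (Map.Entry<String, Integer> kv : pairs) {
            // computeIfAbsent creates the single-element list for a new key;
            // add appends to it for a key that was already seen
            result.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> groups = groupByKey(List.of(
                Map.entry("coffee", 1), Map.entry("coffee", 2),
                Map.entry("panda", 3), Map.entry("coffee", 9)));
        // prints: coffee=[1, 2, 9] panda=[3]
        System.out.println("coffee=" + groups.get("coffee") + " panda=" + groups.get("panda"));
    }
}
```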
Documentation alone can only go so far in explaining how these APIs work. For a deeper and more thorough look at the principles behind them, see the companion article "Spark Core RDD API Principles".