
Spark 2.x From Shallow to Deep, Part 6: RDD Java api Explained in Detail (2)

2025-04-06 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/02 Report--

Before diving in, it is worth building a correct high-level understanding of Spark first; see the earlier article "correct understanding of spark".

This article gives a detailed walkthrough of the basic action apis of JavaRDD.

First, define two Comparator implementations: one for ascending order and one for descending order.

```java
// ascending sort comparator
private static class AscComparator implements Comparator<Integer>, Serializable {
    @Override
    public int compare(Integer o1, Integer o2) {
        return o1 - o2;
    }
}

// descending sort comparator
private static class DescComparator implements Comparator<Integer>, Serializable {
    @Override
    public int compare(Integer o1, Integer o2) {
        return o2 - o1;
    }
}
```

Next, define an RDD to operate on:

```java
JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 3, 6), 2);
```

1. collect, take, top, first

```java
// result: [1, 2, 4, 3, 3, 6]
// collect pulls all of the RDD's data back to the driver, so it is only
// suitable for small data sets or experiments; on a large RDD it will
// cause a driver memory overflow
System.out.println("collect = " + listRDD.collect());

// result: [1, 2]
// take collects the first n elements of the RDD on the driver side.
// Roughly, take first checks whether the first partition already holds
// enough elements; if not, it estimates from the amount still needed how
// many more partitions must be scanned, until it has collected the
// requested number of elements
System.out.println("take(2) = " + listRDD.take(2));

// result: [6, 4] - the two largest elements, i.e. the first two in
// descending natural (ascending-comparator) order
System.out.println("top(2) = " + listRDD.top(2));

// result: [1, 2] - top with a descending comparator returns the two
// "largest" elements under that ordering, which are the two smallest
System.out.println("DescComparator top(2) = " + listRDD.top(2, new DescComparator()));

// result: 1 - first is implemented as take(1) under the hood
System.out.println("first = " + listRDD.first());
```

2. min, max

```java
// result: 1 - the minimum under ascending order is the RDD's minimum
System.out.println("min = " + listRDD.min(new AscComparator()));

// result: 6 - the "minimum" under descending order is the RDD's maximum
System.out.println("min = " + listRDD.min(new DescComparator()));

// result: 6 - the maximum under ascending order is the RDD's maximum
System.out.println("max = " + listRDD.max(new AscComparator()));

// result: 1 - the "maximum" under descending order is the RDD's minimum
System.out.println("max = " + listRDD.max(new DescComparator()));
```

Both min and max are implemented on top of the reduce api. In pseudo code:

```
min() == reduce((x, y) => if (x <= y) x else y)
max() == reduce((x, y) => if (x >= y) x else y)
```
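The equivalence is easy to check in plain Java, with no cluster needed. The class and method names below are illustrative only; the lambda passed to reduce is the same comparison the pseudo code describes:

```java
import java.util.Arrays;
import java.util.List;

public class MinViaReduce {
    // reduce((x, y) -> x <= y ? x : y) keeps the smaller element at every step,
    // so the final accumulator is the minimum
    static int minViaReduce(List<Integer> data) {
        return data.stream().reduce((x, y) -> x <= y ? x : y).get();
    }

    // symmetrically, keeping the larger element yields the maximum
    static int maxViaReduce(List<Integer> data) {
        return data.stream().reduce((x, y) -> x >= y ? x : y).get();
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 4, 3, 3, 6); // same data as listRDD
        System.out.println(minViaReduce(data)); // 1
        System.out.println(maxViaReduce(data)); // 6
    }
}
```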

The reduce api itself is covered in section 5 below.

3. takeOrdered

```java
// result: [1, 2] - the two smallest elements of the RDD
System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2));

// result: [1, 2] - the first two elements in ascending order,
// i.e. the two smallest elements
System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2, new AscComparator()));

// result: [6, 4] - the first two elements in descending order,
// i.e. the two largest elements
System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2, new DescComparator()));
```

4. foreach and foreachPartition

foreach applies a custom function to every element of the RDD, while foreachPartition applies a custom function once to each partition. The following example shows why the distinction matters.

First define a more time-consuming operation:

```java
public static Integer getInitNumber(String source) {
    System.out.println("get init number from " + source + ", may take much time.");
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
}

listRDD.foreach(new VoidFunction<Integer>() {
    @Override
    public void call(Integer element) throws Exception {
        // Performance is poor here: the time-consuming getInitNumber runs
        // for every single element. Prefer foreachPartition in this case.
        Integer initNumber = getInitNumber("foreach");
        System.out.println((element + initNumber) + "=");
    }
});

listRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
    @Override
    public void call(Iterator<Integer> integerIterator) throws Exception {
        // Same effect as foreach, except the function is applied to a whole
        // partition instead of to every record. A time-consuming operation,
        // such as opening a database connection, then only needs to run once
        // per partition rather than once per element.
        Integer initNumber = getInitNumber("foreachPartition");
        while (integerIterator.hasNext()) {
            System.out.println((integerIterator.next() + initNumber) + "=");
        }
    }
});
```
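The saving is easy to quantify without a cluster. This plain-Java sketch (class and method names are hypothetical) counts how often the expensive init step would run for the six elements of listRDD split into two partitions:

```java
import java.util.Arrays;
import java.util.List;

public class InitCallCount {
    // foreach style: getInitNumber would run once per element
    static int foreachStyleCalls(List<List<Integer>> partitions) {
        int calls = 0;
        for (List<Integer> p : partitions) calls += p.size();
        return calls;
    }

    // foreachPartition style: getInitNumber would run once per partition
    static int foreachPartitionStyleCalls(List<List<Integer>> partitions) {
        return partitions.size();
    }

    public static void main(String[] args) {
        // assumed 2-partition split of [1, 2, 4, 3, 3, 6]
        List<List<Integer>> parts = Arrays.asList(
                Arrays.asList(1, 2, 4), Arrays.asList(3, 3, 6));
        System.out.println(foreachStyleCalls(parts));          // 6
        System.out.println(foreachPartitionStyleCalls(parts)); // 2
    }
}
```

With a one-second init step, that is six seconds of overhead versus two.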

5. reduce and treeReduce

```java
Integer reduceResult = listRDD.reduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer ele1, Integer ele2) throws Exception {
        return ele1 + ele2;
    }
});
// result: 19
System.out.println("reduceResult = " + reduceResult);

// the 3 is the depth of the aggregation tree: how many levels of partial
// aggregation happen before the final result is computed
Integer treeReduceResult = listRDD.treeReduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
}, 3);
// result: 19
System.out.println("treeReduceResult = " + treeReduceResult);

Both produce the same result, but their execution differs: reduce sends every partition's partial result straight to the driver, while treeReduce first merges partial results on the executors in a multi-level tree. When the number of partitions is very large, treeReduce's extra aggregation levels take load off the driver and can improve performance.
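Why the two apis agree can be sketched in plain Java (helper names are hypothetical): because the reduce function is associative, combining the per-partition partial sums in a linear chain or pairwise in a tree gives the same answer:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class TreeCombine {
    // reduce style: fold the partial results left to right in one chain
    static int linearCombine(List<Integer> partials, BinaryOperator<Integer> f) {
        int acc = partials.get(0);
        for (int i = 1; i < partials.size(); i++) acc = f.apply(acc, partials.get(i));
        return acc;
    }

    // treeReduce style: merge neighbours pairwise, level by level
    static int treeCombine(List<Integer> partials, BinaryOperator<Integer> f) {
        List<Integer> level = new ArrayList<>(partials);
        while (level.size() > 1) {
            List<Integer> next = new ArrayList<>();
            for (int i = 0; i < level.size(); i += 2)
                next.add(i + 1 < level.size()
                        ? f.apply(level.get(i), level.get(i + 1))
                        : level.get(i));
            level = next;
        }
        return level.get(0);
    }

    public static void main(String[] args) {
        // partial sums of the assumed partitions [1,2,4] and [3,3,6]
        List<Integer> partitionSums = Arrays.asList(7, 12);
        BinaryOperator<Integer> add = Integer::sum;
        System.out.println(linearCombine(partitionSums, add)); // 19
        System.out.println(treeCombine(partitionSums, add));   // 19
    }
}
```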

6. fold

The function of fold is similar to that of reduce, except that fold has an initial value.

```java
// similar to reduce, except that the initial value 0 is folded in when
// computing each partition, and once more when the per-partition results
// are merged on the driver
Integer foldResult = listRDD.fold(0, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
});
// result: 19
System.out.println("foldResult = " + foldResult);
```
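A plain-Java simulation (illustrative names, assuming the 2-partition split [1,2,4] / [3,3,6]) makes the initial-value semantics concrete: the zero value participates once per partition and once more in the driver-side merge, so a non-zero zero value is added (numPartitions + 1) times:

```java
import java.util.Arrays;
import java.util.List;

public class FoldSim {
    static int fold(List<List<Integer>> partitions, int zero) {
        int driverAcc = zero;                 // zero used once on the driver
        for (List<Integer> p : partitions) {
            int acc = zero;                   // and once per partition
            for (int e : p) acc += e;
            driverAcc += acc;
        }
        return driverAcc;
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = Arrays.asList(
                Arrays.asList(1, 2, 4), Arrays.asList(3, 3, 6));
        System.out.println(fold(parts, 0)); // 19, matching foldResult above
        System.out.println(fold(parts, 1)); // 22 = 19 + (2 partitions + driver) * 1
    }
}
```

This is also why Spark's docs require the zero value to be the identity of the fold function.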

7. aggregate and treeAggregate

```java
// first create an initial value of the result type we want, then apply the
// first function (acc, value) => (acc._1 + value, acc._2 + 1) to every
// element within each partition, and finally apply
// (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
// to merge the per-partition results
Tuple2<Integer, Integer> aggregateResult = listRDD.aggregate(new Tuple2<>(0, 0),
        new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc,
                    Integer integer) throws Exception {
                return new Tuple2<>(acc._1 + integer, acc._2 + 1);
            }
        },
        new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>,
                Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1,
                    Tuple2<Integer, Integer> acc2) throws Exception {
                return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
            }
        });
// result: (19,6)
System.out.println("aggregateResult = " + aggregateResult);

Tuple2<Integer, Integer> treeAggregateResult = listRDD.treeAggregate(new Tuple2<>(0, 0),
        new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc,
                    Integer integer) throws Exception {
                return new Tuple2<>(acc._1 + integer, acc._2 + 1);
            }
        },
        new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>,
                Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1,
                    Tuple2<Integer, Integer> acc2) throws Exception {
                return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
            }
        }, 2);
// result: (19,6)
System.out.println("treeAggregateResult = " + treeAggregateResult);
```

Both produce the same result, but their execution differs in the same way as reduce and treeReduce: aggregate merges every partition's result directly on the driver, while treeAggregate first combines partial results on the executors in a multi-level tree. If the RDD has a very large number of partitions, treeAggregate is recommended.

Comparison of aggregate and treeAggregate:

1. For the final combine step, aggregate has time complexity O(n) while treeAggregate has O(log n), where n is the number of partitions.

2. aggregate pulls every partition's result back to the driver for the final combine, so it risks a driver memory overflow; treeAggregate does not.

3. In the final combine, aggregate applies the initial value one more time than treeAggregate does.
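The aggregate call above can be simulated in plain Java (hypothetical names, assuming the same 2-partition split) to confirm the (sum, count) result:

```java
import java.util.Arrays;
import java.util.List;

public class AggregateSim {
    // returns {sum, count}, mimicking the Tuple2 accumulator
    static int[] aggregate(List<List<Integer>> partitions) {
        int[] driver = {0, 0};                            // zero value (0, 0)
        for (List<Integer> p : partitions) {
            int[] acc = {0, 0};                           // per-partition zero value
            for (int e : p) { acc[0] += e; acc[1] += 1; } // seqOp: add element, bump count
            driver[0] += acc[0];                          // combOp: merge partition
            driver[1] += acc[1];                          // results on the driver
        }
        return driver;
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = Arrays.asList(
                Arrays.asList(1, 2, 4), Arrays.asList(3, 3, 6));
        int[] r = aggregate(parts);
        System.out.println(r[0] + "," + r[1]); // 19,6
    }
}
```

Dividing sum by count (19 / 6) is the classic way to compute an average with a single pass over the RDD.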

For how these apis work internally, refer to the companion article on the principles of the spark core RDD api; the mechanics are hard to convey clearly in prose alone and easier to retain with diagrams.
