Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to understand aggregate and aggregateByKey in Spark operation

2025-01-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/02 Report--

Today, I will talk to you about how to understand aggregate and aggregateByKey in Spark operation. Many people may not understand it very well. In order to make you understand better, the editor has summarized the following contents for you. I hope you can get something according to this article.

1. Aggregate function

Aggregate the elements in each partition, and then use the combine function to combine the result and initial value (zeroValue) of each partition. The type returned by this function does not need to be the same as the element type in RDD.

The seqOp operation aggregates the elements in each partition, and then the combOp operation aggregates the aggregate results of all partitions again, and the initial value of both operations is zeroValue. The operation of seqOp is to traverse all the elements (T) in the partition, the first T operates with zeroValue, and then acts as a zeroValue with the second T until the entire partition is traversed. The combOp operation is to aggregate and reaggregate the results of each partition. The aggregate function returns a value of a different type than RDD. Therefore, one operation seqOp is needed to merge the element T in the partition into a single U, and the other operation combOp aggregates all U.

Example program:

Scala > val rdd = List (1) rdd: List [Int] = List (1) scala > rdd.par.aggregate ((0)) ((acc,number) = > (acc._1 + number, acc._2 + 1), (par1,par2) = > (par1._1 + par2._1, par1._2 + par2._2) res0: (Int, Int) = (45) scala > res0._1 / res0._2res1: Int = 5

The process goes something like this:

First of all, the initial value is (0d0), which will be used in the next two steps. Then, (acc,number) = > (acc._1 + number, acc._2 + 1), number is the T in the function definition, and here is the element in List. So the process of acc._1 + number,acc._2 + 1 is as follows.

1. 0'1, 0'12. 1'2, 1'13. 3'3, 2'14. 6'4, 3'15. 10'5, 4'16. 15'6, 5'17. 21'7, 6'18'8, 7'19. 36'9, 8'1

The result is (45pd9). What is demonstrated here is a single-threaded computing process. The actual Spark execution is distributed computing, and the List may be divided into multiple partitions. If there are 3, p1 (1, 2, 3, 4), p2 (5, 6, 7, 8), p3 (9), the calculated results of each partition (10, 4), (26, 4), (9,), and so on. Execution (par1,par2) = > (par1._1 + par2._1, par1._2 + par2._2) is (10, 26, 9, 4, 4, 4, 1), that is, 45 (9), and then it is easy to find the average.

2. AggregateByKey function:

The same key value in PairRDD is aggregated, and a neutral initial value is also used in the aggregation process. Similar to the aggregate function, the type of the aggregateByKey return value does not need to be the same as the type of value in RDD. Because aggregateByKey aggregates the values in the same Key, the final return type of the aggregateByKey' function is still PairRDD, and the corresponding result is Key and the aggregated value, while the aggregate function directly returns a non-RDD result.

Example program:

Import org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject AggregateByKeyOp {def main (args: array [string]) {val sparkConf: SparkConf = new SparkConf (). SetAppName ("AggregateByKey"). SetMaster ("local") val sc: SparkContext = new SparkContext (sparkConf) val data=List ((1) SparkContext 3), (1) (2), (1) val rdd=sc.parallelize (data, 2)

/ / combining the values in different partition, the data type def combOp (def combOp string): String= {println ("combOp:" + a + "\ t" + b) aplimb} is obtained.

/ / the values merged in the same partition, the data type of an is the data type of zeroValue, and the data type of b is the data type def seqOp of the original value: String= {println ("SeqOp:" + a + "\ t" + b) aplimb} rdd.foreach (println)

/ / zeroValue: neutral value, defines the type that returns value, and participates in the operation

/ / seqOp: used to merge values in the same partition

/ / combOp: used to merge values in different partiton

Val aggregateByKeyRDD=rdd.aggregateByKey (seqOp, combOp) sc.stop ()}}

Running result:

Split the data into two partitions

/ / Partition 1 data (1p3) (1pc2) / / Partition 2 data (1p4) (2p3)

/ / Partition one merges data from the same key seq: 1003

/ / (1) start merging with neutral values and merging result is 1003seq: 10032 / / (1p2) merge result is 10032

/ / Partition 2 merging data from the same key seq: 100 4

/ / (1pr 4) start merging with neutral values 1004seq: 1003 / / (2pr 3) start merging with neutral values 1003

Merge the results of two partitions /

If / key is 2, it only exists in one partition and does not need to be merged (2meme 1003) (2pr 1003)

/ / key is 1, exists in two partitions, and the data type is the same, merge comb: 100321004 (1Magne 100321004)

After reading the above, do you have any further understanding of the understanding of aggregate and aggregateByKey in Spark operation? If you want to know more knowledge or related content, please follow the industry information channel, thank you for your support.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report