Working with key/value pair RDDs

2025-02-27 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/03 Report--

An RDD that stores key/value pairs is called a Pair RDD.

1. Creating a Pair RDD:

1.1 How to create a Pair RDD:

Many data formats produce a Pair RDD directly when they are loaded. A normal RDD, as described earlier, can also be converted into a Pair RDD with map().

1.2 Pair RDD conversion example:

The following example converts the original RDD into a Pair RDD in which the first word of each line is the key and the whole line is the value.

Java has no built-in tuple type, so use Scala's scala.Tuple2 class to create tuples. Create a tuple with new Tuple2(elem1, elem2) and access its elements with the ._1() and ._2() methods.

Python and Scala implementations can use the basic map() function, whereas Java needs the mapToPair() function:

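import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/**
 * Convert a basic RDD into a Pair RDD.
 * Business logic: the first word of each line is the key, the whole line is the value.
 * @param input JavaRDD<String> of lines
 * @return JavaPairRDD<String, String> of (first word, line)
 */
public JavaPairRDD<String, String> firstWordKeyRdd(JavaRDD<String> input) {
    JavaPairRDD<String, String> pair_rdd = input.mapToPair(
        new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String arg0) throws Exception {
                // Key: first space-separated word; value: the unchanged line.
                return new Tuple2<>(arg0.split(" ")[0], arg0);
            }
        });
    return pair_rdd;
}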

When creating a Pair RDD from an in-memory collection, Python and Scala call SparkContext.parallelize() on a collection of tuples; Java calls JavaSparkContext.parallelizePairs().
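For the Scala case, a minimal sketch of building a Pair RDD from an in-memory collection. It assumes the code is run in spark-shell, where sc is already defined; the sample data and the names pairs, keyList and valueList are illustrative and not from the original text.

```scala
// Assumes spark-shell, where sc (the SparkContext) is predefined.
// A collection of 2-tuples becomes a Pair RDD directly.
val pairs = sc.parallelize(List(("panda", 0), ("pink", 3), ("pirate", 3)))

// Pair RDD operations such as keys and values are now available.
val keyList = pairs.keys.collect().toList
val valueList = pairs.values.collect().toList
```

No special function is needed in Scala because an implicit conversion adds the pair operations to any RDD of 2-tuples.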

2. Transformations on Pair RDDs:

2.1 Common transformations on Pair RDDs:

The transformations available on a basic RDD can also be used on a Pair RDD. Because the elements of a Pair RDD are tuples, the functions you pass must operate on tuples rather than on single elements.

The following table lists the transformations commonly used on a Pair RDD (example RDD contents: {(1, 2), (3, 4), (3, 6)}):

reduceByKey(func)
    Purpose: Combine values with the same key.
    Example: rdd.reduceByKey((x, y) => x + y)
    Result: {(1, 2), (3, 10)}

groupByKey()
    Purpose: Group values with the same key.
    Example: rdd.groupByKey()
    Result: {(1, [2]), (3, [4, 6])}

combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
    Purpose: Combine values with the same key using a different result type.

mapValues(func)
    Purpose: Apply a function to each value of a pair RDD without changing the key.
    Example: rdd.mapValues(x => x + 1)
    Result: {(1, 3), (3, 5), (3, 7)}

flatMapValues(func)
    Purpose: Apply a function that returns an iterator to each value of a pair RDD, and for each element returned, produce a key/value entry with the old key. Often used for tokenization.
    Example: rdd.flatMapValues(x => (x to 5))
    Result: {(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)}

keys()
    Purpose: Return an RDD of just the keys.
    Example: rdd.keys
    Result: {1, 3, 3}

values()
    Purpose: Return an RDD of just the values.
    Example: rdd.values
    Result: {2, 4, 6}

sortByKey()
    Purpose: Return an RDD sorted by the key.
    Example: rdd.sortByKey()
    Result: {(1, 2), (3, 4), (3, 6)}
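The single-RDD transformations listed above can be tried on the same sample data. This is a sketch that assumes spark-shell with sc predefined; the result names (summed, grouped and so on) are illustrative. Results are collected into maps or lists only so they are easy to inspect.

```scala
// Assumes spark-shell, where sc is predefined.
val rdd = sc.parallelize(List((1, 2), (3, 4), (3, 6)))

// Per-key sum: key 3 gets 4 + 6.
val summed = rdd.reduceByKey((x, y) => x + y).collectAsMap()

// Per-key grouping; values are sorted here only to make the result stable.
val grouped = rdd.groupByKey().mapValues(_.toList.sorted).collectAsMap()

// Values change, keys stay as they are.
val plusOne = rdd.mapValues(x => x + 1).collect().toList

// Each value expands to the range "value to 5"; the value 6 yields nothing.
val expanded = rdd.flatMapValues(x => x to 5).collect().toList

// Keys in ascending order.
val sortedKeys = rdd.sortByKey().keys.collect().toList
```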

The following table lists the transformations that operate on two Pair RDDs (rdd = {(1, 2), (3, 4), (3, 6)}, other = {(3, 9)}):

subtractByKey
    Purpose: Remove elements with a key present in the other RDD.
    Example: rdd.subtractByKey(other)
    Result: {(1, 2)}

join
    Purpose: Perform an inner join between two RDDs.
    Example: rdd.join(other)
    Result: {(3, (4, 9)), (3, (6, 9))}

rightOuterJoin
    Purpose: Perform a join between two RDDs in which every key of the other RDD is kept; values from the first RDD are wrapped in Option.
    Example: rdd.rightOuterJoin(other)
    Result: {(3, (Some(4), 9)), (3, (Some(6), 9))}

leftOuterJoin
    Purpose: Perform a join between two RDDs in which every key of the first RDD is kept; values from the other RDD are wrapped in Option.
    Example: rdd.leftOuterJoin(other)
    Result: {(1, (2, None)), (3, (4, Some(9))), (3, (6, Some(9)))}

cogroup
    Purpose: Group data from both RDDs sharing the same key.
    Example: rdd.cogroup(other)
    Result: {(1, ([2], [])), (3, ([4, 6], [9]))}
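The two-RDD operations can be checked the same way. The sketch below assumes spark-shell with sc predefined and uses illustrative result names; results are turned into sets because the order of joined elements is not guaranteed.

```scala
// Assumes spark-shell, where sc is predefined.
val rdd = sc.parallelize(List((1, 2), (3, 4), (3, 6)))
val other = sc.parallelize(List((3, 9)))

// Key 3 exists in other, so only key 1 survives.
val onlyInRdd = rdd.subtractByKey(other).collect().toSet

// Inner join: only keys present on both sides.
val inner = rdd.join(other).collect().toSet

// Left outer join: every key of rdd is kept, the right side becomes an Option.
val left = rdd.leftOuterJoin(other).collect().toSet

// Right outer join: every key of other is kept, the left side becomes an Option.
val right = rdd.rightOuterJoin(other).collect().toSet
```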

2.2 Filtering a Pair RDD:

A Pair RDD is still an RDD, so the operations described earlier (such as filter) also apply to it. The following method keeps only the pairs whose value (the line) is longer than 20 characters:

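import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

/**
 * Keep only the pairs whose value (the line) is longer than 20 characters.
 * @param input JavaPairRDD<String, String>
 * @return JavaPairRDD<String, String>
 */
public JavaPairRDD<String, String> filterMoreThanTwentyLines(JavaPairRDD<String, String> input) {
    JavaPairRDD<String, String> filter_rdd = input.filter(
        new Function<Tuple2<String, String>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, String> arg0) throws Exception {
                // _2() is the value of the pair, i.e. the whole line.
                return arg0._2().length() > 20;
            }
        });
    return filter_rdd;
}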

2.3 Aggregation operations:

./spark-shell --jars /spark/alluxio-1.2.0/core/client/target/alluxio-core-client-1.2.0-jar-with-dependencies.jar

Pair RDDs provide, among others, the following methods:

1. reduceByKey(): reduces the values belonging to each key separately.

2. join(): combines the elements of two RDDs that share the same key into a single RDD.

First, create a Pair RDD:

To convert a normal RDD into a Pair RDD, use the map() function.

Program 4-1: create a Pair RDD that uses the first word as the key:

val text1 = sc.textFile("file:///spark/spark/README.md")

// Key: first space-separated word of the line; value: the whole line.
val pair1 = text1.map(x => (x.split(" ")(0), x))

println(pair1.collect().mkString(" "))

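Program 4-2: filter on the second element of a Pair RDD:

val text2 = sc.textFile("file:///spark/spark/README.md")

val pair_base2 = text2.map(x => (x.split(" ")(0), x))

// The source text is damaged from here on; the "< 20" condition is an assumption.
val pair2 = pair_base2.filter { case (key, value) => value.length < 20 }

println(pair2.collect().mkString(" "))

Program 4-3: compute the sum and the count for each key:

// The input definition was lost in the source; the name text3 and this data (taken from Program 4-5) are assumptions.
val text3 = sc.parallelize(List(("panda", 0), ("pink", 3), ("pirate", 3), ("panda", 1), ("pink", 4)))

val text3_final = text3.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

println(text3_final.collect().mkString(" "))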

reduceByKey() and foldByKey() automatically combine values locally on each machine before the global result for each key is computed.
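As a small illustration of the two calls, the sketch below sums the values per key with both reduceByKey() and foldByKey(). It assumes spark-shell with sc predefined; the sample data and the names pairs, viaReduce and viaFold are illustrative.

```scala
// Assumes spark-shell, where sc is predefined.
val pairs = sc.parallelize(
  List(("panda", 0), ("pink", 3), ("pirate", 3), ("panda", 1), ("pink", 4)))

// reduceByKey: combine the values of each key with the given function.
val viaReduce = pairs.reduceByKey((x, y) => x + y).collectAsMap()

// foldByKey: same idea, but starts from a zero value,
// which must be the identity of the operation (0 for addition).
val viaFold = pairs.foldByKey(0)((x, y) => x + y).collectAsMap()
```

Both calls give the same per-key sums here; foldByKey() is mainly useful when a natural zero value exists for the operation.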

In Program 4-3, mapValues(x => (x, 1)) produces: ("panda", (0, 1)); ("pink", (3, 1)); ("pirate", (3, 1)); ("panda", (1, 1)); and ("pink", (4, 1)).

The next step, reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)), merges the data that share a key. For panda, (0, 1) and (1, 1) become (0 + 1, 1 + 1): the first elements of the two tuples are added to give the new first element and the second elements are added to give the new second element, so the result is (1, 2).

The output of Program 4-3 is similar to: (pink,(7,2)) (pirate,(3,1)) (panda,(1,2)), that is, for each key the sum of its values and the number of times the key occurs.
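The (sum, count) pairs described above are usually one step away from a per-key average. The following sketch adds that step; it assumes spark-shell with sc predefined, and the names sumCount and averages are illustrative rather than taken from the original programs.

```scala
// Assumes spark-shell, where sc is predefined.
val sumCount = sc.parallelize(
    List(("panda", 0), ("pink", 3), ("pirate", 3), ("panda", 1), ("pink", 4)))
  .mapValues(x => (x, 1))                              // value -> (value, 1)
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))   // (sum, count) per key

// Divide the sum by the count for each key.
val averages = sumCount
  .mapValues { case (sum, count) => sum.toDouble / count }
  .collectAsMap()
```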

Program 4-4: word count:

val text4 = sc.textFile("file:///spark/spark/README.md")

// Split every line into words.
val words = text4.flatMap(x => x.split(" "))

// Pair each word with 1, then add up the ones per word.
words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
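To see the word count produce a result without depending on a file, the sketch below runs the same pipeline on two in-memory lines and compares it with countByValue(), an RDD action that counts distinct elements directly. It assumes spark-shell with sc predefined; the sample lines and the names lines, counts and countsDirect are illustrative.

```scala
// Assumes spark-shell, where sc is predefined.
val lines = sc.parallelize(List("hello spark", "hello pair rdd"))
val words = lines.flatMap(x => x.split(" "))

// Same pattern as Program 4-4, collected to a map for inspection.
val counts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y).collectAsMap()

// countByValue() returns the counts to the driver as a Map[String, Long].
val countsDirect = words.countByValue()
```

countByValue() brings the whole result to the driver, so the reduceByKey() form is the safer choice when the number of distinct words is large.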

combineByKey() is the most general function for key-based aggregation; most of the other per-key aggregation functions are implemented with it. combineByKey() lets the user return values whose type differs from the type of the input data.

If a key appears for the first time (for the first time in the current partition, not in the entire RDD), the createCombiner() function is used to create the initial value of the accumulator for that key.

If the key has already been encountered while processing the current partition, the mergeValue() function merges the current value of the accumulator for that key with the new value.

combineByKey() has one parameter for each stage of the aggregation, which makes it well suited to explaining what each stage does.

Program 4-5: use combineByKey() to calculate the average for each key:

val input = sc.parallelize(List(("panda", 0), ("pink", 3), ("pirate", 3), ("panda", 1), ("pink", 4)))

val result = input.combineByKey(
  // createCombiner: first value of a key in a partition becomes (sum, count)
  (v) => (v, 1),
  // mergeValue: add a further value of the same key to the accumulator
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
  // mergeCombiners: merge the accumulators of two partitions
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map { case (key, value) => (key, value._1 / value._2.toFloat) }

result.collectAsMap().foreach(println(_))

Here combineByKey() receives three functions:

The first function is createCombiner(), which is called when a key is encountered for the first time in a partition. For example, when panda appears for the first time in a partition, createCombiner() is called.

The second function is mergeValue(), which is called when a key that has already appeared in the same partition, such as panda, is encountered again.

The third function is mergeCombiners(), which merges the accumulators that different partitions have built for the same key.
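The division of work between the three functions can be traced without a cluster. The sketch below is a plain-Scala model of the behaviour described above, not Spark's actual implementation; the split of the sample data into partition1 and partition2, and the helper names combinePartition and mergePartitions, are hypothetical.

```scala
// Plain Scala model (no Spark needed) of the three combineByKey() phases.
type Acc = (Int, Int) // (sum, count)

val createCombiner: Int => Acc = v => (v, 1)
val mergeValue: (Acc, Int) => Acc = (acc, v) => (acc._1 + v, acc._2 + 1)
val mergeCombiners: (Acc, Acc) => Acc = (a, b) => (a._1 + b._1, a._2 + b._2)

// Within one partition: the first sight of a key calls createCombiner,
// every later sight of that key calls mergeValue.
def combinePartition(part: Seq[(String, Int)]): Map[String, Acc] =
  part.foldLeft(Map.empty[String, Acc]) { case (accs, (k, v)) =>
    accs.get(k) match {
      case None      => accs + (k -> createCombiner(v))
      case Some(acc) => accs + (k -> mergeValue(acc, v))
    }
  }

// Across partitions: accumulators of the same key are merged with mergeCombiners.
def mergePartitions(a: Map[String, Acc], b: Map[String, Acc]): Map[String, Acc] =
  b.foldLeft(a) { case (merged, (k, acc)) =>
    merged + (k -> merged.get(k).map(mergeCombiners(_, acc)).getOrElse(acc))
  }

// Hypothetical split of the sample data into two partitions.
val partition1 = Seq(("panda", 0), ("pink", 3), ("pirate", 3))
val partition2 = Seq(("panda", 1), ("pink", 4))

val combined = mergePartitions(combinePartition(partition1), combinePartition(partition2))
val averages = combined.map { case (k, (sum, count)) => k -> sum / count.toFloat }
```

With this split, panda is new in both partitions, so createCombiner runs once per partition for it and mergeCombiners joins the two accumulators afterwards; mergeValue is never called because no key repeats inside a partition.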
