
Spark Notes (6): Spark Advanced Sorting and the TopN Problem

Updated 2025-01-15. From: SLTechnology News&Howtos (shulou)


Introduction

The key question is how to sort the words produced by wordcount according to how many times each word occurs.

As follows:

    scala> val retRDD = sc.textFile("hdfs://ns1/hello").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    scala> val retSortRDD = retRDD.map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1))
    scala> retSortRDD.collect.foreach(println)
    (hello,3)
    (me,1)
    (you,1)
    (he,1)
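The idea of counting first and then ordering by the count can also be tried locally. The following is a minimal plain-Java sketch, not Spark code: the class name WordCountSort and the sample lines are assumptions made for illustration, and a single in-memory sort stands in for the swap, sortByKey(false), swap sequence used on the RDD.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the article's code.
class WordCountSort {
    /** Counts the words in the given lines and returns (word, count) entries, largest count first. */
    public static List<Map.Entry<String, Integer>> sortedCounts(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum); // same role as reduceByKey(_ + _)
                }
            }
        }
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        // Order by the count, descending; this replaces the swap / sortByKey(false) / swap steps
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return entries;
    }

    public static void main(String[] args) {
        // Assumed sample input; the real job reads hdfs://ns1/hello
        List<String> lines = Arrays.asList("hello you", "hello me", "hello he");
        for (Map.Entry<String, Integer> e : sortedCounts(lines)) {
            System.out.println("(" + e.getKey() + "," + e.getValue() + ")");
        }
    }
}
```

In Spark the pair has to be swapped because sortByKey orders by the key only; in the local sketch the comparator can look at the value directly.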

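The following tests require these Maven dependencies:

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

Spark secondary sorting

Test data and description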

The data that requires secondary sorting has the following format. Each record holds two integers, field_1 and field_2, separated by a space:

    field_1 field_2
    20 21 50 51 50 52 50 53 50 50 54 60 51 60 53 60 52 60 56 57 70 58 60 61 70 54

The code comments below explain each step in detail. The sorting is implemented in both Java and Scala, in the following ways:

Java version
Method 1: make the elements comparable -> requires the SecondarySort object
Method 2: provide a comparator -> requires the SecondarySort object
Whichever method is used, a new custom object, SecondarySort, is required.

Scala version
Method 1: make the elements comparable; this is the Scala implementation of Java method 1 -> requires the SecondarySort object
Method 2: the first way of using sortBy, sorting on the raw data -> does not require the SecondarySort object
Method 3: the second way of using sortBy, converting the raw data first -> requires the SecondarySort object

So this secondary sorting example covers five variants across Java and Scala.
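Before looking at the Spark code, the composite-key ordering itself (first column ascending, second column descending when the first is equal) can be sketched in plain Java. This is an illustration under assumptions, not the article's implementation: the class SecondarySortSketch, the use of int[] pairs instead of the SecondarySort object, and the four sample rows are all made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical class for illustration; the article itself uses the SecondarySort object.
class SecondarySortSketch {
    // Composite ordering: first column ascending, then second column descending
    static final Comparator<int[]> FIRST_ASC_SECOND_DESC =
            Comparator.<int[]>comparingInt(p -> p[0])
                    .thenComparing(Comparator.<int[]>comparingInt(p -> p[1]).reversed());

    /** Parses "field_1 field_2" lines and returns them ordered by the composite key. */
    public static List<String> sortLines(List<String> lines) {
        List<int[]> pairs = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            pairs.add(new int[]{Integer.parseInt(fields[0]), Integer.parseInt(fields[1])});
        }
        pairs.sort(FIRST_ASC_SECOND_DESC);
        List<String> out = new ArrayList<>();
        for (int[] p : pairs) {
            out.add(p[0] + " " + p[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed sample rows in the "field_1 field_2" format
        System.out.println(sortLines(Arrays.asList("50 51", "20 21", "50 53", "60 51")));
        // prints [20 21, 50 53, 50 51, 60 51]
    }
}
```

The comparator plays the role of "providing a comparator"; putting the same rule into compareTo of a key class is the "make the elements comparable" route.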

Common object

All variants that need a custom key share the SecondarySort object, defined as follows:

    package cn.xpleaf.bigdata.spark.java.core.domain;

    import scala.Serializable;

    public class SecondarySort implements Comparable<SecondarySort>, Serializable {
        private int first;
        private int second;

        public SecondarySort(int first, int second) {
            this.first = first;
            this.second = second;
        }

        public int getFirst() {
            return first;
        }

        public void setFirst(int first) {
            this.first = first;
        }

        public int getSecond() {
            return second;
        }

        public void setSecond(int second) {
            this.second = second;
        }

        // Natural order: first ascending; when first is equal, second descending
        @Override
        public int compareTo(SecondarySort that) {
            int ret = this.getFirst() - that.getFirst();
            if (ret == 0) {
                ret = that.getSecond() - this.getSecond();
            }
            return ret;
        }

        @Override
        public String toString() {
            return this.first + " " + this.second;
        }
    }

Java version

The test code is as follows:

    package cn.xpleaf.bigdata.spark.java.core.p3;

    import cn.xpleaf.bigdata.spark.java.core.domain.SecondarySort;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Serializable;
    import scala.Tuple2;

    import java.util.Comparator;

    /**
     * Java version of the secondary sort
     *  field_1 field_2 (separated by a space)
     *  20 21 50 51 50 52 50 53 50 50 53 54 60 51 60 53 52 60 56 60 57 70 58 60 61 70 54
     * Requirement: sort by the first column in ascending order; when the first column is equal,
     * sort by the second column in descending order.
     *
     * Analysis: to sort we can use sortByKey or sortBy.
     * sortByKey can only sort by the key. Should the key be the first column or the second?
     * Given the requirement, it has to be a composite key containing both columns.
     * Either the composite key must be comparable, or a comparator must be passed to the operator.
     * No ready-made comparator exists for this key, so the first option is to make the elements
     * comparable with a custom object that implements the Comparable interface.
     */
    public class _01SparkSecondarySortOps {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]")
                    .setAppName(_01SparkSecondarySortOps.class.getSimpleName());
            JavaSparkContext jsc = new JavaSparkContext(conf);

            JavaRDD<String> linesRDD = jsc.textFile("D:/data/spark/secondsort.csv");
            JavaPairRDD<SecondarySort, String> ssRDD = linesRDD.mapToPair(new PairFunction<String, SecondarySort, String>() {
                @Override
                public Tuple2<SecondarySort, String> call(String line) throws Exception {
                    String[] fields = line.split(" ");
                    int first = Integer.valueOf(fields[0].trim());
                    int second = Integer.valueOf(fields[1].trim());
                    SecondarySort ss = new SecondarySort(first, second);
                    return new Tuple2<SecondarySort, String>(ss, "");
                }
            });

            /*
            // Method 1: make the elements comparable
            // One partition is used so that foreach prints the data in global order;
            // with several partitions each partition prints its own sorted slice independently
            JavaPairRDD<SecondarySort, String> sbkRDD = ssRDD.sortByKey(true, 1);
            */

            /**
             * Method 2: provide a comparator
             * The order is the opposite of method 1: first column descending, second column ascending
             */
            JavaPairRDD<SecondarySort, String> sbkRDD = ssRDD.sortByKey(new MyComparator<SecondarySort>() {
                @Override
                public int compare(SecondarySort o1, SecondarySort o2) {
                    int ret = o2.getFirst() - o1.getFirst();
                    if (ret == 0) {
                        ret = o1.getSecond() - o2.getSecond();
                    }
                    return ret;
                }
            }, true, 1);

            sbkRDD.foreach(new VoidFunction<Tuple2<SecondarySort, String>>() {
                @Override
                public void call(Tuple2<SecondarySort, String> tuple2) throws Exception {
                    System.out.println(tuple2._1);
                }
            });

            jsc.close();
        }
    }

    /**
     * An intermediate interface.
     * The comparator must also implement the serialization interface, otherwise an exception is thrown.
     * The Adapter pattern serves as a bridge between two incompatible interfaces,
     * and this is a good example of it.
     */
    interface MyComparator<T> extends Comparator<T>, Serializable {}
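The MyComparator interface above exists only so that the comparator is serializable. On Java 8 or later, roughly the same effect can be obtained with a lambda cast to an intersection type. The sketch below is plain Java under assumptions: the class SerializableComparatorSketch and the use of int[] pairs are made up for illustration, and it only checks that the comparator can be written by Java serialization; it does not run a Spark job.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

// Hypothetical class for illustration; not part of the article's code.
class SerializableComparatorSketch {
    /** First column descending, second column ascending, as a serializable lambda. */
    public static Comparator<int[]> firstDescSecondAsc() {
        // The intersection cast makes the lambda implement both Comparator and Serializable
        return (Comparator<int[]> & Serializable) (o1, o2) -> {
            int ret = Integer.compare(o2[0], o1[0]);
            if (ret == 0) {
                ret = Integer.compare(o1[1], o2[1]);
            }
            return ret;
        };
    }

    /** Returns true if the object can be written with Java serialization. */
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // includes NotSerializableException
        }
    }

    public static void main(String[] args) {
        Comparator<int[]> plain = (a, b) -> Integer.compare(a[0], b[0]);
        System.out.println(isSerializable(firstDescSecondAsc())); // serializable lambda
        System.out.println(isSerializable(plain));                // plain lambda
    }
}
```

Whether to use a named interface or the cast is a matter of taste; the named interface reads more clearly when the same comparator type is reused in several jobs.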

The output is as follows:

    740 58730 54530 54203 2174 5873 5771 5571 5670 5470 5570 5670 5770 5863 5160 5260 5360 5660 5,760 5760 5760 5150 5150 5350 5350 5450 6250 51250 52240 51131 4220 2120 5320 52217 825 63 41 2

Scala version

The test code is as follows:

    package cn.xpleaf.bigdata.spark.scala.core.p3

    import cn.xpleaf.bigdata.spark.java.core.domain.SecondarySort
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    import scala.reflect.ClassTag

    object _05SparkSecondarySortOps {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkSecondarySortOps.getClass.getSimpleName)
            val sc = new SparkContext(conf)

            val linesRDD = sc.textFile("D:/data/spark/secondsort.csv")

            /*
            val ssRDD: RDD[(SecondarySort, String)] = linesRDD.map(line => {
                val fields = line.split(" ")
                val first = Integer.valueOf(fields(0).trim())
                val second = Integer.valueOf(fields(1).trim())
                val ss = new SecondarySort(first, second)
                (ss, "")
            })

            // Method 1: make the elements comparable
            val sbkRDD: RDD[(SecondarySort, String)] = ssRDD.sortByKey(true, 1)

            sbkRDD.foreach { case (ss: SecondarySort, str: String) => // pattern matching
                println(ss)
            }
            */

            /*
            // Method 2: the first way of using sortBy, sorting on the raw data
            val retRDD = linesRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] {
                override def compare(x: String, y: String): Int = {
                    val xFields = x.split(" ")
                    val yFields = y.split(" ")
                    var ret = xFields(0).toInt - yFields(0).toInt
                    if (ret == 0) {
                        ret = yFields(1).toInt - xFields(1).toInt
                    }
                    ret
                }
            }, ClassTag.Object.asInstanceOf[ClassTag[String]])
            */

            // Method 3: the second way of using sortBy, converting the raw data first
            // The first parameter of sortBy() is the function that converts the data
            val retRDD: RDD[String] = linesRDD.sortBy(line => {
                // f: (T) => K
                // here T is String and K is SecondarySort
                val fields = line.split(" ")
                val first = Integer.valueOf(fields(0).trim())
                val second = Integer.valueOf(fields(1).trim())
                val ss = new SecondarySort(first, second)
                ss
            }, true, 1)(new Ordering[SecondarySort] {
                override def compare(x: SecondarySort, y: SecondarySort): Int = {
                    var ret = x.getFirst - y.getFirst
                    if (ret == 0) {
                        ret = y.getSecond - x.getSecond
                    }
                    ret
                }
            }, ClassTag.Object.asInstanceOf[ClassTag[SecondarySort]])

            retRDD.foreach(println)

            sc.stop()
        }
    }

The output is as follows:

    1 23 45 67 827812 21120 52220 5320 2131 4240 51150 52250 51250 6250 5450 5350 5350 5250 5160 5760 5760 5,660 5260 5163 6170 5870 5870 5770 5570 5571 5571 5774 58203 21530 54730 54740 58

TopN problem

Requirements and description

The requirements and data are described as follows:

    Description of the TopN problem:
    The TopN problem can obviously be solved with the action operator take. However, take pulls the data
    back to the Driver to complete the operation, so the memory pressure on the Driver can be very high,
    and take is not recommended here.

    The data and requirements analyzed here are as follows:
    chinese ls 91
    english ww 56
    chinese zs 90
    chinese zl 76
    english zq 88
    chinese wb 95
    chinese sj 74
    english ts 87
    english ys 67
    english mz 77
    chinese yj 98
    english gk 96

    Requirement: list the top three of each subject

The following sections solve the problem first with the lower-performance groupByKey and then with the higher-performance combineByKey. Detailed explanations are given in the code. The reasoning matters more than the code itself, especially the way combineByKey avoids the performance problems of groupByKey, because it is closely tied to how Spark itself works.
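The difference between the two approaches is easier to see with the cluster taken away. The following plain-Java sketch imitates the three functions that combineByKey expects (createCombiner, mergeValue, mergeCombiners) and applies them to lists that stand in for partitions. It is an illustration under assumptions: the class TopNSketch, the split of the sample data into two "partitions", and the use of java.util.TreeSet are choices made for this example, not Spark APIs.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical class for illustration; it simulates combineByKey without Spark.
class TopNSketch {
    static final int N = 3;

    // Descending by score; values look like "zs 90" (score only, like the article's MyOrdering)
    static final Comparator<String> BY_SCORE_DESC =
            (x, y) -> Integer.compare(score(y), score(x));

    static int score(String nameScore) {
        return Integer.parseInt(nameScore.split(" ")[1]);
    }

    // Starts a container for the first value of a key seen in a partition
    public static TreeSet<String> createCombiner(String nameScore) {
        TreeSet<String> ts = new TreeSet<>(BY_SCORE_DESC);
        ts.add(nameScore);
        return ts;
    }

    // Adds one more value inside a partition, keeping at most N entries
    public static TreeSet<String> mergeValue(TreeSet<String> ts, String nameScore) {
        ts.add(nameScore);
        if (ts.size() > N) {
            ts.pollLast(); // drop the lowest score
        }
        return ts;
    }

    // Merges the containers that two partitions built for the same key
    public static TreeSet<String> mergeCombiners(TreeSet<String> ts1, TreeSet<String> ts2) {
        for (String nameScore : ts2) {
            mergeValue(ts1, nameScore);
        }
        return ts1;
    }

    /** Each inner list plays the role of one partition of "subject name score" lines. */
    public static Map<String, TreeSet<String>> topN(List<List<String>> partitions) {
        Map<String, TreeSet<String>> merged = new TreeMap<>();
        for (List<String> partition : partitions) {
            // Step 1: filter inside the partition, so at most N values per key survive
            Map<String, TreeSet<String>> local = new HashMap<>();
            for (String line : partition) {
                String[] f = line.trim().split("\\s+");
                String nameScore = f[1] + " " + f[2];
                TreeSet<String> ts = local.get(f[0]);
                if (ts == null) {
                    local.put(f[0], createCombiner(nameScore));
                } else {
                    mergeValue(ts, nameScore);
                }
            }
            // Step 2: only the small per-partition results are merged across partitions
            for (Map.Entry<String, TreeSet<String>> e : local.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), TopNSketch::mergeCombiners);
            }
        }
        return merged;
    }

    /** The article's sample data, split into two assumed partitions. */
    public static List<List<String>> sample() {
        return Arrays.asList(
                Arrays.asList("chinese ls 91", "english ww 56", "chinese zs 90",
                        "chinese zl 76", "english zq 88", "chinese wb 95"),
                Arrays.asList("chinese sj 74", "english ts 87", "english ys 67",
                        "english mz 77", "chinese yj 98", "english gk 96"));
    }

    public static void main(String[] args) {
        System.out.println(topN(sample()));
        // prints {chinese=[yj 98, wb 95, ls 91], english=[gk 96, zq 88, ts 87]}
    }
}
```

With groupByKey, every value of a key would cross the network before any filtering happens; here at most three values per key leave each partition.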

Use groupByKey to solve

The test code is as follows:

    package cn.xpleaf.bigdata.spark.scala.core.p3

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    import scala.collection.mutable

    /**
      * Description of the TopN problem:
      * The TopN problem can obviously be solved with the action operator take. However, take pulls the data
      * back to the Driver to complete the operation, so the memory pressure on the Driver can be very high,
      * and take is not recommended here.
      *
      * The data and requirements analyzed here are as follows:
      * chinese ls 91
      * english ww 56
      * chinese zs 90
      * chinese zl 76
      * english zq 88
      * chinese wb 95
      * chinese sj 74
      * english ts 87
      * english ys 67
      * english mz 77
      * chinese yj 98
      * english gk 96
      *
      * Requirement: list the top three of each subject
      * Approach: first use map to convert each line into a (subject, name + score) tuple,
      * then groupByKey on the subject key to get gbkRDD,
      * then map over gbkRDD and use a TreeSet inside the map to get the top three
      * (the TreeSet both limits the size and sorts)
      *
      * Problem:
      * Use this solution with caution in production,
      * because groupByKey pulls all data with the same key into the same partition before operating on it.
      * That pulling is the shuffle, a performance killer in distributed computing. In addition, if a key has
      * too much data, it is likely to cause data skew or OOM, so this approach should be avoided where possible.
      * How? Refer to the TopN idea in MR: the data is filtered in each map task. A shuffle to one node
      * is still needed at the end, but the amount of data is greatly reduced.
      * The corresponding idea in Spark is to filter the data within each partition, then merge the filtered
      * data from all partitions and sort once more to get the final result.
      * This solves the problem described above of too much data landing in one partition, because the
      * per-partition filtering greatly reduces the amount of data.
      * Which operators can do this in Spark? combineByKey or aggregateByKey. Their usage is covered
      * in my previous blog post; combineByKey is used here.
      */
    object _06SparkTopNOps {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setMaster("local[2]").setAppName(_06SparkTopNOps.getClass.getSimpleName())
            val sc = new SparkContext(conf)
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
            Logger.getLogger("org.spark_project").setLevel(Level.OFF)

            // 1. Convert to linesRDD
            val linesRDD: RDD[String] = sc.textFile("D:/data/spark/topn.txt")

            // 2. Convert to pairsRDD
            val pairsRDD: RDD[(String, String)] = linesRDD.map(line => {
                val fields = line.split(" ")
                val subject = fields(0).trim()
                val name = fields(1).trim()
                val score = fields(2).trim()
                (subject, name + " " + score) // ("chinese", "zs 90")
            })

            // 3. Convert to gbkRDD
            val gbkRDD: RDD[(String, Iterable[String])] = pairsRDD.groupByKey()
            println("= before TopN =")
            gbkRDD.foreach(println)
            // (english,CompactBuffer(ww 56, zq 88, ts 87, ys 67, mz 77, gk 96))
            // (chinese,CompactBuffer(ls 91, zs 90, zl 76, wb 95, sj 74, yj 98))

            // 4. Convert to retRDD
            val retRDD: RDD[(String, Iterable[String])] = gbkRDD.map(tuple => {
                var ts = new mutable.TreeSet[String]()(new MyOrdering())
                val subject = tuple._1     // chinese
                val nameScores = tuple._2  // ("ls 91", "ww 56", "zs 90", ...)
                for (nameScore <- nameScores) { // add every value to the TreeSet
                    ts.add(nameScore)
                    if (ts.size > 3) { // if the size is greater than 3, drop the last (lowest) score
                        ts = ts.dropRight(1)
                    }
                }
                (subject, ts)
            })

            println("= after TopN =")
            retRDD.foreach(println)

            sc.stop()
        }
    }

    // Ordering used by the TreeSet: descending by score, as the requirement asks.
    // Note: entries with equal scores compare as equal, so the TreeSet keeps only one of them.
    class MyOrdering extends Ordering[String] {
        override def compare(x: String, y: String): Int = {
            // x and y have the format "zs 90"
            val xFields = x.split(" ")
            val yFields = y.split(" ")
            val xScore = xFields(1).toInt
            val yScore = yFields(1).toInt
            val ret = yScore - xScore
            ret
        }
    }

The output is as follows:

    = before TopN =
    (chinese,CompactBuffer(ls 91, zs 90, zl 76, wb 95, sj 74, yj 98))
    (english,CompactBuffer(ww 56, zq 88, ts 87, ys 67, mz 77, gk 96))
    = after TopN =
    (chinese,TreeSet(yj 98, wb 95, ls 91))
    (english,TreeSet(gk 96, zq 88, ts 87))

Use combineByKey to solve

The test code is as follows:

    package cn.xpleaf.bigdata.spark.scala.core.p3

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    import scala.collection.mutable

    /**
      * Use the combineByKey operator to optimize the previous TopN solution.
      * For the usage of combineByKey, see my blog article, which has detailed examples.
      * It is worth mastering, because it is very important.
      */
    object _07SparkTopNOps {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setMaster("local[2]").setAppName(_07SparkTopNOps.getClass().getSimpleName())
            val sc = new SparkContext(conf)
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
            Logger.getLogger("org.spark_project").setLevel(Level.OFF)

            // 1. Convert to linesRDD
            val linesRDD: RDD[String] = sc.textFile("D:/data/spark/topn.txt")

            // 2. Convert to pairsRDD
            val pairsRDD: RDD[(String, String)] = linesRDD.map(line => {
                val fields = line.split(" ")
                val subject = fields(0).trim()
                val name = fields(1).trim()
                val score = fields(2).trim()
                (subject, name + " " + score) // ("chinese", "zs 90")
            })

            println("= before TopN =")
            pairsRDD.foreach(println)
            // (chinese,sj 74)
            // (chinese,ls 91)
            // (english,ts 87)
            // (english,ww 56)
            // (english,ys 67)
            // (chinese,zs 90)
            // (english,mz 77)
            // (chinese,zl 76)
            // (chinese,yj 98)
            // (english,zq 88)
            // (english,gk 96)
            // (chinese,wb 95)

            // 3. Convert to cbkRDD
            val cbkRDD: RDD[(String, mutable.TreeSet[String])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)

            println("= after TopN =")
            cbkRDD.foreach(println)
            // (chinese,TreeSet(yj 98, wb 95, ls 91))
            // (english,TreeSet(gk 96, zq 88, ts 87))

            sc.stop()
        }

        // Create a container. A TreeSet is returned here; it holds the values of one key within a partition
        def createCombiner(nameScore: String): mutable.TreeSet[String] = {
            // nameScore format: "zs 90"
            // MyOrdering is the sort rule: descending by score
            val ts = new mutable.TreeSet[String]()(new MyOrdering())
            ts.add(nameScore)
            ts
        }

        // Merge the values of the same key within a partition, using the TreeSet to sort
        def mergeValue(ts: mutable.TreeSet[String], nameScore: String): mutable.TreeSet[String] = {
            ts.add(nameScore)
            if (ts.size > 3) {
                // More than 3 entries: drop the last one. dropRight does not modify ts,
                // it returns a new collection, so that new collection must be the result
                ts.dropRight(1)
            } else {
                ts
            }
        }

        // Merge the value collections of the same key from different partitions, using a TreeSet to sort
        def mergeCombiners(ts1: mutable.TreeSet[String], ts2: mutable.TreeSet[String]): mutable.TreeSet[String] = {
            var newTS = new mutable.TreeSet[String]()(new MyOrdering())
            // Add the values of the collection from partition 1 to the new TreeSet, sorting and limiting the size
            for (nameScore <- ts1) {
                newTS.add(nameScore)
                if (newTS.size > 3) { // more than 3 entries: drop one and reassign
                    newTS = newTS.dropRight(1)
                }
            }
            // Add the values of the collection from partition 2 to the new TreeSet, sorting and limiting the size
            for (nameScore <- ts2) {
                newTS.add(nameScore)
                if (newTS.size > 3) { // more than 3 entries: drop one and reassign
                    newTS = newTS.dropRight(1)
                }
            }
            newTS
        }
    }

The output is as follows:

    = before TopN =
    (chinese,ls 91)
    (chinese,sj 74)
    (english,ww 56)
    (english,ts 87)
    (chinese,zs 90)
    (english,ys 67)
    (chinese,zl 76)
    (english,mz 77)
    (english,zq 88)
    (chinese,yj 98)
    (chinese,wb 95)
    (english,gk 96)
    = after TopN =
    (english,TreeSet(gk 96, zq 88, ts 87))
    (chinese,TreeSet(yj 98, wb 95, ls 91))
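One detail of MyOrdering is worth checking before reusing it on other data: it compares scores only, so two students with the same score compare as equal, and a sorted set keeps just one of them. The sample data has no ties, so the results above are not affected. The plain-Java sketch below shows the effect and one possible tie-break (by name). The class TieBreakSketch and the entry "ab 90" are hypothetical, added only for the demonstration.

```java
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical class for illustration; not part of the article's code.
class TieBreakSketch {
    static int score(String nameScore) {
        return Integer.parseInt(nameScore.split(" ")[1]);
    }

    // Score only, descending: the same rule as MyOrdering
    public static final Comparator<String> SCORE_ONLY =
            (x, y) -> Integer.compare(score(y), score(x));

    // Score descending, then the whole "name score" string ascending to separate ties
    public static final Comparator<String> SCORE_THEN_NAME =
            SCORE_ONLY.thenComparing(Comparator.<String>naturalOrder());

    /** Collects the items into a sorted set that uses the given ordering. */
    public static TreeSet<String> collect(Comparator<String> order, String... items) {
        TreeSet<String> ts = new TreeSet<>(order);
        for (String item : items) {
            ts.add(item);
        }
        return ts;
    }

    public static void main(String[] args) {
        // "ab 90" is an invented entry that ties with "zs 90"
        System.out.println(collect(SCORE_ONLY, "zs 90", "ab 90", "ls 91"));      // one of the 90s is lost
        System.out.println(collect(SCORE_THEN_NAME, "zs 90", "ab 90", "ls 91")); // both 90s are kept
    }
}
```

Whether tied students should both appear in the top three is a requirement question; the sketch only shows that the ordering decides it.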
