Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

RDD of Spark Core

2025-03-27 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

(1) introduction of RDD

   

   RDD (Resilient Distributed Dataset), called distributed dataset, is the most basic data abstraction in Spark. It represents a collection of immutable (data in RDD, cannot be added or deleted), partitioned, and elements can be calculated in parallel.

   has the characteristics of data flow model, automatic fault tolerance, location-aware scheduling and scalability. RDD allows users to cache working sets in memory as they appear when executing multiple queries. Subsequent queries can reuse working sets, which greatly improves the query speed.

   RDD can be understood in three ways:

   -dataset: a RDD is an abstraction of a data set and a logical view of data that exists on a complex physical medium. From an external point of view, RDD can indeed be seen as an encapsulated collection of data with extended features such as fault tolerance.

   -distributed: RDD data may be stored on multiple nodes on disk or in memory, also known as multi-level storage.

   -resiliency: although the data stored inside RDD is read-only, we can modify the partition structure of parallel computing units, that is, the number of partitions, through repartition conversion operations.

   in short: RDD is a large collection that loads all the data into memory, making it easy to reuse multiple times. Its data can be on multiple nodes, and the RDD can be saved in memory, when a certain stage of RDD is lost, there is no need to recalculate, only need to extract the last RDD, in the corresponding calculation.

(2) attributes of RDD

  

  1) A list of partitions (a group of fragments, the basic unit of a data set)

   A partition is usually associated with a task direction, and the number of partitions determines the granularity of parallelism. The number of partitions can be specified when the RDD is created, and if not, it is determined by the number of cores of the node by default. Eventually each partition will be mapped to a Block in the BlockManager, and this Block will be used by the next task for calculation.

  2) A function for computing each split (operator)

Every RDD of    implements compute, which is used for partition calculation.

  3) A list of dependencies on other RDDs (dependency between RDD)

Each transformation of    RDD generates a new RDD, so there is a pipelined front-and-back dependency between RDD. When some partition data is lost, Spark can recalculate the lost partition data through this dependency instead of recalculating all partitions of RDD.

Wide and narrow dependencies:

Narrow dependency (complete dependency): a parent partition uniquely corresponds to one child partition, for example: map operation

Wide dependency (partial dependency): a parent partition corresponds to multiple child partitions, such as reduce and group operations

Distinguish between wide and narrow dependencies: whether there is a shuffle operation during the execution of this operator.

  4) Optionally a Partitioner for key-value RDDs (partition function)

Partitioner is available only for the RDD of key-value, and the value of Parititioner for non-key-value RDD is None. The Partitioner function determines not only the number of slices for RDD itself, but also the number of slices for parent RDD Shuffle output.

  5) Optionally a list of preferred locations to compute each split on

   a list that stores the priority location (preferred location) for each Partition. According to the concept of "mobile data is not as good as mobile computing", Spark assigns computing tasks to the storage location of the data blocks it wants to process as much as possible when scheduling tasks. And this list stores the priority location of each partition.

(3) API of RDD (correlation operator)

There are two medium forms in    RDD programming: Transformation (transformation) and Action (action).

   Transformation: means to put a RDD-> RDD.

   Action: means to put a RDD----  collection or scala object.

  1) creation of RDD: object SparktTest {def main (args: Array [String]): Unit = {val conf: SparkConf = new SparkConf () conf.setAppName ("SparktTest") conf.setMaster ("local [2]") val sc: SparkContext = new SparkContext () / / val arr=Array is created from an existing Scala data set (1Jue 2) 3) val arr1RDD: RDD [Int] = sc.parallelize (arr) val arr2RDD: RDD [Int] = sc.makeRDD (arr) / / created by data from external storage systems (HDFS, HBase...) val HDFSRDD: RDD [String] = sc.textFile ("/ data/input")}   2) Transformation:

   official website: http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

   Note: all Transformation in RDD are delayed loading, that is, they do not calculate the results directly, on the contrary, they just remember a conversion action applied to the underlying data set, and these transformations actually run only when a request to return a Driver action occurs.

Map () operator:

Val HDFSRDD: RDD [String] = sc.textFile ("/ data/input") / * map operator, which returns a new RDD. The RDD is composed of * / val mapRDD: RDD [(String, Int)] = HDFSRDD.map (ele= > (ele,1)) after each input element is transformed by the function function.

FlatMap () operator:

Val arr=Array ("hive hbase hadoop", "spark hadoop", "yarn hdfs") val lineRDD: RDD [String] = sc.parallelize (arr) / * flagMap: similar to map, but the input elements of each element can be * mapped to 0 or more output elements (a sequence is returned

Filter () operator:

Val arr=Array val arrRDD: RDD [Int] = sc.parallelize (arr) / * filter filter: returns a new RDD consisting of input elements with a value of * true calculated by the func function * / val filterRDD: RDD [Int] = arrRDD.filter (num= > num%2==0)

MapPartitions () operator:

Val hdfsRDD: RDD [String] = sc.textFile ("/ data/input") / * the only difference between mapPartitions and map is What mapPartitions iterates over is a partition, * and every element that map iterates through, the mapPartitions parameter is an iterative object, and also returns an iterative object * / val partitionRDD: RDD [String] = hdfsRDD.mapPartitions ((x: Iterator [String])) = > {val temp = x.toList.map (line = > line + "!") Temp.toIterator})

MapPartitionsWithIndex () operator:

Val hdfsRDD: RDD [String] = sc.textFile ("/ data/input") / * the first parameter is the partition number: the partition number is an uninterrupted serial number starting with 0 * the second parameter is the same as mapPartitions * / val partitionRDD: RDD [String] = hdfsRDD.mapPartitionsWithIndex ((parnum:Int) X: Iterator [String]) = > {println (parnum) / / partition number val temp = x.toList.map (line = > line + "!") Temp.toIterator})

Sample () operator:

The val list=1 to 5000 / * sample method has three parameters: * withReplacement: indicates whether there is a returned extraction (false does not put back, true: put back) * fraction: the percentage of the sample space to the population, (in the form of scores) 0x+y) sumRDD.foreach (println) / / print: (hbase,36) (math,18) (hbase,18)

SortByKey () operator:

Val list=List (("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18)) / * sortByKey: it is called on a (KQuery V) RDD, and K must implement the Ordered interface. * return a RDD * / / grouping sorted by key, and calculate the total score. Sort val stuRDD: RDD [(String, Int)] = sc.parallelize (list) val sumRDD: RDD [(String, Int)] = stuRDD.reduceByKey ((x, y) = > xPowery) sumRDD.foreach (println) / / print: (hbase,36) (math) 18) (hbase,18) val sortRDD: RDD [(String, Int)] = sumRDD.map (kv= > (kv._2,kv._1)) .sortByKey () .map (kv= > (kv._2,kv._1)) sortRDD.foreach (println)

SortBy operator:

Val list=List (("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18)) / * sortBy (func, [ascending], [numTasks]) * similar to sortByKey, but more flexible * what sort is the first parameter * how is the second sort, true positive order False reverse order * third sorted partition number, the default is the same as the original RDD * / / grouping, find the total score, sort val stuRDD: RDD [(String, Int)] = sc.parallelize (list) val sumRDD: RDD [(String, Int)] = stuRDD.reduceByKey ((x, y) = > x _ partition) sumRDD.foreach (println) / / print: (hbase,36) (math) 18) (hbase,18) val sortRDD: RDD [(String, Int)] = sumRDD.sortBy (kv= > kv._2,false,2)

AggregateByKey () operator:

Object SparktTest {def main (args: Array [String]): Unit = {val conf: SparkConf = new SparkConf () conf.setAppName ("SparktTest") conf.setMaster ("local [2]") val sc: SparkContext = new SparkContext () / * * aggregateByKey (zeroValue) (seqOp,combOp, [numTasks]) * aggregate first and then aggregate by partition Each time to communicate with the initial value * zeroValue: initial value * seqOp: iterative operation, merge each element in the RDD with the initial value * combOp: the final merge of the partition result * numTasks: number of partitions * aggregate+groupByKey=aggregateByKey * aggregate for a single value RDD,aggregateByKey pair (K V) RDD * / / aggregateval list = Array (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val listRDD: RDD [Int] = sc.parallelize (list) / / average / * seqOp: (U, T) = > U * combOp: (U, U) = > U * u: (Int,Int) sum Total number of times * / val result: (Int, Int) = listRDD.aggregate (0,0) ((u: (Int, Int), x: Int) = > {(u.room1 + x, u.room2 + 1)}, (U1: (Int, Int), U2: (Int, Int)) = > {(u1.room1 + u2.room1) ) println (result._1 / result._2) / / aggregateByKey has been based on (k) V) k for grouping, the following operations Is the average val list1 = List (("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18)) val stuRDD: RDD [(String, Int)] = sc.parallelize (list1) val reslutRDD2: RDD [(String, (Int, Int))] = stuRDD.aggregateByKey ((0,0)) ((x: (Int, Int)) Y: Int) = > {(x.room1 + y, x.room2 + 1)}, (x: (Int, Int), y: (Int, Int)) = > {(x.room1 + yy.room1) X.room2 + y.room2) reslutRDD2.foreach (kv= > {val name=kv._1 val avg=kv._2._1.toDouble/kv._2._2})}}

FoldLeft () operator: (not the operator of spark, but the advanced operation of scala)

/ * foldLeft * (zeroValue: t) initial value * (B, A) = > B B is a tuple, B.incremental 1 represents cumulative elements, B.room2 represents the number A denotes the next element * / aggregateval list = Array (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val result: (Int, Int) = list.foldLeft ((0Magin0)) ((x, y) = > {(x.class1) println (result._1.toDouble/result._2))

CombineByKey () operator:

X.room2 + 1)}, (x: (Int, Int), y: (Int, Int) = > {(x.room1 + Int, Int)}) resultRDD.foreach {case (name, (sum) Count)) = > {val avg=sum.toDouble/count println (s "${name}: ${avg}")}

Connect operation:

Object SparktTest {def main (args: Array [String]): Unit = {val conf: SparkConf = new SparkConf () conf.setAppName ("SparktTest") conf.setMaster ("local [2]") val sc: SparkContext = new SparkContext (conf) val arr1 = Array (1,2,4,5) val arr1RDD = sc.parallelize (arr1) val arr2 = Array (4,5,6) 7) val arr2RDD = sc.parallelize (arr2) / / cartesian Cartesian product val cartesianRDD: RDD [(Int, Int)] = arr1RDD.cartesian (arr2RDD) / / union: connect val unionRDD: RDD [Int] = arr1RDD.union (arr2RDD) / / subtract Difference val sbutractRDD: RDD [Int] = arr1RDD.subtract (arr2RDD) / / join val list1 = List (("a", 1), ("b", 2), ("c", 3)) val list1RDD = sc.parallelize (list1) val list2 = List (("a", "zs"), ("b") "sl") val list2RDD = sc.parallelize (list2) / * perform join operation based on key in tuple The same key returns RDD [(String, (Int, String))] (key) to the connection * Connection result) * / val joinRDD: RDD [(String, (Int, String))] = list1RDD.join (list2RDD) / / cogroup / * * (String key, * (Iterable [Int] arr1) set of corresponding key all value * Iterable [string])) set of all value of corresponding key in arr2 * / val cogroupRDD: RDD [(String, (Iterable [Int], Iterable [string]))] = list1RDD.cogroup (list2RDD)}}

Partition operation:

Object SparktTest {def main (args: Array [String]): Unit = {val conf: SparkConf = new SparkConf () conf.setAppName ("SparktTest") conf.setMaster ("local [2]") val sc: SparkContext = new SparkContext (conf) val hdfsRDD: RDD [String] = sc.textFile ("/ data/word.txt") / * indicates that after the filter operation is performed A large amount of data is filtered, resulting in the previously set number of task partitions. * processing the remaining data leads to a waste of resources. In order to make reasonable and efficient use of resources, * task can be redefined. The number of partitions in the coalesce method must be less than the previously set number of partitions. * / hdfsRDD.coalesce (2) / / disturb the data and re-partition. The partition rule is random partition hdfsRDD.repartition (3) / / Custom partition rules (Note Only available in RDD with key-value) var arr1 = Array (("a", 1), ("a", 2), ("c", 1), ("b", 2), ("d", 2) ("b", 2), ("e", 2), ("b", 2), ("f", 2), ("g", 2) ("h", 2)) val arrRDD: RDD [(String, Int)] = sc.parallelize (arr1,4) arrRDD.partitionBy (new MyPartitioner (3))}} class MyPartitioner (val numPTN:Int) extends Partitioner {/ / number of partitions override def numPartitions: Int = numPTN / / partition rules override def getPartition (key: Any): Int = {val num=key.hashCode () & Integer.MAX_VALUE%numPTN return num}

Summary:

  -Transformation still returns a RDD

  -it uses the design pattern of chained calls, which calculates one RDD and transforms it into another RDD, and then the RDD can be converted again. The process is distributed.

  3) Action:

Common actions:

Object SparktTest {def main (args: Array [String]): Unit = {val conf: SparkConf = new SparkConf () conf.setAppName ("SparktTest") conf.setMaster ("local [2]") val sc: SparkContext = new SparkContext (conf) val list = List (("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18) val listRDD: RDD [(String) Int)] = sc.parallelize (list) / / action rdd-map listRDD.reduceByKeyLocally ((XMagney) = > xperiy) / / the purpose of calling collect is to trigger all calculations Finally, collect all the data of the current caller RDD and return it to the client. If the amount of data is large, Carefully use listRDD.collect () / / count the number of records in RDD listRDD.count () / / take out the first record in RDD listRDD.first () / / take out the first few records in RDD listRDD.take (5) / / Random sampling listRDD.takeSample (false,20) / / according to a certain format The first few listRDD.top (50) / / in ascending or descending order after sorting Take the corresponding number of records (in which elements must inherit Ordered) listRDD.takeOrdered (3) / count how many listRDD.countByKey () / / count how many elements there are in each key listRDD.countByValue () / / traverse each element in RDD listRDD.foreach (kv= > {}) / / traverse RDD by partition The element in listRDD.foreachPartition (kv= > {}) / / sets the result of RDD Save to the appropriate file system (note that this directory must be a nonexistent directory) listRDD.saveAsTextFile ("/ data/output")}}

Summary: the return value of Action is not a RDD. It is either a collection of scala, a value, or empty. Eventually return to the Driver program, or write RDD to the file system.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 260

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report