2025-03-29 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article mainly introduces "how to create Spark RDD". In daily operation, I believe many people have doubts about how to create Spark RDD. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts about "how to create Spark RDD". Next, please follow the editor to study!
1. Overview of RDD
1.1 What is an RDD?
An RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. It represents an immutable, partitioned collection of elements that can be computed in parallel. RDDs have the characteristics of a data flow model: automatic fault tolerance, location-aware scheduling, and scalability. RDDs let users explicitly cache a working set in memory when running multiple queries, so that subsequent queries can reuse it, which greatly improves query speed.
1.2 Properties of RDD
(1) A list of partitions, the basic units of the dataset. Each partition is processed by one compute task, so the number of partitions determines the granularity of parallel computation. You can specify the number of partitions when creating an RDD. If you do not, a default value is used, which is the number of CPU cores allocated to the program.
(2) A function for computing each partition. An RDD in Spark is computed partition by partition, and each RDD implements the compute function for this purpose. The compute function composes iterators, so the result of each intermediate step does not need to be saved.
(3) The dependencies between RDDs. Each transformation of an RDD generates a new RDD, so RDDs form a pipeline-like chain of dependencies. When the data of some partitions is lost, Spark can use these dependencies to recompute only the lost partitions instead of recomputing all partitions of the RDD.
(4) A Partitioner, that is, the partitioning function of the RDD. Spark currently implements two kinds of partitioning functions: the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs have a Partitioner; for a non-key-value RDD the Partitioner is None. The Partitioner determines not only the number of partitions of the RDD itself, but also the number of partitions of the parent RDD's shuffle output.
(5) A list of preferred locations for accessing each partition. For an HDFS file, this list holds the locations of the blocks that make up each partition. Following the principle that "moving computation is cheaper than moving data", Spark tries to schedule each task on the node that stores the data block it will process.
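Several of these properties can be inspected directly on an RDD. The following is a minimal sketch, assuming Spark 2.x or later on the classpath; the sample data, the application name, and the choice of 4 and 2 partitions are illustrative and not from the original article. It can be pasted into spark-shell, where getOrCreate simply reuses the shell's existing context.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Reuses the running context in spark-shell, or starts a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("RddProperties"))

// (1) Partitions: request 4 explicitly instead of relying on the default.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 4)
println(pairs.partitions.length)       // 4

// (4) Partitioner: None until the key-value RDD is partitioned explicitly.
println(pairs.partitioner)             // None
val hashed = pairs.partitionBy(new HashPartitioner(2))
println(hashed.partitioner.isDefined)  // true
println(hashed.getNumPartitions)       // 2

// (3) Dependencies: every derived RDD records its parent RDD(s).
println(hashed.dependencies)

// (5) Preferred locations: empty for an in-memory collection; an RDD backed
// by HDFS would list the hosts that store each block.
println(pairs.preferredLocations(pairs.partitions(0)))
```

Property (2), the compute function, is called by Spark's scheduler when a task runs, so it is not invoked by hand here.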
1.3 A rough diagram of the RDDs in WordCount
Here, hello.txt is the input file.
2. Creating an RDD
2.1 Creating an RDD by reading a file
An RDD can be created from a dataset in an external storage system, including the local file system and any data source supported by Hadoop, such as HDFS, Cassandra, and HBase.
scala> val file = sc.textFile("/spark/hello.txt")
2.2 Creating an RDD through parallelization
An RDD can be created from an existing Scala collection.
scala> val array = Array(...)
array: Array[Int] = Array(...)
scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:26
2.3 Other ways
An RDD can also be generated by reading from a database and by similar operations.
An RDD can also be derived from another RDD through a transformation.
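The creation paths in this section can be tried together. The following is a minimal sketch, assuming Spark 2.x or later; the collection values and the temporary file (which stands in for hello.txt) are illustrative assumptions, not data from the original article.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context in spark-shell, or starts a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("CreateRdd"))

// 2.2: from an existing Scala collection (illustrative values).
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))

// 2.3: from another RDD; a transformation returns a new RDD.
val squares = numbers.map(n => n * n)
println(squares.collect().mkString(","))  // 1,4,9,16,25

// 2.1: from a file. A temporary local file stands in for hello.txt, and the
// file: URI keeps the read on the local file system even if HDFS is the default.
val path = Files.createTempFile("hello", ".txt")
Files.write(path, "hello,spark\nhello,rdd\n".getBytes("UTF-8"))
val lines = sc.textFile(path.toUri.toString)
println(lines.count())                    // 2
```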
3. RDD programming API
Spark supports two types of operations (operators): Transformation and Action.
3.1 Transformation
A transformation produces a new RDD from an existing one. Transformations are lazy: the code in a transformation operator is not executed when it is called. Only when the program reaches an action operator is the code actually executed. This design lets Spark run more efficiently.
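Laziness can be observed by counting how often the function passed to map actually runs. The following is a minimal sketch, assuming Spark 2.x or later (for longAccumulator); the accumulator name and the data are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context in spark-shell, or starts a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("LazyDemo"))

// Counts how many times the function inside map is executed.
val calls = sc.longAccumulator("mapCalls")

// Transformation only: Spark records the step but runs nothing yet.
val doubled = sc.parallelize(1 to 5).map { x => calls.add(1); x * 2 }
println(calls.value)  // 0

// Action: triggers the job, and only now is the map function executed.
val total = doubled.reduce(_ + _)
println(total)        // 30
println(calls.value)  // 5 in a run without task retries
```

Accumulators updated inside transformations can be incremented more than once if a task is retried, so the final count is reliable only in a clean run like this one.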
Commonly used Transformation:
Transformation | Meaning
map(func) | Returns a new RDD formed by passing each input element through the function func.
filter(func) | Returns a new RDD consisting of the input elements for which func returns true.
flatMap(func) | Similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element).
mapPartitions(func) | Similar to map, but runs separately on each partition of the RDD, so when running on an RDD of type T, func must be of type Iterator[T] => Iterator[U].
mapPartitionsWithIndex(func) | Similar to mapPartitions, but func also takes an integer argument representing the index of the partition, so when running on an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U].
sample(withReplacement, fraction, seed) | Samples the fraction of the data given by fraction, with or without replacement. seed specifies the random number generator seed.
union(otherDataset) | Returns a new RDD containing the union of the source RDD and the argument RDD.
intersection(otherDataset) | Returns a new RDD containing the intersection of the source RDD and the argument RDD.
distinct([numTasks]) | Removes duplicates from the source RDD and returns a new RDD.
groupByKey([numTasks]) | Called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]) pairs.
reduceByKey(func, [numTasks]) | Called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs in which the values of each key are aggregated with the given reduce function. As with groupByKey, the number of reduce tasks can be set through the optional second parameter.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | First aggregates within each partition using seqOp, starting from the initial value zeroValue, then merges the per-partition results using combOp. For example, aggregateByKey(0)(_ + _, _ + _) operates on an RDD of (K, V) pairs.
sortByKey([ascending], [numTasks]) | Called on an RDD of (K, V) pairs where K implements the Ordered interface, returns an RDD of (K, V) pairs sorted by key.
sortBy(func, [ascending], [numTasks]) | Similar to sortByKey, but more flexible. The first parameter is the function that yields the value to sort by, the second is the sort direction (pass false for descending), and the third is the number of partitions, which defaults to that of the original RDD.
join(otherDataset, [numTasks]) | Called on RDDs of type (K, V) and (K, W), returns an RDD of (K, (V, W)) pairs containing every pair of elements that share a key. Equivalent to an inner join (intersection on keys).
cogroup(otherDataset, [numTasks]) | Called on RDDs of type (K, V) and (K, W), returns an RDD of type (K, (Iterable[V], Iterable[W])).
cartesian(otherDataset) | Returns the Cartesian product of two RDDs as an RDD of pairs.
pipe(command, [envVars]) | Calls an external program.
coalesce(numPartitions) | The first parameter is the target number of partitions. The second parameter, shuffle, defaults to false. Going from fewer partitions to more requires shuffle = true; going from more partitions to fewer works with shuffle = false.
repartition(numPartitions) | Always shuffles. The parameter is the target number of partitions, which can be either fewer or more than the current number.
repartitionAndSortWithinPartitions(partitioner) | Repartitions and sorts in one step, which is more efficient than repartitioning first and sorting afterwards. Operates on an RDD of (K, V) pairs.
foldByKey(zeroValue)(seqOp) | Folds and merges the values of each key in an RDD of (K, V) pairs. The argument in the first parameter list is the initial value applied to the V values; the function in the second parameter list is the aggregation, for example _ + _.
combineByKey | Merges the values of the same key, for example rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n).
partitionBy(partitioner) | Partitions the RDD. partitioner is the partitioner to use, for example new HashPartitioner(2).
cache | Caches the RDD, which avoids recomputation and saves time. cache calls persist internally and uses the default storage level MEMORY_ONLY.
persist | Caches the RDD like cache, but lets you choose the storage level.
subtract(rdd) | Returns an RDD of the elements that are in the source RDD but not in the argument RDD.
leftOuterJoin | Similar to a left outer join in SQL. The result is driven by the source RDD, and records without a match are empty. It joins only two RDDs at a time; to join more, apply it several times.
rightOuterJoin | Similar to a right outer join in SQL. The result is driven by the argument RDD, and records without a match are empty. It joins only two RDDs at a time; to join more, apply it several times.
subtractByKey | Similar to subtract among the basic transformations, except that it compares keys: it returns the elements whose keys appear in the source RDD but not in otherRDD.
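A few of the key-value and partitioning operators from the table can be compared side by side. The following is a minimal sketch, assuming Spark 2.x or later; the fruit data and the partition counts are illustrative assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context in spark-shell, or starts a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("TransformDemo"))

val sales  = sc.parallelize(Seq(("apple", 3), ("pear", 1), ("apple", 5), ("kiwi", 2)), 2)
val prices = sc.parallelize(Seq(("apple", 10), ("pear", 20)))

// reduceByKey: one value per key.
val totals = sales.reduceByKey(_ + _).collect().toMap
// apple -> 8, pear -> 1, kiwi -> 2

// aggregateByKey: the result type (sum, count) differs from the value type.
val sumCount = sales.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: within a partition
  (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: across partitions
).collect().toMap
// apple -> (8,2)

// join keeps only keys present on both sides; leftOuterJoin keeps all left keys.
val joined = sales.reduceByKey(_ + _).join(prices).collect().toMap
val left   = sales.reduceByKey(_ + _).leftOuterJoin(prices).collect().toMap
// joined has no "kiwi"; left("kiwi") is (2, None)

// coalesce without shuffle can only shrink; repartition always shuffles.
val wide = sc.parallelize(1 to 100, 8)
val shrunk     = wide.coalesce(2).getNumPartitions                   // 2
val notGrown   = wide.coalesce(16).getNumPartitions                  // stays 8
val grown      = wide.coalesce(16, shuffle = true).getNumPartitions  // 16
val reshuffled = wide.repartition(16).getNumPartitions               // 16
```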
3.2 Action
To trigger execution, a piece of Spark code needs at least one action operation.
Commonly used Action:
Action | Meaning
reduce(func) | Aggregates all the elements of the RDD using the function func, which must be commutative and associative.
collect() | Returns all elements of the dataset to the driver as an array.
count() | Returns the number of elements in the RDD.
first() | Returns the first element of the RDD (similar to take(1)).
take(n) | Returns an array of the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) | Returns an array of num elements randomly sampled from the dataset, with or without replacement. seed specifies the random number generator seed.
takeOrdered(n, [ordering]) |
saveAsTextFile(path) | Saves the elements of the dataset as a text file on HDFS or another supported file system. Spark calls toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) | Saves the elements of the dataset in Hadoop SequenceFile format to the given directory, on HDFS or another file system supported by Hadoop.
saveAsObjectFile(path) |
countByKey() | For an RDD of type (K, V), returns a map of (K, Int) giving the number of elements for each key.
foreach(func) | Runs the function func on each element of the dataset, typically to perform updates.
aggregate | Aggregates within each partition first, then combines the results across partitions.
reduceByKeyLocally |
lookup |
top |
fold |
foreachPartition |
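Several of these actions can be run on a small dataset to see what each one returns. The following is a minimal sketch, assuming Spark 2.x or later; the numbers and keys are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context in spark-shell, or starts a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("ActionDemo"))

val nums = sc.parallelize(Seq(5, 3, 8, 1, 9, 2), 3)

val total    = nums.reduce(_ + _)   // 28
val n        = nums.count()         // 6
val firstTwo = nums.take(2)         // first two elements in partition order
val smallest = nums.takeOrdered(3)  // three smallest, ascending
val largest  = nums.top(2)          // two largest, descending
val folded   = nums.fold(0)(_ + _)  // like reduce, with an initial value

// aggregate: per-partition (sum, count) pairs, then merged across partitions.
val sumAndCount = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))

// countByKey: number of elements for each key, returned to the driver.
val perKey = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1))).countByKey()

println(s"$total $n ${smallest.mkString(",")} ${largest.mkString(",")} $sumAndCount $perKey")
```

Each of these calls is an action, so every line above launches a separate job over nums.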
3.3 Spark WordCount coding
Build the project with Maven.
(1) Written in Scala
According to the official website, two dependencies need to be imported.
Detailed code:
SparkWordCountWithScala.scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCountWithScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    /*
     * If this parameter is not set, the program runs in cluster mode by default.
     * Setting it to "local" runs the program in local mode.
     */
    conf.setMaster("local")
    // Set the application name
    conf.setAppName("WordCount")
    // Create the Spark Core program entry point
    val sc = new SparkContext(conf)
    // Read the file to generate an RDD
    val file: RDD[String] = sc.textFile("E:\\hello.txt")
    // Split each line on commas
    val word: RDD[String] = file.flatMap(_.split(","))
    // Pair each word with a count of 1
    val wordOne: RDD[(String, Int)] = word.map((_, 1))
    // Count the occurrences of each word
    val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
    // Sort by number of occurrences in descending order
    val sortRdd: RDD[(String, Int)] = wordCount.sortBy(tuple => tuple._2, false)
    // Save the final result
    sortRdd.saveAsTextFile("E:\\result")
    sc.stop()
  }
}
Running result
(2) Written in Java (JDK 7)
SparkWordCountWithJava7.java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class SparkWordCountWithJava7 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the file to generate an RDD of lines
        JavaRDD<String> fileRdd = sc.textFile("E:\\hello.txt");

        // Split each line on commas
        JavaRDD<String> wordRDD = fileRdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(",")).iterator();
            }
        });

        // Pair each word with a count of 1
        JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        // Sum the counts of each word
        JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        // Swap to (count, word) so that sortByKey can sort by count
        JavaPairRDD<Integer, String> count2WordRDD = wordCountRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<>(tuple._2, tuple._1);
            }
        });

        // Sort by count in descending order
        JavaPairRDD<Integer, String> sortRDD = count2WordRDD.sortByKey(false);

        // Swap back to (word, count)
        JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return new Tuple2<>(tuple._2, tuple._1);
            }
        });

        resultRDD.saveAsTextFile("E:\\result7");
    }
}
(3) Written in Java (JDK 8)
Using lambda expressions
SparkWordCountWithJava8.java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class SparkWordCountWithJava8 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("WordCount");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the file and split each line on commas
        JavaRDD<String> fileRDD = sc.textFile("E:\\hello.txt");
        JavaRDD<String> wordRdd = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());

        // Pair each word with 1, then sum the counts of each word
        JavaPairRDD<String, Integer> wordOneRDD = wordRdd.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey((x, y) -> x + y);

        // Swap to (count, word), sort by count in descending order, then swap back
        JavaPairRDD<Integer, String> count2WordRDD = wordCountRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        JavaPairRDD<Integer, String> sortRDD = count2WordRDD.sortByKey(false);
        JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));

        resultRDD.saveAsTextFile("E:\\result8");
    }
}
3.4 WordCount execution process diagram
4. Wide and narrow dependencies of RDDs
4.1 The nature of RDD dependencies
Because an RDD is a dataset that supports coarse-grained operations, each transformation generates a new RDD, so RDDs form a pipeline-like chain of dependencies. There are two different types of relationship between an RDD and its parent RDD(s): narrow dependency and wide dependency. The dependencies between RDDs are shown in the figure.
As can be seen from the picture:
Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD. Operations such as map, filter, and union produce narrow dependencies (the "only child" case).
Wide dependency: a partition of the parent RDD is used by multiple partitions of the child RDD. Operations such as groupByKey, reduceByKey, and sortByKey produce wide dependencies.
Note that there are two cases for join operations:
(1) The join in the left half of the figure: when two RDDs are joined and each partition of one RDD is joined with only a known, fixed number of partitions of the other RDD, the join is a narrow dependency, as with the join with inputs co-partitioned in the left half of figure 1.
(2) The join in the right half of the figure: a join in any other case is a wide dependency, as with the join with inputs not co-partitioned in the right half of figure 1. Because such a join needs all partitions of the parent RDDs, it involves a shuffle.
Summary:
Narrow and wide dependencies are defined here in terms of how the partitions of the parent RDD are used, so the rule can be summed up in one sentence: if each partition of the parent RDD is used by at most one partition of the child RDD, the dependency is narrow; otherwise it is wide. A dependency on a fixed number of partitions is also narrow. From this we get a corollary: narrow dependencies include not only one-to-one dependencies, but also dependencies on a fixed number of partitions.
What "a fixed number" means: the number of parent partitions that a partition of the child RDD depends on does not change with the size of the data. In other words, whether the data is 100 TB or 1 PB, in a narrow dependency the number of parent partitions that a child partition depends on is fixed. A wide dependency works at the shuffle level: the larger the data, the more partitions the parent RDD has, so the number of parent partitions that a child partition depends on keeps growing.
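The dependency type of a given RDD can be checked through its dependencies field. The following is a minimal sketch, assuming Spark 2.x or later; the data is illustrative, and the check relies on the parent RDDs having no partitioner, so that reduceByKey really has to shuffle.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency, SparkConf, SparkContext}

// Reuses the running context in spark-shell, or starts a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("DependencyDemo"))

val words  = sc.parallelize(Seq("a", "b", "a"), 2)
val ones   = words.map((_, 1))
val both   = words.union(words)
val counts = ones.reduceByKey(_ + _)

// map and union: each parent partition feeds at most one child partition.
val mapIsNarrow   = ones.dependencies.forall(_.isInstanceOf[NarrowDependency[_]])
val unionIsNarrow = both.dependencies.forall(_.isInstanceOf[NarrowDependency[_]])

// reduceByKey: a child partition may need data from every parent partition.
val reduceIsWide = counts.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]

println(s"map narrow: $mapIsNarrow, union narrow: $unionIsNarrow, reduceByKey wide: $reduceIsWide")
```

ShuffleDependency is the class Spark uses for wide dependencies, while one-to-one and range dependencies are both subclasses of NarrowDependency.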
4.2 Data flow under dependencies
In Spark, the DAG (directed acyclic graph) is divided into stages according to the dependencies between RDDs. For narrow dependencies, because the partition dependencies are deterministic, the transformations on a partition can be completed in the same thread, so Spark places narrow dependencies in the same stage. For wide dependencies, the next stage can start computing only after the shuffle of the parent RDD has completed.
Therefore, Spark's overall approach to dividing stages is to work backward from the end: when it meets a wide dependency it cuts there and starts a new stage, and when it meets a narrow dependency it adds the RDD to the current stage. In figure 2, RDD C, RDD D, RDD E, and RDD F are built into one stage, RDD A is built into a separate stage, and RDD B and RDD G are built into the same stage.
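The stage boundary can be seen in an RDD's lineage, which toDebugString prints. The following is a minimal sketch, assuming Spark 2.x or later; the input lines are illustrative, and the exact text of the output varies between Spark versions, so none is shown.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context in spark-shell, or starts a local one otherwise.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("StageDemo"))

// The WordCount pipeline: flatMap and map are narrow, reduceByKey is wide.
val counts = sc.parallelize(Seq("hello,spark", "hello,rdd"), 2)
  .flatMap(_.split(","))
  .map((_, 1))
  .reduceByKey(_ + _)

// Prints the lineage from the final RDD back to the source. The ShuffledRDD
// line marks the wide dependency where Spark cuts the job into two stages;
// the RDDs listed beneath it belong to the earlier stage.
val lineage = counts.toDebugString
println(lineage)
```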
In Spark, there are two types of Task: ShuffleMapTask and ResultTask.
Simply put, the last stage of the DAG generates one ResultTask for each result partition; that is, the number of tasks in each stage is determined by the number of partitions of the last RDD in that stage. All other stages generate ShuffleMapTasks, so called because they must shuffle their results to the next stage. In the figure above, stage 1 and stage 2 are equivalent to the Mapper in MapReduce, and stage 3, represented by ResultTask, is equivalent to the Reducer in MapReduce.
From the WordCount program written earlier, we can see that the rough equivalents in Spark of the Mapper and Reducer in Hadoop MapReduce are the map and reduceByKey operators. The difference is that MapReduce in Hadoop inherently sorts, whereas reduceByKey only reduces by key. Spark also has many other operators besides these two, so in this sense Spark's set of operators is richer than Hadoop's.
At this point, the study on "how to create Spark RDD" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!