2025-03-29 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains what Spark's RDDs, operators, and persistence operators are. The approach is simple, quick, and practical, so interested readers may want to follow along.
I. Introduction to RDD
RDD (Resilient Distributed Dataset): a resilient distributed dataset.
Five features of an RDD:
1. An RDD is made up of a series of partitions.
2. A function is applied to each partition (split).
3. There is a series of dependencies between RDDs.
4. A partitioner is applied to RDDs in K,V format.
5. An RDD provides a list of the best locations for computing each partition.
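As a rough illustration of the five features above, the following sketch inspects them through the public RDD API (partitions, dependencies, partitioner, preferredLocations). The object name, application name, master setting, and sample data are assumptions made for this example and do not come from the article.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical object name; a sketch only, using a local two-thread master.
object RddPropertiesSketch {
  // Returns (partition count, dependency count of the reduced RDD,
  //          whether the source RDD has a partitioner,
  //          whether the reduced RDD has a partitioner,
  //          preferred locations of the first partition).
  def run(): (Int, Int, Boolean, Boolean, Seq[String]) = {
    val conf = new SparkConf().setAppName("rdd-properties-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      // Feature 1: an RDD is a list of partitions (two are requested here).
      val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
      // Feature 2: the function passed to an operator runs on each partition.
      val reduced = pairs.reduceByKey(_ + _)
      (
        pairs.partitions.length,       // feature 1: number of partitions
        reduced.dependencies.size,     // feature 3: dependencies on the parent RDD
        pairs.partitioner.isDefined,   // feature 4: no partitioner before the shuffle
        reduced.partitioner.isDefined, // feature 4: a partitioner on the K,V result
        pairs.preferredLocations(pairs.partitions(0)) // feature 5: preferred locations
      )
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

For an RDD built from an in-memory collection the list of preferred locations is normally empty; an RDD read from HDFS would typically report the hosts that hold each block.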
Note:
The textFile method is built on the MapReduce (MR) code for reading files. The file is divided into splits before it is read, and the default split size is one block.
An RDD does not actually store data. To keep things easy to follow here, you can think of it for now as storing data.
What is an RDD in K,V format?
If all the data stored in an RDD are two-element tuples (pairs), the RDD is called an RDD in K,V format.
Where does an RDD show its resilience (fault tolerance)?
There is no limit on the number or size of partitions, which reflects the elasticity of an RDD.
Because of the dependencies between RDDs, an RDD can be recomputed from the RDD it depends on.
Where does an RDD show that it is distributed?
An RDD is composed of partitions, and the partitions are distributed across different nodes. An RDD provides the best locations for computation, which reflects data locality and embodies the big-data principle of moving the computation rather than the data.
How an RDD is created
XX.parallelize() and XX.makeRDD(), where XX is a SparkContext.
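A minimal sketch of the two creation methods follows. It assumes a local master and made-up sample data; the object name and application name are hypothetical. Both methods accept an optional second argument giving the number of partitions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical object name; a sketch only.
object RddCreationSketch {
  // Returns (partitions of the parallelize RDD, partitions of the makeRDD RDD, collected elements).
  def run(): (Int, Int, Seq[Int]) = {
    val conf = new SparkConf().setAppName("rdd-creation-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      // parallelize distributes a local collection over the requested number of partitions.
      val viaParallelize = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 3)
      // makeRDD does the same for a plain sequence.
      val viaMakeRDD = sc.makeRDD(Seq(1, 2, 3, 4, 5, 6), 2)
      (viaParallelize.getNumPartitions, viaMakeRDD.getNumPartitions, viaParallelize.collect().toSeq)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```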
II. Operators
The transformation operators are as follows:
filter: filters records by a condition; true keeps a record, false filters it out.
map: transforms each data item in an RDD into a new element through the function passed to map. Characteristic: one input item produces one output item.
flatMap: map first, then flatten. Similar to map, but each input item can be mapped to zero or more output items.
sample: random sampling operator. Samples the data according to the fraction passed in.
reduceByKey: processes the values of the same key with the supplied logic.
sortByKey/sortBy: sort in ascending or descending order. sortByKey acts on RDDs in K,V format and sorts by key.
join: acts on RDDs in K,V format. Joins on K: (K,V) join (K,W) returns (K,(V,W)). The number of partitions after the join is the same as that of the parent RDD with more partitions.
union: merges two datasets. The two datasets must have the same type, and the number of partitions of the new RDD is the sum of the partitions of the merged RDDs.
intersection: takes the intersection of two datasets. The new RDD has the same number of partitions as the parent RDD with more partitions.
subtract: takes the difference of two datasets. The resulting RDD has the same number of partitions as the RDD on which subtract is called.
mapPartitions: similar to map, but the unit of traversal is the data of each partition.
distinct: removes duplicates. Internally it is implemented as map + reduceByKey + map.
cogroup: when called on data of types (K,V) and (K,W), returns a dataset of (K,(Iterable<V>,Iterable<W>)). The child RDD has the same number of partitions as the parent RDD with more partitions.
mapPartitionsWithIndex: similar to mapPartitions, but it also carries the index of the partition.
repartition: increases or decreases the number of partitions. It always produces a shuffle. (Merging several partitions into one without a shuffle is what coalesce with the shuffle parameter set to false does.)
coalesce: commonly used to reduce the number of partitions. The second parameter specifies whether a shuffle is produced while reducing partitions: true produces a shuffle, false does not. The default is false. If the number of partitions passed to coalesce is larger than that of the original RDD, setting the second parameter to false has no effect; setting it to true has the same effect as repartition. In other words, repartition(numPartitions) = coalesce(numPartitions, true).
groupByKey: acts on RDDs in K,V format and groups by key. Applied to (K,V), it returns (K,Iterable<V>).
zip: combines the elements of two RDDs (in K,V format or not) into one RDD in K,V format. The two RDDs must have the same number of elements in each partition.
zipWithIndex: combines each element of the RDD with its index in the RDD (starting from 0) into a (K,V) pair.
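To make a few of these operators concrete, the sketch below chains flatMap, map, and reduceByKey into a small word count and then checks the partition counts produced by union, coalesce, and repartition. The sample data, object name, and application name are assumptions for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical object name; a sketch only.
object TransformationSketch {
  // Returns (word counts, partitions after union,
  //          partitions after coalesce(8) without shuffle, partitions after repartition(8)).
  def run(): (Map[String, Int], Int, Int, Int) = {
    val conf = new SparkConf().setAppName("transformation-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      val lines = sc.parallelize(Seq("a b a", "b c"), 2)
      // flatMap: one line becomes several words; map: one word becomes one (word, 1) pair;
      // reduceByKey: the values of identical keys are summed.
      val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

      val base = sc.parallelize(1 to 8, 4)
      // union: the partition counts of both inputs are added (4 + 2).
      val unionParts = base.union(sc.parallelize(9 to 10, 2)).getNumPartitions
      // coalesce without a shuffle cannot raise the partition count, so it stays at 4.
      val coalesceUp = base.coalesce(8).getNumPartitions
      // repartition shuffles and can therefore raise the partition count to 8.
      val repartitionUp = base.repartition(8).getNumPartitions

      (counts.collect().toMap, unionParts, coalesceUp, repartitionUp)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```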
The action operators are as follows:
count: returns the number of elements in the dataset. The result is returned to the driver once the computation is complete.
take(n): returns a collection containing the first n elements of the dataset.
first: returns the first element of the dataset; it corresponds to take(1).
foreach: loops through each element of the dataset and runs the supplied logic.
collect: retrieves the computed results to the driver.
The difference between transformation operators and action operators:
In most cases a transformation operator returns an RDD, whereas an action operator usually returns an ordinary (non-RDD) value. When you do not know which category an operator belongs to, this rule of thumb is a reasonable guess.
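The difference in return types can be seen in a short sketch. The explicit type annotations are there only to make the point visible; the object name and sample data are assumptions for this example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical object name; a sketch only.
object ActionSketch {
  // Returns (count, first two elements, first element, all elements).
  def run(): (Long, Seq[Int], Int, Seq[Int]) = {
    val conf = new SparkConf().setAppName("action-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      val rdd = sc.parallelize(Seq(10, 20, 30, 40), 2)
      // Transformation: returns another RDD and does not run anything yet.
      val doubled: RDD[Int] = rdd.map(_ * 2)

      // Actions: each one starts a job and returns an ordinary value to the driver.
      val n: Long = doubled.count()
      val firstTwo: Array[Int] = doubled.take(2)
      val head: Int = doubled.first()
      val all: Array[Int] = doubled.collect()
      // foreach returns Unit; the function runs where the partitions are processed.
      doubled.foreach(x => println(x))

      (n, firstTwo.toSeq, head, all.toSeq)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```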
The last category is the control operators: cache, persist, and checkpoint. cache and persist are both lazy; an action operator must trigger their execution. checkpoint not only persists an RDD to disk but also cuts off the dependencies between RDDs.
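The following sketch shows the three control operators side by side and checks that nothing is checkpointed until an action runs. The use of StorageLevel.MEMORY_AND_DISK, the temporary checkpoint directory, and all names are assumptions chosen for the example; the article itself only uses the default storage level.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical object name; a sketch only.
object ControlOperatorSketch {
  // Returns (checkpointed before the action?, checkpointed after the action?,
  //          storage level is the one requested?).
  def run(): (Boolean, Boolean, Boolean) = {
    val conf = new SparkConf().setAppName("control-operator-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      // Assumption: a temporary local directory stands in for a real checkpoint location.
      sc.setCheckpointDir(Files.createTempDirectory("checkpoint-sketch").toString)

      val rdd = sc.parallelize(1 to 100, 2).map(_ * 2)
      // persist takes an explicit storage level; cache() uses the memory-only default.
      rdd.persist(StorageLevel.MEMORY_AND_DISK)
      // checkpoint only marks the RDD; the data is written after the next action.
      rdd.checkpoint()

      val before = rdd.isCheckpointed
      rdd.count() // the action triggers both the persistence and the checkpoint
      val after = rdd.isCheckpointed

      (before, after, rdd.getStorageLevel == StorageLevel.MEMORY_AND_DISK)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```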
III. Examples
Example 1: cache
Test data: the words file has about 17 million records and is about 200 MB in size.
Results:
Without a persistence operator: 56995 milliseconds
cache: 274 milliseconds
Scala code
import org.apache.spark.{SparkConf, SparkContext}

// The enclosing object is added so that the snippet compiles; its name is illustrative.
object CacheExample {
  def main(args: Array[String]): Unit = {
    /* cache */
    val conf = new SparkConf()
    conf.setAppName("spark04")
    conf.setMaster("local")
    val context = new SparkContext(conf)
    // Note: sc here is the RDD of lines, not the SparkContext.
    val sc = context.textFile("./words")
    val result = sc.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // First count: no persistence operator is used.
    val startTime1 = System.currentTimeMillis()
    result.count()
    println("without a persistence operator: " + (System.currentTimeMillis() - startTime1) + " milliseconds")

    val startTime2 = System.currentTimeMillis()
    // cache() persists the RDD's data in memory by default and is lazy.
    // Do not write rdd.cache().count(): that expression returns a number, not the persisted RDD.
    result.cache()
    // cache and persist are lazy; an action operator must trigger execution.
    // Note: this count may also reuse the shuffle output of the first job,
    // so the measured time is not necessarily due to cache alone.
    result.count()
    println("cache: " + (System.currentTimeMillis() - startTime2) + " milliseconds")
  }
}
Example 2: persist
Test data: the words file has about 17 million records and is about 200 MB in size.
Results:
Without a persistence operator: 55350 milliseconds
persist: 312 milliseconds
Scala code
import org.apache.spark.{SparkConf, SparkContext}

// The enclosing object is added so that the snippet compiles; its name is illustrative.
object PersistExample {
  def main(args: Array[String]): Unit = {
    /* persist */
    val conf = new SparkConf()
    conf.setAppName("spark04")
    conf.setMaster("local")
    val context = new SparkContext(conf)
    // Note: sc here is the RDD of lines, not the SparkContext.
    val sc = context.textFile("./words")
    val result = sc.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // First count: no persistence operator is used.
    val startTime1 = System.currentTimeMillis()
    result.count()
    println("without a persistence operator: " + (System.currentTimeMillis() - startTime1) + " milliseconds")

    val startTime2 = System.currentTimeMillis()
    // persist() with no arguments persists the RDD's data in memory by default and is lazy.
    // Do not write rdd.persist().count(): that expression returns a number, not the persisted RDD.
    // cache() internally calls persist().
    result.persist()
    // cache and persist are lazy; an action operator must trigger execution.
    result.count()
    println("persist: " + (System.currentTimeMillis() - startTime2) + " milliseconds")
  }
}
Example 3: checkpoint
Test data: the words file has about 17 million records and is about 200 MB in size.
Results:
Without a persistence operator: 55575 milliseconds
checkpoint: 55851 milliseconds
In addition, the persisted data files appear on disk under the specified checkpoint path. The run is much slower because the data has to be persisted to disk.
After adding result.cache(), the result is checkpoint: 54621 milliseconds, about 1000 milliseconds less than before. Once result is cached, the data that has already been processed is kept in the cache, so the run is slightly faster.
Scala code
import org.apache.spark.{SparkConf, SparkContext}

// The enclosing object is added so that the snippet compiles; its name is illustrative.
object CheckpointExample {
  def main(args: Array[String]): Unit = {
    /* checkpoint */
    val conf = new SparkConf()
    conf.setAppName("spark04")
    conf.setMaster("local")
    val context = new SparkContext(conf)
    // Set the checkpoint output path.
    context.setCheckpointDir("./checkpoint_file")
    // Data source. Note: sc here is the RDD of lines, not the SparkContext.
    val sc = context.textFile("./words")
    val result = sc.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // First count: no persistence operator is used.
    val startTime1 = System.currentTimeMillis()
    result.count()
    println("without a persistence operator: " + (System.currentTimeMillis() - startTime1) + " milliseconds")

    val startTime2 = System.currentTimeMillis()
    // Do not write rdd.checkpoint().count(): checkpoint() returns Unit.
    result.checkpoint()
    /*
     * An action operator starts a job.
     * 1. When the RDD's job finishes, Spark traces back from the final RDD toward the front.
     * 2. When it reaches an RDD on which checkpoint was called, it marks that RDD.
     * 3. The Spark framework automatically launches a new job, recomputes that RDD's data,
     *    and persists the data to HDFS.
     * Optimization: before checkpointing an RDD, it is best to cache it first, so that the
     * newly launched job only needs to copy the in-memory data to HDFS and the
     * recomputation step is skipped.
     */
    result.cache()
    result.count()
    println("checkpoint: " + (System.currentTimeMillis() - startTime2) + " milliseconds")
  }
}
Summary:
Notes on cache and persist:
1. Both cache and persist are lazy; an action operator must trigger execution.
2. The return value of cache or persist can be assigned to a variable, and using that variable in other jobs uses the persisted data. The unit of persistence is the partition.
3. cache and persist must not be immediately followed by an action operator. Wrong: rdd.cache().count() does not return a persisted RDD but a numeric value.
4. Data persisted by cache and persist is cleared when the application finishes executing.
checkpoint:
checkpoint persists an RDD to disk and cuts off the dependencies between RDDs. Data in the checkpoint directory is not cleared after the application finishes executing.
How checkpoint works:
1. When the RDD's job finishes, Spark traces back from the final RDD toward the front.
2. When it reaches an RDD on which checkpoint was called, it marks that RDD.
3. The Spark framework automatically launches a new job, recomputes that RDD's data, and persists the data to HDFS.
Optimization: before checkpointing an RDD, it is best to cache it first, so that the newly launched job only needs to copy the in-memory data to HDFS and the recomputation step is skipped.
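The notes on cache and persist can be sketched as follows: the return value of cache() is the RDD itself, so it can be kept in a variable and reused across jobs, whereas chaining an action onto cache() leaves only the action's result. The object name, application name, and sample data are assumptions for this example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical object name; a sketch only.
object CachePatternSketch {
  // Returns (is the cached handle the same RDD?, first count, second count).
  def run(): (Boolean, Long, Long) = {
    val conf = new SparkConf().setAppName("cache-pattern-sketch").setMaster("local[2]")
    val sc = SparkContext.getOrCreate(conf)
    try {
      val rdd = sc.parallelize(1 to 1000, 4).map(_ + 1)

      // Recommended pattern: keep the value returned by cache() and reuse it in later jobs.
      val cached = rdd.cache()
      val first = cached.count()  // the first action computes and stores the partitions
      val second = cached.count() // later jobs can read the stored partitions

      // Pattern to avoid: the expression below compiles, but it yields a Long,
      // so no reference to the persisted RDD is left for other jobs to use.
      val onlyANumber: Long = sc.parallelize(1 to 10).cache().count()

      (cached eq rdd, first, second)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```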