Spark Notes (5): Spark RDD persistence, broadcast variables and accumulators

2025-01-21 Update From: SLTechnology News&Howtos

Spark RDD persistence

How RDD persistence works

One of the most important features of Spark is the ability to persist an RDD in memory. When an RDD is persisted, each node keeps the partitions of the RDD that it computes in memory, and later operations on that RDD reuse the cached partitions directly. For workloads that run several operations against the same RDD, the RDD is therefore computed only once and then reused, instead of being recomputed for every operation.

Used well, RDD persistence can improve the performance of a Spark application by as much as a factor of 10 in some scenarios. It is especially important for iterative algorithms and fast interactive applications.

To persist an RDD, simply call its cache() or persist() method. The first time the RDD is computed, it is cached on each node. Spark's persistence mechanism is also automatically fault-tolerant: if any partition of a persisted RDD is lost, Spark recomputes that partition from its source RDD by reapplying the transformations.

The difference between cache() and persist() is that cache() is shorthand for persist(). Under the hood, cache() calls the no-argument version of persist(), which in turn calls persist(MEMORY_ONLY) to keep the data in memory. To remove the cached data from memory, use the unpersist() method.
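As a minimal sketch of the relationship described above, the storage level that cache() assigns can be inspected with getStorageLevel. The object name CacheVsPersistSketch and the sample data are made up for illustration, and a local master is assumed.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; the name and the data are illustrative only.
object CacheVsPersistSketch {
  /** Returns the storage level observed after cache() and after unpersist(). */
  def run(): (StorageLevel, StorageLevel) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CacheVsPersistSketch")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 100)

      // cache() only marks the RDD for caching; nothing is stored until an action runs.
      rdd.cache()
      val afterCache = rdd.getStorageLevel
      rdd.count() // first action computes the partitions and caches them
      rdd.count() // second action reads the cached partitions

      // Release the cached partitions once they are no longer needed.
      rdd.unpersist()
      val afterUnpersist = rdd.getStorageLevel
      (afterCache, afterUnpersist)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (cached, released) = run()
    println(s"after cache(): $cached, after unpersist(): $released")
  }
}
```

The first element of the returned pair should equal StorageLevel.MEMORY_ONLY and the second StorageLevel.NONE, which matches the statement that cache() is persist(MEMORY_ONLY).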

RDD persistence usage scenarios

1. A large amount of data is loaded into an RDD for the first time

2. RDD cache data that is updated frequently and dynamically is not suitable for Spark Cache or Spark lineage

RDD persistence strategy

Choosing a persistence strategy

By default, MEMORY_ONLY offers the highest performance, but only if memory is large enough to hold all the data of the entire RDD. Because no serialization or deserialization takes place, that overhead is avoided; subsequent operators on the RDD work on data purely in memory, with no need to read from disk files, so performance is high; and no copy of the data has to be made and sent to other nodes. Note, however, that in real production environments the scenarios where this strategy can be used directly are limited. If the RDD holds a lot of data (for example, billions of records), using this persistence level directly can cause a JVM OOM (out-of-memory) error.

If an out-of-memory error occurs at the MEMORY_ONLY level, try the MEMORY_ONLY_SER level. This level serializes the RDD data before storing it in memory, so each partition is just a byte array, which greatly reduces the number of objects and the memory footprint. Its main extra cost compared with MEMORY_ONLY is serialization and deserialization. Subsequent operators still work purely in memory, so overall performance remains relatively high. The same problem can occur as above: if the RDD holds too much data, an OOM error is still possible.

If no memory-only level is feasible, use the MEMORY_AND_DISK_SER strategy rather than MEMORY_AND_DISK. Reaching this point means the RDD is too large to fit in memory completely. Serialized data is smaller, which saves both memory and disk space. This strategy also tries to cache as much data in memory as possible first, and writes to disk only what does not fit in memory.

DISK_ONLY and the levels with the _2 suffix are generally not recommended. Reading and writing data entirely through disk files causes a sharp drop in performance, and sometimes it is better simply to recompute the RDD. With the _2 levels, every piece of data must be replicated and sent to another node; the replication and network transfer add significant overhead, so these levels are not recommended unless the job requires high availability.
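The recommendations above can be sketched as a small decision helper. StorageLevelChoice and its two parameters are hypothetical names, not part of Spark; whether the data fits in memory is something you have to estimate or measure yourself.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical helper that encodes the recommendations above; not a Spark API.
object StorageLevelChoice {
  /**
    * @param fitsDeserialized whether the whole RDD fits in memory as plain objects
    * @param fitsSerialized   whether it fits in memory once serialized
    */
  def choose(fitsDeserialized: Boolean, fitsSerialized: Boolean): StorageLevel =
    if (fitsDeserialized) StorageLevel.MEMORY_ONLY        // fastest, no serialization cost
    else if (fitsSerialized) StorageLevel.MEMORY_ONLY_SER // smaller footprint, some CPU cost
    else StorageLevel.MEMORY_AND_DISK_SER                 // spill to disk what does not fit
}
```

A call such as rdd.persist(StorageLevelChoice.choose(fitsDeserialized = false, fitsSerialized = true)) would then persist the RDD at MEMORY_ONLY_SER. DISK_ONLY and the _2 levels are deliberately left out, following the advice above.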

Test case

The test code is as follows:

    package cn.xpleaf.bigdata.spark.scala.core.p3

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Persistence of Spark RDD
      */
    object _01SparkPersistOps {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
            val sc = new SparkContext(conf)
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)

            var start = System.currentTimeMillis()
            val linesRDD = sc.textFile("D:/data/spark/sequences.txt")
            // linesRDD.cache()
            // linesRDD.persist(StorageLevel.MEMORY_ONLY)

            // perform the first RDD calculation
            val retRDD = linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
            // retRDD.cache()
            // retRDD.persist(StorageLevel.DISK_ONLY)
            retRDD.count()
            println("time consumed by the first calculation: " + (System.currentTimeMillis() - start) + "ms")

            // perform the second RDD calculation
            start = System.currentTimeMillis()
            // linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).count()
            retRDD.count()
            println("time consumed by the second calculation: " + (System.currentTimeMillis() - start) + "ms")

            // once the persisted data is no longer needed, release it
            // linesRDD.unpersist()
            sc.stop()
        }
    }

Enable the different persistence strategies in turn and compare the execution times to get a more intuitive feel for their effect.

Shared variables

Spark provides two limited types of shared variables: broadcast variables and accumulators.

Before introducing them, let's look at the following example:

    package cn.xpleaf.bigdata.spark.scala.core.p3

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Shared variables
      * Local or member variables declared in the driver can be used directly in a transformation,
      * but after the transformation runs, the result is not assigned back to the corresponding variable in the driver.
      * This is because a transformation is triggered by an action, and the DAGScheduler serializes the
      * code and ships it to the Executors on the Worker nodes for execution.
      * The variables used in the transformation are copies on those nodes, not the original variables in the driver.
      *
      * This case shows that some variables need to be shared between the driver and the executors.
      * Spark provides two kinds of shared variables for this:
      *   broadcast variables
      *   accumulators
      */
    object _02SparkShareVariableOps {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkShareVariableOps.getClass.getSimpleName())
            val sc = new SparkContext(conf)
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)

            val linesRDD = sc.textFile("D:/data/spark/hello.txt")
            val wordsRDD = linesRDD.flatMap(_.split(" "))

            var num = 0
            val pairsRDD = wordsRDD.map(word => {
                num += 1
                println("map--->num = " + num)
                (word, 1)
            })
            val retRDD = pairsRDD.reduceByKey(_ + _)

            println("num = " + num)
            retRDD.foreach(println)
            println("num = " + num)
            sc.stop()
        }
    }

The output is as follows:

    num = 0
    map--->num = 1
    map--->num = 1
    map--->num = 2
    map--->num = 2
    map--->num = 3
    map--->num = 4
    (hello,3)
    (me,1)
    (he,1)
    num = 0

Broadcast variable

One kind of shared variable in Spark is the broadcast variable. Normally, when many operations on an RDD use a variable defined in the driver, the driver sends that variable to the worker nodes once for every operation. If the variable holds a lot of data, this creates a heavy transfer load and reduces execution efficiency. Broadcast variables let a program send a large read-only value to multiple worker nodes efficiently: it is transferred to each worker node only once, and the executor reads its locally saved copy in every operation without further transfers.

Put another way: suppose an executor on a worker is running five tasks. If all five tasks need the same shared data, the data is sent to each of the five tasks, which wastes network and memory resources. With a broadcast variable, the data only needs to be sent to the worker once.

The process for creating and using broadcast variables is as follows:

Call SparkContext.broadcast(obj) on an object obj of type T to create a broadcast variable of type Broadcast[T]; obj must be Serializable.

Access the value of a broadcast variable through its .value method.

In addition, broadcasting can become a bottleneck because of the time spent serializing the variable or transferring the serialized data. The default Java serialization used by Spark in Scala is usually inefficient, so this step can be optimized by setting the spark.serializer property to a more efficient serializer (such as Kryo).
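The steps above can be sketched as follows, assuming a local master. The object name BroadcastKryoSketch and the lookup table are illustrative; the spark.serializer setting and the KryoSerializer class name are the standard Spark ones.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; the name and the data are illustrative only.
object BroadcastKryoSketch {
  def run(): Array[String] = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("BroadcastKryoSketch")
      // Switch from the default Java serialization to Kryo.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    try {
      // Create the broadcast variable on the driver from a serializable value.
      val codes = Map("0" -> "female", "1" -> "male")
      val codesBC: Broadcast[Map[String, String]] = sc.broadcast(codes)

      sc.parallelize(Seq("0", "1", "0"))
        .map(code => codesBC.value.getOrElse(code, "unknown")) // read-only access in tasks
        .collect()
    } finally {
      sc.stop()
    }
  }
}
```

Whether Kryo actually helps depends on the size and type of the broadcast value, so it is worth measuring before and after changing the serializer.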

The test code is as follows:

    package cn.xpleaf.bigdata.spark.scala.core.p3

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Using Spark broadcast variables
      *
      * Data:
      *   user table:
      *     id name age gender(0|1)
      *
      * Requirement: output the user information; gender must be shown as male or female, not as 0 or 1
      */
    object _03SparkBroadcastOps {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setMaster("local[2]").setAppName(_03SparkBroadcastOps.getClass.getSimpleName())
            val sc = new SparkContext(conf)
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)

            // user records as "id,name,age,gender"; entries reconstructed from the output shown below
            val userList = List(
                "001,Liu Qian,18,0",
                "002,Feng Jian,28,1",
                "003,Li Zhijie,38,0",
                "004,Guo Peng,48,1"
            )
            val genderMap = Map("0" -> "female", "1" -> "male")
            val genderMapBC: Broadcast[Map[String, String]] = sc.broadcast(genderMap)

            val userRDD = sc.parallelize(userList)
            val retRDD = userRDD.map(info => {
                val prefix = info.substring(0, info.lastIndexOf(","))   // "001,Liu Qian,18"
                val gender = info.substring(info.lastIndexOf(",") + 1)
                val genderMapValue = genderMapBC.value
                val newGender = genderMapValue.getOrElse(gender, "male")
                prefix + "," + newGender
            })
            retRDD.foreach(println)
            sc.stop()
        }
    }

The output is as follows:

    001,Liu Qian,18,female
    003,Li Zhijie,38,female
    002,Feng Jian,28,male
    004,Guo Peng,48,male

Here is a more concise example:

    package cn.xpleaf.spark.p5

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * @author xpleaf
      * @date 2019-1-10 4:53 PM
      */
    object SampleSpark {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
                .setAppName(s"${SampleSpark.getClass.getSimpleName}")
                .setMaster("local[2]")
            val sc = new SparkContext(conf)

            val genderMap = Map("0" -> "female", "1" -> "male")
            val genderMapBC: Broadcast[Map[String, String]] = sc.broadcast(genderMap)

            val rdd = sc.parallelize(Seq(("0", "Amy"), ("0", "Spring"), ("0", "Sunny"), ("1", "Mike"), ("1", "xpleaf")))
            val retRDD = rdd.map { case (sex, name) =>
                val genderMapValue = genderMapBC.value
                (genderMapValue.getOrElse(sex, "male"), name)
            }
            retRDD.foreach(println)
            sc.stop()
        }
    }

The output is as follows:

    (female,Amy)
    (female,Sunny)
    (female,Spring)
    (male,Mike)
    (male,xpleaf)

Of course, this case only demonstrates how the code is used; it does not show how the mechanism works.

However, we can analyze the principle. Suppose that, for a map operation, five tasks are assigned to one Executor on a Worker. Without a broadcast variable, the Driver splits our code into stages through the DAGScheduler and hands them to the TaskScheduler, which distributes the packaged tasks to the Executors on the Workers. In this process genderMap is packaged into each task as well: the granularity is task-level, so every task carries its own copy of genderMap. That is not a problem when the variable is small. When it is very large, however, five copies of the same data are sent to one Executor at the same time, which wastes network bandwidth. A broadcast variable avoids this: once genderMap is broadcast, it is sent to the Executor only once and stored in that Executor's BlockManager, and all tasks on that Executor share it, which clearly improves performance.

[Figure omitted: a diagram found on the Internet illustrating the broadcast mechanism. The principle is the same as described above.]

Accumulator

The Accumulator provided by Spark is mainly used to let multiple nodes operate on a single shared variable. An Accumulator supports only accumulation, but it does allow multiple tasks to update one variable in parallel. Tasks can only add to an Accumulator and cannot read its value; only the Driver program can read it.

This is very similar to a Counter in MapReduce (MR). It is mainly used to count how many times each program fragment is invoked, so the counts can be compared with the total when evaluating the data.

The test code is as follows:

    package cn.xpleaf.bigdata.spark.scala.core.p3

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Spark shared variable: accumulator (Accumulator)
      *
      * Note that an accumulator is only updated when an action triggers execution
      */
    object _04SparkAccumulatorOps {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setMaster("local[2]").setAppName(_04SparkAccumulatorOps.getClass.getSimpleName())
            val sc = new SparkContext(conf)
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)

            // multiply every number by 7, and also count how many are divisible by 3
            val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
            val listRDD: RDD[Int] = sc.parallelize(list)
            var counter = 0
            // sc.accumulator is the Spark 1.x accumulator API (deprecated since Spark 2.0)
            val counterAcc = sc.accumulator[Int](0)
            val mapRDD = listRDD.map(num => {
                counter += 1
                if (num % 3 == 0) {
                    counterAcc.add(1)
                }
                num * 7
            })
            // the following would run another RDD computation, so the accumulator above
            // can be used instead to avoid the extra computation
            // val ret = mapRDD.filter(num => num % 3 == 0).count()
            mapRDD.foreach(println)
            println("counter===" + counter)
            println("counterAcc===" + counterAcc.value)
            sc.stop()
        }
    }

The output is as follows:

    49
    56
    7
    63
    14
    70
    21
    77
    28
    84
    35
    91
    42
    counter===0
    counterAcc===4

Here is a more concise example:

    package cn.xpleaf.spark.p5

    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * @author xpleaf
      * @date 2019-1-10 6:14 PM
      */
    object SampleSpark2 {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
                .setAppName(s"${SampleSpark2.getClass.getSimpleName}")
                .setMaster("local[2]")
            val sc = new SparkContext(conf)

            // accumulator, used to count the even numbers in rdd
            val counterAcc = sc.accumulator[Int](0)
            // ordinary counter variable
            var counter = 0
            val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
            // an action (count) is needed to trigger execution of the transformation
            rdd.map { num =>
                if (num % 2 == 0) {
                    // add 1 to both the accumulator and the ordinary counter
                    counterAcc.add(1)
                    counter += 1
                }
            }.count()

            println(s"counterAcc: ${counterAcc.value}, counter: $counter")
            sc.stop()
        }
    }

The output is as follows:

    counterAcc: 3, counter: 0
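The examples above use sc.accumulator, which has been deprecated since Spark 2.0. As a hedged sketch of the same even-number count with the newer built-in long accumulator (the object name LongAccumulatorSketch and the accumulator name "evenCount" are made up for illustration, and a local master is assumed):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; the names are illustrative only.
object LongAccumulatorSketch {
  def run(): Long = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LongAccumulatorSketch")
    val sc = new SparkContext(conf)
    try {
      // Named long accumulator created on the driver.
      val evenCount = sc.longAccumulator("evenCount")

      // Updating inside an action (foreach) means each task's update is applied only once.
      sc.parallelize(List(1, 2, 3, 4, 5, 6)).foreach { num =>
        if (num % 2 == 0) evenCount.add(1L)
      }

      // Only the driver reads the accumulated value.
      evenCount.value
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"even numbers: ${run()}")
}
```

The update is placed in foreach rather than map here because accumulator updates made inside transformations may be applied more than once if a task or stage is re-executed.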
