Persistence and Shared Variables in Spark

2025-01-21 Update From: SLTechnology News&Howtos

Shulou (Shulou.com), 06/03

1. The persistence operator: cache

Introduction: Normally an RDD does not hold real data; it only holds metadata describing how the RDD is computed. Calling the cache method on an RDD does not materialize any data either. Only when the first action operator triggers computation of the RDD does cache store the resulting data in memory, so the next time the RDD is reused the computation is much faster.
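The lazy behavior described above can be observed with a small experiment. This is only a sketch, assuming local mode and data small enough to fit in memory; the object name CacheLazinessSketch and the accumulator used to count evaluations are illustrative devices, not part of the original article.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CacheLazinessSketch {
  // Returns how many times the map function has run:
  // (before any action, after the first action, after the second action).
  def run(): (Long, Long, Long) = {
    val conf = new SparkConf().setAppName("CacheLazinessSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Instrumentation only: counts how often the map function is invoked.
      val evaluations = sc.longAccumulator("evaluations")

      val doubled = sc.parallelize(1 to 100)
        .map { x => evaluations.add(1L); x * 2 }
        .cache() // only marks the RDD for caching; nothing is computed yet

      val beforeAction = evaluations.sum // 0: no action has run

      doubled.count()                    // first action: computes and caches the partitions
      val afterFirst = evaluations.sum   // 100: every element was computed once

      doubled.count()                    // second action: served from the cache
      val afterSecond = evaluations.sum  // still 100: map was not re-run

      (beforeAction, afterFirst, afterSecond)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

If the .cache() call is removed, the second count() recomputes the map and the last number doubles.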

The main storage levels are:

// Defined in org.apache.spark.storage.StorageLevel.
// Constructor arguments: (useDisk, useMemory, useOffHeap, deserialized, replication = 1)

// not stored in memory or on disk
val NONE = new StorageLevel(false, false, false, false)
// stored on disk
val DISK_ONLY = new StorageLevel(true, false, false, false)
// stored on disk, 2 replicas
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
// stored in memory
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
// stored in memory, 2 replicas
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
// stored in memory in serialized form
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
// stored in memory and on disk
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
// stored in off-heap memory
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

The corresponding operations:

// enable persistence
listRDD.cache()
// remove persistence
listRDD.unpersist()
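On an RDD, cache() is shorthand for persist(StorageLevel.MEMORY_ONLY); to use one of the other levels listed above, pass it to persist explicitly. A minimal sketch, assuming local mode; the object name PersistLevelSketch is made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistLevelSketch {
  // Returns the storage level while persisted and after unpersist().
  def run(): (StorageLevel, StorageLevel) = {
    val conf = new SparkConf().setAppName("PersistLevelSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val listRDD = sc.parallelize(List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18)))

      // Choose an explicit level instead of the MEMORY_ONLY default used by cache().
      listRDD.persist(StorageLevel.MEMORY_AND_DISK)
      val whilePersisted = listRDD.getStorageLevel

      listRDD.count() // an action is still needed to actually fill the cache

      // Drop the cached blocks; the level goes back to NONE.
      listRDD.unpersist()
      val afterUnpersist = listRDD.getStorageLevel

      (whilePersisted, afterUnpersist)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Note that Spark refuses to change the level of an RDD that already has one assigned, so call unpersist() first if a different level is needed.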

2. Shared variables

Introduction: In a Spark program, when a function passed to a Spark operation (such as map or reduce) runs on a remote node, it works on independent copies of the variables the function uses. These variables are copied to each machine, and updates made to them on remote machines are not propagated back to the driver. General read-write variables shared across tasks would be inefficient, but Spark provides two limited kinds of shared variables for two common usage patterns: broadcast variables (Broadcast Variable) and accumulators (Accumulator).

(1) Broadcast variables

When not using broadcast variables:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparktTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparktTest")
    conf.setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
    val listRDD: RDD[(String, Int)] = sc.parallelize(list)
    // This line runs in the driver, so the variable lives in the driver process.
    val a = 3
    /*
     * kv._2 + a runs in the executors. The variable a is serialized
     * together with the function and copied once for every task.
     * If a were a large object, this copying would be very expensive.
     */
    listRDD.map(kv => (kv._1, kv._2 + a))
  }
}

When using broadcast variables:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object SparktTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparktTest")
    conf.setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
    val listRDD: RDD[(String, Int)] = sc.parallelize(list)
    // This line runs in the driver, so the variable lives in the driver process.
    val a = 3
    // Create the broadcast variable; all tasks in an executor share one copy of it.
    val broadcast: Broadcast[Int] = sc.broadcast(a)
    listRDD.map(kv => {
      // read the broadcast value
      val aa = broadcast.value
      (kv._1, kv._2 + aa)
    })
  }
}

Summary: when a driver-side variable is used on the executor side without a broadcast variable, each executor holds as many copies of the variable as it runs tasks. When a broadcast variable is used, each executor holds only one copy.

Notes on using broadcast variables:

- Broadcast variables can only be defined on the driver side, not on executors.

- A broadcast value is read-only on the executor side; to change it, create a new broadcast variable on the driver side.

- The larger the broadcast value, the greater the benefit of using a broadcast variable.

- The more tasks there are, the more pronounced the benefit of using a broadcast variable.
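The benefit grows with the size of the value, so a more typical use than a single Int is a lookup table. The following is a sketch under assumptions: the categories map and the object name BroadcastLookupSketch are invented for illustration, and a real table would usually be much larger.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

object BroadcastLookupSketch {
  def run(): List[(String, Int, String)] = {
    val conf = new SparkConf().setAppName("BroadcastLookupSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val list = List(("math", 18), ("hbase", 18), ("hive", 22))

      // Hypothetical lookup table, defined on the driver.
      val categories = Map("math" -> "course", "hbase" -> "database", "hive" -> "warehouse")

      // Ship the table once per executor instead of once per task.
      val categoriesBc: Broadcast[Map[String, String]] = sc.broadcast(categories)

      val result = sc.parallelize(list)
        .map { case (name, n) =>
          // Executors only read the value; they never modify it.
          (name, n, categoriesBc.value.getOrElse(name, "unknown"))
        }
        .collect()
        .toList

      // Release the broadcast data once no further job needs it.
      categoriesBc.destroy()
      result
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

After destroy() the broadcast variable can no longer be used; unpersist() is the milder option when the value may be needed again.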

(2) Accumulators

Introduction: Spark applications often need counters, for example for exception monitoring, debugging, or counting the records that match some condition. If a variable is not declared as an accumulator, changes to it are not aggregated globally on the driver side: each task works only on its own copy of the original variable, and the original value on the driver never changes. Once the variable is declared as an accumulator, it supports distributed counting.

Case study:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

object SparktTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparktTest")
    conf.setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    // count the number of lines
    val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")
    // create the accumulator
    val mysum: LongAccumulator = sc.longAccumulator("Mysum")
    hdfsRDD.map(line => {
      mysum.add(1)
      line
    }).collect() // an action is required to trigger the computation
    // read the accumulator value
    println(mysum.value)
    // reset the accumulator
    mysum.reset()
  }
}
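One detail of this example deserves care: the accumulator is updated inside map, a transformation, so the update runs again whenever the RDD is recomputed. The sketch below shows the effect of running two actions on an uncached RDD. It assumes local mode and uses a small in-memory sample instead of /data/word.txt; the object name AccumulatorRecountSketch is made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorRecountSketch {
  // Returns the accumulator value after the first and after the second action.
  def run(): (Long, Long) = {
    val conf = new SparkConf().setAppName("AccumulatorRecountSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // In-memory stand-in for the lines of a text file (assumption).
      val lines = sc.parallelize(Seq("a b", "c d", "e f"))
      val lineCount = sc.longAccumulator("lineCount")

      // The RDD is deliberately not cached.
      val counted = lines.map { line => lineCount.add(1L); line }

      counted.collect()
      val afterFirst = lineCount.sum  // 3: one update per line

      counted.collect()               // recomputes the map, so the updates run again
      val afterSecond = lineCount.sum // 6: every line was counted twice

      (afterFirst, afterSecond)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Caching the RDD before the first action, or resetting the accumulator between actions as the example above does, avoids the double count.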

Considerations when using accumulators:

- An accumulator is given its initial value on the driver side. It is updated on the executor side, and its final value can only be read on the driver side.

- Using an accumulator is not a tuning technique: without one, the result is simply wrong.


- If the built-in accumulators do not meet your needs, you can also define a custom accumulator.
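A custom accumulator is written by extending AccumulatorV2 and registering an instance with the SparkContext. The sketch below is one possible shape, not the article's own code: KeyCountAccumulator and CustomAccumulatorSketch are hypothetical names, and the sample words are invented.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Hypothetical custom accumulator: counts how often each key was added.
class KeyCountAccumulator extends AccumulatorV2[String, Map[String, Long]] {
  private val counts = mutable.Map.empty[String, Long]

  // True when nothing has been accumulated yet.
  override def isZero: Boolean = counts.isEmpty

  // Independent copy carrying the current counts.
  override def copy(): KeyCountAccumulator = {
    val acc = new KeyCountAccumulator
    acc.counts ++= counts
    acc
  }

  override def reset(): Unit = counts.clear()

  // Called on the executor side for every input value.
  override def add(key: String): Unit =
    counts(key) = counts.getOrElse(key, 0L) + 1L

  // Called on the driver side to fold in the partial result of a task.
  override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit =
    other.value.foreach { case (k, n) =>
      counts(k) = counts.getOrElse(k, 0L) + n
    }

  override def value: Map[String, Long] = counts.toMap
}

object CustomAccumulatorSketch {
  def run(): Map[String, Long] = {
    val conf = new SparkConf().setAppName("CustomAccumulatorSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val keyCounts = new KeyCountAccumulator
      // Custom accumulators must be registered before they are used in a job.
      sc.register(keyCounts, "keyCounts")

      // foreach is an action, so each update is applied exactly once.
      sc.parallelize(Seq("hive", "math", "hive", "hbase"))
        .foreach(word => keyCounts.add(word))

      keyCounts.value
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The updates are placed in foreach rather than map because Spark only guarantees exactly-once accumulator updates inside actions.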
