
9. Spark Core: Shared Variables


Introduction

When Spark executes operations, tasks can use variables defined in the driver program, but this default behavior is sometimes not ideal, for two reasons.

First, each task running in the cluster fetches these variables from the driver, so shipping a large variable to every task is very inefficient. Second, each task gets its own fresh copy of the variables, and updating a copy does not affect the corresponding variable in the driver; this is a problem when the driver needs to read back a result accumulated in the variable.
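A minimal sketch of the second pitfall (assuming a local SparkContext; all names are illustrative): a counter updated inside a task only changes that task's serialized copy, so the driver-side value never changes.

import org.apache.spark.{SparkConf, SparkContext}

object DriverCopyPitfall {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("driver-copy-pitfall").setMaster("local[*]"))

    var counter = 0                        // defined in the driver
    val data = sc.parallelize(1 to 100)

    // Each task receives its own serialized copy of `counter`;
    // the increments happen on those copies and never reach the driver.
    data.foreach(n => counter += n)

    println(s"counter = $counter")         // prints 0, not 5050
    sc.stop()
  }
}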

To solve these two problems, Spark provides two types of shared variables: broadcast variables and accumulators.

Broadcast variables distribute large objects efficiently: the object is cached locally on each executor, so tasks avoid fetching it from the driver every time. Accumulators aggregate results from the tasks back to the driver.

Broadcast variable principle

A broadcast variable can only be defined on the driver side, not on the executor side, and its value can be modified only on the driver side. Without broadcast variables, an executor holds as many copies of a driver-side variable as it runs tasks; with a broadcast variable, each executor holds a single copy.

Usage

Call SparkContext.broadcast on an object of type T (any serializable type works) to create a Broadcast[T] object, and read the wrapped object through its value property. The value is sent to each node only once and should be treated as read-only: changing it on one node does not affect the other nodes.
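A minimal, self-contained sketch of this usage (assuming a local SparkContext; the lookup table and input data are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("broadcast-sketch").setMaster("local[*]"))

    // A lookup table defined on the driver
    val countryCodes = Map("86" -> "CN", "1" -> "US", "44" -> "UK")

    // Shipped to each executor once; a Broadcast[Map[String, String]]
    val bcCodes = sc.broadcast(countryCodes)

    val numbers = sc.parallelize(Seq("86-1234", "1-5678", "44-9012"))
    val countries = numbers.map { n =>
      val prefix = n.split("-")(0)
      bcCodes.value.getOrElse(prefix, "unknown")  // read-only access on executors
    }

    countries.collect().foreach(println)          // CN, US, UK
    sc.stop()
  }
}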

Example

Count the number of call signs per country, distributing the call-sign prefix (country code) lookup table as a broadcast variable.

Python:

# Use the call-sign prefix (country code) table as a broadcast variable
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts
                        .map(processSignCount)
                        .reduceByKey(lambda x, y: x + y))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

Scala:

// Use the call-sign prefix (country code) table as a broadcast variable
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map { case (sign, count) =>
  val country = lookupInArray(sign, signPrefixes.value)
  (country, count)
}.reduceByKey((x, y) => x + y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

Java:

// Use the call-sign prefix (country code) table as a broadcast variable
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
    new PairFunction<Tuple2<String, Integer>, String, Integer>() {
      public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
        String sign = callSignCount._1();
        String country = lookupCountry(sign, signPrefixes.value());
        return new Tuple2<>(country, callSignCount._2());
      }
    }).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");

Accumulator principle

An accumulator is defined on the driver side with an initial value. Its value can be read only on the driver side, while executor-side tasks can only update (add to) it.

Usage

Create an accumulator by calling sc.accumulator(initialValue); the result is an org.apache.spark.Accumulator[T] object, where T is the type of initialValue. Executor code running in a Spark closure can increase the accumulator with its += method, and the driver program can read the result through the accumulator's value property.
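A minimal, self-contained sketch of this usage with the classic Accumulator API described above (Spark 1.x style, deprecated since Spark 2.0; assuming a local SparkContext and illustrative data):

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("accumulator-sketch").setMaster("local[*]"))

    val blankCount = sc.accumulator(0)    // Accumulator[Int] with initial value 0
    val lines = sc.parallelize(Seq("a b", "", "c", ""))

    lines.foreach { line =>
      if (line.isEmpty) blankCount += 1   // executor side: tasks can only add
    }

    println(blankCount.value)             // driver side: prints 2
    sc.stop()
  }
}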

Example

Count blank lines while extracting call signs from a file.

Python:

file = sc.textFile(inputFile)
# Create an Accumulator[Int] initialized to 0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines  # access the global accumulator
    if (line == ""):
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print("Blank lines: %d" % blankLines.value)

Scala:

val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0)  // create an Accumulator[Int] initialized to 0
val callSigns = file.flatMap(line => {
  if (line == "") {
    blankLines += 1  // increment the accumulator
  }
  line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println("Blank lines: " + blankLines.value)

Java:

JavaRDD<String> rdd = sc.textFile(args[1]);
final Accumulator<Integer> blankLines = sc.accumulator(0);
JavaRDD<String> callSigns = rdd.flatMap(
    new FlatMapFunction<String, String>() {
      public Iterable<String> call(String line) {
        if ("".equals(line)) {
          blankLines.add(1);
        }
        return Arrays.asList(line.split(" "));
      }
    });
callSigns.saveAsTextFile("output.txt");
System.out.println("Blank lines: " + blankLines.value());
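Note that sc.accumulator is the Spark 1.x API and has been deprecated since Spark 2.0 in favor of AccumulatorV2. A rough sketch of the same blank-line count with the newer API (names as in the example above):

val file = sc.textFile("file.txt")
val blankLines = sc.longAccumulator("blankLines")  // named LongAccumulator (Spark 2.0+)
val callSigns = file.flatMap { line =>
  if (line == "") blankLines.add(1)
  line.split(" ")
}
callSigns.saveAsTextFile("output.txt")
println("Blank lines: " + blankLines.value)

Either way, Spark guarantees exactly-once accumulator updates only inside actions; updates made in a transformation such as flatMap may be applied more than once if a task is retried.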


