In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
Brief introduction
When spark performs operations, you can use variables defined in the driver program Driver, but sometimes this default use is not ideal.
Each task running in the cluster is connected to a drive to get variables. If you get a large variable, the execution efficiency will be very inefficient. Each task gets a new copy of these variables, and updating the values of these copies does not affect the corresponding variables in the drive. This is not feasible if the drive needs to get the result value of the variable.
To solve these two problems, spark provides two types of shared variables: broadcast variables (broadcast variable) and accumulators (accumulator).
Broadcast variables are used to distribute larger objects efficiently. A large object is cached locally for each actuator, avoiding connecting the drive to get it each time. The accumulator is used to aggregate data results in the drive. Broadcast variable principle
Broadcast variables can only be defined on the driver side, not on the executor side. The value of the broadcast variable can be modified on the driver side, but not on the executor side. If you do not use broadcast variables, you will have as many copies of variables on the driver side as there are task in Executor; if you use broadcast variables, there will be only one copy of the variables on the driver side in each Executor. Usage creates a BroadCast [T] object by calling SparkContext.broadcast on an object of type T, and any serializable type can do so. Accessing the value variable of the object through the value property is sent to each node only once and should be treated as a read-only value. (changing this value will not affect other nodes) instance
queries the number of call signs in each country
Python# uses the call sign prefix (country code) as the broadcast variable signPrefixes = sc.broadcast (loadCallSignTable ()) def processSignCount (sign_count, signPrefixes): country = lookupCountry (sign_count [0], signPrefixes.value) count = sign_count [1] return (country, count) countryContactCounts = (contactCounts.map (processSignCount). ReduceByKey ((lambda x) ) countryContactCounts.saveAsTextFile (outputDir + "/ countries.txt") scala// uses the call sign prefix (country code) as the broadcast variable val signPrefixes = sc.broadcast (loadCallSignTable ()) def processSignCount (sign_count, signPrefixes): country = lookupCountry (sign_count [0], signPrefixes.value) count = sign_count [1] return (country, count) val countryContactCounts = contactCounts.map {case (sign, count) = > {val country = lookupInArray (sign, signPrefixes.value) (country Count)} .reduceByKey ((x, y) = > xroomy) countryContactCounts.saveAsTextFile (outputDir + "/ countries.txt") java// takes the call sign prefix (country code) as the broadcast variable final Broadcast signPrefixes = sc.broadcast (loadCallSignTable ()) JavaPairRDD countryContactCounts = contactCounts.mapToPair (new PairFunction () {public Tuple2 call (Tuple2 callSignCount) {String sign = callSignCount._1 (); String country = lookupCountry (sign, signPrefixes.value ()); return new Tuple2 (country, callSignCount._2 ();}}) .reduceByKey (new SumInts ()); countryContactCounts.saveAsTextFile (outputDir + "/ countries.txt"); Accumulator principle
The accumulator assigns an initial value on the driver side definition. The accumulator can only read the last value on the driver side and update it on the Excutor side. Usage creates an accumulator with initial values by calling the sc.accumulator (initivalValue) method. The return value is the org.apache.spark.Accumulator [T] object, where T is the type of the initial value initialValue. The executor code in the Spark closure can use the + = method of the accumulator to increase the value of the accumulator. The driver program can call the value property of the accumulator to access the value instance of the accumulator.
accumulates blank lines
Pythonfile = sc.textFile (inputFile) # create Accumulator [Int] and initialize to 0blankLines = sc.accumulator (0) def extractCallSigns (line): global blankLines # access the global variable if (line = ""): blankLines + = 1 return line.split ("") callSigns = file.flatMap (extractCallSigns) callSigns.saveAsTextFile (outputDir + "/ callsigns") print "Blank lines:% d"% blankLines.valuescalaval file = sc.textFile ("file.txt") val BlankLines = sc.accumulator (0) / / create Accumulator [Int] and initialize to 0val callSigns = file.flatMap (line = > {if (line = "") {blankLines + = 1 / / accumulator plus 1} line.split (")}) callSigns.saveAsTextFile (" output.txt ") println (" Blank lines: "+ blankLines.value) javaJavaRDD rdd = sc.textFile (args [1]) Final Accumulator blankLines = sc.accumulator (0); JavaRDD callSigns = rdd.flatMap (new FlatMapFunction () {public Iterable call (String line) {if (".equals (line)) {blankLines.add (1);} return Arrays.asList (line.split ("));}}); callSigns.saveAsTextFile ("output.text"); System.out.println ("Blank lines:" + blankLines.value ())
Loyal to technology, love sharing. Welcome to the official account: java big data programming to learn more technical content.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.