2025-02-22 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
In this article, the editor shares how to use Spark broadcast variables and accumulators. Many readers may not be familiar with them, so this article is offered for reference. I hope you learn a lot from it. Let's get started!
I. Broadcast variables and accumulators
Typically, when a function is passed to a Spark operation such as map or reduce, it executes on a remote cluster node and works on separate copies of all the variables used in the function. These variables are copied to each machine, and updates made to them on the remote machine are not propagated back to the driver. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark provides two limited types of shared variables for common usage patterns: broadcast variables and accumulators.
1.1 Broadcast variables:
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with every task. They can be used, for example, to give every node a copy of a large input data set efficiently. Spark also tries to distribute broadcast variables with efficient broadcast algorithms to reduce communication cost.
Spark actions are executed through a set of stages, separated by distributed shuffle operations. Spark automatically broadcasts the common data needed by the tasks within each stage. Data broadcast this way is cached in serialized form and deserialized before each task runs. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.
You create a broadcast variable from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. For example:
    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
    broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

    scala> broadcastVar.value
    res0: Array[Int] = Array(1, 2, 3)
After the broadcast variable is created, use it instead of v in any functions run on the cluster, so that v is not shipped to the nodes more than once. In addition, to ensure that all nodes see the same value, the object v should not be modified after it has been broadcast.
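As a minimal sketch of this advice, the following Scala snippet broadcasts a small lookup table and reads it through the value method inside a map. The object name BroadcastLookupSketch, the resolve helper and the country-code table are illustrative assumptions, not part of the original article. The snippet also assumes that Spark is on the classpath and that the job runs in local mode.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object; the names here are illustrative only.
object BroadcastLookupSketch {

  // Replaces each code with a name looked up in a broadcast table.
  def resolve(sc: SparkContext, codes: Seq[String]): Array[String] = {
    // Stand-in for a large read-only data set (assumed sample data).
    val countryNames = Map("CN" -> "China", "US" -> "United States", "FR" -> "France")

    // Broadcast once; tasks read the cached copy instead of capturing countryNames.
    val bcNames = sc.broadcast(countryNames)

    sc.parallelize(codes)
      .map(code => bcNames.value.getOrElse(code, "unknown"))
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("BroadcastLookupSketch")
    val sc = new SparkContext(conf)
    try {
      resolve(sc, Seq("CN", "FR", "XX")).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Note that the closure refers to bcNames rather than countryNames, so the table itself is not serialized with every task.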
1.2 Accumulator:
Accumulators are variables that are only "added" to through an associative operation, so they can be supported efficiently in parallel. They can be used to implement counters or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If an accumulator is created with a name, it is displayed in Spark's UI, which helps in understanding the progress of running stages. (Named accumulators are not yet supported in Python.)
An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator. However, they cannot read its value; only the driver program can read it, using the accumulator's value method.
The following code shows how to add all the elements in an array to the accumulator:
    scala> val accum = sc.accumulator(0, "My Accumulator")
    accum: spark.Accumulator[Int] = 0

    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    ...
    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

    scala> accum.value
    res2: Int = 10
Although the example above uses the built-in accumulator type Int, developers can also create their own accumulator types by subclassing AccumulatorParam. The AccumulatorParam interface has two methods:
The zero method provides a "zero value" for your data type.
The addInPlace method adds two values together.
Suppose we have a Vector class that represents mathematical vectors. We could write:
    object VectorAccumulatorParam extends AccumulatorParam[Vector] {
      def zero(initialValue: Vector): Vector = {
        Vector.zeros(initialValue.size)
      }
      def addInPlace(v1: Vector, v2: Vector): Vector = {
        v1 += v2
      }
    }

    // Then, create an Accumulator of this type:
    val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
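Because the Vector class above is only assumed, here is a hedged, self-contained variant of the same pattern using a concrete type, Set[String]. The names StringSetParam, SetAccumulatorSketch and distinctWords are illustrative assumptions. The sketch relies on the SparkContext.accumulator API used throughout this article, which has been deprecated since Spark 2.0, so it assumes a Spark version that still ships that API.

```scala
import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

// Hypothetical parameter object: merges sets of strings by union.
object StringSetParam extends AccumulatorParam[Set[String]] {
  // The "zero value" for set union is the empty set.
  def zero(initialValue: Set[String]): Set[String] = Set.empty[String]

  // Union is associative, so partial results from tasks can be merged safely.
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = s1 ++ s2
}

object SetAccumulatorSketch {
  // Collects the distinct words seen by all tasks into one accumulator.
  def distinctWords(sc: SparkContext, words: Seq[String]): Set[String] = {
    val seen = sc.accumulator(Set.empty[String])(StringSetParam)
    // foreach is an action, so the updates are applied when it runs.
    sc.parallelize(words).foreach(w => seen += Set(w))
    seen.value // read on the driver only
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SetAccumulatorSketch")
    val sc = new SparkContext(conf)
    try {
      println(distinctWords(sc, Seq("spark", "hive", "spark")))
    } finally {
      sc.stop()
    }
  }
}
```

Set union was chosen here because it makes the roles of zero and addInPlace easy to see: the empty set is the identity, and merging is independent of task order.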
In Scala, Spark also supports the more general Accumulable interface for accumulating data where the type of the result differs from the type of the elements added (for example, building a list by collecting elements together). In addition, the SparkContext.accumulableCollection method can be used to accumulate common Scala collection types.
For accumulator updates performed inside actions only, Spark guarantees that each task's update to the accumulator is applied exactly once, that is, restarted tasks do not update the value again. Inside transformations, users should be aware that each task's update may be applied more than once if tasks or job stages are re-executed.
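One way an update inside a transformation can run more than once is when the same uncached RDD is evaluated by two actions. The sketch below illustrates that case under stated assumptions: the object name AccumulatorRecomputeSketch and the countTwice helper are hypothetical, and the code assumes a local-mode run on a Spark version that still provides SparkContext.accumulator.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object; the names here are illustrative only.
object AccumulatorRecomputeSketch {

  // Updates an accumulator inside map, then triggers the map twice.
  def countTwice(sc: SparkContext): Int = {
    val accum = sc.accumulator(0)
    val mapped = sc.parallelize(Array(1, 2, 3, 4)).map { x => accum += x; x }

    mapped.count() // first action: runs the map and adds 1 + 2 + 3 + 4
    mapped.count() // second action: the uncached map is recomputed and adds again

    accum.value
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("AccumulatorRecomputeSketch")
    val sc = new SparkContext(conf)
    try {
      println(countTwice(sc))
    } finally {
      sc.stop()
    }
  }
}
```

In a local run without task failures this should return 20 rather than 10, which is why counters that must be exact are better updated inside an action such as foreach.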
Accumulators do not change Spark's lazy evaluation model. If they are updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when they are made inside a lazy transformation such as map. The following code fragment demonstrates this property:
    val accum = sc.accumulator(0)
    data.map { x => accum += x; f(x) }
    // Here, accum is still 0 because no action has caused the map to be computed.
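To make the fragment above concrete, here is a small runnable sketch that reads the accumulator before and after an action. The object name LazyAccumulatorSketch, the beforeAndAfter helper and the x * 2 function standing in for f are assumptions for illustration, and the code again assumes a Spark version that still provides SparkContext.accumulator.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object; the names here are illustrative only.
object LazyAccumulatorSketch {

  // Returns the accumulator value before and after an action forces the map.
  def beforeAndAfter(sc: SparkContext): (Int, Int) = {
    val accum = sc.accumulator(0)
    val data = sc.parallelize(Array(1, 2, 3, 4))

    // Transformation only: nothing is computed yet.
    val mapped = data.map { x => accum += x; x * 2 }
    val before = accum.value

    // The action triggers the map, and with it the accumulator updates.
    mapped.count()
    val after = accum.value

    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LazyAccumulatorSketch")
    val sc = new SparkContext(conf)
    try {
      println(beforeAndAfter(sc))
    } finally {
      sc.stop()
    }
  }
}
```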
II. Practical demonstration of Java and Scala versions
2.1 Java version:
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.Accumulator;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    import scala.Tuple2;

    /**
     * Example: use a broadcast variable to filter against a blacklist.
     * Each incoming record is checked against the broadcast blacklist
     * and filtered accordingly.
     */
    public class BroadcastAccumulator {

        /** The broadcast List. */
        private static volatile Broadcast<List<String>> broadcastList = null;

        /** The counter. */
        private static volatile Accumulator<Integer> accumulator = null;

        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountOnlineBroadcast");
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

            /*
             * Note: an action is required before the broadcast is distributed.
             * Note: the List returned by Arrays.asList is broadcast rather than a
             * raw array reference; broadcasting the array reference caused an error.
             * The blacklist is broadcast to every executor.
             */
            broadcastList = jsc.sparkContext().broadcast(Arrays.asList("Hadoop", "Mahout", "Hive"));

            /*
             * The accumulator acts as a global counter of how many blacklisted
             * records have been filtered online. It is instantiated here.
             */
            accumulator = jsc.sparkContext().accumulator(0, "OnlineBlackListCounter");

            JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);

            /* flatMap is omitted here because each input line is a single word. */
            JavaPairDStream<String, Integer> pairs = lines.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String word) {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                });

            JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) {
                        return v1 + v2;
                    }
                });

            /*
             * In Function and Function2, the leading type parameters are the inputs
             * and the last one is the return type, as reflected in call().
             * foreachRDD is used here in place of the deprecated foreach.
             */
            wordsCount.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
                @Override
                public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
                    rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
                        @Override
                        public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
                            if (broadcastList.value().contains(wordPair._1())) {
                                /*
                                 * The accumulator is not only for counting; the record
                                 * could also be written to a database or cache here.
                                 */
                                accumulator.add(wordPair._2());
                                return false;
                            } else {
                                return true;
                            }
                        }
                        /* The broadcast and the counter only take effect once an action runs. */
                    }).collect();

                    System.out.println("value in broadcast: " + broadcastList.value());
                    System.out.println("value in counter: " + accumulator.value());
                    return null;
                }
            });

            jsc.start();
            jsc.awaitTermination();
            jsc.close();
        }
    }
2.2 Scala version
    package com.Streaming

    import java.util

    import org.apache.spark.streaming.{Duration, StreamingContext}
    import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
    import org.apache.spark.broadcast.Broadcast

    /**
     * Created by lxh on 2016-6-30.
     */
    object BroadcastAccumulatorStreaming {

      /** Declare a broadcast variable and an accumulator. */
      private var broadcastList: Broadcast[List[String]] = _
      private var accumulator: Accumulator[Int] = _

      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
        val sc = new SparkContext(sparkConf)

        /** The duration is in milliseconds. */
        val ssc = new StreamingContext(sc, Duration(2000))
        // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop", "Spark"))
        broadcastList = ssc.sparkContext.broadcast(List("Hadoop", "Spark"))
        accumulator = ssc.sparkContext.accumulator(0, "broadcasttest")

        /** Receive the data. */
        val lines = ssc.socketTextStream("localhost", 9999)

        /**
         * 1. flatMap splits lines into words.
         * 2. map turns each word into a tuple (word, 1).
         * 3. reduceByKey sums the values.
         *    (4. sortByKey ranking)
         * 4. filter checks whether the word is in the broadcast list.
         * 5. print displays the result.
         */
        val words = lines.flatMap(line => line.split(" "))
        val wordpair = words.map(word => (word, 1))
        // The result of this filter is not used further.
        wordpair.filter(record => { broadcastList.value.contains(record._1) })
        val pair = wordpair.reduceByKey(_ + _)

        /*
         * pair is a pair DStream.
         * Check whether the key is on the blacklist; if so, add to the accumulator.
         */
        /* pair.foreachRDD(rdd => {
             rdd.filter(record => {
               if (broadcastList.value.contains(record._1)) {
                 accumulator.add(1)
                 return true
               } else {
                 return false
               }
             })
           }) */
        val filtedpair = pair.filter(record => {
          if (broadcastList.value.contains(record._1)) {
            accumulator.add(record._2)
            true
          } else {
            false
          }
        }).print

        // This line runs once while the stream is being defined, before ssc.start(),
        // so it prints the accumulator's initial value.
        println("value of accumulator " + accumulator.value)

        // pair.filter(record => { broadcastList.value.contains(record._1) })
        /* val keypair = pair.map(pair => (pair._2, pair._1)) */

        /* If DStream lacks an operator, use transform to apply an RDD operation. */
        /* keypair.transform(rdd => {
             rdd.sortByKey(false) // TODO
           }) */
        pair.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

That concludes this article on how to use Spark broadcast variables and accumulators. Thank you for reading, and I hope it has been helpful.