Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Spark Series (6)-- Accumulator and broadcast variables

2025-04-01 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

I. brief introduction

In Spark, two types of shared variables are provided: accumulator (accumulator) and broadcast variable (broadcast variable):

Accumulator: used to aggregate information, mainly used in scenarios such as cumulative counting; broadcast variables: mainly used to efficiently distribute large objects between nodes. 2. Accumulator

Let's first look at a specific scenario. For normal cumulative summation, if you use the following code to calculate in cluster mode, you will find that the execution result is not expected:

Var counter = 0val data = Array (1,2,3,4,5) sc.parallelize (data) .foreach (x = > counter + = x) println (counter)

The final result of counter is 0, and the main cause of this problem is closure.

2.1 understanding closures

1. The concept of closure in Scala

Let's first introduce the concept of closures in Scala:

Var more = 10val addMore = (x: Int) = > x + more

As shown above, there are two variables x and more in the function addMore:

X: is a binding variable (bound variable) because it is the input parameter of the function and is clearly defined in the context of the function; more: is a free variable (free variable) because the function numeral quantity does not give any meaning to the more.

By definition: when creating a function, if you need to capture a free variable, the function that contains a reference to the captured variable is called a closure function.

2. Closures in Spark

In the actual calculation, Spark will decompose the RDD operation into Task,Task and run on Worker Node. Spark closes the task before execution, and if a free variable is involved in the closure, the program copies it and places the copy variable in the closure, which is then serialized and sent to each executor. Therefore, when you reference counter in the foreach function, it will no longer be the counter on the Driver node, but the copy counter in the closure. By default, the updated value of the copy counter will not be passed back to Driver, so the final value of counter is still zero.

It should be noted that in Local mode, the Worker Node that is likely to execute foreach is in the same JVM as Diver and references the same original counter, so the update may be correct at this time, but it must not be correct in cluster mode. Therefore, we should give priority to using the accumulator when we encounter such problems.

The principle of the accumulator is actually simple: the final value of each copy variable is passed back to Driver, which is aggregated by Driver to get the final value, and the original variable is updated.

2.2 use the accumulator

All methods for creating accumulators are defined in SparkContext, and it is important to note that accumulator methods that are crossed out of the middle line are identified as obsolete after Spark 2.0.0.

The usage example and the execution results are as follows:

Val data = Array (1, 2, 3, 4, 5) / / define accumulator val accum = sc.longAccumulator ("My Accumulator") sc.parallelize (data) .foreach (x = > accum.add (x)) / / get the value accum.value of the accumulator

III. Broadcast variables

In the process of the closure introduced above, we said that the closure of each Task task will hold a copy of the free variable. If the variable is very large and there are many Task tasks, it will inevitably put pressure on the network IO. In order to solve this situation, Spark provides broadcast variables.

The practice of broadcasting variables is simple: instead of distributing copy variables to each Task, distribute them to all Task in each Executor,Executor to share a copy variable.

/ / define an array as a broadcast variable val broadcastVar = sc.broadcast (Array (1, 2, 3, 4, 5)) / / use the broadcast variable first instead of the original value sc.parallelize (broadcastVar.value). Map (_ * 10). Collect () Resources

RDD Programming Guide

For more articles in big data's series, please see the GitHub Open Source Project: big data's getting started Guide.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report