Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Accumulator implementation Mechanism and Custom Accumulator of spark Source Code Series

2025-01-21 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

First, basic concepts

An accumulator is a variable of Spark, which, as its name implies, can only be incremented. It has the following characteristics:

1. Accumulators can only be built on the driver side and can only be read by Driver, and Task can only be accumulated.

2. The accumulator will not change the characteristics of Spark Lazy computing. The relevant accumulation operation will only be performed when the Job is triggered.

3, the type of existing accumulator.

I believe there are many Taoist friends who learn from big data. Here I would like to tell you about my Di Qun, big data massive knowledge sharing, 784789432. Here I guarantee, absolutely big data's practical information, waiting for your arrival, we together from entry to proficiency!

Second, the use of accumulators

The Driver side initializes and gets the value after the Action.

Val accum = sc.accumulator (0, "test Accumulator")

Accum.value

Calculation is carried out on the Executor side

Accum+=1

Third, the key classes of accumulators

Class Accumulator extends Accumulable

It mainly realizes the initialization of the accumulator and encapsulates the relevant operation methods of the accumulator. At the same time, the accumulator is registered with our Accumulators when the class object is built. The return value type of the add operation of the accumulator can be different from the value type we passed in. Therefore, we must define how to accumulate and merge values. That is, add method

Object Accumulators:

This method manages our accumulator on the driver side and also includes aggregate operations for specific accumulators.

Trait AccumulatorParam [T] extends AccumulableParam [T, T]:

For the generic encapsulation of the addAccumulator operation of AccumulatorParam, the specific implementation is to implement the addInPlace method in the concrete implementation class.

Object AccumulatorParam:

It is mainly the operation of implicit type conversion.

TaskContextImpl:

Manages our accumulator on the executor side.

Fourth, the source code analysis of the accumulator

1, initialization of driver side

Val accum = sc.accumulator (0, "test Accumulator")

Val acc = new Accumulator (initialValue, param, Some (name))

It is mainly called in Accumulable (Accumulator) so that we can use Accumulator.

Accumulators.register (this)

2, the process of deserialization on the executor side to get our object

First of all, our value_ can see that it does not support serialization

@ volatile @ transient private var value_: r = initialValue / / Current value on master

The initialization is done when we deserialize, and deserialization also completes the registration of Accumulator to our TaskContextImpl.

Deserialization is done when calling the RunTask method of ResultTask

Val (rdd, func) = ser.deserialize [(RDD [T], (TaskContext, Iterator [T]) = > U)] (

ByteBuffer.wrap (taskBinary.value), Thread.currentThread.getContextClassLoader)

Will be called during the procedure

Private def readObject (in: ObjectInputStream): Unit = Utils.tryOrIOException {

In.defaultReadObject ()

Value_ = zero

Deserialized = true

/ / Automatically register the accumulator when it is deserialized with the task closure.

/ /

/ / Note internal accumulators sent with task are deserialized before the TaskContext is created

/ / and are registered in the TaskContext constructor. Other internal accumulators, such SQL

/ / metrics, still need to register here.

Val taskContext = TaskContext.get ()

If (taskContext! = null) {

TaskContext.registerAccumulator (this)

}

}

3, the accumulation of accumulators

Accum+=1

Param.addAccumulator (value_, term)

There are different implementations of AccumulableParam according to different accumulator parameters

For example, the int type. The addAccumulator method of the AccumulatorParam attribute that is finally called.

Trait AccumulatorParam [T] extends AccumulableParam [T, T] {

Def addAccumulator (T1: t, T2: t): t = {

AddInPlace (T1, T2)

}

}

Then, the addInPlace method of each concrete implementation is called.

Implicit object IntAccumulatorParam extends AccumulatorParam [Int] {

Def addInPlace (T1: Int, T2: Int): Int = T1 + T2

Def zero (initialValue: Int): Int = 0

}

The value_ value of our Accumulators is updated when it is returned.

4 aggregation operation after the accumulation of each node of the Accumulator

Obtained and returned in the run method of the Task class

(runTask (context), context.collectAccumulators ())

Finally, updateAccumulators (event) is called in DAGScheduler.

In the updateAccumulators method

Accumulators.add (event.accumUpdates)

The details are as follows:

Def add (values: Map [Long, Any]): Unit = synchronized {

For ((id, value) accum.asInstanceOf [Accumulable [Any, Any]] + + = value

Case None = >

Throw new IllegalAccessError ("Attempted to access garbage collected Accumulator.")

}

} else {

LogWarning (s "Ignoring accumulator update for unknown accumulator id $id")

}

}

}

5, and finally we can get the value of the accumulator

Accum.value

Fifth, points for attention in the use of accumulators

The accumulator will not change the features of our RDD's Lazy, and then Action will complete the calculation and update.

But if two Action share a conversion operation, such as map, accumulating in the map, then each action will accumulate, resulting in some results we don't need.

6. Custom accumulator

Custom accumulator output

Seventh, sum up

The main points involved are serialization and class loading execution, which is necessary to play spark deeply.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report