Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Analysis of Spark accumulators commonly used in production

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Driver end

The Driver initializes the construction Accumulator and initializes the Accumulator registration. When Accumulators.register (this), the Accumulator will send it to the executor Driver after serialization and receive the status update completed by ResultTask. After receiving the status update completed by ResultTask, the Value value will be updated and the Accumulator value can be obtained after the Action operation is executed.

Executor end

After receiving the Task, the Executor side will deserialize and deserialize to get RDD and function. At the same time, the Accumulator is deserialized at the same time as deserialization (completed in the readObject method). At the same time, after completing the registration task calculation with TaskContext, the Task result is returned to Driver combined with source code analysis.

Driver side initialization

The    Driver side mainly completes the initialization operation through the following steps:

Val accum = sparkContext.accumulator (0, "AccumulatorTest") val acc = new Accumulator (initialValue, param, Some (name)) Accumulators.register (this)

Deserialization of Executor to get Accumulator

   deserialization is done when the runTask mode of ResultTask is called:

/ will deserialize RDD and self-defined functionval (rdd, func) = ser.deserialize [(RDD [T], (TaskContext, Iterator [T]) = > U)] (ByteBuffer.wrap (taskBinary.value), Thread.currentThread.getContextClassLoader)

During deserialization,    calls the readObject method in Accumulable:

Private def readObject (in: ObjectInputStream): Unit = Utils.tryOrIOException {in.defaultReadObject () / / value the initial value is zero;. This value is value_ = zero deserialized = true / / Automatically register the accumulator when it is deserialized with the task closure that will be serialized. / Note internal accumulators sent with task are deserialized before the TaskContext is created / / and are registered in the TaskContext constructor. Other internal accumulators, such SQL / / metrics, still need to register here. Val taskContext = TaskContext.get () if (taskContext! = null) {/ / the object resulting from the current deserialization will be registered in TaskContext / / so that TaskContext can get the accumulator / / after the task has finished running, it can be returned to executor taskContext.registerAccumulator (this)} through context.collectAccumulators ()

Be careful

Value_, in Accumulable.scala is not serialized, and the @ transient keyword modifies

@ volatile @ transient private var value_: r = initialValue / / accumulate operations of the Current value on master accumulator at each node

For different operations in the function, there should be different calling methods. The following are listed (in Accumulator.scala):

Def + = (term: t) {value_ = param.addAccumulator (value_, term)} def add (term: t) {value_ = param.addAccumulator (value_, term)} def + + = (term: r) {value_ = param.addInPlace (value_, term)}

There are different implementations of AccumulableParam (in Accumulator.scala) depending on the accumulator parameters:

Trait AccumulableParam [R, T] extends Serializable {/ * * def addAccumulator (r: r, t: t): R def addInPlace (R1: r, R2: r): R def zero (initialValue: r): r}

The different implementations are shown in the following figure:

Take IntAccumulatorParam as an example:

Implicit object IntAccumulatorParam extends AccumulatorParam [Int] {def addInPlace (T1: Int, T2: Int): Int = T1 + T2 def zero (initialValue: Int): Int = 0}

We found that IntAccumulatorParam implements trait AccumulatorParam [T]:

Trait AccumulatorParam [T] extends AccumulableParam [T, T] {def addAccumulator (T1: t, T2: t): t = {addInPlace (T1, T2)}

After the accumulation operation on each node is completed, the value_ value of the updated Accumulators is returned immediately.

Aggregation operation

The run method in Task.scala executes as follows:

/ / return the accumulator and run task// to call TaskContextImpl's collectAccumulators. The type of return value is a Map (runTask (context), context.collectAccumulators ()).

A series of operations have been completed on the executor side, and their values need to be returned to the driver side for aggregate summary, as shown in the figure accumulator executes the process:

According to the execution process, we can find that after executing the collectAccumulators method, updateAccumulators (event) will eventually be called in DAGScheduler, and in this method, the add method of Accumulators will be called to complete the aggregation operation:

Def add (values: Map [Long, Any]): Unit = synchronized {/ / traverse the passed value for ((id, value) accum.asInstanceOf [Accumulable [Any, Any]] + + = value case None = > throw new IllegalAccessError ("Attempted to access garbage collected Accumulator.")}} else {logWarning (s "Ignoring accumulator update for unknown accumulator id $id")} get the value of the accumulator

The value of the accumulator can be obtained by the accum.value method

At this point, the execution of the accumulator is complete.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report