2025-01-21 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
First, basic concepts
An accumulator is a shared variable in Spark that, as its name implies, can only be added to. It has the following characteristics:
1. An accumulator can only be created on the driver side, and only the driver can read its value; tasks can only add to it.
2. Accumulators do not change Spark's lazy evaluation: the accumulation is performed only when a job is triggered.
3. Spark ships with built-in accumulator types (for example, for Int, Long, Float and Double).
Second, the use of accumulators
The accumulator is initialized on the driver side, and its value is read there after an action:

val accum = sc.accumulator(0, "test Accumulator")
accum.value
The accumulation itself is performed on the executor side:

accum += 1
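The split between driver and executor can be modeled without Spark: each "task" works on a fresh zero-valued copy, and the driver merges the per-task partial values. This is a minimal sketch; SimpleAccumulator and AccumulatorFlow are hypothetical names, not Spark API.

```scala
// Minimal model of the accumulator flow: each "task" accumulates into its own
// local value (as an executor would), and the driver merges the partial results.
// SimpleAccumulator is a hypothetical stand-in for Spark's Accumulator[Int].
class SimpleAccumulator(initialValue: Int) {
  private var value_ : Int = initialValue
  def add(term: Int): Unit = value_ += term   // executor-side `accum += 1` maps to this
  def merge(other: Int): Unit = value_ += other
  def value: Int = value_                     // read on the driver after the job
}

object AccumulatorFlow {
  def main(args: Array[String]): Unit = {
    val accum = new SimpleAccumulator(0)      // created on the "driver"
    val partitions = Seq(Seq(1, 2), Seq(3, 4), Seq(5))

    // Each task starts from a local zero (as deserialization does in Spark)
    val partialUpdates = partitions.map { part =>
      var local = 0
      part.foreach(_ => local += 1)           // accum += 1 per element
      local
    }

    // The driver merges the per-task partial values after the job completes
    partialUpdates.foreach(accum.merge)
    println(accum.value)                      // 5 elements in total => 5
  }
}
```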
Third, the key classes of accumulators
class Accumulator[T] extends Accumulable[T, T]

It implements the initialization of the accumulator and encapsulates the accumulator's operations; when an instance is constructed, it registers itself with Accumulators. Because the type of the value being added can differ from the type of the accumulated value, an Accumulable must define how values are added and how they are merged, i.e. the add method.

object Accumulators:

Manages the accumulators on the driver side and performs the aggregation of the individual accumulator updates.

trait AccumulatorParam[T] extends AccumulableParam[T, T]:

A generic wrapper for the addAccumulator operation; the concrete behavior comes from the addInPlace method defined in each concrete implementation class.

object AccumulatorParam:

Mainly provides the implicit type conversions (the built-in implicit AccumulatorParam instances).

TaskContextImpl:

Manages the accumulators on the executor side.
Fourth, the source code analysis of the accumulator
1. Driver-side initialization

val accum = sc.accumulator(0, "test Accumulator")

This constructs

val acc = new Accumulator(initialValue, param, Some(name))

In the constructor of Accumulable (Accumulator's parent class), the new accumulator registers itself so that it can be tracked:

Accumulators.register(this)
2. Deserialization on the executor side

First, note that value_ is marked transient, so it is not serialized:

@volatile @transient private var value_ : R = initialValue // Current value on master

It is re-initialized during deserialization, and deserialization also registers the Accumulator with the TaskContextImpl. Deserialization happens when the runTask method of ResultTask is called:

val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
During deserialization, readObject is invoked:

private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
  in.defaultReadObject()
  value_ = zero
  deserialized = true
  // Automatically register the accumulator when it is deserialized with the task closure.
  //
  // Note internal accumulators sent with task are deserialized before the TaskContext is created
  // and are registered in the TaskContext constructor. Other internal accumulators, such as SQL
  // metrics, still need to register here.
  val taskContext = TaskContext.get()
  if (taskContext != null) {
    taskContext.registerAccumulator(this)
  }
}
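The reset-to-zero behavior can be reproduced with plain Java serialization, which Spark itself uses for task closures. In this sketch, ResettingAccum and roundTrip are hypothetical names; the pattern (a @transient field plus a custom readObject) is the same one the Spark source above uses.

```scala
import java.io._

// Mimics Spark's Accumulable: the current value is transient and is reset to
// zero when the object is deserialized (i.e., when it arrives on an executor).
class ResettingAccum(initialValue: Int) extends Serializable {
  @transient private var value_ : Int = initialValue
  def add(term: Int): Unit = value_ += term
  def value: Int = value_

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value_ = 0            // like `value_ = zero` in Spark's readObject
    // Spark additionally registers the accumulator with the TaskContext here.
  }
}

object SerializationDemo {
  // Serialize and immediately deserialize, as shipping a closure to an executor does.
  def roundTrip(a: ResettingAccum): ResettingAccum = {
    val buf = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buf)
    oos.writeObject(a)
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    ois.readObject().asInstanceOf[ResettingAccum]
  }

  def main(args: Array[String]): Unit = {
    val driverSide = new ResettingAccum(10)
    val executorSide = roundTrip(driverSide)
    println(driverSide.value)    // 10 on the driver
    println(executorSide.value)  // 0 after deserialization
  }
}
```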
3. Accumulation in the accumulator

accum += 1

internally calls

param.addAccumulator(value_, term)
AccumulableParam has different implementations depending on the accumulator's value type. For the Int type, for example, the addAccumulator method of the AccumulatorParam trait is ultimately called:
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}
which then calls the addInPlace method of the concrete implementation:
implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
  def addInPlace(t1: Int, t2: Int): Int = t1 + t2
  def zero(initialValue: Int): Int = 0
}
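The delegation from addAccumulator to addInPlace, and the implicit resolution that picks the Int implementation, can be reproduced in a few lines of plain Scala. MiniAccumulatorParam and ParamDemo below are hypothetical, simplified versions of the Spark classes, not the real API.

```scala
// Simplified model of AccumulatorParam: addAccumulator delegates to addInPlace,
// and an implicit object supplies the Int implementation.
trait MiniAccumulatorParam[T] {
  def addInPlace(t1: T, t2: T): T
  def zero(initialValue: T): T
  def addAccumulator(t1: T, t2: T): T = addInPlace(t1, t2)
}

object MiniAccumulatorParam {
  implicit object IntParam extends MiniAccumulatorParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }
}

object ParamDemo {
  // The implicit parameter resolves to IntParam for T = Int, just as
  // sc.accumulator(0) resolves the implicit IntAccumulatorParam in Spark.
  def accumulate[T](values: Seq[T])(implicit param: MiniAccumulatorParam[T]): T =
    values.foldLeft(param.zero(values.head))(param.addAccumulator)

  def main(args: Array[String]): Unit =
    println(accumulate(Seq(1, 2, 3)))   // zero(1) = 0, then 0 + 1 + 2 + 3 = 6
}
```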
The returned result is then written back to the executor-side copy's value_.
4. Aggregation of the per-node accumulations

The accumulator updates are collected and returned in the run method of the Task class:

(runTask(context), context.collectAccumulators())

Finally, updateAccumulators(event) is called in DAGScheduler, which in turn calls

Accumulators.add(event.accumUpdates)
The details are as follows:

def add(values: Map[Long, Any]): Unit = synchronized {
  for ((id, value) <- values) {
    if (originals.contains(id)) {
      // Since we are now storing weak references, we must check whether the underlying data is valid.
      originals(id).get match {
        case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
        case None =>
          throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
      }
    } else {
      logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
    }
  }
}
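The driver-side bookkeeping can be modeled with a registry map from accumulator id to accumulator: registered ids are merged, unknown ids are skipped, matching the logWarning branch. IdAccum, Registry and MergeDemo are hypothetical names for this sketch; the real Accumulators object also uses weak references, which are omitted here.

```scala
import scala.collection.mutable

// Hypothetical model of Accumulators: a driver-side registry keyed by id that
// merges the per-task updates returned with each task result.
class IdAccum(val id: Long, var value: Int) {
  def ++=(update: Int): Unit = value += update
}

object Registry {
  private val originals = mutable.Map[Long, IdAccum]()

  def register(a: IdAccum): Unit = originals(a.id) = a

  // Like Accumulators.add: merge each (id -> partial value) into the original,
  // ignoring updates for ids that were never registered.
  def add(values: Map[Long, Int]): Unit =
    for ((id, value) <- values) {
      originals.get(id) match {
        case Some(accum) => accum ++= value
        case None        => println(s"Ignoring update for unknown accumulator id $id")
      }
    }
}

object MergeDemo {
  def main(args: Array[String]): Unit = {
    val acc = new IdAccum(id = 1L, value = 0)
    Registry.register(acc)
    Registry.add(Map(1L -> 3))   // update from task 1
    Registry.add(Map(1L -> 4))   // update from task 2
    Registry.add(Map(99L -> 7))  // unknown id, ignored with a warning
    println(acc.value)           // 3 + 4 = 7
  }
}
```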
5. Finally, we can read the accumulator's value on the driver:

accum.value
Fifth, points to note when using accumulators

Accumulators do not change the lazy evaluation of our RDDs; the accumulation is computed and the value updated only when an action runs. But if two actions share a transformation, such as a map that increments the accumulator, the transformation is recomputed for each action and the accumulator is incremented twice, producing results we don't want. Caching the shared RDD avoids the double accumulation.
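The double-counting can be simulated without Spark: a lazy Scala view that increments a counter is re-evaluated every time it is traversed, just as an uncached RDD's map reruns for every action. The names below are illustrative only.

```scala
object DoubleCountDemo {
  def main(args: Array[String]): Unit = {
    var counter = 0
    val data = Seq(1, 2, 3)

    // Like an uncached RDD: the map body runs again for every traversal ("action").
    val mapped = data.view.map { x => counter += 1; x * 2 }

    val sum = mapped.sum        // first "action": counter becomes 3
    val max = mapped.max        // second "action": map reruns, counter becomes 6
    println(s"sum=$sum max=$max counter=$counter")   // counter is 6, not 3

    // Forcing the result once (analogous to rdd.cache() plus an action)
    // gives the expected count.
    counter = 0
    val cached = mapped.toList  // computed exactly once
    val total = cached.sum + cached.max
    println(s"total=$total counter=$counter")        // counter is 3
  }
}
```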
Sixth, custom accumulators

A custom accumulator is defined by implementing your own accumulator parameter, i.e. providing zero (the initial value) and addInPlace (how two values are merged).
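As a sketch of the idea in plain Scala (modeled on the zero/addInPlace contract above rather than using the real Spark API; SetAccumulatorParam and the error codes are made up for illustration), here is a set-valued accumulator that collects distinct error codes across tasks:

```scala
// Hypothetical custom accumulator parameter that unions sets of strings,
// following the zero / addInPlace contract of Spark's AccumulableParam.
object SetAccumulatorParam {
  def zero(initialValue: Set[String]): Set[String] = Set.empty
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = s1 ++ s2
}

object CustomAccumDemo {
  def main(args: Array[String]): Unit = {
    // Per-task partial results, as each executor would produce them.
    val partials = Seq(Set("E01"), Set("E02", "E01"), Set("E03"))
    val merged = partials.foldLeft(SetAccumulatorParam.zero(Set.empty))(
      SetAccumulatorParam.addInPlace)
    println(merged)   // the distinct codes E01, E02, E03
  }
}
```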
Seventh, summary

The main points involved are serialization and class loading and execution, which are essential for understanding Spark in depth.
© 2024 shulou.com SLNews company. All rights reserved.