Recently we needed to use Spark accumulators in a project, including a custom accumulator, to meet production requirements. For that reason I traced and studied the implementation mechanism of Spark's accumulator.
This series of articles will analyze the Spark accumulator from the following aspects:
The basic concept of Spark Accumulator
The composition of key classes of accumulator
Source code analysis of accumulator
The execution process of the accumulator
Pitfalls in the use of accumulators
Implementation of Custom Accumulator
Basic concepts of Spark accumulator
The Accumulator provided by Spark is mainly used to share a variable across multiple nodes. An Accumulator only supports accumulation: values can be added to it but never subtracted from it. An accumulator can only be constructed on the driver side, its result can only be read on the driver side, and tasks can only add to it.
Why can tasks only add to it? This is covered in detail later; briefly:
On the task side, that is, on the executor, each task gets its own copy of the accumulator variable: the accumulator is serialized, shipped to the executor, updated there independently, and then sent back. If a task looks at the accumulator, it only sees its own updates; tasks have no effect on each other's copies.
Accumulators do not change Spark's lazy evaluation model: the accumulation is only performed when a job is actually triggered.
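To make these points concrete, here is a minimal sketch of the pre-2.0 accumulator API that this article analyzes (the application name, sample data and accumulator name are invented for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("acc-sketch").setMaster("local[2]"))

    // built on the driver side
    val errorCount = sc.accumulator(0, "errorCount")

    val lines = sc.parallelize(Seq("ok", "error", "ok", "error"))
    val checked = lines.map { line =>
      // tasks can only add to the accumulator; they cannot read its value
      if (line == "error") errorCount += 1
      line
    }

    // nothing has been accumulated yet: map is lazy
    checked.count() // triggering the job performs the accumulation

    // the result can only be read on the driver side
    println(s"errors seen: ${errorCount.value}")
    sc.stop()
  }
}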
Spark ships with accumulators for the numeric types (Int, Long, Double and Float), provided by the implicit AccumulatorParam objects introduced below.
Introduction of key classes of Accumulator
class Accumulator[T] extends Accumulable[T, T]
Source code (the role of this class has been explained in great detail in the source code):
/**
 * A simpler value of [[Accumulable]] where the result type being accumulated is the same
 * as the types of elements being merged, i.e. variables that are only "added" to through an
 * associative operation and can therefore be efficiently supported in parallel. They can be used
 * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
 * value types, and programmers can add support for new types.
 *
 * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
 * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
 * However, they cannot read its value. Only the driver program can read the accumulator's value,
 * using its value method.
 *
 * @param initialValue initial value of accumulator
 * @param param helper object defining how to add elements of type `T`
 * @tparam T result type
 */
class Accumulator[T] private[spark] (
    @transient private[spark] val initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String],
    internal: Boolean)
  extends Accumulable[T, T](initialValue, param, name, internal) {

  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
    this(initialValue, param, name, false)
  }

  def this(initialValue: T, param: AccumulatorParam[T]) = {
    this(initialValue, param, None, false)
  }
}

Accumulator mainly takes care of initialization and wraps the accumulation operations; when the object is constructed it registers itself with Accumulators. In the more general parent class Accumulable[R, T] the result type and the element type can differ, so two operations have to be defined: adding a single element to the partial result (addAccumulator) and merging two partial results (addInPlace).
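To illustrate those two operations, here is a hedged sketch of an AccumulableParam whose result type differs from the element type (the object name and the usage comment are invented for this sketch, not part of Spark):

import org.apache.spark.AccumulableParam

// Result type Set[String] differs from element type String, so both operations are needed.
object UniqueErrorsParam extends AccumulableParam[Set[String], String] {
  // add a single element produced by a task to the partial result
  def addAccumulator(acc: Set[String], elem: String): Set[String] = acc + elem
  // merge two partial results (used on the driver)
  def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2
  // the "empty" result derived from the initial value
  def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
}

// Driver-side usage (sc is an existing SparkContext):
//   val uniqueErrors = sc.accumulable(Set.empty[String])(UniqueErrorsParam)
//   rdd.foreach(line => uniqueErrors += line)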
object Accumulators
This object manages the accumulators on the driver side; it also implements the merge (add) operation that folds the updates returned by tasks into the registered accumulators.
trait AccumulatorParam[T] extends AccumulableParam[T, T]
Source code:
/**
 * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
 * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
 * available when you create Accumulators of a specific type.
 *
 * @tparam T type of value to accumulate
 */
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}

AccumulatorParam's addAccumulator simply delegates to addInPlace, so a concrete implementation (for example a custom accumulator) only needs to provide addInPlace (and zero).
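As a hedged illustration of that point (the object name and the separator are invented for this sketch), a custom AccumulatorParam for a single type only has to supply addInPlace and zero:

import org.apache.spark.AccumulatorParam

// Sketch of a custom accumulator param: addAccumulator is inherited and
// delegates to addInPlace, so only addInPlace and zero are written here.
object StringConcatParam extends AccumulatorParam[String] {
  def addInPlace(s1: String, s2: String): String =
    if (s1.isEmpty) s2 else if (s2.isEmpty) s1 else s1 + "," + s2
  def zero(initialValue: String): String = ""
}

// Driver-side usage (sc is an existing SparkContext):
//   val trace = sc.accumulator("")(StringConcatParam)
//   rdd.foreach(x => trace += x.toString)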
object AccumulatorParam
Source code:
object AccumulatorParam {

  // The following implicit objects were in SparkContext before 1.2 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, as there are duplicate codes in SparkContext for backward
  // compatibility, please update them accordingly if you modify the following implicit objects.

  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
    def zero(initialValue: Double): Double = 0.0
  }

  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }

  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }

  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
    def zero(initialValue: Float): Float = 0f
  }

  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
}

The many implicit keywords show that this companion object mainly provides the implicit AccumulatorParam instances for the built-in numeric types, so the compiler can pick them up automatically.
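A small sketch of how these implicits are resolved in practice (the object and function names are invented for illustration; sc stands for an existing SparkContext):

import org.apache.spark.{Accumulator, SparkContext}

object ImplicitResolutionSketch {
  // No `import SparkContext._` is needed since 1.2: the implicits live in
  // AccumulatorParam's companion object and are found automatically.
  def createNumericAccumulators(sc: SparkContext): Unit = {
    val intAcc: Accumulator[Int]       = sc.accumulator(0)    // IntAccumulatorParam
    val longAcc: Accumulator[Long]     = sc.accumulator(0L)   // LongAccumulatorParam
    val doubleAcc: Accumulator[Double] = sc.accumulator(0.0)  // DoubleAccumulatorParam
    val floatAcc: Accumulator[Float]   = sc.accumulator(0f)   // FloatAccumulatorParam
  }
}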
TaskContextImpl
On the executor side the accumulators are managed by TaskContextImpl; when a task finishes, the accumulated values are collected and returned through this class.
Source code analysis of the accumulator
Driver end
accumulator method
Taking the accumulator method as the entry point, step into the corresponding source; inside it the Accumulator is constructed:
val acc = new Accumulator(initialValue, param, Some(name))
Source code:
class Accumulator[T] private[spark] (
    @transient private[spark] val initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String],
    internal: Boolean)
  extends Accumulable[T, T](initialValue, param, name, internal) {

  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
    this(initialValue, param, name, false)
  }

  def this(initialValue: T, param: AccumulatorParam[T]) = {
    this(initialValue, param, None, false)
  }
}
The parent class Accumulable[R, T] (inherited here as Accumulable[T, T])
Source code:
class Accumulable[R, T] private[spark] (
    initialValue: R,
    param: AccumulableParam[R, T],
    val name: Option[String],
    internal: Boolean)
  extends Serializable {
  ...
  // value_ here is not serialized
  // note: any field marked @transient will not be serialized
  @volatile @transient private var value_ : R = initialValue // Current value on master
  ...
  // register the current accumulator
  Accumulators.register(this)
  ...
}
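As a side note on the @transient marker above, here is a minimal standalone sketch (class and field names are invented) showing that a @transient field is skipped by Java serialization and comes back with its default value:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// demo class: `note` plays the role of a field like value_ marked @transient
class Holder extends Serializable {
  @transient var note: String = "driver-only"
  var count: Int = 42
}

object TransientSketch {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(new Holder)
    out.close()

    val copy = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[Holder]

    println(copy.note)  // null: the @transient field was skipped by serialization
    println(copy.count) // 42: the regular field travelled with the object
  }
}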
Accumulators.register()
Source code:
// register the accumulator passed in as a parameter
def register(a: Accumulable[_, _]): Unit = synchronized {
  // store it wrapped in a WeakReference
  originals(a.id) = new WeakReference[Accumulable[_, _]](a)
}
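Because only a WeakReference is stored, the registry does not by itself keep the accumulator alive, which is why the add method shown later has to check whether the referent still exists. A simplified sketch of that registry idea (not the real Accumulators object; names invented):

import java.lang.ref.WeakReference
import scala.collection.mutable

object WeakRegistrySketch {
  // simplified stand-in for Accumulators.originals
  private val originals = mutable.Map[Long, WeakReference[AnyRef]]()

  def register(id: Long, acc: AnyRef): Unit = synchronized {
    originals(id) = new WeakReference[AnyRef](acc)
  }

  def lookup(id: Long): Option[AnyRef] = synchronized {
    // WeakReference.get returns null once the referent has been garbage collected
    originals.get(id).flatMap(ref => Option(ref.get()))
  }
}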
At this point, the initialization of the driver side has been completed
Executor end
On the executor side, deserialization is the process by which the task obtains its copy of these objects. The accumulator is initialized during deserialization, and it is also during deserialization that it registers itself with TaskContextImpl.
The run method in TaskRunner
// during the computation the RDD and the function are serialized and shipped to the executor side
private[spark] class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false)
  extends Logging {
  ...
  class TaskRunner(
      execBackend: ExecutorBackend,
      val taskId: Long,
      val attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer)
    extends Runnable {
    ...
    override def run(): Unit = {
      ...
      val (value, accumUpdates) = try {
        // call task.run, which triggers the execution of the task
        val res = task.run(
          taskAttemptId = taskId,
          attemptNumber = attemptNumber,
          metricsSystem = env.metricsSystem)
        threwException = false
        res
      } finally {
        ...
      }
      ...
    }
  }
}
The collectAccumulators() method in Task
private[spark] abstract class Task[T](...) {
  final def run(
      taskAttemptId: Long,
      attemptNumber: Int,
      metricsSystem: MetricsSystem): (T, AccumulatorUpdates) = {
    ...
    try {
      // run the task and also return the accumulators:
      // context.collectAccumulators() calls TaskContextImpl.collectAccumulators,
      // whose return value is a Map
      (runTask(context), context.collectAccumulators())
    } finally {
      ...
    }
  }
  ...
}
The runTask method in ResultTask
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // deserialization happens when runTask is called:
  // the RDD and the user-defined function are deserialized here
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}
The readObject method in Accumulable
// Accumulable.readObject, called by Java when deserializing the object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
  in.defaultReadObject()
  // on the executor the value starts from zero again
  value_ = zero
  deserialized = true
  // Automatically register the accumulator when it is deserialized with the task closure.
  //
  // Note internal accumulators sent with task are deserialized before the TaskContext is created
  // and are registered in the TaskContext constructor. Other internal accumulators, such as SQL
  // metrics, still need to register here.
  val taskContext = TaskContext.get()
  if (taskContext != null) {
    // the freshly deserialized accumulator registers itself with the TaskContext,
    // so that after the task has finished the executor can get it back
    // through context.collectAccumulators()
    taskContext.registerAccumulator(this)
  }
}
Executor.scala
// after the executor has obtained accumUpdates it builds a DirectTaskResult
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
...
// finally the result is sent to the driver through ExecutorBackend's statusUpdate method;
// ExecutorBackend is a trait with multiple implementations
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
The statusUpdate method in CoarseGrainedExecutorBackend
// CoarseGrainedExecutorBackend, one implementation of ExecutorBackend, sends the data to the driver
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}
The receive method in CoarseGrainedSchedulerBackend
// after receiving the message, the driver side calls the receive method in CoarseGrainedSchedulerBackend
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    // the result is handed to the scheduler and will eventually reach
    // DAGScheduler's handleTaskCompletion method
    scheduler.statusUpdate(taskId, state, data.value)
    ...
}
The statusUpdate method of TaskSchedulerImpl
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  ...
  if (state == TaskState.FINISHED) {
    taskSet.removeRunningTask(tid)
    // enqueue the successful result
    taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
  } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
    taskSet.removeRunningTask(tid)
    taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
  }
  ...
}
The enqueueSuccessfulTask method of TaskResultGetter
def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
  ...
  result.metrics.setResultSize(size)
  scheduler.handleSuccessfulTask(taskSetManager, tid, result)
  ...
}
The handleSuccessfulTask method of TaskSchedulerImpl
def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}
The taskEnded method of DAGScheduler (reached via TaskSetManager.handleSuccessfulTask)
def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics): Unit = {
  // post a CompletionEvent; it will be picked up by the handleTaskCompletion method
  eventProcessLoop.post(
    CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
}
The handleTaskCompletion method of DAGScheduler
// this corresponds to the receive method of CoarseGrainedSchedulerBackend shown above:
// handleTaskCompletion receives the CompletionEvent, and whether the task is a ResultTask
// or a ShuffleMapTask, it calls updateAccumulators to update the accumulator values
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  ...
  event.reason match {
    case Success =>
      listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
        event.reason, event.taskInfo, event.taskMetrics))
      stage.pendingPartitions -= task.partitionId
      task match {
        case rt: ResultTask[_, _] =>
          // Cast to ResultStage here because it's part of the ResultTask
          // TODO Refactor this out to a function that accepts a ResultStage
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.activeJob match {
            case Some(job) =>
              if (!job.finished(rt.outputId)) {
                updateAccumulators(event)
                ...
              }
            ...
          }
        case smt: ShuffleMapTask =>
          val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
          updateAccumulators(event)
          ...
      }
    ...
  }
  ...
}
The updateAccumulators method of DAGScheduler
private def updateAccumulators(event: CompletionEvent): Unit = {
  val task = event.task
  val stage = stageIdToStage(task.stageId)
  if (event.accumUpdates != null) {
    try {
      // call the add method of Accumulators with the accumulated updates
      Accumulators.add(event.accumUpdates)
      ...
    }
    ...
  }
  ...
}
The add method of Accumulators
def add(values: Map[Long, Any]): Unit = synchronized {
  // fold each (id, value) pair passed in into the registered accumulator
  for ((id, value) <- values) {
    if (originals.contains(id)) {
      // Since we are now storing weak references, we must check whether the underlying data
      // is valid.
      originals(id).get match {
        case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
        case None =>
          throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
      }
    } else {
      logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
    }
  }
}
The ++= method of Accumulable
def ++= (term: R) { value_ = param.addInPlace(value_, term) }
The value method of Accumulable
def value: R = {
  if (!deserialized) {
    value_
  } else {
    throw new UnsupportedOperationException("Can't read accumulator value in task")
  }
}
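As the value method shows, calling value on a deserialized copy, i.e. inside a task, throws an exception; only the driver-side original can be read. A minimal sketch of the wrong and right pattern (names are invented; sc stands for an existing SparkContext):

import org.apache.spark.SparkContext

object ReadOnDriverOnly {
  def demo(sc: SparkContext): Unit = {
    val rows = sc.accumulator(0, "rows")

    sc.parallelize(1 to 100).foreach { _ =>
      rows += 1
      // rows.value here would run on the executor and throw
      // UnsupportedOperationException("Can't read accumulator value in task")
    }

    // correct: read on the driver, after the action has triggered the job
    println(rows.value)
  }
}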
At this point the application can read the accumulated value on the driver through .value.