2025-02-22 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
Abstract
Spark's scheduling is something I have long wanted to understand properly: how the directed acyclic graph (DAG) is generated, how tasks are scheduled, how RDDs are evaluated lazily, and at which point an RDD's compute method is called and runs the functions we define. These topics are fundamental but hard, and it took me a while to work them out. I am summarizing them here so that I can refine my understanding later. Spark scheduling has two levels: DAGScheduler and TaskScheduler. DAGScheduler turns a job into a set of interdependent stages, and each stage is then handed to TaskScheduler as a TaskSet for task distribution. The details are explained step by step below, so the article is fairly long.
Contents of this article
1. Logical execution chain of Spark's RDD
2. The division of job and stage in Spark
3. Scheduling of Spark's DAGScheduler
4. Scheduling of Spark's TaskScheduler
5. How the executor executes tasks and the functions we define
Logical execution chain of Spark's RDD
Spark is said to evaluate lazily and to generate the corresponding Stages from the DAG of RDDs. That DAG is formed through dependencies. Each time an RDD passes through a transformation operator, one or more child RDDs are created, and a dependency between the new RDD and its parent is created along with it. The RDDs are therefore linked together through Dependencies. RDD dependencies are not the focus here; search online if you want the details. They fall into three kinds: the 1:1 OneToOneDependency, the m:1 RangeDependency, and the m:n ShuffleDependency. OneToOneDependency and RangeDependency are also known as NarrowDependency. The 1:1, m:1 and m:n granularity refers to the partitions of an RDD.
The essential job of a dependency is to keep a reference to the parent RDD; its rdd method returns that parent. This forms a linked-list-like structure, so starting from the last RDD we can trace back through the dependencies to every ancestor RDD.
Let's take map as an example to see how dependencies are created.
Calling map actually creates a MapPartitionsRDD.
Next, look at the primary constructor of MapPartitionsRDD. It passes the parent RDD to the RDD constructor, where the parent is the RDD on which map was called (the this object above). Now look at the constructor of the class RDD:
It in turn calls the primary constructor of RDD.
In other words, the dependency is created in the constructor of RDD.
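To make the idea concrete, here is a minimal sketch of how a transformation can record a dependency in the child's constructor, and how the last RDD can be traced back to its ancestors. `ToyRDD`, `Dep` and `lineage` are hypothetical names invented for this illustration; they are not Spark's classes.

```scala
object LineageSketch {
  // Hypothetical, simplified stand-ins for Spark's Dependency and RDD classes.
  final class Dep(val rdd: ToyRDD) // keeps a reference to the parent RDD

  class ToyRDD(val name: String, val deps: List[Dep]) {
    // Like a transformation: constructing the child is what records the dependency on `this`.
    def map(childName: String): ToyRDD = new ToyRDD(childName, List(new Dep(this)))
  }

  // Walk back from the last RDD to all of its ancestors through the dependencies.
  def lineage(last: ToyRDD): List[String] =
    last.name :: last.deps.flatMap(d => lineage(d.rdd))

  val source = new ToyRDD("source", Nil)
  val mapped = source.map("mapped").map("mappedAgain")
}
```

Calling `LineageSketch.lineage(LineageSketch.mapped)` walks from the newest RDD back to `source`, which is the linked-list behavior described above.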
The DAG of RDDs is built up through the dependency-creating transformations described above.
A DAG of RDDs is generated:
The division of job and stage in Spark
Dividing a Spark Application into jobs is actually quite simple. An Application consists of several jobs, and the number of jobs equals the number of Action operators in the Application: each Action operator corresponds to one job. This can be seen in the source code. A transformation operator produces one or more RDDs, while an Action operator triggers the submission of a job.
For example, the map transformation operator above looks like this.
The Action operator looks like this:
The job is submitted through the runJob method. Stages are divided according to whether a shuffle takes place, which is discussed later.
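The difference between the two kinds of operator can be sketched without Spark. In the toy model below, a transformation only wraps a recipe, and only the action calls a stand-in `runJob`. `LazyData`, `runJob`, `jobsSubmitted` and `demo` are hypothetical names for this sketch only.

```scala
object LazyJobSketch {
  var jobsSubmitted = 0 // counts calls to the hypothetical runJob below

  // A hypothetical lazy dataset: `compute` is only a recipe until an action runs it.
  final class LazyData[A](compute: () => Iterator[A]) {
    // Transformation: wraps the recipe in a new one; nothing is executed yet.
    def map[B](f: A => B): LazyData[B] = new LazyData(() => compute().map(f))

    // Action: submits a "job", which finally evaluates the whole chain.
    def collect(): List[A] = runJob(compute)
  }

  private def runJob[A](compute: () => Iterator[A]): List[A] = {
    jobsSubmitted += 1
    compute().toList
  }

  // Returns (function calls seen before the action, the result, jobs submitted by this demo).
  def demo(): (Int, List[Int], Int) = {
    val jobsBefore = jobsSubmitted
    var calls = 0
    val data = new LazyData(() => Iterator(1, 2, 3)).map { x => calls += 1; x * 2 }
    val callsBeforeAction = calls // map has been declared but not executed
    val result = data.collect()
    (callsBeforeAction, result, jobsSubmitted - jobsBefore)
  }
}
```

One action produces exactly one job in this sketch, mirroring the one-Action-one-job rule described above.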
Scheduling of Spark's DAGScheduler
When we submit a job to the Spark cluster through the client and the resource manager is YARN, the client requests a machine on which to run the driver process. The driver does not correspond to a specific class in Spark. The driver machine mainly runs the code written by the user, runs DAGScheduler and TaskScheduler, and tracks the running status of tasks. Remember that the main function written by the user runs in the driver, but the RDD transformations are executed on other machines. The driver is mainly responsible for scheduling and distributing jobs. Everything from the Action operator to the division into stages is handled by DAGScheduler.
When the user-defined main function runs in the driver process, it first creates the SparkContext object, which is our entry point for interacting with the Spark cluster. SparkContext initializes much of the environment needed at run time, most importantly DAGScheduler and TaskScheduler.
We use the following RDD logical execution diagram to analyze the whole DAGScheduler process.
Since DAGScheduler runs in the driver process, we start from the driver running the user-defined main function. In the figure above, RDD9 is the last RDD and calls an Action operator, which triggers the submission of a job. The Action calls the runJob function of SparkContext, which passes through a series of runJob wrappers and finally calls the runJob of DAGScheduler.
The runJob method in SparkContext:
-
def runJob[T, U: ClassTag](
    rdd: RDD[T], // the last RDD, RDD9 in the logical execution diagram above
    func: (TaskContext, Iterator[T]) => U, // the function passed in by the Action operator called on RDD9
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
-
runJob of DAGScheduler
-
def runJob[T, U](
    rdd: RDD[T], // RDD9
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // submitJob returns a JobWaiter, which is used to wait until the submitted job has finished.
  // The code after it only logs the outcome of the run.
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
-
submitJob source code
-
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check that the requested partitions of the RDD are valid.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  assert(partitions.size > 0)
  // Wrap the job in a JobSubmitted event and post it to the event queue.
  // Spark starts a thread that processes the jobs we submit.
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
-
DAGScheduler has a DAGSchedulerEventProcessLoop class that receives and handles DAGScheduler's message events. Here the event is a JobSubmitted object, so the first operation, handleJobSubmitted, is executed. This is a good place to introduce the types of Stage. Spark has two: ShuffleMapStage and ResultStage. The Stage that contains the last RDD is the ResultStage, and a stage whose RDD feeds a shuffle is a ShuffleMapStage.
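The post-then-dispatch pattern can be sketched as follows. This is a simplified, single-threaded illustration: the real event loop processes events on its own thread, while the sketch drains the queue on demand. `EventLoopSketch`, `handled` and `drain` are hypothetical names, and the handlers only record which method would be called.

```scala
object EventLoopSketch {
  import scala.collection.mutable

  // Two event types, modeled loosely on the scheduler events; the fields are simplified.
  sealed trait SchedulerEvent
  final case class JobSubmitted(jobId: Int) extends SchedulerEvent
  final case class StageCancelled(stageId: Int) extends SchedulerEvent

  private val queue = mutable.Queue.empty[SchedulerEvent]
  val handled = mutable.ListBuffer.empty[String] // records which handler ran, in order

  // Called by the submitting side: only enqueues, never processes.
  def post(event: SchedulerEvent): Unit = queue.enqueue(event)

  // Dispatch on the event type, as a receive method would.
  private def onReceive(event: SchedulerEvent): Unit = event match {
    case JobSubmitted(jobId) => handled += s"handleJobSubmitted($jobId)"
    case StageCancelled(stageId) => handled += s"handleStageCancellation($stageId)"
  }

  // Assumption of this sketch: events are drained synchronously instead of on a dedicated thread.
  def drain(): Unit = while (queue.nonEmpty) onReceive(queue.dequeue())
}
```

Posting a `JobSubmitted` event and then draining the queue runs the job-submission handler first, which is the path followed in the rest of this section.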
-
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_], // corresponds to RDD9
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // Create a ResultStage first.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}
-
The createResultStage above is actually the process of converting RDD to Stage, as follows
-
/**
 * Creates the ResultStage; it calls the related functions below.
 */
private def createResultStage(
    rdd: RDD[_], // corresponds to RDD9 in the figure above
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

/**
 * Builds the parent Stages that the ResultStage depends on.
 */
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
/**
 * Uses a depth-first traversal to find the wide (shuffle) dependencies among the parent
 * dependencies of the RDD that called the Action operator. This is the most important
 * method: once it is understood, the rest is easy to follow. It is best read alongside the
 * RDD logical dependency graph given above. For that graph, the returned ShuffleDependency
 * objects are the dependency between RDD2 and RDD1 and the one between RDD7 and RDD6.
 * If there is A ...
 */
// ...
          // If it is not a shuffle dependency, push its parent RDD onto the stack of RDDs
          // still to be visited, and keep looping.
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}
/**
 * Creates a ShuffleMapStage. Two ShuffleMapStages are created, one for each of the two
 * shuffle dependencies obtained above.
 */
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  // This RDD is RDD1 in one case and RDD6 in the other.
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  // Check whether these two ShuffleMapStages have parent shuffle Stages of their own.
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // Create the ShuffleMapStage. The rest updates the scheduler's bookkeeping.
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)
  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
-
From the source code above, together with the logical execution diagram of the RDDs, we can see that this job has three Stages: one ResultStage and two ShuffleMapStages. The RDD held by one ShuffleMapStage is RDD1, and the RDD held by the other is RDD6. This completes the division of the RDDs into Stages. Once the division is done, the end of the handleJobSubmitted method calls the method that submits the stage.
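The cutting rule can be sketched on a toy graph: RDDs connected only by narrow dependencies share a stage, and every shuffle edge starts a new parent stage. `Node`, `Edge`, `stageMembers`, `parentStageRdds` and `stages` are hypothetical names for this illustration, not Spark's API, and the graph used is made up rather than the one in the diagram.

```scala
object StageSplitSketch {
  // Hypothetical, simplified model: each RDD lists its parents and whether the edge is a shuffle.
  final case class Edge(parent: Node, shuffle: Boolean)
  final case class Node(name: String, edges: List[Edge] = Nil)

  // RDDs reachable from `last` through narrow dependencies only: they share one stage.
  def stageMembers(last: Node): Set[String] =
    Set(last.name) ++ last.edges.filterNot(_.shuffle).flatMap(e => stageMembers(e.parent))

  // The last RDD of every direct parent stage: the parent side of each shuffle edge met on the way.
  def parentStageRdds(last: Node): List[Node] =
    last.edges.flatMap { e =>
      if (e.shuffle) List(e.parent) else parentStageRdds(e.parent)
    }

  // All stages of the job, parents first, each described by the RDD names it contains.
  def stages(last: Node): List[Set[String]] =
    (parentStageRdds(last).flatMap(stages) :+ stageMembers(last)).distinct
}
```

For a chain `a` -(shuffle)-> `b` -(narrow)-> `c` -(shuffle)-> `d`, the sketch yields three stages: `{a}`, `{b, c}` and the final stage `{d}`.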
The submitStage source code is relatively simple. It checks whether the parent stages that the current stage depends on have finished executing. If any have not, it recurses into those parent stages and waits for them to finish before the current stage is submitted for execution.
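A minimal sketch of this parent-first recursion, assuming a toy stage type and pretending that a submitted stage finishes immediately. `ToyStage`, `finished`, `waiting` and `submitted` are hypothetical names for this sketch; in the real scheduler a waiting stage is resubmitted later, once its parents have completed.

```scala
object SubmitStageSketch {
  import scala.collection.mutable

  // Hypothetical stand-in for a stage: just an id and its parent stages.
  final case class ToyStage(id: Int, parents: List[ToyStage] = Nil)

  val finished = mutable.Set.empty[Int]         // stages whose output is already available
  val waiting = mutable.Set.empty[Int]          // stages parked until their parents finish
  val submitted = mutable.ListBuffer.empty[Int] // order in which stages were actually submitted

  def submitStage(stage: ToyStage): Unit = {
    val missing = stage.parents.filterNot(p => finished(p.id)).sortBy(_.id)
    if (missing.isEmpty) {
      submitted += stage.id // stands in for submitting the stage's tasks
      finished += stage.id  // assumption of this sketch: the stage completes at once
    } else {
      missing.foreach(submitStage) // parents go first
      waiting += stage.id          // this stage has to be submitted again later
    }
  }
}
```

Submitting a stage with two unfinished parents submits the parents in id order and parks the stage itself; a second call then finds no missing parents and submits it.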
-
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        // ... (the rest of submitStage and the beginning of submitMissingTasks are cut off here)
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  // Get the preferred location of each partition, that is, the machine in the cluster
  // on which the partition's data lives.
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  // Record the partitions this stage has to compute, together with the physical location
  // of each partition, in latestInfo.
  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  // Serialize the information just obtained so that it can be sent from the driver
  // machine to the worker machines.
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  // Turn the stage into a set of tasks. The tasks of a ShuffleMapStage are ShuffleMapTasks,
  // and the tasks of a ResultStage are ResultTasks.
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  // Submit the tasks to TaskScheduler.
  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage: ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)
    submitWaitingChildStages(stage)
  }
}
-
At this point, the scheduling of the entire DAGScheduler is completed.
Scheduling of Spark's TaskScheduler
To understand Spark task scheduling we need to understand its scheduling process. The scheduling policy differs from one resource manager to another, so the scheduler backend differs too; the backend keeps track of the cluster's resource information. Spark provides a base backend class that handles the interaction between the driver and the executors: CoarseGrainedSchedulerBackend, which works with the cluster resource manager (YARN, for example). It collects general resource information about the cluster's worker machines. When a set of tasks is ready to be scheduled, the driver communicates with it to request that resources be allocated, and the most suitable worker node is assigned to each task. TaskSchedulerImpl, for its part, implements the task scheduling itself; the scheduling algorithm is FIFO by default, and fair scheduling can be used instead.
When a TaskSet is submitted, TaskSchedulerImpl creates a TaskSetManager to manage it and adds the manager to the task scheduling pool.
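As a rough illustration of the default policy, the sketch below orders queued task sets first in, first out. It assumes that FIFO means the task set of the earlier job goes first and that, within a job, the earlier stage goes first. `QueuedTaskSet` and `fifoOrder` are hypothetical names; the real pool sorts TaskSetManagers rather than plain pairs of ids.

```scala
object FifoSketch {
  // Hypothetical summary of a queued task set: the job it belongs to and its stage.
  final case class QueuedTaskSet(jobId: Int, stageId: Int)

  // Assumed FIFO order: earlier job first, then earlier stage within the same job.
  def fifoOrder(queue: Seq[QueuedTaskSet]): Seq[QueuedTaskSet] =
    queue.sortBy(ts => (ts.jobId, ts.stageId))
}
```

A fair policy would replace the sort key with one based on how many tasks each pool is already running relative to its share, which is not modeled here.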
-
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create a TaskSetManager, then update the bookkeeping below.
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Add the wrapped TaskSet to the task scheduling queue.
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // Send a request asking for the tasks to be scheduled.
  backend.reviveOffers()
}

/**
 * This method is in the CoarseGrainedSchedulerBackend class; the driver process uses it
 * to send the request for resources.
 */
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
-
When it receives this request, it calls such a method.
-
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }
  // The request sent above matches this case.
  case ReviveOffers =>
    makeOffers()
  case KillTask(taskId, executorId, interruptThread) =>
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
      case None =>
        // Ignoring the task kill since the executor is not registered.
        logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
    }
}

/**
 * Collects information about the machines that are still alive on the cluster and wraps
 * each one in a WorkerOffer. It then calls the resourceOffers method in TaskSchedulerImpl
 * to pick the machines that match the requested resources to run the current tasks.
 */
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}

/**
 * Once we have the information about the idle machines in the cluster, this method picks
 * the machines that meet the requirements of our tasks and returns TaskDescription objects.
 * That class bundles the information about a task and its executor.
 */
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  // Check whether each worker is already known, and add the new ones to the scheduling pool.
  for (o <- offers) {
    // ...
  }
  // ... (o => o.cores).toArray
  // Take the task sets to run from the task scheduling pool, in the order given by the
  // scheduling algorithm.
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    // ...
  }
  // ... (the rest of resourceOffers and the beginning of the executor-side receive method
  // are cut off here)
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }

  case RegisterExecutorFailed(message) =>
    exitExecutor(1, "Slave registration failed: " + message)

  // This case runs when a task is submitted to the executor.
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      // Deserialize the task description first.
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      // Then launch the task.
      executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
        taskDesc.name, taskDesc.serializedTask)
    }

  case KillTask(taskId, _, interruptThread) =>
    if (executor == null) {
      exitExecutor(1, "Received KillTask command but executor was null")
    } else {
      executor.killTask(taskId, interruptThread)
    }

  case StopExecutor =>
    stopping.set(true)
    logInfo("Driver commanded a shutdown")
    // Cannot shutdown here because an ack may need to be sent back to the caller. So send
    // a message to self to actually do the shutdown.
    self.send(Shutdown)

  case Shutdown =>
    stopping.set(true)
    new Thread("CoarseGrainedExecutorBackend-stop-executor") {
      override def run(): Unit = {
        // executor.stop() will call SparkEnv.stop() which waits until RpcEnv stops totally.
        // However, if executor.stop() runs in some thread of RpcEnv, RpcEnv won't be able to
        // stop until executor.stop() returns, which becomes a dead-lock (See SPARK-14180).
        // Therefore, we put this line in a new thread.
        executor.stop()
      }
    }.start()
}
-
Here is the relevant Executor source code. From it we can see that for each Task, the executor creates a TaskRunner and hands it to a thread pool for execution.
-
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
-
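The same register-then-execute pattern can be tried in isolation with a plain Java thread pool. This is only a sketch: `ToyTaskRunner`, `results` and `demo` are hypothetical names, the pool size of 2 is arbitrary, and a task here is just a function returning a string.

```scala
object ExecutorSketch {
  import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

  private val threadPool = Executors.newFixedThreadPool(2)
  val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  val results = new ConcurrentHashMap[Long, String]()

  // Hypothetical runner: a Runnable that executes one task and then deregisters itself.
  final class ToyTaskRunner(taskId: Long, body: () => String) extends Runnable {
    override def run(): Unit = {
      try {
        results.put(taskId, body())
      } finally {
        runningTasks.remove(taskId)
      }
    }
  }

  // Register the runner first, then hand it to the pool.
  def launchTask(taskId: Long, body: () => String): Unit = {
    val tr = new ToyTaskRunner(taskId, body)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  // Launch two tasks, wait for the pool to finish, and return what they produced.
  def demo(): Map[Long, String] = {
    launchTask(1L, () => "result of task 1")
    launchTask(2L, () => "result of task 2")
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
    Map(1L -> results.get(1L), 2L -> results.get(2L))
  }
}
```

Because the runner is registered in `runningTasks` before it is executed and removes itself in a `finally` block, the map is empty again once every task has finished, whether it succeeded or threw.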
TaskRunner defines the work that runs on a pool thread, so let's take a look at its run method.
-
override def run(): Unit = {
  // Initialize the environment the task thread needs.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  // Set the context class loader for the current thread.
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = env.closureSerializer.newInstance()
  logInfo(s"Running $taskName (TID $taskId)")
  // Report the task as RUNNING.
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStart: Long = 0
  var taskStartCpu: Long = 0
  startGCTime = computeTotalGcTime()
  try {
    // Deserialize the task's dependencies and get the related parameters.
    val (taskFiles, taskJars, taskProps, taskBytes) =
      Task.deserializeWithDependencies(serializedTask)
    // Must be set before updateDependencies() is called, in case fetching dependencies
    // requires access to properties contained within (e.g. for access control).
    Executor.taskDeserializationProps.set(taskProps)
    // Update the dependencies (files and jars).
    updateDependencies(taskFiles, taskJars)
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    task.localProperties = taskProps
    task.setTaskMemoryManager(taskMemoryManager)
    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw new TaskKilledException
    }
    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    // Track the location of cached data.
    env.mapOutputTracker.updateEpoch(task.epoch)
    // Run the actual task and measure its runtime.
    taskStart = System.currentTimeMillis()
    taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    var threwException = true
    // Run the task through task.run, which calls runTask to actually execute it. As mentioned
    // earlier, a job is split into two kinds of stage, ShuffleMapStage and ResultStage, so
    // there are two kinds of Task, ShuffleMapTask and ResultTask, and each task type
    // executes its own runTask method.
    val value = try {
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    } finally {
      // Check the outcome of the run above and log accordingly.
      val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
      val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
      if (freedMemory > 0 && !threwException) {
        val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
        if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
          throw new SparkException(errMsg)
        } else {
          logWarning(errMsg)
        }
      }
      if (releasedLocks.nonEmpty && !threwException) {
        val errMsg =
          s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
            releasedLocks.mkString("[", ", ", "]")
        if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
          throw new SparkException(errMsg)
        } else {
          logWarning(errMsg)
        }
      }
    }
    val taskFinish = System.currentTimeMillis()
    val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    // If the task has been killed, let's fail it.
    if (task.killed) {
      throw new TaskKilledException
    }
    val resultSer = env.serializer.newInstance()
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = resultSer.serialize(value)
    val afterSerialization = System.currentTimeMillis()
    // Deserialization happens in two parts: first, we deserialize a Task object, which
    // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
    task.metrics.setExecutorDeserializeTime(
      (taskStart - deserializeStartTime) + task.executorDeserializeTime)
    task.metrics.setExecutorDeserializeCpuTime(
      (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
    // We need to subtract Task.run()'s deserialization time to avoid double-counting
    task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
    task.metrics.setExecutorCpuTime(
      (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
    task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
    task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)
    // Note: accumulator updates must be collected after TaskMetrics is updated
    val accumUpdates = task.collectAccumulatorUpdates()
    // TODO: do not serialize value twice
    val directResult = new DirectTaskResult(valueBytes, accumUpdates)
    val serializedDirectResult = ser.serialize(directResult)
    val resultSize = serializedDirectResult.limit
    // directSend = sending directly back to the driver
    val serializedResult: ByteBuffer = {
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      } else if (resultSize > maxDirectResultSize) {
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId,
          new ChunkedByteBuffer(serializedDirectResult.duplicate()),
          StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  } catch {
    case ffe: FetchFailedException =>
      val reason = ffe.toTaskFailedReason
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    case _: TaskKilledException =>
      logInfo(s"Executor killed $taskName (TID $taskId)")
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
    case _: InterruptedException if task.killed =>
      logInfo(s"Executor interrupted and killed $taskName (TID $taskId)")
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
    case CausedBy(cDE: CommitDeniedException) =>
      val reason = cDE.toTaskFailedReason
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    case t: Throwable =>
      // Attempt to exit cleanly by informing the driver of our failure.
      // If anything goes wrong (or this was a fatal exception), we will delegate to
      // the default uncaught exception handler, which will terminate the Executor.
      logError(s"Exception in $taskName (TID $taskId)", t)
      // Collect latest accumulator values to report back to the driver
      val accums: Seq[AccumulatorV2[_, _]] =
        if (task != null) {
          task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
          task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
          task.collectAccumulatorUpdates(taskFailed = true)
        } else {
          Seq.empty
        }
      val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
      val serializedTaskEndReason = {
        try {
          ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
        } catch {
          case _: NotSerializableException =>
            // t is not serializable so just send the stacktrace
            ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
        }
      }
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
      // Don't forcibly exit unless the exception was inherently fatal, to avoid
      // stopping other tasks unnecessarily.
      if (Utils.isFatalError(t)) {
        SparkUncaughtExceptionHandler.uncaughtException(t)
      }
  } finally {
    runningTasks.remove(taskId)
  }
}
}
-
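The three-way branch near the end of the listing above decides how a finished task's serialized result travels back to the driver. The following is a minimal sketch of that decision only, not Spark's code: the object ResultRouting, the Route type and its three cases are hypothetical names introduced for illustration, and the parameter names simply mirror the variables in the listing (all sizes in bytes).

```scala
// Hypothetical sketch of the size-based routing seen in the listing; not Spark's API.
object ResultRouting {
  sealed trait Route
  case object Dropped extends Route         // result too large: only a reference with its size is sent
  case object ViaBlockManager extends Route // result stored as a block, driver receives the block id
  case object Direct extends Route          // result sent inline with the status update

  def route(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): Route =
    if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
    else if (resultSize > maxDirectResultSize) ViaBlockManager
    else Direct
}
```

Note that the first condition only applies when maxResultSize is positive, so a value of 0 disables the upper limit and an oversized result falls through to the block manager branch.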
As mentioned earlier, a job is divided into two kinds of stage, ShuffleMapStage and ResultStage, and correspondingly there are two kinds of task: ShuffleMapTask and ResultTask. Each task type executes its own runTask method. runTask is called from Task.run, and both of the Task subclasses above override it.
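The relationship between run and runTask can be sketched as a template method. This is a simplified illustration under stated assumptions, not Spark's actual classes: SketchTask, SketchShuffleMapTask and SketchResultTask are hypothetical names, and the returned values are placeholders for the real MapStatus and job results.

```scala
// Simplified sketch of the run/runTask relationship; not Spark's real classes.
object TaskSketch {
  abstract class SketchTask[T](val partitionId: Int) {
    // run() holds the bookkeeping shared by all tasks and delegates the real work.
    final def run(): T = {
      // (Spark prepares the task context and metrics at this point.)
      runTask()
    }

    // Each task type supplies its own implementation.
    protected def runTask(): T
  }

  // Hypothetical stand-in for ShuffleMapTask: reports where its shuffle output went.
  class SketchShuffleMapTask(pid: Int) extends SketchTask[String](pid) {
    override protected def runTask(): String = s"map-status-for-partition-$pid"
  }

  // Hypothetical stand-in for ResultTask: applies a function to the partition's data.
  class SketchResultTask[U](pid: Int, data: Seq[Int], func: Iterator[Int] => U)
      extends SketchTask[U](pid) {
    override protected def runTask(): U = func(data.iterator)
  }
}
```

The caller only ever invokes run(), for example `new TaskSketch.SketchResultTask[Int](0, Seq(1, 2, 3), _.sum).run()`, and the subclass decides what actually happens.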
The runTask method of ShuffleMapTask
-
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  // First, some initialization.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize the task binary. The rdd here is the last RDD before the shuffle, as discussed earlier.
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  // The code below writes out the last RDD of the stage before the shuffle. Nothing here visibly
  // runs the functions we wrote, calls compute, or pipelines the RDDs. Reading on reveals where
  // that happens.
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
-
The highlighted part above deserves a detailed explanation. RDDs form a directed acyclic graph through their dependencies, so starting from the last RDD and following its dependencies backwards, we can find all of its parent RDDs. Where no shuffle intervenes, these RDDs form a pipeline. The advantage of a pipeline is that the intermediate results of the RDDs do not need to be stored: the functions we defined are chained together directly, and each record flows from input to output without being materialized in between, which saves both time and space. We also know that an RDD's intermediate results can be persisted to memory or disk, and Spark keeps track of this.
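The pipelining and persistence described above can be illustrated with a toy model built on lazy Scala iterators. This is a sketch under simplifying assumptions, not Spark's implementation: MiniRDD, SourceRDD, MappedRDD and trace are hypothetical names, there is a single partition, and the "cache" is just a field on the object, whereas real Spark goes through its block manager.

```scala
// Toy model of iterator-based pipelining; names and structure are illustrative only.
object PipelineSketch {
  import scala.collection.mutable

  // Records each call to a user function so the execution order becomes visible.
  val trace = mutable.ArrayBuffer.empty[String]

  abstract class MiniRDD[T] {
    private var cached: Option[Vector[T]] = None
    private var persistRequested = false

    def persist(): this.type = { persistRequested = true; this }

    // Use stored data if present; otherwise compute (and store if persistence was requested).
    final def iterator(): Iterator[T] = cached match {
      case Some(data) => data.iterator
      case None if persistRequested =>
        val data = compute().toVector
        cached = Some(data)
        data.iterator
      case None => compute()
    }

    protected def compute(): Iterator[T]

    def map[U](f: T => U): MiniRDD[U] = new MappedRDD(this, f)
  }

  class SourceRDD[T](data: Seq[T]) extends MiniRDD[T] {
    protected def compute(): Iterator[T] = data.iterator
  }

  class MappedRDD[T, U](parent: MiniRDD[T], f: T => U) extends MiniRDD[U] {
    // Pulling from the parent's iterator is the backtracking step along the dependency.
    protected def compute(): Iterator[U] = parent.iterator().map(f)
  }
}
```

Building `new SourceRDD(Seq(1, 2)).map(f).map(g)` runs nothing. Only when the final iterator is consumed does each element pass through f and then g before the next element is read, so no intermediate collection is built unless persist() was requested on one of the RDDs in the chain.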
From the analysis above, we can see that in the executor this is where the backtracking through the RDDs begins. The shuffle process and ResultTask's runTask will be covered in a follow-up.