Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Analysis of MAster Resource scheduling principle (Source Code) of SPARK

2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

SPARK's MAster Resource allocation algorithm (SPARK1.3)

Master scheduling is realized by schedule () method under the org.apache.spark.deploy.master package in the source code.

The steps are as follows:

First of all, determine whether the master is in alive status, and return if it is not alive, that is, only the active master will schedule resources, and standby master will not schedule resources.

Pass the worker of the alive state in the previously registered worker into the Random.shuffer method, which mainly disrupts the order of the worker and returns an array

Get the number of worker returned

Driver scheduling is performed with for loop. Only when yarn-cluster mode submission application is enabled will driver scheduling be carried out, because both yarn-client mode and standalone mode start driver on the submitted client side and do not need scheduling.

The for loop traverses the WaittingDrivers ArrayBuffer, which uses the while loop to determine if the worker of the alive is not traversed and the driver is started, then continue traversing

If the memory of this worker > = memory required by driver and CPU > = CPU required by driver, start driver and remove driver from the WaittingDrivers queue

The method to start driver is launchDriver, add driver to the internal cache of worker, add the remaining memory of worker, CPU minus the memory needed by driver, and CPU,worker is also added to the driver cache structure, then call the actor method of worker to send LaunchDriver message to worker, tell it to start driver, and then change the driver state to RUNNING

After driver starts, schedule application. There are two algorithms, spreadOutApps and non-spreadOutApps, which can be set in the SparkConf of the code ("spark.deploy.spreadOut", true). The default is true, and spreadoutApps is enabled.

For traverses the application in WaittingApps and uses the if guard to filter out the worker that is alive and can be used by application in the application,for loop that still needs to be allocated by CPU, and then sorts in reverse order by the number of remaining CPU (the worker that can be used by application must be available memory that is greater than or equal to the required memory needed by application minimum executor, and has not been enabled by application)

Put the number of application that needs to be allocated into an array, and then get the final number of CPU to be allocated = the minimum value of the total CPU of CPU and worker that application needs to allocate.

The while traverses the worker, and if the worker still has an allocable CPU, move the pointer to the next CPU, the total CPU-1 that needs to be allocated, and the CPU+1 assigned to this worker. Loop until the CPU is allocated. The result of this allocation algorithm is that the CPU of application is evenly distributed to each worker as much as possible, and the application runs on as many Node as possible.

After assigning CPU to worker, iterate through the worker assigned to CPU, in each application internal cache structure, add executor, create an executorDSC object, which encapsulates the number of CPU core allocated to this executor, then start executor on worker, and change the application state to RUNNING

If it is a non-spreadOutApps algorithm, on the contrary, allocate all the CPU of each worker first, and then allocate the CPU of the next worker.

The following is the core source code:

Private def schedule () {

/ *

* first determine whether master is in alive state.

, /

If (state! = RecoveryState.ALIVE) {return}

/ / First schedule drivers, they take strict precedence over applications

/ / Randomization helps balance drivers

/ / pass the worker (previously registered) of the alive status into the Random.shuffle method, and randomly disrupt the worker

Val shuffledAliveWorkers = Random.shuffle (workers.toSeq.filter (_ .state = = WorkerState.ALIVE))

/ / get the number of currently available worker (after random disruption)

Val numWorkersAlive = shuffledAliveWorkers.size

Var curPos = 0

/ *

* scheduling mechanism of driver, traversing the ArrayBuffer of waitingDrivers

* driver is registered only when submitted in yarn-cluster mode, causing driver to be scheduled because of standalone and yarn-client mode

* will launch driver locally without registering or scheduling

, /

For (driver = memory required by driver and free CPU of worker > = CPU required by driver

If (worker.memoryFree > = driver.desc.mem & & worker.coresFree > = driver.desc.cores) {

/ / start driver

LaunchDriver (worker, driver)

/ / and removed from the waitingDrivers queue via driver

WaitingDrivers-= driver

Launched = true

}

/ / pointer to the next worker

CurPos = (curPos + 1)% numWorkersAlive

}

}

/ / Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app

/ / in the queue, then the second app, etc.

/ *

* scheduling mechanism of application

* the two algorithms can be set in sparkconf. The default is true (spreadOutApps algorithm).

, /

If (spreadOutApps) {

/ / Try to spread out each app among all the nodes, until it has all its cores

/ / traverse the application in waitingApps, and use the if guard to filter out the application that still needs to be allocated by CPU

For (app 0) {

/ / filter again the worker whose status is alive and can be used by application, and then sort them in reverse order according to the number of CPU remaining

/ / the worker that can be used by application must have available memory greater than the memory required by the minimum Executor of application, and has not been enabled by the application

Val usableWorkers = workers.toArray.filter (_ .state = = WorkerState.ALIVE)

.filter (canUse (app, _)) .filter (_ .coresFree) .reverse

/ / create an array to store the number of CPU to be allocated

Val numUsable = usableWorkers.length

Val assigned = new Array [Int] (numUsable) / / Number of cores to give on each node

/ / get how much CPU needs to be allocated, and take the minimum number of CPU and worker total CPU that need to be allocated for application

Var toAssign = math.min (app.coresLeft, usableWorkers.map (_ .coresFree) .sum)

Var pos = 0

While (toAssign > 0) {}

/ / Now that we've decided how many cores to give on each node, let's actually give them

/ / after assigning CPU to worker, traverse worker

For (pos 0) {

/ / first, add executor to each application internal cache structure

/ / create an executorDSC object that encapsulates the number of CPU core assigned to the executor

Val exec = app.addExecutor (usableWorkers (pos), assigned (pos))

/ / then start Executor on worker

LaunchExecutor (usableWorkers (pos), exec)

/ / set application status to RUNNING

App.state = ApplicationState.RUNNING

}

}

}

} else {

/ / Pack each app into as few nodes as possible until we've assigned all its cores

For (worker 0 & & worker.state = = WorkerState.ALIVE) {

For (app 0) {

If (canUse (app, worker)) {

Val coresToUse = math.min (worker.coresFree, app.coresLeft)

If (coresToUse > 0) {

Val exec = app.addExecutor (worker, coresToUse)

LaunchExecutor (worker, exec)

App.state = ApplicationState.RUNNING

}

}

}

}

}

}

/ / start of executor

Def launchExecutor (worker: WorkerInfo, exec: ExecutorDesc) {

LogInfo ("Launching executor" + exec.fullId + "on worker" + worker.id)

/ / add executor to the worker internal cache

Worker.addExecutor (exec)

/ / send LaunchExecutor messages to worker

Worker.actor! LaunchExecutor (masterUrl

Exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)

/ / send an ExecutorAdded message to the application corresponding to executor

Exec.application.driver! ExecutorAdded (

Exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)

}

/ / start of Driver

Def launchDriver (worker: WorkerInfo, driver: DriverInfo) {

LogInfo ("Launching driver" + driver.id + "on worker" + worker.id)

/ / add driver to worker's internal cache structure

/ / subtract the remaining memory of worker, CPU from the memory used by driver and CPU

Worker.addDriver (driver)

/ / worker is also added to the driver internal cache structure

Driver.worker = Some (worker)

/ / then call the actor method of worker to send a LaunchDriver message to worker and ask it to start driver

Worker.actor! LaunchDriver (driver.id, driver.desc)

/ / change driver status to RUNNING

Driver.state = DriverState.RUNNING

}

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report