Spark Master Resource Allocation Algorithm (Spark 1.3)
Master scheduling is implemented by the schedule() method of the Master class, in the org.apache.spark.deploy.master package of the source code.
The steps are as follows:
First, check whether the Master is in the ALIVE state; if it is not, return immediately. In other words, only the active Master schedules resources; a standby Master does not.
Pass the previously registered workers that are in the ALIVE state into the Random.shuffle method, which shuffles their order and returns a randomized sequence.
Record the number of workers returned.
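As a toy illustration of these first two steps (with a hypothetical simplified worker model, not Spark's internal WorkerInfo):
import scala.util.Random

// Hypothetical minimal worker model, for illustration only.
case class Worker(id: String, alive: Boolean)

val workers = Seq(Worker("w1", alive = true), Worker("w2", alive = false), Worker("w3", alive = true))

// Keep only the alive workers, then shuffle them so driver placement is randomized.
val shuffledAliveWorkers = Random.shuffle(workers.filter(_.alive))
val numWorkersAlive = shuffledAliveWorkers.size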
Driver scheduling is performed with a for loop. The Master schedules drivers only for applications submitted in cluster deploy mode; in client mode the driver starts on the submitting machine and does not need to be scheduled.
The for loop traverses the waitingDrivers ArrayBuffer. For each driver, an inner while loop keeps visiting workers as long as there are alive workers not yet visited and the driver has not been launched.
If a worker's free memory >= the memory required by the driver and its free cores >= the cores required by the driver, launch the driver on that worker and remove it from the waitingDrivers queue.
The launch itself is done by launchDriver: the driver is added to the worker's internal cache, the worker's free memory and free cores are reduced by the amounts the driver needs, the worker is recorded in the driver's cache structure, a LaunchDriver message is sent to the worker's actor telling it to start the driver, and the driver's state is changed to RUNNING.
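A minimal, self-contained sketch of this round-robin driver placement, using hypothetical WorkerSpec and DriverSpec stand-ins instead of Spark's internal WorkerInfo and DriverInfo (launching is modeled as just reserving resources):
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical simplified stand-ins for Spark's WorkerInfo and DriverInfo.
case class WorkerSpec(id: String, var memoryFree: Int, var coresFree: Int)
case class DriverSpec(id: String, mem: Int, cores: Int)

def scheduleDrivers(aliveWorkers: Seq[WorkerSpec], waitingDrivers: ArrayBuffer[DriverSpec]): Unit = {
  val shuffled = Random.shuffle(aliveWorkers)
  val numAlive = shuffled.size
  if (numAlive == 0) return
  var curPos = 0
  // Iterate over a copy so that launched drivers can be removed from the queue.
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var visited = 0
    // Visit each alive worker at most once, until the driver fits somewhere.
    while (visited < numAlive && !launched) {
      val worker = shuffled(curPos)
      visited += 1
      if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
        // "Launch" the driver by reserving its resources on this worker.
        worker.memoryFree -= driver.mem
        worker.coresFree -= driver.cores
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numAlive
    }
  }
}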
After the drivers have been scheduled, the applications are scheduled. There are two algorithms, spreadOutApps and non-spreadOutApps, selected by the configuration property "spark.deploy.spreadOut". The default is true, which enables spreadOutApps.
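This flag is read by the Master process from its configuration. As a minimal sketch, switching to the packing (non-spread-out) behavior would look like this:
import org.apache.spark.SparkConf

// "spark.deploy.spreadOut" defaults to true (spread executors across workers);
// false packs each application onto as few workers as possible.
val conf = new SparkConf().set("spark.deploy.spreadOut", "false")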
A for loop traverses the applications in waitingApps, with an if guard that keeps only applications that still need cores. For each such application, it filters the workers that are ALIVE and usable by the application, then sorts them in descending order of free cores. (A worker is usable by an application when its free memory is at least the memory required by one of the application's executors and it is not already running an executor for that application.)
An array records how many cores are assigned on each usable worker; the total number of cores to assign is the minimum of the cores the application still needs and the sum of the free cores across the usable workers.
A while loop then walks over the workers: if the current worker still has an unassigned free core, assign it one core (total left to assign minus one, this worker's assignment plus one) and move the pointer to the next worker. The loop runs until all cores are assigned. The result of this algorithm is that an application's cores are spread as evenly as possible across the workers, so the application runs on as many nodes as possible.
After the cores have been assigned, the workers that received cores are traversed. For each one, an executor is added to the application's internal cache structure by creating an ExecutorDesc object that encapsulates the number of CPU cores assigned to that executor; the executor is then launched on the worker, and the application's state is changed to RUNNING.
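The even-spreading step can be illustrated in isolation. A minimal sketch, assuming only an array of per-worker free-core counts (sorted descending) rather than Spark's WorkerInfo:
// Spread coresLeft cores round-robin across workers, bounded by each worker's free cores.
def spreadOutAssign(coresLeft: Int, freeCores: Array[Int]): Array[Int] = {
  val assigned = new Array[Int](freeCores.length)
  var toAssign = math.min(coresLeft, freeCores.sum)
  var pos = 0
  while (toAssign > 0) {
    if (freeCores(pos) - assigned(pos) > 0) { // this worker still has a spare core
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % freeCores.length // move the pointer to the next worker
  }
  assigned
}

For example, spreadOutAssign(5, Array(4, 3, 2)) returns Array(2, 2, 1): the five cores land on three different workers instead of piling up on the first one.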
The non-spreadOutApps algorithm does the opposite: it uses up all of one worker's free cores before moving on to the next worker, so each application is packed onto as few workers as possible.
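For contrast, a sketch of the packing strategy under the same simplified model:
// Pack cores onto as few workers as possible: exhaust each worker before moving on.
def packAssign(coresLeft: Int, freeCores: Array[Int]): Array[Int] = {
  val assigned = new Array[Int](freeCores.length)
  var toAssign = coresLeft
  for (pos <- freeCores.indices if toAssign > 0) {
    val take = math.min(freeCores(pos), toAssign) // take everything this worker can give
    assigned(pos) = take
    toAssign -= take
  }
  assigned
}

With the same inputs, packAssign(5, Array(4, 3, 2)) returns Array(4, 1, 0): the application is concentrated on two workers.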
The following is the core source code:
private def schedule() {
  /*
   * First, check whether the Master is in the ALIVE state.
   */
  if (state != RecoveryState.ALIVE) { return }

  // First schedule drivers, they take strict precedence over applications
  // Randomization helps balance drivers
  // Pass the previously registered ALIVE workers into the Random.shuffle method,
  // which randomly shuffles their order
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // Get the number of currently available workers (after shuffling)
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0

  /*
   * Driver scheduling: traverse the waitingDrivers ArrayBuffer.
   * Only cluster-mode submissions register a driver here for scheduling;
   * a client-mode submission launches the driver locally on the submitting
   * machine, so it is neither registered nor scheduled.
   */
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    var launched = false
    var numWorkersVisited = 0
    // Keep trying until every alive worker has been visited or the driver is launched
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // if the worker's free memory >= the memory required by the driver
      // and the worker's free cores >= the cores required by the driver
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // launch the driver
        launchDriver(worker, driver)
        // and remove it from the waitingDrivers queue
        waitingDrivers -= driver
        launched = true
      }
      // move the pointer to the next worker
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  /*
   * Application scheduling: the two algorithms are selected via SparkConf.
   * The default is true (the spreadOutApps algorithm).
   */
  if (spreadOutApps) {
    // Try to spread out each app among all the nodes, until it has all its cores
    // Traverse the applications in waitingApps; the if guard keeps only apps that still need cores
    for (app <- waitingApps if app.coresLeft > 0) {
      // Filter the workers that are ALIVE and usable by this application,
      // then sort them in descending order of free cores.
      // A usable worker has free memory >= the memory required by one of the
      // application's executors and is not already running an executor for it.
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
      val numUsable = usableWorkers.length
      // An array recording how many cores to assign on each usable worker
      val assigned = new Array[Int](numUsable) // Number of cores to give on each node
      // Cores to assign = min(cores the app still needs, total free cores of the usable workers)
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
      var pos = 0
      // Round-robin: give one core to the current worker if it still has a free one,
      // then move to the next worker, until all cores are assigned
      while (toAssign > 0) {
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
          toAssign -= 1
          assigned(pos) += 1
        }
        pos = (pos + 1) % numUsable
      }
      // Now that we've decided how many cores to give on each node, let's actually give them
      // Traverse the workers that were assigned cores
      for (pos <- 0 until numUsable) {
        if (assigned(pos) > 0) {
          // First, add an executor to the application's internal cache structure:
          // an ExecutorDesc object encapsulating the number of cores assigned to this executor
          val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
          // then launch the executor on the worker
          launchExecutor(usableWorkers(pos), exec)
          // and set the application state to RUNNING
          app.state = ApplicationState.RUNNING
        }
      }
    }
  } else {
    // Pack each app into as few nodes as possible until we've assigned all its cores
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      for (app <- waitingApps if app.coresLeft > 0) {
        if (canUse(app, worker)) {
          val coresToUse = math.min(worker.coresFree, app.coresLeft)
          if (coresToUse > 0) {
            val exec = app.addExecutor(worker, coresToUse)
            launchExecutor(worker, exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    }
  }
}
// Launching an executor
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // add the executor to the worker's internal cache
  worker.addExecutor(exec)
  // send a LaunchExecutor message to the worker
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // send an ExecutorAdded message to the driver of the application this executor belongs to
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
// Launching a driver
def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // add the driver to the worker's internal cache structure,
  // subtracting the driver's memory and cores from the worker's free memory and cores
  worker.addDriver(driver)
  // the worker is also added to the driver's internal cache structure
  driver.worker = Some(worker)
  // then send a LaunchDriver message to the worker's actor, telling it to start the driver
  worker.actor ! LaunchDriver(driver.id, driver.desc)
  // change the driver state to RUNNING
  driver.state = DriverState.RUNNING
}