This article walks through how the Spark Driver startup process works in standalone mode. We will follow the relevant source code step by step; I hope you read it carefully and get something useful out of it.
SparkContext.scala
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
Let's take a look at what exactly is done in SparkContext.createTaskScheduler.
case SPARK_REGEX(sparkUrl) =>
  val scheduler = new TaskSchedulerImpl(sc)
  val masterUrls = sparkUrl.split(",").map("spark://" + _)
  val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
  scheduler.initialize(backend)
  (backend, scheduler)
We see that _taskScheduler is an instance of TaskSchedulerImpl, _schedulerBackend is an instance of StandaloneSchedulerBackend, and the backend is handed to the scheduler via scheduler.initialize(backend).
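As a side note, TaskSchedulerImpl.initialize essentially just stores the backend and builds the scheduling pool. A simplified sketch, paraphrased from the Spark 2.x source (details vary by version):
def initialize(backend: SchedulerBackend) {
  this.backend = backend // keep the backend so start() and submitTasks() can reach it
  schedulableBuilder = schedulingMode match {
    case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
    case _ => throw new IllegalArgumentException(s"Unsupported scheduling mode: $schedulingMode")
  }
  schedulableBuilder.buildPools() // create the FIFO/FAIR pools that tasks will be submitted into
}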
Then let's take a look at exactly what _taskScheduler.start() does.
override def start() {
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
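A quick aside on the second half of start(): the speculative-execution thread is only created when speculation is enabled. A minimal, hypothetical way to turn it on (these are standard Spark config keys; the interval value here is just an example):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("speculation-demo") // hypothetical app name
  .set("spark.speculation", "true") // enables the periodic checkSpeculatableTasks() call
  .set("spark.speculation.interval", "100ms") // how often the check runs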
Back in start(), the first thing it does is call backend.start(), and we can find the implementation of start in StandaloneSchedulerBackend:
override def start() {
  super.start()
  launcherBackend.connect()

  // The endpoint for executors to talk to us
  val driverUrl = RpcEndpointAddress(
    sc.conf.get("spark.driver.host"),
    sc.conf.get("spark.driver.port").toInt,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  // If we're using dynamic allocation, set our initial executor limit to 0 for now.
  // ExecutorAllocationManager will send the real initial limit to the Master later.
  val initialExecutorLimit =
    if (Utils.isDynamicAllocationEnabled(conf)) {
      Some(0)
    } else {
      None
    }
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
  client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
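To make driverUrl concrete: RpcEndpointAddress renders to the spark://name@host:port form that each executor uses to call back into the driver. An illustration with a made-up host and port (note these classes are private[spark], so this only compiles inside the org.apache.spark package):
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

val driverUrl = RpcEndpointAddress("10.0.0.5", 35000, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
// driverUrl == "spark://CoarseGrainedScheduler@10.0.0.5:35000"
The {{EXECUTOR_ID}}, {{HOSTNAME}} and similar placeholders in args are substituted by the Worker when it actually launches CoarseGrainedExecutorBackend.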
Let's take a look at what happens in client.start():
def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
endpoint is an AtomicReference, and rpcEnv.setupEndpoint does two things: it registers the endpoint and returns a reference to it. But where does the "start" happen here? We know execution eventually reaches ClientEndpoint's onStart method, but how exactly does it get there?
We already saw the following code in the article on the RPC mechanism; the key part did not get much attention at the time, but now it becomes clear: every endpoint registered with an RpcEnv automatically has its start (OnStart) event triggered.
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data) // for the OnStart message
  }
  endpointRef
}
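To see the pattern in isolation, here is a minimal, hypothetical endpoint (not Spark source): simply registering it with setupEndpoint is enough for onStart to run, because registration queues the OnStart message shown above.
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}

class DemoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging {
  override def onStart(): Unit = {
    // runs automatically once the dispatcher delivers the OnStart message queued at registration
    logInfo("DemoEndpoint started")
  }
  override def receive: PartialFunction[Any, Unit] = {
    case msg => logInfo(s"DemoEndpoint received: $msg")
  }
}

// val ref = rpcEnv.setupEndpoint("DemoEndpoint", new DemoEndpoint(rpcEnv)) // registering triggers onStart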
After that, let's find ClientEndpoint (an inner class of StandaloneAppClient) and look at its onStart method:
override def onStart(): Unit = {
  try {
    registerWithMaster(1)
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}
registerWithMaster also calls itself recursively, but that recursion only implements the retry logic, so let's focus on tryRegisterAllMasters().
private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures.set(tryRegisterAllMasters())
  registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
    override def run(): Unit = {
      if (registered.get) {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerMasterThreadPool.shutdownNow()
      } else if (nthRetry >= REGISTRATION_RETRIES) {
        markDead("All masters are unresponsive! Giving up.")
      } else {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerWithMaster(nthRetry + 1)
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
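For context on the retry cadence: in the Spark 2.x StandaloneAppClient the relevant constants are defined as follows (values may differ in other versions), so at most three registration attempts are made and each is given 20 seconds before the retry timer fires.
private val REGISTRATION_TIMEOUT_SECONDS = 20
private val REGISTRATION_RETRIES = 3
Now for tryRegisterAllMasters() itself: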
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) { return }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
The two key lines (highlighted in the original post) obtain a reference to the master endpoint and send it a RegisterApplication message. The master endpoint here lives on the cluster's master node; relative to the RpcEnv on the driver, it is a remote endpoint.
Now open Master.scala and look first at its class declaration:
private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
Then find its receive method:
override def receive: PartialFunction[Any, Unit] = {
We only need to look at one of its cases:
case RegisterApplication(description, driver) =>
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }
The handler does five things: 1. create the app; 2. register the app; 3. persist the app; 4. send a message to the driver's endpoint; 5. call schedule().
In step 4, driver is the RpcEndpointRef that arrived with the RPC message; it is used to send the response back to the driver confirming that the app has been registered.
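For reference, step 1 looks roughly like this in the Spark 2.x Master (paraphrased; the ID format shown is just an example): createApplication packages the description and the driver reference into an ApplicationInfo with a freshly generated application ID.
private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef): ApplicationInfo = {
  val now = System.currentTimeMillis()
  val date = new Date(now)
  val appId = newApplicationId(date) // e.g. IDs of the form "app-20250403101112-0000"
  new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
}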
Let's go back to ClientEndpoint.receive.
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredApplication(appId_, masterRef) =>
    // FIXME How to handle the following cases?
    // 1. A master receives multiple registrations and sends back multiple
    // RegisteredApplications due to an unstable network.
    // 2. Receive multiple RegisteredApplication from different masters because the master is
    // changing.
    appId.set(appId_)
    registered.set(true)
    master = Some(masterRef)
    listener.connected(appId.get)
Step 5 is schedule(); let's take a look at what it does:
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
launchDriver(worker, driver) is easy to understand: it starts the driver for the current application on that worker (the Worker runs it in its own thread).
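For completeness, launchDriver in the Spark 2.x Master looks roughly like this (paraphrased): it records the driver on the chosen worker and sends that worker a LaunchDriver message; on the worker side a DriverRunner thread is then started for it.
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}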
Take another look at startExecutorsOnWorkers():
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
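A note on the spreadOutApps flag passed to scheduleExecutorsOnWorkers: it comes from the master's configuration and decides whether executors are spread across as many workers as possible (the default) or packed onto as few as possible. In the Spark 2.x Master it is read roughly like this:
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
Next, allocateWorkerResourceToExecutors(), which performs the actual allocation: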
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
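To make the core-splitting arithmetic concrete, here is a small standalone sketch with made-up numbers: with 8 assigned cores and coresPerExecutor = Some(2) the worker gets four 2-core executors, while with coresPerExecutor = None it gets a single executor holding all 8 cores.
object CoreSplitDemo {
  def main(args: Array[String]): Unit = {
    val assignedCores = 8 // hypothetical number of cores granted on one worker
    for (coresPerExecutor <- Seq(Some(2), None)) {
      val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
      val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
      println(s"coresPerExecutor=$coresPerExecutor -> $numExecutors executor(s) x $coresToAssign core(s)")
    }
  }
}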