In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
Good Programmer big data sharing: the Spark task submission process and the cluster startup process. Spark cluster startup process
1. Call the start-all.sh script to start Master
2. After the Master starts, its preStart method starts a timer that periodically checks for timed-out Workers and removes them.
3. The startup script parses the slaves configuration file to find the nodes on which Workers should run, and then starts the Workers.
4. After the Worker service starts, its preStart method is called and the Worker begins registering with all Masters.
5. The Master receives the registration information sent by the Worker, saves it, and sends its own URL back to the Worker in response.
6. After receiving and storing the Master's URL, the Worker starts a timer that sends heartbeat messages to the Master at regular intervals.
Task submission process
1. The Driver side starts the SparkSubmit process through the spark-submit script; this process creates a very important object (SparkContext) and starts sending messages to the Master.
2. After receiving the message, the Master generates the task information and puts it into a queue.
3. The Master filters out all valid Workers and sorts them by idle resources.
4. The Master notifies the valid Workers to fetch the task information and start the corresponding Executors.
5. The Worker starts the Executor, which then registers back with the Driver (reverse registration).
6. The Driver starts sending the generated tasks to the corresponding Executors, and the Executors start running the tasks.
Cluster startup process
1. First create the Master class
Import akka.actor. {Actor, ActorSystem, Props}
Import com.typesafe.config. {Config, ConfigFactory}
Import scala.collection.mutable
Import scala.concurrent.duration._
/**
 * Master actor of the demo cluster.
 *
 * Keeps track of registered Workers, answers registration requests with its own
 * actor URL, records heartbeats, and periodically evicts Workers whose last
 * heartbeat is older than `checkInterval` milliseconds.
 *
 * @param masterHost host name used when building the Master URL sent to Workers
 * @param masterPort port used when building the Master URL sent to Workers
 */
class Master(val masterHost: String, val masterPort: Int) extends Actor {
  // Registered Workers, keyed by worker id.
  val idToWorker = new mutable.HashMap[String, WorkerInfo]()
  // The same registered Workers as a set; scanned by the timeout check.
  val workers = new mutable.HashSet[WorkerInfo]()
  // Period of the timeout check in milliseconds; also the maximum allowed
  // silence before a Worker is considered dead.
  val checkInterval: Long = 15000

  // Lifecycle hook: runs once, after construction and before `receive`
  // handles the first message.
  override def preStart(): Unit = {
    // The scheduler needs an implicit ExecutionContext; use this actor's dispatcher.
    import context.dispatcher
    // Send CheckTimeOutWorker to ourselves immediately and then every checkInterval ms.
    context.system.scheduler.schedule(0.millis, checkInterval.millis, self, CheckTimeOutWorker)
  }

  // Message handler; invoked for every message after preStart has run.
  override def receive: Receive = {
    // Worker -> Master: registration request.
    case RegisterWorker(id, host, port, memory, cores) =>
      if (!idToWorker.contains(id)) {
        val workerInfo = new WorkerInfo(id, host, port, memory, cores)
        // Treat the registration itself as the first sign of life. Otherwise a
        // timeout check that runs before the first heartbeat arrives would see
        // lastHeartbeatTime == 0 and evict the brand-new Worker.
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
        idToWorker += (id -> workerInfo)
        workers += workerInfo
        println("a worker registered")
        // Reply with the URL under which this Master actor can be reached.
        sender ! RegisteredWorker(
          s"akka.tcp://${Master.MASTER_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_ACTOR}")
      }

    // Worker -> Master: periodic heartbeat.
    case HeartBeat(workerId) =>
      // A heartbeat may come from a Worker that was never registered or has
      // already been evicted; ignore it instead of throwing NoSuchElementException.
      idToWorker.get(workerId).foreach { workerInfo =>
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
      }

    // Master -> self: periodic timeout check.
    case CheckTimeOutWorker =>
      val currentTime = System.currentTimeMillis()
      // `filter` builds a new set, so removing from `workers` below is safe.
      val toRemove: mutable.HashSet[WorkerInfo] =
        workers.filter(w => currentTime - w.lastHeartbeatTime > checkInterval)
      // Remove timed-out Workers from both idToWorker and workers.
      toRemove.foreach { deadWorker =>
        idToWorker -= deadWorker.id
        workers -= deadWorker
      }
      println(s"num of workers: ${workers.size}")
  }
}
/**
 * Companion object holding the well-known actor-system and actor names of the
 * Master, plus the entry point that boots a Master node.
 */
object Master {
  // Name of the ActorSystem the Master runs in; part of the Master's actor URL.
  val MASTER_SYSTEM = "MasterSystem"
  // Name of the Master actor under /user; part of the Master's actor URL.
  val MASTER_ACTOR = "Master"

  /**
   * Starts a Master.
   *
   * @param args `args(0)` is the host name to bind to, `args(1)` the port
   */
  def main(args: Array[String]): Unit = {
    // Fail with a usage message rather than an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      Console.err.println("usage: Master <host> <port>")
      sys.exit(1)
    }
    val host = args(0)
    val port = args(1).toInt
    // Remoting configuration for this node.
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    // Parse the configuration needed to create the ActorSystem.
    val config: Config = ConfigFactory.parseString(configStr)
    // Create the ActorSystem.
    val actorSystem: ActorSystem = ActorSystem(MASTER_SYSTEM, config)
    // Create the Master actor inside that ActorSystem.
    actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR)
    // Block the main thread until the ActorSystem shuts down.
    actorSystem.awaitTermination()
  }
}
2. Create the RemoteMsg trait
/** Marker trait for messages that travel between actor systems and so must be serializable. */
trait RemoteMsg extends Serializable {
}

// Master -> self (Master): trigger for the periodic timeout check.
case object CheckTimeOutWorker

// Worker -> Master: registration request carrying the Worker's id, address and resources.
case class RegisterWorker(id: String, host: String,
                          port: Int, memory: Int, cores: Int) extends RemoteMsg

// Master -> Worker: registration acknowledgement carrying the Master's actor URL.
case class RegisteredWorker(masterUrl: String) extends RemoteMsg

// Worker -> self: trigger for sending the next heartbeat.
case object SendHeartBeat

// Worker -> Master: heartbeat identifying the sending Worker.
case class HeartBeat(workerId: String) extends RemoteMsg
3. Create a Worker class
Import java.util.UUID
Import akka.actor. {Actor, ActorRef, ActorSelection, ActorSystem, Props}
Import com.typesafe.config. {Config, ConfigFactory}
Import scala.concurrent.duration._
/**
 * Worker actor of the demo cluster.
 *
 * On start it registers with the Master; once the Master acknowledges the
 * registration it sends a heartbeat every `heartBeat_interval` milliseconds.
 *
 * @param host       host name of this Worker, reported to the Master
 * @param port       port of this Worker, reported to the Master
 * @param masterHost host name of the Master to register with
 * @param masterPort port of the Master to register with
 * @param memory     amount of memory reported to the Master
 * @param cores      number of cores reported to the Master
 */
class Worker(val host: String, val port: Int, val masterHost: String,
             val masterPort: Int, val memory: Int, val cores: Int) extends Actor {
  // Randomly generated id identifying this Worker to the Master.
  val workerId = UUID.randomUUID().toString
  // URL of the Master, filled in when the registration is acknowledged.
  var masterUrl: String = _
  // Heartbeat period in milliseconds.
  val heartBeat_interval: Long = 10000
  // Selection pointing at the Master actor; set in preStart.
  var master: ActorSelection = _

  // Lifecycle hook: look up the Master and send the registration request.
  override def preStart(): Unit = {
    master = context.actorSelection(
      s"akka.tcp://${Master.MASTER_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_ACTOR}")
    master ! RegisterWorker(workerId, host, port, memory, cores)
  }

  override def receive: Receive = {
    // Master -> Worker: registration succeeded; the message carries the Master URL.
    case RegisteredWorker(masterUrl) =>
      this.masterUrl = masterUrl
      // The scheduler needs an implicit ExecutionContext; use this actor's dispatcher.
      import context.dispatcher
      // Send SendHeartBeat to ourselves immediately and then every heartBeat_interval ms.
      context.system.scheduler.schedule(0.millis, heartBeat_interval.millis, self, SendHeartBeat)

    // Worker -> self: time to send a heartbeat to the Master.
    case SendHeartBeat =>
      master ! HeartBeat(workerId)
  }
}
/**
 * Companion object holding the actor-system and actor names of the Worker,
 * plus the entry point that boots a Worker node.
 */
object Worker {
  // Name of the ActorSystem the Worker runs in.
  val WORKER_SYSTEM = "WorkerSystem"
  // Name of the Worker actor under /user.
  val WORKER_ACTOR = "Worker"

  /**
   * Starts a Worker.
   *
   * @param args in order: host, port, masterHost, masterPort, memory, cores
   */
  def main(args: Array[String]): Unit = {
    // Fail with a usage message rather than an ArrayIndexOutOfBoundsException.
    if (args.length < 6) {
      Console.err.println("usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
      sys.exit(1)
    }
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    val memory = args(4).toInt
    val cores = args(5).toInt
    // Remoting configuration for this node.
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    // Parse the configuration needed to create the ActorSystem.
    val config: Config = ConfigFactory.parseString(configStr)
    // Create the ActorSystem.
    val actorSystem: ActorSystem = ActorSystem(WORKER_SYSTEM, config)
    // Create the Worker actor inside that ActorSystem. The actor name is the
    // second argument of actorOf, not an argument of Props.
    actorSystem.actorOf(
      Props(new Worker(host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR)
    // Block the main thread until the ActorSystem shuts down.
    actorSystem.awaitTermination()
  }
}
4. Create initialization class
/**
 * Bookkeeping record describing one Worker.
 *
 * @param id     unique id of the Worker
 * @param host   host name of the Worker
 * @param port   port of the Worker
 * @param memory amount of memory of the Worker
 * @param cores  number of cores of the Worker
 */
class WorkerInfo(val id: String, val host: String, val port: Int,
                 val memory: Int, val cores: Int) {
  // Timestamp (System.currentTimeMillis) of the last heartbeat; default-initialized to 0.
  var lastHeartbeatTime: Long = _
}
5. Local testing requires passing parameters:
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.