Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Good programmer big data shares Spark tasks and cluster startup process

2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Good Programmer big-data sharing: the Spark task submission process and the cluster startup process. Spark cluster startup process:

1. Call the start-all.sh script to start Master

2. After the Master starts, its preStart method starts a timer that periodically checks for timed-out Workers and removes them.

3. The startup script parses the slaves configuration file to find the nodes on which a Worker should be started, and then starts the Workers.

4. After the Worker service starts, its preStart method is called and the Worker begins registering with all Masters.

5. The Master receives the registration information sent by the Worker, saves it, and sends its own URL back to the Worker as the response.

6. After receiving the Master's URL and storing it, the Worker starts a timer that sends heartbeat messages to the Master at regular intervals.

Task submission process

1. The Driver side starts the SparkSubmit process through the spark-submit script, which creates a very important object (SparkContext) and starts sending messages to the Master.

2. After receiving the message, the Master generates the task information and puts it into a queue.

3.Master filters out all valid Worker and sorts them by idle resources

4.Master begins to notify the valid Worker to get the task information and start the corresponding Executor

5.Worker starts Executor and reverse registers with Driver

6. The Driver starts sending the generated tasks to the corresponding Executors, and the Executors start running the tasks.

Cluster startup process

1. First create the Master class

Import akka.actor. {Actor, ActorSystem, Props}

Import com.typesafe.config. {Config, ConfigFactory}

Import scala.collection.mutable

Import scala.concurrent.duration._

/**
 * Master actor: keeps track of registered Workers and evicts the ones whose
 * heartbeat has gone silent.
 *
 * Handled messages:
 *  - `RegisterWorker`     (Worker -> Master): record the Worker and reply with the Master URL.
 *  - `HeartBeat`          (Worker -> Master): refresh the Worker's last heartbeat time.
 *  - `CheckTimeOutWorker` (Master -> self):   drop Workers that have been silent too long.
 *
 * @param masterHost host name used when building the URL sent back to Workers
 * @param masterPort port used when building the URL sent back to Workers
 */
class Master(val masterHost: String, val masterPort: Int) extends Actor {

  // Registered Workers, keyed by Worker id.
  val idToWorker = new mutable.HashMap[String, WorkerInfo]()

  // All registered Workers; this is the set scanned by the timeout check.
  val workers = new mutable.HashSet[WorkerInfo]()

  // Milliseconds. Used both as the period of the timeout check and as the
  // maximum heartbeat silence tolerated before a Worker is removed.
  val checkInterval: Long = 15000

  /** Lifecycle hook: schedules `CheckTimeOutWorker` to `self`, immediately and then every `checkInterval` ms. */
  override def preStart(): Unit = {
    // The scheduler needs an implicit ExecutionContext.
    import context.dispatcher
    context.system.scheduler.schedule(0.millis, checkInterval.millis, self, CheckTimeOutWorker)
  }

  /** Message handler; see the class comment for the protocol. */
  override def receive: Receive = {
    // Worker -> Master
    case RegisterWorker(id, host, port, memory, cores) =>
      // A repeated registration with a known id is ignored (no reply is sent).
      if (!idToWorker.contains(id)) {
        val workerInfo = new WorkerInfo(id, host, port, memory, cores)
        // Stamp the registration time: WorkerInfo starts at 0, so without this a
        // timeout check that runs before the first heartbeat would evict the
        // Worker right after it registered.
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
        idToWorker += (id -> workerInfo)
        workers += workerInfo
        println("a worker registered")
        sender() ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" +
          s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")
      }

    // Worker -> Master
    case HeartBeat(workerId) =>
      // Look the Worker up safely: a heartbeat from an unknown or already
      // evicted Worker is ignored instead of throwing NoSuchElementException
      // (which would fail the actor).
      idToWorker.get(workerId).foreach { workerInfo =>
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
      }

    // Master -> self
    case CheckTimeOutWorker =>
      val currentTime = System.currentTimeMillis()
      // `filter` builds a new set, so removing from `workers` below is safe.
      val toRemove: mutable.HashSet[WorkerInfo] =
        workers.filter(w => currentTime - w.lastHeartbeatTime > checkInterval)
      // Remove timed-out Workers from both idToWorker and workers.
      toRemove.foreach { deadWorker =>
        idToWorker -= deadWorker.id
        workers -= deadWorker
      }
      println(s"num of workers: ${workers.size}")
  }
}

/** Companion of the Master actor: well-known names plus the process entry point. */
object Master {

  // Name of the Master's ActorSystem; also used by Workers to build the Master URL.
  val MASTER_SYSTEM = "MasterSystem"

  // Name of the Master actor under /user.
  val MASTER_ACTOR = "Master"

  /**
   * Starts the Master process.
   *
   * @param args `args(0)` = host to bind, `args(1)` = port to bind
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "usage: Master <host> <port>")
    val host = args(0)
    val port = args(1).toInt

    // Remoting configuration for the ActorSystem.
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config: Config = ConfigFactory.parseString(configStr)

    // Create the ActorSystem, then the Master actor inside it.
    val actorSystem: ActorSystem = ActorSystem(MASTER_SYSTEM, config)
    actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR)

    // Block the main thread until the ActorSystem terminates.
    actorSystem.awaitTermination()
  }
}

2. Create the RemoteMsg trait

/** Marker for messages that cross process boundaries and therefore must be serializable. */
trait RemoteMsg extends Serializable

// Master -> self (Master): trigger a scan for timed-out Workers.
case object CheckTimeOutWorker

// Worker -> Master: registration request carrying the Worker's id, address and resources.
final case class RegisterWorker(id: String, host: String,
                                port: Int, memory: Int, cores: Int) extends RemoteMsg

// Master -> Worker: registration acknowledgement carrying the Master URL.
final case class RegisteredWorker(masterUrl: String) extends RemoteMsg

// Worker -> self: trigger sending one heartbeat.
case object SendHeartBeat

// Worker -> Master: heartbeat identifying the sending Worker.
final case class HeartBeat(workerId: String) extends RemoteMsg

3. Create a Worker class

Import java.util.UUID

Import akka.actor. {Actor, ActorRef, ActorSelection, ActorSystem, Props}

Import com.typesafe.config. {Config, ConfigFactory}

Import scala.concurrent.duration._

/**
 * Worker actor: registers with the Master on start-up and, once the Master
 * acknowledges the registration, sends it a heartbeat at a fixed interval.
 *
 * @param host       host of this Worker, reported to the Master on registration
 * @param port       port of this Worker, reported to the Master on registration
 * @param masterHost host of the Master to register with
 * @param masterPort port of the Master to register with
 * @param memory     memory amount reported to the Master
 * @param cores      core count reported to the Master
 */
class Worker(val host: String, val port: Int, val masterHost: String,
             val masterPort: Int, val memory: Int, val cores: Int) extends Actor {

  // Randomly generated id identifying this Worker to the Master.
  val workerId = UUID.randomUUID().toString

  // URL of the Master, filled in when RegisteredWorker arrives.
  var masterUrl: String = _

  // Heartbeat period in milliseconds.
  val heartBeat_interval: Long = 10000

  // Selection pointing at the Master actor; assigned in preStart.
  var master: ActorSelection = _

  /** Lifecycle hook: resolves the Master actor and sends it the registration request. */
  override def preStart(): Unit = {
    master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" +
      s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")
    master ! RegisterWorker(workerId, host, port, memory, cores)
  }

  /** Message handler for the registration acknowledgement and the heartbeat tick. */
  override def receive: Receive = {
    // Master -> Worker: registration succeeded; the message carries the Master URL.
    case RegisteredWorker(url) =>
      this.masterUrl = url
      // Start the periodic SendHeartBeat tick to self (first tick fires immediately).
      // The scheduler needs an implicit ExecutionContext.
      import context.dispatcher
      context.system.scheduler.schedule(0.millis, heartBeat_interval.millis, self, SendHeartBeat)

    // Worker -> self: time to send one heartbeat to the Master.
    case SendHeartBeat =>
      master ! HeartBeat(workerId)
  }
}

/** Companion of the Worker actor: well-known names plus the process entry point. */
object Worker {

  // Name of the Worker's ActorSystem.
  val WORKER_SYSTEM = "WorkerSystem"

  // Name of the Worker actor under /user.
  val WORKER_ACTOR = "Worker"

  /**
   * Starts a Worker process.
   *
   * @param args in order: host, port, masterHost, masterPort, memory, cores
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 6,
      "usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    val memory = args(4).toInt
    val cores = args(5).toInt

    // Remoting configuration for the ActorSystem.
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config: Config = ConfigFactory.parseString(configStr)

    // Create the ActorSystem, then the Worker actor inside it. The actor name
    // is the second argument of actorOf, not of Props.
    val actorSystem: ActorSystem = ActorSystem(WORKER_SYSTEM, config)
    actorSystem.actorOf(
      Props(new Worker(host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR)

    // Block the main thread until the ActorSystem terminates.
    actorSystem.awaitTermination()
  }
}

4. Create initialization class

/**
 * Registration record for one Worker.
 *
 * Deliberately a plain class rather than a case class: it carries a mutable
 * field, so it keeps reference equality and hashing.
 *
 * @param id     Worker id
 * @param host   Worker host
 * @param port   Worker port
 * @param memory memory amount reported by the Worker
 * @param cores  core count reported by the Worker
 */
class WorkerInfo(val id: String, val host: String, val port: Int,
                 val memory: Int, val cores: Int) {

  // Time of the last heartbeat in epoch milliseconds; starts at the default 0.
  var lastHeartbeatTime: Long = _
}

5. Local testing requires passing parameters:

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report