
Understanding Receiver Startup: A Source Code Analysis

2025-01-17 Update From: SLTechnology News&Howtos shulou

Shulou (Shulou.com), 05/31

This article looks at how a Receiver is started in Spark Streaming and walks through the relevant source code. The topic is unfamiliar to many people, so the key points are summarized below.

Why Receiver?

A Receiver continuously receives data from an external data source and reports it to the driver, so that every BatchDuration a Job can be generated from the reported data to run RDD operations.

A Receiver is started when the application starts.

Receivers and InputDStreams correspond one to one.

RDD[Receiver] has only one Partition, which holds a single Receiver instance.

Spark Core is not aware that RDD[Receiver] is special. If it scheduled the corresponding Job the way it schedules a Job for an ordinary RDD, several Receivers could be started on the same Executor, which would lead to load imbalance and Receiver startup failures.

There are two ways to start Receivers on Executors:

1. Start different Receivers through different Partitions of one RDD. Each Partition represents a different Receiver; at the execution level each is a separate Task, and the Receiver is started when its Task starts.

This approach is simple and elegant, but it has drawbacks. A Receiver may fail to start, and a Receiver failure at runtime triggers a Task retry; after 3 failed attempts the Job fails, which in turn brings down the entire Spark application. Because a Receiver failure fails the Job, this approach is not fault-tolerant.

2. The second approach is the one Spark Streaming adopts.
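The weakness of the first approach can be illustrated with a small model. The sketch below is plain Scala and does not use Spark; `runWithRetries` and the `maxFailures` parameter are hypothetical names chosen for illustration, and the limit of 3 simply follows the figure given above. It only shows that once the retry budget is exhausted, the whole job is reported as failed.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object TaskRetrySketch {
  // Hypothetical stand-in for a scheduler that retries a task and aborts
  // the job after `maxFailures` failed attempts. Not Spark's actual code.
  @tailrec
  def runWithRetries[A](maxFailures: Int, attempt: Int = 1)(task: Int => A): Either[String, A] =
    if (attempt > maxFailures) {
      Left(s"job aborted after $maxFailures failed attempts")
    } else {
      Try(task(attempt)) match {
        case Success(value) => Right(value)                               // task succeeded
        case Failure(_)     => runWithRetries(maxFailures, attempt + 1)(task) // retry
      }
    }
}
```

A receiver task that keeps failing makes `runWithRetries(3)(...)` return a `Left`, which is the "Receiver failure takes the Job down" situation described above.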

In the start method of ReceiverTracker, the RPC endpoint ReceiverTrackerEndpoint is instantiated before the launchReceivers method is called.

    /** Start the endpoint and receiver execution thread. */
    def start(): Unit = synchronized {
      if (isTrackerStarted) {
        throw new SparkException("ReceiverTracker already started")
      }

      if (!receiverInputStreams.isEmpty) {
        endpoint = ssc.env.rpcEnv.setupEndpoint(
          "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
        if (!skipReceiverLaunch) launchReceivers()
        logInfo("ReceiverTracker started")
        trackerState = Started
      }
    }
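The guard logic in start can be sketched in isolation. The following is a simplified model in plain Scala, not the Spark class: `Tracker`, `State`, and the `launch` callback are hypothetical names, and the input streams are represented as plain strings. It shows the same two checks: refuse a second start, and only launch when there is at least one input stream.

```scala
object TrackerStartSketch {
  sealed trait State
  case object Initialized extends State
  case object Started extends State

  // Hypothetical, simplified tracker: `launch` stands in for launchReceivers.
  final class Tracker(inputStreams: Seq[String], launch: Seq[String] => Unit) {
    private var state: State = Initialized

    def isStarted: Boolean = synchronized(state == Started)

    def start(): Unit = synchronized {
      // A second start is rejected, as in the listing above.
      if (state == Started) {
        throw new IllegalStateException("tracker already started")
      }
      // Nothing is launched (and the state stays unchanged) without input streams.
      if (inputStreams.nonEmpty) {
        launch(inputStreams)
        state = Started
      }
    }
  }
}
```

Because the whole method is synchronized, two threads calling start at the same time cannot both pass the "already started" check.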

In the launchReceivers method, the corresponding Receiver is obtained from each ReceiverInputDStream, and then a StartAllReceivers message is sent to the endpoint. Each Receiver corresponds to one data source.

    /**
     * Get the receivers from the ReceiverInputDStreams, distributes them to the
     * worker nodes as a parallel collection, and runs them.
     */
    private def launchReceivers(): Unit = {
      val receivers = receiverInputStreams.map(nis => {
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })

      runDummySparkJob()

      logInfo("Starting " + receivers.length + " receivers")
      endpoint.send(StartAllReceivers(receivers))
    }
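The mapping from input streams to receivers can be modelled without Spark. In the sketch below, `SketchInputStream`, `SketchReceiver`, and `StartAll` are hypothetical stand-ins for ReceiverInputDStream, Receiver, and StartAllReceivers; only the shape of the logic is taken from the listing above: one receiver per stream, tagged with the stream's id, bundled into a single message.

```scala
object LaunchSketch {
  // Hypothetical stand-in for a Receiver: it remembers the id it was given.
  final class SketchReceiver(val source: String) {
    private var id: Int = -1
    def setReceiverId(newId: Int): Unit = { id = newId }
    def streamId: Int = id
  }

  // Hypothetical stand-in for a ReceiverInputDStream.
  final case class SketchInputStream(id: Int, source: String) {
    def getReceiver(): SketchReceiver = new SketchReceiver(source)
  }

  // Hypothetical stand-in for the StartAllReceivers message.
  final case class StartAll(receivers: Seq[SketchReceiver])

  // One receiver per input stream, each carrying its stream's id.
  def buildStartMessage(streams: Seq[SketchInputStream]): StartAll =
    StartAll(streams.map { s =>
      val r = s.getReceiver()
      r.setReceiverId(s.id)
      r
    })
}
```

The one-to-one correspondence between Receiver and InputDStream mentioned earlier is visible here: the output sequence has exactly one receiver per stream, in the same order.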

After ReceiverTrackerEndpoint receives the StartAllReceivers message, it first uses the scheduling policy to determine which Executors each Receiver should run on, and then calls the startReceiver method.

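    override def receive: PartialFunction[Any, Unit] = {
      // Local messages
      case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          // [... the listing is truncated here in the source and resumes
          //  inside startReceiver, at the end of the receiverRDD construction ...]
          ... preferredLocations))
        }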

      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(submitJobThreadPool)
      logInfo(s"Receiver ${receiver.streamId} started")
    }
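The restart behaviour in the onComplete callback can be sketched with plain scala.concurrent futures. This is a simplified model, not Spark code: `superviseReceiver`, `job`, and `shouldRestart` are hypothetical names, where `shouldRestart` plays the role of shouldStartReceiver and resubmitting the job stands in for sending RestartReceiver. As in the listing above, success and failure are treated the same way: the job is resubmitted until the supervisor says to stop.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object RestartSketch {
  // Runs `job` again and again until `shouldRestart()` returns false.
  // The returned future completes with the number of times the job ran.
  def superviseReceiver(job: () => Unit, shouldRestart: () => Boolean)
                       (implicit ec: ExecutionContext): Future[Int] = {
    val runs = new AtomicInteger(0)
    val finished = Promise[Int]()

    def submit(): Unit = {
      Future { runs.incrementAndGet(); job() }.onComplete { _ =>
        // Whether the job succeeded or failed, restart it unless told to stop.
        if (shouldRestart()) submit()
        else finished.success(runs.get())
      }
    }

    submit()
    finished.future
  }
}
```

With a job that always throws and a `shouldRestart` that allows two restarts, the job runs three times in total before the returned future completes. A failing receiver therefore costs one job each time, rather than failing the application as in the first approach.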

