Lesson 11: Spark Streaming Source Code Interpretation: An In-Depth Study of the Design and Implementation of the ReceiverTracker Architecture in the Driver

2025-01-17 Update From: SLTechnology News&Howtos

In the previous lesson, we saw how a Receiver continuously receives data and reports the metadata of the received data to ReceiverTracker. This lesson looks at the specific functions and implementation of ReceiverTracker.

1. Main functions of ReceiverTracker:

Start Receivers on Executors.

Stop Receivers.

Update the rate at which Receivers receive data (that is, rate limiting).

Continuously monitor the running state of Receivers and restart a Receiver as soon as it stops running; this is the fault-tolerance function for Receivers.

Accept registration from Receivers.

Manage the metadata of the data received by Receivers, through ReceivedBlockTracker.

Report the error messages sent by Receivers.

ReceiverTracker manages a message endpoint, ReceiverTrackerEndpoint, which handles message communication with Receivers and with ReceiverTracker itself.
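The endpoint's role can be illustrated with a small, self-contained model: one handler that pattern-matches on each message type. This is only a sketch, not the Spark implementation. RegisterReceiver and AddBlock are message names that appear in the source listings in this lesson, but their fields are simplified here, and ReportError and MiniTracker are names assumed purely for illustration.

```scala
import scala.collection.mutable

// Simplified stand-ins for tracker messages; the field lists are assumptions.
sealed trait TrackerMessage
final case class RegisterReceiver(streamId: Int, host: String) extends TrackerMessage
final case class AddBlock(streamId: Int, numRecords: Long) extends TrackerMessage
final case class ReportError(streamId: Int, message: String) extends TrackerMessage

// Hypothetical in-memory tracker: a single handler dispatches on the message type.
final class MiniTracker {
  private val registered = mutable.Map.empty[Int, String]
  private val recordCounts = mutable.Map.empty[Int, Long]
  private val errors = mutable.ListBuffer.empty[String]

  // Returns true when the message was accepted, mirroring an ask/reply exchange.
  def handle(msg: TrackerMessage): Boolean = synchronized {
    msg match {
      case RegisterReceiver(id, host) =>
        // Reject a second registration for the same stream id.
        if (registered.contains(id)) false else { registered(id) = host; true }
      case AddBlock(id, n) =>
        // Only accept block metadata from a registered stream.
        if (registered.contains(id)) {
          recordCounts(id) = recordCounts.getOrElse(id, 0L) + n
          true
        } else false
      case ReportError(id, m) =>
        errors += s"stream $id: $m"
        true
    }
  }

  def recordsFor(streamId: Int): Long = synchronized(recordCounts.getOrElse(streamId, 0L))
  def reportedErrors: List[String] = synchronized(errors.toList)
}
```

Keeping every message in one sealed hierarchy lets the compiler check that the handler covers all cases, which is one reason an endpoint-style design is convenient for this kind of coordination.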

In the start method of ReceiverTracker, ReceiverTrackerEndpoint is instantiated and the Receivers are started on Executors:

    /** Start the endpoint and receiver execution thread. */
    def start(): Unit = synchronized {
      if (isTrackerStarted) {
        throw new SparkException("ReceiverTracker already started")
      }

      if (!receiverInputStreams.isEmpty) {
        endpoint = ssc.env.rpcEnv.setupEndpoint(
          "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
        if (!skipReceiverLaunch) launchReceivers()
        logInfo("ReceiverTracker started")
        trackerState = Started
      }
    }

Starting the Receivers actually means that ReceiverTracker sends a local message to ReceiverTrackerEndpoint, which wraps each Receiver in an RDD and submits it to the cluster to run as a job.

    endpoint.send(StartAllReceivers(receivers))

The endpoint here is a reference to ReceiverTrackerEndpoint.

After a Receiver is launched, it registers with ReceiverTracker; the Receiver does not officially start until registration succeeds.

    override protected def onReceiverStart(): Boolean = {
      val msg = RegisterReceiver(
        streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
      trackerEndpoint.askWithRetry[Boolean](msg)
    }
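The "register first, then start" rule can be sketched as a small state machine. This is an illustrative model under stated assumptions, not Spark code: MiniSupervisor, its states, and the `register` function (standing in for the blocking ask to the tracker) are all hypothetical names.

```scala
object RegistrationGate {
  sealed trait State
  case object Initialized extends State
  case object Started extends State
  case object Stopped extends State

  // `register` is a hypothetical stand-in for the blocking registration request;
  // it returns true when the tracker accepts the receiver.
  final class MiniSupervisor(streamId: Int, register: Int => Boolean) {
    @volatile private var state: State = Initialized

    def currentState: State = state

    // The receiver only moves to Started when registration succeeds;
    // a refused registration leaves it Stopped.
    def start(): Unit = synchronized {
      if (register(streamId)) state = Started
      else state = Stopped
    }
  }
}
```

A supervisor built with a registration function that returns false never reaches the Started state, which is the behavior the paragraph above describes.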

When the Receiver side has received data and certain conditions are met, it writes the data to BlockManager and reports the metadata of the data to ReceiverTracker:

    /** Store block and report it to driver */
    def pushAndReportBlock(
        receivedBlock: ReceivedBlock,
        metadataOption: Option[Any],
        blockIdOption: Option[StreamBlockId]
      ) {
      val blockId = blockIdOption.getOrElse(nextBlockId)
      val time = System.currentTimeMillis
      val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
      logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
      val numRecords = blockStoreResult.numRecords
      val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
      trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
      logDebug(s"Reported block $blockId")
    }

When ReceiverTracker receives the metadata, it records it. If write-ahead-log batching is enabled, the write runs on a thread from a thread pool; otherwise it is handled directly:

    case AddBlock(receivedBlockInfo) =>
      if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
        walBatchingThreadPool.execute(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            if (active) {
              context.reply(addBlock(receivedBlockInfo))
            } else {
              throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
            }
          }
        })
      } else {
        context.reply(addBlock(receivedBlockInfo))
      }
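The pattern in this handler, replying from a pool thread when batching is on and replying inline otherwise, can be reproduced with only the standard library. The sketch below is a simplified model rather than Spark's code: ReplyContext, MiniEndpoint, and the `addBlock` function parameter are hypothetical, and a failed promise replaces the exception that the original handler throws and logs.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Future, Promise}

object BatchedReply {
  // Hypothetical stand-in for an RPC call context: replying completes a promise.
  final class ReplyContext[A] {
    private val promise = Promise[A]()
    def reply(value: A): Unit = { promise.trySuccess(value); () }
    def fail(t: Throwable): Unit = { promise.tryFailure(t); () }
    def future: Future[A] = promise.future
  }

  final class MiniEndpoint(batchingEnabled: Boolean, addBlock: String => Boolean) {
    private val pool: ExecutorService = Executors.newSingleThreadExecutor()
    @volatile private var active = true

    def onAddBlock(blockId: String, context: ReplyContext[Boolean]): Unit = {
      if (batchingEnabled) {
        // Hand the (potentially slow) write to the pool so the caller's
        // message loop is not blocked while it completes.
        pool.execute(new Runnable {
          override def run(): Unit =
            if (active) context.reply(addBlock(blockId))
            else context.fail(new IllegalStateException("endpoint shut down"))
        })
      } else {
        // Without batching, handle the request on the calling thread.
        context.reply(addBlock(blockId))
      }
    }

    def stop(): Unit = { active = false; pool.shutdown() }
  }
}
```

The caller sees the same reply either way; only the thread that produces it differs. Remember to call `stop()` in this sketch, since the pool thread is not a daemon thread.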

The metadata of the data is managed by ReceivedBlockTracker.

The block metadata is eventually written to streamIdToUnallocatedBlockQueues, in which each stream has its own queue of block information.

    private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

Whenever Streaming triggers a job, the blocks waiting in the queues are allocated to a batch and recorded in the timeToAllocatedBlocks data structure.

    private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    ....
    def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
      if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
        val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
        }.toMap
        val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
        if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
          timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
          lastAllocatedBatchTime = batchTime
        } else {
          logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
        }
      } else {
        // This situation occurs when:
        // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
        // possibly processed batch job or half-processed batch job need to be processed again,
        // so the batchTime will be equal to lastAllocatedBatchTime.
        // 2. Slow checkpointing makes recovered batch time older than WAL recovered
        // lastAllocatedBatchTime.
        // This situation will only occurs in recovery time.
        logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
      }
    }

It can be seen that a batch will contain data from multiple streams.
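The two structures and the allocation step can be modeled in a few lines without Spark. This is a minimal sketch under stated assumptions: MiniBlockTracker and BlockInfo are hypothetical names, batch times are plain Long values instead of Time, and the write-ahead-log step is left out entirely.

```scala
import scala.collection.mutable

object BlockAllocation {
  final case class BlockInfo(streamId: Int, numRecords: Long)

  final class MiniBlockTracker(streamIds: Seq[Int]) {
    // One queue of not-yet-allocated blocks per stream.
    private val unallocated = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]
    // Blocks that have been assigned to a batch, keyed by batch time.
    private val allocated = mutable.HashMap.empty[Long, Map[Int, Seq[BlockInfo]]]
    private var lastAllocatedBatchTime: Option[Long] = None

    def addBlock(info: BlockInfo): Unit = synchronized {
      unallocated.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]) += info
    }

    // Drain every stream's queue into the batch, but only for a batch time
    // newer than the last one allocated.
    def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
      if (lastAllocatedBatchTime.forall(batchTime > _)) {
        val streamIdToBlocks = streamIds.map { id =>
          val queue = unallocated.getOrElseUpdate(id, mutable.Queue.empty[BlockInfo])
          id -> queue.dequeueAll(_ => true).toList
        }.toMap
        allocated.put(batchTime, streamIdToBlocks)
        lastAllocatedBatchTime = Some(batchTime)
      }
    }

    def blocksOfBatch(batchTime: Long): Map[Int, Seq[BlockInfo]] = synchronized {
      allocated.getOrElse(batchTime, Map.empty)
    }
  }
}
```

Because every stream's queue is drained in the same call, a single batch entry holds blocks from all streams, and blocks that arrive after the call wait for the next batch time.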

Each time a Streaming job finishes running, the following method is invoked:

    private def handleJobCompletion(job: Job, completedTime: Long) {
      val jobSet = jobSets.get(job.time)
      jobSet.handleJobCompletion(job)
      job.setEndTime(completedTime)
      listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
      logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
        logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
          jobSet.totalDelay / 1000.0, jobSet.time.toString,
          jobSet.processingDelay / 1000.0
        ))
        listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
      }
      ...

JobScheduler calls the handleJobCompletion method, which eventually triggers:

    jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)

Here maxRememberDuration is the longest time for which the RDDs generated at each batch time in a DStream are retained.

    def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
      require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
      val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
      logInfo("Deleting batches " + timesToCleanup)
      if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
        timeToAllocatedBlocks --= timesToCleanup
        writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
      } else {
        logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
      }
    }

Finally, the call listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo)) is dispatched to:

    case batchCompleted: StreamingListenerBatchCompleted =>

      listener.onBatchCompleted(batchCompleted)

From there the call chain continues all the way down to the rate controller, which sends the new rate to the Receivers through ReceiverTracker:

    /** A RateController that sends the new rate to receivers, via the receiver tracker. */
    private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
        extends RateController(id, estimator) {
      override def publish(rate: Long): Unit =
        ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
    }

    /** Update a receiver's maximum ingestion rate */
    def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
      if (isTrackerStarted) {
        endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
      }
    }

    case UpdateReceiverRateLimit(streamUID, newRate) =>
      for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }

    case UpdateRateLimit(eps) =>
      registeredBlockGenerators.foreach { bg =>
        bg.updateRate(eps)
      }
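The path a rate update takes, from the driver-side tracker to the receiver's endpoint to each block generator, can be sketched as follows. All class names here (MiniBlockGenerator, MiniReceiverEndpoint, MiniRateTracker) are hypothetical, and ignoring non-positive rates is an assumption of this sketch rather than a statement about Spark's rate limiter.

```scala
import scala.collection.mutable

object RateUpdate {
  // Hypothetical receiver-side component whose ingestion limit the driver adjusts.
  final class MiniBlockGenerator(initialRate: Long) {
    @volatile private var rate: Long = initialRate
    // Assumption of this sketch: non-positive rates are ignored.
    def updateRate(newRate: Long): Unit = if (newRate > 0) rate = newRate
    def currentRate: Long = rate
  }

  // Receiver-side endpoint: fans the new limit out to every block generator.
  final class MiniReceiverEndpoint(generators: Seq[MiniBlockGenerator]) {
    def updateRateLimit(newRate: Long): Unit = generators.foreach(_.updateRate(newRate))
  }

  // Driver-side tracker: looks up the endpoint registered for a stream id.
  final class MiniRateTracker {
    private val endpoints = mutable.Map.empty[Int, MiniReceiverEndpoint]

    def register(streamId: Int, endpoint: MiniReceiverEndpoint): Unit = synchronized {
      endpoints(streamId) = endpoint
    }

    // Unknown stream ids are silently skipped, like an Option that is None.
    def sendRateUpdate(streamId: Int, newRate: Long): Unit = synchronized {
      endpoints.get(streamId).foreach(_.updateRateLimit(newRate))
    }
  }
}
```

In this model, a call such as `tracker.sendRateUpdate(0, 250L)` changes the limit of every generator behind stream 0 and leaves other streams untouched.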

Note:

1. DT Big Data DreamWorks WeChat official account: DT_Spark

2. IMF big data hands-on training, live on YY at 8:00 p.m.; channel number: 68917580

3. Sina Weibo: http://www.weibo.com/ilovepains
