
Lesson 13: Spark Streaming Source Code Interpretation: Driver Fault Tolerance

2025-04-06 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--

The contents of this issue:

Fault tolerance of ReceivedBlockTracker

Fault tolerance of DStream and JobGenerator

Fault tolerance on the Driver works at two levels: 1. the metadata of the data received by the Receivers; 2. the state of the components the Driver manages (the scheduling and driver level).

The metadata is made fault tolerant with a write-ahead log (WAL).
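A minimal sketch of the idea, assuming an in-memory log: each metadata change is appended as a serialized record before it is applied, so that after a crash the records can be replayed in write order to rebuild the state. The event names, the comma-separated encoding and the InMemoryWal class are hypothetical; in Spark, ReceivedBlockTracker writes serialized ReceivedBlockTrackerLogEvent records through a WriteAheadLog that persists them to files.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable

object WalSketch {
  // Hypothetical, simplified log events (Spark's real ones are
  // BlockAdditionEvent, BatchAllocationEvent and BatchCleanupEvent).
  sealed trait Event
  final case class BlockAdded(streamId: Int, blockId: String) extends Event
  final case class BatchAllocated(batchTime: Long) extends Event

  // Encode an event as bytes before appending it (assumes ids contain no commas).
  def encode(e: Event): ByteBuffer = {
    val text = e match {
      case BlockAdded(s, b)  => s"ADD,$s,$b"
      case BatchAllocated(t) => s"ALLOC,$t"
    }
    ByteBuffer.wrap(text.getBytes(UTF_8))
  }

  def decode(buf: ByteBuffer): Event = {
    val bytes = new Array[Byte](buf.remaining())
    buf.duplicate().get(bytes) // read without moving the original buffer's position
    new String(bytes, UTF_8).split(",").toList match {
      case List("ADD", s, b) => BlockAdded(s.toInt, b)
      case List("ALLOC", t)  => BatchAllocated(t.toLong)
      case other             => throw new IllegalArgumentException(s"Bad record: $other")
    }
  }

  // In-memory stand-in for a durable log; a real WAL would persist each record.
  final class InMemoryWal {
    private val records = mutable.ArrayBuffer.empty[ByteBuffer]
    def write(e: Event): Unit = synchronized { records += encode(e) }
    def replay(): List[Event] = synchronized { records.toList.map(decode) }
  }

  // Rebuild the unallocated blocks by replaying events in write order.
  // In this simplified model a BatchAllocated event takes every pending block.
  def recoverUnallocated(events: List[Event]): Map[Int, List[String]] =
    events.foldLeft(Map.empty[Int, List[String]]) {
      case (acc, BlockAdded(s, b)) => acc.updated(s, acc.getOrElse(s, Nil) :+ b)
      case (_, BatchAllocated(_))  => Map.empty
    }
}
```

Replaying the log returns the events in their original order, and folding over them recovers exactly the blocks that had been logged but not yet allocated.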

When a Receiver reports a new block, the ReceiverTracker endpoint handles the AddBlock message:

case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }

/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
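The apparent purpose of the isBatchingEnabled branch is to keep the RPC thread from blocking while a batched WAL write completes. A rough sketch of that dispatch, assuming a plain ExecutorService and a Scala Promise in place of Spark's walBatchingThreadPool and RPC context; addBlock here is a hypothetical stand-in. One deliberate difference: when the endpoint is no longer active, this sketch fails the reply, whereas the Spark code throws inside Utils.tryLogNonFatalError, which only logs the error.

```scala
import java.util.concurrent.ExecutorService
import scala.concurrent.{Future, Promise}

object AddBlockDispatchSketch {
  // Hypothetical stand-in for receivedBlockTracker.addBlock(receivedBlockInfo).
  def addBlock(blockId: String): Boolean = blockId.nonEmpty

  def handleAddBlock(
      blockId: String,
      batchingEnabled: Boolean,
      walBatchingPool: ExecutorService,
      active: () => Boolean): Future[Boolean] = {
    val reply = Promise[Boolean]()
    if (batchingEnabled) {
      // Hand the (possibly slow) write to another thread; the caller gets a Future.
      walBatchingPool.execute(new Runnable {
        override def run(): Unit =
          if (active()) reply.success(addBlock(blockId))
          else reply.failure(new IllegalStateException("ReceiverTracker RpcEndpoint shut down."))
      })
    } else {
      // Without batching, answer on the calling thread.
      reply.success(addBlock(blockId))
    }
    reply.future
  }
}
```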

The metadata itself is managed by ReceivedBlockTracker, whose addBlock method is:

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}

The writeToLog method is called first:

/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}

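If the WAL write succeeds (or the WAL is disabled, in which case writeToLog simply returns true), the block information is then appended to that stream's queue in streamIdToUnallocatedBlockQueue. If the write fails, addBlock returns false and the block is not queued.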
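The ordering in addBlock (write the log record first, touch the in-memory queue only on success) is what keeps the WAL and the queue consistent. A minimal sketch of that ordering, assuming a hypothetical walWrite function in place of writeToLog and a simplified BlockInfo in place of ReceivedBlockInfo:

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object ReceivedBlockTrackerSketch {
  final case class BlockInfo(streamId: Int, blockId: String)

  // Hypothetical tracker: walWrite stands in for writeToLog and may fail or throw.
  final class Tracker(walWrite: BlockInfo => Boolean) {
    private val unallocated = mutable.Map.empty[Int, mutable.Queue[BlockInfo]]

    // Log first, then update memory; report success only if the log write succeeded.
    def addBlock(info: BlockInfo): Boolean = {
      val written =
        try walWrite(info)
        catch { case NonFatal(_) => false }
      if (written) synchronized {
        unallocated.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]) += info
      }
      written
    }

    def pending(streamId: Int): List[BlockInfo] = synchronized {
      unallocated.get(streamId).map(_.toList).getOrElse(Nil)
    }
  }
}
```

A block whose record could not be logged is answered with false and never enters the queue, so everything held in memory can also be found in the log.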

Every batchInterval, a Streaming job is triggered. At that point, the blocks waiting in the streamIdToUnallocatedBlockQueue queues must be assigned to a specific batch time, which is what allocateBlocksToBatch does:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}

This allocation is also written to the WAL, as a BatchAllocationEvent, and is only stored in timeToAllocatedBlocks if that write succeeds.
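The guard on lastAllocatedBatchTime makes allocation happen at most once per batch time, which matters when batches are replayed during recovery. A sketch under simplifying assumptions: batch times are plain Long milliseconds, walWrite stands in for writeToLog(BatchAllocationEvent(...)), and Block and Allocator are hypothetical.

```scala
import scala.collection.mutable

object AllocationSketch {
  final case class Block(streamId: Int, id: String)

  final class Allocator(streamIds: Seq[Int], walWrite: (Long, Map[Int, Seq[Block]]) => Boolean) {
    private val unallocated: Map[Int, mutable.Queue[Block]] =
      streamIds.map(id => id -> mutable.Queue.empty[Block]).toMap
    val timeToAllocatedBlocks = mutable.Map.empty[Long, Map[Int, Seq[Block]]]
    private var lastAllocatedBatchTime: Option[Long] = None

    def add(b: Block): Unit = synchronized { unallocated(b.streamId) += b }

    def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
      // Only allocate for a batch time later than the last one allocated.
      if (lastAllocatedBatchTime.forall(batchTime > _)) {
        // Drain every stream's queue into this batch.
        val streamIdToBlocks: Map[Int, Seq[Block]] = streamIds.map { id =>
          val queue = unallocated(id)
          val drained = queue.toList
          queue.clear()
          id -> drained
        }.toMap
        // Record the allocation only if it was logged first.
        if (walWrite(batchTime, streamIdToBlocks)) {
          timeToAllocatedBlocks.put(batchTime, streamIdToBlocks)
          lastAllocatedBatchTime = Some(batchTime)
        }
      }
      // Otherwise: an already-allocated time (e.g. during recovery), so do nothing.
    }
  }
}
```

Calling allocateBlocksToBatch twice for the same time makes the second call a no-op, so a block that arrives in between is picked up by the next batch instead.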

JobGenerator is triggered every batchInterval to generate jobs, and its generateJobs method is where allocateBlocksToBatch is called:

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

Finally, generateJobs posts a DoCheckpoint message to its event loop, whether or not job generation succeeded.
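Because DoCheckpoint is posted after the Try/match rather than inside the Success branch, a checkpoint is requested even when job generation fails. A minimal sketch of that control flow, with hypothetical allocate and generate functions in place of allocateBlocksToBatch and graph.generateJobs, and a LinkedBlockingQueue standing in for JobGenerator's event loop:

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

object GenerateJobsSketch {
  sealed trait Event
  final case class DoCheckpoint(time: Long, clearCheckpointDataLater: Boolean) extends Event

  final case class JobSet(time: Long, jobs: Seq[String])

  // Hypothetical generator; times are plain Long milliseconds.
  final class Generator(allocate: Long => Unit, generate: Long => Seq[String]) {
    val eventQueue = new LinkedBlockingQueue[Event]()
    val submitted = mutable.ArrayBuffer.empty[JobSet]
    val errors = mutable.ArrayBuffer.empty[String]

    def generateJobs(time: Long): Unit = {
      Try {
        allocate(time)  // allocate received blocks to this batch
        generate(time)  // then build the jobs from them
      } match {
        case Success(jobs) => submitted += JobSet(time, jobs)
        case Failure(e)    => errors += s"Error generating jobs for time $time: ${e.getMessage}"
      }
      // Posted in both cases.
      eventQueue.put(DoCheckpoint(time, clearCheckpointDataLater = false))
    }
  }
}
```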

When JobGenerator receives the DoCheckpoint message, processEvent dispatches it to doCheckpoint:

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

/** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
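Since generateJobs posts DoCheckpoint for every batch, the isMultipleOf test in doCheckpoint is what reduces checkpoints to the configured checkpoint interval. A sketch of the dispatch and that test, assuming times in milliseconds, an Option in place of shouldCheckpoint plus ssc.checkpointDuration, and a hypothetical checkpointedTimes buffer in place of checkpointWriter.write:

```scala
import scala.collection.mutable

object CheckpointScheduleSketch {
  sealed trait JobGeneratorEvent
  final case class GenerateJobs(time: Long) extends JobGeneratorEvent
  final case class DoCheckpoint(time: Long, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent

  // Hypothetical generator; None means checkpointing is disabled.
  final class Generator(zeroTime: Long, checkpointDurationMs: Option[Long]) {
    val checkpointedTimes = mutable.ArrayBuffer.empty[Long]

    def processEvent(event: JobGeneratorEvent): Unit = event match {
      case GenerateJobs(_) => () // job generation is omitted in this sketch
      case DoCheckpoint(time, clearLater) => doCheckpoint(time, clearLater)
    }

    // Checkpoint only when (time - zeroTime) is a whole multiple of the duration.
    private def doCheckpoint(time: Long, clearCheckpointDataLater: Boolean): Unit =
      checkpointDurationMs match {
        case Some(duration) if (time - zeroTime) % duration == 0 => checkpointedTimes += time
        case _ => ()
      }
  }
}
```

With a 5-second batch interval and a 10-second checkpoint interval, only every second DoCheckpoint leads to a checkpoint being written.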

In doCheckpoint, a Checkpoint object is built from ssc (the StreamingContext) and time. Because ssc holds the Driver's state, including the DStream graph and the batch times still pending, a crashed Driver can be restored from the checkpoint data.
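In application code, this recovery path is usually entered through StreamingContext.getOrCreate(checkpointPath, creatingFunc), which rebuilds the context from checkpoint data when it exists. The sketch here only illustrates the save/restore round trip for a small part of that state; the Snapshot type and its text encoding are hypothetical, whereas Spark serializes the whole Checkpoint object and writes it to the checkpoint directory.

```scala
object CheckpointSnapshotSketch {
  // Hypothetical snapshot of a small part of what a Checkpoint carries:
  // the checkpoint time, the batch duration and the batch times still pending.
  final case class Snapshot(checkpointTimeMs: Long, batchDurationMs: Long, pendingTimesMs: Seq[Long])

  // Save: a simple comma-separated text form.
  def serialize(s: Snapshot): String =
    (Seq(s.checkpointTimeMs, s.batchDurationMs) ++ s.pendingTimesMs).mkString(",")

  // Restore: parse the same form back into a Snapshot.
  def deserialize(text: String): Snapshot = {
    val nums = text.split(",").toList.map(_.toLong)
    Snapshot(nums.head, nums(1), nums.drop(2))
  }
}
```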

The code for the recovery is as follows:

/** Restarts the generator based on the information in checkpoint */
private def restart() {
  // If manual clock is being used for testing, then
  // either set the manual clock to the last checkpointed time,
  // or if the property is defined set it to that time
  if (clock.isInstanceOf[ManualClock]) {
    val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
    val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
    clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
  }

  val batchDuration = ssc.graph.batchDuration

  // Batches when the master was down, that is,
  // between the checkpoint and current restart time
  val checkpointTime = ssc.initialCheckpoint.checkpointTime
  val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
  val downTimes = checkpointTime.until(restartTime, batchDuration)
  logInfo("Batches during down time (" + downTimes.size + " batches): "
    + downTimes.mkString(", "))

  // Batches that were unprocessed before failure
  val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
  logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
    pendingTimes.mkString(", "))
  // Reschedule jobs for these times
  val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
    .distinct.sorted(Time.ordering)
  logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
    timesToReschedule.mkString(", "))
  timesToReschedule.foreach { time =>
    // Allocate the related blocks when recovering from failure, because some blocks that were
    // added but not allocated, are dangling in the queue after recovering, we have to allocate
    // those blocks to the next batch, which is the batch they were supposed to go.
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
  }

  // Restart the timer
  timer.start(restartTime.milliseconds)
  logInfo("Restarted JobGenerator at " + restartTime)
}
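The rescheduling arithmetic in restart can be checked on a small example. A sketch with times as plain Long milliseconds (a simplification of Spark's Time), assuming that checkpointTime.until(restartTime, batchDuration) yields the batch times from checkpointTime up to, but excluding, restartTime:

```scala
object RescheduleSketch {
  // Batch times while the driver was down: checkpointTime, checkpointTime + batchDuration, ...
  // strictly before restartTime.
  def downTimes(checkpointTime: Long, restartTime: Long, batchDuration: Long): Seq[Long] =
    checkpointTime until restartTime by batchDuration

  // Pending batches plus down-time batches, before restartTime, each once, in order.
  def timesToReschedule(pendingTimes: Seq[Long], checkpointTime: Long,
                        restartTime: Long, batchDuration: Long): Seq[Long] =
    (pendingTimes.sorted ++ downTimes(checkpointTime, restartTime, batchDuration))
      .filter(_ < restartTime)
      .distinct
      .sorted
}
```

For a checkpoint taken at 20000 ms with the batch at 15000 ms still pending, a 5000 ms batch interval and a restart at 35000 ms, the batches at 15000, 20000, 25000 and 30000 ms are rescheduled; 20000 ms appears in both lists but runs only once because of distinct.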

Note:

1. DT Big Data DreamWorks WeChat official account: DT_Spark

2. IMF big data hands-on live broadcast at 8:00 p.m. on YY, channel number: 68917580

3. Sina Weibo: http://www.weibo.com/ilovepains
