Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to implement Driver Fault-tolerant Security

2025-04-06 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly introduces "how to achieve Driver fault-tolerant security". In daily operation, I believe many people have doubts about how to achieve Driver fault-tolerant security. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts of "how to achieve Driver fault-tolerant security". Next, please follow the editor to study!

First, look at the fault tolerance of ReceiverTracker, mainly that ReceiverTracker receives metadata into WAL, and look at the addBlock method of ReceiverTracker. The code is as follows

Def addBlock (receivedBlockInfo: ReceivedBlockInfo): Boolean = {

Try {

Val writeResult = writeToLog (BlockAdditionEvent (receivedBlockInfo))

If (writeResult) {

Synchronized {

GetReceivedBlockQueue (receivedBlockInfo.streamId) + = receivedBlockInfo

}

LogDebug (s "Stream ${receivedBlockInfo.streamId} received" +

S "block ${receivedBlockInfo.blockStoreResult.blockId}")

} else {

LogDebug (s "Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving" +

S "block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")

}

WriteResult

} catch {

Case NonFatal (e) = >

LogError (s "Error adding block $receivedBlockInfo", e)

False

}

}

The writeToLog method is to perform the operation of WAL. Look at the code of writeToLog.

Private def writeToLog (record: ReceivedBlockTrackerLogEvent): Boolean = {

If (isWriteAheadLogEnabled) {

LogTrace (s "Writing record: $record")

Try {

WriteAheadLogOption.get.write (ByteBuffer.wrap (Utils.serialize (record)

Clock.getTimeMillis ()

True

} catch {

Case NonFatal (e) = >

LogWarning (s "Exception thrown while writing record: $record to the WriteAheadLog.", e)

False

}

} else {

True

}

}

First determine whether WAL is turned on, according to the isWriteAheadLogEnabled value

Private [streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty

Go on to see writeAheadLogOption.

Privateval writeAheadLogOption = createWriteAheadLog ()

Take a look at the createWriteAheadLog () method

Private def createWriteAheadLog (): Option [WriteAheadLog] = {

CheckpointDirOption.map {checkpointDir = >

Val logDir = ReceivedBlockTracker.checkpointDirToLogDir (checkpointDirOption.get)

WriteAheadLogUtils.createLogForDriver (conf, logDir, hadoopConf)

}

}

According to the configuration of checkpoint, get the directory of checkpoint. Here, you can see that checkpoint can have multiple directories.

Put receivedBlockInfo into memory queue getReceivedBlockQueue after writing WAL

Second, look at the allocateBlocksToBatch method of ReceivedBlockTracker. The code is as follows

Def allocateBlocksToBatch (batchTime: Time): Unit = synchronized {

If (lastAllocatedBatchTime = = null | | batchTime > lastAllocatedBatchTime) {

Val streamIdToBlocks = streamIds.map {streamId = >

(streamId, getReceivedBlockQueue (streamId) .dequeueAll (x = > true))

}. ToMap

Val allocatedBlocks = AllocatedBlocks (streamIdToBlocks)

If (writeToLog (BatchAllocationEvent (batchTime, allocatedBlocks)) {

TimeToAllocatedBlocks.put (batchTime, allocatedBlocks)

LastAllocatedBatchTime = batchTime

} else {

LogInfo (s "Possibly processed batch $batchTime need to be processed again in WAL recovery")

}

} else {

/ / This situation occurs when:

/ / 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent

/ / possibly processed batch job or half-processed batch job need to be processed again

/ / so the batchTime will be equal to lastAllocatedBatchTime.

/ / 2. Slow checkpointing makes recovered batch time older than WAL recovered

/ / lastAllocatedBatchTime.

/ / This situation will only occurs in recovery time.

LogInfo (s "Possibly processed batch $batchTime need to be processed again in WAL recovery")

}

}

First get the ReceivedBlockQueue queue value of each receiver from getReceivedBlockQueue and assign it to streamIdToBlocks, then wrap it

Val allocatedBlocks = AllocatedBlocks (streamIdToBlocks)

AllocatedBlocks is a batch of metadata obtained according to time, which can be used by the job,job of the corresponding batchDuration during execution. WAL is performed before use. If the job is restored by error, you can know where the data is calculated.

Val allocatedBlocks = AllocatedBlocks (streamIdToBlocks)

If (writeToLog (BatchAllocationEvent (batchTime, allocatedBlocks)) {

TimeToAllocatedBlocks.put (batchTime, allocatedBlocks)

LastAllocatedBatchTime = batchTime

} else {

LogInfo (s "Possibly processed batch $batchTime need to be processed again in WAL recovery")

}

Third, look at the cleanupOldBatches method. The function of cleanupOldBatches is to clear the unused batches metadata from memory, then delete the WAL data, and then delete the batches information to be deleted before WAL.

Def cleanupOldBatches (cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {

Require (cleanupThreshTime.milliseconds

< clock.getTimeMillis()) val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq logInfo("Deleting batches " + timesToCleanup) if (writeToLog(BatchCleanupEvent(timesToCleanup))) { timeToAllocatedBlocks --= timesToCleanup writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion)) } else { logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.") } } · 总结一下上面的三种WAL,对应下面的三种事件,这就是ReceiverTracker的容错 /** Trait representing any event in the ReceivedBlockTracker that updates its state. */ private[streaming] sealed trait ReceivedBlockTrackerLogEvent private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo) extends ReceivedBlockTrackerLogEvent private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks) extends ReceivedBlockTrackerLogEvent private[streaming] case class BatchCleanupEvent(times: Seq[Time]) extends ReceivedBlockTrackerLogEvent · 看一下Dstream.graph和JobGenerator的容错,从开始 private def generateJobs(time: Time) { SparkEnv has been removed. SparkEnv.set(ssc.env) Try { // allocate received blocks to batch // 分配接收到的数据给batch jobScheduler.receiverTracker.allocateBlocksToBatch(time) // 使用分配的块生成jobs graph.generateJobs(time) // generate jobs using allocated block } match { case Success(jobs) =>

/ / get metadata information

Val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo (time)

/ / submit jobSet

JobScheduler.submitJobSet (JobSet (time, jobs, streamIdToInputInfos))

Case Failure (e) = >

JobScheduler.reportError ("Error generating jobs for time" + time, e)

}

EventLoop.post (DoCheckpoint (time, clearCheckpointDataLater = false))

}

Send a DoCheckpoint message after jobs generation is completed, and finally call the doCheckpoint method. The code is as follows

Private def doCheckpoint (time: Time, clearCheckpointDataLater: Boolean) {

If (shouldCheckpoint & & (time-graph.zeroTime) .isMultipleOf (ssc.checkpointDuration)) {

LogInfo ("Checkpointing graph for time" + time)

Ssc.graph.updateCheckpointData (time)

CheckpointWriter.write (new Checkpoint (ssc, time), clearCheckpointDataLater)

}

}

At this point, the study on "how to implement Driver fault-tolerant security" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report