
Spark Streaming Source Code Interpretation (Version Customization), Lesson 13: Driver Fault Tolerance

Updated 2025-01-16. From: SLTechnology News&Howtos (shulou)


Contents of this lesson:

1. ReceivedBlockTracker fault tolerance

2. DStream and JobGenerator fault tolerance

Part one: fault tolerance

1. ReceivedBlockTracker manages the metadata of a running Spark Streaming program. This is the data level.

2. DStream and JobGenerator are the core of job scheduling. DStream is the logical level.

3. JobGenerator is the job generation level: it reflects how far scheduling has progressed, seen from the runtime point of view.

For Driver fault tolerance, consider which state in the Driver has to be preserved:

1. ReceivedBlockTracker tracks the received data, so it needs fault tolerance. This is done through the WAL (write ahead log).

2. DStreamGraph expresses the dependencies. When state is restored, the logical-level dependencies between DStreams must be restored. This is done through checkpoint.

3. JobGenerator represents how jobs are continually generated from the data in ReceivedBlockTracker and the dependencies formed by the DStreams, and how far that data has been consumed.

In summary:

ReceivedBlockTracker:

1. ReceivedBlockTracker manages all the data received while Spark Streaming is running and assigns it to the batches that need it. Every action is written to the log through the WAL. If the Driver fails, the tracker state can be restored from that history; the checkpoint directory supplied when the ReceivedBlockTracker is created is where the history is saved.

Let's start with what happens to the data after the Receiver has received it.

2. The ReceivedBlockTracker.addBlock source code is as follows:

After the Receiver receives data, ReceiverSupervisorImpl reports the block metadata to the Driver, where it is made fault tolerant directly through the WAL.

When ReceiverSupervisorImpl, which manages the Receiver, reports metadata to the Driver, the report is handed to ReceivedBlockTracker. ReceivedBlockTracker writes the metadata to the WAL file first and only then to memory, where the Spark Streaming scheduler, that is JobGenerator, uses it. JobGenerator cannot use the WAL directly: the WAL data is on disk, whereas JobGenerator works with the in-memory cached data structures.

    /** Add received block. This event will get written to the write ahead log (if enabled). */
    def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
      try {
        // Write the event to the WAL first
        val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
        if (writeResult) {
          synchronized {
            // WAL write succeeded: add the block metadata to the queue
            getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
          }
          logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
            s"block ${receivedBlockInfo.blockStoreResult.blockId}")
        } else {
          logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
            s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
        }
        writeResult
      } catch {
        case NonFatal(e) =>
          logError(s"Error adding block $receivedBlockInfo", e)
          false
      }
    }

The metadata received on the Driver side is kept in streamIdToUnallocatedBlockQueues. Its structure is as follows:

    private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

allocateBlocksToBatch assigns the received data to a batch, fetching the blocks by streamId. This shows that Spark Streaming can process data from several different input sources at once. So what exactly is a ReceivedBlockInfo? From the addBlock code above, it carries at least a streamId and a blockStoreResult with a blockId.

batchTime identifies the batch: the blocks received since the previous job's allocation are assigned to it.

    /**
     * Allocate all unallocated blocks to the given batch.
     * This event will get written to the write ahead log (if enabled).
     */
    def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
      if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
        val streamIdToBlocks = streamIds.map { streamId =>
          // Fetch the block information by streamId
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
        }.toMap
        val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
        if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
          timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
          // Remember the latest batch time that has been allocated
          lastAllocatedBatchTime = batchTime
        } else {
          logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
        }
      } else {
        logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
      }
    }
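The queue-per-stream structure and the allocation step can be tried out without Spark. The following is only a minimal sketch under stated assumptions: `BlockInfo`, `ToyTracker` and `blocksOfBatch` are hypothetical names invented for illustration, batch times are plain `Long` milliseconds, and the WAL write is left out entirely.

```scala
import scala.collection.mutable

object AllocationSketch {
  // Hypothetical stand-in for ReceivedBlockInfo: only a stream id and a block id.
  final case class BlockInfo(streamId: Int, blockId: String)

  // Toy tracker: one queue of unallocated blocks per stream (no WAL here).
  final class ToyTracker(streamIds: Seq[Int]) {
    private val unallocated = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]
    private val allocated = mutable.HashMap.empty[Long, Map[Int, Seq[BlockInfo]]]
    private var lastAllocatedBatchTime: Option[Long] = None

    private def queueFor(streamId: Int): mutable.Queue[BlockInfo] =
      unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[BlockInfo])

    def addBlock(info: BlockInfo): Unit = synchronized {
      queueFor(info.streamId) += info
    }

    /** Drains every queue into the batch; returns false for a stale batch time. */
    def allocateBlocksToBatch(batchTime: Long): Boolean = synchronized {
      if (lastAllocatedBatchTime.forall(batchTime > _)) {
        val blocks = streamIds.map { id =>
          id -> queueFor(id).dequeueAll(_ => true).toList
        }.toMap
        allocated.put(batchTime, blocks)
        lastAllocatedBatchTime = Some(batchTime)
        true
      } else {
        false
      }
    }

    def blocksOfBatch(batchTime: Long, streamId: Int): Seq[BlockInfo] = synchronized {
      allocated.get(batchTime).flatMap(_.get(streamId)).getOrElse(Seq.empty)
    }
  }
}
```

Calling `allocateBlocksToBatch` twice with the same batch time returns `false` the second time, which mirrors the `batchTime > lastAllocatedBatchTime` guard in the source above.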

As time passes, RDDs keep being generated, so historical data has to be cleaned up. This is done through the cleanupOldBatches method.

    /**
     * Clean up block information of old batches. If waitForCompletion is true, this method
     * returns only after the files are cleaned up.
     */
    def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
      require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
      val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
      logInfo("Deleting batches " + timesToCleanup)
      if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
        timeToAllocatedBlocks --= timesToCleanup
        writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
      } else {
        logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
      }
    }

All of the methods above perform a WAL write through writeToLog:

    /** Write an update to the tracker to the write ahead log */
    private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
      if (isWriteAheadLogEnabled) {
        logTrace(s"Writing record: $record")
        try {
          writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
            clock.getTimeMillis())
          true
        } catch {
          case NonFatal(e) =>
            logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
            false
        }
      } else {
        true
      }
    }

Summary:

The WAL covers the whole life cycle of the data: its arrival, its allocation to batches for consumption, and its cleanup. Each of these operations is written to the WAL file before the in-memory state is updated.
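To see why logging these three kinds of events is enough to rebuild the tracker after a Driver failure, here is a minimal sketch of replaying a log, assuming a simplified model: the event and state types (`BlockAdded`, `BatchAllocated`, `BatchesCleaned`, `TrackerState`) are hypothetical stand-ins rather than Spark's classes, and the log is an in-memory `Seq` instead of files on disk.

```scala
object WalReplaySketch {
  // Hypothetical, simplified counterparts of the tracker's log events.
  sealed trait TrackerEvent
  final case class BlockAdded(streamId: Int, blockId: String) extends TrackerEvent
  final case class BatchAllocated(batchTime: Long, blocks: Map[Int, Seq[String]])
      extends TrackerEvent
  final case class BatchesCleaned(batchTimes: Seq[Long]) extends TrackerEvent

  // In-memory state the tracker would hold: unallocated and allocated blocks.
  final case class TrackerState(
      unallocated: Map[Int, Vector[String]],
      allocated: Map[Long, Map[Int, Seq[String]]])

  /** Rebuilds the state by applying the logged events in order. */
  def replay(log: Seq[TrackerEvent]): TrackerState =
    log.foldLeft(TrackerState(Map.empty, Map.empty)) { (state, event) =>
      event match {
        case BlockAdded(streamId, blockId) =>
          val queue = state.unallocated.getOrElse(streamId, Vector.empty) :+ blockId
          state.copy(unallocated = state.unallocated.updated(streamId, queue))
        case BatchAllocated(batchTime, blocks) =>
          // Allocation drained every queue, so the unallocated side starts empty again.
          TrackerState(Map.empty, state.allocated.updated(batchTime, blocks))
        case BatchesCleaned(batchTimes) =>
          state.copy(allocated = state.allocated -- batchTimes)
      }
    }
}
```

Because every state change was logged before it was applied, replaying the log from the beginning yields the same queues and batch assignments the tracker held before the failure.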

JobGenerator:

Checkpoints are taken at an interval tied to the batch duration, both before and after a batch executes.

The call sequence leading up to doCheckpoint:

1. A brief look at generateJobs

    /** Generate jobs and perform checkpoint for the given `time`. */
    private def generateJobs(time: Time) {
      // Set the SparkEnv in this thread, so that job generation code can access the environment
      // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
      // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
      SparkEnv.set(ssc.env)
      Try {
        jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
        graph.generateJobs(time) // generate jobs using allocated block
      } match {
        case Success(jobs) =>
          val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
          jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
        case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
      }
      // Once the jobs have been generated and submitted, request a checkpoint
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
    }

2. processEvent receives the event

    /** Processes all events */
    private def processEvent(event: JobGeneratorEvent) {
      logDebug("Got event " + event)
      event match {
        case GenerateJobs(time) => generateJobs(time)
        case ClearMetadata(time) => clearMetadata(time)
        case DoCheckpoint(time, clearCheckpointDataLater) =>
          doCheckpoint(time, clearCheckpointDataLater) // call the doCheckpoint method
        case ClearCheckpointData(time) => clearCheckpointData(time)
      }
    }
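The way generateJobs posts a DoCheckpoint event that processEvent later dispatches can be modelled with a plain queue. This is only a sketch under simplifying assumptions: `ToyGenerator`, `handled` and `drain` are hypothetical names, times are `Long` milliseconds, only two of the four event types are modelled, and the queue is drained on the calling thread.

```scala
import scala.collection.mutable

object EventLoopSketch {
  // Simplified counterparts of two JobGenerator events.
  sealed trait Event
  final case class GenerateJobs(time: Long) extends Event
  final case class DoCheckpoint(time: Long, clearCheckpointDataLater: Boolean) extends Event

  final class ToyGenerator {
    private val queue = mutable.Queue.empty[Event]
    // Records what was handled, in order, so the flow can be inspected.
    val handled = mutable.ArrayBuffer.empty[String]

    def post(event: Event): Unit = queue.enqueue(event)

    private def processEvent(event: Event): Unit = event match {
      case GenerateJobs(time) =>
        handled += s"generate@$time"
        // Generating jobs ends by requesting a checkpoint for the same time.
        post(DoCheckpoint(time, clearCheckpointDataLater = false))
      case DoCheckpoint(time, later) =>
        handled += s"checkpoint@$time(later=$later)"
    }

    /** Processes queued events one at a time until the queue is empty. */
    def drain(): Unit = while (queue.nonEmpty) processEvent(queue.dequeue())
  }
}
```

Since the checkpoint request goes to the back of the queue, it is handled after any events that were already waiting, not inline inside job generation.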

3. The doCheckpoint source code is as follows:

    /** Perform checkpoint for the give `time`. */
    private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
      if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
        logInfo("Checkpointing graph for time " + time)
        ssc.graph.updateCheckpointData(time)
        // Finally, write the checkpoint
        checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
      }
    }
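The condition `(time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)` means that not every DoCheckpoint event writes a checkpoint. A small worked sketch, assuming all times are plain `Long` milliseconds and that "multiple of" is a zero remainder; `isCheckpointTime` is a hypothetical helper and the numbers are made up for illustration:

```scala
object CheckpointIntervalSketch {
  /** True when `timeMs` lies a whole number of checkpoint intervals after `zeroTimeMs`. */
  def isCheckpointTime(timeMs: Long, zeroTimeMs: Long, checkpointIntervalMs: Long): Boolean =
    (timeMs - zeroTimeMs) % checkpointIntervalMs == 0

  // Batches every 2 s starting 2 s after a zero time of 1000 ms,
  // with a 10 s checkpoint interval.
  val batchTimes: Seq[Long] = (3000L to 25000L by 2000L).toList
  val checkpointed: Seq[Long] = batchTimes.filter(isCheckpointTime(_, 1000L, 10000L))
}
```

With these assumed numbers only every fifth batch time passes the check; the other DoCheckpoint events fall through the `if` without writing anything.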

4. The updateCheckpointData source code in DStream is as follows; it ultimately leads to the checkpointing of RDDs.

    /**
     * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
     * this stream. This is an internal method that should not be called directly. This is
     * a default implementation that saves only the file names of the checkpointed RDDs to
     * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
     * this method to save custom checkpoint data.
     */
    private[streaming] def updateCheckpointData(currentTime: Time) {
      logDebug("Updating checkpoint data for time " + currentTime)
      checkpointData.update(currentTime)
      dependencies.foreach(_.updateCheckpointData(currentTime))
      logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
    }

[Figure: JobGenerator fault tolerance]

Reference blog: http://blog.csdn.net/snail_gesture/article/details/51492873#comments
