2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains how ReceiverTracker handles data in Spark Streaming.
ReceiverTracker runs on the Driver and uses a scheduling algorithm to decide on which executor each Receiver should start. A Receiver is started by wrapping it in a task, and that task is the only task in its job; in essence, ReceiverTracker launches each Receiver as a separate job. The function that starts a Receiver creates a ReceiverSupervisorImpl, and the start method of ReceiverSupervisorImpl is what actually runs the Receiver on the worker node. From then on, the received data is packed into blocks by BlockGenerator, the blocks are stored through ReceiverSupervisorImpl, and the metadata of the data is reported to ReceiverTracker.
Let's look at how ReceiverTracker processes the data once it has been received.
ReceiverSupervisorImpl stores blocks through receivedBlockHandler.
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    ...
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
There are two implementations: one stores blocks through the write ahead log (WAL), the other stores them through BlockManager only.
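The choice between the two handlers can be pictured with a minimal, self-contained sketch. The names `BlockHandler`, `WalHandler`, `MemoryHandler` and `chooseHandler` are hypothetical stand-ins for illustration, not Spark classes, and the returned strings only mark which path was taken.

```scala
object HandlerChoiceSketch {
  // Hypothetical stand-in for ReceivedBlockHandler.
  trait BlockHandler { def storeBlock(id: String, data: Seq[String]): String }

  // Pretends to write to a log directory before keeping the block.
  class WalHandler(logDir: String) extends BlockHandler {
    def storeBlock(id: String, data: Seq[String]): String = s"wal:$logDir/$id"
  }

  // Keeps the block in memory only.
  class MemoryHandler extends BlockHandler {
    def storeBlock(id: String, data: Seq[String]): String = s"mem:$id"
  }

  // Pick the implementation once, based on a configuration flag.
  def chooseHandler(walEnabled: Boolean, checkpointDir: Option[String]): BlockHandler =
    if (walEnabled) {
      // Like checkpointDirOption.get above, the WAL path needs a checkpoint directory.
      new WalHandler(checkpointDir.getOrElse(
        throw new IllegalArgumentException("WAL needs a checkpoint directory")))
    } else {
      new MemoryHandler
    }
}
```

Because the handler is chosen once and hidden behind a common trait, the code that stores blocks does not need to know which variant is in use.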
/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
The data is stored first and then reported to ReceiverTracker. What gets reported is only the metadata, not the data itself.
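The store-then-report order can be sketched as follows. This is a simplified illustration under stated assumptions: `BlockMeta`, `store` and `reported` are hypothetical in-memory stand-ins for the block store and the driver-side tracker, and no RPC is involved.

```scala
object PushAndReportSketch {
  // Hypothetical metadata record: it describes the block but does not contain the data.
  final case class BlockMeta(streamId: Int, blockId: String, numRecords: Long)

  // Stand-in for the block store on the executor.
  val store = scala.collection.mutable.Map.empty[String, Seq[String]]
  // Stand-in for what the driver-side tracker has been told.
  val reported = scala.collection.mutable.ArrayBuffer.empty[BlockMeta]

  def pushAndReport(streamId: Int, blockId: String, records: Seq[String]): BlockMeta = {
    store(blockId) = records // 1. store the data locally
    val meta = BlockMeta(streamId, blockId, records.size.toLong)
    reported += meta // 2. report only the metadata
    meta
  }
}
```

Keeping the payload on the executor and sending only a small descriptor keeps the driver's bookkeeping cheap even when blocks are large.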
/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
The sealed keyword means that all direct subclasses must be defined in the same file.
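As a small illustration of what sealed buys you, here is a sketch with a hypothetical message hierarchy. The message names echo ones mentioned in this article, but the fields and the `describe` function are made up for the example.

```scala
object SealedSketch {
  // Hypothetical message hierarchy; every direct subclass must live in this file.
  sealed trait TrackerMessage
  final case class AddBlock(blockId: String) extends TrackerMessage
  final case class CleanupOldBlocks(thresholdMs: Long) extends TrackerMessage
  case object StopAll extends TrackerMessage

  // Because the trait is sealed, the compiler can warn when a case is missing here.
  def describe(msg: TrackerMessage): String = msg match {
    case AddBlock(id)         => s"add $id"
    case CleanupOldBlocks(ms) => s"cleanup before $ms"
    case StopAll              => "stop"
  }
}
```

Since the full set of subclasses is known at compile time, pattern matches over the messages can be checked for exhaustiveness.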
ReceiverTracker manages the starting and reclaiming of Receivers and receives the metadata they report. All input streams must have been added and StreamingContext.start() must have been called before ReceiverTracker can be instantiated, because ReceiverTracker starts a Receiver for each input stream.
ReceiverTracker holds all the input streams and their IDs.
private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
private val receiverInputStreamIds = receiverInputStreams.map { _.id }
The states of ReceiverTracker are defined as follows.
/** Enumeration to identify current state of the ReceiverTracker */
object TrackerState extends Enumeration {
  type TrackerState = Value
  val Initialized, Started, Stopping, Stopped = Value
}
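To show how such an enumeration is typically used, here is a minimal sketch. The `Lifecycle` class and its guard in `start()` are hypothetical and only illustrate comparing and updating a state value; they are not taken from ReceiverTracker.

```scala
object TrackerStateSketch {
  object TrackerState extends Enumeration {
    type TrackerState = Value
    val Initialized, Started, Stopping, Stopped = Value
  }
  import TrackerState._

  // Hypothetical holder for the current state.
  class Lifecycle {
    @volatile private var state: TrackerState = Initialized

    def isStarted: Boolean = state == Started

    // Refuse to start unless the state is still Initialized.
    def start(): Unit = {
      if (state != Initialized) {
        throw new IllegalStateException(s"cannot start from $state")
      }
      state = Started
    }
  }
}
```

Enumeration values get consecutive ids in declaration order, so `Initialized` has id 0 and `Stopped` has id 3.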
Let's look at what ReceiverTracker does when it receives the AddBlock message sent by ReceiverSupervisorImpl.
case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }
It first checks whether WAL batching is enabled. If so, addBlock is run on a thread from the thread pool and the reply is sent from there, because writing to the WAL is expensive. Otherwise, it replies with the result of addBlock directly.
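The idea of moving a slow call off the message-handling thread can be sketched with an ordinary java.util.concurrent executor. Everything here is an assumption made for illustration: `slowAddBlock` simulates an expensive write with a sleep, and the `reply` callback plays the role of context.reply.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object OffloadSketch {
  private val pool = Executors.newFixedThreadPool(2)

  // Stand-in for an expensive addBlock call, e.g. one that writes to a log.
  def slowAddBlock(blockId: String): Boolean = { Thread.sleep(50); blockId.nonEmpty }

  // Handle one message; `reply` is invoked with the result when it is ready.
  def handle(blockId: String, batching: Boolean, reply: Boolean => Unit): Unit = {
    if (batching) {
      // The caller returns immediately; the reply is sent from the pool thread.
      pool.execute(new Runnable {
        override def run(): Unit = reply(slowAddBlock(blockId))
      })
    } else {
      // The caller is blocked until the work is done.
      reply(slowAddBlock(blockId))
    }
  }

  def shutdown(): Unit = {
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.SECONDS)
    ()
  }
}
```

With `batching = true` the handler thread is free to pick up the next message while the slow call is still running, which is the point of the thread pool in the code above.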
addBlock itself hands the work over to receivedBlockTracker.
/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
ReceivedBlockTracker manages the block info on the driver side.
/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
The code for writeToLog is very simple: it checks whether the WAL is enabled and, if so, writes the block info to the log so that it can be recovered later; otherwise it returns true directly. When the write succeeds, the block info is put into streamIdToUnallocatedBlockQueues.
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
This data structure is neatly designed. The key is a stream ID and the value is a queue, so the block info received by each stream is stored separately. In this way, ReceivedBlockTracker holds the block info received by all streams.
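A map from stream ID to a per-stream queue can be sketched like this. `BlockInfo`, `queueFor` and `unallocated` are hypothetical names used only for the example; the queue type is a plain `mutable.Queue`.

```scala
import scala.collection.mutable

object UnallocatedQueuesSketch {
  // Hypothetical, trimmed-down block info.
  final case class BlockInfo(streamId: Int, blockId: String)

  private val queues = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]

  // Create the queue for a stream on first use, then reuse it.
  private def queueFor(streamId: Int): mutable.Queue[BlockInfo] =
    queues.getOrElseUpdate(streamId, mutable.Queue.empty[BlockInfo])

  // Append in arrival order; synchronized because several threads may add blocks.
  def addBlock(info: BlockInfo): Unit = synchronized {
    queueFor(info.streamId) += info
  }

  // Snapshot of the blocks waiting for a given stream (empty if none were added).
  def unallocated(streamId: Int): List[BlockInfo] = synchronized {
    queues.get(streamId).map(_.toList).getOrElse(Nil)
  }
}
```

Each stream keeps its own arrival order, and blocks from different streams never mix.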
/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}
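The log-first pattern used by addBlock and writeToLog can be reduced to a small sketch. The `EventLog` trait and `Tracker` class are hypothetical simplifications made for this example; Spark's own write ahead log interface is richer than this.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object WalFirstSketch {
  // Hypothetical log interface.
  trait EventLog { def write(record: String): Unit }

  class Tracker(log: Option[EventLog]) {
    val accepted = mutable.ArrayBuffer.empty[String]

    // Returns true when logging is disabled or the write succeeded.
    private def writeToLog(record: String): Boolean = log match {
      case None => true
      case Some(l) =>
        try { l.write(record); true }
        catch { case NonFatal(_) => false }
    }

    // Update the in-memory state only after the event has been logged.
    def addBlock(blockId: String): Boolean = {
      val ok = writeToLog(s"add:$blockId")
      if (ok) accepted += blockId
      ok
    }
  }
}
```

Because the in-memory state changes only after a successful write, replaying the log reproduces every block the tracker ever accepted.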
Take a closer look at the comment on ReceivedBlockTracker. This class tracks all received blocks and allocates them to batches when required. All actions taken by this class can be written to the WAL if a checkpoint directory has been provided, so that when the Driver crashes, the state of ReceivedBlockTracker (the received blocks and the block-to-batch allocations) can be recovered. If a checkpoint directory is specified when the class is instantiated, it will try to read the previously saved events from it.
/**
 * Class that keep track of all the received blocks, and allocate them to batches
 * when required. All actions taken by this class can be saved to a write ahead log
 * (if a checkpoint directory has been provided), so that the state of the tracker
 * (received blocks and block-to-batch allocations) can be recovered after driver failure.
 *
 * Note that when any instance of this class is created with a checkpoint directory,
 * it will try reading events from logs in the directory.
 */
private[streaming] class ReceivedBlockTracker(
Let's look at what happens when ReceiverTracker receives the CleanupOldBlocks message.
case c: CleanupOldBlocks =>
  receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
When ReceiverTracker receives this message, it forwards it to every Receiver it manages. On receiving the message, ReceiverSupervisorImpl uses receivedBlockHandler to clean up the old data.
private def cleanupOldBlocks(cleanupThreshTime: Time): Unit = {
  logDebug(s"Cleaning up blocks older then $cleanupThreshTime")
  receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
}
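Forwarding one message to every registered receiver, while skipping those without an endpoint, can be sketched as below. `Endpoint`, `TrackingInfo` and `broadcastCleanup` are hypothetical names; the endpoint here only records what it was sent.

```scala
import scala.collection.mutable

object BroadcastSketch {
  // Hypothetical endpoint that just records the thresholds it receives.
  class Endpoint {
    val inbox = mutable.ArrayBuffer.empty[Long]
    def send(threshold: Long): Unit = { inbox += threshold; () }
  }

  // A receiver that has not registered yet has no endpoint.
  final case class TrackingInfo(receiverId: Int, endpoint: Option[Endpoint])

  // Send the threshold to every receiver with an endpoint; return how many were reached.
  def broadcastCleanup(infos: Map[Int, TrackingInfo], threshold: Long): Int = {
    val live = infos.values.flatMap(_.endpoint).toList
    live.foreach(_.send(threshold))
    live.size
  }
}
```

Using `flatMap` over an `Option` field drops the receivers that have no endpoint without any explicit null or emptiness checks.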
ReceiverTracker can also adjust the rate at which a given stream receives data at any time, by sending an UpdateRateLimit message to the corresponding ReceiverSupervisorImpl.
case UpdateReceiverRateLimit(streamUID, newRate) =>
  for (info ...
  ...
    bg.updateRate(eps)
  }
/**
 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 *
 * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
 */
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }
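The clamping rule in updateRate can be isolated as a pure function, which makes the edge cases easy to check. `effectiveRate` is a hypothetical helper written for this example; it returns the rate that would be applied, or `None` when the request is ignored.

```scala
object RateClampSketch {
  // A non-positive maxRateLimit is treated as "no upper bound configured",
  // mirroring the two branches of updateRate above.
  def effectiveRate(newRate: Long, maxRateLimit: Long): Option[Long] =
    if (newRate <= 0) None // 0 or negative requests have no effect
    else if (maxRateLimit > 0) Some(newRate.min(maxRateLimit)) // never exceed the cap
    else Some(newRate) // no cap, take the requested rate as is
}
```

For instance, a request of 500 events per second with a cap of 100 yields 100, while the same request with no cap configured yields 500.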
ReceiverTracker follows the facade design pattern: callers appear to invoke the functions of ReceiverTracker, but it actually delegates to the functions of other classes.
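A facade in this sense can be sketched in a few lines. The collaborators `BlockBook` and `Scheduler` are hypothetical, and the round-robin placement in `place` is a toy rule chosen for the example, not Spark's scheduling policy.

```scala
object FacadeSketch {
  // Hypothetical collaborator that records blocks.
  class BlockBook {
    private var n = 0
    def add(id: String): Boolean = { n += 1; true }
    def count: Int = n
  }

  // Hypothetical collaborator with a toy placement rule.
  class Scheduler {
    def place(receiverId: Int, executors: Seq[String]): String =
      executors(receiverId % executors.size)
  }

  // The facade exposes one entry point and forwards each call.
  class Tracker(book: BlockBook, scheduler: Scheduler) {
    def addBlock(id: String): Boolean = book.add(id)
    def schedule(receiverId: Int, executors: Seq[String]): String =
      scheduler.place(receiverId, executors)
  }
}
```

Callers only see `Tracker`, so the classes behind it can change without affecting them.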
That's all for "how ReceiverTracker handles data". Thank you for reading.