2025-03-26 Update From: SLTechnology News&Howtos
Shulou (Shulou.com) 06/03 Report
Content of this issue:
1. Architecture design of ReceiverTracker
2. Message passing system
3. ReceiverTracker implementation
The last lesson covered how a Receiver continuously receives data and reports the metadata of that data to ReceiverTracker. This lesson looks at ReceiverTracker's specific functions and implementation.
ReceiverTracker's main responsibilities:
Start Receivers on the Executors.
Stop Receivers.
Update the rate at which Receivers receive data (that is, rate limiting).
Keep monitoring the running status of Receivers and restart any Receiver that stops running; this is the Receiver fault-tolerance mechanism.
Accept Receiver registrations.
Manage the metadata of the data received by Receivers, through ReceivedBlockTracker.
Handle error reports sent by Receivers.
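The fault-tolerance duty in the list above (restart a Receiver whenever it stops) can be illustrated with a simple supervision loop. This is a minimal, synchronous sketch, not Spark's code. It assumes a receiver can be modelled as a plain function that returns or throws when it stops. `superviseReceiver` and `isActive` are hypothetical names. Spark itself works asynchronously: while the tracker is still running, it resubmits the receiver's job whenever the previous one finishes.

```scala
import scala.util.control.NonFatal

// Hypothetical supervision loop: keep relaunching a receiver while the tracker is active.
// `runReceiver` returns (or throws) when the receiver stops; `isActive` reports tracker state.
// Returns how many times the receiver was launched.
def superviseReceiver(runReceiver: () => Unit, isActive: () => Boolean): Int = {
  var launches = 0
  while (isActive()) {
    launches += 1
    try runReceiver()
    catch {
      // A crashed receiver is treated like a stopped one: log it, then loop to restart it.
      case NonFatal(e) => println(s"Receiver stopped with error: ${e.getMessage}; restarting")
    }
  }
  launches
}

// Usage: a receiver that crashes on its first run; the "tracker" stays active for 3 launches.
var runs = 0
val launched = superviseReceiver(
  runReceiver = () => {
    runs += 1
    if (runs == 1) throw new RuntimeException("connection lost")
  },
  isActive = () => runs < 3
)
println(s"Receiver launched $launched times")
```

Whether the receiver ends normally or with an error, the loop restarts it as long as the tracker is active. This matches the "as long as Receivers stop running, restart them" rule in the list.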
ReceiverTracker owns a message endpoint, ReceiverTrackerEndpoint, which exchanges messages with the Receivers and also receives local messages from ReceiverTracker itself.
In its start method, ReceiverTracker instantiates ReceiverTrackerEndpoint and then launches the Receivers on the Executors.
Launching the Receivers goes through a local message: ReceiverTracker sends it to ReceiverTrackerEndpoint, which wraps each Receiver in an RDD and submits it to the cluster as a job.
Once a Receiver starts, it registers with ReceiverTracker. The Receiver counts as officially started only after registration succeeds.
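The launch-and-register sequence can be sketched with a toy endpoint that separates one-way messages (like the local start message) from request/reply messages (like registration). This is an illustration under stated assumptions. `ToyTrackerEndpoint` and its `send`/`ask` methods are hypothetical stand-ins for Spark's RPC endpoint. The `StartAllReceivers`/`RegisterReceiver` case classes are simplified versions modelled on Spark's message names and carry fewer fields. A "job" here is only a function call, not an RDD job on an Executor.

```scala
import scala.collection.mutable

// Hypothetical, simplified messages modelled on Spark's ReceiverTracker messages (fewer fields).
sealed trait TrackerMessage
final case class StartAllReceivers(streamIds: Seq[Int]) extends TrackerMessage        // local, one-way
final case class RegisterReceiver(streamId: Int, host: String) extends TrackerMessage // from a Receiver, needs a reply

// Toy endpoint: `send` is fire-and-forget, `ask` hands a reply back to the caller.
// `runReceiverJob` stands in for submitting the job that runs one Receiver.
final class ToyTrackerEndpoint(runReceiverJob: (ToyTrackerEndpoint, Int) => Unit) {
  @volatile var active = true
  val registered = mutable.Map.empty[Int, String] // streamId -> host

  def send(msg: TrackerMessage): Unit = msg match {
    case StartAllReceivers(ids) => ids.foreach(id => runReceiverJob(this, id)) // one job per receiver
    case other                  => throw new IllegalArgumentException(s"not a one-way message: $other")
  }

  def ask(msg: TrackerMessage): Boolean = msg match {
    case RegisterReceiver(streamId, host) if active =>
      registered(streamId) = host
      true  // registration succeeded: the receiver may start
    case RegisterReceiver(_, _) =>
      false // tracker is stopping: the receiver must not start
    case other =>
      throw new IllegalArgumentException(s"not a request message: $other")
  }
}

// Usage: each launched "job" registers its receiver and only proceeds if the reply is true.
val endpoint = new ToyTrackerEndpoint((ep, streamId) => {
  val started = ep.ask(RegisterReceiver(streamId, s"host-$streamId"))
  println(s"receiver $streamId started: $started")
})
endpoint.send(StartAllReceivers(Seq(0, 1)))
```

The design point is that a Receiver starts only once `ask` returns `true`. When the tracker is shutting down, it refuses registration, so the receiver stops instead of producing data that nothing will track.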
When a Receiver receives data, it writes the data to the BlockManager and, once a block of data is ready, reports the block's metadata to ReceiverTracker. ReceiverSupervisorImpl.pushAndReportBlock performs both steps:
/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  // Store the block data (in the BlockManager, and also in the WAL if it is enabled)
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  // Only the metadata is sent to the driver-side ReceiverTracker
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
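The ordering in pushAndReportBlock (store the data first, then report only its metadata) can be mirrored in plain Scala. This is a sketch, not Spark code. The in-memory `blockStore` map stands in for the BlockManager, `reportToTracker` stands in for `trackerEndpoint.askWithRetry`, and `BlockInfo` and `ToyReceiverSupervisor` are hypothetical, much smaller cousins of ReceivedBlockInfo and ReceiverSupervisorImpl.

```scala
import scala.collection.mutable

// Hypothetical, much smaller cousin of ReceivedBlockInfo: metadata only, no records.
final case class BlockInfo(streamId: Int, blockId: String, numRecords: Long)

// `blockStore` stands in for BlockManager; `reportToTracker` stands in for askWithRetry(AddBlock(...)).
final class ToyReceiverSupervisor(streamId: Int, reportToTracker: BlockInfo => Boolean) {
  val blockStore = mutable.Map.empty[String, Seq[String]] // blockId -> records
  private var nextId = 0L

  def pushAndReportBlock(records: Seq[String]): Boolean = {
    val blockId = s"input-$streamId-$nextId" // same "input-<stream>-<id>" shape as a StreamBlockId name
    nextId += 1
    blockStore(blockId) = records                                      // 1. store the data
    reportToTracker(BlockInfo(streamId, blockId, records.size.toLong)) // 2. report metadata only
  }
}

// Usage: the "driver" side just collects the reports it receives.
val reported = mutable.ArrayBuffer.empty[BlockInfo]
val supervisor = new ToyReceiverSupervisor(0, info => { reported += info; true })
supervisor.pushAndReportBlock(Seq("a", "b", "c"))
supervisor.pushAndReportBlock(Seq("d"))
reported.foreach(println)
```

Storing before reporting means the driver never hears about a block that is not yet in storage. The message also stays small, because it carries an ID and a record count rather than the records themselves.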
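When ReceiverTrackerEndpoint receives the AddBlock message, it calls addBlock to record the block metadata. If write-ahead-log batching is enabled, this work is handed to a thread from walBatchingThreadPool; otherwise the endpoint calls addBlock and replies directly: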
case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }
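The two paths in the AddBlock handler can be reproduced with a thread pool and a `Promise`. This sketch rests on several assumptions. `ToyAddBlockHandler` is hypothetical. The `Promise` plays the role of `context.reply`, and a `String` stands in for ReceivedBlockInfo. One plausible reason for the hand-off is that a batched WAL write waits until its batch is flushed. Running that write on the endpoint's own message thread would then hold up every other message.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutorService, Executors}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical sketch of the AddBlock handler's two paths.
// The Promise plays the role of context.reply; `addBlock` records the metadata.
final class ToyAddBlockHandler(batchingEnabled: Boolean, addBlock: String => Boolean) {
  private val walBatchingPool: ExecutorService = Executors.newCachedThreadPool()
  @volatile var active = true

  def handleAddBlock(blockInfo: String): Future[Boolean] = {
    val reply = Promise[Boolean]()
    if (batchingEnabled) {
      // Run on a pool thread so the caller's (message-loop) thread is not blocked.
      walBatchingPool.execute(new Runnable {
        override def run(): Unit = {
          if (active) {
            reply.complete(Try(addBlock(blockInfo)))
          } else {
            // Spark throws (and logs) here; failing the promise makes the outcome visible to the caller.
            reply.failure(new IllegalStateException("tracker endpoint shut down"))
          }
        }
      })
    } else {
      reply.complete(Try(addBlock(blockInfo))) // reply directly on the calling thread
    }
    reply.future
  }

  def shutdown(): Unit = walBatchingPool.shutdown()
}

// Usage: both paths record the block and reply true; only the thread doing the work differs.
val recorded = new ConcurrentLinkedQueue[String]()
val batched = new ToyAddBlockHandler(batchingEnabled = true, addBlock = info => recorded.add(info))
val direct = new ToyAddBlockHandler(batchingEnabled = false, addBlock = info => recorded.add(info))
val batchedReply = Await.result(batched.handleAddBlock("block-1"), 5.seconds)
val directReply = Await.result(direct.handleAddBlock("block-2"), 5.seconds)
println(s"replies: $batchedReply, $directReply; recorded ${recorded.size} blocks")
batched.shutdown()
direct.shutdown()
```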
ReceiverTracker's addBlock delegates to ReceivedBlockTracker, which manages the metadata of the received blocks.
The metadata ends up in streamIdToUnallocatedBlockQueue, which holds a queue of not-yet-allocated block information for each stream.
Whenever Spark Streaming generates the jobs for a batch, the block information waiting in these queues is allocated to that batch and recorded in timeToAllocatedBlocks, keyed by batch time.
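The two structures in ReceivedBlockTracker can be modelled as a map of per-stream queues plus a map keyed by batch time. This is a simplified sketch. `ToyReceivedBlockTracker` and `BlockMeta` are hypothetical, and the real class also writes these events to the write-ahead log when it is enabled.

```scala
import scala.collection.mutable

// Hypothetical metadata record for one block.
final case class BlockMeta(streamId: Int, blockId: String, numRecords: Long)

// Simplified model of ReceivedBlockTracker's two structures (no write-ahead log).
final class ToyReceivedBlockTracker(streamIds: Seq[Int]) {
  // streamId -> blocks reported but not yet assigned to a batch
  private val streamIdToUnallocatedBlockQueue =
    mutable.Map(streamIds.map(id => id -> mutable.Queue.empty[BlockMeta]): _*)
  // batch time (ms) -> streamId -> blocks assigned to that batch
  val timeToAllocatedBlocks = mutable.Map.empty[Long, Map[Int, Seq[BlockMeta]]]

  def addBlock(info: BlockMeta): Unit =
    streamIdToUnallocatedBlockQueue(info.streamId).enqueue(info)

  // Drain every stream's queue into the batch identified by batchTime.
  def allocateBlocksToBatch(batchTime: Long): Unit = {
    val allocated = streamIds.map { streamId =>
      val queue = streamIdToUnallocatedBlockQueue(streamId)
      val drained = queue.toList
      queue.clear()
      streamId -> drained
    }.toMap
    timeToAllocatedBlocks(batchTime) = allocated
  }

  def blocksOfBatch(batchTime: Long, streamId: Int): Seq[BlockMeta] =
    timeToAllocatedBlocks.get(batchTime).flatMap(_.get(streamId)).getOrElse(Seq.empty)
}

// Usage: blocks reported between two batch times land in the later batch.
val tracker = new ToyReceivedBlockTracker(Seq(0, 1))
tracker.addBlock(BlockMeta(0, "input-0-0", 10))
tracker.addBlock(BlockMeta(1, "input-1-0", 5))
tracker.allocateBlocksToBatch(1000L) // batch at t = 1000 ms takes both queued blocks
tracker.addBlock(BlockMeta(0, "input-0-1", 7))
tracker.allocateBlocksToBatch(2000L) // the next batch only sees the block reported afterwards
println(tracker.blocksOfBatch(2000L, 0))
```

Keeping unallocated and allocated blocks in separate structures means a block belongs to exactly one batch. Once its queue is drained, a later batch cannot pick it up again.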
Here is a simple flowchart:
© 2024 shulou.com SLNews company. All rights reserved.