
Lesson 11: interpretation of Spark Streaming source code

2025-03-26 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 06/03

Content of this issue:

1. Architecture design of ReceiverTracker

2. Message-passing system

3. ReceiverTracker implementation

The last lesson covered how a Receiver continuously receives data and reports the metadata of the received data to ReceiverTracker. This lesson looks at what ReceiverTracker does and how it is implemented.

ReceiverTracker's main responsibilities:

Start the Receivers on the Executors.

Stop the Receivers.

Update the rate at which each Receiver ingests data (i.e., rate limiting).

Continuously monitor the Receivers' running status and restart any Receiver that stops running; this is the Receiver fault-tolerance mechanism.

Accept Receiver registrations.

Manage the metadata of the data received by the Receivers, through ReceivedBlockTracker.

Handle error reports sent by the Receivers.

ReceiverTracker owns a message-communication endpoint, ReceiverTrackerEndpoint, which exchanges messages with the Receivers and with ReceiverTracker itself.
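To make the endpoint's role concrete, here is a minimal sketch of a single handler that dispatches on message type. The message classes and ToyTrackerEndpoint are hypothetical simplifications, not Spark's real RPC classes, and the reply is modeled as a plain Boolean return value rather than an RPC context.

```scala
import scala.collection.mutable

// Hypothetical, simplified messages loosely modeled on what a receiver tracker
// exchanges with its receivers. These are NOT Spark's actual classes.
sealed trait TrackerMessage
final case class RegisterReceiver(streamId: Int, host: String) extends TrackerMessage
final case class AddBlock(streamId: Int, numRecords: Long) extends TrackerMessage
final case class ReportError(streamId: Int, message: String) extends TrackerMessage

// A toy endpoint: one place where every incoming tracker message is handled.
final class ToyTrackerEndpoint {
  val registered = mutable.Map.empty[Int, String] // streamId -> host
  val blockCounts = mutable.Map.empty[Int, Long].withDefaultValue(0L)
  val errors = mutable.ArrayBuffer.empty[String]

  // The return value stands in for the reply sent back to the caller.
  def receiveAndReply(msg: TrackerMessage): Boolean = msg match {
    case RegisterReceiver(id, host) =>
      registered(id) = host
      true
    case AddBlock(id, n) if registered.contains(id) =>
      blockCounts(id) += n
      true
    case AddBlock(_, _) =>
      false // in this sketch, blocks from unregistered streams are rejected
    case ReportError(id, m) =>
      errors += s"stream $id: $m"
      true
  }
}

val endpoint = new ToyTrackerEndpoint
endpoint.receiveAndReply(RegisterReceiver(0, "executor-1"))
endpoint.receiveAndReply(AddBlock(0, 100L))
endpoint.receiveAndReply(ReportError(0, "connection reset"))
```

Routing everything through one pattern match keeps all tracker state changes in a single place, which is the main reason to funnel messages through an endpoint.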

In ReceiverTracker's start method, ReceiverTrackerEndpoint is instantiated and the Receivers are launched on the Executors.
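A minimal sketch of that start sequence, assuming a hypothetical ToyReceiverTracker: it refuses to start twice, sets up its endpoint (represented here only by a name), and then launches the receivers. The skipReceiverLaunch flag mirrors a similar option on Spark's tracker; everything else is illustrative.

```scala
import scala.collection.mutable

// Hypothetical tracker showing only the shape of start(); not Spark's class.
final class ToyReceiverTracker(streamIds: Seq[Int], skipReceiverLaunch: Boolean = false) {
  private var started = false
  var endpointName: Option[String] = None
  val launched = mutable.ArrayBuffer.empty[Int]

  def start(): Unit = synchronized {
    if (started) throw new IllegalStateException("tracker already started")
    if (streamIds.nonEmpty) {
      // Stand-in for registering an RPC endpoint with the RPC environment.
      endpointName = Some("ReceiverTracker")
      if (!skipReceiverLaunch) launchReceivers()
      started = true
    }
  }

  // Stand-in for shipping the receivers to executors.
  private def launchReceivers(): Unit = launched ++= streamIds

  def isStarted: Boolean = started
}

val tracker = new ToyReceiverTracker(Seq(0, 1))
tracker.start()
```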

Launching the Receivers works as follows: ReceiverTracker sends a local message to ReceiverTrackerEndpoint, which wraps each Receiver in an RDD and submits it to the cluster as a job.
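The sketch below models running each receiver as its own long-running job and resubmitting it when the job ends while the tracker is still active, which is the idea behind the fault-tolerance responsibility listed earlier. A local thread pool stands in for the cluster, and maxRestarts is a hypothetical cap added only so the demo terminates; it is not a Spark setting.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical launcher: each receiver runs as one long-lived task ("job").
final class ToyReceiverLauncher(maxRestarts: Int) {
  private val pool = Executors.newCachedThreadPool()
  @volatile var active = true
  val attempts = new AtomicInteger(0)
  val done = new CountDownLatch(1)

  def startReceiver(receiverBody: () => Unit): Unit =
    pool.execute(() => {
      attempts.incrementAndGet()
      // The receiver runs until it returns or throws; either way the job ends.
      try receiverBody() catch { case _: Exception => () }
      // Fault tolerance: while the tracker is active, resubmit the receiver.
      if (active && attempts.get() <= maxRestarts) startReceiver(receiverBody)
      else done.countDown()
    })

  def stop(): Unit = { active = false; pool.shutdown() }
}

val launcher = new ToyReceiverLauncher(maxRestarts = 2)
launcher.startReceiver(() => throw new RuntimeException("receiver crashed"))
launcher.done.await(5, TimeUnit.SECONDS)
launcher.stop()
```

With a receiver that always crashes, it runs once and is restarted twice, so three attempts are made in total.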

After a Receiver starts, it registers with ReceiverTracker; it is considered officially started only once registration succeeds.
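A minimal sketch of that registration handshake, with hypothetical names throughout: the tracker accepts a registration only while it is active and only for a stream it knows about, and the receiver proceeds only if registration succeeded.

```scala
import scala.collection.mutable

final case class ReceiverInfo(streamId: Int, host: String)

// Hypothetical registry on the tracker side; not Spark's implementation.
final class ToyRegistry(knownStreamIds: Set[Int]) {
  @volatile var active = true
  private val receivers = mutable.Map.empty[Int, ReceiverInfo]

  // Returns true if the registration was accepted.
  def registerReceiver(streamId: Int, host: String): Boolean = synchronized {
    if (!active || !knownStreamIds.contains(streamId)) false
    else { receivers(streamId) = ReceiverInfo(streamId, host); true }
  }

  def isRegistered(streamId: Int): Boolean = synchronized(receivers.contains(streamId))
}

// Receiver side: only start receiving once the tracker accepts the registration.
def startReceiver(registry: ToyRegistry, streamId: Int, host: String): String =
  if (registry.registerReceiver(streamId, host)) "started"
  else "stopped: registration rejected"

val reg = new ToyRegistry(Set(0, 1))
```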

When a Receiver receives data, it writes the data to BlockManager and, once certain conditions are met, reports the metadata of that data to ReceiverTracker:

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ): Unit = {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  // Store the block (in BlockManager, plus the write-ahead log if enabled)
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  // Only the block's metadata is sent to the driver, not the data itself
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
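Stripped of Spark's types, the store-then-report pattern in pushAndReportBlock looks roughly like this. BlockId, BlockInfo and ToySupervisor are hypothetical; an in-memory map stands in for BlockManager and a callback stands in for askWithRetry.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's block id and block metadata types.
final case class BlockId(streamId: Int, uniqueId: Long)
final case class BlockInfo(streamId: Int, blockId: BlockId, numRecords: Long)

final class ToySupervisor(streamId: Int, report: BlockInfo => Boolean) {
  val blockStore = mutable.Map.empty[BlockId, Vector[String]] // stand-in for BlockManager
  private var nextId = 0L

  def pushAndReportBlock(records: Vector[String]): Boolean = {
    nextId += 1
    val blockId = BlockId(streamId, nextId)
    blockStore(blockId) = records // 1. store the data locally
    val info = BlockInfo(streamId, blockId, records.size.toLong)
    report(info) // 2. report only the metadata to the driver
  }
}

val reported = mutable.ArrayBuffer.empty[BlockInfo]
val sup = new ToySupervisor(0, info => { reported += info; true })
sup.pushAndReportBlock(Vector("a", "b", "c"))
```

Storing first and reporting second means the driver never learns about a block that has not already been saved.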

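When ReceiverTracker receives the metadata (an AddBlock message), it records it by calling addBlock. If write-ahead-log batching is enabled on the driver, this work is handed to a thread from walBatchingThreadPool, which sends the reply when it finishes; otherwise addBlock runs and replies directly: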

case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    // WAL batching on: do the write on a pool thread and reply from there
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    // Otherwise record the block and reply directly
    context.reply(addBlock(receivedBlockInfo))
  }
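The same hand-off pattern can be sketched with only the standard library. ToyAddBlockHandler is hypothetical: a Promise plays the role of the RPC reply context, and a single-thread executor stands in for the WAL batching pool.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical handler: with batching on, the write runs on a pool thread and
// the reply is completed from there; otherwise the reply is completed inline.
final class ToyAddBlockHandler(batchingEnabled: Boolean) {
  private val walPool = Executors.newFixedThreadPool(1)
  @volatile var active = true
  val written = new ConcurrentLinkedQueue[String]()

  private def addBlock(info: String): Boolean = { written.add(info); true }

  def handle(info: String, reply: Promise[Boolean]): Unit =
    if (batchingEnabled) {
      walPool.execute(() => {
        if (active) reply.success(addBlock(info))
        else reply.failure(new IllegalStateException("tracker endpoint shut down"))
        ()
      })
    } else {
      reply.success(addBlock(info))
    }

  def shutdown(): Unit = { walPool.shutdown(); walPool.awaitTermination(5, TimeUnit.SECONDS) }
}

val handler = new ToyAddBlockHandler(batchingEnabled = true)
val p = Promise[Boolean]()
handler.handle("block-1", p)
val ok = Await.result(p.future, 5.seconds)
handler.shutdown()
```

Moving the write off the endpoint's thread keeps a slow log write from blocking other messages, at the cost of replying asynchronously.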

The block metadata is managed by ReceivedBlockTracker.

The metadata is eventually appended to streamIdToUnallocatedBlockQueues, which keeps one queue of not-yet-allocated block information per stream.

Each time Spark Streaming generates the jobs for a batch, the block information waiting in these queues is allocated to that batch and recorded in timeToAllocatedBlocks, keyed by batch time.
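A simplified model of this bookkeeping, borrowing the names streamIdToUnallocatedBlockQueues, timeToAllocatedBlocks and allocateBlocksToBatch from Spark's ReceivedBlockTracker. BlockMeta and ToyReceivedBlockTracker are hypothetical, batch times are plain milliseconds, and the write-ahead logging the real class performs is omitted.

```scala
import scala.collection.mutable

final case class BlockMeta(streamId: Int, blockId: String, numRecords: Long)

final class ToyReceivedBlockTracker(streamIds: Seq[Int]) {
  // streamId -> blocks reported but not yet assigned to any batch
  private val streamIdToUnallocatedBlockQueues =
    mutable.HashMap.empty[Int, mutable.Queue[BlockMeta]]
  // batch time (ms) -> (streamId -> blocks assigned to that batch)
  val timeToAllocatedBlocks = mutable.HashMap.empty[Long, Map[Int, Seq[BlockMeta]]]

  def addBlock(info: BlockMeta): Boolean = synchronized {
    streamIdToUnallocatedBlockQueues
      .getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockMeta]) += info
    true
  }

  // Called once per batch: drain every stream's queue into that batch.
  def allocateBlocksToBatch(batchTimeMs: Long): Unit = synchronized {
    if (!timeToAllocatedBlocks.contains(batchTimeMs)) {
      val allocated = streamIds.map { id =>
        val queue = streamIdToUnallocatedBlockQueues
          .getOrElseUpdate(id, mutable.Queue.empty[BlockMeta])
        val blocks = queue.toList
        queue.clear()
        id -> blocks
      }.toMap
      timeToAllocatedBlocks(batchTimeMs) = allocated
    }
  }

  def blocksOfBatch(batchTimeMs: Long, streamId: Int): Seq[BlockMeta] =
    timeToAllocatedBlocks.get(batchTimeMs).flatMap(_.get(streamId)).getOrElse(Seq.empty)
}

val rbt = new ToyReceivedBlockTracker(Seq(0))
rbt.addBlock(BlockMeta(0, "input-0-1", 10L))
rbt.addBlock(BlockMeta(0, "input-0-2", 5L))
rbt.allocateBlocksToBatch(1000L)
rbt.addBlock(BlockMeta(0, "input-0-3", 7L))
rbt.allocateBlocksToBatch(2000L)
```

Blocks reported between two batch boundaries end up in the later batch, and each block is assigned to exactly one batch because its queue is drained on allocation.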

[Figure: a simple flowchart of the ReceiverTracker data flow]

© 2024 shulou.com SLNews company. All rights reserved.
