
Lesson 12: Spark Streaming source code interpretation: Executor fault tolerance

2025-02-21 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 06/03

The data received by the Receiver is managed by ReceiverSupervisorImpl.

When ReceiverSupervisorImpl receives data, it first stores the data and then reports the data's metadata to the ReceiverTracker on the driver.

Fault tolerance for the data held by an Executor can be achieved in three ways:

WAL (write-ahead log)

Data replication

Replaying the data stream from the receiver's source

```scala
/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  // Store the block locally (BlockManager, plus the WAL if it is enabled)
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  // Only the block metadata is sent to the ReceiverTracker on the driver
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
```
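The order in pushAndReportBlock matters: the data is stored on the executor first, and only afterwards is a small metadata record sent to the driver. The sketch below models that order with plain Scala collections. It assumes nothing about Spark's internals: BlockRef, BlockReport, InMemoryHandler, DriverTracker and SupervisorSketch are hypothetical stand-ins, not Spark classes.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for StreamBlockId and ReceivedBlockInfo.
final case class BlockRef(streamId: Int, uniqueId: Long)
final case class BlockReport(streamId: Int, numRecords: Option[Long], blockId: BlockRef)

// Plays the role of receivedBlockHandler: keeps the records, returns a count.
class InMemoryHandler {
  val stored = mutable.Map.empty[BlockRef, Seq[String]]
  def storeBlock(id: BlockRef, records: Seq[String]): Option[Long] = {
    stored(id) = records
    Some(records.size.toLong)
  }
}

// Plays the role of the ReceiverTracker: it only ever sees metadata.
class DriverTracker {
  val reports = mutable.ArrayBuffer.empty[BlockReport]
  def addBlock(report: BlockReport): Boolean = {
    reports += report
    true
  }
}

class SupervisorSketch(streamId: Int, handler: InMemoryHandler, tracker: DriverTracker) {
  private var nextUniqueId = 0L

  def pushAndReport(records: Seq[String]): BlockReport = {
    val id = BlockRef(streamId, nextUniqueId)
    nextUniqueId += 1
    val numRecords = handler.storeBlock(id, records) // 1. store the data locally
    val report = BlockReport(streamId, numRecords, id)
    tracker.addBlock(report)                          // 2. report metadata only
    report
  }
}

val handler = new InMemoryHandler
val tracker = new DriverTracker
val supervisor = new SupervisorSketch(0, handler, tracker)
val report = supervisor.pushAndReport(Seq("a", "b", "c"))
```

Because the report is sent only after storeBlock returns, the driver never learns about a block that was not actually stored.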

The data is stored through receivedBlockHandler, which has two implementations; which one is used depends on whether the receiver write-ahead log is enabled:

```scala
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    // The WAL is written under the checkpoint directory, so one must be set
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
```
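The selection logic above can be restated without Spark as a pure function of the configuration and the checkpoint directory. In this sketch the key spark.streaming.receiver.writeAheadLog.enable is Spark's real setting for the receiver WAL; the Map-based configuration, HandlerKind and chooseHandler are hypothetical simplifications for illustration.

```scala
// Hypothetical model of how the receiver picks its block handler.
sealed trait HandlerKind
case object WalBased extends HandlerKind
case object BlockManagerBased extends HandlerKind

// Real Spark configuration key; the Map below merely stands in for SparkConf.
val WalEnableKey = "spark.streaming.receiver.writeAheadLog.enable"

def chooseHandler(conf: Map[String, String], checkpointDir: Option[String]): HandlerKind = {
  val walEnabled = conf.get(WalEnableKey).exists(_.toBoolean)
  if (walEnabled) {
    // Same guard as the Spark code: the WAL needs a checkpoint directory to live in.
    if (checkpointDir.isEmpty) {
      throw new IllegalStateException(
        "Cannot enable receiver write-ahead log without checkpoint directory set.")
    }
    WalBased
  } else {
    BlockManagerBased
  }
}

val plain = chooseHandler(Map.empty, None)
val withWal = chooseHandler(Map(WalEnableKey -> "true"), Some("hdfs:///checkpoints/app"))
```

In a real application the flag is set on the SparkConf and the directory is set with streamingContext.checkpoint(...), which is exactly what the exception message asks for.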

WriteAheadLogBasedBlockHandler does two things at the same time: it hands the data over to the BlockManager, and it writes the data to the WAL.
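A minimal sketch of this double write, and of the recovery it enables, is shown below. It assumes a mutable map as the volatile BlockManager memory and an in-process buffer as the durable log; WalHandlerSketch, crash and recover are hypothetical names, and a real WAL is of course written to the checkpoint directory rather than kept in memory.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical model: `memory` plays the BlockManager, `wal` plays the write-ahead log.
class WalHandlerSketch {
  val memory = mutable.Map.empty[String, Seq[String]]
  val wal = mutable.ArrayBuffer.empty[(String, Seq[String])]

  def storeBlock(blockId: String, records: Seq[String]): Unit = {
    // Start both writes concurrently, then wait until both have finished.
    val toMemory = Future { memory.synchronized { memory(blockId) = records } }
    val toLog = Future { wal.synchronized { wal += (blockId -> records) } }
    Await.result(toMemory.zip(toLog), 10.seconds)
  }

  // Simulates the executor crashing and losing its in-memory blocks.
  def crash(): Unit = memory.synchronized { memory.clear() }

  // Rebuilds the lost blocks by replaying every entry in the log.
  def recover(): Unit = wal.synchronized {
    memory.synchronized { wal.foreach { case (id, recs) => memory(id) = recs } }
  }
}

val h = new WalHandlerSketch
h.storeBlock("input-0-0", Seq("x", "y"))
h.crash()
h.recover()
```

After crash() the map is empty, and recover() restores the block purely from the log, which is the property the WAL is meant to guarantee.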

If the node crashes, the data that was held in memory can be recovered from the WAL. Once the WAL is enabled, keeping multiple replicas of the data is no longer recommended, so the handler overrides the requested storage level:

```scala
private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }
  // Keep the disk/memory/off-heap choices, force serialized storage and one replica
  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}
```
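The override rule can be checked in isolation. The sketch below uses a hypothetical Level case class that mirrors StorageLevel's five constructor arguments (it is not Spark's class) and returns the warnings as data instead of logging them.

```scala
// Hypothetical mirror of StorageLevel's fields; not the Spark class itself.
final case class Level(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
                       deserialized: Boolean, replication: Int = 1)

// Same rule as effectiveStorageLevel: keep where the data lives, but force
// serialized storage and a single replica, since the WAL already gives durability.
def effectiveLevel(requested: Level): (Level, List[String]) = {
  val warnings = List(
    if (requested.deserialized) Some("deserialized storage not supported with WAL") else None,
    if (requested.replication > 1) Some(s"replication ${requested.replication} unnecessary with WAL") else None
  ).flatten
  (Level(requested.useDisk, requested.useMemory, requested.useOffHeap, false, 1), warnings)
}

// Equivalent flags to MEMORY_AND_DISK_2.
val result = effectiveLevel(Level(true, true, false, true, 2))
```

Requesting memory-and-disk, deserialized, two replicas yields memory-and-disk, serialized, one replica, with both warnings emitted.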

BlockManagerBasedBlockHandler, by contrast, hands the data directly to the BlockManager and writes no WAL.

Without a WAL, will the data be lost when the node crashes? Not necessarily. Both WriteAheadLogBasedBlockHandler and BlockManagerBasedBlockHandler are constructed with the receiver's storageLevel, and StorageLevel describes where the data is stored (memory, disk) and how many replicas are kept. With a replication factor greater than 1, each block also lives on another executor, so losing a single node does not lose the block.

```scala
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable
```
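Why replication protects data without a WAL can be seen with a toy placement model. The sketch assumes round-robin placement across a fixed number of nodes and at most as many replicas as nodes; place and survivingBlocks are hypothetical helpers, and Spark's real BlockManager chooses replica peers differently. The point is only that a block survives a single crash whenever one of its replicas sits on a surviving node.

```scala
// Hypothetical model: block i with replication r goes to nodes i, i+1, ..., i+r-1 (mod nodes).
// Assumes replication <= nodes so the replicas land on distinct nodes.
def place(blockIds: Seq[String], nodes: Int, replication: Int): Map[Int, Set[String]] = {
  val placements = for {
    (id, i) <- blockIds.zipWithIndex
    r <- 0 until replication
  } yield ((i + r) % nodes) -> id
  placements.groupBy(_._1).map { case (node, pairs) => node -> pairs.map(_._2).toSet }
}

// A block is still readable if at least one node other than the crashed one holds it.
def survivingBlocks(placement: Map[Int, Set[String]], crashed: Int): Set[String] =
  placement.collect { case (node, ids) if node != crashed => ids }.flatten.toSet

val blocks = Seq("b0", "b1", "b2")
val single = place(blocks, nodes = 3, replication = 1)
val double = place(blocks, nodes = 3, replication = 2)
```

Crashing node 0 loses b0 when there is one replica, but with two replicas every block is still available on the remaining nodes. If both nodes holding a block fail, the block is lost, which is the gap the WAL closes.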

The StorageLevel companion object defines the following public, predefined levels:

```scala
// Arguments: (useDisk, useMemory, useOffHeap, deserialized, replication = 1)
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
```
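Reading these constructor calls is easier once the positional flags are decoded. The sketch below uses a hypothetical LevelFlags case class with the same argument order as StorageLevel (it is not the Spark class) and turns a level into a short description.

```scala
// Hypothetical mirror of the StorageLevel arguments, in the same order:
// (useDisk, useMemory, useOffHeap, deserialized, replication).
final case class LevelFlags(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
                            deserialized: Boolean, replication: Int = 1) {
  def describe: String = {
    val where = Seq(
      if (useMemory) Some("memory") else None,
      if (useDisk) Some("disk") else None,
      if (useOffHeap) Some("off-heap") else None
    ).flatten
    val location = if (where.isEmpty) "nowhere" else where.mkString(" + ")
    val format = if (deserialized) "deserialized objects" else "serialized bytes"
    s"$location, $format, $replication replica(s)"
  }
}

val MEMORY_AND_DISK_SER_2 = LevelFlags(true, true, false, false, 2)
val DISK_ONLY = LevelFlags(true, false, false, false)
```

For example, MEMORY_AND_DISK_SER_2.describe gives "memory + disk, serialized bytes, 2 replica(s)".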

Receiver-based input streams such as socketTextStream default to MEMORY_AND_DISK_SER_2: the data is stored in serialized form, kept as two replicas, and spilled to disk when memory is insufficient.

Ultimately, the data is stored and managed by the BlockManager. BlockManagerBasedBlockHandler.storeBlock shows how each kind of received block is handed over:

```scala
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
  var numRecords = None: Option[Long]
  val putResult: Seq[(BlockId, BlockStatus)] = block match {
    case ArrayBufferBlock(arrayBuffer) =>
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
        tellMaster = true)
    case IteratorBlock(iterator) =>
      // Count the records while the BlockManager consumes the iterator
      val countIterator = new CountingIterator(iterator)
      val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
        tellMaster = true)
      numRecords = countIterator.count
      putResult
    case ByteBufferBlock(byteBuffer) =>
      blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
    case o =>
      throw new SparkException(
        s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
  }
  // Fail if the BlockManager did not actually store the block
  if (!putResult.map { _._1 }.contains(blockId)) {
    throw new SparkException(
      s"Could not store $blockId to block manager with storage level $storageLevel")
  }
  BlockManagerBasedStoreResult(blockId, numRecords)
}
```
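The three cases differ mainly in how numRecords is obtained: known up front for an array buffer, counted while the data is written for an iterator, and unknown for raw bytes. The sketch below reproduces that pattern match with hypothetical Block types and a plain buffer as the sink. CountingIter is a simplified, hypothetical counterpart of the CountingIterator used above; it reports a count only once the iterator has been fully consumed.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical re-creation of the three block shapes matched in storeBlock.
sealed trait Block
final case class ArrayBufferBlock(buffer: ArrayBuffer[Any]) extends Block
final case class IteratorBlock(iterator: Iterator[Any]) extends Block
final case class ByteBufferBlock(bytes: ByteBuffer) extends Block

// Counts elements as they are consumed; the count is only trusted once exhausted.
class CountingIter[T](underlying: Iterator[T]) extends Iterator[T] {
  private var n = 0L
  def hasNext: Boolean = underlying.hasNext
  def next(): T = { n += 1; underlying.next() }
  def count: Option[Long] = if (!underlying.hasNext) Some(n) else None
}

// Writes the block into `sink` and returns the record count, if it is known.
def storeAndCount(block: Block, sink: ArrayBuffer[Any]): Option[Long] = block match {
  case ArrayBufferBlock(buf) =>
    sink ++= buf
    Some(buf.size.toLong)
  case IteratorBlock(it) =>
    val counting = new CountingIter(it)
    sink ++= counting // consuming the iterator is what drives the count
    counting.count
  case ByteBufferBlock(bytes) =>
    sink += bytes     // opaque bytes: the number of records is unknown
    None
}

val sink = ArrayBuffer.empty[Any]
val fromArray = storeAndCount(ArrayBufferBlock(ArrayBuffer[Any]("a", "b")), sink)
val fromIterator = storeAndCount(IteratorBlock(Iterator("c", "d", "e")), sink)
val fromBytes = storeAndCount(ByteBufferBlock(ByteBuffer.wrap(Array[Byte](1, 2))), sink)
```

Counting inside the iterator avoids materializing the data twice: the records are counted in the same pass that stores them.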

When data is read directly from Kafka (the direct approach, which uses no receiver), fault tolerance comes from recording offsets: if the program crashes, on the next start it reads the data again from the last unprocessed offset.
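The offset idea can be illustrated without a Kafka client. In the sketch below, a hypothetical PartitionLog stands in for one Kafka partition (an immutable, replayable sequence of records) and OffsetTrackingConsumer is a hypothetical consumer that commits an offset only after a batch is processed. In real Spark applications the offsets are tracked through checkpoints or stored by the application itself; this only models the principle.

```scala
// Hypothetical model: the partition is an immutable log, so any offset can be re-read.
final case class PartitionLog(records: Vector[String])

class OffsetTrackingConsumer(log: PartitionLog, var committedOffset: Long = 0L) {
  // Returns the next batch starting at the committed offset, plus the offset after it.
  def poll(maxRecords: Int): (Vector[String], Long) = {
    val from = committedOffset.toInt
    val batch = log.records.slice(from, from + maxRecords)
    (batch, committedOffset + batch.size)
  }

  // Called only after the batch has been processed successfully.
  def commit(untilOffset: Long): Unit = {
    committedOffset = untilOffset
  }
}

val log = PartitionLog(Vector("r0", "r1", "r2", "r3", "r4"))
val first = new OffsetTrackingConsumer(log)
val batch1 = first.poll(2)
first.commit(batch1._2)  // batch 1 processed, offset 2 recorded
val lost = first.poll(2) // simulated crash: this batch is never committed
val restarted = new OffsetTrackingConsumer(log, first.committedOffset)
val replayed = restarted.poll(2)
```

After the restart the consumer re-reads exactly the batch that was in flight during the crash, because the source itself retains the data and only the offset needs to survive.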

Note:

1. DT Big Data DreamWorks WeChat official account: DT_Spark

2. IMF big data hands-on YY live broadcast, 8:00 p.m., channel number: 68917580

3. Sina Weibo: http://www.weibo.com/ilovepains
