Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to realize ReceiverSupervisorImpl instantiation

2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly introduces "how to realize ReceiverSupervisorImpl instantiation". In daily operation, I believe many people have doubts about how to realize ReceiverSupervisorImpl instantiation. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts of "how to realize ReceiverSupervisorImpl instantiation". Next, please follow the editor to study!

Let's first review the specific methods implemented in Executor

Instantiate ReceiverSupervisorImpl

Wait for awaitTermination after start

/ / ReceiverTracker.scala line 564val startReceiverFunc: Iterator [receiver []] = > Unit = (iterator: Iterator [receiver []]) = > {if (! iterator.hasNext) {throw new SparkException ("Could not start receiver as object not found.")} if (TaskContext.get (). AttemptNumber () = 0) {val receiver = iterator.next () assert (iterator.hasNext = = false) val supervisor = new ReceiverSupervisorImpl (receiver, SparkEnv.get) SerializableHadoopConf.value, checkpointDirOption) supervisor.start () supervisor.awaitTermination ()} else {/ / It's restarted by TaskScheduler, but we want to reschedule it again. So exit it. }}

Take a look at the construction of the parent class ReceiverSupervisor of ReceiverSupervisorImpl.

Member variable assignment, associating the current supervisor with receiver (receiver.attachSupervisor (this))

The note is also clear: responsible for overseeing Receiver on Worker. Provide all required interfaces for processing data received from receiver

/ / ReceiverSupervisor.scala line 31 * Abstract class that is responsible for supervising a Receiver in the worker. * It provides all the necessary interfaces for handling the data received by the receiver. * / private [streaming] abstract class ReceiverSupervisor (receiver: Receiver [_], conf: SparkConf) extends Logging {/ * * Enumeration to identify current state of the Receiver * / object ReceiverState extends Enumeration {type CheckpointState = Valueval Initialized, Started, Stopped = Value} import ReceiverState._ / / Attach the supervisor to the receiver receiver.attachSupervisor (this) / / associate receiver with supervisor privateval futureExecutionContext = ExecutionContext.fromExecutorService ("receiver-supervisor-future") / * * Receiver id * / protected val streamId = receiver.streamId / * * Has the receiver been marked for stop. * / privateval stopLatch = new CountDownLatch (1) / * * Time between a receiver is stopped and started again * / privateval defaultRestartDelay = conf.getInt ("spark.streaming.receiverRestartDelay", 2000) / * * The current maximum rate limit for this receiver. * / private [streaming] def getCurrentRateLimit: Long = Long.MaxValue / * * Exception associated with the stopping of the receiver * / @ volatile protected var stoppingError: Throwable = null / * * State of the receiver * / @ volatile private [streaming] var receiverState = Initialized / / some methods are actually data processing interfaces}

Instantiation of ReceiverSupervisorImpl

BlockManagerBasedBlockHandler is instantiated to send data to BlockManager

Instantiate RpcEndpoint

Instantiate BlockGenerator

Instantiate BlockGeneratorListener listeners

/ / ReceiverSupervisorImpl.scala line 43 Concrete implementation of * * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]] * which provides all the necessary functionality for handling the data received by * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]] * object that is used to divide the received data stream into blocks of data. * / private [streaming] class ReceiverSupervisorImpl (receiver: Receiver [_], env: SparkEnv, hadoopConf: Configuration, checkpointDirOption: Option [String]) extends ReceiverSupervisor (receiver Env.conf) with Logging {privateval host = SparkEnv.get.blockManager.blockManagerId.host privateval executorId = SparkEnv.get.blockManager.blockManagerId.executorId privateval receivedBlockHandler: ReceivedBlockHandler = {if (WriteAheadLogUtils.enableReceiverLog (env.conf)) {/ / if (checkpointDirOption.isEmpty) {throw new SparkException ("Cannot enable receiver write-ahead log without checkpoint directory set. + "Please use streamingContext.checkpoint () to set the checkpoint directory. "+" See documentation for more details. ")} new WriteAheadLogBasedBlockHandler (env.blockManager, receiver.streamId, receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)} else {new BlockManagerBasedBlockHandler (env.blockManager, receiver.storageLevel)}} / * * Remote RpcEndpointRef for the ReceiverTracker * / privateval trackerEndpoint = RpcUtils.makeDriverRef (" ReceiverTracker ", env.conf Env.rpcEnv) / * * RpcEndpointRef for receiving messages from the ReceiverTracker in the driver * / privateval endpoint = env.rpcEnv.setupEndpoint ("Receiver-" + streamId + "-" + System.currentTimeMillis (), new ThreadSafeRpcEndpoint {overrideval rpcEnv: RpcEnv = env.rpcEnv override def receive: PartialFunction [Any, Unit] = {case StopReceiver = > logInfo ("Received stop signal") ReceiverSupervisorImpl.this.stop ("Stopped by driver") None) case CleanupOldBlocks (threshTime) = > logDebug ("Received delete old batch signal") cleanupOldBlocks (threshTime) case UpdateRateLimit (eps) = > logInfo (s "Received a new rate limit: $eps.") RegisteredBlockGenerators.foreach {bg = > bg.updateRate (eps)}}) / * * Unique block ids if one wants to add blocks directly * / privateval newBlockId = new AtomicLong (System.currentTimeMillis ()) privateval registeredBlockGenerators = new mutable.ArrayBuffer [BlockGenerator] / / typical bread pattern with mutable.SynchronizedBuffer [BlockGenerator] / * * Divides received data records into data blocks for pushing in BlockManager. * / privateval defaultBlockGeneratorListener = new BlockGeneratorListener {def onAddData (data: Any, metadata: Any): Unit = {} def onGenerateBlock (blockId: StreamBlockId): Unit = {} def onError (message: String, throwable: Throwable) {reportError (message, throwable)} def onPushBlock (blockId: StreamBlockId, arrayBuffer: ArrayBuffer [_]) {pushArrayBuffer (arrayBuffer, None, Some (blockId))} privateval defaultBlockGenerator = createBlockGenerator (defaultBlockGeneratorListener) / /. Some methods / * * Store an ArrayBuffer of received data as a data block into Spark's memory. * / def pushArrayBuffer (arrayBuffer: ArrayBuffer [_], metadataOption: Option [Any], blockIdOption: Option [StreamBlockId]) {pushAndReportBlock (ArrayBufferBlock (arrayBuffer), metadataOption, blockIdOption)} / * * Store block and report it to driver * / def pushAndReportBlock (receivedBlock: ReceivedBlock, metadataOption: Option [Any], blockIdOption: Option [StreamBlockId]) {val blockId = blockIdOption.getOrElse (nextBlockId) val time = System.currentTimeMillis val blockStoreResult = receivedBlockHandler.storeBlock (blockId ReceivedBlock) logDebug (s "Pushed block $blockId in ${(System.currentTimeMillis-time)} ms") val numRecords = blockStoreResult.numRecords val blockInfo = ReceivedBlockInfo (streamId, numRecords, metadataOption, blockStoreResult) trackerEndpoint.askWithRetry [Boolean] (AddBlock (blockInfo)) logDebug (s "Reported block $blockId")}}

Look at BlockGenerator.

The comments are clear. There are two threads.

Periodically use the previous batch of data as a block, and create the next batch of data; RecurringTimer class, internal Thread

Push data to BlockManager

/ / * * Generates batches of objects received by a * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately * named blocks at regular intervals. This class starts two threads, * one to periodically start a new batch and prepare the previous batch of as a block, * the other to push the blocks into the block manager. * * Note: Do not create BlockGenerator instances directly inside receivers. Use * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it. * / private [streaming] class BlockGenerator (listener: BlockGeneratorListener, receiverId: Int, conf: SparkConf, clock: Clock = new SystemClock () extends RateLimiter (conf) with Logging {private case class Block (id: StreamBlockId, buffer: ArrayBuffer [Any]) / * The BlockGenerator can be in 5 possible states, in the order as follows. * *-Initialized: Nothing has been started *-Active: start () has been called, and it is generating blocks on added data. *-StoppedAddingData: stop () has been called, the adding of data has been stopped, * but blocks are still being generated and pushed. *-StoppedGeneratingBlocks: Generating of blocks has been stopped, but * they are still being pushed. *-StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed. * / private object GeneratorState extends Enumeration {type GeneratorState = Valueval Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value} import GeneratorState._privateval blockIntervalMs = conf.getTimeAsMs ("spark.streaming.blockInterval", "200ms") require (blockIntervalMs > 0, s "'spark.streaming.blockInterval' should be a positive value") privateval blockIntervalTimer = new RecurringTimer (clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator") / / Periodic thread privateval blockQueueSize = conf.getInt ("spark.streaming.blockQueueSize") 10) privateval blocksForPushing = new ArrayBlockingQueue [Block] (blockQueueSize) privateval blockPushingThread = new Thread () {override def run () {keepPushingBlocks ()}} / / @ volatile private var currentBuffer = new ArrayBuffer [any] @ volatile private var state = Initialized//...}

At this point, the ReceiverSupervisorImpl instantiation is complete. However, Receiver has not been started so far.

At this point, the study on "how to instantiate ReceiverSupervisorImpl" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report