How to analyze the principle of Spark Streaming

Updated 2025-02-24. Source: Shulou (Shulou.com), SLTechnology News&Howtos, 05/31.

This article explains how Spark Streaming works by walking through its source code, from receiving data to submitting Spark jobs. I find the material very practical, so I am sharing it here in the hope that you get something out of it.

One: initialization and receiving data

Spark Streaming buffers the incoming data in receivers distributed across the worker nodes and wraps it in the RDD form that Spark can process. Spark Streaming then submits jobs on that data to the Spark cluster for execution.

The initialization process can be summarized in two points:

1. Initialization of the scheduler.

The scheduler drives the execution of Spark Streaming, and users can tune it through the relevant configuration parameters.

2. Wrap the receivers of the input streams in an RDD that Spark can handle, distribute it across the cluster, and then start each receiver in the receiver collection.

Spark Streaming provides a different receiver for each kind of data source. Each receiver on a node can be regarded as a dedicated process that takes one portion of the incoming data as its input.

First, look at part of the source code of JavaStreamingContext. It shows that the input stream can come from text data received over a socket or from files in a directory:

class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {

  def this(master: String, appName: String, batchDuration: Duration) =
    this(new StreamingContext(master, appName, batchDuration, null, Nil, Map()))

  // Input stream of text lines received over a TCP socket
  def socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String] = {
    ssc.socketTextStream(hostname, port)
  }

  // Input stream of text lines read from files in a directory
  def textFileStream(directory: String): JavaDStream[String] = {
    ssc.textFileStream(directory)
  }
}

For example, consider the following call:

val lines = ssc.socketTextStream("master", 9999)

It receives text data from a socket and returns a JavaReceiverInputDStream. This class wraps a ReceiverInputDStream, which holds the Receiver that receives the data and converts it into RDDs that Spark can process.

Let's look at part of the source code of JavaReceiverInputDStream:

class JavaReceiverInputDStream[T](val receiverInputDStream: ReceiverInputDStream[T])
    (implicit override val classTag: ClassTag[T])
  extends JavaInputDStream[T](receiverInputDStream) {
}

object JavaReceiverInputDStream {
  implicit def fromReceiverInputDStream[T: ClassTag](
      receiverInputDStream: ReceiverInputDStream[T]): JavaReceiverInputDStream[T] = {
    new JavaReceiverInputDStream[T](receiverInputDStream)
  }
}

The source shows that JavaReceiverInputDStream wraps a ReceiverInputDStream, which holds the Receiver that receives data and converts it into RDDs. Part of the ReceiverInputDStream source follows (note the English comments):

abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

  /**
   * Gets the receiver object that will be sent to the worker nodes
   * to receive data. This method needs to defined by any specific implementation
   * of a ReceiverInputDStream.
   */
  def getReceiver(): Receiver[T]

  // Nothing to start or stop as both taken care of by the ReceiverTracker.
  def start() {}

  /**
   * Generates RDDs with blocks received by the receiver of this stream.
   */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {
      if (validTime < graph.startTime) {
        // Called for a time before the context started: return an empty RDD
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }
}

When getReceiver() is called, the process is as follows (part of the SocketInputDStream source):

private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

In other words, it simply creates a new SocketReceiver object and passes in the parameters above:

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while (!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      // ...
    }
  }
}

Each subclass implements getReceiver(). A worker node calls it to obtain the Receiver, so the work of receiving data is distributed to the workers.

As a result, the receivers are spread across the nodes.
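
The receiver pattern described above (start a daemon thread in onStart, then call store for every record until stopped) can be sketched without Spark. This is only an illustrative sketch: MiniReceiver and IteratorReceiver are hypothetical names that do not exist in Spark, and an in-memory iterator stands in for the socket.

```scala
import scala.collection.mutable.ArrayBuffer

object ReceiverSketch {
  // Hypothetical stand-in for Spark's Receiver (not the real API)
  abstract class MiniReceiver[T] {
    @volatile private var stopped = false
    private val buffer = ArrayBuffer.empty[T]

    // Subclasses start their own receiving thread here and return it
    def onStart(): Thread

    // Called by the subclass for every record it receives
    def store(item: T): Unit = buffer.synchronized { buffer += item; () }

    def stored: List[T] = buffer.synchronized { buffer.toList }
    def stop(): Unit = { stopped = true }
    def isStopped: Boolean = stopped
  }

  // Reads records from an iterator on a daemon thread,
  // the way SocketReceiver reads them from a socket
  class IteratorReceiver[T](source: Iterator[T]) extends MiniReceiver[T] {
    def onStart(): Thread = {
      val thread = new Thread("Iterator Receiver") {
        setDaemon(true)
        override def run(): Unit = receive()
      }
      thread.start()
      thread
    }

    private def receive(): Unit = {
      while (!isStopped && source.hasNext) {
        store(source.next())
      }
    }
  }

  def demo(): List[String] = {
    val receiver = new IteratorReceiver(Iterator("a", "b", "c"))
    receiver.onStart().join() // wait until the source is exhausted
    receiver.stored
  }
}
```

Calling ReceiverSketch.demo() returns the records in the order they were stored. In Spark itself, store hands the data to the ReceiverSupervisor instead of a local buffer.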

Two: data reception and transformation

The "initialization and receiving data" step above briefly introduced how the receiver collection is wrapped in an RDD and distributed across the cluster to receive the data streams. Next, we look at how a receiver receives and processes a data stream. The general process is as follows.

Receiver provides a series of store() methods, for example (see the source code for the rest):

abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

  // Store a single item of received data
  def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }

  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def store(dataBuffer: ArrayBuffer[T]) {
    supervisor.pushArrayBuffer(dataBuffer, None, None)
  }
}

These methods are already implemented; the actual storage work is done by the ReceiverSupervisor initialized on the worker node. ReceiverSupervisor also monitors the Receiver: whether it has started, whether it has stopped, whether it needs to restart, and which errors it reports. Part of the ReceiverSupervisor source:

private[streaming] abstract class ReceiverSupervisor(
    receiver: Receiver[_],
    conf: SparkConf
  ) extends Logging {

  /** Push a single data item to backend data store. */
  def pushSingle(data: Any)

  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      optionalMetadata: Option[Any],
      optionalBlockId: Option[StreamBlockId]
    )

  /**
   * Create a custom [[BlockGenerator]] that the receiver implementation can directly control
   * using their provided [[BlockGeneratorListener]].
   *
   * Note: Do not explicitly start or stop the `BlockGenerator`, the `ReceiverSupervisorImpl`
   * will take care of it.
   */
  def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator

  /** Start receiver */
  def startReceiver(): Unit = synchronized {
    // ...
  }
}

ReceiverSupervisor implements these storage methods with the help of the BlockManager. The data is stored as blocks, which later back the RDD of each batch, and the storage strategy is chosen according to the StorageLevel. By default, received data is kept serialized in memory and written to disk on the executor when it does not fit. For computed intermediate RDDs, the default policy is to keep them serialized in memory only.

Under the hood, the store method ends up calling the addData method of BlockGenerator, which appends the data to currentBuffer. currentBuffer is simply an ArrayBuffer[Any].

The source code of the addData method of BlockGenerator is as follows. The key statement is currentBuffer += data:

/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}

How is the buffered data converted into blocks?

There are two threads inside the BlockGenerator:

(1) One periodically starts a new buffer and packages the data of the previous buffer into a block. The period is configured by the spark.streaming.blockInterval parameter, and the default is 200ms. The source code is as follows:

private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")

(2) The other sends the generated blocks to the BlockManager.
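
To get a feel for the block interval, the arithmetic below estimates how many blocks one receiver produces per batch. It assumes the rule of thumb from the Spark Streaming tuning guide that this count is roughly the batch interval divided by the block interval, with each block becoming one partition of the batch's RDD. BlockIntervalSketch and blocksPerBatch are illustrative names, not Spark APIs.

```scala
object BlockIntervalSketch {
  // Approximate number of blocks (and thus partitions) one receiver
  // contributes to each batch, assuming a steady flow of data
  def blocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
    require(blockIntervalMs > 0, "block interval must be positive")
    batchIntervalMs / blockIntervalMs
  }
}
```

For example, a 2-second batch interval with the default 200ms block interval gives BlockIntervalSketch.blocksPerBatch(2000L, 200L) = 10 blocks per receiver per batch.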

First thread:

The first thread periodically calls the updateCurrentBuffer function to wrap the data stored in currentBuffer into a Block. The periodic call comes from a RecurringTimer inside BlockGenerator, which fires at the configured block interval. The source code is as follows:

private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

The block built from a batch of records is then placed in blocksForPushing, a queue of type ArrayBlockingQueue[Block]. The source code is as follows:

private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)

The default queue size is 10, and it can be changed through the spark.streaming.blockQueueSize parameter (in many cases this value does not need to be changed). When blocksForPushing is full, the thread blocks until there is free space for the newly generated Block. If data arrives so fast that blocksForPushing cannot take the blocks in time, consider increasing spark.streaming.blockQueueSize.
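
The behavior of a full queue is plain java.util.concurrent.ArrayBlockingQueue behavior and can be tried in isolation. The sketch below uses a capacity of 2 instead of 10 and strings instead of Block objects; BlockQueueSketch is an illustrative name.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object BlockQueueSketch {
  def demo(): (Boolean, Boolean, Boolean) = {
    // Capacity 2 stands in for spark.streaming.blockQueueSize
    val queue = new ArrayBlockingQueue[String](2)
    val first = queue.offer("block-1")
    val second = queue.offer("block-2")
    // The queue is now full. put() would block here until a consumer
    // takes a block; offer with a timeout gives up instead.
    val third = queue.offer("block-3", 50, TimeUnit.MILLISECONDS)
    (first, second, third)
  }
}
```

The third offer fails because nothing drains the queue, which is the situation in which the block-generating thread in BlockGenerator would stall on put.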

The updateCurrentBuffer function is implemented as follows:

/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

    if (newBlock != null) {
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
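
The buffer swap in updateCurrentBuffer can be reproduced in a few lines of plain Scala. The sketch below is a simplified stand-in, not Spark code: MiniBlockGenerator is a hypothetical class, the timer is replaced by explicit calls, the listener is omitted, and the "input-<receiver>-<time>" block id string is only an illustrative format.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

object BlockGeneratorSketch {
  final case class Block(id: String, buffer: ArrayBuffer[Any])

  // Hypothetical, simplified stand-in for BlockGenerator
  class MiniBlockGenerator(receiverId: Int, blockIntervalMs: Long, queueSize: Int = 10) {
    private var currentBuffer = new ArrayBuffer[Any]
    val blocksForPushing = new ArrayBlockingQueue[Block](queueSize)

    // Append one record to the buffer that is currently being filled
    def addData(data: Any): Unit = this.synchronized { currentBuffer += data; () }

    // Swap in a fresh buffer and turn the filled one into a block.
    // In Spark a RecurringTimer calls this every block interval.
    def updateCurrentBuffer(time: Long): Unit = {
      var newBlock: Option[Block] = None
      this.synchronized {
        if (currentBuffer.nonEmpty) {
          val filled = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          newBlock = Some(Block(s"input-$receiverId-${time - blockIntervalMs}", filled))
        }
      }
      // put blocks when the queue is full, so it stays outside the lock
      newBlock.foreach(block => blocksForPushing.put(block))
    }
  }

  def demo(): (Int, String, List[Any]) = {
    val generator = new MiniBlockGenerator(receiverId = 0, blockIntervalMs = 200L)
    generator.updateCurrentBuffer(200L) // empty buffer: no block is produced
    val queuedAfterEmptyTick = generator.blocksForPushing.size
    generator.addData("a")
    generator.addData("b")
    generator.updateCurrentBuffer(400L) // wraps "a" and "b" into one block
    val block = generator.blocksForPushing.take()
    (queuedAfterEmptyTick, block.id, block.buffer.toList)
  }
}
```

Note that the block id uses time - blockIntervalMs, the start of the interval in which the records arrived, mirroring the StreamBlockId construction in the source above.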

Second thread:

The second thread runs the keepPushingBlocks function, which repeatedly takes generated Blocks from the blocksForPushing blocking queue and calls the pushBlock method to store each Block in the BlockManager.

private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

The keepPushingBlocks implementation is as follows:

/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      logDebug(s"Pushing block $block")
      pushBlock(block)
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}

The pushBlock implementation is as follows:

private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}
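
The poll-then-drain loop of the pushing thread can be sketched with the standard library alone. In this hedged example, Pusher is a hypothetical class, strings stand in for blocks, and pushBlock just records what it was given instead of calling a listener.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object BlockPusherSketch {
  // Hypothetical consumer side of the blocksForPushing queue
  class Pusher(queue: ArrayBlockingQueue[String]) {
    @volatile private var generating = true
    private val pushedBlocks = ArrayBuffer.empty[String]

    private def pushBlock(block: String): Unit =
      pushedBlocks.synchronized { pushedBlocks += block; () }

    def pushed: List[String] = pushedBlocks.synchronized { pushedBlocks.toList }

    def stopGenerating(): Unit = { generating = false }

    def keepPushingBlocks(): Unit = {
      // While blocks are still being generated, poll with a short timeout
      // so the loop can notice the state change
      while (generating) {
        Option(queue.poll(10, TimeUnit.MILLISECONDS)).foreach(pushBlock)
      }
      // Generation has stopped: drain whatever is left in the queue
      while (!queue.isEmpty) {
        pushBlock(queue.take())
      }
    }
  }

  def demo(): List[String] = {
    val queue = new ArrayBlockingQueue[String](10)
    val pusher = new Pusher(queue)
    val thread = new Thread(() => pusher.keepPushingBlocks())
    thread.start()
    (1 to 5).foreach(i => queue.put(s"block-$i"))
    pusher.stopGenerating()
    thread.join()
    pusher.pushed
  }
}
```

The timed poll is what lets the loop exit promptly after stopGenerating(), and the final drain means blocks queued just before shutdown are still pushed.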

When a block is stored in the BlockManager, a ReceivedBlockStoreResult is returned, which carries the StreamBlockId of the block that was stored successfully.

The next step is to wrap the ReceivedBlockStoreResult in a ReceivedBlockInfo, which describes the latest unprocessed data, and to tell the ReceiverTracker through Akka that a new block has been added. ReceiverTracker then calls the addBlock method to store the ReceivedBlockInfo in the streamIdToUnallocatedBlockQueues queue.

ReceiverTracker puts each stored blockId in the queue of the corresponding streamId, and keeps the tracking information per stream in a HashMap:

private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]

receiverTrackingInfos.put(streamId, receiverTrackingInfo)

The addBlock method in ReceiverTracker is implemented as follows:

private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
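
The bookkeeping described here (a queue of unallocated blocks per stream id, handed over to a batch when its time comes) can be sketched as follows. This is a simplified illustration under stated assumptions: MiniBlockTracker and BlockInfo are hypothetical, batch times are plain Long values, and there is no write-ahead log or Akka messaging.

```scala
import scala.collection.mutable

object BlockTrackerSketch {
  final case class BlockInfo(streamId: Int, blockId: String, numRecords: Long)

  // Hypothetical, simplified stand-in for the tracker's block bookkeeping
  class MiniBlockTracker {
    // Blocks reported by receivers but not yet assigned to any batch
    private val unallocated = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]
    // Blocks assigned to each batch time, keyed by stream id
    private val allocated = mutable.HashMap.empty[Long, Map[Int, Seq[BlockInfo]]]

    def addBlock(info: BlockInfo): Boolean = this.synchronized {
      unallocated.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]) += info
      true
    }

    // Move every unallocated block into the given batch
    def allocateBlocksToBatch(batchTime: Long): Unit = this.synchronized {
      val snapshot = unallocated.map { case (streamId, queue) =>
        val blocks = queue.toList
        queue.clear()
        streamId -> blocks
      }.toMap
      allocated(batchTime) = snapshot
    }

    def getBlocksOfBatch(batchTime: Long): Map[Int, Seq[BlockInfo]] = this.synchronized {
      allocated.getOrElse(batchTime, Map.empty[Int, Seq[BlockInfo]])
    }
  }

  def demo(): (Long, Int) = {
    val tracker = new MiniBlockTracker
    tracker.addBlock(BlockInfo(0, "input-0-0", 2L))
    tracker.addBlock(BlockInfo(0, "input-0-200", 3L))
    tracker.allocateBlocksToBatch(1000L)
    tracker.allocateBlocksToBatch(2000L) // nothing new arrived for this batch
    val recordsInFirstBatch =
      tracker.getBlocksOfBatch(1000L).getOrElse(0, Seq.empty).map(_.numRecords).sum
    val blocksInSecondBatch = tracker.getBlocksOfBatch(2000L).getOrElse(0, Seq.empty).size
    (recordsInFirstBatch, blocksInSecondBatch)
  }
}
```

The lookup in demo() has the same shape as the getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) call in ReceiverInputDStream.compute shown earlier.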

The ReceivedBlockInfo source code is as follows:

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  ) {

  require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative")

  @volatile private var _isBlockIdValid = true

  def blockId: StreamBlockId = blockStoreResult.blockId
}

Finally, take a look at part of the source code of ReceivedBlockStoreResult:

private[streaming] trait ReceivedBlockStoreResult {
  // Any implementation of this trait will store a block id
  def blockId: StreamBlockId
  // Any implementation of this trait will have to return the number of records
  def numRecords: Option[Long]
}

Three: generating RDDs and submitting Spark jobs

Spark Streaming divides the data into RDDs by time interval, and an action on each RDD triggers the submission of a job. The job is placed in the job queue of the job manager and scheduled by the JobScheduler. The JobScheduler submits the job to Spark's own job scheduler, which turns it into a large number of tasks and sends them to the Spark cluster for execution.

Next, take a look at the source code of JobScheduler, the core component of Spark Streaming scheduling. Part of the source:

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
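
The submission step boils down to handing each job of a batch to an executor. The sketch below shows that idea with a fixed thread pool from java.util.concurrent. MiniJobScheduler, Job and JobSet here are simplified hypothetical classes, and the single-thread default is an assumption made so that batches run one after another in submission order.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object JobSchedulerSketch {
  final case class Job(time: Long, id: Int, func: () => Unit)
  final case class JobSet(time: Long, jobs: Seq[Job])

  // Hypothetical, simplified stand-in for the streaming JobScheduler
  class MiniJobScheduler(numConcurrentJobs: Int = 1) {
    private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

    // Queue every job of the batch on the executor
    def submitJobSet(jobSet: JobSet): Unit = {
      jobSet.jobs.foreach { job =>
        jobExecutor.execute(() => job.func())
      }
    }

    // Stop accepting jobs and wait for the queued ones to finish
    def shutdown(): Boolean = {
      jobExecutor.shutdown()
      jobExecutor.awaitTermination(5, TimeUnit.SECONDS)
    }
  }

  def demo(): List[String] = {
    val log = ArrayBuffer.empty[String]
    val scheduler = new MiniJobScheduler()
    for (time <- Seq(1000L, 2000L)) {
      val job = Job(time, 0, () => log.synchronized { log += s"batch $time job 0"; () })
      scheduler.submitJobSet(JobSet(time, Seq(job)))
    }
    scheduler.shutdown()
    log.synchronized { log.toList }
  }
}
```

With one executor thread the jobs of batch 1000 finish before those of batch 2000 start; a larger pool would let jobs from different batches overlap.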

The jobs themselves are generated by the graph, which is an instance of the DStreamGraph class. Part of its source:

final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }
}

The elements of outputStreams are DStreams. Take a look at the generateJob method of DStream:

private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

In effect, an RDD is generated for each time interval, and the runJob method of SparkContext is called to submit a Spark job for it.

That is how Spark Streaming works internally. Some of these points are likely to come up in daily work, and I hope this article helps you understand them better.
