Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

(version customization) lesson 16: data cleaning of Spark Streaming source code interpretation thoroughly decrypted

2025-01-20 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

The contents of this issue:

1. Detailed explanation of Spark Streaming metadata cleanup

2. Spark Streaming metadata cleanup source code parsing

1. How to study Spark Streaming metadata cleanup

Metadata will be generated when operating DStream, so if you want to solve the data cleaning work of RDD, you must start with DStream. Because DStream is the template for RDD, there are dependencies between DStream.

The operation of DStream produces RDD, and the receiving data also depends on DStream. The whole life cycle of data input, data calculation and output is built by DStream. As a result, DStream is responsible for the entire life cycle of RDD. Therefore, the entrance to the study is DStream.

Based on the Kafka data source, access to Kafka,DStream through Direct will continue to maintain a HashMap,HashMap in its in-memory data structure over time, that is, the time window and the RDD under the time window. Follow Batch Duration to store RDD and delete RDD.

Spark Streaming itself is always running, in its own calculation will continue to generate RDD, for example, every second Batch Duration will generate RDD, in addition to there may be accumulators, broadcast variables. Due to the continuous generation of these objects, Spark Streaming has its own set of object, metadata and data cleaning mechanisms.

Spark Streaming's management of RDD is equivalent to JVM's GC.

II. Source code analysis

Spark Streaming is through the Batch Durations we set to constantly generate RDD,Spark Streaming cleaning metadata related to the clock, because the data is generated periodically, so it must be a periodic release, all of which are related to JobGenerator, so let's start from here.

1. RecurringTimer: the message circulator sends messages to EventLoop continuously

= RecurringTimer (. MillisecondslongTime = > .post ((Time (longTime)

2. EventLoop:onReceive received the message

(): = synchronized {(! =) = EventLoop [JobGeneratorEvent] () {(event: JobGeneratorEvent): = processEvent (event) (e:): = {jobScheduler.reportError (e)}} .start () (.) {restart ()} {startFirstTime ()}

3. Receive cleanup metadata messages in processEvent

/ * * Processes all events * / private def processEvent (event: JobGeneratorEvent) {logDebug ("Got event" + event) event match {case GenerateJobs (time) = > generateJobs (time) case ClearMetadata (time) = > clearMetadata (time) / / Clean metadata case DoCheckpoint (time, clearCheckpointDataLater) = > doCheckpoint (time, clearCheckpointDataLater) case ClearCheckpointData (time) = > clearCheckpointData (time) / / Clean checkpoint}}

The specific method implementation will not be said here. Let's further analyze when these cleanup actions are called. In Spark Streaming applications, the final Job is handed over to JobHandler to execute, so let's analyze JobHandler.

Private class JobHandler (job: Job) extends Runnable with Logging {import JobScheduler._def run () {try {val formattedTime = UIUtils.formatBatchTime (job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false) val batchUrl = s "/ streaming/batch/?id=$ {job.time.milliseconds}" val batchLinkText = s "[output operation ${job.outputOpId}" Batch time ${formattedTime}] "ssc.sc.setJobDescription (s" Streaming job from $batchLinkText "") ssc.sc.setLocalProperty (BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) ssc.sc.setLocalProperty (OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) / / We need to assign `eventLoop` to a temp variable. Otherwise, because / / `JobScheduler.stop (false) `may set `eventLoop`to null when this method is running, then / / it's possible that when `post`is called, `eventLoop` happens to null.var _ eventLoop = eventLoopif (_ eventLoop! = null) {_ eventLoop.post (JobStarted (job, clock.getTimeMillis () / / Disable checks for existing output directories in jobs launched by the streaming / / scheduler, since we may need to write output to an existing directory during checkpoint / / recovery See SPARK-4835 for more details.PairRDDFunctions.disableOutputSpecValidation.withValue (true) {job.run ()} _ eventLoop = eventLoopif (_ eventLoop! = null) {_ eventLoop.post (JobCompleted (job, clock.getTimeMillis ())}} else {/ / JobScheduler has been stopped.}} finally {ssc.sc.setLocalProperty (JobScheduler.BATCH_TIME_PROPERTY_KEY) Null) ssc.sc.setLocalProperty (JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)}

When Job is completed, a JobCompleted message will be sent to onReceive to execute the specific method through processEvent

Private def processEvent (event: JobSchedulerEvent) {try {event match {case JobStarted (job, startTime) = > handleJobStart (job, startTime) case JobCompleted (job, completedTime) = > handleJobCompletion (job, completedTime) case ErrorReported (m, e) = > handleError (m, e)} catch {case e: Throwable = > reportError ("Error in job scheduler", e)} private def handleJobCompletion (job: Job) CompletedTime: Long) {val jobSet = jobSets.get (job.time) jobSet.handleJobCompletion (job) job.setEndTime (completedTime) listenerBus.post (StreamingListenerOutputOperationCompleted (job.toOutputOperationInfo)) logInfo ("Finished job" + job.id + "from job set of time" + jobSet.time) if (jobSet.hasCompleted) {jobSets.remove (jobSet.time) jobGenerator.onBatchCompletion (jobSet.time) logInfo ("Total delay:% .3f s for time% s (execution:% .3f s)" .format (jobSet.totalDelay / 1000.0) JobSet.time.toString,jobSet.processingDelay / 1000.0) listenerBus.post (StreamingListenerBatchCompleted (jobSet.toBatchInfo))} job.result match {case Failure (e) = > reportError ("Error running job" + job, e) case _ = >}}

Clean up metadata through jobGenerator.onBatchCompletion

/ * Callback called when a batch has been completely processed. * / def onBatchCompletion (time: Time) {eventLoop.post (ClearMetadata (time))}

At this point, the steps for Spark Streaming to clean up the metadata are basically complete

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report