In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
The contents of this issue:
Causes and phenomena of Spark Streaming data cleanup
Spark Streaming data cleanup code parsing
After parsing so many lessons on Spark Streaming, we have become more and more aware that Spark Streaming is just an application based on Spark Core, so mastering Spark Streaming is absolutely good for us to write Spark applications.
Unlike Spark Core applications, the data of Spark Core applications is stored in the underlying file system, such as HDFS and other storage systems, while Spark Streaming has been running and constantly calculating, resulting in a large number of accumulators and broadcast variables in every second, so objects and metadata need to be cleaned regularly. After each batch duration runtime constantly triggers job, you need to clean up the rdd and metadata. The printed log can be seen in Client mode, and the contents of the cleanup log can also be seen from the file log.
Spark runs on jvm, jvm produces objects, jvm needs to recycle objects, and if we don't manage gc (object generation and recycling), jvm runs out quickly. Now we are studying the Spark GC of Spark Streaming. The data management and metadata management of Spark Streaming to rdd is equivalent to the management of jvm to gc. Data and metadata are generated when operating DStream, and the recovery of data and metadata needs to study the generation and recovery of DStream.
Data input depends on InputDStream, data input, data operation, data output, the whole life cycle is based on DStream, DStream is responsible for the life cycle of rdd, rrd is generated by DStream, and the operation of rdd is also the operation of DStream, so the cycle of batchDuration is constantly generated, so to study the operation of rdd is to study the operation of DStream. Take Direct from kafka as an example, DirectKafkaInputDStream produces KafkaRDD
Override def compute (validTime: Time): Option [KafkaRDD [K, V, U, T, R]] = {val untilOffsets = clamp (latestLeaderOffsets (maxRetries)) val rdd = KafkaRDD [K, V, U, T, R] (context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) / / Report the record number and metadata of this batch interval to InputInfoTracker. Val offsetRanges = currentOffsets.map {case (tp, fo) = > val uo = untilOffsets (tp) OffsetRange (tp.topic, tp.partition, fo, uo.offset)} val description = offsetRanges.filter {offsetRange = > / / Don't display empty ranges. OffsetRange.fromOffset! = offsetRange.untilOffset} .map {offsetRange = > s "topic: ${offsetRange.topic}\ tpartition: ${offsetRange.partition}\ t" + s "offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"} .mkString ("\ n") / / Copy offsetRanges to immutable.List to prevent from being modified by the user val metadata = Map ("offsets"-> offsetRanges.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION-> description) val inputInfo = StreamInputInfo (id, rdd.count) Metadata) ssc.scheduler.inputInfoTracker.reportInfo (validTime, inputInfo) currentOffsets = untilOffsets.map (kv = > kv._1-> kv._2.offset) Some (rdd)}
DStream periodically generates and releases data over time, and there is a timer in JobGenerator:
Privateval timer = new RecurringTimer (clock, ssc.graph.batchDuration.milliseconds, longTime = > eventLoop.post (GenerateJobs (new Time (longTime), "JobGenerator")
There is also an EventLoop in JobGenerator to receive message events periodically:
/ * Processes all events * / private def processEvent (event: JobGeneratorEvent) {logDebug ("Got event" + event) event match {case GenerateJobs (time) = > generateJobs (time) case ClearMetadata (time) = > clearMetadata (time) case DoCheckpoint (time, clearCheckpointDataLater) = > doCheckpoint (time, clearCheckpointDataLater) case ClearCheckpointData (time) = > clearCheckpointData (time)}}
There are methods to clean up metadata and checkpoint data clearMetadata: clear metadata information.
/ * * Clear DStream metadata for the given `time`. * / private def clearMetadata (time: Time) {ssc.graph.clearMetadata (time) / / If checkpointing is enabled, then checkpoint, / / else mark batch to be fully processed if (shouldCheckpoint) {eventLoop.post (DoCheckpoint (time, clearCheckpointDataLater = true))} else {/ / If checkpointing is not enabled, then delete metadata information about / / received blocks (block data not saved in any case). Otherwise, wait for / / checkpointing of this batch to complete. Val maxRememberDuration = graph.getMaxInputStreamRememberDuration () jobScheduler.receiverTracker.cleanupOldBlocksAndBatches (time-maxRememberDuration) jobScheduler.inputInfoTracker.cleanup (time-maxRememberDuration) markBatchFullyProcessed (time)}}
DStreamGraph: first it cleans up outputDStream, which is actually ForEachDStream
Def clearMetadata (time: Time) {logDebug ("Clearing metadata for time" + time) this.synchronized {outputStreams.foreach (_ .clearmetadata (time))} logDebug ("Cleared old metadata for time" + time)}
DStream.clearMetadata: in addition to clearing RDD, you can also clear metadata metadata. You can set the rememberDuration time if you want to RDD across Batch Duration. RememberDuration
/ * Clear metadata that are older than `rememberDuration` of this DStream. * This is an internal method that should not be called directly. This default * implementation clears the old generated RDDs. Subclasses of DStream may override * this to clear their own metadata along with the generated RDDs. * / private [streaming] def clearMetadata (time: Time) {val unpersistData = ssc.conf.getBoolean ("spark.streaming.unpersist", true) / / rememberDuration memory cycle to see if RDD is oldRDD val oldRDDs = generatedRDDs.filter (_. _ 1 s "${x.room1}-> ${x._2.id}"). MkString (",") + "]") / / remove key from generatedRDDs. GeneratedRDDs-- = oldRDDs.keys if (unpersistData) {logDebug ("Unpersisting oldRDDs:" + oldRDDs.values.map (_ .id) .mkString (" ") oldRDDs.values.foreach {rdd = > rdd.unpersist (false) / / Explicitly remove blocks of BlockRDD rdd match {case b: BlockRDD [_] = > logInfo (" Removing blocks of RDD "+ b +" of time "+ time) b.removeBlocks () / / Clean up RDD data case _ = >}} logDebug (" Cleared "+ oldRDDs.size +" RDDs " That were older than "+ (time-rememberDuration) +": "+ oldRDDs.keys.mkString (" )) / / dependent DStream also needs to be cleaned up. Dependencies.foreach (_ .clearmetadata (time))}
In BlockRDD, BlockManagerMaster deletes the Block based on the blockId. The operation to delete a Block is irreversible.
/ * Remove the data blocks that this BlockRDD is made from. NOTE: This is an * irreversible operation, as the data in the blocks cannot be recovered back * once removed. Use it with caution. * / private [spark] def removeBlocks () {blockIds.foreach {blockId = > sparkContext.env.blockManager.master.removeBlock (blockId)} _ isValid = false}
Go back to processEvent in JobGenerator and take a look at clearCheckpoint: clear cached data
/ * * Clear DStream checkpoint data for the given `time`. * / private def clearCheckpointData (time: Time) {ssc.graph.clearCheckpointData (time) / / All the checkpoint information about which batches have been processed, etc have / / been saved to checkpoints, so its safe to delete block metadata and data WAL files val maxRememberDuration = graph.getMaxInputStreamRememberDuration () jobScheduler.receiverTracker.cleanupOldBlocksAndBatches (time-maxRememberDuration) jobScheduler.inputInfoTracker.cleanup (time-maxRememberDuration) markBatchFullyProcessed (time)}
ClearCheckpointData:
Def clearCheckpointData (time: Time) {logInfo ("Clearing checkpoint data for time" + time) this.synchronized {outputStreams.foreach (_ .clearCheckpointData (time))} logInfo ("Cleared checkpoint data for time" + time)}
ClearCheckpointData: the same as clearing the metadata information, or clearing the cached data that DStream depends on.
Private [streaming] def clearCheckpointData (time: Time) {logDebug ("Clearing checkpoint data") checkpointData.cleanup (time) dependencies.foreach (_ .clearCheckpointData (time)) logDebug ("Cleared checkpoint data")}
DStreamCheckpointData: clear cached data
/ * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been * written to the checkpoint directory. * / def cleanup (time: Time) {/ / Get the time of the oldest checkpointed RDD that was written as part of the / / checkpoint of `time` timeToOldestCheckpointFileTime.remove (time) match {case Some (lastCheckpointFileTime) = > / / Find all the checkpointed RDDs (i.e. Files) that are older than `lastCheckpointFileTime` / / This is because checkpointed RDDs older than this are not going to be needed / / even after master fails, as the checkpoint data of `time` does not refer to those files val filesToDelete = timeToCheckpointFile.filter (_. _ 1)
< lastCheckpointFileTime) logDebug("Files to delete:\n" + filesToDelete.mkString(",")) filesToDelete.foreach { case (time, file) =>Try {val path = new Path (file) if (fileSystem = = null) {fileSystem = path.getFileSystem (dstream.ssc.sparkContext.hadoopConfiguration)} fileSystem.delete (path) True) timeToCheckpointFile-= time logInfo ("Deleted checkpoint file'" + file + "'for time" + time)} catch {case e: Exception = > logWarning ("Error deleting old checkpoint file'" + file + "'for time" + time, e) fileSystem = null} case None = > logDebug ("Nothing to delete")}}
At this point, we know how to clean up the old data and what data to clean up, but when did the clean up data be triggered? When the Job is finally submitted, it is left to the JobHandler to execute.
Private class JobHandler (job: Job) extends Runnable with Logging {import JobScheduler._ def run () {try {val formattedTime = UIUtils.formatBatchTime (job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false) val batchUrl = s "/ streaming/batch/?id=$ {job.time.milliseconds}" val batchLinkText = s "[output operation ${job.outputOpId}" Batch time ${formattedTime}] "ssc.sc.setJobDescription (s" Streaming job from $batchLinkText "") ssc.sc.setLocalProperty (BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) ssc.sc.setLocalProperty (OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) / / We need to assign `eventLoop` to a temp variable. Otherwise, because / / `JobScheduler.stop (false) `may set `eventLoop` to null when this method is running, then / / it's possible that when `post` is called, `eventLoop` happens to null. Var _ eventLoop = eventLoop if (_ eventLoop! = null) {_ eventLoop.post (JobStarted (job, clock.getTimeMillis ()) / / Disable checks for existing output directories in jobs launched by the streaming / / scheduler, since we may need to write output to an existing directory during checkpoint / / recovery; see SPARK-4835 for more details. PairRDDFunctions.disableOutputSpecValidation.withValue (true) {job.run ()} _ eventLoop = eventLoop if (_ eventLoop! = null) {/ / when Job is complete, eventLoop will send a message initializing onReceive _ eventLoop.post (JobCompleted (job, clock.getTimeMillis ())}} else {/ / JobScheduler has been stopped. }} finally {ssc.sc.setLocalProperty (JobScheduler.BATCH_TIME_PROPERTY_KEY, null) ssc.sc.setLocalProperty (JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)}
The onReceive initialization of EventLoop received the message JobCompleted.
Def start (): Unit = synchronized {if (eventLoop! = null) return / / scheduler has already been started logDebug ("Starting JobScheduler") eventLoop = new EventLoop [JobSchedulerEvent] ("JobScheduler") {override protected def onReceive (event: JobSchedulerEvent): Unit = processEvent (event) override protected def onError (e: Throwable): Unit = reportError ("Error in job scheduler", e)} eventLoop.start ()
ProcessEvent:
Private def processEvent (event: JobSchedulerEvent) {try {event match {case JobStarted (job, startTime) = > handleJobStart (job, startTime) case JobCompleted (job, completedTime) = > handleJobCompletion (job, completedTime) case ErrorReported (m, e) = > handleError (m, e)} catch {case e: Throwable = > reportError ("Error in job scheduler", e)}}
Call the onBatchCompletion method of JobGenerator to clear the metadata.
Private def handleJobCompletion (job: Job CompletedTime: Long) {val jobSet = jobSets.get (job.time) jobSet.handleJobCompletion (job) job.setEndTime (completedTime) listenerBus.post (StreamingListenerOutputOperationCompleted (job.toOutputOperationInfo)) logInfo ("Finished job" + job.id + "from job set of time" + jobSet.time) if (jobSet.hasCompleted) {jobSets.remove (jobSet.time) jobGenerator.onBatchCompletion (jobSet.time) logInfo ("Total delay:% .3f s for time% s (execution:%. 3f s) ".format (jobSet.totalDelay / 1000.0) JobSet.time.toString, jobSet.processingDelay / 1000.0) listenerBus.post (StreamingListenerBatchCompleted (jobSet.toBatchInfo))} job.result match {case Failure (e) = > reportError ("Error running job" + job, e) case _ = >}}
At this point, we understand when to trigger the process of clarifying old data.
Note:
1. DT big data DreamWorks Wechat official account DT_Spark
2. IMF 8: 00 p.m. Big data actual combat YY live broadcast channel number: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 269
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.