2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article looks at the relationship between DStream and RDD in Spark Streaming by walking through the relevant source code.
How is an RDD generated? What does its generation depend on? Is the execution of an RDD in Spark Streaming different from its execution in Spark Core? What do we do with an RDD after it has run?
An RDD is itself an ordinary object. If the BatchInterval is 1 second, for example, a new RDD is generated every second, and memory cannot hold all of them indefinitely. So a further question is how the existing RDDs are managed once each BatchInterval's job has finished.
A ForEachDStream does not necessarily trigger the execution of a Job; creating one is a separate matter from executing a Job.
Job generation is driven by the Spark Streaming framework.
foreachRDD is the back door of Spark Streaming: it lets you operate on the RDD directly.
A DStream is the template for RDDs, and each later DStream depends on the DStream before it.
val lines = jsc.socketTextStream("127.0.0.1", 9999) creates a SocketInputDStream.
lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print() then converts the SocketInputDStream to a FlatMappedDStream, then to a MappedDStream, then to a ShuffledDStream, and finally to a ForEachDStream.
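To make the chain of transformations concrete, here is a minimal sketch of what the same operations do to a single batch. It models one batch as a plain Scala Seq rather than an RDD, so it runs without Spark; the object name WordCountBatchSketch and the use of groupBy in place of reduceByKey are assumptions made for illustration only.

```scala
object WordCountBatchSketch {
  // One batch of lines, modelled as a plain Seq instead of an RDD (illustrative only).
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))  // corresponds to the FlatMappedDStream step
      .map(word => (word, 1)) // corresponds to the MappedDStream step
      .groupBy(_._1)          // stands in for the shuffle behind reduceByKey
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    println(countWords(Seq("a b a", "b c")))
}
```

In Spark Streaming the same sequence of operations is declared once on the DStream and then applied to the RDD of every batch.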
The source code of the DStream class explains this:
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
Roughly, this means:
1. A DStream depends on other DStreams.
2. A DStream generates an RDD every BatchDuration.
3. Every BatchDuration, a function inside the DStream generates the RDD.
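The three properties can be sketched as a tiny abstract class. This is a simplified model written for this article, not Spark's real DStream: StreamSketch, SourceSketch, MappedSketch and lineageDepth are hypothetical names, a batch is a plain Seq, and time is a Long in milliseconds.

```scala
object DStreamPropertiesSketch {
  // Simplified model, NOT Spark's real classes; all names here are hypothetical.
  abstract class StreamSketch[T] {
    def dependencies: List[StreamSketch[_]]   // property 1: the streams this one depends on
    def slideDurationMs: Long                 // property 2: interval between generated batches
    def compute(timeMs: Long): Option[Seq[T]] // property 3: function producing the batch
  }

  // A source stream has no parents and supplies its own data.
  class SourceSketch(batches: Map[Long, Seq[String]], val slideDurationMs: Long)
      extends StreamSketch[String] {
    def dependencies: List[StreamSketch[_]] = Nil
    def compute(timeMs: Long): Option[Seq[String]] = batches.get(timeMs)
  }

  // A derived stream depends on one parent and transforms the parent's batch.
  class MappedSketch[T, U](parent: StreamSketch[T], f: T => U) extends StreamSketch[U] {
    def dependencies: List[StreamSketch[_]] = List(parent)
    def slideDurationMs: Long = parent.slideDurationMs
    def compute(timeMs: Long): Option[Seq[U]] = parent.compute(timeMs).map(_.map(f))
  }

  // Walk the dependency list from the last stream back to the source.
  def lineageDepth(s: StreamSketch[_]): Int =
    1 + s.dependencies.map(d => lineageDepth(d)).foldLeft(0)((a, b) => math.max(a, b))

  val source = new SourceSketch(Map(1000L -> Seq("a", "bb")), slideDurationMs = 1000L)
  val lengths = new MappedSketch[String, Int](source, _.length)
}
```

Asking lengths for the batch at 1000 ms first asks source for its batch, which is the back-to-front dependency the properties describe.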
DStream dependencies run from back to front because a DStream represents the business logic of Spark Streaming. RDD dependencies also run from back to front, and DStreams are lazy, so the dependencies between DStreams must match those between the RDDs they generate.
In the DStream class, generatedRDDs stores the RDD instance for each batch time. Every DStream instance has its own generatedRDDs. At run time, because evaluation is pushed from back to front, the computation is triggered only on the last DStream.
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
How is an entry in generatedRDDs obtained? The getOrCompute method of DStream checks whether the HashMap already holds an RDD for the given time. If not, it calls compute to obtain the RDD and puts it into the HashMap.
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }
      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
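The get-or-compute-and-cache pattern in getOrCompute can be tried out in isolation. The following is a minimal sketch under stated assumptions: CachingStream and computeCalls are hypothetical names, a batch is a plain Seq[Int], and persistence and checkpointing are left out entirely.

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  // Hypothetical stand-in for a DStream: caches one batch (a Seq here) per batch time.
  class CachingStream(isTimeValid: Long => Boolean, computeBatch: Long => Option[Seq[Int]]) {
    private val generated = mutable.HashMap.empty[Long, Seq[Int]]
    var computeCalls = 0 // counts how often the underlying compute function ran

    def getOrCompute(timeMs: Long): Option[Seq[Int]] =
      generated.get(timeMs).orElse {
        if (isTimeValid(timeMs)) {
          computeCalls += 1
          val batch = computeBatch(timeMs)
          batch.foreach(b => generated.put(timeMs, b)) // remember it for later callers
          batch
        } else {
          None // invalid batch time: generate nothing
        }
      }
  }

  val stream = new CachingStream(
    isTimeValid = t => t > 0 && t % 1000 == 0,
    computeBatch = t => Some(Seq((t / 1000).toInt))
  )
}
```

Calling stream.getOrCompute twice with the same valid time runs the compute function only once; the second call is served from the map.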
The ReceiverInputDStream subclass of DStream illustrates the compute method, which internally calls createBlockRDD.
/**
 * Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
createBlockRDD returns a BlockRDD. Because ReceiverInputDStream has no parent dependency, it generates the RDD itself.
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
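The branching in createBlockRDD is easier to see with the logging and block-manager lookups stripped away. The sketch below is only a summary of the decision, with hypothetical names (BlockInfoSketch, RddKind, chooseRdd) standing in for ReceivedBlockInfo and the two RDD classes; it does not filter out missing blocks as the real method does.

```scala
object BlockRddChoiceSketch {
  // Hypothetical, simplified stand-ins for ReceivedBlockInfo and the two RDD kinds.
  final case class BlockInfoSketch(blockId: String, walHandle: Option[String])

  sealed trait RddKind
  case object WalBacked extends RddKind
  case object PlainBlock extends RddKind

  // WAL-backed only when every block carries a WAL handle;
  // with no blocks at all, the receiver-log setting decides.
  def chooseRdd(blocks: Seq[BlockInfoSketch], receiverLogEnabled: Boolean): RddKind =
    if (blocks.nonEmpty) {
      if (blocks.forall(_.walHandle.nonEmpty)) WalBacked else PlainBlock
    } else {
      if (receiverLogEnabled) WalBacked else PlainBlock
    }
}
```

A batch in which only some blocks have a WAL handle falls back to the plain kind, which is the case the original code logs as unexpected.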
Take the MappedDStream subclass of DStream as another example. Its compute method calls getOrCompute on the parent DStream to obtain the parent's RDD and then applies the map operation to it.
private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {
  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration: Duration = parent.slideDuration
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}
These two subclasses show the pattern. The first DStream in the chain, an InputDStream, obtains the data itself in its compute method, while every other DStream depends on its parent: it calls the parent's getOrCompute method and then applies its own computation.
In other words, an operation on a DStream ultimately becomes an operation on an RDD.
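The pull from the last stream back to the input can be observed with a small model. This is a sketch with hypothetical names (Node, InputNode, MapNode), in which batches are plain Seqs and no caching is done; it only shows that nothing is requested from the input until the last node is asked for a batch.

```scala
object PullFromLastSketch {
  // Hypothetical minimal interface: every node can be asked for the batch of a given time.
  trait Node[T] { def getOrCompute(timeMs: Long): Option[Seq[T]] }

  // Input node: supplies data itself and records each request it receives.
  class InputNode(data: Map[Long, Seq[String]]) extends Node[String] {
    var requests: List[Long] = Nil
    def getOrCompute(timeMs: Long): Option[Seq[String]] = {
      requests = timeMs :: requests
      data.get(timeMs)
    }
  }

  // Derived node: asks its parent first, then applies its own per-element function.
  class MapNode[T, U](parent: Node[T], f: T => U) extends Node[U] {
    def getOrCompute(timeMs: Long): Option[Seq[U]] =
      parent.getOrCompute(timeMs).map(_.map(f))
  }

  val input = new InputNode(Map(1000L -> Seq("spark streaming", "dstream")))
  val wordCounts = new MapNode[String, Int](input, _.split(" ").length)
  val doubled = new MapNode[Int, Int](wordCounts, _ * 2)
}
```

Building input, wordCounts and doubled triggers no request. Only a call such as doubled.getOrCompute(1000L) walks the chain back to the input.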
Now look at another subclass of DStream, ForEachDStream. Its compute method does nothing and returns None, but it overrides the generateJob method.
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {
  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration: Duration = parent.slideDuration
  override def compute(validTime: Time): Option[RDD[Unit]] = None
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
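The key point in generateJob is that the foreach function is wrapped in a closure rather than called. A minimal sketch of that idea, assuming a hypothetical JobSketch class in place of Spark Streaming's Job and a Seq in place of an RDD:

```scala
import scala.collection.mutable.ArrayBuffer

object ForEachJobSketch {
  // Hypothetical stand-in for a streaming Job: a batch time plus a deferred function.
  final class JobSketch(val timeMs: Long, body: () => Unit) { def run(): Unit = body() }

  val printed = ArrayBuffer.empty[String] // collects side effects so they can be inspected

  // Builds a job for one batch; foreachFunc is wrapped in a closure, not invoked here.
  def generateJob(parentBatch: Option[Seq[String]], timeMs: Long)(
      foreachFunc: (Seq[String], Long) => Unit): Option[JobSketch] =
    parentBatch.map(batch => new JobSketch(timeMs, () => foreachFunc(batch, timeMs)))

  val job: Option[JobSketch] =
    generateJob(Some(Seq("a", "b")), 1000L) { (batch, t) =>
      printed.append(s"$t:${batch.mkString(",")}")
    }
}
```

After job is created, printed is still empty; the side effect happens only when something calls run() on the job, which matches the statement that a ForEachDStream by itself does not execute anything.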
Job generation starts in the generateJobs method of JobGenerator, which internally calls the generateJobs method of DStreamGraph.
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Obtain the data that belongs to this specific batch time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Call generateJobs of DStreamGraph to generate the jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
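The Try { ... } match { ... } shape used here is ordinary Scala and can be exercised on its own. In this sketch the function generateFor and its two parameters are hypothetical; it returns a description string instead of submitting a job set or reporting an error.

```scala
import scala.util.{Failure, Success, Try}

object GenerateJobsTrySketch {
  // Hypothetical helper: describes what would happen for one batch time.
  def generateFor(timeMs: Long)(allocate: Long => Unit, generate: Long => Seq[String]): String =
    Try {
      allocate(timeMs) // step 1: bind the received data to this batch
      generate(timeMs) // step 2: build the jobs for this batch
    } match {
      case Success(jobs) => s"submitted ${jobs.length} job(s) for time $timeMs"
      case Failure(e)    => s"error generating jobs for time $timeMs: ${e.getMessage}"
    }
}
```

An exception thrown in either step is captured by Try and ends up in the Failure branch, so the caller always gets a result for the batch.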
The generateJobs method of DStreamGraph calls generateJob on each output stream, and the output stream here is a ForEachDStream.
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
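The flatMap over the output streams keeps only the streams that actually produced a job for the batch. A small sketch of that step, with a hypothetical OutputSketch class and job names represented as strings:

```scala
object GraphJobsSketch {
  // Hypothetical output stream: may or may not yield a job for a given batch time.
  final case class OutputSketch(name: String, hasDataAt: Set[Long]) {
    def generateJob(timeMs: Long): Option[String] =
      if (hasDataAt(timeMs)) Some(s"$name@$timeMs") else None
  }

  // At most one job per output stream; streams that return None are dropped by flatMap.
  def generateJobs(outputs: Seq[OutputSketch], timeMs: Long): Seq[String] =
    outputs.flatMap(_.generateJob(timeMs))

  val outputs = Seq(OutputSketch("print", Set(1000L, 2000L)), OutputSketch("save", Set(2000L)))
}
```

For the batch at 1000 ms only the first output yields a job; at 2000 ms both do, so the job set for a batch can contain several jobs.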