In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
This article mainly introduces how the ContinuousExecution implementation process in Spark is, which is very detailed and has a certain reference value. Friends who are interested must read it!
The most important thing is to see how ContinuousExecution rewrites LogicalPlan, the detailed code is not posted, and finally a LogicalPlan of type Sink is created.
Val writer = sink.createStreamWriter (s "$runId", triggerLogicalPlan.schema, outputMode, new DataSourceOptions (extraOptions.asJava)) val withSink = WriteToDataSourceV2 (writer, triggerLogicalPlan) val reader = withSink.collect {case DataSourceV2Relation (_, r: ContinuousReader) = > r} .head
The sink here can be thought of as DataSource. Then the IncrementalExecution is created with withSink as the input parameter.
TriggerLogicalPlan is the StreamingDataSourceV2Relation class.
IncrementalExecution itself is fine, just wrapping some extra auxiliary processing at each processing.
The function of WriteToDataSourceV2 is to write the execution result of triggerLogicalPlan's physical plan to external storage through writer. Instead of looking at WriteToDataSourceV2 here, let's just see what the corresponding physical plan of triggerLogicalPlan is. As mentioned earlier, its corresponding logical plan is: StreamingDataSourceV2Relation.
Find directly whether it is the physical plan corresponding to StreamingDataSourceV2Relation, so let's first look at the definition of the StreamingDataSourceV2Relation class:
Class StreamingDataSourceV2Relation (output: Seq [AttributeReference], reader: DataSourceReader) extends DataSourceV2Relation (output, reader) {override def isStreaming: Boolean = true}
It turns out to be a subclass of DataSourceV2Relation!
Go directly to DataSourceV2Relation's physical plan, which is defined in the DataSourceV2Strategy.scala file.
Object DataSourceV2Strategy extends Strategy {override def apply (plan: LogicalPlan): Seq [SparkPlan] = plan match {case DataSourceV2Relation (output, reader) = > DataSourceV2ScanExec (output, reader):: Nil case WriteToDataSourceV2 (writer, query) = > WriteToDataSourceV2Exec (writer, planLater (query)):: Nil case _ = > Nil}}
The physical plan for DataSourceV2Relation is DataSourceV2ScanExec.
There is not much code for DataSourceV2ScanExec either.
DataSourceV2ScanExec uses DataSourceReader as the reader of the data source. Its inputRDDs returns DataSourceRDD or ContinuousDataSourceRDD,ContinuousDataSourceRDD must be the corresponding ContinuousExecution. The other way is DataSourceRDD.
Whether it's DataSourceRDD or ContinuousDataSourceRDD, their classes that read the data source are the same, coming from DataSourceReader. DataSourceRDD or ContinuousDataSourceRDD both have very little code, and you can see what's going on at a glance.
The above is all the contents of the article "what is the implementation process of ContinuousExecution in Spark". Thank you for reading! Hope to share the content to help you, more related knowledge, welcome to follow the industry information channel!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.