Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the DataSourceV2 stream processing method?

2025-03-30 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article introduces the relevant knowledge of "what is the method of DataSourceV2 flow processing". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!

SparkSession structured flow processing is actually triggered by DataSet's writeStream. This is different from the traditional spark sql approach. WriteStream will find the startQuery method of StreamingQueryManager, and then step by step to MicroBatchExecution and ContinuousExecution.

Core point: StreamingRelationV2 is converted to StreamingDataSourceV2Relation in MicroBatchExecution and ContinuousExecution. MicroBatchExecution and ContinuousExecution are only used in the createQuery method of StreamingQueryManager. So where will the createQuery method of StreamingQueryManager be used? The trace code will find that the startQuery method of StreamingQueryManager is called in DataStreamWriter and then the createQuery method is called.

DataStreamWriter was created by writeStream of Dataset.

[the above is the process of writing a stream].

Key class: BaseSessionStateBuilder, which has the definition of analyzer.

Protected def analyzer: Analyzer = new Analyzer (catalog, v2SessionCatalog, conf) {overrideval extendedResolutionRules: Seq [rose [logical Plan]] = new FindDataSourceTable (session) +: new ResolveSQLOnFile (session) +: new FallBackFileSourceV2 (session) +: DataSourceResolution (conf) This.catalogManager) +: customResolutionRules overrideval postHocResolutionRules: Seq [logical Plan] = new DetectAmbiguousSelfJoin (conf) +: PreprocessTableCreation (session) +: PreprocessTableInsertion (conf) +: DataSourceAnalysis (conf) +: customPostHocResolutionRules overrideval extendedCheckRules: Seq [LogicalPlan = > Unit] = PreWriteCheck +: PreReadCheck +: HiveOnlyCheck +: TableCapabilityCheck +: customCheckRules}

There is nothing special to pay attention to here, so ignore it first.

DataSourceV2 refers to the structured flow processing engine framework of the V2 version of spark. The logical plan here is StreamingDataSourceV2Relation, and the corresponding physical plans are divided into two categories: MicroBatchScanExec and ContinuousScanExec, whose application scenarios can be distinguished by their names, one is micro-batch mode, and the other is continuous flow mode.

Let's start with the physics plan.

The two physical plans are based on the same parent class: DataSourceV2ScanExecBase. First look at the code of the parent class:

Key code:

Override def doExecute (): RDD [InternalRow] = {val numOutputRows = longMetric ("numOutputRows") inputRDD.map {r = > numOutputRows + = 1 r}}

The subclass needs to override inputRDD.

StreamExecution

Two important checkpoint attributes:

Val offsetLog = new OffsetSeqLog (sparkSession, checkpointFile ("offsets"))

Val commitLog = new CommitLog (sparkSession, checkpointFile ("commits"))

Which offset is currently read by offsetLog and which Offset is currently processed by commitLog. These two Log are very important and work together to ensure Exactly-once semantics.

MicroBatchScanExec

All right, let's see how MicroBatchScanExec rewrites inputRDD.

Override lazy val partitions: Seq [InputPartition] = stream.planInputPartitions (start, end) override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory () override lazy val inputRDD: RDD [InternalRow] = {new DataSourceRDD (sparkContext, partitions, readerFactory, supportsColumnar)}

There are three places, the first is to rewrite Seq [InputPartition], call the planInputPartitions method of stream, notice that the stream type here is MicroBatchStream;, the second is to rewrite readerFactory to get the reader factory class; the third override is inputRDD, creating DataSourceRDD as inputRDD, and the seq [InputPartition] and readerFactory rewritten in the first two steps are used as construction parameters of DataSourceRDD.

First of all, let's take a look at what DataSourceRDD does.

The code for the class DataSourceRDD is short and easy to read. The most important thing is the compute method, which gives the whole code first:

Override def compute (split: Partition Context: TaskContext): Iterator [InternalRow] = {val inputPartition = castPartition (split). InputPartition val reader: PartitionReader [_] = if (columnarReads) {partitionReaderFactory.createColumnarReader (inputPartition)} else {partitionReaderFactory.createReader (inputPartition)} context.addTaskCompletionListener [Unit] (_ = > reader.close () val iter = new Iterator [Any] {private [this] var valuePrepared = false override def hasNext: Boolean = {if (! valuePrepared) ) {valuePrepared = reader.next ()} valuePrepared} override def next (): Any = {if (! hasNext) {throw new java.util.NoSuchElementException ("End of stream")} valuePrepared = false reader.get ()} / / TODO: SPARK-25083 remove the type erasure hack in data source scan new InterruptibleIterator (context) Iter.asInstanceOf [Iterator [InternalRow]])}

First create a PartitionReader based on the reader factory class, and then call the get method of PartitionReader to get the data. It's that simple!

ContinuousScanExec

Finally, take a look at the definition of ContinuousScanExec.

Override lazy val partitions: Seq [InputPartition] = stream.planInputPartitions (start) override lazy val readerFactory: ContinuousPartitionReaderFactory = {stream.createContinuousReaderFactory ()} override lazy val inputRDD: RDD [InternalRow] = {EpochCoordinatorRef.get (sparkContext.getLocalProperty (ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), sparkContext.env) .askSync [Unit] (SetReaderPartitions (partitions.size)) new ContinuousDataSourceRDD (sparkContext, sqlContext.conf.continuousStreamingExecutorQueueSize, sqlContext.conf.continuousStreamingExecutorPollIntervalMs, partitions, schema ReaderFactory.asInstanceOf [ContinuousPartitionReaderFactory])}

Similar to the microbatch mode MicroBatchScanExec, there are three overrides. The first is to rewrite Seq [InputPartition], calling the planInputPartitions method of stream. Note that the stream type here is ContinuousStream;, the second is rewriting readerFactory, and the third rewriting of the reader factory class ContinuousPartitionReaderFactory; is inputRDD, creating ContinuousDataSourceRDD as inputRDD, and Seq [InputPartition] and readerFactory rewritten in the first two steps as construction parameters of ContinuousDataSourceRDD.

First of all, let's take a look at what ContinuousDataSourceRDD does.

The code of ContinuousDataSourceRDD is basically the same as that of DataSourceRDD. Look at the source code directly. There will be no details here, and there is nothing to talk about in detail.

For Kafka, ContinuousDataSourceRDD and DataSourceRDD are ultimately the same.

This is the end of the content of "what is the method of DataSourceV2 flow processing". Thank you for reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report