How to use DataStreamReader and DataStreamWriter

Updated 2025-01-16. From: SLTechnology News&Howtos



This article explains how to use DataStreamReader and DataStreamWriter by walking through the relevant Spark source code. Many people have questions about how the two classes fit together in day-to-day work, so the notes below collect what the source shows in a simple, easy-to-follow form.

Reading and writing a stream start with DataStreamReader and DataStreamWriter, respectively.
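As a point of reference before diving into the internals, here is a minimal sketch of how the two classes are typically used together. It assumes a local Spark installation with the built-in "rate" source and "console" sink; the object name, application name, master URL, and option values are illustrative choices and do not come from the article.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object RateToConsole {
  // spark.readStream returns a DataStreamReader; load() builds the streaming DataFrame.
  def rateStream(spark: SparkSession): DataFrame =
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

  def main(args: Array[String]): Unit = {
    // Assumption: a local session; master and app name are illustrative.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("rate-to-console")
      .getOrCreate()

    // df.writeStream returns a DataStreamWriter; start() launches the query.
    val query = rateStream(spark).writeStream
      .format("console")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    query.awaitTermination(5000) // let the query run for about five seconds
    query.stop()
    spark.stop()
  }
}
```

The rest of the article follows what happens inside the load() and start() calls used here.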

DataStreamReader

DataStreamReader is the entry point for creating a streaming read, and its key method is load. This method is central to everything that follows, so the full code is shown first and analyzed step by step afterwards.

    def load(): DataFrame = {
      val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf)
        .getConstructor().newInstance()
      val v1DataSource = DataSource(
        sparkSession,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap)
      val v1Relation = ds match {
        case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
        case _ => None
      }
      ds match {
        case provider: TableProvider =>
          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
            source = provider, conf = sparkSession.sessionState.conf)
          val options = sessionOptions ++ extraOptions
          val dsOptions = new CaseInsensitiveStringMap(options.asJava)
          val table = userSpecifiedSchema match {
            case Some(schema) => provider.getTable(dsOptions, schema)
            case _ => provider.getTable(dsOptions)
          }
          import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
          table match {
            case _: SupportsRead if table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
              Dataset.ofRows(
                sparkSession,
                StreamingRelationV2(
                  provider, source, table, dsOptions, table.schema.toAttributes, v1Relation)(
                  sparkSession))

            // fallback to v1
            // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.
            case _ => Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
          }

        case _ =>
          // Code path for data source v1.
          Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
      }
    }

The method has many branches; the important thing is to distinguish the V1 path from the V2 path.

V1 uses the logical relation StreamingRelation, while V2 uses StreamingRelationV2. Let's first look at the physical plan that corresponds to each.
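Before moving on, the branching in load can be condensed into a small sketch. This is a simplified model, not Spark code: V1Provider, V2Provider, V1Node, and V2Node are hypothetical stand-ins for StreamSourceProvider, TableProvider, StreamingRelation, and StreamingRelationV2, and the supportsStreamingRead flag stands in for the SupportsRead and supportsAny check.

```scala
// Hypothetical stand-ins for Spark's provider interfaces; not the real types.
trait V1Provider
trait V2Provider { def supportsStreamingRead: Boolean }

sealed trait PlanNode
final case class V1Node(source: String) extends PlanNode
final case class V2Node(source: String, v1Fallback: Option[V1Node]) extends PlanNode

object LoadDispatch {
  def load(source: String, ds: AnyRef): PlanNode = {
    // A V1 relation is prepared up front so the V2 node can carry it as a fallback.
    val v1Relation = ds match {
      case _: V1Provider => Some(V1Node(source))
      case _ => None
    }
    ds match {
      // V2 path: only taken when the table can actually be read as a stream.
      case p: V2Provider if p.supportsStreamingRead => V2Node(source, v1Relation)
      // Everything else, including V2 providers without streaming reads, uses V1.
      case _ => V1Node(source)
    }
  }
}
```

The point of the sketch is that a V2 provider does not automatically produce a V2 node: without streaming read support, the code falls back to the V1 relation.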

The strategy that maps these relations to a physical plan is defined in SparkStrategies.scala:

    /**
     * This strategy is just for explaining `Dataset/DataFrame` created by `spark.readStream`.
     * It won't affect the execution, because `StreamingRelation` will be replaced with
     * `StreamingExecutionRelation` in `StreamingQueryManager` and `StreamingExecutionRelation` will
     * be replaced with the real relation using the `Source` in `StreamExecution`.
     */
    object StreamingRelationStrategy extends Strategy {
      def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case s: StreamingRelation =>
          StreamingRelationExec(s.sourceName, s.output) :: Nil
        case s: StreamingExecutionRelation =>
          StreamingRelationExec(s.toString, s.output) :: Nil
        case s: StreamingRelationV2 =>
          StreamingRelationExec(s.sourceName, s.output) :: Nil
        case _ => Nil
      }
    }

All three cases map to StreamingRelationExec. As the code comment states, however, StreamingRelationExec is not a real physical plan: the strategy exists only for explaining a Dataset/DataFrame created by spark.readStream, and it does not affect execution.

Keep two related classes in mind: ContinuousExecution and MicroBatchExecution. The strategy above does not show how either of them is actually executed, so we work backwards instead, starting from the code around ContinuousExecution.

StreamExecution

StreamExecution is an abstract class. Its abstract method runActivatedStream carries out the concrete stream-processing work, and each subclass overrides it.

The runStream method wraps runActivatedStream and adds extra handling around it, such as event notification.
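The relationship between runStream and runActivatedStream is a template-method arrangement, which the following sketch illustrates. It is a hypothetical miniature, not Spark's class: MiniStreamExecution, MiniMicroBatch, and the string events are invented for illustration, and the real runStream does far more (state handling, metrics, checkpoint cleanup).

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical miniature of the StreamExecution shape; names and events are illustrative.
abstract class MiniStreamExecution {
  val events: ListBuffer[String] = ListBuffer.empty

  private def post(event: String): Unit = { events += event; () }

  // Subclasses supply the actual processing loop.
  protected def runActivatedStream(): Unit

  // Wrapper that adds notification around the subclass-specific work.
  final def runStream(): Unit = {
    post("started")
    try {
      runActivatedStream()
    } catch {
      case e: Exception => post(s"failed: ${e.getMessage}")
    } finally {
      post("terminated")
    }
  }
}

class MiniMicroBatch extends MiniStreamExecution {
  override protected def runActivatedStream(): Unit = {
    events += "batch 0 processed"
    ()
  }
}
```

Calling runStream() on a MiniMicroBatch records the start event, the subclass work, and the termination event in that order, which mirrors how the base class owns the lifecycle while subclasses own the processing.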

StreamingQueryManager

First, what is StreamingQueryManager for? According to its comments, it manages all StreamingQuery instances.

    private def createQuery(...): StreamingQueryWrapper = {
      (sink, trigger) match {
        case (table: SupportsWrite, trigger: ContinuousTrigger) =>
          new StreamingQueryWrapper(new ContinuousExecution(
            sparkSession,
            userSpecifiedName.orNull,
            checkpointLocation,
            analyzedPlan,
            table,
            trigger,
            triggerClock,
            outputMode,
            extraOptions,
            deleteCheckpointOnStop))
        case _ =>
          if (operationCheckEnabled) {
            UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
          }
          new StreamingQueryWrapper(new MicroBatchExecution(
            sparkSession,
            userSpecifiedName.orNull,
            checkpointLocation,
            analyzedPlan,
            sink,
            trigger,
            triggerClock,
            outputMode,
            extraOptions,
            deleteCheckpointOnStop))
      }
    }

For a continuous stream, createQuery returns:

    new StreamingQueryWrapper(new ContinuousExecution(...))

StreamingQueryWrapper wraps a StreamingQuery to make it serializable; in every other respect it behaves like the query it wraps. Here the wrapped query is the ContinuousExecution used for continuous streams.
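The choice made in createQuery comes down to a match on the (sink, trigger) pair, which can be sketched in isolation. All names below are hypothetical simplifications: WritableTable stands in for SupportsWrite, Continuous for ContinuousTrigger, and the two engine objects for ContinuousExecution and MicroBatchExecution, whose real constructors take many more arguments.

```scala
// Hypothetical stand-ins; the real classes take many more constructor arguments.
sealed trait TriggerKind
case object Continuous extends TriggerKind
case object ProcessingTime extends TriggerKind

trait SinkLike
trait WritableTable extends SinkLike

sealed trait Engine
case object ContinuousEngine extends Engine
case object MicroBatchEngine extends Engine

final case class QueryWrapper(engine: Engine)

object CreateQuery {
  def createQuery(sink: SinkLike, trigger: TriggerKind): QueryWrapper =
    (sink, trigger) match {
      // Continuous execution needs both a writable table and a continuous trigger.
      case (_: WritableTable, Continuous) => QueryWrapper(ContinuousEngine)
      // Every other combination runs as micro-batches.
      case _ => QueryWrapper(MicroBatchEngine)
    }
}
```

In this model, as in the excerpt above, micro-batch execution is the default and continuous execution is chosen only when both conditions hold.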

ContinuousExecution

As its name suggests, ContinuousExecution should correspond to the physical execution of a continuous stream. It extends the abstract class StreamExecution. The core of the class is its override of runActivatedStream:

    override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
      val stateUpdate = new UnaryOperator[State] {
        override def apply(s: State) = s match {
          // If we ended the query to reconfigure, reset the state to active.
          case RECONFIGURING => ACTIVE
          case _ => s
        }
      }

      do {
        runContinuous(sparkSessionForStream)
      } while (state.updateAndGet(stateUpdate) == ACTIVE)

      stopSources()
    }

The real execution logic lives in the private method runContinuous. It is not expanded here; knowing the main flow is enough.
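The restart loop in runActivatedStream can be reproduced with nothing but the JDK's AtomicReference and UnaryOperator. The sketch below is a stand-alone model under assumed names: the State values, runUntilStopped, and the runOnce callback are hypothetical, with runOnce playing the role of runContinuous. A plain while loop is used in place of do/while so the code compiles on both Scala 2 and Scala 3.

```scala
import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator

object ReconfigureLoop {
  sealed trait State
  case object Active extends State
  case object Reconfiguring extends State
  case object Terminated extends State

  // Runs `runOnce` repeatedly until the state it leaves behind is not restartable.
  // Returns how many times the body ran.
  def runUntilStopped(state: AtomicReference[State])(runOnce: () => Unit): Int = {
    val stateUpdate = new UnaryOperator[State] {
      override def apply(s: State): State = s match {
        case Reconfiguring => Active // a reconfiguration request means "run again"
        case other => other
      }
    }
    var runs = 0
    var keepGoing = true
    while (keepGoing) {
      runOnce()
      runs += 1
      // Atomically flip Reconfiguring back to Active, then check whether to loop.
      keepGoing = state.updateAndGet(stateUpdate) == Active
    }
    runs
  }
}
```

If the body leaves the state at Reconfiguring, the update turns it back into Active and the loop runs again; any other final state ends the loop, which corresponds to the point where the original code calls stopSources().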

Next, where is a ContinuousExecution created from the logical plan? A full-text search leads to StreamingQueryManager.scala, which is exactly where the createQuery code shown earlier comes from.

DataStreamWriter

DataStreamWriter is where the streaming computation is actually triggered.

The key snippet in its start() method hands the query over to StreamingQueryManager:

    df.sparkSession.sessionState.streamingQueryManager.startQuery(
      extraOptions.get("queryName"),
      extraOptions.get("checkpointLocation"),
      df,
      extraOptions.toMap,
      sink,
      outputMode,
      useTempCheckpointLocation = source == "console" || source == "noop",
      recoverFromCheckpointLocation = true,
      trigger = trigger)

Follow the call into StreamingQueryManager and look at its startQuery method.

The startQuery method proceeds in several steps:

1. Call createQuery, which returns the query wrapped in a StreamingQueryWrapper.

    val query = createQuery(
      userSpecifiedName,
      userSpecifiedCheckpointLocation,
      df,
      extraOptions,
      sink,
      outputMode,
      useTempCheckpointLocation,
      recoverFromCheckpointLocation,
      trigger,
      triggerClock)

Here query is a StreamingQueryWrapper, built by code of this form:

    new StreamingQueryWrapper(new ContinuousExecution(...))

2. Start the query created in the previous step.

    try {
      query.streamingQuery.start()
    } catch {
      // exception handling omitted in this excerpt
    }

The start method is defined as follows:

    def start(): Unit = {
      logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")
      queryExecutionThread.setDaemon(true)
      queryExecutionThread.start()
      startLatch.await()  // Wait until thread started and QueryStart event has been posted
    }

The queryExecutionThread it starts is defined as follows:

    val queryExecutionThread: QueryExecutionThread =
      new QueryExecutionThread(s"stream execution thread for $prettyIdString") {
        override def run(): Unit = {
          sparkSession.sparkContext.setCallSite(callSite)
          runStream()
        }
      }

In the end, the thread runs the private method runStream.

3. Return query

Finally, startQuery returns query. Note that the query is already running at this point, because it was started in step 2.
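The handshake between start() and the execution thread in step 2 can be sketched with a plain Thread and a CountDownLatch. This is a hypothetical miniature rather than Spark's implementation: MiniQuery and its started flag are invented for illustration, and the flag stands in for posting the QueryStart event.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical miniature of the start() handshake; not Spark's actual class.
class MiniQuery(name: String) {
  private val startLatch = new CountDownLatch(1)
  val started = new AtomicBoolean(false)

  private val executionThread = new Thread(s"stream execution thread for $name") {
    override def run(): Unit = runStream()
  }

  private def runStream(): Unit = {
    started.set(true)      // stands in for posting the QueryStart event
    startLatch.countDown() // unblocks the caller waiting in start()
    // ... the processing loop would run here ...
  }

  def start(): Unit = {
    executionThread.setDaemon(true) // do not keep the JVM alive for this thread
    executionThread.start()
    startLatch.await()              // return only after the thread has signalled
  }
}
```

Because start() blocks on the latch, a caller that gets control back can rely on the execution thread having begun its work, which is why the query returned in step 3 is already running.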

That concludes this look at how to use DataStreamReader and DataStreamWriter. Combining theory with practice is the best way to learn, so go and try it yourself.
