Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Example Analysis of Spark sql streaming processing

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

Editor to share with you the example analysis of Spark sql streaming processing, I believe that most people do not know much about it, so share this article for your reference, I hope you can learn a lot after reading this article, let's go to know it!

Spark sql supports streaming, and streaming has Source,Sink. Source defines the source of the flow, Sink defines the destination of the flow, and the execution of the flow is triggered from Sink.

Dataset's writeStream defines the destination of the flow and triggers the actual execution of the flow, so the analysis starts with writeStream.

WriteStream = new DataStreamWriter [T] (this)

DataStreamWriter

The function of DataStreamWriter is to write the dataset of the input parameter to external storage, such as kafka,database,txt, etc.

The main trigger method is the start method, which returns a StreamingQuery object, code:

Def start (): StreamingQuery = {if (source = = "memory") {assertNotPartitioned ("memory") val (sink, resultDf) = trigger match {case _: ContinuousTrigger = > val s = new MemorySinkV2 () val r = Dataset.ofRows (df.sparkSession, new MemoryPlanV2 (s, df.schema.toAttributes)) (s, r) case _ = > val s = new MemorySink (df.schema OutputMode) val r = Dataset.ofRows (df.sparkSession, new MemoryPlan (s)) (s, r)} val chkpointLoc = extraOptions.get ("checkpointLocation") val recoverFromChkpoint = outputMode = OutputMode.Complete () val query = df.sparkSession.sessionState.streamingQueryManager.startQuery (extraOptions.get ("queryName"), chkpointLoc, df, extraOptions.toMap, sink, outputMode UseTempCheckpointLocation = true, recoverFromCheckpointLocation = recoverFromChkpoint, trigger = trigger) resultDf.createOrReplaceTempView (query.name) query} else if (source = = "foreach") {assertNotPartitioned ("foreach") val sink = new ForeachSink [T] (foreachWriter) (ds.exprEnc) df.sparkSession.sessionState.streamingQueryManager.startQuery (extraOptions.get ("queryName"), extraOptions.get ("checkpointLocation"), df ExtraOptions.toMap, sink, outputMode, useTempCheckpointLocation = true, trigger = trigger)} else {val ds = DataSource.lookupDataSource (source, df.sparkSession.sessionState.conf) val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split (" ") val sink = ds.newInstance () match {case w: StreamWriteSupport if! disabledSources.contains (w.getClass.getCanonicalName) = > w case _ = > val ds = DataSource (df.sparkSession, className = source, options = extraOptions.toMap PartitionColumns = normalizedParCols.getOrElse (Nil) ds.createSink (outputMode)} df.sparkSession.sessionState.streamingQueryManager.startQuery (extraOptions.get ("queryName"), extraOptions.get ("checkpointLocation"), df, extraOptions.toMap, sink, outputMode, useTempCheckpointLocation = source = = "console", recoverFromCheckpointLocation = true, trigger = trigger)}}

Let's look at the code for the last conditional branch here. Ds is the corresponding DataSource,sink and sometimes ds. Finally, the calculation of the flow is started by the startQuery of streamingQueryManager, and the StreamingQuery object in the calculation is returned.

The startQuery method of streamingQueryManager mainly calls the createQuery method to create a StreamingQueryWrapper object, which is a private method:

Private def createQuery (userSpecifiedName: Option [String], userSpecifiedCheckpointLocation: Option [String], df: DataFrame, extraOptions: Map [String, String], sink: BaseStreamingSink, outputMode: OutputMode, useTempCheckpointLocation: Boolean, recoverFromCheckpointLocation: Boolean, trigger: Trigger TriggerClock: Clock): StreamingQueryWrapper = {var deleteCheckpointOnStop = falseval checkpointLocation = userSpecifiedCheckpointLocation.map {userSpecified = > new Path (userSpecified). ToUri.toString}. OrElse {df.sparkSession.sessionState.conf.checkpointLocation.map {location = > new Path (location, userSpecifiedName.getOrElse (UUID.randomUUID () .toString). ToUri.toString}}. GetOrElse {if (useTempCheckpointLocation) {/ / Delete the temp checkpoint when a query is being stopped without errors. DeleteCheckpointOnStop = true Utils.createTempDir (namePrefix = s "temporary"). GetCanonicalPath} else {throw new AnalysisException ("checkpointLocation must be specified either" + "" through option ("checkpointLocation",...) Or "" + s "" SparkSession.conf.set ("${SQLConf.CHECKPOINT_LOCATION.key}",...) "")}} / / If offsets have already been created, we trying to resume a query. If (! recoverFromCheckpointLocation) {val checkpointPath = new Path (checkpointLocation, "offsets") val fs = checkpointPath.getFileSystem (df.sparkSession.sessionState.newHadoopConf ()) if (fs.exists (checkpointPath)) {throw new AnalysisException (s "This query does not support recovering from checkpoint location. "+ s" Delete $checkpointPath to start over. ")} val analyzedPlan = df.queryExecution.analyzed df.queryExecution.assertAnalyzed () if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {UnsupportedOperationChecker.checkForStreaming (analyzedPlan) OutputMode)} if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {logWarning (s "${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key}" + "is not supported in streaming DataFrames/Datasets and will be disabled.")} (sink, trigger) match {case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) = > UnsupportedOperationChecker.checkForContinuous (analyzedPlan, outputMode) new StreamingQueryWrapper (new ContinuousExecution (sparkSession, userSpecifiedName.orNull, checkpointLocation) AnalyzedPlan, v2Sink, trigger, triggerClock, outputMode, extraOptions, deleteCheckpointOnStop) case _ = > new StreamingQueryWrapper (new MicroBatchExecution (sparkSession, userSpecifiedName.orNull, checkpointLocation, analyzedPlan, sink, trigger, triggerClock, outputMode, extraOptions DeleteCheckpointOnStop)}}

It is divided into ContinuousExecution and MicroBatchExecution according to whether continuous stream operation or micro-batch operation. They are all subclasses of StreamExecution, and StreamExecution is the abstract class of stream processing. The class structure of StreamExecution will be analyzed later.

The code structure and function of ContinuousExecution and MicroBatchExecution are actually very similar. Let's take ContinuousExecution as an example.

ContinuousExecution

First of all, ContinuousExecution is an unfinished stream. When there is no data in the temporary flow, ContinuousExecution blocks the thread to wait for the new data to arrive, which is controlled by the awaitEpoch method.

In fact, the commit method is triggered after each piece of data is processed, and the commit method writes the offset (offset) of the current processing to the commitLog.

If you look at logicalPlan, the logical plan for the input parameters in ContinuousExecution is of type StreamingRelationV2, which will be converted to LogicalPlan of type ContinuousExecutionRelation:

AnalyzedPlan.transform {

Case r @ StreamingRelationV2 (

Source: ContinuousReadSupport, _, extraReaderOptions, output, _) = >

ToExecutionRelationMap.getOrElseUpdate (r, {

ContinuousExecutionRelation (source, extraReaderOptions, output) (sparkSession)

})

}

There is also the addOffset method, which writes the current read offset to offsetLog after each read of the offset, so that you know where to start the next restore. AddOffset and commit together ensure the execution of Exactly-once semantics.

The above is all the contents of the article "sample Analysis of Spark sql streaming processing". Thank you for reading! I believe we all have a certain understanding, hope to share the content to help you, if you want to learn more knowledge, welcome to follow the industry information channel!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report