
An Example Analysis of the Fault-Tolerance Mechanism of Spark Structured Streaming

2025-01-15 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)05/31 Report--

This article shares an example analysis of the fault-tolerance mechanism of Spark Structured Streaming. The editor finds it quite practical, so it is shared here as a reference; follow along to take a look.

Fault-tolerance mechanism

An end-to-end exactly-once guarantee is one of the key design goals of Structured Streaming.

Structured Streaming designs its sources and sinks to track the exact progress of processing, so that the engine can restart or rerun work to handle any failure.

Each streaming source tracks its read position in the stream with offsets, similar to Kafka offsets. The execution engine uses checkpoints and write-ahead logs to record the offset range processed in each trigger.

Streaming sinks are designed to be idempotent, so reprocessing the same data does not change the result.

In this way, relying on replayable data sources (streaming sources) and idempotent processing (streaming sinks), Structured Streaming guarantees end-to-end exactly-once semantics under any failure.
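How these three pieces fit together can be seen in a minimal pure-Scala sketch (a toy model, not Spark code): a replayable source, a checkpointed offset, and a keyed, idempotent sink. Even if a batch is replayed after a simulated crash, the sink ends up with each record exactly once.

```scala
// Toy model of exactly-once processing; all names here are illustrative.
object ExactlyOnceSketch {
  // Replayable source: any offset range can be re-read on demand.
  val source: Vector[String] = Vector("a", "b", "a", "c")

  // Checkpoint: the last offset known to be fully committed.
  var committedOffset: Int = 0

  // Idempotent sink: keyed writes, so rewriting a key is harmless.
  val sink = scala.collection.mutable.Map.empty[Int, String]

  def runBatch(endOffset: Int): Unit = {
    // Re-read from the checkpointed offset (replay after failure is safe).
    for (i <- committedOffset until endOffset)
      sink(i) = source(i)          // idempotent: same key -> same value
    committedOffset = endOffset    // advance the checkpoint last
  }

  def main(args: Array[String]): Unit = {
    runBatch(2)
    // Simulate a crash before the checkpoint advanced: fall back and replay.
    committedOffset = 0
    runBatch(2)                    // replay: sink state is unchanged
    runBatch(4)
    assert(sink.toMap == Map(0 -> "a", 1 -> "b", 2 -> "a", 3 -> "c"))
    println(sink.size)             // prints 4: each record counted exactly once
  }
}
```

If the sink were append-only instead of keyed, the replayed batch would duplicate records; idempotency is what turns at-least-once replay into exactly-once results.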

Take the classic word-count example:

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

Here spark is a SparkSession, lines is a DataFrame, and DataFrame is an alias for Dataset[Row].

Dataset

Take a look at how a Dataset action triggers execution, using the foreach operation as an example:

def foreach(f: T => Unit): Unit = withNewRDDExecutionId {
  rdd.foreach(f)
}

private def withNewRDDExecutionId[U](body: => U): U = {
  SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {
    rddQueryExecution.executedPlan.foreach { plan =>
      plan.resetMetrics()
    }
    body
  }
}

Following the call chain into SQLExecution.withNewExecutionId:

def withNewExecutionId[T](
    sparkSession: SparkSession,
    queryExecution: QueryExecution,
    name: Option[String] = None)(body: => T): T = {
  val sc = sparkSession.sparkContext
  val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
  val executionId = SQLExecution.nextExecutionId
  sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
  executionIdToQueryExecution.put(executionId, queryExecution)
  try {
    withSQLConfPropagated(sparkSession) {
      try {
        body
      } catch {
        // ...
      } finally {
        // ...
      }
    }
  } finally {
    executionIdToQueryExecution.remove(executionId)
    sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
  }
}

The actual work is carried by the queryExecution: QueryExecution argument.

@transient private lazy val rddQueryExecution: QueryExecution = {
  val deserialized = CatalystSerde.deserialize[T](logicalPlan)
  sparkSession.sessionState.executePlan(deserialized)
}

So it is sessionState.executePlan that takes the logicalPlan and produces a QueryExecution.

sessionState.executePlan actually just constructs a QueryExecution object. Calling the executedPlan method of that QueryExecution then yields the physical plan, a SparkPlan. How is it generated?

lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
  SparkSession.setActiveSession(sparkSession)
  planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
}

It is generated by the planner.plan method.

The planner is a SparkPlanner, defined in the BaseSessionStateBuilder class:

protected def planner: SparkPlanner = {
  new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
    override def extraPlanningStrategies: Seq[Strategy] =
      super.extraPlanningStrategies ++ customPlanningStrategies
  }
}

SparkPlanner class

SparkPlanner applies a set of strategies to the LogicalPlan and returns the corresponding SparkPlan. For streaming applications, one such strategy is DataSourceV2Strategy.
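The strategy mechanism itself is plain pattern matching over logical-plan nodes, each match emitting candidate physical plans. A minimal pure-Scala model (all class names here are illustrative toys, not Spark's real classes):

```scala
// Toy model of a Catalyst planning strategy: match logical nodes,
// emit physical nodes. Names are invented for illustration.
sealed trait LogicalPlan
case class StreamingRelation(start: Option[Long], end: Option[Long]) extends LogicalPlan

sealed trait PhysicalPlan
case class MicroBatchScan(start: Long, end: Long) extends PhysicalPlan
case class ContinuousScan(start: Long) extends PhysicalPlan

object ToyStrategy {
  // A defined end offset selects a bounded micro-batch scan;
  // a missing end offset selects an unbounded continuous scan.
  def apply(plan: LogicalPlan): Seq[PhysicalPlan] = plan match {
    case StreamingRelation(Some(s), Some(e)) => MicroBatchScan(s, e) :: Nil
    case StreamingRelation(Some(s), None)    => ContinuousScan(s) :: Nil
    case _                                   => Nil
  }
}
```

For example, ToyStrategy(StreamingRelation(Some(0L), Some(10L))) yields List(MicroBatchScan(0, 10)), while dropping the end offset yields a ContinuousScan instead.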

Typical mappings from logical plan to physical plan are:

StreamingDataSourceV2Relation -> ContinuousScanExec

StreamingDataSourceV2Relation -> MicroBatchScanExec

The former corresponds to the case where the offset has no endOffset, the latter to the case where endOffset is defined. The former is an unbounded continuous stream; the latter is a micro-batch stream executed at intervals.

The former can reach latencies as low as about 1 ms, while the latter is limited to roughly 100 ms.
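From the user's side, which physical plan is chosen follows from the trigger configured on the query. A sketch using Spark's public Trigger API, building on the word-count example above (it assumes a running Spark session and the earlier words and wordCounts values, so it is not a standalone program):

```scala
import org.apache.spark.sql.streaming.Trigger

// Micro-batch mode (planned as MicroBatchScanExec): a new batch fires
// each trigger interval, giving a latency floor around 100 ms.
val microBatchQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

// Continuous mode (planned as ContinuousScanExec): long-running tasks,
// ~1 ms latency. Continuous processing supports only map-like operations,
// so the flatMap result `words` is used here rather than the aggregation.
val continuousQuery = words.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))  // "1 second" is the checkpoint interval
  .start()
```

Note that for Trigger.Continuous the interval is not a batch interval but how often the engine checkpoints its progress; records flow through continuously.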

The relevant code in DataSourceV2Strategy:

case r: StreamingDataSourceV2Relation
    if r.startOffset.isDefined && r.endOffset.isDefined =>
  val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
  val scanExec = MicroBatchScanExec(
    r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
  val withProjection = if (scanExec.supportsColumnar) {
    scanExec
  } else {
    // Add a Project here to make sure we produce unsafe rows.
    ProjectExec(r.output, scanExec)
  }
  withProjection :: Nil

case r: StreamingDataSourceV2Relation
    if r.startOffset.isDefined && r.endOffset.isEmpty =>
  val continuousStream = r.stream.asInstanceOf[ContinuousStream]
  val scanExec = ContinuousScanExec(
    r.output, r.scan, continuousStream, r.startOffset.get)
  val withProjection = if (scanExec.supportsColumnar) {
    scanExec
  } else {
    // Add a Project here to make sure we produce unsafe rows.
    ProjectExec(r.output, scanExec)
  }
  withProjection :: Nil

Thank you for reading! This concludes "An Example Analysis of the Fault-Tolerance Mechanism of Spark Structured Streaming". I hope the content above has been of some help and lets you learn more. If you think the article is good, share it for more people to see!
