An Example Analysis of the Fault-Tolerance Mechanism of Spark Structured Streaming

This article walks through the fault-tolerance mechanism of Spark Structured Streaming, following the relevant source code. It is shared as a practical reference; read on for the details.
Fault-tolerant mechanism
End-to-end exactly-once guarantees are one of the key design goals of Structured Streaming.
To that end, Structured Streaming designs its sources and sinks to track the exact progress of processing, so that the engine can restart or rerun work to handle any failure.
A streaming source uses offsets (similar to Kafka offsets) to track the read position in the stream. The execution engine uses checkpoints and write-ahead logs to record the offset range processed in each trigger.
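From the user's point of view, this bookkeeping is enabled by giving the query a checkpoint location. A minimal sketch, assuming a local SparkSession; the rate source and the checkpoint path are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("checkpoint-demo")
  .getOrCreate()

val events = spark.readStream
  .format("rate")              // built-in test source emitting rows at a fixed rate
  .option("rowsPerSecond", 10)
  .load()

val query = events.writeStream
  .format("console")
  // The engine persists each trigger's offset range (its write-ahead log)
  // and commit log under this directory, so a restarted query resumes
  // from exactly where it left off.
  .option("checkpointLocation", "/tmp/checkpoint-demo")
  .start()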
Streaming sinks are designed to be idempotent, so that reprocessing the same data does not change the result.
In this way, relying on replayable sources and idempotent sinks, Structured Streaming guarantees end-to-end exactly-once semantics under any failure.
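To see what sink-side idempotence means in practice, here is a minimal sketch using foreachBatch, reusing the events stream from the sketch above; the lastCommitted variable and the target path stand in for a real transactional store and are only illustrative:

import org.apache.spark.sql.DataFrame

// Stand-in for the last batch id durably recorded by the target system.
@volatile var lastCommitted: Long = -1L

val idempotentQuery = events.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // batchId is stable across replays of the same micro-batch, so a sink
    // that remembers the highest committed id can skip duplicates, making
    // reprocessing after a failure a no-op.
    if (batchId > lastCommitted) {
      batch.write.mode("append").parquet("/tmp/idempotent-target")
      lastCommitted = batchId
    }
  }
  .option("checkpointLocation", "/tmp/idempotent-checkpoint")
  .start()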
Consider the classic word-count example from the programming guide:

import spark.implicits._   // needed for .as[String]

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()
Here spark is a SparkSession, lines is a DataFrame, and DataFrame is just Dataset[Row].
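For completeness, a sketch of how the programming guide then starts this query; the console sink simply prints each result:

val query = wordCounts.writeStream
  .outputMode("complete")   // re-emit the full updated counts each trigger
  .format("console")
  .start()

query.awaitTermination()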
Dataset
Take a look at how a Dataset action triggers execution, for example the foreach operation:
def foreach(f: T => Unit): Unit = withNewRDDExecutionId {
  rdd.foreach(f)
}

private def withNewRDDExecutionId[U](body: => U): U = {
  SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {
    rddQueryExecution.executedPlan.foreach { plan =>
      plan.resetMetrics()
    }
    body
  }
}
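Any batch action takes this path; a minimal sketch to exercise it:

// foreach is an action: it forces planning and execution of the Dataset,
// passing through withNewRDDExecutionId above.
spark.range(5).foreach(v => println(v))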
Following the call into SQLExecution.withNewExecutionId:
def withNewExecutionId[T](
    sparkSession: SparkSession,
    queryExecution: QueryExecution,
    name: Option[String] = None)(body: => T): T = {
  val sc = sparkSession.sparkContext
  val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
  val executionId = SQLExecution.nextExecutionId
  sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
  executionIdToQueryExecution.put(executionId, queryExecution)
  try {
    withSQLConfPropagated(sparkSession) {
      try {
        body
      } catch {
        // ...
      } finally {
        // ...
      }
    }
  } finally {
    executionIdToQueryExecution.remove(executionId)
    sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
  }
}
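Stripped of the Spark specifics, the surrounding bookkeeping is the usual save/set/restore pattern for a scoped property; an illustrative sketch, not Spark code:

// Install `value` under `key` while `body` runs, then restore the previous
// value even if `body` throws, just as withNewExecutionId does for
// EXECUTION_ID_KEY.
def withProperty[T](key: String, value: String)(body: => T): T = {
  val old = System.getProperty(key)
  System.setProperty(key, value)
  try body
  finally {
    if (old == null) System.clearProperty(key) else System.setProperty(key, old)
  }
}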
What actually gets executed is the queryExecution: QueryExecution, which for foreach is rddQueryExecution:
@transient private lazy val rddQueryExecution: QueryExecution = {
  val deserialized = CatalystSerde.deserialize[T](logicalPlan)
  sparkSession.sessionState.executePlan(deserialized)
}
So it is sessionState.executePlan that turns the logicalPlan into a QueryExecution; in fact it simply creates the QueryExecution object. Calling the executedPlan method of QueryExecution then yields the physical plan, a SparkPlan. How is that generated?
lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
  SparkSession.setActiveSession(sparkSession)
  planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
}
It is generated by the planner.plan method.
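These stages can be inspected from user code; a sketch on a small batch Dataset (a streaming Dataset cannot be executed directly, so batch is simpler for illustration):

val ds = spark.range(10).filter(_ % 2 == 0)
println(ds.queryExecution.optimizedPlan) // logical plan after the optimizer
println(ds.queryExecution.sparkPlan)     // physical plan chosen by planner.plan
println(ds.queryExecution.executedPlan)  // sparkPlan after preparation rules
ds.explain(true)                         // prints all stages at once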
The planner is a SparkPlanner, defined in the BaseSessionStateBuilder class:
protected def planner: SparkPlanner = {
  new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
    override def extraPlanningStrategies: Seq[Strategy] =
      super.extraPlanningStrategies ++ customPlanningStrategies
  }
}
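Note the experimentalMethods constructor argument: SparkPlanner consults spark.experimental.extraStrategies ahead of its built-in strategies, which gives user code a hook for custom planning. A minimal sketch; NoopStrategy is a made-up strategy that matches nothing, so planning simply falls through to the built-ins:

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object NoopStrategy extends Strategy {
  // Returning Nil means "no match"; the planner moves on to the next strategy.
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

spark.experimental.extraStrategies = Seq(NoopStrategy)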
SparkPlanner class
SparkPlanner applies a set of strategies to the LogicalPlan and returns the corresponding SparkPlan. For streaming applications, for example, there is the DataSourceV2Strategy strategy.
The typical mappings from logical plan to physical plan are as follows:
StreamingDataSourceV2Relation -> ContinuousScanExec
StreamingDataSourceV2Relation -> MicroBatchScanExec
The former corresponds to the case where the offset has no endOffset: a continuous stream with no defined end. The latter corresponds to the case with an endOffset: a micro-batch stream processed at intervals.
The former can achieve latencies as low as about 1 ms, while the latter can only get down to about 100 ms.
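Which of the two plans you get is decided by the query's trigger. A minimal sketch using the built-in rate source (one of the few sources that supports continuous mode); the checkpoint paths are placeholders:

import org.apache.spark.sql.streaming.Trigger

val rate = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

// Micro-batch mode: a new batch every second, planned as MicroBatchScanExec.
val microBatch = rate.writeStream.format("console")
  .trigger(Trigger.ProcessingTime("1 second"))
  .option("checkpointLocation", "/tmp/micro-batch-ckpt")
  .start()

// Continuous mode (experimental): long-running tasks, planned as
// ContinuousScanExec; the argument is the checkpoint interval, not a batch interval.
val continuous = rate.writeStream.format("console")
  .trigger(Trigger.Continuous("1 second"))
  .option("checkpointLocation", "/tmp/continuous-ckpt")
  .start()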
The corresponding code in the strategy:
case r: StreamingDataSourceV2Relation
    if r.startOffset.isDefined && r.endOffset.isDefined =>
  val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
  val scanExec = MicroBatchScanExec(
    r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)

  val withProjection = if (scanExec.supportsColumnar) {
    scanExec
  } else {
    // Add a Project here to make sure we produce unsafe rows.
    ProjectExec(r.output, scanExec)
  }

  withProjection :: Nil

case r: StreamingDataSourceV2Relation
    if r.startOffset.isDefined && r.endOffset.isEmpty =>
  val continuousStream = r.stream.asInstanceOf[ContinuousStream]
  val scanExec = ContinuousScanExec(
    r.output, r.scan, continuousStream, r.startOffset.get)

  val withProjection = if (scanExec.supportsColumnar) {
    scanExec
  } else {
    // Add a Project here to make sure we produce unsafe rows.
    ProjectExec(r.output, scanExec)
  }

  withProjection :: Nil

Thank you for reading! That concludes this example analysis of the fault-tolerance mechanism of Spark Structured Streaming. I hope it helps you learn more about how exactly-once processing is achieved; if you found it useful, feel free to share it.