In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-23 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article introduces Spark 3.0 AQE and CBO example analysis, the content is very detailed, interested friends can refer to, hope to be helpful to you.
Spark3.0 has been released for half a year, and the upgrade of this large version is mainly focused on performance optimization and rich documentation. 46% of the optimizations are focused on Spark SQL, and Adaptive Query Execution is the most eye-catching one in SQL optimization.
Adaptive Query Execution (AQE) is an adaptive execution engine improved and implemented by Intel big data technical team and Baidu big data Infrastructure Department engineers on the basis of Spark community version. In recent years, Spark SQL has been optimizing for CBO features, and has been very successful.
Basic principles of CBO
First of all, let's introduce another optimizer based on rule optimization (Rule-Based Optimization, referred to as RBO), which is an empirical and heuristic optimization idea. The optimization rules have been defined in advance, and we only need to apply SQL to these rules. To put it simply, RBO is like an experienced driver who knows all the basic routines.
However, there is a kind of thing in the world called-not to follow the routine. It is not so much that it does not follow the routine, but rather that it has no routine of its own. The most typical is complex Join operator optimization. For these Join, there are usually two multiple choice questions to do:
Which algorithm strategy should Join choose to execute? BroadcastJoin or ShuffleHashJoin or SortMergeJoin? Different execution strategies require different resources of the system, and the execution efficiency is also very different. With the same SQL, it may only take a few seconds to choose the appropriate policy execution, but if you do not choose the appropriate execution strategy, it may lead to the system OOM.
For snowflake model or star model, what sequence should multi-table Join choose to execute? Different Join order means different execution efficiency, for example, A join B join C _ Magi An and B tables are very large, C table is very small, then A join B obviously needs a lot of system resources to operate, and the execution time must not be short. If you use the execution order of A join C join B, because the C table is very small, A join C will get the results quickly, and the result set is very small, and then using the small result set join B, the performance will obviously be better than the previous scheme.
Think about it, are there any fixed optimization rules? No. To put it bluntly, you need to know more basic information about the table (table size, total number of table records, etc.), and then pass a certain rule cost assessment in order to choose an optimal execution plan. Therefore, CBO means to optimize the strategy based on cost, which needs to calculate the cost of all possible execution plans and pick out the least expensive execution plan.
AQE adjusts and optimizes the overall implementation process of Spark SQL. Its biggest highlight is that it can constantly feedback and re-optimize the rest of the implementation plan according to the real and accurate implementation statistical results of the completed plan nodes.
CBO is so difficult to implement, how can Spark solve it?
CBO accounting calculates some statistics related to business data to optimize the query, such as the number of rows, the number of rows removed, null, maximum and minimum, and so on. Spark will automatically select BHJ or SMJ based on these data, and optimize the execution plan for Cost-based Join Reorder in multi-Join scenarios.
However, because these statistics need to be pre-processed and will be out of date, we are judging with outdated data, which in some cases will turn into a negative effect and reduce the efficiency of SQL implementation.
Spark3.0 's AQE framework uses three ways to solve this problem:
Dynamic merging of shuffle partitions (Dynamically coalescing shuffle partitions)
Dynamically adjust Join policy (Dynamically switching join strategies)
Dynamically optimize data skew Join (Dynamically optimizing skew joins)
Let's take a closer look at these three features.
Dynamically merge partitions of shuffle
When the magnitude of data we are dealing with is very large, shuffle is generally the most performance-impacting. Because shuffle is a very time-consuming operator, it needs to move data through the network and distribute it to downstream operators. In shuffle, the number of partition is critical. The optimal number of partition depends on the data, and the data size varies greatly from query to stage, so it is difficult to determine a specific number:
If there is too little partition, there will be too much data per partition, which may cause a large amount of data to fall to disk, thus slowing down the query.
If there is too much partition, the amount of data per partition will be small, which will incur a lot of additional network overhead and affect the Spark task scheduler, thus slowing down the query.
To solve this problem, we set a relatively large number of shuffle partition at the beginning, merging adjacent small partitions through the data of the shuffle file during execution. For example, suppose we execute SELECT max (I) FROM tbl GROUP BY j, the table tbl has only two partition and the amount of data is very small. We set the initial shuffle partition to 5, so there will be five partitions after grouping. Without AQE optimization, five tasks will be generated to aggregate the result. in fact, the amount of data of three partitions is very small.
In this case, however, AQE will only generate three reduce task.
Dynamically switch join policy
Spark supports many Join strategies, of which broadcast hash join is usually the best performance, as long as the data from a table participating in join can be loaded into memory. For this reason, when Spark estimates that the amount of table data participating in join is less than the threshold for broadcast size, it adjusts the Join policy to broadcast hash join. However, many cases can lead to this size estimation error-for example, there is a very selective filter.
Because AQE has accurate upstream statistics, it can solve this problem. For example, in the following example, the actual size of the table on the right is 15m, but in this scenario, after filter filtering, the size of the data actually participating in join is 8m, which is less than the default broadcast threshold of 10m and should be broadcast.
While we convert to BHJ during our execution, we can even optimize the traditional shuffle to local shuffle (for example, shuffle reads in mapper rather than based on reducer) to reduce network overhead.
Dynamically optimize data tilt
If there is a data skew problem with a key in Join, it is basically the performance killer of this task. Before AQE, users could not automatically deal with this thorny problem encountered in Join, so they needed to use external manual collection of data statistics, additional salt, batch processing and other relatively tedious methods to deal with the data skew problem.
Data skew is essentially caused by the uneven distribution of data among partitions on the cluster, which slows down the entire query in the join scenario. AQE automatically detects skew data according to shuffle file statistics, breaks those skewed partitions into small sub-partitions, and then join them separately.
We can look at this scenario, Table A join Table B, where the partition A0 data of Table An is much larger than that of other partitions.
AQE splits partition A0 into 2 subpartitions and leaves them alone to join with partition B0 of Table B.
Without this optimization, SMJ will produce four tasks and one of them takes much longer to execute than the others. Optimized, this join will have five tasks, but each task takes about the same time to execute, so the entire query brings better performance.
How to turn on AQE
We can set the parameter spark.sql.adaptive.enabled to true to enable AQE. In Spark 3.0, the default is false, and the following conditions are met:
Non-streaming query
Contains at least one exchange (such as join, aggregation, window operator) or one subquery
By reducing the dependence on static statistical data, AQE successfully solves a difficult trade off of Spark CBO (the cost of generating statistical data and query time) and the problem of data accuracy. Compared with the previous limited CBO, it is now very flexible.
Spark CBO source code implementation
The Adaptive Execution pattern is generated using Spark physical execution plan injection. There is a set of preparations optimizers in the QueryExecution class to optimize the physical execution plan, and InsertAdaptiveSparkPlan is the first optimizer.
After InsertAdaptiveSparkPlan uses PlanAdaptiveSubqueries Rule to process part of the SubQuery, it wraps the current Plan as AdaptiveSparkPlanExec.
When the collect () or take () methods of AdaptiveSparkPlanExec are executed, the getFinalPhysicalPlan () method is executed first to generate a new SparkPlan, and then the corresponding SparkPlan corresponding methods are executed.
/ / QueryExecution class lazy val executedPlan: SparkPlan = {executePhase (QueryPlanningTracker.PLANNING) {QueryExecution.prepareForExecution (preparations, sparkPlan.clone ())} protected def preparations: Seq [sparkSession [this]] = {QueryExecution.preparations (sparkSession, Option (AdaptiveExecutionContext (sparkSession, this)} private [execution] def preparations (sparkSession: SparkSession, adaptiveExecutionRule: Option [InsertAdaptiveSparkPlan] = None): Seq [InsertAdaptiveSparkPlan Plan] = {/ / `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op / / as the original plan is hidden behind `AdaptiveSparkPlanExec`. AdaptiveExecutionRule.toSeq + + Seq (PlanDynamicPruningFilters (sparkSession), PlanSubqueries (sparkSession), EnsureRequirements (sparkSession.sessionState.conf), ApplyColumnarRulesAndInsertTransitions (sparkSession.sessionState.conf, sparkSession.sessionState.columnarRules), CollapseCodegenStages (sparkSession.sessionState.conf), ReuseExchange (sparkSession.sessionState.conf), ReuseSubquery (sparkSession.sessionState.conf)} / / InsertAdaptiveSparkPlan override def apply (plan: SparkPlan): SparkPlan = applyInternal (plan, false) private def applyInternal (plan: SparkPlan) IsSubquery: Boolean): SparkPlan = plan match {/ /... some checking case _ if shouldApplyAQE (plan, isSubquery) = > if (supportAdaptive (plan)) {try {/ / Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. / / Fall back to non-AQE mode if AQE is not supported in any of the sub-queries. Val subqueryMap = buildSubqueryMap (plan) val planSubqueriesRule = PlanAdaptiveSubqueries (subqueryMap) val preprocessingRules = Seq (planSubqueriesRule) / / Run pre-processing rules. Val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules (plan, preprocessingRules) logDebug (s "Adaptive execution enabled for plan: $plan") AdaptiveSparkPlanExec (newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)} catch {case SubqueryAdaptiveNotSupportedException (subquery) = > logWarning (s "${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled" + s "but is not supported for sub-query: $subquery.") Plan}} else {logWarning (s "${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled" + s "but is not supported for query: $plan.") Plan} case _ = > plan}
The process of AQE's phased submission and optimization of Stage is as follows:
Private def getFinalPhysicalPlan (): SparkPlan = lock.synchronized {/ / false when the getFinalPhysicalPlan method is called for the first time. Wait for the method to be executed, and all Stage will not be changed. Return the final plan if (isFinalPlan) return currentPhysicalPlan / / In case of this adaptive plan being executed out of `withActive` scoped functions, e.g., / `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be / / created in the middle of the execution. Context.session.withActive {val executionId = getExecutionId var currentLogicalPlan = currentPhysicalPlan.logicalLink.get var result = createQueryStages (currentPhysicalPlan) val events = new LinkedBlockingQueue [StageMaterializationEvent] () val errors = new mutable.ArrayBuffer [Throwable] () var stagesToReplace = Seq.empty [QueryStageExec] while (! result.allChildStagesMaterialized) {currentPhysicalPlan = result.newPlan / / which Stage to execute next Refer to the createQueryStages (plan: SparkPlan) method if (result.newStages.nonEmpty) {stagesToReplace = result.newStages + + stagesToReplace / / onUpdatePlan to update UI executionId.foreach through listener (onUpdatePlan (_) Result.newStages.map (_ .plan)) / / Start materialization of all new stages and fail fast if any stages failed eagerly result.newStages.foreach {stage = > try {/ / materialize () method executes Stage as a separate Job submission And return SimpleFutureAction to receive the execution result / / QueryStageExec: materialize ()-> doMaterialize ()-> / / ShuffleExchangeExec:-> mapOutputStatisticsFuture-> ShuffleExchangeExec / / SparkContext:-> submitMapStage (shuffleDependency) stage.materialize (). OnComplete {res = > if (res.isSuccess) {events.offer (StageSuccess (stage) Res.get)} else {events.offer (StageFailure (stage, res.failed.get))}} (AdaptiveSparkPlanExec.executionContext)} catch {case e: Throwable = > cleanUpAndThrowException (Seq (e)) Some (stage.id)} / / Wait on the next completed stage, which indicates new stats are available and probably / / new stages can be created. There might be other stages that finish at around the same / / time, so we process those stages too in order to reduce re-planning. / / wait Until Stage finishes execution val nextMsg = events.take () val rem = new util.ArrayList [StageMaterializationEvent] () events.drainTo (rem) (Seq (nextMsg) + + rem.asScala). Foreach {case StageSuccess (stage, res) = > stage.resultOption = Some (res) case StageFailure (stage, ex) = > errors.append (ex)} / / In case of errors We cancel all running stages and throw exception. If (errors.nonEmpty) {cleanUpAndThrowException (errors, None)} / / Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less / / than that of the current plan; otherwise keep the current physical plan together with / / the current logical plan since the physical plan's logical links point to the logical / / plan it has originated from. / / Meanwhile, we keep a list of the query stages that have been created since last plan / / update, which stands for the "semantic gap" between the current logical and physical / / plans. And each time before re-planning, we replace the corresponding nodes in the / / current logical plan with logical query stages to make it semantically in sync with / / the current physical plan. Once a new plan is adopted and both logical and physical / / plans are updated, we can clear the query stage list because at this point the two plans / / are semantically and physically in sync again. / / replace the previous Stage with LogicalQueryStage nodes val logicalPlan = replaceWithQueryStagesInLogicalPlan (currentLogicalPlan, stagesToReplace) / / call optimizer and planner again to optimize val (newPhysicalPlan, newLogicalPlan) = reOptimize (logicalPlan) val origCost = costEvaluator.evaluateCost (currentPhysicalPlan) val newCost = costEvaluator.evaluateCost (newPhysicalPlan) if (newCost
< origCost || (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) { logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan") cleanUpTempTags(newPhysicalPlan) currentPhysicalPlan = newPhysicalPlan currentLogicalPlan = newLogicalPlan stagesToReplace = Seq.empty[QueryStageExec] } // Now that some stages have finished, we can try creating new stages. // 进入下一轮循环,如果存在Stage执行完毕, 对应的resultOption 会有值,对应的allChildStagesMaterialized 属性 = true result = createQueryStages(currentPhysicalPlan) } // Run the final plan when there's no more unfinished stages. // 所有前置stage全部执行完毕,根据stats信息优化物理执行计划,确定最终的 physical plan currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules) isFinalPlan = true executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan))) currentPhysicalPlan } }// SparkContext /** * Submit a map stage for execution. This is currently an internal API only, but might be * promoted to DeveloperApi in the future. */ private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C]) : SimpleFutureAction[MapOutputStatistics] = { assertNotStopped() val callSite = getCallSite() var result: MapOutputStatistics = null val waiter = dagScheduler.submitMapStage( dependency, (r: MapOutputStatistics) =>{result = r}, callSite, localProperties.get) new SimpleFutureAction [MapOutputStatistics] (waiter, result)} / / DAGScheduler def submitMapStage [K, V, C] (dependency: ShuffleDependency [K, V, C], callback: MapOutputStatistics = > Unit, callSite: CallSite Properties: Properties): JobWaiter [MapOutputStatistics] = {val rdd = dependency.rdd val jobId = nextJobId.getAndIncrement () if (rdd.partitions.length = = 0) {throw new SparkException ("Can't run submitMapStage on RDD with 0 partitions")} / / We create a JobWaiter with only one "task", which will be marked as complete when the whole / / map stage has completed, and will be passed the MapOutputStatistics for that stage. / / This makes it easier to avoid race conditions between the user code and the map output / / tracker that might result if we told the user the stage had finished, but then they queries / / the map output tracker and some node failures had caused the output statistics to be lost. Val waiter = new JobWaiter [MapOutputStatistics] (this, jobId, 1, (_: Int, r: MapOutputStatistics) = > callback (r) eventProcessLoop.post (MapStageSubmitted (jobId, dependency, callSite, waiter, Utils.cloneProperties (properties)) waiter}
Currently, the list of optimizers performed on physics in AdaptiveSparkPlanExec is as follows:
/ / AdaptiveSparkPlanExec @ transient privateval queryStageOptimizerRules: Seq [run [context.subqueryCache]] = Seq (ReuseAdaptiveSubquery (conf, context.subqueryCache), CoalesceShufflePartitions (context.session), / / The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs' / / added by `CoalesceShufflePartitions`. So they must be executed after it. OptimizeSkewedJoin (conf), OptimizeLocalShuffleReader (conf), ApplyColumnarRulesAndInsertTransitions (conf, context.session.sessionState.columnarRules), CollapseCodegenStages (conf))
The OptimizeSkewedJoin method is optimized for the Join that is most prone to data skew:
In AQE mode, before the execution of each Stage, the pre-dependent Stage has been fully executed, so you can get the stats information of each Stage. When it is found that the output of shuffle partition is more than 5 times of the median of partition size, and the output of partition is more than 256m, it will be judged that the data will tilt, and the partition data will be divided into N parts according to targetSize. TargetSize = max (64m, average size of non-data skew partition).
The shuffle before optimization is as follows:
Optimized shuffle:
Application and practice of Spark3.0AQE in FreeWheel
Through efficient agile development, the FreeWheel team successfully launched in the production environment before the 2020 Christmas advertising season, with an overall performance improvement of up to 40% (for large batch), with an average AWS Cost savings of between 25% and 30%, saving the company at least a million dollars a year.
Major upgrade changes
Open the new features of Spark 3.0 AQE. The main configurations are as follows:
"spark.sql.adaptive.enabled": true, "spark.sql.adaptive.coalescePartitions.enabled": true, "spark.sql.adaptive.coalescePartitions.minPartitionNum": 1, "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB"
It is important to note that the AQE feature only does not specify the number of reducer during the reducer phase, but it does not mean that you no longer need to specify the degree of parallelism of the task. Because the map phase still needs to divide the data into appropriate partitions for processing, the default 200 will be used if the parallelism is not specified, and OOM can easily occur when the amount of data is too large. It is recommended that you configure the parameters spark.sql.shuffle.partitions and spark.default.parallelism according to the parallelism setting before the task.
Let's take a closer look at why upgrading to 3.0 can reduce uptime and save the cost of the cluster. Take the operation of a table in Optimus data modeling as an example:
In the reduce phase, the number of tasks without AQE plummeted from 40320 to 4580 tasks, an order of magnitude reduction.
The bottom half of the figure shows the task of Spark 2.x without AQE, and the top half of Spark 3.x with the AQE feature turned on.
From a more detailed run-time diagram, the operation time of the same aggregate after shuffler reader is also from 4.44h to 2.56h, saving nearly half.
On the left is the details of the operation metrics of spark 2.x, and on the right is the operation metrics after opening AQE and passing custom shuffler reader.
Performance improvement
AQE performance
AQE adjusts and optimizes the overall Spark SQL execution process (as shown in the following figure). Its biggest highlight is that it can constantly feedback and re-optimize the rest of the execution plan according to the real and accurate implementation statistical results of the completed plan nodes.
AQE automatically adjusts the number of reducer to reduce the number of partition. The parallelism of Spark tasks has always been a problem for users. If the parallelism is too high, it will lead to too much task, relatively large overhead, and slow down the running of the task as a whole. If the parallelism is too small, the data partition will be relatively large, it is easy to have the problem of OOM, and the resources can not be used reasonably, and the advantage of parallel running tasks can not be brought into full play.
And because of the parallelism of the entire Spark Context task, it needs to be set at the beginning and cannot be dynamically modified, so it is easy to have a large amount of data at the beginning of the task that requires a large degree of parallelism, and the final data set that may be filtered through transformation in the process of running has become very small, and the number of partitions initially set appears to be too large. AQE can solve this problem very well. When reducer reads data, it automatically adjusts and Coalesce small partition according to the size of partition data (spark.sql.adaptive.advisoryPartitionSizeInBytes) set by users, adaptively reducing the number of partition, so as to reduce the waste of resources and overhead, and improve the performance of tasks.
As can be seen from the above single table, opening AQE greatly reduces the number of task, which not only reduces the burden of Driver, but also reduces the overhead brought by starting task, such as schedule,memory, startup management, etc., reduces the occupation of cpu, and improves the performance of cpu.
Take historical Data Pipelines as an example, at the same time, there will be more than 30 tables running in Spark in parallel, and each table has a great performance improvement, so that other tables can obtain resources earlier and benefit each other, then the whole data modeling process will naturally have an accelerated result.
Large batch (> 200G) has a larger improvement than small batch (< 100G), up to 40%. This is mainly because the large batch itself has a large amount of data, requires a large number of machines, and sets a greater degree of concurrency, so the AQE shows more and more obvious moments. While the concurrency of the small batch is relatively low, then the promotion will be relatively less, but there is also an acceleration of about 27.5%.
Memory optimization
In addition to reducing the consumption of broken task to memory due to the opening of AQE, Spark 3.0 has also made a lot of memory optimizations in other places, such as slimming down some Aggregate metrics, Netty's shared memory Pool function, Task Manager deadlock problem, avoiding reading shuffle block from the network in some scenarios, and so on, to reduce memory pressure. A series of memory optimizations combined with AQE features can be seen from the previous memory practice chart that the memory usage of the cluster has decreased by about 30% at the same time.
Practical achievements
The main practical results of the upgrade are as follows:
Performance improvement is obvious.
Historical data Pipeline improves the performance of large batch data (200~400G/ hourly) by up to 40%. For small batch (less than 100G/ hourly), the improvement effect is not as obvious as that of big batch. The average daily batches improvement level is about 27.5%.
Forecast data performance improves by an average of 30%. Due to the different data input sources, there are currently two pipelines running history and prediction data, and the number of tables generated is not quite the same, so they are evaluated separately.
Taking the end-to-end running time of historical data as an example (as shown in the following figure), it is visible to the naked eye that the running time of the overall pipeline has decreased significantly after it has been launched, and the data can be output faster for downstream use.
Reduced cluster memory usage
Cluster memory usage is reduced by about 30% for large batch, with an average daily saving of about 25%.
Taking a screenshot of the memory of the runtime cluster on ganglia after the launch of historical data as an example (as shown in the following figure), the memory usage of the overall cluster is reduced from 41.2T to 30.1T, which means we can run the same Spark task with fewer machines and less money.
AWS Cost decrease
Pipelines makes an automatic Scale In/Scale Out strategy: expand the Task nodes of the cluster when resources are needed, automatically shrink the Task nodes of the cluster at the end of the task, and learn the best number of machines through algorithm according to the size of each batch data. After upgrading to Spark 3.0, because tasks now run faster and require fewer machines, AWS Cost saves about 30% per day after launch, which can save the company millions of costs a year.
This is the end of the sample analysis of Spark 3.0 AQE and CBO. I hope the above content can be helpful to you and learn more. If you think the article is good, you can share it for more people to see.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.