Source: Shulou (Shulou.com), updated 2025-03-29
This article walks through the AQE (Adaptive Query Execution) configuration items in Spark 3.0.1 and analyzes each one against the relevant source code.
Introduction to AQE
Judging from the Spark configuration, AQE has existed since as early as Spark 1.6. In Spark 2.x, Intel's big data team carried out prototype development and practice on it. With Spark 3.0, Databricks and Intel jointly contributed the new AQE to the community.
AQE configuration in Spark 3.0.1

| Configuration item | Default | Official description | Analysis |
|---|---|---|---|
| spark.sql.adaptive.enabled | false | Whether to enable adaptive query execution | Set to true here |
| spark.sql.adaptive.coalescePartitions.enabled | true | Whether to merge adjacent shuffle partitions (according to the threshold 'spark.sql.adaptive.advisoryPartitionSizeInBytes') | Enabled by default; see Analysis 1 |
| spark.sql.adaptive.coalescePartitions.initialPartitionNum | (none) | Initial number of shuffle partitions before merging; defaults to the value of spark.sql.shuffle.partitions | See Analysis 2 |
| spark.sql.adaptive.coalescePartitions.minPartitionNum | (none) | Minimum number of shuffle partitions after merging; defaults to the default parallelism of the Spark cluster | See Analysis 3 |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Advisory size of a shuffle partition, used when merging partitions and when handling join data skew | See Analysis 3 |
| spark.sql.adaptive.skewJoin.enabled | true | Whether to enable adaptive handling of data skew in joins | |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | Data skew judgment factor; a partition must exceed both skewedPartitionFactor and skewedPartitionThresholdInBytes | See Analysis 4 |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Data skew judgment threshold; a partition must exceed both skewedPartitionFactor and skewedPartitionThresholdInBytes | See Analysis 4 |
| spark.sql.adaptive.logLevel | debug | Log level for adaptive plan changes | Set to info to make adaptive plan changes easy to observe |
| spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin | 0.2 | Non-empty partition ratio threshold for broadcast join; if the ratio of non-empty partitions is lower than this value, the join will not be converted to a broadcast join | See Analysis 5 |

Analysis 1
OptimizeSkewedJoin.scala references ADVISORY_PARTITION_SIZE_IN_BYTES, which is the key for spark.sql.adaptive.advisoryPartitionSizeInBytes (OptimizeSkewedJoin is a rule applied to the physical plan):
    /**
     * The goal of skew join optimization is to make the data distribution more even. The target size
     * to split skewed partitions is the average size of non-skewed partition, or the
     * advisory partition size if avg size is smaller than it.
     */
    private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
      val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
      val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
      // It's impossible that all the partitions are skewed, as we use median size to define skew.
      assert(nonSkewSizes.nonEmpty)
      math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
    }
Where:
nonSkewSizes holds the sizes of the task's non-skewed partitions
targetSize returns max(average size of the non-skewed partitions, advisorySize), where advisorySize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes, so targetSize is not necessarily equal to spark.sql.adaptive.advisoryPartitionSizeInBytes
medianSize is the median of the task's partition sizes
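To make the max(...) rule concrete, here is a minimal standalone sketch. It is not the Spark implementation: the object name and the simplified signature (the caller passes the already filtered non-skewed sizes and the advisory size directly) are assumptions made for illustration.

```scala
object TargetSizeSketch {
  val MB: Long = 1024L * 1024L

  // Hypothetical stand-in for OptimizeSkewedJoin.targetSize: the caller passes
  // the sizes of the partitions that were already judged non-skewed.
  def targetSize(nonSkewSizes: Seq[Long], advisorySize: Long): Long = {
    require(nonSkewSizes.nonEmpty, "at least one partition must be non-skewed")
    math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
  }

  def main(args: Array[String]): Unit = {
    val advisory = 64 * MB
    // The average of the non-skewed partitions (100 MB) exceeds the advisory size.
    println(targetSize(Seq(90 * MB, 100 * MB, 110 * MB), advisory) / MB) // prints 100
    // The average (20 MB) is below the advisory size, so the advisory size is used.
    println(targetSize(Seq(10 * MB, 20 * MB, 30 * MB), advisory) / MB) // prints 64
  }
}
```

In the first call the split target is larger than the advisory size, which is why the advisory value should be read as a lower bound for skew splitting rather than as the exact target.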
Analysis 2
In SQLConf.scala:

    def numShufflePartitions: Int = {
      if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
        getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
      } else {
        defaultNumShufflePartitions
      }
    }
Starting from Spark 3.0.1, if both AQE and shuffle partition merging are enabled, spark.sql.adaptive.coalescePartitions.initialPartitionNum is used as the number of shuffle partitions; otherwise spark.sql.shuffle.partitions is used. If there are multiple shuffle stages, increasing this initial number of partitions can make shuffle partition merging more effective.
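The fallback logic can be modelled without a Spark dependency. The sketch below is an illustration only: the Settings case class and its field names are hypothetical stand-ins for the four configuration values that SQLConf reads.

```scala
object InitialPartitionNumSketch {
  // Hypothetical, simplified view of the settings read by SQLConf.numShufflePartitions.
  final case class Settings(
      adaptiveEnabled: Boolean,         // spark.sql.adaptive.enabled
      coalesceEnabled: Boolean,         // spark.sql.adaptive.coalescePartitions.enabled
      initialPartitionNum: Option[Int], // spark.sql.adaptive.coalescePartitions.initialPartitionNum
      shufflePartitions: Int)           // spark.sql.shuffle.partitions

  def numShufflePartitions(s: Settings): Int =
    if (s.adaptiveEnabled && s.coalesceEnabled) {
      s.initialPartitionNum.getOrElse(s.shufflePartitions)
    } else {
      s.shufflePartitions
    }

  def main(args: Array[String]): Unit = {
    // AQE and coalescing on, initial number set: the initial number is used.
    println(numShufflePartitions(Settings(true, true, Some(1000), 200))) // prints 1000
    // AQE and coalescing on, initial number unset: falls back to shuffle.partitions.
    println(numShufflePartitions(Settings(true, true, None, 200))) // prints 200
    // AQE off: the initial number is ignored.
    println(numShufflePartitions(Settings(false, true, Some(1000), 200))) // prints 200
  }
}
```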
Analysis 3
In CoalesceShufflePartitions.scala, CoalesceShufflePartitions is a physical plan rule that performs the following actions:
    if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
      plan
    } else {
      // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
      // we should skip it when calculating the `partitionStartIndices`.
      val validMetrics = shuffleStages.flatMap(_.mapStats)

      // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
      // in that case. For example when we union fully aggregated data (data is arranged to a single
      // partition) and a result of a SortMergeJoin (multiple partitions).
      val distinctNumPreShufflePartitions =
        validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
      if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
        // We fall back to Spark default parallelism if the minimum number of coalesced partitions
        // is not set, so to avoid perf regressions compared to no coalescing.
        val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
          .getOrElse(session.sparkContext.defaultParallelism)
        val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
          validMetrics.toArray,
          advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
          minNumPartitions = minPartitionNum)
        // This transformation adds new nodes, so we must use `transformUp` here.
        val stageIds = shuffleStages.map(_.id).toSet
        plan.transformUp {
          // even for shuffle exchange whose input RDD has 0 partition, we should still update its
          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
          // number of output partitions.
          case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
            CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
        }
      } else {
        plan
      }
    }
In other words:
If the partitioning was specified by the user, for example through a repartition operation, spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect and partition merge optimization is skipped
If several shuffle stages are involved and they have different numbers of partitions, spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect and partition merge optimization is skipped
For the merging itself, see the ShufflePartitionsUtil.coalescePartitions analysis
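The two skip conditions can be summarized in a small standalone check. This is a sketch under stated assumptions: StageInfo and canCoalesce are hypothetical names, and partitionSizes = None plays the role of a stage whose mapStats is undefined.

```scala
object CoalesceGuardSketch {
  // Hypothetical summary of one shuffle stage: whether its partition count may be
  // changed, and its per-partition sizes if map statistics are available.
  final case class StageInfo(
      canChangeNumPartitions: Boolean,
      partitionSizes: Option[Array[Long]])

  // True only when every stage allows changing the partition count and all stages
  // with statistics agree on the number of partitions.
  def canCoalesce(stages: Seq[StageInfo]): Boolean = {
    if (!stages.forall(_.canChangeNumPartitions)) {
      false
    } else {
      val valid = stages.flatMap(_.partitionSizes)
      val distinctCounts = valid.map(_.length).distinct
      valid.nonEmpty && distinctCounts.length == 1
    }
  }

  def main(args: Array[String]): Unit = {
    val a = StageInfo(canChangeNumPartitions = true, Some(Array(10L, 20L, 30L)))
    val b = StageInfo(canChangeNumPartitions = true, Some(Array(5L, 5L, 5L)))
    val userRepartition = StageInfo(canChangeNumPartitions = false, Some(Array(1L, 2L, 3L)))
    val singlePartition = StageInfo(canChangeNumPartitions = true, Some(Array(60L)))

    println(canCoalesce(Seq(a, b)))               // prints true
    println(canCoalesce(Seq(a, userRepartition))) // prints false
    println(canCoalesce(Seq(a, singlePartition))) // prints false
  }
}
```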
Analysis 4
In OptimizeSkewedJoin.scala, we see:

    /**
     * A partition is considered as a skewed partition if its size is larger than the median
     * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
     * ADVISORY_PARTITION_SIZE_IN_BYTES.
     */
    private def isSkewed(size: Long, medianSize: Long): Boolean = {
      size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
        size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
    }
OptimizeSkewedJoin is a physical plan rule that uses isSkewed to decide whether a partition is skewed. A partition counts as skewed only if it satisfies both conditions: its size exceeds medianSize * SKEW_JOIN_SKEWED_PARTITION_FACTOR and it also exceeds SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.
medianSize is the median of the task's partition sizes
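A small worked example shows why both conditions matter. The sketch below is illustrative only: the median helper is a plain textbook median written for this example (not necessarily how Spark computes medianSize), and the default factor 5 and threshold 256 MB are taken from the configuration table.

```scala
object SkewCheckSketch {
  val MB: Long = 1024L * 1024L

  // Simple median used only for this illustration; assumes a non-empty input.
  def median(sizes: Seq[Long]): Long = {
    val sorted = sizes.sorted
    val n = sorted.length
    if (n % 2 == 0) (sorted(n / 2 - 1) + sorted(n / 2)) / 2 else sorted(n / 2)
  }

  // Both conditions must hold: larger than factor * median AND larger than the threshold.
  def isSkewed(size: Long, medianSize: Long, factor: Int = 5, threshold: Long = 256 * MB): Boolean =
    size > medianSize * factor && size > threshold

  def main(args: Array[String]): Unit = {
    val large = Seq(100, 120, 110, 900, 130).map(_ * MB)
    val med = median(large) // 120 MB
    println(isSkewed(900 * MB, med)) // prints true: 900 > 5 * 120 and 900 > 256
    println(isSkewed(130 * MB, med)) // prints false: 130 is below 5 * 120

    val small = Seq(10, 10, 10, 10, 100).map(_ * MB)
    // 100 MB is 10x the median, but it is below the 256 MB threshold.
    println(isSkewed(100 * MB, median(small))) // prints false
  }
}
```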
Analysis 5
The getFinalPhysicalPlan method of AdaptiveSparkPlanExec calls reOptimize, which re-runs optimization on the logical plan:

    private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
      logicalPlan.invalidateStatsCache()
      val optimized = optimizer.execute(logicalPlan)
      val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
      val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
      (newPlan, optimized)
    }
The optimizer contains a DemoteBroadcastHashJoin rule:

    @transient private val optimizer = new RuleExecutor[LogicalPlan] {
      // TODO add more optimization rules
      override protected def batches: Seq[Batch] = Seq(
        Batch("DemoteBroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
      )
    }
DemoteBroadcastHashJoin decides whether a join side should be prevented from becoming a broadcast join:

    case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {

      private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
        case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
          && stage.mapStats.isDefined =>
          val mapStats = stage.mapStats.get
          val partitionCnt = mapStats.bytesByPartitionId.length
          val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
          partitionCnt > 0 && nonZeroCnt > 0 &&
            (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
        case _ => false
      }

      def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
        case j @ Join(left, right, _, _, hint) =>
          var newHint = hint
          if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) {
            newHint = newHint.copy(leftHint =
              Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
          }
          if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) {
            newHint = newHint.copy(rightHint =
              Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
          }
          if (newHint.ne(hint)) {
            j.copy(hint = newHint)
          } else {
            j
          }
      }
    }
shouldDemote decides whether a join side is barred from broadcast join:
First, the plan has to be a ShuffleQueryStageExec whose result and map statistics are available.
If the ratio of non-empty partitions is lower than nonEmptyPartitionRatioForBroadcastJoin, that is, spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin, the side is tagged with NO_BROADCAST_HASH and the sort merge join will not be converted to a broadcast join.
This easily happens in SQL where a GROUP BY runs before the join.
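The ratio test inside shouldDemote can be tried in isolation. The following sketch copies only the arithmetic from the excerpt above; the standalone signature, which takes the per-partition byte counts and the threshold as plain arguments, is an assumption made for illustration.

```scala
object DemoteSketch {
  // Mirrors the ratio check in shouldDemote: demote (forbid broadcast hash join)
  // when the share of non-empty partitions is below the threshold.
  def shouldDemote(bytesByPartitionId: Array[Long], ratioThreshold: Double = 0.2): Boolean = {
    val partitionCnt = bytesByPartitionId.length
    val nonZeroCnt = bytesByPartitionId.count(_ > 0)
    partitionCnt > 0 && nonZeroCnt > 0 &&
      (nonZeroCnt * 1.0 / partitionCnt) < ratioThreshold
  }

  def main(args: Array[String]): Unit = {
    // 1 of 10 partitions is non-empty: ratio 0.1 < 0.2, so the side is demoted.
    println(shouldDemote(Array(512L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L))) // prints true
    // 5 of 10 partitions are non-empty: ratio 0.5, broadcast remains possible.
    println(shouldDemote(Array(1L, 1L, 1L, 1L, 1L, 0L, 0L, 0L, 0L, 0L))) // prints false
    // All partitions empty: nonZeroCnt is 0, so no demotion.
    println(shouldDemote(Array(0L, 0L, 0L))) // prints false
  }
}
```

A heavily aggregated side tends to leave most shuffle partitions empty, which is the situation the first call models.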
ShufflePartitionsUtil.coalescePartitions analysis (core code for merging partitions)
The coalescePartitions method is shown below:
    def coalescePartitions(
        mapOutputStatistics: Array[MapOutputStatistics],
        advisoryTargetSize: Long,
        minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
      // If `minNumPartitions` is very large, it is possible that we need to use a value less than
      // `advisoryTargetSize` as the target size of a coalesced task.
      val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
      // The max at here is to make sure that when we have an empty table, we only have a single
      // coalesced partition.
      // There is no particular reason that we pick 16. We just need a number to prevent
      // `maxTargetSize` from being set to 0.
      val maxTargetSize = math.max(
        math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
      val targetSize = math.min(maxTargetSize, advisoryTargetSize)

      val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ")
      logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
        s"actual target size $targetSize.")

      // Make sure these shuffles have the same number of partitions.
      val distinctNumShufflePartitions =
        mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
      // The reason that we are expecting a single value of the number of shuffle partitions
      // is that when we add Exchanges, we set the number of shuffle partitions
      // (i.e. map output partitions) using a static setting, which is the value of
      // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different
      // number of partitions, they will have the same number of shuffle partitions
      // (i.e. map output partitions).
      assert(
        distinctNumShufflePartitions.length == 1,
        "There should be only one distinct value of the number of shuffle partitions " +
          "among registered Exchange operators.")

      val numPartitions = distinctNumShufflePartitions.head
      val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]()
      var latestSplitPoint = 0
      var coalescedSize = 0L
      var i = 0
      while (i < numPartitions) {
        // We calculate the total size of i-th shuffle partitions from all shuffles.
        var totalSizeOfCurrentPartition = 0L
        var j = 0
        while (j < mapOutputStatistics.length) {
          totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
          j += 1
        }

        // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
        // new coalesced partition.
        if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
          partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
          latestSplitPoint = i
          // reset postShuffleInputSize.
          coalescedSize = totalSizeOfCurrentPartition
        } else {
          coalescedSize += totalSizeOfCurrentPartition
        }
        i += 1
      }
      partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)

      partitionSpecs
    }
totalPostShuffleInputSize is computed first; it is the total data size across all shuffles
maxTargetSize is max(ceil(totalPostShuffleInputSize / minNumPartitions), 16), where minNumPartitions is the value of spark.sql.adaptive.coalescePartitions.minPartitionNum
targetSize is min(maxTargetSize, advisoryTargetSize), where advisoryTargetSize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes, so the advisory value is only a suggestion and is not necessarily the targetSize actually used
The while loop merges adjacent partitions: for each partition index it sums the sizes across all shuffles, and it keeps adding adjacent partitions to the current group until adding the next one would exceed targetSize, at which point a new group starts
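The merging loop is easier to follow on concrete numbers. The sketch below re-implements the same greedy algorithm on plain arrays, as an illustration rather than the Spark code: the object name, the use of (start, endExclusive) tuples in place of CoalescedPartitionSpec, and the sample sizes are assumptions.

```scala
import scala.collection.mutable.ArrayBuffer

object CoalesceSketch {
  // bytesByPartitionId holds one array per shuffle; all arrays must have the same length.
  // Returns (startIndex, endIndexExclusive) ranges of merged partitions.
  def coalesce(
      bytesByPartitionId: Seq[Array[Long]],
      advisoryTargetSize: Long,
      minNumPartitions: Int): Seq[(Int, Int)] = {
    require(bytesByPartitionId.nonEmpty, "need at least one shuffle")
    val numPartitions = bytesByPartitionId.head.length
    require(bytesByPartitionId.forall(_.length == numPartitions))

    val total = bytesByPartitionId.map(_.sum).sum
    val maxTargetSize = math.max(math.ceil(total / minNumPartitions.toDouble).toLong, 16L)
    val targetSize = math.min(maxTargetSize, advisoryTargetSize)

    val specs = ArrayBuffer.empty[(Int, Int)]
    var latestSplitPoint = 0
    var coalescedSize = 0L
    for (i <- 0 until numPartitions) {
      // Total size of the i-th partition across all shuffles.
      val current = bytesByPartitionId.map(_(i)).sum
      if (i > latestSplitPoint && coalescedSize + current > targetSize) {
        // Adding partition i would exceed the target: close the current group.
        specs += ((latestSplitPoint, i))
        latestSplitPoint = i
        coalescedSize = current
      } else {
        coalescedSize += current
      }
    }
    specs += ((latestSplitPoint, numPartitions))
    specs.toSeq
  }

  def main(args: Array[String]): Unit = {
    val sizes = Seq(Array(10L, 20L, 30L, 40L, 50L, 5L, 5L))
    // minNumPartitions = 1: targetSize = min(160, 64) = 64
    println(coalesce(sizes, advisoryTargetSize = 64L, minNumPartitions = 1))
    // minNumPartitions = 4: targetSize = min(ceil(160 / 4), 64) = 40
    println(coalesce(sizes, advisoryTargetSize = 64L, minNumPartitions = 4))
  }
}
```

With minNumPartitions = 1 the seven partitions collapse into three groups, (0,3), (3,4) and (4,7). Raising minNumPartitions to 4 lowers the target size to 40 and yields five groups, which shows how minPartitionNum can override the advisory size.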
OptimizeSkewedJoin.optimizeSkewJoin analysis (core code for data skew optimization)
See optimizeSkewJoin as shown:
    def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
      case smj @ SortMergeJoinExec(_, _, joinType, _,
          s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
          s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
          if supportedJoinTypes.contains(joinType) =>
        assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
        val numPartitions = left.partitionsWithSizes.length
        // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
        val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
        val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
        logDebug(
          s"""
            |Optimizing skewed join.
            |Left side partitions size info:
            |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
            |Right side partitions size info:
            |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
          """.stripMargin)
        val canSplitLeft = canSplitLeftSide(joinType)
        val canSplitRight = canSplitRightSide(joinType)
        // We use the actual partition sizes (may be coalesced) to calculate target size, so that
        // the final data distribution is even (coalesced partitions + split partitions).
        val leftActualSizes = left.partitionsWithSizes.map(_._2)
        val rightActualSizes = right.partitionsWithSizes.map(_._2)
        val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
        val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

        val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
        val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
        val leftSkewDesc = new SkewDesc
        val rightSkewDesc = new SkewDesc
        for (partitionIndex

    ...

            => true
          case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
          case p => p.expressions.exists(_.find {
            case _: SubqueryExpression => true
            case _ => false
          }.isDefined)
        }.isDefined
    }

    private def supportAdaptive(plan: SparkPlan): Boolean = {
      // TODO migrate dynamic-partition-pruning onto adaptive execution.
      sanityCheck(plan) &&
        !plan.logicalLink.exists(_.isStreaming) &&
        !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
        plan.children.forall(supportAdaptive)
    }
AQE will not be applied if the above conditions are not met. To force it, you can set spark.sql.adaptive.forceApply to true (the documentation marks this as an internal configuration).
Note:
The following configuration items have been deprecated in Spark 3.0.1:
spark.sql.adaptive.skewedPartitionMaxSplits
spark.sql.adaptive.skewedPartitionRowCountThreshold
spark.sql.adaptive.skewedPartitionSizeThreshold