Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How Spark 3.0 improves the performance of SQL workloads

2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

Spark 3.0 how to improve the performance of SQL workload, many novices are not very clear about this, in order to help you solve this problem, the following editor will explain for you in detail, people with this need can come to learn, I hope you can gain something.

AQE was originally introduced in Spark 2.4, but with the development of Spark 3.0, it has become more powerful. Although Cloudera recommends waiting for it to be used in production before we deliver Spark 3.1, you can now use AQE to start the evaluation in Spark 3.0.

First, let's take a look at the types of problems that AQE solves.

Defects in the design of initial catalyst

The following figure shows the type of distributed processing that occurs when a simple grouped query is performed using DataFrames.

Spark determines the appropriate number of partitions for the first stage, but for the second phase, the default magic number of200 is used.

There are three reasons why it is bad:

200 cannot be the ideal number of partitions, and the number of partitions is one of the key factors affecting performance

If you write the output of the second phase to disk, you may get 200 small files.

Optimizations and their deficiencies have a knock-on effect: if you continue after the second phase, you may miss out on potential opportunities for more optimizations.

What you can do is to manually set the value of this property for this shuffle before executing a query similar to the following statement:

Spark.conf.set ("spark.sql.shuffle.partitions", "2")

This also presents some challenges:

Set this property before each query

These values will become obsolete as the data evolves.

This setting will apply to all Shuffle operations in the query

Prior to the first phase of the previous example, the distribution and quantity of the data were known, and Spark could derive a reasonable partition value. However, for the second phase, this information does not yet know the price to pay to perform the actual processing of the first phase: therefore, resort to magic numbers.

Design principle of adaptive query execution

The main idea of AQE is to make the execution plan non-final and allow audits to take place at the boundaries of each phase. Therefore, the execution plan is broken down into a new "query phase" abstraction defined by the phase.

The catalyst now stops at the boundary of each stage to try and apply other optimizations based on the information available on the intermediate data.

Therefore, AQE can be defined as a layer above Spark Catalyst, which dynamically modifies the Spark plan.

Are there any shortcomings? There are some, but they are very small:

Execution stops at the boundary of each phase of the Spark to view its plan, but this is offset by a performance improvement.

Spark UI is more difficult to read because Spark creates more jobs for a given application, and these jobs do not take up the Job groups and descriptions you set.

Adaptive number of Shuffle partitions

This feature of AQE has been available since Spark 2.4.

To enable it, you need to set spark.sql.adaptive.enabled to true, which defaults to false. When AQE is enabled, the number of randomly adjusted partitions is automatically adjusted and is no longer the default value of 200or manually set.

This is the result of the execution of the first TPC-DS query before and after AQE is enabled:

Dynamically convert a sort merge join to a broadcast join

When the runtime statistics for any join side are less than the broadcast hash join threshold, AQE converts the sort merge join to the broadcast hash join.

This is the final phase of the execution of the second TPC-DS query before and after enabling AQE:

Dynamic merging of shuffle partitions

If the number of random playback partitions is greater than the number of keystroke groups, a lot of CPU cycles will be wasted due to the unbalanced distribution of keys.

When two

Spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled

Set to true, Spark merges contiguous shuffle partitions based on

Set the target size specified for spark.sql.adaptive.advisoryPartitionSizeInBytes to avoid performing too many small tasks.

Dynamically optimize inclined connections

Tilt is a stumbling block to distributed processing. It may actually pause your processing for several hours:

If no optimization is made, the time required to perform the connection is defined by the largest partition.

Therefore, the skew join optimization divides partition A0 into subpartitions using the values specified by spark.sql.adaptive.advisoryPartitionSizeInBytes, and joins each of them to the corresponding partition B0 of table B.

Therefore, you need to provide your skew definition to AQE.

This involves two attributes:

Spark.sql.adaptive.skewJoin.skewedPartitionFactor is relative: if the partition size is greater than this factor times the median partition size and is also larger, the partition is considered to be skewed

Spark.sql.adaptive.skewedPartitionThresholdInBytes, this is absolute: this is the threshold, below which will be ignored.

Dynamic partition pruning

The idea of dynamic partition pruning (DPP) is one of the most effective optimization techniques: reading only the data you need. DPP is not part of AQE; in fact, AQE must be disabled for DPP. On the plus side, this allows reverse migration of DPP to Spark 2.4 for CDP.

The optimization is implemented in both logical and physical plans.

At the logical level, the dimension filter is identified and propagated to the other side of the scan through the connection.

Then, at the physical level, the filter is executed once on the dimension side, and the result is broadcast to the main table, where the filter is also applied.

If spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is disabled, DPP can actually be used with other types of joins (for example, SortMergeJoin).

In that case, Spark estimates whether the DPP filter actually improves query performance.

DPP can greatly improve the performance of highly selective queries, for example, if your query is filtered out of one month out of five years of data.

Not all queries have such a significant improvement in performance but 72 of the 99 TPC-DS queries have been positively affected by DPP.

Spark is still a long way from its original core paradigm: lazily executing optimized static plans on static datasets.

The static dataset part is challenged by streaming technology: the Spark team first created a clumsy design based on RDD, and then came up with a better solution involving DataFrames.

The static planning part is challenged by the SQL and Adaptive Query Execution frameworks, and in a sense what the structured flow is for the initial flow library: it should always be an elegant solution.

Is it helpful for you to read the above content? If you want to know more about the relevant knowledge or read more related articles, please follow the industry information channel, thank you for your support.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 265

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report