How to gain an in-depth understanding of CBO optimization from the statistics in Spark SQL explain

2025-04-03 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

Many people who are new to Spark are unsure how to read the statistics shown by Spark SQL explain and how those statistics relate to cost-based optimization (CBO). This article summarizes how the statistics are computed, propagated, and used.

The Spark SQL optimizer uses two kinds of optimization: rule-based and cost-based. The former relies on heuristic rules, while the latter relies on the statistical properties of the data. In this article, we explain how these statistics are used under the hood, in which scenarios they are useful, and how to make use of them.

Most heuristic optimization rules do not take the properties of the processed data into account. For example, the heuristic PredicatePushDown rule is based on the assumption that it is better to filter the data first and do the computation afterwards.

However, there are scenarios where Spark can come up with a better plan based on the statistics of the data, which is usually referred to as cost-based optimization, or CBO. Let's explore the details.

How do you see statistics?

To see the statistics of a table, we first need to compute them by running the following SQL statement (any SQL statement can be run through the sql() function, spark.sql(sql_string), which takes the SQL string as its argument):

ANALYZE TABLE table_name COMPUTE STATISTICS

After running this, the table-level statistics are computed and stored in the metastore, and they can be viewed with the following statement:

DESCRIBE EXTENDED table_name

This shows some table properties together with the table-level statistics. There are two metrics: rowCount and sizeInBytes.

In addition to table-level statistics, there are also column-level statistics, which can be computed and viewed with the following statements:

ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS col_name

DESCRIBE EXTENDED table_name column_name

This shows a table of column-level statistics (the column used in this example is user_id). The output contains several column-level metrics, such as the minimum and maximum values, the number of nulls, the number of distinct values (an approximate value), and so on.

Since Spark 3.0, there are more options for displaying this information, not only for a table but also for an actual query. This is done through the mode parameter of explain:

spark.table(table_name).explain(mode='cost')

This shows two query plans, the physical plan and the optimized logical plan, and the logical plan is annotated with statistics.

The important point is that the statistics are shown for each operator of the plan, so you can see how the estimates change after each transformation. The statistics are first computed by the Relation operator, the so-called leaf node. Each leaf node is responsible for computing its own statistics, which are then propagated through the logical plan according to certain rules.
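The statements from this section can be collected into one small helper. This is a minimal sketch, not part of the original article: the function name show_statistics is hypothetical, and it assumes an existing SparkSession (Spark 3.0 or later) is passed in as spark and that the table and column already exist.

```python
def show_statistics(spark, table_name, column_name):
    """Compute table- and column-level statistics, then display them.

    `spark` is assumed to be an active SparkSession (Spark 3.0+).
    """
    # Table-level statistics: rowCount and sizeInBytes.
    spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
    # Column-level statistics: min, max, null count, distinct count, ...
    spark.sql(
        f"ANALYZE TABLE {table_name} COMPUTE STATISTICS FOR COLUMNS {column_name}"
    )
    # Display what was stored for the table and for the column.
    spark.sql(f"DESCRIBE EXTENDED {table_name}").show(truncate=False)
    spark.sql(f"DESCRIBE EXTENDED {table_name} {column_name}").show(truncate=False)
    # Print the plans, with the logical plan annotated with statistics.
    spark.table(table_name).explain(mode="cost")
```

A call such as show_statistics(spark, "my_table", "user_id") would run all four statements and then print the cost-annotated plan; the table name here is only a placeholder.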

Next, we'll see how leaf nodes compute the statistics and how the statistics are propagated.

How are statistics calculated?

There are three ways in which a leaf node can compute its statistics. The first and best is to take the statistics from the metastore. The second is that Spark uses InMemoryFileIndex, which calls the underlying Hadoop API to collect the size of each file in the data source and sums them to get the total sizeInBytes (only the sizeInBytes metric is computed here). The last way is that Spark uses a default value for sizeInBytes, which is configured by spark.sql.defaultSizeInBytes and defaults to 8 EiB, so in effect Spark overestimates the relation's sizeInBytes as much as possible (again, sizeInBytes is the only metric used here).

These three ways can be described by a chart in the form of a tree. Each node is a condition: if the condition is true, we go to T, otherwise to F. The leaves represent the actual way in which the statistics are computed. For example, InMemoryFI means that only sizeInBytes is computed, by calling the Hadoop API. Stats From M means that the statistics are taken from the metastore; on the left side all statistics are taken from the metastore, while on the right side only the sizeInBytes metric is taken. The leaf CatalogFileIndex represents the last approach, in which the default value of 8 EiB is used for sizeInBytes.

The chart has four conditions. The first determines how the statistics are obtained: if we read the data as a table, df = spark.table(table_name), we go to the left, otherwise to the right. The next condition is whether cost-based optimization (CBO) is turned on; this is configured through spark.sql.cbo.enabled, and the default value is false (as of Spark 3.0.0). The third condition is whether the statistics have been computed in the metastore with the ANALYZE TABLE command (ATC), and the last one is whether the table is partitioned.

In the best case, we read the data as a table, CBO is on, and ATC has been run; all statistics are then taken from the metastore (except sizeInBytes, which is calculated from rowCount). In the worst case, we read the data as a table, but ATC has not been run and the table is partitioned; the default sizeInBytes is then read from the configuration, and the estimate is very inaccurate. Note that the worst case does not depend on whether CBO is turned on. One more thing to note: if the table is not partitioned, Spark uses the Hadoop API to calculate sizeInBytes, so whether the table is partitioned directly affects how the leaf-node statistics are calculated.
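The decision tree described above can be written down as a small function. This is only a sketch of the logic as the text describes it, not Spark's actual code: the function name and the returned labels are hypothetical, and because the chart itself is not reproduced here, the branch for data that is not read as a table is an assumption (it is taken to end in InMemoryFileIndex).

```python
def leaf_stats_source(read_as_table, cbo_enabled, atc_run, partitioned):
    """Return a label describing where a leaf node's statistics come from."""
    if not read_as_table:
        # Assumption: data read directly from files is sized through
        # InMemoryFileIndex (Hadoop API), giving sizeInBytes only.
        return "InMemoryFileIndex: sizeInBytes only"
    if cbo_enabled and atc_run:
        # Best case: everything comes from the metastore
        # (sizeInBytes is then derived from rowCount).
        return "metastore: all statistics"
    if atc_run:
        # CBO is off, so only sizeInBytes is taken from the metastore.
        return "metastore: sizeInBytes only"
    if partitioned:
        # Worst case, regardless of CBO: the configured default is used.
        return "CatalogFileIndex: default sizeInBytes (8 EiB)"
    # Not analyzed and not partitioned: file sizes are summed.
    return "InMemoryFileIndex: sizeInBytes only"
```

For example, leaf_stats_source(True, False, False, True) and leaf_stats_source(True, True, False, True) give the same worst-case label, which reflects the point that the worst case does not depend on CBO.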

How are the statistics propagated through the plan?

Once the statistics of the leaf nodes are computed, they are propagated to the other nodes of the plan. There are two ways of propagation. The first (we will call it the old way) is very basic: only one metric, sizeInBytes, is propagated, and the way this metric is adjusted by the various operators is also very basic. For example, the Filter operator does not adjust the value of sizeInBytes at all, as the following query shows:

spark.table(table_name).filter(col("user_id") < 0).explain(mode="cost")

In this query, we filter for all records with a negative user_id, although there are actually no such records. Spark cannot know this, because it would require column-level statistics, which are not used in the old way. In the query plan you can therefore see that only sizeInBytes is propagated and that its value stays the same in both operators.

The second way of propagating statistics is more mature and has been available since Spark 2.2, but it requires CBO to be turned on and the statistics to be stored in the metastore through ATC. In this case all the statistics are propagated, and if we also provide the column-level metrics, Spark can estimate the effect of the Filter operator and compute better statistics. In the plan for the same query, the statistics of the Filter operator have now changed: rowCount is zero and sizeInBytes is 1 B, which is the minimum value. From the column-level statistics of user_id, Spark can tell that there are no records with a negative user_id, and this is reflected in the query plan.

In this new approach, to calculate sizeInBytes, Spark first computes the size of a single row based on the data type of each column and then multiplies it by rowCount to get the final sizeInBytes. If rowCount is zero, sizeInBytes is set to 1 to avoid division by zero in other calculations. The same applies to the Project operator (Spark knows which columns are projected, so it can compute the size of a single row in advance).
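The arithmetic can be illustrated with a few lines of plain Python. This is a simplified model under stated assumptions, not Spark's implementation: the function name is hypothetical, the byte widths in TYPE_SIZE_BYTES are illustrative values chosen for the example, and any per-row overhead Spark may add is ignored.

```python
# Illustrative byte widths per data type (an assumption for this sketch).
TYPE_SIZE_BYTES = {"boolean": 1, "int": 4, "long": 8, "double": 8}


def estimate_size_in_bytes(column_types, row_count):
    """Estimate sizeInBytes as (size of a single row) * rowCount."""
    if row_count == 0:
        # Never report 0, so that later calculations cannot divide by zero.
        return 1
    # Size of one row: the sum of the widths of its columns.
    row_size = sum(TYPE_SIZE_BYTES[t] for t in column_types)
    return row_size * row_count
```

With these widths, a relation with a long and an int column and 1000 rows is estimated at (8 + 4) * 1000 = 12000 bytes. Projecting only the int column lowers the estimate to 4000 bytes, and a filter estimated to return no rows gives 1 byte.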

How statistics are used

Now that we know how statistics are calculated and propagated through logical plans, let's take a look at how they are used in query plans to get better plans.

Statistics are used in two places. The first is the JoinSelection strategy, where Spark decides which algorithm to use to join two DataFrames. The basic logic is that if one DataFrame is smaller than a certain threshold, Spark uses BroadcastHashJoin (BHJ), because broadcasting a very small DataFrame is a very efficient way to join. The threshold is configured through spark.sql.autoBroadcastJoinThreshold and defaults to 10 MB, so a good estimate of the DataFrame's size helps Spark choose a better join algorithm.
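The effect of the size estimate on this decision can be modelled in a few lines. This is a simplified sketch, not the actual JoinSelection code: the function and constant names are hypothetical, and the value used for the default sizeInBytes (2**63 - 1 bytes, roughly 8 EiB) is an assumption made for the example.

```python
# Default of spark.sql.autoBroadcastJoinThreshold: 10 MB.
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024
# Assumed default size estimate when no statistics exist (about 8 EiB).
DEFAULT_SIZE_IN_BYTES = 2**63 - 1


def can_broadcast(size_in_bytes, threshold=DEFAULT_BROADCAST_THRESHOLD):
    """Return True if the estimated size is small enough for a broadcast join."""
    # A negative threshold makes this condition false for every size.
    return 0 <= size_in_bytes <= threshold
```

A table estimated at 5 MB qualifies, so can_broadcast(5 * 1024 * 1024) is true. A small table that carries the default estimate does not qualify, so can_broadcast(DEFAULT_SIZE_IN_BYTES) is false, however small the table really is.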

The second place is also related to joins: the joinReorder rule, with which Spark finds the optimal order of the join operations (if you join more than two tables). This rule is off by default; to turn it on, set the following configuration:

spark.conf.set("spark.sql.cbo.joinReorder.enabled", True)

The maximum number of DataFrames to which the rule is applied can be controlled through the following property:

spark.conf.set("spark.sql.cbo.joinReorder.dp.threshold", n)

The default value for n is 12.
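The settings above can be grouped in one helper. This is a minimal sketch: the function name enable_join_reorder is hypothetical, spark is assumed to be an active SparkSession, and spark.sql.cbo.enabled is also switched on here because join reordering is part of cost-based optimization.

```python
def enable_join_reorder(spark, max_tables=12):
    """Turn on CBO and cost-based join reordering for this session."""
    # Join reordering is a cost-based rule, so CBO itself must be on.
    spark.conf.set("spark.sql.cbo.enabled", True)
    spark.conf.set("spark.sql.cbo.joinReorder.enabled", True)
    # Upper limit on the number of joined tables the rule will reorder.
    spark.conf.set("spark.sql.cbo.joinReorder.dp.threshold", max_tables)
```

Calling enable_join_reorder(spark) keeps the default limit of 12, and the rule can only produce a useful ordering if the joined tables have statistics computed through ATC.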

When to use ANALYZE TABLE command (ATC)?

We already know that if a table is partitioned and ATC has not been run, Spark uses the default value of 8 EiB, which is very large. So when we join many tables that are partitioned and very small, and therefore candidates for BHJ, it makes sense to run ATC. Of course, we must keep in mind that if data is appended to a table or the table is overwritten, the previous statistics are deleted, so ATC has to be rerun. In some cases, keeping the statistics in the metastore up to date is more complicated. One solution is to take advantage of adaptive query execution, a new feature in Spark 3.0.
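Coming back to the need to rerun ATC after every write, one way to avoid forgetting it is to do both steps in one place. This is a sketch only: the function name and its parameters are hypothetical, df is assumed to be a DataFrame and spark an active SparkSession, and the target is assumed to be a table managed through saveAsTable.

```python
def write_and_refresh_stats(df, spark, table_name, mode="append", columns=()):
    """Write `df` into a table, then recompute the table's statistics."""
    # Appending or overwriting invalidates the previously stored statistics.
    df.write.mode(mode).saveAsTable(table_name)
    # Recompute the table-level statistics (rowCount, sizeInBytes).
    spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
    if columns:
        # Optionally recompute column-level statistics as well.
        column_list = ", ".join(columns)
        spark.sql(
            f"ANALYZE TABLE {table_name} COMPUTE STATISTICS FOR COLUMNS {column_list}"
        )
```

Recomputing statistics scans the data, so for large tables it may be worth doing it only after a batch of writes rather than after each one.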

Adaptive query execution

Adaptive query execution (AQE) is a new feature introduced in Spark 3.0 that uses statistics in a more advanced way. If AQE is enabled (it is not enabled by default in Spark 3.0), the statistics are recalculated after each stage is executed. This gives more accurate statistics at runtime, so that Spark can decide, for example, whether or not to use BHJ. AQE is a big topic in itself and will be covered in separate articles.

