Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is it like to control the execution of sql statements in Spark SQL

2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

In this issue, the editor will bring you about how to control the execution of sql statements in Spark SQL. The article is rich in content and analyzes and narrates it from a professional point of view. I hope you can get something after reading this article.

Background

Since the sql of spark 2.x and the DataFrame APi of Shen Ming Bank, it has become more and more convenient to query data in spark. Complex query logic and complex operations can be expressed in just a few lines of code. The biggest advantage of this api is that users do not need to consider too much execution, and the optimizer automatically optimizes the most efficient way to execute the query. And effective query execution is not only because it can save resources, but also can reduce the time for end users to wait for results.

The Spark SQL optimizer is actually quite mature, especially with the advent of 3. 0, which introduces new features such as dynamic branch clipping and dynamic query execution. The optimizer works within the query plan and can apply a variety of rules to optimize the query plan. For example, the execution order of transformation can be changed or the final result can be discarded directly. Although there are many excellent optimizations, there are some scenarios where people can do better. In this article, let's look at special cases and use some techniques to better execute query plans.

Examples

First, let's introduce an example. Join us with the following data in json format:

{"id": 1, "user_id": 100,50} {"id": 2, "user_id": 100,200} {"id": 3, "user_id": 101,120} {"id": 4, "price": 120}

Each record is like a transaction, and the user_id column may contain many duplicate values (or null), in addition to other columns to describe the transaction. Now our query is based on two aggregate union operations, and the difference between the two aggregations lies only in the filter conditions. In the first aggregation, we want to get users whose total price is less than 50, in the second aggregation, we want to get users whose price is more than 100, and in the second aggregation, we only consider those whose user_id is not null. This example is only a simplified version of a complex example, but such a complex example is real. Here is the use of PySpark DataFrame API to express the query we want:

Df = spark.read.json (data_path) df_small = (df.groupBy ("user_id") .agg (sum ("price"). Alias ("price")) .filter (col ("price"))

< 50))df_big = (df.filter(col("user_id").isNotNull()).groupBy("user_id").agg(sum("price").alias("price")).filter(col("price") >

Result = df_small.union (df_big) Plan interpretation and translation

The key to optimizing query performance is to be able to understand and interpret the translation query plan. The plan itself can be shown through the Spark DataFrame explain function, or if the plan is already running, we can find the tab of SQL through Spark UI to find the plan. This SQL tab has a list of completed and running queries, so select our query to see a graphical representation of the physical plan (here we remove the metric information to make the graph ⌚️ easier) the plan is a tree structure, each node represents some operations and carries some execution information. We can see that in this example we have two branches, and a root branch at the bottom, and the leaves at the top, where the execution begins. The scan json leaf node represents reading data from the source, and then there is a pair of hashAggregate operations that represent aggregation. There is an Exchange operation between the two aggregation operations, which represents shuffle. The filters operation carries the filter condition information.

This plan is a typical union operation, where each dataframe has a new branch, and because in our example DataFrame is based on the same data source, this means that the data source is scan twice. Now we can see that there is room for optimization. Making the data source scan only once is a good optimization, especially when IO is very expensive.

What we want to implement here is to reuse calculations-- scan data and aggregate calculations-- because the operation on DataFrame is the same, and in principle one calculation is sufficient.

Cache caching

A typical solution to recalculation in spark is to use cache. There is a cache function in DataFrame:

Df.cache ()

This is a delayed conversion, which means that data is put into the cache layer only after some action is triggered. Caching is a very common operation in spark, but this is limited, especially when the amount of data is large and the resources of the cluster are very tight. And we must be aware that storing data in the buffer layer requires additional overhead, and the operation itself requires overhead. Invoking the cache operation throughout the DataFrame df is not optimized because it caches all columns into the store. A better approach is to cache only the fields that are used.

Reuse Exchage

In addition to caching, there is another method that is not easy to describe graphically and is based on the reuse of Exchange. This Exchange operation represents the shuffle operation used to move data between clusters. Shuffle operations are generally used in aggregation, join, and some transformation operations. The important thing about shuffle is that spark always stores data written by shuffle on disk, and because it is stored on disk, it can be reused if necessary. In fact, spark will reuse this data at some point. For example, when spark finds multiple branches from the leaf node to the exchange node, the reuse operation [ReuseExchange rule] will be carried out. If this happens, it means that our duplicate branches have the same calculation and can be reused. We can tell if there is such a scenario from the plan, because these branches should look like this:

In our example, spark does not reuse Exchange, but there are some techniques that can be used to make it reuse. The reason why Exchange cannot be reused in our example is that the branch on the right has the condition that user_id is not null. This filter condition is the only difference between the two branches of the union operation, and if we can eliminate this difference, spark will reuse EXchange.

Improvement of the plan

How can we branch to be the same? If this filer operation is caused, we can reverse the order of the filter and filter after the aggregation, because this has no effect on the result. However, there is a trap. If we modify it as follows:

Df_big = (df.groupBy ("user_id") .agg (sum ("price"). Alias ("price"). Filter (col ("price") > 100) .filter (col ("price"). IsNotNull ()

When we check the final query plan again, we find that the plan has not changed. The explanation is simple-the filter operation is moved by the optimizer.

Conceptually, there are two types of planning, logical planning and physical planning, which are easy to understand. And the logical plan goes through an optimization phase before it is converted to a physical plan. When we change some transformations, we reflect directly in the logical plan. The optimizer applies a series of optimization rules, which are usually based on inference. In our example, the rule is PushDownPredicate, which ensures that the filters operation is moved as close to the data source as possible. It comes from the operation of filtering and then data set is more efficient. This rule is useful in most scenarios. However, our example does not apply here.

In order for filter to be in the right place, we must limit the optimizer. Since spark 2.4, we have been able to configure items to get the optimizer to exclude certain rules:

Spark.conf.set ("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.PushDownPredicate")

Once this is set, run the query statement again, and we can see that the location of the filters operation is just as we thought. The two branches are the same, spark will reuse Exchange, the data will be scanned only once, and the aggregation operation will be calculated only once.

In the case of spark 3.0, the optimization rule has a different name-PushDownPredicates, and there is an additional rule to push down filter-PushPredicateThroughNonJoin, so we actually need to exclude two rules.

Summary

We see that through this, spark developers give us the ability to control the optimizer. But it also comes with a responsibility, and we list some key points when using this technology:

When we exclude PushDownPredicate, we are responsible for all the filter in this query, not just the filter we want to relocate. There is also another kind of filter, which is very likely to occur, such as partition filter, so we need to make sure they are in the right place.

With the optimizer limited, it is the user's job to use filter. In our example, the accelerated query is in the case where IO is expensive, because we can realize that the data can only be browsed once, and if the data has many columns, this applies to cases where the file format is not a column format, such as json or csv format.

If the dataset is small, it is not worth controlling the optimizer, but cache can achieve the same effect. However, when the dataset is large, the extra overhead of storing the data is obvious. On the other hand, there is no extra overhead to reuse Exchange, because shuffle data is stored on disk.

This technology is based on spark's internal behavior, has no official documentation, and is difficult to detect if there are future functional changes. In our example, there is a change in spark 3.0, where the rule is renamed first and another rule is added

Conclusion

We know that the premise of optimization is that we can understand the query plan. Spark's optimizer can optimize our query well through a series of derivation rules. However, there are some scenario optimization rules that do not apply. Sometimes query rewriting is good and sometimes bad, because rewriting queries will implement different logical plans, and we have no direct control over the physical plans being executed. Because since spark 2.4, we can configure excludedRules to limit the optimizer, customizing some regular physical plans from the future.

In many scenarios, depending on the optimizer, we can get a fixed plan and have an efficient execution. However, there are some performance pressures, and here we can check the final plan and see if it can be optimized by limiting the optimizer.

This is how the Spark SQL controls the execution of sql statements shared by the editor. If you happen to have similar doubts, please refer to the above analysis to understand. If you want to know more about it, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report