[Big Data] Predicate pushdown in SparkSql join queries (1)

2025-02-24 Update From: SLTechnology News&Howtos

This article was first published on the WeChat official account "vivo Internet Technology".

Author: Li Yong

Table of contents:

1. SparkSql

2. Join queries and join conditions

3. Predicate pushdown

4. Predicate pushdown rules in inner join queries

4.1. Post-join conditions connected with AND

4.2. Post-join conditions connected with OR

4.3. Partitioned tables with filter conditions connected with OR

1. SparkSql

SparkSql is a distributed SQL engine built on the Spark computing framework. It uses DataFrame and Dataset to carry structured and semi-structured data and to support complex query processing. Its DSL lets you write SQL-style queries directly in Scala, and it also provides a Thrift server that exposes SQL queries as a service.

SparkSql provides the Data Source API, with which users can develop a connector to query all kinds of data sources directly, including NoSQL stores, RDBMSs, search engines, and files on distributed file systems such as HDFS. Systems similar to SparkSql include Hive, PrestoDB and Impala. They all belong to the so-called "SQL on Hadoop" family, and each is quite popular. After all, in an era when shipping a data system without SQL is seen as not playing fair, it is hard to win users without SQL.

2. Join queries and join conditions

Join queries in SQL are mainly divided into inner joins, outer joins and semi joins. See Wikipedia for the detailed differences.

A join condition means that two rows from two tables can be returned together only when the condition is satisfied. For example, consider a query like this:

The part "LT.id = RT.id AND LT.id > 1" is called the join condition. It decides directly whether a row from each of the two tables can be joined together. If the condition is not met, the two rows are not necessarily discarded; what happens to them depends on the type of join. The join condition is therefore neither a single-table filter nor a "joint filter" over the two tables. The part "RT.id > 2" after WHERE is called the post-join condition. Despite the name, the data does not have to be filtered after the join. The name only states that filtering after the join is guaranteed to give a correct result, and this is the baseline we use to check correctness in the analysis that follows.
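The difference between the two kinds of condition can be sketched with a small simulation. This is plain Python rather than SparkSql. The table contents, the helper `left_outer_join`, and the choice of a left outer join are all assumptions made for illustration; the text only fixes the two conditions themselves.

```python
# Hypothetical rows; the actual table contents are not given in the text.
left_table = [
    {"id": 1, "value": "one"},
    {"id": 2, "value": "two"},
    {"id": 3, "value": "three"},
]
right_table = [
    {"id": 2, "value": "two"},
    {"id": 3, "value": "three"},
]


def join_condition(lt, rt):
    # ON LT.id = RT.id AND LT.id > 1
    return lt["id"] == rt["id"] and lt["id"] > 1


def post_join_condition(lt, rt):
    # WHERE RT.id > 2 (a missing right side never satisfies the comparison)
    return rt is not None and rt["id"] > 2


def left_outer_join(left, right, on):
    """Nested-loop left outer join: unmatched left rows are kept with rt = None."""
    joined = []
    for lt in left:
        matches = [rt for rt in right if on(lt, rt)]
        if matches:
            joined.extend((lt, rt) for rt in matches)
        else:
            joined.append((lt, None))
    return joined


# Step 1: the join condition decides which rows pair up; no left row is dropped.
joined = left_outer_join(left_table, right_table, join_condition)

# Step 2: the post-join condition filters the joined rows.
result = [(lt, rt) for lt, rt in joined if post_join_condition(lt, rt)]
```

In this sketch the left row with id 1 fails the join condition yet still appears in `joined` (paired with `None`), because an outer join keeps it. Only the WHERE step removes rows outright.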

3. Predicate pushdown

A predicate is defined as follows: "A predicate is a function that returns bool (or something that can be implicitly converted to bool)", that is, a function whose return value is true or false. Readers who have used Scala or Spark know the filter method; the argument passed to this higher-order function is a function that returns true or false.

In SQL, however, there are no methods, only expressions. The expression after WHERE acts as a filter; the SQL layer parses it and represents it as a predicate inside the database.

So the question is: why push predicates down? Predicate pushdown in SparkSql has two meanings. The first concerns who performs the data filtering, and the second concerns when the filtering is performed. To answer these two questions we need to understand how SparkSql processes a SQL statement. The query processing flow in SparkSql can be roughly divided as follows:
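As a minimal illustration of a predicate as a boolean-valued function, the sketch below uses Python's built-in `filter`. The rows and the function name `id_greater_than_one` are made up for the example.

```python
# Hypothetical rows standing in for a table.
rows = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]


def id_greater_than_one(row):
    # Predicate corresponding to the SQL expression `id > 1`: returns True or False.
    return row["id"] > 1


# filter is a higher-order function: it takes the predicate as an argument.
kept = list(filter(id_greater_than_one, rows))
```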

SparkSql first runs a series of analyses on the input SQL statement: lexical analysis (comparable to word segmentation in a search engine), syntax analysis, and semantic analysis (for example, checking that the database or table exists, or that GROUP BY is used together with aggregate functions). It then generates execution plans, both logical and physical. Many optimizations happen in the logical plan stage, and predicate handling is done there; the physical plan stage produces the DAG of RDDs. After these two steps comes the actual execution (the heavyweight computation such as join, group by, filter and distinct), which invokes the various physical operators (RDD transformations).

Two parties can perform data filtering: the first is the distributed SQL layer (in the execution phase), and the second is the data source. So the first meaning of predicate pushdown is whether the filtering is done by the Filter operator of the SQL layer or by the Scan operator during the scan phase.

As mentioned above, SparkSql can query all kinds of data sources through the Data Source API. If the underlying data source cannot filter data efficiently, a full scan is performed and every row is handed to SparkSql's Filter operator. Although the code generation used by SparkSql greatly improves filtering efficiency, this process cannot avoid reading a large amount of data from disk, and in some cases network I/O is involved as well (for example, when the data is not stored locally). If the underlying data source can filter data quickly during the scan, the filtering is delegated to it. (Which data sources can filter efficiently, and how SparkSql achieves efficient filtering with them, is not the focus of this article and will be discussed in other articles of the series.)

The second meaning of predicate pushdown, when the filtering is performed, generally applies to join queries: should each table be filtered first and then joined with the others, or should the tables be joined first and the joined temporary table filtered afterwards? This is the question this series of articles analyzes and discusses.
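The "who filters" question can be sketched with a toy model. `ToyDataSource` and its `scan` method are hypothetical and are not part of the SparkSql Data Source API; the sketch only counts how many rows leave the data source in each case.

```python
class ToyDataSource:
    """Hypothetical data source that counts how many rows it hands to the engine."""

    def __init__(self, rows):
        self.rows = rows
        self.rows_returned = 0

    def scan(self, pushed_predicate=None):
        # With a pushed-down predicate, non-matching rows never leave the source.
        for row in self.rows:
            if pushed_predicate is None or pushed_predicate(row):
                self.rows_returned += 1
                yield row


def predicate(row):
    return row["id"] > 7


rows = [{"id": i} for i in range(10)]

# Filtering in the SQL layer: full scan, then a Filter step in the engine.
source_a = ToyDataSource(rows)
filtered_by_engine = [r for r in source_a.scan() if predicate(r)]

# Filtering in the data source: the predicate is pushed into the scan.
source_b = ToyDataSource(rows)
filtered_by_source = list(source_b.scan(pushed_predicate=predicate))
```

Both paths produce the same rows, but in the first the source hands over all ten rows and in the second only the two matching ones.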

4. Predicate pushdown rules in inner join queries

Suppose we have two tables with a simple structure and only two rows each, which is enough to explain the pushdown rules. The two tables, lefttable and righttable, are as follows:

4.1. Post-join conditions connected with AND

Let's first look at a query statement:

This is an inner join query whose post-join condition combines filter conditions on the two tables with AND. Suppose we do not push the predicates down but evaluate the inner join first; this gives the correct result. The steps are as follows:

The row with id 1 in the left table has a match in the right table, so the two rows can be joined.

The row with id 2 in the left table also has a match in the right table, so these two rows can be joined as well.

At this point the temporary join result (temporary because it has not yet been filtered) is as follows:

The WHERE condition is then applied. The first row of the temporary table does not meet the condition and is filtered out. The final result is as follows:

Now let's push the predicates down first. We filter each of the two tables, and the filtering results are as follows:

The two filtered tables are then inner joined, and the result is as follows:

This is consistent with the result of joining first and filtering afterwards.
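The comparison above can be replayed in plain Python. The table contents and the exact query are not preserved in the text, so both are assumptions here: the rows are chosen to be consistent with the walkthrough, and the query is assumed to be an inner join on `id` with `WHERE LT.id > 1 AND RT.id > 1`.

```python
# Assumed table contents, chosen to be consistent with the walkthrough in the text.
left_table = [{"id": 1, "value": "two"}, {"id": 2, "value": "one"}]
right_table = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]


def inner_join(left, right, on):
    """Nested-loop inner join: keep only the pairs that satisfy the join condition."""
    return [(lt, rt) for lt in left for rt in right if on(lt, rt)]


def same_id(lt, rt):
    # ON LT.id = RT.id
    return lt["id"] == rt["id"]


# Baseline: join first, then apply WHERE LT.id > 1 AND RT.id > 1 (assumed query).
join_then_filter = [
    (lt, rt)
    for lt, rt in inner_join(left_table, right_table, same_id)
    if lt["id"] > 1 and rt["id"] > 1
]

# Pushdown: filter each table with its own half of the AND, then join.
filter_then_join = inner_join(
    [lt for lt in left_table if lt["id"] > 1],
    [rt for rt in right_table if rt["id"] > 1],
    same_id,
)
```

Because a joined row must satisfy both halves of the AND, dropping a row that fails its own half early can never remove a row that would have survived the later filter.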

4.2. Post-join conditions connected with OR

Let's look at another query statement:

We do the join first, and the temporary table is as follows:

We then filter with the WHERE condition, and the final query result is as follows:

If instead we first filter each table with its own filter condition from the WHERE clause, the filtering results of the two tables are as follows:

The two temporary tables are then inner joined, and the result is as follows:

Is something wrong with the table? It has only field names and no values. You read it correctly: there are no values, because the filtered left table contains only the row with id 1 and the filtered right table contains only the row with id 2. These two rows cannot be inner joined, so the result is empty.

So why does pushing down give a wrong result when the conditions on the two tables are connected by OR? The reason is that an OR condition returns TRUE as soon as either side is satisfied. Take the condition "LT.value = 'two' OR RT.value = 'two'". If we use LT.value = 'two' to keep only the left-table rows whose value is 'two', then a left-table row whose value is not 'two' is dropped, even though it might join on id with a right-table row whose RT.value happens to be 'two'. Such a joined row satisfies "LT.value = 'two' OR RT.value = 'two'", but it has already been lost to the crude early filtering, so the query result is wrong. Therefore predicates cannot be pushed down in this case.
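The failure can be reproduced with the same kind of simulation. As before, the table contents are assumptions consistent with the description (the left row with id 1 and the right row with id 2 are the only ones whose value is 'two'), and the query is assumed to be an inner join on `id` with `WHERE LT.value = 'two' OR RT.value = 'two'`.

```python
# Assumed table contents, consistent with the description in the text.
left_table = [{"id": 1, "value": "two"}, {"id": 2, "value": "one"}]
right_table = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]


def inner_join(left, right, on):
    """Nested-loop inner join: keep only the pairs that satisfy the join condition."""
    return [(lt, rt) for lt in left for rt in right if on(lt, rt)]


def same_id(lt, rt):
    # ON LT.id = RT.id
    return lt["id"] == rt["id"]


# Baseline: join first, then apply WHERE LT.value = 'two' OR RT.value = 'two'.
join_then_filter = [
    (lt, rt)
    for lt, rt in inner_join(left_table, right_table, same_id)
    if lt["value"] == "two" or rt["value"] == "two"
]

# Naive pushdown: filter each table with its own side of the OR, then join.
filter_then_join = inner_join(
    [lt for lt in left_table if lt["value"] == "two"],
    [rt for rt in right_table if rt["value"] == "two"],
    same_id,
)
```

With these rows the baseline keeps both joined rows, while the naive pushdown returns nothing: each surviving row has lost its join partner.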

However, there are two exceptions to the rule for post-join conditions connected with OR, and we analyze the first one here. The first exception is when the filter condition field happens to be the join field, as in the following query:

In this query the post-join condition still uses OR to connect filter conditions on the two tables. The difference is that the join condition is no longer equality on id but equality on the value field, so the filter condition field is also the join condition field. You can use the step-by-step method above to work out the results with and without pushdown; they are the same.

Let's check whether the problem that prevented pushdown above can occur in this kind of query. For the left table, if we use LT.value = 'two' to filter out the rows that do not match, then, because the join condition is also on the value field, a left-table row whose LT.value is not 'two' cannot be joined to a right-table row whose value is 'two'; otherwise "LT.value = RT.value" would not hold. In effect the condition is propagated: through the join condition, the two tables have already been logically merged into one.

The second exception involves an optimization in SparkSql, so it is described separately.
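The first exception can be checked the same way. The table contents are again assumptions; the query is assumed to be an inner join on `value` with `WHERE LT.value = 'two' OR RT.value = 'two'`.

```python
# Assumed table contents, the same shape as in the earlier walkthroughs.
left_table = [{"id": 1, "value": "two"}, {"id": 2, "value": "one"}]
right_table = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}]


def inner_join(left, right, on):
    """Nested-loop inner join: keep only the pairs that satisfy the join condition."""
    return [(lt, rt) for lt in left for rt in right if on(lt, rt)]


def same_value(lt, rt):
    # ON LT.value = RT.value: the join field is also the filter field.
    return lt["value"] == rt["value"]


# Baseline: join first, then apply WHERE LT.value = 'two' OR RT.value = 'two'.
join_then_filter = [
    (lt, rt)
    for lt, rt in inner_join(left_table, right_table, same_value)
    if lt["value"] == "two" or rt["value"] == "two"
]

# Pushdown: filter each table on value = 'two', then join on value.
filter_then_join = inner_join(
    [lt for lt in left_table if lt["value"] == "two"],
    [rt for rt in right_table if rt["value"] == "two"],
    same_value,
)
```

Since every joined row has equal values on both sides, "either side is 'two'" is the same as "both sides are 'two'", which is why filtering each table early loses nothing here.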

4.3. Partitioned tables with filter conditions connected with OR

What happens if both tables are partitioned? Let's first look at the following query:

Here the left and right tables are no longer ordinary tables but partitioned tables. The partition field is pt, and the data is partitioned by date. The query conditions on the two tables are still connected by OR. If the two tables cannot be filtered in advance, a very large amount of data has to be joined first, which is very expensive. Yet according to our analysis in 4.2, when OR connects filter conditions on two tables, predicates cannot be pushed down freely. What then? SparkSql uses an optimization called "partition pruning" here: the partition condition is not treated as an ordinary filter condition. Instead, directories that do not match the queried partitions are excluded outright from the set of directories to be scanned.

A partitioned table stores the data of each partition in its own directory on HDFS. When pruning partitions, SparkSql tells Spark's Scan operator directly which HDFS directories to scan, so that the scan skips the data of all other partitions. To make this optimization possible, the semantic analysis logic of SparkSql has to work out the exact intent of the SQL statement. For that reason the partition field is kept apart from ordinary fields in SparkSql's metadata and is marked separately, so that semantic analysis can recognize and handle this special case in the WHERE condition of the SQL statement.
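Directory-level pruning can be sketched for a single table as follows. The directory paths, the partition values and the helpers `partition_value` and `prune` are all hypothetical; the sketch shows only the idea of choosing directories before any data is read, not how SparkSql implements it.

```python
# Hypothetical layout: one directory per value of the partition field `pt`.
partitions = {
    "/warehouse/lefttable/pt=20190101": [{"id": 1, "value": "two"}],
    "/warehouse/lefttable/pt=20190102": [{"id": 2, "value": "one"}],
    "/warehouse/lefttable/pt=20190103": [{"id": 3, "value": "three"}],
}


def partition_value(directory):
    # Extract the value after "pt=" from the directory name.
    return directory.rsplit("pt=", 1)[1]


def prune(directories, partition_predicate):
    """Keep only the directories whose partition value satisfies the predicate."""
    return [d for d in directories if partition_predicate(partition_value(d))]


# The partition condition selects directories; no row is inspected at this point.
to_scan = prune(sorted(partitions), lambda pt: pt == "20190102")

# Only the surviving directories are read.
scanned_rows = [row for d in to_scan for row in partitions[d]]
```

The pruning decision uses only directory names, which is why the partition field has to be identifiable separately from ordinary fields.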
