Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Case Analysis of Jindo SQL performance Optimization

2025-03-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)05/31 Report--

In this article, the editor introduces in detail the "case analysis of Jindo SQL performance optimization", with detailed content, clear steps and proper handling of details. I hope that this "case analysis of Jindo SQL performance optimization" article can help you solve your doubts.

Background introduction

The TPC-DS test set uses star, snowflake and other multidimensional data models, including 7 fact tables and 17 dimension tables. Take store channel as an example, the relationship between fact tables and dimension tables is as follows:

By analyzing all 99 query statements of TPC-DS, it is not difficult to find that the filtering conditions of most statements are not directly applied to the fact table, but are indirectly accomplished by filtering the dimension table and join the result set with the fact table. Therefore, it is difficult for the optimizer to directly use the fact table index to reduce the amount of data scanning. How to make good use of the dimension table when the query is executed to filter information, and push the information down to the storage layer to complete the fact table filtering is very important to improve performance.

In the ranking test in 2019, our RuntimeFilter optimization based on Spark SQL Catalyst Optimizer improved the overall performance of 10TB data 99 query by about 35%. To put it simply, RuntimeFilter includes two core optimizations:

Dynamic partition clipping: the fact table builds the table with the date column (date_sk) as the partition column. When the fact table and the date_dim table join, optimizer collects all date_sk values of the date_dim filtered result set at run time, and filters out all missed partition files before scanning the fact table.

Dynamic filtering of non-partitioned columns: when the join of the fact table and the dimension table is listed as a non-partition column, optimizer dynamically builds and collects the Min-Max Range or BloomFilter of the join column in the dimension table result set, and pushes it down to the storage layer when scanning the fact table, using storage layer indexes (such as Parquet, ORCFile's zone map index) to reduce the amount of scanned data.

Analysis of problems

In order to further tap the potential of RuntimeFilter optimization, we select some query with long execution time for detailed performance analysis. These query contain complex join that are larger than one fact table and multiple dimension tables. After analyzing the performance improvement effect of RuntimeFilter on each query, we found that:

The performance improvement of dynamic partition tailoring is obvious, but it is difficult to have room for further optimization.

The contribution of non-partitioned column dynamic filtering to overall improvement is much smaller than that of partition clipping, mainly because many filtering conditions pushed down to the storage layer do not achieve the effect of index scanning.

Smart students should have found that only the dimension table date_dim is related to the partition column, then all join queries with other dimension tables benefit less from RuntimeFilter optimization. For this situation, we have done a further disassembly analysis:

Most of the join columns are self-increasing primary keys of the dimension table and have no correlation with the filtering conditions, so the values of the result set are often evenly and sparsely distributed in the whole value space of the column.

For fact tables, consider the most common method of Zone Map indexing. Because the load phase does not do any aggregation operation (Clustering) for non-partitioned columns, the values of each zone are generally sparsely scattered in the value range of each column.

Compared with BloomFilter,Min-Max Range, the construction cost and index query cost are much lower, but because the information granularity is too coarse, the hit effect of index filtering will be much worse.

Considering the above, one possible optimization direction is to sort the fact table by join column during the load phase. However, this method will significantly increase the execution time of the load phase, which may lead to a decrease in the total score of TPC-DS evaluation. At the same time, due to the complexity of the optimization in the table building stage, the promotion and use of the actual production environment will be limited.

RuntimeFilter Plus

Based on the above analysis, we believe that it is difficult to improve query performance by pushing to the storage layer under filtering conditions, and try to explore in other directions:

Do not depend on the storage layer index

Not only optimize the fact table and dimension table join

Finally, we extract two new runtime filtering optimization points: dimension table filtering broadcast and fact table join dynamic filtering, and extend the implementation on the basis of the original RuntimeFilter optimization.

Dimension table filter broadcast

The scenario for which it is targeted is shown below:

When the fact table (lineorder) is continuously multi-join with the filtering results of multiple dimension tables, the filtering information of all dimension tables can be pushed down to before the join. The main difference between this method and our RuntimeFilter is that the complete multi-join tree rather than the local binary-join tree is considered when pushing down. The optimization effect is that even if the join ordering is bad case, the useless fact table data can be filtered out as soon as possible, that is, making the query execution more robust.

With reference to the algorithm in this paper, we implemented the first version of the filter push-down rule, but did not achieve the expected performance improvement, the main reasons are:

Spark CBO Join-Reorder combined with our genetic algorithm optimization, has achieved close to the optimal join ordering effect.

The performance of the front-end LIP filters is not significantly better than that of the Spark BroadcastHashJoin operator.

Based on the idea that filter conditions can be transferred to any node of complex multi-join tree to diverge thinking, we find that when there are multiple fact tables in multi-join tree, the dimension table filter conditions can be broadcast to all fact tables scan, thus reducing the amount of data that time-consuming operators such as subsequent fact tables SortMergeJoin need to process. Take a simplified version of query 64 as an example:

With cs_ui as (select cs_item_sk,sum (cs_ext_list_price) as salefrom catalog_sales,catalog_returnswhere cs_item_sk = cr_item_skand cs_order_number = cr_order_numbergroup by cs_item_sk) select i_product_name product_name,i_item_sk item_sk,sum (ss_wholesale_cost) s1from store_sales,store_returns,cs_ui Itemwhere ss_item_sk = i_item_sk andss_item_sk = sr_item_sk andss_ticket_number = sr_ticket_number andss_item_sk = cs_ui.cs_item_sk andi_color in ('almond','indian','sienna','blue','floral','rosy') andi_current_price between 19 and 19 + 10 andi_current_price between 19 + 1 and 19 + 15group by iTunes productName

The plan tree of the query is shown in the following figure:

Considering the execution flow of unrealized dimension table filtering broadcast, the store_sales data is filtered by RuntimeFilter and BroadcastHashJoin operators, but because the filtered data is still large, all subsequent join need to use the expensive SortMergeJoin operator. However, if you push LIP filter down to the scan operator of four fact tables (no need to push down to the storage layer), it not only reduces the amount of join data, but also reduces the amount of group-by aggregation data after the catalog_sales and catalog_returns tables join.

LIP implementation

In the optimizer layer, we insert the PropagateDynamicValueFilter rule after the SyntheticJoinPredicate rule of the original RuntimeFilter to broadcast the synthesized dynamic predicate to all legitimate join subtrees; at the same time, combined with the original predicate push-down logic, we ensure that the dynamic predicate will eventually propagate to all related scan operators. In the operator layer, the underlying implementation of LIP filters can be HashMap or BloomFilter. According to the data characteristics of TPC-DS, we choose BitMap as the underlying implementation of broadcast filtering conditions. Because BitMap itself is accurate (Exact Filter), semi-join elimination optimization can be further done with primary and foreign key constraint information. Optimization rules based on primary and foreign key constraints will be described in detail in a series of subsequent articles.

After applying the optimization, the execution time of query 64 is reduced from 177s to 63s, and the speedup is 2.8times.

Fact table Join dynamic filtering

Using BloomFilter to optimize large table join is a common query optimization technique. For example, in the paper "Building a Hybrid Warehouse: Efficient Joins between Data Storedin HDFS and Enterprise Warehouse" https://researcher.watson.ibm.com/researcher/files/us-ytian/published_tods.pdf, the zig-zag join method of alternately applying BloomFilter to two tables of join is proposed to reduce the total amount of data transmission in distributed join. For TPC-DS test set, take query 93 as an example, the size of the result set after store_sales and store_returns join is much smaller than the original data amount of store_sales, so it is very suitable to apply this optimization.

The construction and application of BloomFilter have high computing overhead. For join with large selectivity, blindly using this optimization may lead to performance degradation. The estimation of join selectivity based on static stats often has errors, and the existing CBO optimization rules of Spark are not suitable for robust BloomFilter join optimization decisions. Therefore, we implement dynamic BloomFilter join optimization rules based on Spark Adaptive Execution (AE) runtime re-optimization mechanism. The basic principle of AE is that after each stage execution of the query job is completed, it allows the optimizer to readjust the subsequent physical execution plan according to the stage stats information collected at run time. Currently, three main optimizations are supported:

(1) reduce stage concurrency adjustment

(2) shuffle data equilibrium distribution in the case of skew

(3) convert SortMergeJoin to BroadcastHashJoin

The flow of optimization rules based on AE is as follows:

Based on the static stats, it is determined whether the size on one end of the join may be suitable for building BloomFilter (build side). If so, the scan stage of build side and stream side will be submitted for execution in sequence; otherwise, the two stage will be executed in parallel.

After the scan stage execution of build side is completed, AE estimates the cost based on the size and join column histogram collected at run time, and decides whether to go to BroadcastHashJoin, BloomFilter-SortMergeJoinJoin or the original SortMergeJoin.

When the physical execution plan is BloomFilter-SortMergeJoinJoin, the optimizer inserts a new job and scans build side's shuffle data to build BloomFilter and pushes it down to stream side's scan stage.

BloomFilter operator implementation

In order to reduce the extra overhead caused by BloomFilter, we re-implement efficient BuildBloomFiler and Native-InBloomFilter operators. In the construction phase, using RDD aggregate to merge the BloomFiler of data fragments will cause driver to become the performance bottleneck of data transmission and bitmap merging computing; using RDD treeAggregate to implement parallel hierarchical merging significantly reduces the overall construction delay. In the filtering phase, the Native-InBloomFilter operator is pushed into the scan operator and executed. The operator directly accesses the Spark column read memory format and calls the SIMD-optimized native function according to batch data to reduce the CPU execution overhead. At the same time, we replace the original algorithm with the Blocked BloomFilter algorithm, which sacrifices a small amount of bitmap storage space in exchange for a lower CPU cache miss rate when accessing memory.

After applying the optimization, the execution time of query 93 is reduced from 225s to 50s, and the speedup is up to 4.5times.

After reading this, the article "case Analysis of Jindo SQL performance Optimization" has been introduced. If you want to master the knowledge points of this article, you still need to practice and use it yourself. If you want to know more about related articles, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report