How to rewrite the execution plan of EMR Spark Relational Cache

2025-04-02 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 06/01

This article explains in detail how EMR Spark Relational Cache rewrites the execution plan of a query.

Background

The Relational Cache feature of EMR Spark accelerates Spark SQL by precomputing the data model and storing the results efficiently, so that customers can query massive data sets with Spark SQL in real time. Relational Cache works much like a materialized view: when a user submits a SQL statement, the statement is analyzed and a suitable precomputed result is selected to speed up the query. To make precomputed results reusable across many queries, the caches we build are usually general-purpose, so a user query typically needs further computation on top of the cached data to produce the final result. Finding a matching cache quickly and building a correct new execution plan is therefore essential.

Materialized views, supported in Hive 3.x, rely on Apache Calcite to rewrite the execution plan. Spark SQL, however, uses Catalyst for execution plan optimization, and bringing in Calcite would be too heavyweight, so Relational Cache in EMR Spark implements its own Catalyst rules to rewrite the execution plan. The rest of this article describes how that rewriting works.

Preparation for execution plan rewriting

Spark turns a user query into an Unresolved Logical Plan, then a Resolved Logical Plan, an Optimized Logical Plan, and finally a Physical Plan. Unoptimized logical plans differ widely depending on how the query is written, and Relational Cache is itself an optimization, so it fits naturally into the logical plan optimization phase. The user query plan we work with is therefore a logical plan that is in the middle of optimization. To match against it, we place the rewriting step late in the Spark optimizer. We also parse the logical plan of each Relational Cache ahead of time to obtain an optimized cache plan, which reduces the complexity of matching. As a result, we only need to match two logical plans that have both already gone through optimizations such as predicate pushdown and predicate merging.

Basic process

During matching we want to cover as much computation and I/O as possible, so we traverse the target plan in pre-order and try to match each node in turn, looking for the match that covers the most nodes. To decide whether two nodes match, we use a post-order traversal, so that a mismatch is detected as early as possible and plan matching takes less time. We then rewrite the plan according to the match result: where needed, we apply further Filter, Project, Sort, or even Aggregate operations on the cached data to make it exactly equivalent to the matched node, then update the attribute references of the logical plan nodes and substitute the result into the logical plan, which yields the final rewritten plan.
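The two traversals can be illustrated with a minimal Python sketch. It is not the EMR Spark implementation: `Plan`, `nodes_match`, `find_match`, and `rewrite` are hypothetical names, plans are toy trees rather than Catalyst nodes, and matching is plain structural equality with no compensating Filter, Project, Sort, or Aggregate.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Plan:
    """Toy logical plan node (hypothetical): operator name, argument, children."""
    op: str
    arg: str = ""
    children: Tuple["Plan", ...] = ()


def nodes_match(query: Plan, cache: Plan) -> bool:
    """Post-order comparison: children are checked before the node itself,
    so a mismatch near the leaves is found before parents are compared."""
    if len(query.children) != len(cache.children):
        return False
    for q_child, c_child in zip(query.children, cache.children):
        if not nodes_match(q_child, c_child):
            return False
    return query.op == cache.op and query.arg == cache.arg


def find_match(query: Plan, cache: Plan) -> Optional[Plan]:
    """Pre-order search: try the current (largest) subtree first, then descend."""
    if nodes_match(query, cache):
        return query
    for child in query.children:
        found = find_match(child, cache)
        if found is not None:
            return found
    return None


def rewrite(query: Plan, cache: Plan, cache_scan: Plan) -> Plan:
    """Replace every subtree that matches the cache plan with a scan of the cache."""
    if nodes_match(query, cache):
        return cache_scan
    new_children = tuple(rewrite(c, cache, cache_scan) for c in query.children)
    return Plan(query.op, query.arg, new_children)


# The cache plan is the join; the query adds a Filter and a Sort on top of it.
join = Plan("Join", "lo_custkey = c_custkey",
            (Plan("Scan", "lineorder"), Plan("Scan", "customer")))
query = Plan("Sort", "d_year",
             (Plan("Filter", "c_nation = 'UNITED KINGDOM'", (join,)),))
cache_scan = Plan("Scan", "relational_cache")
rewritten = rewrite(query, join, cache_scan)
```

In this toy case the Sort and Filter stay in place and only the Join subtree is replaced by the scan of the cached data.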

Join matching

A Join in Spark is a binary operation, and the actual join order can vary widely depending on optimizer strategies, so Join nodes need special handling. We first preprocess the logical plan and reorder its joins according to the join order of the cache plan. This step is done once, before tree matching, which avoids wasting time on repeated join reordering. After reordering, the joins are much more likely to match.

To make a cache more general-purpose, we introduce the concept of Record Preserve, based on the characteristics of the star schema data model. It is similar to the Primary Key/Foreign Key relationship in traditional databases: when a table with a non-null foreign key is joined on that key to the table whose primary key it references, the number of records does not change. No record is duplicated and no record is lost. PK/FK semantics are often missing in big data processing frameworks, so we introduced a new DDL that lets users declare Record Preserve join relationships. When the user declares that A Inner Join B is Record Preserve for table A, we also treat A Inner Join B as a match for A. With this PK/FK-like information, far more queries can be matched, and one Relational Cache can be shared by many seemingly different queries, which saves users extra storage and precomputation cost.
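As a rough illustration of reordering joins to follow the cache plan, the Python sketch below flattens a tree of inner equi-joins into its tables and conditions and rebuilds it as a left-deep tree in the cache's table order. `Scan`, `Join`, `flatten`, and `reorder` are hypothetical names, and the approach is an assumption made for illustration, not the rule EMR Spark actually uses.

```python
from dataclasses import dataclass
from typing import FrozenSet, Tuple, Union


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Join:
    left: "Node"
    right: "Node"
    # Equality conditions as ("table.column", "table.column") pairs.
    conditions: FrozenSet[Tuple[str, str]]


Node = Union[Scan, Join]


def flatten(node):
    """Collect the tables (in tree order) and all conditions of an inner-join tree."""
    if isinstance(node, Scan):
        return [node.table], set()
    left_tables, left_conds = flatten(node.left)
    right_tables, right_conds = flatten(node.right)
    return left_tables + right_tables, left_conds | right_conds | set(node.conditions)


def table_of(column):
    return column.split(".", 1)[0]


def reorder(node, cache_order):
    """Rebuild the join tree left-deep, following the table order of the cache plan."""
    tables, conditions = flatten(node)
    ordered = [t for t in cache_order if t in tables]
    ordered += [t for t in tables if t not in cache_order]
    plan = Scan(ordered[0])
    seen = {ordered[0]}
    remaining = set(conditions)
    for table in ordered[1:]:
        seen.add(table)
        # Attach every condition whose two sides are both available now.
        usable = {c for c in remaining
                  if table_of(c[0]) in seen and table_of(c[1]) in seen}
        remaining -= usable
        plan = Join(plan, Scan(table), frozenset(usable))
    return plan


query_join = Join(
    Join(
        Join(Scan("customer"), Scan("lineorder"),
             frozenset({("lineorder.lo_custkey", "customer.c_custkey")})),
        Scan("supplier"),
        frozenset({("lineorder.lo_suppkey", "supplier.s_suppkey")})),
    Scan("dates"),
    frozenset({("lineorder.lo_orderdate", "dates.d_datekey")}))

reordered = reorder(query_join, ["lineorder", "dates", "customer", "supplier"])
```

Doing this once up front means the tree matcher compares two plans with the same join shape instead of re-deriving the order for every candidate node.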
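The Record Preserve property can be checked on toy data. The Python sketch below uses made-up rows and hypothetical helper names (`inner_join`, `is_record_preserving`); it only illustrates why a cache built on A Inner Join B can also answer a query on A alone, and says nothing about the DDL syntax, which the article does not show.

```python
def inner_join(left, right, left_key, right_key):
    """Naive inner equi-join over lists of dicts."""
    return [{**lrow, **rrow}
            for lrow in left
            for rrow in right
            if lrow[left_key] == rrow[right_key]]


def is_record_preserving(fact, dim, fk, pk):
    """True when every fact row matches exactly one dimension row:
    the key on the dimension side is unique and the foreign key is
    non-null and always present in the dimension table."""
    pk_values = [row[pk] for row in dim]
    unique_pk = len(pk_values) == len(set(pk_values))
    fk_ok = all(row[fk] is not None and row[fk] in pk_values for row in fact)
    return unique_pk and fk_ok


# Made-up sample rows in the style of the Star Schema Benchmark tables.
lineorder = [
    {"lo_orderkey": 1, "lo_custkey": 10, "lo_revenue": 100},
    {"lo_orderkey": 2, "lo_custkey": 11, "lo_revenue": 250},
    {"lo_orderkey": 3, "lo_custkey": 10, "lo_revenue": 75},
]
customer = [
    {"c_custkey": 10, "c_city": "UNITED KI1"},
    {"c_custkey": 11, "c_city": "UNITED KI5"},
]

joined = inner_join(lineorder, customer, "lo_custkey", "c_custkey")

# A query on lineorder alone can be answered from the joined (cached) data
# by projecting the lineorder columns back out.
projected = [{k: row[k] for k in lineorder[0]} for row in joined]
```

Because the join neither duplicates nor drops `lineorder` rows, aggregates such as the total of `lo_revenue` are identical whether computed on `lineorder` or on the joined data.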

Aggregate matching

Aggregate matching is relatively simple in general, but the Grouping Sets operation supported by Spark builds an Expand logical plan node, which turns each record into multiple records and tags them with a Grouping ID. Because the child of Expand is shared by all grouping sets, we match the child only once, and then match the grouping attributes and the aggregate attributes above it separately. The main task is to verify that the attributes and aggregate functions required by the target aggregation can be computed from the aggregation results of some Grouping ID. For example, a coarse-grained Sum can be obtained by summing fine-grained Sums; a coarse-grained Count must likewise be obtained by summing fine-grained Counts, not by counting them; and a coarse-grained Average cannot be recovered from fine-grained Averages alone.
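The roll-up rules can be verified with a few lines of Python. The rows and the `roll_up` helper below are made up for illustration; the point is only that Sum and Count roll up through a second Sum, while Average has to be recomputed from the rolled-up Sum and Count.

```python
from collections import defaultdict

# Made-up fine-grained cache rows, pre-aggregated by (d_year, s_city).
fine = [
    {"d_year": 1997, "s_city": "UNITED KI1", "revenue_sum": 300, "revenue_count": 3},
    {"d_year": 1997, "s_city": "UNITED KI5", "revenue_sum": 500, "revenue_count": 2},
    {"d_year": 1998, "s_city": "UNITED KI1", "revenue_sum": 200, "revenue_count": 4},
]


def roll_up(rows, key):
    """Coarse-grained aggregates computed from fine-grained ones."""
    totals = defaultdict(lambda: {"sum": 0, "count": 0})
    for row in rows:
        group = totals[row[key]]
        group["sum"] += row["revenue_sum"]      # coarse SUM   = SUM of fine SUMs
        group["count"] += row["revenue_count"]  # coarse COUNT = SUM of fine COUNTs
    return {
        k: {"sum": v["sum"], "count": v["count"], "avg": v["sum"] / v["count"]}
        for k, v in totals.items()
    }


by_year = roll_up(fine, "d_year")

# Averaging the fine-grained averages for 1997 gives a different, wrong value,
# because the two cities contribute different numbers of rows.
fine_avgs_1997 = [r["revenue_sum"] / r["revenue_count"] for r in fine if r["d_year"] == 1997]
naive_avg_1997 = sum(fine_avgs_1997) / len(fine_avgs_1997)
```

This is why a cache that is meant to serve Average queries needs to store Sum and Count rather than the Average itself.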

Plan rewrite

Once a matching logical plan is found, the logical plan is rewritten. If no secondary aggregation is needed, we select the required columns from the Relation of the cached data according to its schema, filter by the query conditions, and continue with the subsequent operations. If secondary aggregation is needed, the column selection must keep all columns used by outer operators, the columns used for grouping, and the data required by the aggregate functions. The aggregate functions of the secondary aggregation are rewritten case by case so that they operate on the results already pre-aggregated in the Relational Cache, and whether a secondary aggregation is possible at all depends on the semantics of the aggregate function. If the cache was built with Grouping Sets, the correct Grouping ID must be selected by a filter before the secondary aggregation. After the secondary aggregation, the steps are the same as in the ordinary rewrite: the result only needs to be substituted into the target plan.
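The order of operations in a rewrite that needs secondary aggregation (select one Grouping ID, apply the query filter, then aggregate again) can be sketched in Python as follows. The cache rows, the grouping ID values 1 and 2, and the `answer_from_cache` helper are all hypothetical; real Grouping IDs are computed by Spark and the real rewrite operates on logical plan nodes rather than on rows.

```python
from collections import defaultdict

# Hypothetical cache contents: one table holding two grouping sets.
# grouping_id 1 = (d_year, s_city, c_city), grouping_id 2 = (d_year).
cache_rows = [
    {"grouping_id": 1, "d_year": 1997, "s_city": "UNITED KI1", "c_city": "UNITED KI1", "lo_revenue_SUM": 100},
    {"grouping_id": 1, "d_year": 1997, "s_city": "UNITED KI1", "c_city": "UNITED KI5", "lo_revenue_SUM": 40},
    {"grouping_id": 1, "d_year": 1997, "s_city": "UNITED KI5", "c_city": "UNITED KI1", "lo_revenue_SUM": 60},
    {"grouping_id": 2, "d_year": 1997, "s_city": None, "c_city": None, "lo_revenue_SUM": 200},
]


def answer_from_cache(rows, grouping_id, predicate, group_by, sum_column):
    """Filter to one grouping set, apply the query predicate, then re-aggregate."""
    totals = defaultdict(int)
    for row in rows:
        if row["grouping_id"] != grouping_id:  # 1. keep a single grouping set
            continue
        if not predicate(row):                 # 2. apply the query's own filter
            continue
        key = tuple(row[c] for c in group_by)  # 3. secondary aggregation:
        totals[key] += row[sum_column]         #    SUM over the cached SUMs
    return dict(totals)


# Query: revenue by (d_year, s_city) for customers in city 'UNITED KI1'.
result = answer_from_cache(
    cache_rows,
    grouping_id=1,
    predicate=lambda r: r["c_city"] == "UNITED KI1",
    group_by=["d_year", "s_city"],
    sum_column="lo_revenue_SUM",
)

# Skipping the grouping_id filter would mix grouping sets and double count.
unfiltered_total = sum(r["lo_revenue_SUM"] for r in cache_rows)
correct_total = sum(r["lo_revenue_SUM"] for r in cache_rows if r["grouping_id"] == 1)
```

The double counting in `unfiltered_total` shows why the Grouping ID filter has to come before the secondary aggregation.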

Result

We use an example to illustrate the result of rewriting a logical plan. Star Schema Benchmark (paper: https://www.cs.umb.edu/~poneil/StarSchemaB.pdf) is a standard benchmark for analysis on star schema data; its schema definition is shown in the figure.

The SQL statement we use to build Relational Cache is as follows:

SELECT GROUPING_ID() AS grouping_id,
       lo_discount, s_city, c_city, p_category, d_year, lo_quantity,
       d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth,
       p_brand, c_nation, d_yearmonthnum,
       SUM(lo_revenue) AS lo_revenue_SUM,
       SUM(lo_supplycost) AS lo_supplycost_SUM,
       SUM(V_REVENUE) AS V_REVENUE_SUM
FROM supplier, p_lineorder, dates, customer, part
WHERE lo_orderdate = d_datekey
  AND lo_custkey = c_custkey
  AND lo_suppkey = s_suppkey
  AND lo_partkey = p_partkey
GROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity,
         d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth,
         p_brand, c_nation, d_yearmonthnum
GROUPING SETS (
    (d_year, d_weeknuminyear, lo_discount, lo_quantity),
    (d_year, lo_discount, lo_quantity),
    (lo_discount, lo_quantity),
    (d_yearmonthnum, lo_discount, lo_quantity),
    (d_year, p_category, p_brand, s_region),
    (d_year, p_category, s_region),
    (d_year, s_region),
    (d_year, s_region, c_region, s_nation, c_nation),
    (d_year, s_city, c_city, s_nation, c_nation),
    (d_year, s_city, c_city),
    (d_year, d_yearmonth, s_city, c_city),
    (d_year, s_region, c_region, c_nation, p_mfgr),
    (d_year, s_region, s_nation, c_region, p_mfgr, p_category),
    (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand),
    (d_year, s_nation, s_city, c_region, p_brand, p_category),
    (d_year, s_nation, s_city, c_region, p_category, p_brand)
)

We take one query as an example. The query statement is:

SELECT c_city, s_city, d_year, SUM(lo_revenue) AS revenue
FROM customer, lineorder, supplier, dates
WHERE lo_custkey = c_custkey
  AND lo_suppkey = s_suppkey
  AND lo_orderdate = d_datekey
  AND c_nation = 'UNITED KINGDOM'
  AND (c_city = 'UNITED KI1' OR c_city = 'UNITED KI5')
  AND (s_city = 'UNITED KI1' OR s_city = 'UNITED KI5')
  AND s_nation = 'UNITED KINGDOM'
  AND d_yearmonth = 'Dec1997'
GROUP BY c_city, s_city, d_year
ORDER BY d_year ASC, revenue DESC

The rewritten logical plan, which reads from the cached Relation instead of joining the base tables, is as follows:

Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true
+- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L]
   +- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322))
      +- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet

As shown, the execution plan is greatly simplified, and a user query that hits the cache can be answered in under a second.

Further optimization

In practical testing, we found that matching time grows linearly when there are many Relational Caches. Because the metastore stores the SQL statement of each cache, the time spent fetching that SQL and parsing it again is not negligible. This noticeably lengthens the matching process and defeats our goal of sub-second response. We therefore built a logical plan cache inside Spark that keeps the parsed Relational Cache plans in memory. Only one copy is kept per Relational Cache, and a plan takes little space, so we can cache the optimized logical plans of almost all Relational Caches. After the first query, subsequent queries no longer pay the latency of fetching and re-parsing SQL statements. With this optimization, matching time drops to the order of 100 ms.
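The in-memory plan cache amounts to memoizing the "fetch SQL, then parse" step per Relational Cache. The Python sketch below shows the idea under stated assumptions: `PlanCache`, `load_sql`, and `parse` are hypothetical stand-ins for the metastore lookup and the Spark parser/optimizer, and the stored SQL strings are placeholders.

```python
class PlanCache:
    """Keeps one parsed plan per Relational Cache name (hypothetical sketch)."""

    def __init__(self, load_sql, parse):
        self._load_sql = load_sql  # stand-in for the metastore lookup
        self._parse = parse        # stand-in for parsing and optimizing the SQL
        self._plans = {}
        self.parse_calls = 0       # counts how often the slow path runs

    def get(self, cache_name):
        if cache_name not in self._plans:
            sql = self._load_sql(cache_name)
            self._plans[cache_name] = self._parse(sql)
            self.parse_calls += 1
        return self._plans[cache_name]


# Stand-ins so the sketch runs on its own.
metastore = {
    "ssb_cache": "SELECT ... FROM ... GROUP BY ...",
    "other_cache": "SELECT ... FROM ...",
}
plans = PlanCache(load_sql=metastore.__getitem__,
                  parse=lambda sql: ("optimized-plan", sql))

first = plans.get("ssb_cache")   # slow path: fetch and parse
second = plans.get("ssb_cache")  # fast path: served from memory
```

Only the first lookup of each Relational Cache pays the fetch-and-parse cost; every later match reuses the same plan object.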

This concludes the overview of how EMR Spark Relational Cache rewrites execution plans.
