What is the merge operation and performance tuning of delta lake?

2025-02-21 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

How does the merge operation in Delta Lake work, and how can its performance be tuned? This article analyzes both questions in detail, in the hope of giving readers who face them a simpler, workable approach.

Because the merge operation is fairly complex, it is the main focus below.

1. Merge operation syntax

In the Scala API, a merge operation is written as follows:

import io.delta.tables._
import org.apache.spark.sql.functions._

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(updatesDF.as("updates"), "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(Map(
    "date" -> "updates.date",
    "eventId" -> "updates.eventId",
    "data" -> "updates.data"))
  .execute()

The merge API has several constraints, which are described in detail below.

1.1 A merge can have one, two, or three whenMatched or whenNotMatched clauses. Of these, at most two can be whenMatched clauses and at most one can be a whenNotMatched clause.

1.2 A whenMatched clause is executed when a source row matches a target row under the merge condition. These clauses have the following semantics:

A) Together, the whenMatched clauses can have at most one update action and one delete action. The update action updates only the specified columns of the matched target row. The delete action deletes the matched row.

B) Each whenMatched clause can have an optional condition. If the condition is present, the update or delete action is applied to a matched target row only when the condition is true for that row.

C) If there are two whenMatched clauses, they are evaluated in the order in which they are specified, so the order of the clauses matters. The first clause must have a condition; otherwise the second clause would never be executed.

D) If both whenMatched clauses have conditions and neither condition is true for a matched source and target row pair, the matched target row is left unchanged.

E) To update all columns of the matched target Delta table row with the corresponding columns of the source dataset, use:

whenMatched(...).updateAll()

This is equivalent to:

whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

This action assumes that the source has the same columns as the target table; otherwise an exception is thrown.
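The clause ordering rules in 1.2 can be sketched with two whenMatched clauses. This is a minimal sketch, not a definitive implementation: it assumes the Delta Lake Scala library is on the classpath, and the boolean source column `deleted`, the object name, and the method name are hypothetical names introduced only for illustration.

```scala
import io.delta.tables._
import org.apache.spark.sql.DataFrame

object ConditionalMatchedExample {
  // Deletes target rows that the source flags as deleted and updates the rest.
  // `updates.deleted` is a hypothetical column, not part of the original example.
  def applyUpdates(events: DeltaTable, updatesDF: DataFrame): Unit = {
    events.as("events")
      .merge(updatesDF.as("updates"), "events.eventId = updates.eventId")
      // First clause: it carries a condition, so the second clause stays reachable.
      .whenMatched("updates.deleted = true")
      .delete()
      // Second clause: applies to matched rows that the first clause did not take.
      .whenMatched()
      .updateExpr(Map("data" -> "updates.data"))
      .execute()
  }
}
```

Swapping the two clauses would change the result: an unconditional update placed first would consume every matched row, and the delete would never run.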

1.3 The whenNotMatched clause is executed when a source row does not match any target row under the merge condition. The clause has the following semantics:

A) whenNotMatched supports only the insert action. It inserts a new row into the target table, built from the specified columns and their expressions. Any target column that is not explicitly specified is set to null.

B) A whenNotMatched clause can have an optional condition. If a condition is specified, a source row is inserted only when the condition is true for that row. Otherwise, the source row is ignored.

C) To insert all columns of the target Delta table from the corresponding columns of the source dataset, use:

whenNotMatched(...).insertAll()

This is equivalent to:

whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

This action assumes that the source has the same columns as the target table; otherwise an exception is thrown.
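As an illustration of points A) and B) in 1.3, the following sketch inserts only unmatched source rows that satisfy a condition, and leaves one target column unspecified so that it is written as null. It assumes the Delta Lake Scala library is available; the cutoff date, the object name, and the method name are hypothetical.

```scala
import io.delta.tables._
import org.apache.spark.sql.DataFrame

object ConditionalInsertExample {
  // Inserts unmatched source rows dated on or after an illustrative cutoff.
  // The target column `data` is not listed, so it is inserted as null.
  def insertRecent(events: DeltaTable, updatesDF: DataFrame): Unit = {
    events.as("events")
      .merge(updatesDF.as("updates"), "events.eventId = updates.eventId")
      .whenNotMatched("updates.date >= '2020-01-01'") // hypothetical cutoff
      .insertExpr(Map(
        "eventId" -> "updates.eventId",
        "date" -> "updates.date"))
      .execute()
  }
}
```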

2. Schema validation

The merge operation automatically verifies that the schema of the data produced by the insert and update expressions is compatible with the schema of the target table. The rules are as follows:

A) For update and insert actions, the specified target columns must exist in the target Delta Lake table.

B) For updateAll and insertAll actions, the source dataset must contain all columns of the target table. The source dataset can have columns that do not exist in the target table, but these columns are ignored by default. They can be kept by enabling automatic schema evolution, described in section 3.

C) For all actions, if the data type produced by the expression for a target column differs from the type of that column in the target Delta table, merge attempts to cast the value to the type in the table.
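Rule B) can be pictured with a small plain-Scala model. This is not Delta Lake code and makes simplifying assumptions: columns are compared by exact name (Spark's default resolution is case-insensitive), and `StarActionCheck` and `checkStarAction` are hypothetical names.

```scala
object StarActionCheck {
  // Models the default check for updateAll/insertAll:
  // every target column must be present in the source.
  // Returns the columns that would be written, or an error message.
  def checkStarAction(
      targetCols: Seq[String],
      sourceCols: Seq[String]): Either[String, Seq[String]] = {
    val missing = targetCols.filterNot(sourceCols.contains)
    if (missing.nonEmpty)
      Left(s"source is missing target columns: ${missing.mkString(", ")}")
    else
      Right(targetCols) // source-only columns are simply ignored
  }
}
```

With a target of (key, value) and a source of (key, value, newValue), the model returns the two target columns; with a target of (key, oldValue) and a source of (key, newValue), it returns an error because oldValue is missing from the source.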

3. Automatic schema evolution

By default, updateAll and insertAll actions only update or insert columns whose names exist in the target table; columns that exist only in the source dataset are ignored. In some scenarios, however, we want to keep the new columns from the source dataset. To do so, set the parameter spark.databricks.delta.schema.autoMerge.enabled to true.

Note:

a. Schema evolution takes effect only when the merge contains an updateAll action, an insertAll action, or both.

b. Only top-level columns are changed, not nested columns.

c. Update and insert actions cannot explicitly reference a target column that does not exist in the target table, even if updateAll or insertAll is one of the clauses.
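The setting named above can be applied on the Spark session before running the merge. A minimal sketch, assuming an existing SparkSession; the object and method names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object SchemaAutoMergeConfig {
  // Lets updateAll/insertAll add source-only columns to the target table.
  def enable(spark: SparkSession): Unit =
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
}
```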

4. Comparison with and without automatic schema evolution

The following examples compare merge behavior with and without automatic schema evolution.

Comparison 1

Target table columns (key, value), source columns (key, value, newValue). Run the following merge against the target table:

targetDeltaTable.alias("t")
  .merge(sourceDataFrame.alias("s"), "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Without schema evolution: the schema of the target table does not change. Only the key and value columns are updated or inserted.

With schema evolution: the schema of the table evolves to (key, value, newValue). The updateAll action updates the value and newValue columns. The insertAll action inserts the entire row (key, value, newValue).

Comparison 2

Target table columns (key, oldValue), source columns (key, newValue). Run the following merge against the target table:

targetDeltaTable.alias("t")
  .merge(sourceDataFrame.alias("s"), "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Without schema evolution: both the updateAll and insertAll actions throw exceptions, because the target column oldValue does not exist in the source.

With schema evolution: the schema of the table evolves to (key, oldValue, newValue). The updateAll action updates the key and newValue columns and leaves the oldValue column unchanged. The insertAll action inserts (key, null, newValue), that is, oldValue is inserted as null.

Comparison 3

Target table columns (key, oldValue), source columns (key, newValue). Run the following merge against the target table:

targetDeltaTable.alias("t")
  .merge(sourceDataFrame.alias("s"), "t.key = s.key")
  .whenMatched().update(Map("newValue" -> col("s.newValue")))
  .whenNotMatched().insertAll()
  .execute()

Without schema evolution: the update action throws an exception because newValue does not exist in the target table.

With schema evolution: the update action still throws an exception because newValue does not exist in the target table.

Comparison 4

Target table columns (key, oldValue), source columns (key, newValue). Run the following merge against the target table:

targetDeltaTable.alias("t")
  .merge(sourceDataFrame.alias("s"), "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insert(Map(
    "key" -> col("s.key"),
    "newValue" -> col("s.newValue")))
  .execute()

Without schema evolution: the insert action throws an exception because newValue does not exist in the target table.

With schema evolution: the insert action still throws an exception because newValue does not exist in the target table.
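The outcomes of the four comparisons can be summarized with a small plain-Scala model of the behavior when spark.databricks.delta.schema.autoMerge.enabled is true. This is an illustrative sketch rather than Delta Lake code: columns are compared by exact name, and all names in it are hypothetical.

```scala
object SchemaEvolutionModel {
  // Table columns after updateAll/insertAll with automatic schema handling on:
  // the target columns first, followed by the source-only columns.
  def evolvedSchema(target: Seq[String], source: Seq[String]): Seq[String] =
    target ++ source.filterNot(target.contains)

  // Row written by insertAll: a column missing from the source row becomes
  // None, which stands in for null.
  def insertAllRow(schema: Seq[String], sourceRow: Map[String, Any]): Seq[Option[Any]] =
    schema.map(sourceRow.get)

  // Explicit update/insert maps may only name columns already in the target,
  // regardless of the schema setting (comparisons 3 and 4).
  def explicitColumnsValid(target: Seq[String], assigned: Seq[String]): Boolean =
    assigned.forall(target.contains)
}
```

For the second comparison, `evolvedSchema(Seq("key", "oldValue"), Seq("key", "newValue"))` gives (key, oldValue, newValue), and an inserted row carries no value for oldValue. For the third and fourth, `explicitColumnsValid(Seq("key", "oldValue"), Seq("newValue"))` is false, which corresponds to the exception.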

5. Performance tuning

The following methods can effectively reduce the processing time of merge:

a. Reduce the amount of data searched for matches

By default, the merge operation scans the entire Delta Lake table for rows that satisfy the merge condition. Adding predicates to the condition reduces the amount of data scanned. For example, suppose the data is partitioned by country and date, and you only want to update the most recent day's data for a specific country. You can add a condition such as:

events.date = current_date() AND events.country = 'USA'

With this condition, only the data in the specified partitions is processed, which greatly reduces the amount of data scanned. It also avoids some conflicts with concurrent operations on other partitions.
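A sketch of how such a partition predicate might be combined with the match condition is shown below. It assumes the Delta Lake Scala library and the `events` table from section 1; the object and method names are hypothetical. Only an update clause is used, because predicates added to the merge condition also change which source rows count as not matched.

```scala
import io.delta.tables._
import org.apache.spark.sql.DataFrame

object PartitionPrunedMerge {
  // Restricts the match search to the partitions for today's date and one country.
  def run(events: DeltaTable, updatesDF: DataFrame): Unit = {
    events.as("events")
      .merge(
        updatesDF.as("updates"),
        "events.eventId = updates.eventId AND " +
          "events.date = current_date() AND events.country = 'USA'")
      .whenMatched()
      .updateExpr(Map("data" -> "updates.data"))
      .execute()
  }
}
```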

b. Compact files

If the data is stored in many small files, reading it becomes slow. Compacting the small files into a smaller number of large files improves read speed. This is covered in more detail in a later article.
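One common way to compact a Delta table is to read it, repartition it into fewer files, and overwrite it in place. The sketch below assumes a Delta Lake version that supports the `dataChange` write option; `path` and `numFiles` are placeholders supplied by the caller, and the object and method names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object CompactFiles {
  // Rewrites the Delta table at `path` into `numFiles` files.
  def compact(spark: SparkSession, path: String, numFiles: Int): Unit = {
    spark.read
      .format("delta")
      .load(path)
      .repartition(numFiles)
      .write
      .option("dataChange", "false") // the rewrite only changes the file layout
      .format("delta")
      .mode("overwrite")
      .save(path)
  }
}
```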

c. Control the number of shuffle partitions

To compute and write the updated data, the merge operation shuffles the data multiple times. The number of tasks in each shuffle is set by the parameter spark.sql.shuffle.partitions, which defaults to 200. This parameter controls not only the parallelism of the shuffle but also the number of output files. Increasing the value increases parallelism, but it also increases the number of small files.
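The parameter can be set on the session before the merge runs. A minimal sketch, assuming an existing SparkSession; the object and method names are hypothetical, and a suitable value depends on the data volume and the cluster.

```scala
import org.apache.spark.sql.SparkSession

object ShuffleConfig {
  // Sets the number of shuffle tasks, which also bounds the number of files
  // each merge shuffle stage writes.
  def setShufflePartitions(spark: SparkSession, n: Int): Unit =
    spark.conf.set("spark.sql.shuffle.partitions", n.toString)
}
```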

d. Repartition before writing

For partitioned tables, the merge operation can produce many more small files than the number of shuffle partitions. The reason is that each shuffle task can write files into several table partitions, which can become a performance bottleneck. In many scenarios it therefore helps to repartition the data by the table's partition columns before writing. You can enable this by setting spark.delta.merge.repartitionBeforeWrite to true (recent Delta Lake documentation lists this setting as spark.databricks.delta.merge.repartitionBeforeWrite.enabled, so check the name for your version).

That covers Delta Lake's merge operation and its performance tuning. I hope the content above is of some help.
