
What are the merge use cases in Delta Lake?

2025-01-19 Update. From: SLTechnology News&Howtos (shulou)

Shulou(Shulou.com)06/01 Report--

Many newcomers to Delta Lake are unsure when the merge operation applies. This article walks through its main use cases in detail.

The merge operation is mainly used in the following four cases.

1. Data deduplication

In production, data sources may report the same record more than once for various reasons, which leads to duplicate data. An insert-only merge avoids inserting records that already exist in the table:

SQL

    MERGE INTO logs
    USING newDedupedLogs
    ON logs.uniqueId = newDedupedLogs.uniqueId
    WHEN NOT MATCHED
      THEN INSERT *

Scala

    deltaTable
      .as("logs")
      .merge(
        newDedupedLogs.as("newDedupedLogs"),
        "logs.uniqueId = newDedupedLogs.uniqueId")
      .whenNotMatched()
      .insertAll()
      .execute()

Note: the dataset being written to the Delta Lake table must itself already be deduplicated. Merge semantics remove duplicates between the new data and the data already in the Delta Lake table, but if the new dataset contains duplicates internally, those rows are still inserted. Deduplicate the new data before the merge.
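To make the note above concrete, here is a minimal sketch in plain Scala that models an insert-only merge on in-memory collections. It is not the Delta Lake API: the `Log` case class and the `insertOnlyMerge` and `dedupe` helpers are hypothetical names introduced only for this illustration.

```scala
// Plain-Scala model of an insert-only merge; this is NOT the Delta Lake API.
object InsertOnlyMergeModel {
  final case class Log(uniqueId: String, payload: String)

  // Models WHEN NOT MATCHED THEN INSERT *: a source row is appended when no row
  // already in the target has the same uniqueId. Source rows are compared only
  // with the target, never with each other.
  def insertOnlyMerge(target: Vector[Log], source: Vector[Log]): Vector[Log] = {
    val existingIds = target.map(_.uniqueId).toSet
    target ++ source.filterNot(row => existingIds.contains(row.uniqueId))
  }

  // Pre-merge step: keep the first row seen for each uniqueId.
  def dedupe(source: Vector[Log]): Vector[Log] =
    source.foldLeft((Set.empty[String], Vector.empty[Log])) {
      case ((seen, kept), row) =>
        if (seen.contains(row.uniqueId)) (seen, kept)
        else (seen + row.uniqueId, kept :+ row)
    }._2

  def main(args: Array[String]): Unit = {
    val target = Vector(Log("a", "old"))
    val batch  = Vector(Log("a", "resent"), Log("b", "first"), Log("b", "again"))

    // "a" is filtered out by the merge, but "b" is inserted twice.
    println(insertOnlyMerge(target, batch).map(_.uniqueId))
    // After deduplicating the batch, "b" is inserted once.
    println(insertOnlyMerge(target, dedupe(batch)).map(_.uniqueId))
  }
}
```

With a Spark DataFrame, the pre-merge step would typically be a call such as `dropDuplicates("uniqueId")` on the source before passing it to merge.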

If duplicates are known to occur only within a certain time period, partition the target table by time so that the merge can be restricted to a time range.

SQL

    MERGE INTO logs
    USING newDedupedLogs
    ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
    WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
      THEN INSERT *

Scala

    deltaTable
      .as("logs")
      .merge(
        newDedupedLogs.as("newDedupedLogs"),
        "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
      .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
      .insertAll()
      .execute()
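The time condition appears twice in the statements above, once in the match condition and once in the WHEN NOT MATCHED clause. The following plain-Scala sketch suggests why both are needed. It is a model on in-memory collections, not the Delta Lake API; `windowedMerge`, `cutoff`, and the `filterSource` flag are hypothetical names used only here.

```scala
import java.time.LocalDate

// Plain-Scala model of the time-bounded insert-only merge; NOT the Delta Lake API.
object WindowedMergeModel {
  final case class Log(uniqueId: String, date: LocalDate)

  // cutoff plays the role of current_date() - INTERVAL 7 DAYS.
  def windowedMerge(
      target: Vector[Log],
      source: Vector[Log],
      cutoff: LocalDate,
      filterSource: Boolean): Vector[Log] = {
    // ON ... AND logs.date > cutoff: only recent target rows can match.
    val recentIds = target.filter(_.date.isAfter(cutoff)).map(_.uniqueId).toSet
    val inserted = source.filter { row =>
      val notMatched = !recentIds.contains(row.uniqueId)
      // WHEN NOT MATCHED AND newDedupedLogs.date > cutoff
      val insertAllowed = !filterSource || row.date.isAfter(cutoff)
      notMatched && insertAllowed
    }
    target ++ inserted
  }

  def main(args: Array[String]): Unit = {
    val today  = LocalDate.of(2025, 1, 19) // arbitrary example date
    val cutoff = today.minusDays(7)
    val old    = Log("x", today.minusDays(30))
    val target = Vector(old)
    val source = Vector(old, Log("y", today))

    // Without the clause condition, the 30-day-old row "x" is inserted again.
    println(windowedMerge(target, source, cutoff, filterSource = false).map(_.uniqueId))
    // With it, only the recent row "y" is inserted.
    println(windowedMerge(target, source, cutoff, filterSource = true).map(_.uniqueId))
  }
}
```

Because old target rows are excluded from matching, an old source row would otherwise always count as not matched. The condition on the source side keeps such rows out of the table.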

Using the partition column in the predicate lets the merge prune partitions, which can greatly reduce the amount of data read and so speed up the operation. In addition, with Structured Streaming you can use an insert-only merge to deduplicate continuously. The main scenarios are:

a. In a streaming query, use merge inside foreachBatch to continuously write data to the Delta Lake table with deduplication.

b. In another streaming query, continuously read the deduplicated data from that Delta Lake table. This works because an insert-only merge only appends new data to the Delta Lake table.

2. Slowly changing dimension data (SCD Type 2)

Another common operation is SCD Type 2, which maintains a history of all changes made to each key in a dimension table. It requires updating the existing row to mark the key's previous value as old, and inserting a new row as the latest value. Given a source table with updates and a target table with the dimension data, SCD Type 2 can be expressed with merge.

A common example is maintaining the history of customer addresses together with the valid date range of each address. When a customer's address changes, you must mark the previous address as no longer current, update its valid date range, and add the new address as the current one. In Scala:

    val customersTable: DeltaTable = ...  // table with schema (customerId, address, current, effectiveDate, endDate)

    val updatesDF: DataFrame = ...        // DataFrame with schema (customerId, address, effectiveDate)

    // Rows to INSERT new addresses of existing customers
    val newAddressesToInsert = updatesDF
      .as("updates")
      .join(customersTable.toDF.as("customers"), "customerid")
      .where("customers.current = true AND updates.address <> customers.address")

    // Stage the update by unioning two sets of rows
    // 1. Rows that will be inserted in the whenNotMatched clause
    // 2. Rows that will either update the current addresses of existing customers
    //    or insert the new addresses of new customers
    val stagedUpdates = newAddressesToInsert
      .selectExpr("NULL as mergeKey", "updates.*")  // Rows for 1.
      .union(
        // The "updates" alias is needed so that updates.customerId resolves.
        updatesDF.as("updates").selectExpr("updates.customerId as mergeKey", "*")  // Rows for 2.
      )

    // Apply SCD Type 2 operation using merge
    customersTable
      .as("customers")
      .merge(
        stagedUpdates.as("staged_updates"),
        "customers.customerId = mergeKey")
      .whenMatched("customers.current = true AND customers.address <> staged_updates.address")
      .updateExpr(Map(
        // Set current to false and endDate to source's effective date.
        "current" -> "false",
        "endDate" -> "staged_updates.effectiveDate"))
      .whenNotMatched()
      .insertExpr(Map(
        // Set current to true along with the new address and its effective date.
        "customerid" -> "staged_updates.customerId",
        "address" -> "staged_updates.address",
        "current" -> "true",
        "effectiveDate" -> "staged_updates.effectiveDate",
        "endDate" -> "null"))
      .execute()
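To see what the SCD Type 2 merge above is meant to produce, the following sketch replays the same logic on in-memory collections. It is a plain-Scala model, not the Delta Lake API. The case classes, the `applyUpdates` helper, and the sample rows are hypothetical, and the model assumes at most one update per customer.

```scala
// Plain-Scala model of the SCD Type 2 result; this is NOT the Delta Lake API.
object ScdType2Model {
  final case class Customer(
      customerId: Int,
      address: String,
      current: Boolean,
      effectiveDate: String,
      endDate: Option[String])

  final case class Update(customerId: Int, address: String, effectiveDate: String)

  def applyUpdates(table: Vector[Customer], updates: Vector[Update]): Vector[Customer] = {
    val byId  = updates.map(u => u.customerId -> u).toMap // assumes one update per customer
    val known = table.map(_.customerId).toSet

    // Matched branch: close the current row whose address changed.
    val closed = table.map { c =>
      byId.get(c.customerId) match {
        case Some(u) if c.current && c.address != u.address =>
          c.copy(current = false, endDate = Some(u.effectiveDate))
        case _ => c
      }
    }

    // Not-matched branch: insert a current row for new customers and for
    // existing customers whose current address differs from the update.
    def needsInsert(u: Update): Boolean =
      !known.contains(u.customerId) ||
        table.exists(c => c.customerId == u.customerId && c.current && c.address != u.address)

    val inserted = updates.filter(needsInsert).map { u =>
      Customer(
        customerId = u.customerId,
        address = u.address,
        current = true,
        effectiveDate = u.effectiveDate,
        endDate = None)
    }
    closed ++ inserted
  }

  def main(args: Array[String]): Unit = {
    val table = Vector(
      Customer(1, "old st", current = true, effectiveDate = "2018-01-01", endDate = None),
      Customer(2, "same st", current = true, effectiveDate = "2018-02-01", endDate = None))
    val updates = Vector(
      Update(1, "new st", "2019-03-01"),   // changed address
      Update(2, "same st", "2019-03-01"),  // unchanged address
      Update(3, "first st", "2019-03-01")) // new customer
    applyUpdates(table, updates).foreach(println)
  }
}
```

In this model, customer 1 ends up with a closed historical row plus a new current row, customer 2 is left untouched, and customer 3 gets a single current row.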

3. CDC operations

Like SCD, another common case is change data capture (CDC), which replicates changes from an external database into Delta Lake. In other words, the update, delete, and insert operations applied to the external database must also be applied to the Delta table. This can also be done with merge.

    import org.apache.spark.sql.functions.max

    val deltaTable: DeltaTable = ...  // DeltaTable with schema (key, value)

    // DataFrame with changes having the following columns
    // - key: key of the change
    // - time: time of change for ordering between changes (can be replaced by another ordering id)
    // - newValue: updated or inserted value if key was not deleted
    // - deleted: true if the key was deleted, false if the key was inserted or updated
    val changesDF: DataFrame = ...

    // Find the latest change for each key based on the timestamp
    // Note: For nested structs, max on struct is computed as
    // max on first struct field, if equal fall back to second fields, and so on.
    val latestChangeForEachKey = changesDF
      .selectExpr("key", "struct(time, newValue, deleted) as otherCols")
      .groupBy("key")
      .agg(max("otherCols").as("latest"))
      .selectExpr("key", "latest.*")

    deltaTable.as("t")
      .merge(
        latestChangeForEachKey.as("s"),
        "s.key = t.key")
      .whenMatched("s.deleted = true")
      .delete()
      .whenMatched()
      .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
      .whenNotMatched("s.deleted = false")
      .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
      .execute()
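The three merge clauses above can be traced on a small in-memory example. The sketch below is a plain-Scala model, not the Delta Lake API: `Change`, `latestPerKey`, and `applyChanges` are hypothetical names. It assumes change times are distinct per key, and it simply ignores an update that carries no value.

```scala
// Plain-Scala model of the CDC merge; this is NOT the Delta Lake API.
object CdcMergeModel {
  final case class Change(key: String, time: Long, newValue: Option[String], deleted: Boolean)

  // Keep only the latest change per key (assumes distinct times per key).
  def latestPerKey(changes: Seq[Change]): Map[String, Change] =
    changes.groupBy(_.key).map { case (k, cs) => k -> cs.maxBy(_.time) }

  def applyChanges(table: Map[String, String], changes: Seq[Change]): Map[String, String] =
    latestPerKey(changes).values.foldLeft(table) { (t, c) =>
      (t.contains(c.key), c.deleted, c.newValue) match {
        case (true, true, _)         => t - c.key           // matched and deleted: delete
        case (true, false, Some(v))  => t.updated(c.key, v) // matched: update
        case (false, false, Some(v)) => t.updated(c.key, v) // not matched, not deleted: insert
        case _                       => t                   // e.g. delete of an unknown key
      }
    }

  def main(args: Array[String]): Unit = {
    val table = Map("a" -> "1", "b" -> "2")
    val changes = Seq(
      Change("a", 1L, Some("10"), deleted = false),
      Change("a", 2L, Some("11"), deleted = false), // latest change for "a"
      Change("b", 1L, Some("20"), deleted = false),
      Change("b", 3L, None, deleted = true),        // latest change for "b"
      Change("c", 1L, Some("30"), deleted = false), // new key
      Change("d", 1L, None, deleted = true))        // delete of a key not in the table
    println(applyChanges(table, changes))
  }
}
```

Reducing the changes to one row per key first matters because a merge expects each target row to match at most one source row.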

4. Integration with foreachBatch

With Delta Lake, you can combine foreachBatch and merge to upsert the output of complex streaming queries into a Delta Lake table. The scenarios are:

a. Write streaming aggregation results to Delta Lake in update mode. This is more efficient than complete mode.

    import io.delta.tables._

    val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

    // Function to upsert microBatchOutputDF into Delta table using merge
    def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
      deltaTable.as("t")
        .merge(
          microBatchOutputDF.as("s"),
          "s.key = t.key")
        .whenMatched().updateAll()
        .whenNotMatched().insertAll()
        .execute()
    }

    // Write the output of a streaming aggregation query into Delta table
    streamingAggregatesDF.writeStream
      .format("delta")
      .foreachBatch(upsertToDelta _)
      .outputMode("update")
      .start()

b. Synchronize database changes to Delta Lake. This writes change data to Delta Lake, as described in Section 3 of this article.

c. Write streaming data to Delta Lake with deduplication, as described in Section 1 of this article.

Note:

Make sure the merge statement inside foreachBatch is idempotent, because restarting the streaming query can apply the operation to the same batch of data more than once.

When merge is used in foreachBatch, the reported input data rate of the streaming query may be a multiple of the actual rate at which data is generated at the source. This is because merge reads the input data multiple times, which multiplies the input metrics. If this is a bottleneck, cache the batch DataFrame before the merge and uncache it afterwards.
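The idempotence requirement in the note above can be illustrated with a small plain-Scala model of what happens when the same micro-batch is applied twice. This is not the Delta Lake API. `upsert` models the whenMatched update plus whenNotMatched insert, while `increment` is a hypothetical non-idempotent alternative included only for contrast.

```scala
// Plain-Scala model of replaying a micro-batch; this is NOT the Delta Lake API.
object IdempotentUpsertModel {
  // Models whenMatched().updateAll() plus whenNotMatched().insertAll() keyed by "key":
  // values from the batch overwrite existing values, and new keys are inserted.
  def upsert(table: Map[String, Long], batch: Map[String, Long]): Map[String, Long] =
    table ++ batch

  // Non-idempotent alternative: add the batch value to the stored value.
  def increment(table: Map[String, Long], batch: Map[String, Long]): Map[String, Long] =
    batch.foldLeft(table) { case (t, (k, v)) => t.updated(k, t.getOrElse(k, 0L) + v) }

  def main(args: Array[String]): Unit = {
    val table = Map("a" -> 1L)
    val batch = Map("a" -> 5L, "b" -> 2L)

    // Replaying the batch leaves the upserted table unchanged.
    println(upsert(upsert(table, batch), batch) == upsert(table, batch))
    // Replaying the batch changes the incremented table again.
    println(increment(increment(table, batch), batch) == increment(table, batch))
  }
}
```

An upsert that overwrites by key gives the same table no matter how many times a batch is replayed, which is the property a restarted streaming query relies on.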
