How to read and write Delta Lake streaming tables in a data lake

2025-03-28 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

This article explains in detail how to read from and write to Delta Lake tables with Spark Structured Streaming. I hope you find it a useful reference.

Delta Lake is deeply integrated with Spark Structured Streaming. It overcomes many of the limitations commonly associated with streaming systems and files, for example:

It maintains exactly-once processing even with multiple streams (or concurrent batch jobs).

When files are used as a stream source, it efficiently discovers which files are new.

1. Delta table as a stream source

1.1 Basic usage

When a Structured Streaming query uses a Delta table as its source, it processes all data already present in the table as well as any data added to the table after the stream starts.

spark.readStream.format("delta").load("/delta/events")
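For context, the one-liner above can be placed in a small standalone application. This is a minimal sketch, not the article's own code: it assumes the Delta Lake library is on the classpath, and the object name, the local master, the console sink, and the checkpoint path are illustrative choices.

```scala
import org.apache.spark.sql.SparkSession

object DeltaStreamSourceSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: Delta Lake is on the classpath. The two configs below
    // register Delta's SQL extension and catalog with the session.
    val spark = SparkSession.builder()
      .appName("delta-stream-source-sketch")
      .master("local[*]") // illustrative; use your cluster's master in practice
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Reads the existing contents of the table first, then newly appended data.
    val events = spark.readStream.format("delta").load("/delta/events")

    // Print each micro-batch to stdout; the checkpoint path is hypothetical.
    val query = events.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/delta-source-sketch")
      .start()

    query.awaitTermination()
  }
}
```

The console sink is used here only to make the stream's output visible while experimenting.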

Two options let you control how much data each micro-batch reads:

a. maxFilesPerTrigger: the maximum number of new files considered in each micro-batch (Structured Streaming runs as a series of micro-batches, one per trigger). The default is 1000; tune it to your data volume and available resources.

b. maxBytesPerTrigger: the amount of data processed in each micro-batch. This is a "soft max": a batch processes approximately this much data and may exceed the limit. The option is ignored when Trigger.Once is used. If it is set together with maxFilesPerTrigger, a micro-batch reads data until either limit is reached.
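As a sketch of how the two options might be set together, the helper below returns a rate-limited streaming DataFrame. The object and method names and the specific limits (200 files, "1g") are illustrative assumptions, not recommendations from the article.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RateLimitedDeltaSource {
  // Hypothetical helper: reads /delta/events with both limits applied.
  // A micro-batch stops admitting files once either limit is reached.
  def rateLimitedEvents(spark: SparkSession): DataFrame =
    spark.readStream
      .format("delta")
      .option("maxFilesPerTrigger", "200") // at most 200 new files per micro-batch
      .option("maxBytesPerTrigger", "1g")  // soft cap of roughly 1 GB per micro-batch
      .load("/delta/events")
}
```

Because maxBytesPerTrigger is a soft limit, a batch can still come out somewhat larger than the configured size, so it is worth leaving headroom when sizing executors.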

1.2 Ignore updates and deletes

Structured Streaming only handles appended input and throws an exception if the Delta table used as a source is modified in any other way. Since such changes are common in enterprise scenarios, there are two strategies for dealing with them:

a. Delete the output and the checkpoint, then restart the stream from the beginning, that is, recompute everything.

b. Set one of the following two options:

ignoreDeletes: ignore transactions that delete data at partition boundaries of a partitioned table.

ignoreChanges: reprocess files that had to be rewritten in the source table by data-changing operations such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows in those files may be emitted again, so downstream consumers must be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes, so a stream that sets ignoreChanges is not interrupted by either deletes or updates to the source table.

1.3 Example

Suppose you have a table called user_events with three columns, date, user_email, and action, partitioned by date. A Structured Streaming job reads from this table while other programs insert into it and delete from it.

If the only changes are deletes at partition boundaries (for example, dropping whole date partitions), configure the stream as follows:

spark.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .load("/delta/user_events")

If the table is also modified in other ways, such as updates or deletes within a partition, configure the stream as follows:

spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/delta/user_events")

If a user_email value is changed with an UPDATE statement, the file containing that row is rewritten, because Delta Lake implements changes by rewriting whole data files. With ignoreChanges, the new record is propagated downstream together with all the unchanged records from the same file, so downstream programs must be able to handle these duplicates.
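One possible way to absorb the re-emitted rows is to write each micro-batch through an insert-only MERGE, so that rows already present in the target are skipped. The sketch below rests on several assumptions that are not from the article: the target table at the hypothetical path /delta/user_events_dedup already exists with the same schema, and the combination of date, user_email, and action is treated as identifying a row. It only filters out unchanged rows that are replayed; the old version of an updated row remains in the target.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object IgnoreChangesDedupSketch {
  // Hypothetical locations, chosen for illustration only.
  val targetPath = "/delta/user_events_dedup"
  val checkpointPath = "/delta/user_events_dedup/_checkpoints/dedup"

  // Inserts only rows that the target table does not already contain.
  // Defined as a method and passed with `_` to avoid overload ambiguity
  // between the Scala and Java variants of foreachBatch.
  def insertIfAbsent(batch: DataFrame, batchId: Long): Unit = {
    val keys = Seq("date", "user_email", "action") // assumed row identity
    DeltaTable.forPath(batch.sparkSession, targetPath).as("t")
      .merge(
        batch.dropDuplicates(keys).as("s"), // drop duplicates within the batch
        // <=> is null-safe equality, so NULL keys also match each other
        "t.date <=> s.date AND t.user_email <=> s.user_email AND t.action <=> s.action")
      .whenNotMatched()
      .insertAll()
      .execute()
  }

  def start(spark: SparkSession): StreamingQuery =
    spark.readStream
      .format("delta")
      .option("ignoreChanges", "true")
      .load("/delta/user_events")
      .writeStream
      .foreachBatch(insertIfAbsent _)
      .option("checkpointLocation", checkpointPath)
      .start()
}
```

If the source table has a real primary key, matching on that key with an update clause would be a more natural fit than the three-column comparison assumed here.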

2. Delta table as a sink

A Delta table can also be used as a sink for Structured Streaming. Delta Lake's transaction log guarantees exactly-once processing, even when other streams or batch queries run concurrently against the table.

2.1 append mode

The default is append mode, which simply appends data to the delta table:

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events") // as a path

2.2 complete mode

In complete mode, Structured Streaming replaces the entire table with every batch. This is useful for continuously updated aggregations:

spark.readStream
  .format("delta")
  .load("/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/delta/eventsByCustomer")

Applications with more lenient latency requirements can use Trigger.Once to save compute resources. A Trigger.Once run processes everything available since the last checkpoint (from the beginning of the table on the first run) up to the latest data and then stops, which fits a typical kappa architecture well.
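As a sketch, the aggregation from section 2.2 could be run as a one-shot job by adding a trigger. The object and method names are illustrative; the paths are the ones used earlier in the article.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerOnceAggregation {
  // Hypothetical entry point, e.g. invoked by a scheduler once per hour or day.
  def run(spark: SparkSession): Unit = {
    val query = spark.readStream
      .format("delta")
      .load("/delta/events")
      .groupBy("customerId")
      .count()
      .writeStream
      .format("delta")
      .outputMode("complete")
      .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
      .trigger(Trigger.Once()) // process the available data in one batch, then stop
      .start("/delta/eventsByCustomer")

    // Returns once the single batch has finished.
    query.awaitTermination()
  }
}
```

Recent Spark releases also offer Trigger.AvailableNow as a replacement for Trigger.Once; check the documentation for the Spark version you run before choosing between them.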

That covers how to read and write Delta Lake streaming tables in a data lake. I hope it helps.


© 2024 shulou.com SLNews company. All rights reserved.
