Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to analyze Iceberg data Lake's CDC data in Real time by Flink

2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

Today, I will talk to you about how Flink analyzes the CDC data of Iceberg data Lake in real time. Many people may not know much about it. In order to make you understand better, the editor summarized the following contents for you. I hope you can get something according to this article.

1. Common CDC analysis schemes

Let's take a look at what today's topic needs to design. The input is a CDC or upsert data, and the output is Database or storage for big data's OLAP analysis.

There are mainly two kinds of data in our common input, the first kind of data is the CDC data of the database, and the other scenario is the upsert data generated by stream computing, which has supported upsert data in the latest Flink version 1.12.

1.1 offline HBase cluster analysis of CDC data

The first solution we usually think of is to write the CDC upsert data to HBase in real time after some processing through Flink. HBase is an online database that can provide online point-checking ability. It has very high real-time performance, is very friendly to write operations, can also support some small-scale queries, and the cluster is scalable.

This scheme is actually the same as the ordinary point-and-check real-time link, so what is the problem with using HBase to do the query analysis of big data's OLAP?

First of all, HBase is a database for point-to-point design and an online service, and the index of its row storage is not suitable for analysis tasks. A typical warehouse design must be stored, so that the compression efficiency and query efficiency will be high. Second, the cost of cluster maintenance of HBase is relatively high. Finally, the data of HBase is HFile, which is not convenient to combine with the typical Parquet, Avro and Orc in big data's mileage warehouse.

1.2 Apache Kudu maintains CDC datasets

In view of the weak analytical ability of HBase, a new project appeared in the community a few years ago, which is the Apache Kudu project. Kudu projects use inventory while having the ability to check points of HBase, so that inventory acceleration is very suitable for OLAP analysis.

What's wrong with this scheme?

First of all, Kudu is a relatively small and independent cluster, and the maintenance cost is relatively high, which is relatively separate from HDFS, S3 and OSS. Secondly, because Kudu retains the ability of point search in design, its batch scanning performance is not as good as that of parquet. In addition, Kudu also has weak support for delete. Finally, it does not support incremental pull.

1.3Import CDC directly into Hive analysis

The third scheme, which is also commonly used in data warehouses, is to write the data of MySQL to Hive. The process is: maintain a full partition, then do an incremental partition every day, and finally write the incremental partition once after it is written, and write it to a new partition, which makes sense in the process. The full partition before Hive is not affected by increment, and the partition can be checked only after the incremental Merge is successful, which is a brand new data. This pure storage of append data is very friendly for analysis.

What's wrong with this scheme?

The Merge of incremental data and full data is delayed, and the data is not written in real time. It is typical to do Merge once a day, which is the data of Test1. Therefore, the timeliness is poor and real-time upsert is not supported. Every time Merge needs to reread and rewrite all the data, which is inefficient and a waste of resources.

1.4 Spark + Delta Analysis of CDC data

To solve this problem, Spark + Delta provides the syntax of MERGE INTO when analyzing CDC data. This is not just the syntax simplification of Hive data warehouse, Spark + Delta as a new data lake architecture (such as Iceberg, Hudi), its management of data is not partitioned, but files, so Delta optimizes MERGE INTO syntax to scan and rewrite only changed files, so it is much more efficient.

Let's evaluate this solution, its advantage is that it only relies on Spark + Delta architecture, no online services, storage, and the analysis speed is very fast. The optimized MERGE INTO syntax is also fast enough.

This solution, which is a Copy On Write solution in business, requires only a small number of copy files, which can make the latency relatively low. In theory, if there is not much overlap between the updated data and the existing stock, the day-level delay can be reached to the hour-level delay, and the performance can keep up.

This solution has taken a small step forward in the way Hive repositories deal with upsert data. However, the hourly delay is not as effective as real-time after all, so the biggest disadvantage of this scheme is that the Merge of Copy On Write has some overhead, and the delay cannot be too low.

In the first part, there are about so many existing solutions, and it needs to be emphasized that upsert is so important because in the data lake scheme, upsert is a key technical point to achieve quasi-real-time database access to the lake.

Second, why choose Flink + Iceberg

2.1 Flink support for CDC data consumption

First, Flink natively supports CDC data consumption. In the previous Spark + Delta scheme, the syntax of MARGE INTO, the user needs to perceive the attribute concept of CDC, and then write to the syntax of merge. But Flink natively supports CDC data. Users only need to declare that the SQL on the format,Flink of a Debezium or other CDC does not need to be aware of any CDC or upsert attributes. Hidden column is built into Flink to identify the type data of its CDC, so it is relatively simple for users.

In the following figure example, in the processing of CDC, Flink only declares a DDL statement of MySQL Binlog, and the subsequent select does not perceive the CDC attribute.

2.2 Flink support for Change Log Stream

The following figure shows that Flink natively supports Change Log Stream,Flink. After connecting to a Change Log Stream, the topology does not care about the SQL of Change Log flag. The topology is defined entirely according to its own business logic, and it is written to Iceberg until the end, without Change Log-aware flag in the middle.

2.3 Evaluation of Flink + Iceberg CDC Import Scheme

Finally, what are the advantages of Flink + Iceberg's CDC import scheme?

Compared with the previous solutions, both Copy On Write and Merge On Read have applicable scenarios with different emphasis. In the scenario where Copy On Write updates some files, it is very efficient when only some of the files need to be rewritten, and the resulting data is a full dataset of pure append, which is also the fastest when used for data analysis, which is the advantage of Copy On Write.

The other is Merge On Read, that is, the data, together with CDC flag, is directly append into Iceberg. In merge, these incremental data are merge according to a certain organizational format, a certain efficient calculation method and a full amount of the last data. This has the advantage of supporting near-real-time import and real-time data reading; the Flink SQL of this computing solution natively supports the intake of CDC and does not require additional business field design.

Iceberg is a unified data lake storage, supports a variety of computing models, but also supports a variety of engines (including Spark, Presto, hive) for analysis; the resulting file is pure memory, for the following analysis is very fast; Iceberg as a data lake based on snapshot design, support incremental reading The Iceberg architecture is simple enough, with no online service nodes and pure table format, which gives the upstream platform enough ability to customize its own logic and services.

Third, how to write and read in real time

3.1 batch update scenarios and CDC write scenarios

First of all, let's take a look at two scenarios of batch updates in the entire data lake.

In the scenario of the first batch of updates, we use a SQL to update thousands of rows of data, such as the GDPR policy in Europe. When a user logs out of his or her account, the backend system must physically delete all the relevant data of the user.

The second scenario is that we need to delete some data with common characteristics in date lake. This scenario also belongs to a scenario of batch update. In this scenario, the deletion condition may be any condition, which has nothing to do with the primary key (Primary key). At the same time, the data set to be updated is very large, and this job is a long time-consuming and low-frequency job.

The other is the scenario of CDC writing. For Flink, there are two commonly used scenarios. The first scenario is that the upstream Binlog can be quickly written to data lake and then used by different analysis engines for analysis; the second scenario is to use Flink to do some aggregation operations, and the output stream is a upsert type of data flow, which also needs to be written to the data lake or downstream system for analysis. In the following illustration, CDC writes to the SQL statement in the scenario, and we update a row of data with a single SQL. This calculation mode is a streaming incremental import and is a high-frequency update.

3.2 issues to be considered in Apache Iceberg Design of CDC Writing Scheme

Next, let's take a look at what issues iceberg needs to consider when designing a scenario for CDC writes.

The first is correctness, that is, we need to ensure the correctness of semantics and data, such as upstream data upsert to iceberg, when the upstream upsert stops, the data in iceberg needs to be consistent with the data in the upstream system.

The second is efficient writing, because the write frequency of upsert is very high, we need to maintain high throughput and high concurrency of writes.

The third is fast reading. When the data is written, we need to analyze the data, which involves two problems. The first problem is the need to support fine-grained concurrency. When a job uses multiple task to read, it can ensure a balanced allocation of each task to speed up data calculation. The second problem is that we should give full play to the advantages of column storage to speed up reading.

The fourth is to support incremental reading, such as ETL in some traditional data warehouses, through incremental reading for further data conversion.

3.3 Apache Iceberg Basic

Before introducing the specific details of the scheme, let's take a look at the layout of Iceberg in the file system. Generally speaking, Iceberg is divided into two parts of data. The first part is the data file, such as the parquet file in the figure below. Each data file corresponds to a check file (.crc file). The second part is the table metadata file (Metadata file), including Snapshot file (snap-.avro), Manifest file (.avro), TableMetadata file (* .json) and so on.

The following figure shows the mapping of files in snapshot, manifest, and partition in iceberg. The following figure contains three partition. The first partition has two files F1 and f3, the second partition has two files f4 and f5, and the third partition has a file f2. For each write, a manifest file is generated, which records the correspondence between the file written this time and the partition. Then to the upper layer there is the concept of snapshot, snapshot can help quickly access the full amount of data of the entire table, snapshot records multiple manifest, such as the second snapshot contains manifest2 and manifest3.

3.4 INSERT, UPDATE, DELETE write

After understanding the basic concepts, let's introduce the design of insert, update, and delete operations in iceberg.

The table shown in the SQL of the following figure example contains two fields, id and data, both of which are of type int. In a transaction, we perform the data flow operation in the diagram, first insert a record (1Magne2), and then update the record to (1Magne3). In iceberg, the update operation will be split into two operations: delete and insert.

The reason for this is that considering iceberg as a unified storage layer for streaming batches, splitting the update operation into delete and insert operations can ensure the consistency of the read path when updating the streaming batch scenario. For example, in the scenario of batch deletion, taking Hive as an example, Hive will write the file offset of the line to be deleted into the delta file, and then do a merge on read, because it will be faster, and the original file and delta will be mapped through position during merge. All undeleted records will be available soon.

The next step is to insert the record (3d5), delete the record (1p3), insert the record (2p5), and the final query is that we get the record (3d5) (2jin5).

The above operation looks very simple, but there are some semantic problems in the implementation. In the following figure, the operation of inserting a record (1Magne2) is first performed in a transaction, which writes the INSERT (1Mague 2) in the data file1 file, then deletes the record (1Magin2), which writes the DELETE in the equalify delete file1 (1Magin2), then performs the insert record (1Magin2) operation, which rewrites the INSERT (1Pie2) in the data file1 file, and then performs the query operation.

Under normal circumstances, the query result should return the record INSERT (1Magne2), but in the implementation, the DELETE (1Magne2) operation does not know which line in the data file1 file was deleted, so both lines of INSERT (1Magne2) records will be deleted.

So how to solve this problem? the current way of the community is to adopt Mixed position-delete and equality-delete. Equality-delete deletes by specifying one or more columns, while position-delete deletes according to file path and line number, and combines these two methods to ensure the correctness of the deletion operation.

In the following figure, we insert three rows of records in the first transaction, namely, INSERT (1Magne2), INSERT (1Magne3), and INSERT (1Magin4), and then perform the commit operation to submit. Next, we open a new transaction and insert a row of data (1jue 5). Since it is a new transaction, a new data file2 is created and an INSERT record is written. Next, we delete the record (1jie 5). The actual write to delete is as follows:

Writing in the position delete file1 file (file2, 0) means deleting the record of line 0 in data file2, which is to solve the semantic problem of repeatedly inserting and deleting the same row of data in the same transaction.

Write DELETE (1Magne 5) in the equality delete file1 file. The reason for writing this delete is to ensure that the previous txn writes (1Magne5) can be deleted correctly.

Then a delete operation is performed, and since it has not been inserted in the current transaction, the operation uses the equality-delete operation, that is, to write the record in the equality delete file1. In the above process, we can see that there are three types of files: data file, position delete file and equality delete file in the current scheme.

After understanding the write process, how to read it. As shown in the following figure, for the record in position delete file (file2, 0), you only need to join with the data file of the current transaction, and join the data file in the equality delete file record (1Jing 4) and the previous transaction. Finally, the records INSERT (1Jue 3) and INSERT (1Jue 2) are obtained to ensure the correctness of the process.

Design of 3.5 Manifest files

Insert, update and delete are introduced above, but we have made some designs for manifest when designing the execution plan of task, so that we can quickly find the data file through manifest and split it according to the data size to ensure that the data processed by each task is distributed as evenly as possible.

The following figure shows an example that contains four transaction. The first two transaction are INSERT operations, corresponding to M1 and M2, the third transaction is the DELETE operation, the corresponding M3 is the UPDATE operation, and the fourth transaction contains two manifest files, namely data manifest and delete manifest.

For why split the manifest file into data manifest and delete manifest, it is essentially to quickly find the corresponding delete file list for each data file. You can see the following example. When we read in partition-2, we need to do a join operation between deletefile-4 and datafile-2 and datafile-3, as well as a join operation between deletefile-5 and datafile-2 and datafile-3.

Take datafile-3 as an example, the deletefile list contains two files, deletefile-4 and deletefile-5. How to quickly find the corresponding deletefIle list? we can query according to the upper manifest. When we split the manifest file into data manifest and delete manifest, we can first join M2 (data manifest) and M3, M4 (delete manifest), so that we can quickly get the deletefile list corresponding to datafile.

3.6 concurrency at the file level

Another problem is that we need to ensure high enough concurrent reads, which is done very well in iceberg. Concurrent reading at the file level or even more fine-grained segments in a file can be achieved in iceberg. For example, a file with 256MB can be divided into two 128MB for concurrent reading. Here is an example, assuming that the layout of the insert file and the delete file in the two Bucket is shown in the following figure.

Through manifest comparison, we find that there is only deletefile-4 in the deletefile list of datafile-2, so the two files can be executed as a separate task (Task-2 in the figure), and other files are similar, which ensures that each task data carries out merge operations in a more balanced manner.

We have made a brief summary of this scheme, as shown in the following figure. First of all, the advantages of this scheme can meet the correctness, and can achieve high throughput write and concurrent efficient reading, in addition, it can achieve snapshot-level incremental pull.

At present, the scheme is still relatively rough, and there are some points that can be optimized below.

First, if there are duplicates in the same task, the delete file can be cached, which can improve the efficiency of the join.

Second, when delete file is relatively large and needs to be overwritten to disk, you can use kv lib for optimization, but this does not rely on external services or other heavy indexes.

Third, Bloom filter (Bloom filter) can be designed to filter invalid IO, because a delete operation and an insert operation are generated for the upsert operation commonly used in Flink, which results in a small difference between data file and delete file size in iceberg, so that join will not be very efficient. If you use Bloom Filter, when the upsert data arrives, it will be split into insert and delete operations, and if you filter out those delete operations that have no previous insert data through bloom filter (that is, if this data has not been inserted before, there is no need to write delete records to delete file), this will greatly improve the efficiency of upsert.

Fourth, some background compaction strategies are needed to control the size of delete file files. The less delete file, the more efficient the analysis. Of course, these strategies will not affect normal reading and writing.

3.7 Transaction submission of incremental filesets

The writing of the file was introduced earlier, and in the following figure we show how to write according to the semantics of iceberg and make it available for users to read. It is mainly divided into two parts: data and metastore. First, there will be IcebergStreamWriter to write data, but at this time, the metadata information written to data is not written to metastore, so it is not visible to the outside world. The second operator is IcebergFileCommitter, which collects the data file and writes it through commit transaction.

There is no dependency on any other third-party services in Iceberg, and Hudi does some abstractions of service in some ways, such as abstracting metastore as an independent Timeline, which may rely on some independent indexes or even other external services.

IV. Future planning

The following are some of our future plans, first of all, some optimization of the Iceberg kernel, including full-link stability testing and performance optimization involved in the scheme, and provide some CDC incremental pull related Table API interfaces.

In Flink integration, it realizes the ability of CDC data to merge data files automatically and manually, and provides the ability of Flink to incrementally pull CDC data.

In other ecological integration, we will integrate engines such as Spark and Presto, and speed up data query with Alluxio.

After reading the above, do you have any further understanding of how Flink analyzes the CDC data of Iceberg data lake in real time? If you want to know more knowledge or related content, please follow the industry information channel, thank you for your support.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report