Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Example Analysis of NetEase data Lake Iceberg

2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

Editor to share with you the example analysis of NetEase data Lake Iceberg, I believe that most people do not know much about it, so share this article for your reference, I hope you can learn a lot after reading this article, let's go to know it!

01 pain points in the construction of data warehouse platform

Pain point 1:

In the early morning, some of our large offline tasks are often delayed for some reason, which will lead to unstable output time of the core report, sometimes the output is relatively early, but sometimes the output is relatively late, and the business is difficult to accept.

Why did this phenomenon happen? At present, there are roughly the following elements:

The amount of data requested by the task itself will be very large. Generally speaking, the original amount of data in a day may be dozens of TB. Hundreds of partitions, even thousands of partitions, with a file count of 50,000 +. If you read these files in full, hundreds of partitions will send hundreds of requests to NameNode, and we know that NameNode is very stressful when offline tasks are running in the wee hours of the morning. Therefore, it is very likely that the Namenode response is very slow, and if the request response is slow, the task initialization time will be very long.

The ETL efficiency of the task itself is relatively inefficient, which does not mean that the Spark engine is inefficient, but that our storage is not particularly good. For example, at present, if we check a partition, we need to scan all the files and analyze them, but in fact, I may only be interested in some files. So relatively speaking, the scheme itself is relatively inefficient.

Once such a large offline task encounters a bad disk or machine downtime, it needs to be retried, which takes a long time, such as dozens of minutes. If you try again once or twice, the delay will be larger.

Pain point 2:

In terms of some trivial issues. Here are three scenarios to analyze:

Unreliable update operation. We often perform operations such as insert overwrite during the ETL process, which first delete the data of the corresponding partition, and then load the generated files into the partition. When we remove files, many tasks that are reading these files will have exceptions, which is an unreliable update operation.

Table Schema changes are inefficient. At present, we are doing some operations to add fields and change partitions to the table is actually very inefficient, we need to read out all the original data and then write it back. This will be very time-consuming and inefficient.

The reliability of data is not guaranteed. Mainly for our operation on partitions, we will divide the partition information into two places, HDFS and Metastore, and store one copy each. In this case, if an update operation is carried out, it is possible that one update succeeds and another fails, resulting in unreliable data.

Pain point 3:

There are many problems in the construction of real-time data warehouse based on Lambda. As shown in the architecture diagram above, the first link is based on a real-time link of kafka transit (delay requirement is less than 5 minutes), the other is offline link (delay is greater than 1 hour), and some companies even have a third quasi-real-time link (delay requirement of 5 minutes to 1 hour), or even more complex scenarios.

The two links correspond to two pieces of data, and in many cases, the processing results of the real-time link and the offline link do not match.

Kafka cannot store large amounts of data and cannot efficiently query data in Kafka based on the current OLAP analysis engine.

The maintenance cost of Lambda is high. Code, data consanguinity, Schema and so on all need two sets. Operation and maintenance, monitoring and other costs are very high.

Pain point 4:

Efficient updating of scenarios is not supported amicably. There are generally two update scenarios for big data. One is the update of CDC (Change Data Capture). Especially in the case of e-commerce, the update deletion in binlog is synchronized to HDFS. The other is to delay the update of the aggregated results brought by the data. Currently, HDFS only supports additional writes, not updates. Therefore, many companies in the industry have introduced Kudu. But Kudu itself has some limitations, such as computing storage is not separated. In this way, HDFS, Kafka and Kudu are introduced into the data warehouse system, and the operation and maintenance cost is not insignificant.

The above is a general introduction to the four pain points involved in the current data warehouse, so we also hope to be helpful to the construction of the data warehouse in these four aspects through the research and practice of the data lake. Next, we will focus on some thoughts on the data lake.

02 data Lake Iceberg Core principle

1. Research on Open Source products of data Lake

The data Lake has been popular since 19 years. At present, there are several core data Lake open source products on the market:

DELTA LAKE, DataBricks made the commercial version of DELTA LAKE in 17 years. What I mainly want to solve is the storage problem based on Lambda architecture. Its original intention is to turn Lambda architecture into kappa architecture through a kind of storage.

Hudi (Uber open source) can support fast updates and incremental pull operations. This is one of its biggest selling points.

The original intention of Iceberg is to do standard Table Format and efficient ETL.

The above picture is from some research and comparison of data Lake programs conducted by Ali Flink Group. On the whole, the basic functions of these programs are relatively perfect. The basic functions I am talking about include:

Changes to efficient Table Schema, such as adding or subtracting partitions, adding or subtracting fields, etc.

ACID semantic guarantee

At the same time, streaming batch reading and writing are supported, and there will be no dirty reading.

Support cheap storage such as OSS

two。 There are, of course, some differences:

The main feature of Hudi is to support fast update deletion and incremental pull.

The main feature of Iceberg is that the code is highly abstract and does not bind any Engine. It exposes a very core table-level interface, which can be easily interfaced with Spark/Flink. However, Delta and Hudi are basically heavily coupled to spark. If you want to connect to flink, it is relatively difficult.

3. The reason why we chose Iceberg is:

Now the domestic real-time data warehouse construction around the flink situation will be a little more. So being able to expand the ecology based on flink is a more important point for us to choose iceberg.

There are also many important forces in domestic Iceberg-based development, such as Tencent team and Ali Flink official team, whose data lake selection is also Iceberg. At present, they lead the ecological docking of update and flink respectively in the community.

4. Next, let's focus on Iceberg:

This is from an official introduction to Iceberg, which roughly means that Iceberg is an open source data lake based on tabular format. Let me give you a detailed introduction about table format:

The figure on the left shows an abstract data processing system, which is composed of SQL engine, table format, file collection, and distributed file system. On the right are the corresponding real-life components, SQL engines such as HiveServer, Impala, Spark, etc., table format such as Metastore or Iceberg, file collections mainly include Parquet files, and the distributed file system is HDFS.

For table format, I think it mainly contains four levels of meaning, namely, the definition of table schema (whether complex data types are supported or not), the organization of files in the table, table-related statistics, table index information and the API implementation of reading and writing tables. The details are as follows:

The table schema defines a table that supports field types, such as int, string, long, and complex data types.

The most typical form of file organization in the table is the Partition schema, Range Partition or Hash Partition.

Metadata data statistics.

Encapsulate the read and write API of the table. The upper engine reads or writes data in the table through the corresponding API.

A component that is almost equivalent to Iceberg is Metastore. But Metastore is a service, and Iceberg is a jar package. Here is a comparative introduction of Metastore and Iceberg in four aspects of table format:

① makes no difference at the schema level:

All support int, string, bigint and other types.

The ② partition implementation is completely different:

There is a big difference between the two on partition:

The partition field in metastore cannot be a table field because the partition field is essentially a directory structure, not a column of data in the user table. Based on metastore, if you want to locate all the data under a partition, you first need to locate the location information of the directory corresponding to the partition in metastore, then execute the list command on HDFS to get all the files under this partition, and scan these files to get all the data under this partition.

The partition field in iceberg is a field in the table. Each table in Iceberg has a corresponding file metadata table, and each record in the file metadata table represents the relevant information of a file, and one of these fields is the partition field, indicating the partition where the file is located.

Obviously, the iceberg table has one less step than metastore in locating files according to partition, which is to execute the list command on HDFS to get the files under the partition according to the directory information.

Just imagine, for a large table with a secondary partition, the primary partition is an hourly partition, and the secondary partition is an enumerated field partition. If there are 30 secondary partitions under each primary partition, then this table will have 24 * 30 = 720 partitions per day. Based on Metastore's partition scheme, if a SQL wants to scan yesterday's data based on this table, it needs to send 720 list requests to Namenode, and if scanning one week's data or one month's data, the number of requests is even more exaggerated. In this way, on the one hand, it will lead to a lot of pressure on Namenode, on the other hand, it will also lead to a great delay in the response of SQL requests. However, the partition scheme based on Iceberg does not have this problem at all.

The implementation granularity of ③ table statistics is different:

The statistics of a table in Metastore are statistics of table / partition level granularity, such as the number of records recorded in a column in a table, the average length, the number of records for null, the maximum / minimum, and so on.

The statistics in Iceberg are accurate to the file granularity, that is, each data file records the number of records, average length, maximum and minimum values of all columns.

Obviously, the statistics of file granularity are more effective for filtering predicates (that is, where conditions) in the query.

④ reads and writes API implementations differently:

In metastore mode, the upper engine writes a batch of files and calls the add partition interface of metastore to add these files to a partition.

In Iceberg mode, the upper layer business writes a batch of files and calls the commit API of iceberg to submit this write to form a new snapshot snapshot. This submission method ensures the ACID semantics of the table. At the same time, incremental pull can be realized based on snapshot snapshot submission.

Summarize the advantages of Iceberg over Metastore:

New partition mode: avoid calling namenode's list method n times during query, reduce namenode pressure, and improve query performance.

New metadata mode: file-level column statistics can be used to filter files according to where fields, which can greatly reduce the number of scanned files and improve query performance in many scenarios.

New API mode: integrated storage batch stream

1. Streaming write-incremental pull (based on Iceberg unified storage mode can meet the needs of business batch reading and incremental subscription at the same time)

two。 Batch streams are supported to read and write the same table at the same time, unified table schema. FileNotFoundException will not appear during task execution.

The improvement of Iceberg is reflected in:

03 current situation of Iceberg Community in data Lake

Currently, the main computing engines supported by Iceberg include Spark2.4.5, Spark 3.x and Presto. At the same time, some operation and maintenance work, such as snapshot expiration, small file merge, incremental subscription consumption and other functions can be achieved.

On this basis, the main functions being developed by the community are Hive integration, Flink integration and supporting Update/Delete functions. I believe that the next version will be able to see the relevant features of Hive/Flink integration.

04 the practical way of NetEase data Lake Iceberg

In view of the current large number of ETL tasks, Iceberg can greatly improve the efficiency of ETL task execution, mainly due to the fact that there is no need to request NameNode partition information in the new Partition mode, and because many data files that do not meet the criteria can be filtered in the file-level statistics mode.

The current iceberg community only supports Spark2.4.5, and we have done more work on computing engine adaptation on this basis. It mainly includes the following:

Integrate Hive. You can create and delete iceberg tables through Hive, and query data in Iceberg tables through HiveSQL.

Integrate Impala. Users can create a new iceberg inner table\ table through Impala, and query the data in the Iceberg table through Impala. Currently, this feature has been contributed to the Impala community.

Integrate Flink. A sink implementation of Flink to Iceberg has been implemented, and businesses can consume data from kafka to write the results to Iceberg. At the same time, we implement the function of asynchronous merging of small files based on Flink engine, so that Flink can write data files and merge small files at the same time. The merging of small files based on Iceberg is submitted by commit, so there is no need to delete the small files before the merge, so it will not cause any exception to the read task.

The above is all the contents of the article "sample Analysis of NetEase data Lake Iceberg". Thank you for reading! I believe we all have a certain understanding, hope to share the content to help you, if you want to learn more knowledge, welcome to follow the industry information channel!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report