2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains in detail how to understand Apache Hudi in the context of data lake technology.
With the development of storage formats such as Apache Parquet and Apache ORC and query engines such as Presto and Apache Impala, the Hadoop ecosystem has the potential to serve as a general-purpose unified serving layer for scenarios that tolerate minute-level latency. To get there, however, data ingestion and data preparation on HDFS must be efficient and low-latency.
To solve this problem, Uber developed Hudi, an incremental processing framework that supports all business-critical data pipelines with high efficiency and low latency. Uber has since open-sourced Hudi. Before delving into Hudi, let's first discuss why using Hadoop as a unified serving layer is a good idea.
Motivation
The Lambda architecture is a common data processing architecture in which processing relies on two computations: a streaming layer (Streaming Layer) and a batch layer (Batch Layer). Every few hours, a batch job starts, computes the exact business state, and bulk-loads the updates into the serving layer (Serving Layer). Meanwhile, to avoid waiting those few hours, the streaming layer updates the same business state in real time. However, the state computed by the streaming layer is only an approximation of the final result and must eventually be overwritten by the batch result. Because the two modes provide different state, we need either separate serving layers for batch and stream processing with a merging abstraction on top, or a fairly complex serving system (such as Druid) that performs well for both row-level updates and bulk loads.
Lambda architecture requires dual computing and dual services
The Kappa architecture questions whether a separate batch layer is needed at all, arguing that a single streaming layer is sufficient as a general solution for data processing. Broadly speaking, any data computation can be described as a producer emitting a stream of records and a consumer iterating over the records in that stream, as in the Volcano Iterator model. This means the streaming layer can recompute and update business state by adding resources and increasing parallelism. Such systems rely on efficient checkpoints and extensive state management so that streaming results are no longer just approximations. This model is used for many data ingestion tasks. However, although it removes the batch layer, two problems remain in the serving layer.
Today, many streaming engines process data at the row level, which requires the serving layer to support row-level updates as well. In general, such systems cannot also optimize scans for analytical queries unless they cache a large number of records in memory (such as MemSQL) or have powerful index support (such as Elasticsearch). To achieve both ingestion and scan performance, these systems often raise costs and sacrifice the scalability of the service. As a result, they usually retain data for a limited time, typically 30 to 90 days, and hold at most a few TB in total. Analysis of historical data is redirected to HDFS, where latency requirements are looser.
The Kappa architecture unifies the processing layer, but service complexity still exists
A tradeoff among ingestion latency, scan performance, computing resources, and operational complexity is unavoidable. However, if the business scenario does not require very low latency, for example if a delay of about 10 minutes is acceptable, and if we have a way to ingest and prepare data on HDFS quickly, the speed serving layer is no longer necessary. This unifies the serving layer and greatly reduces the overall complexity and resource consumption of the system.
To use HDFS as a unified serving layer, it must not only store change logs (as a logging system does) but also manage business state that is partitioned, compacted, and de-duplicated along actual business dimensions. Such a unified serving layer needs the following characteristics:
The ability to apply changes quickly to large HDFS datasets
Data storage optimized for analytical scans (columnar storage)
The ability to efficiently join and propagate updates to downstream modeled datasets
Compacting business state changes is unavoidable, even if we partition by event time (Event time). Because of late-arriving data and the mismatch between event time and processing time (Processing time), ingestion still has to update old partitions. Finally, even if we partition by processing time, some scenarios still require updates, such as correcting the original data for security or audit reasons.
Introduction to Hudi: Hi, Hudi
As an incremental processing framework, Hudi supports all the requirements described in the previous section. In short, Hudi is a scan-optimized data storage abstraction for analytical workloads that lets HDFS datasets absorb changes with minute-level latency and lets downstream systems process those datasets incrementally.
Through a custom InputFormat, Hudi datasets are compatible with the current Hadoop ecosystem, including Apache Hive, Apache Parquet, Presto, and Apache Spark, so end users can work with them seamlessly.
Simplified serving architecture based on Hudi, with minute-level latency
Data pipelines are built by balancing two dimensions of the data flow model: latency and data completeness. The following figure shows how Uber Engineering categorizes its processing along these two dimensions.
Distribution of Uber use cases across levels of latency and completeness
For the very few use cases that truly need a delay of about 1 minute, and for applications that display simple business metrics, we rely on row-level streaming. For traditional use cases such as machine learning and experiment effectiveness analysis, we choose batch processing, which is better suited to heavy computation. For near-real-time scenarios with complex joins or significant data processing, we get the best of both worlds with Hudi and its incremental processing primitives. To learn more about Uber's use cases and scenarios for Hudi, see the GitHub documentation (https://uber.github.io/hudi/use_cases.html).
Storage of Hudi dataset
The directory structure of a Hudi dataset closely resembles that of a Hive table: each dataset corresponds to a root directory. The dataset is divided into multiple partitions, and each partition is a folder, with a unique partition path under the root directory, that contains all the files for that partition. The records of a partition are distributed across multiple files. Each file is identified by a unique fileId and the commit that produced it. After an update, multiple files share the same fileId but carry different commits.
Each record is identified by its record key and mapped to a fileId. Once the first version of a record has been written to a file, the mapping between the record key and the fileId is permanent. In other words, a fileId identifies a group of files that together contain all versions of a specific group of records, and the copies of a record in different files are distinguished by version.
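The layout rules above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: the `Dataset` class, its method names, and the example keys are hypothetical and are not part of Hudi; commits are modeled as fixed-width timestamp strings so that string comparison orders them.

```python
from dataclasses import dataclass, field


@dataclass
class Dataset:
    """Toy model of the layout: record key -> fileId, fileId -> versions."""
    key_to_file: dict = field(default_factory=dict)  # key -> (partition, fileId)
    versions: dict = field(default_factory=dict)     # (partition, fileId) -> {commit: records}

    def write(self, commit, partition, file_id, records):
        """Write `records` as a new version of one file, tagged with `commit`."""
        slot = (partition, file_id)
        for key in records:
            # The key -> fileId mapping is fixed by the first write.
            owner = self.key_to_file.get(key, slot)
            if owner != slot:
                raise ValueError(f"{key!r} already belongs to {owner}")
        for key in records:
            self.key_to_file[key] = slot
        history = self.versions.setdefault(slot, {})
        previous = history[max(history)] if history else {}
        # Same fileId, new commit: the new version carries forward old records.
        history[commit] = {**previous, **records}

    def latest(self, key):
        """Read a record from the newest version of the file that owns it."""
        history = self.versions[self.key_to_file[key]]
        return history[max(history)][key]


ds = Dataset()
ds.write("20170101000000", "2017/01/01", "file-1", {"trip-1": 10.0, "trip-2": 7.5})
ds.write("20170101000500", "2017/01/01", "file-1", {"trip-1": 12.0})
print(ds.latest("trip-1"))  # 12.0
```

Both versions of `file-1` remain addressable by commit, which is what later makes point-in-time and incremental reads possible.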
Hudi Storage consists of three different parts:
Metadata: Hudi maintains operational metadata on the dataset in the form of a timeline, which supports instantaneous views of the dataset and is stored in the metadata directory under the root directory. There are three types of metadata:
  Commits: a single commit contains information about one atomic write of a batch of data to the dataset. Commits are identified by a monotonically increasing timestamp that marks the start of the write operation.
  Cleans: background activities that remove older file versions in the dataset that queries no longer use.
  Compactions: background activities that reconcile the different data structures within Hudi, for example by merging updates from row-based log files into columnar data files.
Index: Hudi maintains an index that quickly maps an incoming record key to its fileId when the record key already exists. The index implementation is pluggable:
  Bloom filter: stored in the footer of each data file. This is the default option and does not depend on any external system. The data and the index are always consistent.
  Apache HBase: looks up small batches of keys efficiently. This option may be a few seconds faster during index tagging.
Data: Hudi stores all ingested data in two different storage formats. This part is also pluggable, and users can choose any data format that meets the following conditions:
  Read-optimized columnar storage format (ROFormat). The default is Apache Parquet.
  Write-optimized row-based storage format (WOFormat). The default is Apache Avro.
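To make the Bloom filter index option more concrete, here is a minimal Bloom filter and a lookup over per-file filters. It is an illustrative sketch, not Hudi's implementation: the class, the hashing scheme, the sizes, and the `candidate_files` helper are all assumptions chosen for brevity.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter: may give false positives, never false negatives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for clarity

    def _positions(self, key):
        # Derive num_hashes positions by salting the key with a seed.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key):
        return all(self.bits[pos] for pos in self._positions(key))


def candidate_files(key, file_filters):
    """Return the fileIds whose filter cannot rule out `key`."""
    return [fid for fid, bf in file_filters.items() if bf.might_contain(key)]


file_1 = BloomFilter()
for record_key in ["trip-1", "trip-2"]:
    file_1.add(record_key)
filters = {"file-1": file_1, "file-2": BloomFilter()}
print(candidate_files("trip-1", filters))
```

Because a filter can return false positives, a real index would still confirm the key against the candidate file; the filter only narrows down which files need to be checked.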
Hudi storage kernel.
Writing Hudi files
1. Compaction
Hudi optimizes its usage pattern for HDFS. Compaction is the key operation that converts data from the write-optimized format to the read-optimized format. The basic unit of parallelism for compaction is the rewrite of a single fileId. Hudi tries to keep every data file aligned with the HDFS block size, which balances compaction parallelism, query parallelism, and the total number of files on HDFS. Compaction is also pluggable and can be extended to merge older, infrequently updated data files to further reduce the total number of files.
2. Write path
Hudi is a Spark library, and ingestion jobs are intended to run as Spark Streaming jobs. These jobs are generally recommended to run in micro-batches of about 1 to 2 minutes. Depending on the latency requirements and available resources, we can instead schedule periodic offline jobs with Apache Oozie or Apache Airflow.
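As a rough illustration of what one ingestion batch might look like from Spark, the sketch below builds write options for the Spark datasource found in recent Apache Hudi releases. This is an assumption: the article describes the early Uber-era library, whose configuration may differ, so check the option keys against the Hudi version in use. The helper names, table name, and column names are hypothetical.

```python
def hudi_upsert_options(table_name, record_key, partition_field, precombine_field):
    """Build Spark datasource options for an upsert into a merge-on-read table.

    The option keys follow recent Apache Hudi releases (an assumption; verify
    them for your version). Argument values are supplied by the caller.
    """
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.partitionpath.field": partition_field,
        "hoodie.datasource.write.precombine.field": precombine_field,
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    }


def write_upsert(df, base_path, options):
    """Upsert one micro-batch; `df` is expected to be a Spark DataFrame."""
    df.write.format("hudi").options(**options).mode("append").save(base_path)


# Hypothetical table and column names, for illustration only.
options = hudi_upsert_options("trips", "trip_id", "event_date", "updated_at")
```

In a streaming job, `write_upsert` would be called once per micro-batch; in a scheduled offline job, once per run.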
In the default configuration, Hudi uses the following write path:
Hudi loads the Bloom filter index from the parquet files in the relevant partitions and, by mapping each incoming key to its file, tags the record as an update or an insert. The join performed here can suffer from data skew depending on the size of the input, the distribution across partitions, or the number of files in a single partition. Range-partitioning the join key and creating sub-partitions avoids the 2GB limit on shuffle files in some earlier versions of Spark (https://issues.apache.org/jira/browse/SPARK-6190).
Hudi groups inserts by partition, allocates a fileId, and appends to the corresponding log file until the file reaches the HDFS block size. It then generates a new fileId and repeats the process until all the data has been inserted.
A time-limited compaction is scheduled in the background every few minutes. It generates a prioritized list of compactions and compacts all the avro files of a fileId with the current parquet file to produce the next version of that parquet file.
Compaction is asynchronous: it locks the specific log versions being compacted, and new updates to that fileId are written as new log records. Locks are maintained in ZooKeeper.
The priority of compactions is determined by the size of the log data to be compacted and is configurable through a compaction policy. In each compaction iteration, the files with the most log data are compacted first, because the cost of rewriting a parquet file is not amortized over the number of updates to that file.
When Hudi updates a fileId, it appends to the corresponding log file if one exists; otherwise it creates a new log file.
If the ingestion job succeeds, a commit is recorded on the Hudi metadata timeline: the inflight file is renamed to a commit file, and details of the partitions and the fileId versions created are written out.
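The write path above can be sketched end to end with plain dictionaries. This is a simplified model under several assumptions: the function names are hypothetical, a plain dictionary stands in for the index lookup, a record count stands in for the HDFS block size, compaction is left out, and record keys are assumed to be unique within a batch.

```python
from collections import defaultdict


def tag_records(records, key_to_file):
    """Split (key, partition, value) records into updates and inserts."""
    updates, inserts = [], []
    for key, partition, value in records:
        if key in key_to_file:
            updates.append((key_to_file[key], key, value))
        else:
            inserts.append((partition, key, value))
    return updates, inserts


def ingest(records, key_to_file, logs, timeline, commit_time, max_records_per_file=2):
    """One ingestion batch: tag, append to per-file logs, then commit."""
    timeline[commit_time] = "inflight"
    updates, inserts = tag_records(records, key_to_file)

    # Updates are appended to the log of the file that already owns the key.
    for file_id, key, value in updates:
        logs[file_id].append((commit_time, key, value))

    # Inserts are grouped by partition and packed into newly allocated fileIds.
    by_partition = defaultdict(list)
    for partition, key, value in inserts:
        by_partition[partition].append((key, value))
    for partition, rows in by_partition.items():
        for start in range(0, len(rows), max_records_per_file):
            file_id = f"{partition}/file-{len(logs)}"
            logs[file_id] = []
            for key, value in rows[start:start + max_records_per_file]:
                logs[file_id].append((commit_time, key, value))
                key_to_file[key] = file_id

    # The job succeeded, so the inflight marker becomes a commit.
    timeline[commit_time] = "commit"


key_to_file, logs, timeline = {}, {}, {}
ingest([("a", "p1", 1), ("b", "p1", 2), ("c", "p1", 3)], key_to_file, logs, timeline, "001")
ingest([("a", "p1", 10), ("d", "p2", 4)], key_to_file, logs, timeline, "002")
```

If `ingest` raised before its last line, the timeline entry would stay `inflight`, which is how readers can tell an incomplete batch from a committed one.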
3. HDFS block alignment
As mentioned above, Hudi strives to align file sizes with the underlying HDFS block size. Depending on the total amount of data in a partition and how well the columnar format compresses it, compaction can still produce small parquet files. Because inserts into a partition are written as updates to existing small files, these small files are eventually fixed over successive iterations: each file keeps growing until it matches the HDFS block size.
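One plausible way to route inserts into existing small files is sketched below. The policy shown (fill the smallest files first, then open new ones) and the `assign_inserts` helper are assumptions for illustration, and sizes are counted in records rather than bytes.

```python
def assign_inserts(num_records, file_sizes, target_size):
    """Plan where `num_records` new records of one partition should go.

    file_sizes: dict of fileId -> current record count.
    Returns a dict of fileId -> number of records to add to that file.
    """
    plan = {}
    remaining = num_records
    # Fill the smallest existing files first, up to the target size.
    for file_id, size in sorted(file_sizes.items(), key=lambda kv: kv[1]):
        if remaining == 0:
            break
        room = target_size - size
        if room > 0:
            take = min(room, remaining)
            plan[file_id] = take
            remaining -= take
    # Whatever is left goes into new files of at most target_size each.
    new_index = 0
    while remaining > 0:
        take = min(target_size, remaining)
        plan[f"new-{new_index}"] = take
        remaining -= take
        new_index += 1
    return plan


plan = assign_inserts(250, {"f1": 100, "f2": 40, "f3": 90}, target_size=100)
print(plan)  # {'f2': 60, 'f3': 10, 'new-0': 100, 'new-1': 80}
```

Here `f2` and `f3` are topped up to the target size before any new file is created, so repeated batches gradually remove the small files.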
4. Fault recovery
First, Spark's own retry mechanism covers intermittent exceptions; if the number of retries exceeds the threshold, the whole job fails, and the next iteration retries the same batch of data. Two important distinctions follow:
A failed ingestion may leave avro blocks containing partial data in a log file. This is handled by storing the start offset of each block and the log file version in the metadata. When a log file is read, any partially written block is skipped and reading resumes from the correct position in the avro file.
A failed compaction may produce a parquet file containing partial data. This is handled at query time, where file versions are filtered using the commit metadata, so queries read only files from the latest completed compaction. The failed compaction's files are rolled back in the next compaction cycle.
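The query-time filtering described above can be sketched as follows. The `visible_versions` helper and its inputs are hypothetical; commit times are modeled as fixed-width strings so that string comparison orders them.

```python
def visible_versions(file_versions, completed_commits):
    """Pick, per fileId, the newest version written by a completed commit.

    file_versions: iterable of (file_id, commit_time) pairs found on storage.
    completed_commits: set of commit times marked complete on the timeline.
    """
    latest = {}
    for file_id, commit_time in file_versions:
        if commit_time not in completed_commits:
            continue  # left behind by a failed ingest or compaction
        if file_id not in latest or commit_time > latest[file_id]:
            latest[file_id] = commit_time
    return latest


on_storage = [("f1", "001"), ("f1", "003"), ("f2", "002"), ("f1", "004")]
completed = {"001", "002", "003"}
print(visible_versions(on_storage, completed))  # {'f1': '003', 'f2': '002'}
```

The file written at `004` exists on storage but is never read, because no completed commit vouches for it.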
Reading Hudi files
The commit timeline metadata lets us expose both a read-optimized view and a real-time view over the same HDFS data. Clients can choose a view based on their latency requirements and query performance needs. Hudi provides the two views through custom InputFormats and a Hive registration module that registers both views as Hive Metastore tables. Both input formats understand fileIds and commit times, and can filter for and read the most recently committed files. Hudi then generates input splits from these data files for queries to use.
The InputFormats are as follows:
HoodieReadOptimizedInputFormat: provides a scan-optimized view that filters out all log files and reads the latest version of the compacted parquet files.
HoodieRealtimeInputFormat: provides a real-time view that, in addition to reading the latest compacted parquet files, supplies a RecordReader that merges in the log files associated with those parquet files.
Both InputFormats extend MapredParquetInputFormat and VectorizedParquetRecordReader, so all optimizations for reading parquet files are retained. By depending on the hoodie-hadoop-mr library, Presto and Spark SQL can query Hudi-format Hive Metastore tables out of the box.
Hudi selects the latest file versions and merges their records with the log files before serving them
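The difference between the two views can be shown with a toy merge. This sketch assumes a base file modeled as a dictionary and log records as (key, value) pairs in write order; the function names are hypothetical and do not correspond to the InputFormat classes.

```python
def read_optimized_view(base_file):
    """Scan only the latest compacted columnar file; log files are ignored."""
    return dict(base_file)


def realtime_view(base_file, log_records):
    """Merge log records (oldest first) over the base file at read time."""
    merged = dict(base_file)
    for key, value in log_records:
        merged[key] = value  # later log entries override earlier values
    return merged


base = {"a": 1, "b": 2}
log = [("b", 20), ("c", 3), ("b", 21)]
print(read_optimized_view(base))  # {'a': 1, 'b': 2}
print(realtime_view(base, log))   # {'a': 1, 'b': 21, 'c': 3}
```

The read-optimized view is cheaper to scan but lags until the next compaction, while the real-time view pays a merge cost on every read to return fresher data.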
Incremental processing
As mentioned earlier, modeled tables need to be processed and served in HDFS for HDFS to act as a unified serving layer. Building low-latency modeled tables requires the ability to chain together incremental processing of datasets in HDFS. Because Hudi records the commit time of every commit and the corresponding file versions in its metadata, we can extract the incremental changes of a given Hudi dataset between a start timestamp and an end timestamp.
This process is basically the same as an ordinary query, except that the file versions falling within the given time range are read instead of the latest ones, and the commit-time predicate is pushed down to the file scan. The incremental result set is also subject to automatic file cleaning: if the files within a time range have already been cleaned, they can no longer be accessed.
This lets us perform watermark-based stream-to-stream joins and joins between streams and static data to compute and upsert the modeled tables stored in HDFS.
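An incremental pull can be sketched as a filter over file versions and then over records. The `incremental_pull` helper, its data shapes, and the choice of an exclusive start and inclusive end for the window are assumptions made for this illustration.

```python
def incremental_pull(file_versions, begin_time, end_time, cleaned=frozenset()):
    """Return (key, value) pairs committed in the window (begin_time, end_time].

    file_versions: iterable of (file_id, commit_time, records), where records
    is a list of (key, value, record_commit_time) tuples.
    cleaned: set of (file_id, commit_time) versions already removed by cleaning.
    """
    # Per fileId, select the newest version inside the window, not the latest overall.
    newest = {}
    for file_id, commit_time, records in file_versions:
        if begin_time < commit_time <= end_time:
            if file_id not in newest or commit_time > newest[file_id][0]:
                newest[file_id] = (commit_time, records)

    changes = []
    for file_id in sorted(newest):
        commit_time, records = newest[file_id]
        if (file_id, commit_time) in cleaned:
            raise LookupError(f"{file_id}@{commit_time} has been cleaned")
        # Commit-time predicate applied during the scan of the selected file.
        changes.extend(
            (key, value) for key, value, ts in records if begin_time < ts <= end_time
        )
    return changes


versions = [
    ("f1", "001", [("a", 1, "001")]),
    ("f1", "003", [("a", 1, "001"), ("b", 2, "003")]),
    ("f2", "002", [("c", 3, "002")]),
]
print(incremental_pull(versions, "001", "003"))  # [('b', 2), ('c', 3)]
```

Record `a` is present in the selected version of `f1` but is filtered out because it was committed before the window started.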
Modeling process based on Hudi incremental calculation