An Analysis of How Apache Hudi Integrates with Presto

2025-04-10 Update From: SLTechnology News&Howtos


This article walks through how Apache Hudi and Presto work together: how the integration was first built, how it evolved, and what is planned next.

1. Overview

Apache Hudi is a fast-evolving data lake storage system that helps enterprises build and manage petabyte-scale data lakes. By introducing primitives such as upserts, deletes, and incremental queries, Hudi brings stream-style processing to batch data. These features let a unified serving layer provide faster, fresher data. Hudi tables can be stored on Hadoop-compatible distributed file systems or cloud object stores, and they integrate well with Presto, Apache Hive, Apache Spark, and Apache Impala. Hudi pioneered a new model of data organization in which files are written to a more managed storage layer that interoperates with the major query engines, and the project has picked up some interesting lessons along the way.

This post discusses how the Presto-Hudi integration has evolved, as well as the upcoming file listing and query planning optimizations for Presto-Hudi queries.

2. Apache Hudi

Apache Hudi (Hudi for short) stores very large datasets on a DFS and lets batch data be processed in a stream-like fashion. It does so mainly through the following two primitives.

Update/Delete records: Hudi supports updating and deleting records, using file-level and record-level indexes, and provides transactional guarantees for write operations. Queries read the most recently committed snapshot to produce results.

Change streams: Hudi also supports incrementally fetching all records that were updated, inserted, or deleted in a table, with incremental queries that start from a specified point in time.
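To make the two primitives concrete, here is a minimal sketch in Python. It is a toy in-memory model, not Hudi's API: the class `ToyTable`, its method names, and the use of `None` as a delete marker are all assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTable:
    """Toy stand-in for a table with upserts, deletes, and a commit log."""
    rows: dict = field(default_factory=dict)     # record key -> latest value
    commits: list = field(default_factory=list)  # (commit_time, {key: value or None})

    def upsert(self, commit_time, records):
        """Insert new keys and overwrite existing ones in a single commit."""
        self.rows.update(records)
        self.commits.append((commit_time, dict(records)))

    def delete(self, commit_time, keys):
        """Remove keys; the commit log keeps a tombstone (None) per key."""
        for key in keys:
            self.rows.pop(key, None)
        self.commits.append((commit_time, {key: None for key in keys}))

    def snapshot(self):
        """Snapshot read: the table as of the latest commit."""
        return dict(self.rows)

    def incremental(self, since):
        """Change stream: net changes committed strictly after `since`."""
        changes = {}
        for commit_time, records in self.commits:
            if commit_time > since:
                changes.update(records)
        return changes


table = ToyTable()
table.upsert("001", {"a": 1, "b": 2})
table.upsert("002", {"b": 20, "c": 3})
table.delete("003", ["a"])
print(table.snapshot())          # {'b': 20, 'c': 3}
print(table.incremental("001"))  # {'b': 20, 'c': 3, 'a': None}
```

A downstream job that remembers the last commit time it processed only needs the output of `incremental(...)`, rather than a full rescan of the table.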

The figure above illustrates Hudi's primitives, which unlock stream/incremental processing directly on top of the DFS abstraction. This is similar to consuming events directly from a Kafka topic and then computing intermediate results incrementally with a state store. The architecture has several advantages.

Improved efficiency: ingesting data often requires handling updates (such as CDC), deletes (legal privacy regulations), and enforcing primary key constraints to ensure data quality. Because standard tooling is lacking, data engineers often resort to batch jobs that reprocess a whole day of events or reload all upstream data on every run, which wastes a great deal of resources. Because Hudi supports record-level updates, only the updated or deleted records in the table need to be reprocessed, without rewriting all partitions or events of the table, which greatly improves processing efficiency.

Faster ETL/derived pipelines: once data has been ingested from an external source, it is common to build derived data pipelines with Apache Spark, Apache Hive, or another data processing framework for use cases such as data warehousing, machine learning feature extraction, or plain analytics. Typically this process again relies on batch jobs expressed in code or SQL that process all input data and recompute all output. Such a pipeline can be sped up significantly by querying one or more input tables with incremental queries instead of regular snapshot queries, so that only the incremental changes from upstream tables are processed, and then upserting into or deleting from the target derived table, as shown in the first figure.

Fresher data access: normally we add resources (such as memory) to improve performance metrics (such as query latency). Hudi instead fundamentally changes the traditional way of managing datasets, arguably for the first time since the start of the big data era. Processing batches incrementally lets pipelines finish in much less time, so data becomes queryable sooner than in earlier data lakes.

Unified storage: building on the three advantages above, faster and lighter processing on the existing data lake means there is no need for specialized storage or data marts just to get near-real-time data access.

2.1 Hudi tables and query types

2.1.1 Table types

Hudi supports the following two table types:

Copy On Write (COW): stores data in a columnar format (such as Parquet) and synchronously updates the version / rewrites the files at write time.

Merge On Read (MOR): stores data in a combination of columnar (such as Parquet) and row-based (such as Avro) formats. Updates are logged to delta files and later compacted, synchronously or asynchronously, to produce new versions of the columnar files.

The following table summarizes the trade-offs between the two table types.

Trade-off | CopyOnWrite | MergeOnRead
Data latency | Higher | Lower
Update cost (I/O) | Higher (rewrites the entire parquet file) | Lower (appends to the delta log file)
Parquet file size | Smaller (high update (I/O) cost) | Larger (low update cost)
Write amplification | Higher | Lower (depends on the compaction strategy)

2.1.2 Query types

Hudi supports the following query types:

Snapshot query: queries the latest snapshot of the table as of a given commit/compaction. For Merge-On-Read tables, it provides near-real-time data (on the order of minutes) by merging base and delta files on the fly; for Copy-On-Write tables, it is a drop-in replacement for existing Parquet tables while adding upsert/delete and other features.

Incremental query: queries the data newly written to the table since a given commit/compaction, providing a change stream for incremental pipelines.

Read optimized query: queries the latest snapshot of the table as of a given commit/compaction, exposing only the base/columnar files of the latest file versions. It guarantees the same columnar query performance as a non-Hudi columnar table.
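The difference between a read optimized query and a snapshot query on a Merge-On-Read table can be sketched as follows. This is a simplified Python model, assuming a single base file represented as a dict and a log represented as an ordered list of `(key, value)` pairs where `None` marks a delete; none of these names come from Hudi itself.

```python
def read_optimized(base_file):
    """Read optimized query: only the columnar base file is consulted."""
    return dict(base_file)


def snapshot(base_file, log_records):
    """Snapshot query on a MOR table: merge log records over the base file."""
    merged = dict(base_file)
    for key, value in log_records:  # log records are applied in write order
        if value is None:           # None marks a delete in this toy model
            merged.pop(key, None)
        else:
            merged[key] = value
    return merged


base = {"k1": "v1", "k2": "v2"}                        # state at the last compaction
log = [("k2", "v2-new"), ("k3", "v3"), ("k1", None)]   # changes written since then

print(read_optimized(base))  # {'k1': 'v1', 'k2': 'v2'}
print(snapshot(base, log))   # {'k2': 'v2-new', 'k3': 'v3'}
```

The read optimized result is cheaper to compute but lags behind; the snapshot result is fresher at the cost of the merge.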

The following table summarizes the trade-offs between the query types.

Trade-off | Snapshot | Read optimized
Data latency | Lower | Higher
Query latency | COW: same as a plain parquet table. MOR: higher (merges base/columnar files with row-based delta files) | Same columnar query performance as a COW snapshot query

The following animation briefly demonstrates how inserts and updates are stored in COW and MOR tables, along with the query results along the timeline. The X axis represents the timeline and the query results for each query type.

In a COW table, each commit is fully merged into the table as part of the write operation. For updates, the file containing the record is rewritten with the new values of all changed records. For inserts, records are first packed into the smallest file in each partition path until it reaches the configured maximum size. Any remaining records are written to new file id groups, again respecting the size limit.
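The insert routing described above can be sketched as a small packing function. This is a deliberately simplified Python model: sizes are counted in records, only the single smallest file is topped up, and the function name and the `new-N` file ids are hypothetical.

```python
def pack_inserts(file_sizes, num_inserts, max_size):
    """Route inserts to the smallest file first, then to new file groups.

    file_sizes: dict of file id -> current record count in one partition.
    Returns a dict of file id -> number of inserts routed to that file.
    """
    assignment = {}
    remaining = num_inserts
    if file_sizes:
        smallest = min(file_sizes, key=file_sizes.get)
        room = max(0, max_size - file_sizes[smallest])
        take = min(room, remaining)
        if take:
            assignment[smallest] = take
            remaining -= take
    new_group = 0
    while remaining > 0:  # spill the rest into new file groups of bounded size
        take = min(max_size, remaining)
        assignment[f"new-{new_group}"] = take
        remaining -= take
        new_group += 1
    return assignment


plan = pack_inserts({"f1": 90, "f2": 40}, num_inserts=180, max_size=100)
print(plan)  # {'f2': 60, 'new-0': 100, 'new-1': 20}
```

Topping up small files first keeps the number of small files in a partition low, which matters for query engines that pay a per-file cost.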

A MOR writer goes through the same stages as a COW writer when ingesting data. Updates are appended to the latest log (delta) file belonging to the latest file version, without merging. For inserts, Hudi supports 2 modes:

Write to log files: used when the Hudi table can index log files (for example, with the HBase index or the upcoming record-level index).

Write to parquet files: used when the Hudi table cannot index log files (for example, with the Bloom index).

The delta log files are later merged with the base parquet files by a compaction action on the timeline. This table type is the most versatile and advanced, offering a great deal of flexibility for writes (choosing different compaction strategies, absorbing bursty write traffic, and so on) and for queries (for example, trading off data freshness against query performance).

3. Presto

3.1 Early Presto integration

Hudi was designed in mid-to-late 2016, and that is when we started integrating it with the query engines in the Hadoop ecosystem. To achieve this in Presto, we introduced a custom annotation, @UseFileSplitsFromInputFormat, as suggested by the community. Any registered Hive table carrying this annotation gets its splits by calling the getSplits() method of the corresponding InputFormat instead of going through Presto Hive's native split loading logic. For a Hudi table queried through Presto, this simply means calling HoodieParquetInputFormat.getSplits(). The integration was very simple: it only required placing the corresponding Hudi jar in the /plugin/hive-hadoop2/ directory. It supported querying COW Hudi tables and running read optimized queries on MOR Hudi tables (which read only the compacted base parquet files). At Uber, this simple integration already supports more than 100,000 Presto queries per day over 100 PB of data (raw data and modeled tables) in HDFS managed by Hudi.

3.2 Removing InputFormat.getSplits()

Calling InputFormat.getSplits() was a simple integration, but it could cause a large number of RPC calls to the NameNode. The earlier approach had several drawbacks.

The InputSplits returned from Hudi were not sufficient: Presto needs the file status and block locations for each InputSplit returned. That added 2 extra NameNode RPC calls for every split, multiplied by the number of partitions loaded. At times the NameNode was under heavy pressure and backpressure was observed.

In addition, HoodieParquetInputFormat.getSplits() was called for every partition loaded during Presto split computation (each loadPartition() call). This caused redundant listing of Hudi table metadata, which could in fact be reused across all partitions of the table scanned by a query.

So we began to rethink the Presto-Hudi integration. At Uber, we changed the implementation by adding a compile-time dependency on Hudi and instantiating HoodieTableMetadata once in the BackgroundHiveSplitLoader constructor. We then used the Hudi API to filter the partition files instead of calling HoodieParquetInputFormat.getSplits(), which greatly reduced the number of NameNode calls on that path.
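A rough back-of-the-envelope model shows why this matters. The Python sketch below only counts calls under stated assumptions: the number of RPCs needed to read table metadata (`metadata_rpcs`) and the cost of one listing call per partition in the new path are hypothetical parameters, not measured values; only the "2 extra RPCs per split" figure comes from the text.

```python
def rpcs_with_getsplits(partitions, splits_per_partition, metadata_rpcs):
    """Old path: every partition load re-reads table metadata, and every
    returned split costs 2 more RPCs (file status + block locations)."""
    return partitions * (metadata_rpcs + 2 * splits_per_partition)


def rpcs_with_shared_metadata(partitions, metadata_rpcs, listing_rpcs_per_partition=1):
    """New path: table metadata is read once per query; each partition is
    listed once (assumed) and the listing is filtered in memory."""
    return metadata_rpcs + partitions * listing_rpcs_per_partition


old = rpcs_with_getsplits(partitions=1000, splits_per_partition=10, metadata_rpcs=5)
new = rpcs_with_shared_metadata(partitions=1000, metadata_rpcs=5)
print(old, new)  # 25000 1005
```

The absolute numbers are illustrative only; the point is that the old path scales with partitions times splits, while the new path scales with partitions alone.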

To generalize this approach and make it available to the Presto-Hudi community, we added a new API to Presto's DirectoryLister interface that accepts a PathFilter object. For Hudi tables we supply the PathFilter object HoodieROTablePathFilter, which filters the files pre-listed for querying the Hudi table and produces the same results as Uber's internal solution.
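The idea of filtering an already-listed directory down to the files a read optimized query should see can be sketched in Python. This is not the logic of HoodieROTablePathFilter itself: the sketch assumes file names of the form `<file_id>_<write_token>_<commit_time>.parquet`, assumes commit times sort lexicographically, and ignores details such as skipping files from incomplete commits.

```python
def latest_base_files(file_names):
    """Keep only the newest base file of each file group.

    Assumes names like '<file_id>_<write_token>_<commit_time>.parquet'
    and commit times that sort lexicographically.
    """
    latest = {}
    for name in file_names:
        stem = name.rsplit(".", 1)[0]
        file_id, _write_token, commit_time = stem.split("_")
        if file_id not in latest or commit_time > latest[file_id][0]:
            latest[file_id] = (commit_time, name)
    return sorted(name for _, name in latest.values())


listing = [
    "f1_0-1-1_001.parquet",
    "f1_0-2-2_003.parquet",  # newer version of file group f1
    "f2_0-1-1_002.parquet",
]
print(latest_base_files(listing))
# ['f1_0-2-2_003.parquet', 'f2_0-1-1_002.parquet']
```

Because the filter works on a listing that has already been fetched, it adds no further file system calls per file.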

This change is available since Presto 0.233 and depends on Hudi 0.5.1-incubating. Because Hudi is now a compile-time dependency, you no longer need to place the Hudi jar file in the plugin directory.

3.3 Presto supports querying Hudi MOR tables

More and more people in the community are interested in using Presto to run snapshot queries on Hudi MOR tables. Previously, Presto supported only read optimized queries on Hudi tables (pure columnar data). With the merge of PR https://github.com/prestodb/presto/pull/14795, Presto (0.240 and later) supports snapshot queries on MOR tables, which makes fresher data available by merging base files (parquet data) and log files (avro data) at read time.

In Hive, this is achieved by introducing a separate InputFormat class that provides a way to handle splits, together with a new RecordReader class that scans the splits for records. For querying MOR Hudi tables with Hive, such classes are already available in Hudi:

InputFormat: org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat

InputSplit: org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit

RecordReader: org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader

Supporting this in Presto requires understanding how Presto fetches records from a Hive table and making the necessary changes at that layer. Because Presto uses its native ParquetPageSource rather than the InputFormat's record reader, Presto would show only the base Parquet files and not the real-time updates from Hudi's log files, which are avro data (essentially the same as a normal read optimized Hudi query).

In order for Hudi real-time queries to work properly, we identified and made the following necessary changes:

Add additional metadata fields to the serializable HiveSplit to store Hudi split information: Presto-Hive converts its splits into a serializable HiveSplit to pass them around. Because it expects standard splits, it loses any extra information carried by complex splits that extend FileSplit. Our first idea was simply to add the entire complex split as an extra field of HiveSplit. That did not work, because complex splits are not serializable and doing so would also duplicate the base split data.

Instead, we added a CustomSplitConverter interface. It accepts a custom split and returns an easily serializable String->String Map containing the extra data from the custom split. To go with this, we also added that Map as an extra field of Presto's HiveSplit. We created HudiRealtimeSplitConverter to implement the CustomSplitConverter interface for Hudi real-time queries.

Recreate the Hudi split from the extra metadata in HiveSplit: now that HiveSplit carries the complete information about the custom split, we need to identify and recreate the HoodieRealtimeFileSplit before reading the split. The CustomSplitConverter interface has a second method that accepts a normal FileSplit plus the extra split information map and returns the actual complex FileSplit, in this case HoodieRealtimeFileSplit.

Use HoodieRealtimeRecordReader from HoodieParquetRealtimeInputFormat to read the recreated HoodieRealtimeFileSplit: Presto needs to use the new record reader to handle the extra information in HoodieRealtimeFileSplit properly. To do this, we introduced another annotation, @UseRecordReaderFromInputFormat, similar to the first one. It instructs Presto to use the Hive record cursor (which uses the InputFormat's record reader) instead of the PageSource. The Hive record cursor understands the recreated custom split and sets additional information/configuration based on it.
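The round trip from a custom split to a string map and back can be sketched as follows. This is a Python analogue for illustration only: the real interfaces are Java, and the class names `FileSplit`, `RealtimeSplit`, `RealtimeSplitConverter`, the method names, and the fields `log_paths` and `max_commit_time` are assumptions rather than the actual Presto or Hudi signatures.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class FileSplit:
    """Plain split: the only shape the serializable carrier understands."""
    path: str
    start: int
    length: int


@dataclass(frozen=True)
class RealtimeSplit(FileSplit):
    """Hypothetical custom split carrying extra fields."""
    log_paths: tuple = ()
    max_commit_time: str = ""


class RealtimeSplitConverter:
    """Toy converter: custom split <-> flat string-to-string map."""

    def to_map(self, split):
        """Extract only the extra fields; base fields travel separately."""
        if not isinstance(split, RealtimeSplit):
            return {}
        return {
            "log_paths": json.dumps(list(split.log_paths)),
            "max_commit_time": split.max_commit_time,
        }

    def from_map(self, base, info):
        """Rebuild the custom split from a plain split plus the extra map."""
        if not info:
            return base
        return RealtimeSplit(
            base.path, base.start, base.length,
            tuple(json.loads(info["log_paths"])), info["max_commit_time"],
        )


converter = RealtimeSplitConverter()
original = RealtimeSplit("base.parquet", 0, 1024, ("log.1", "log.2"), "003")
extra = converter.to_map(original)                      # safe to serialize
plain = FileSplit(original.path, original.start, original.length)
restored = converter.from_map(plain, extra)
print(restored == original)  # True
```

Keeping only the extra fields in the map avoids duplicating the base split data, which is the problem the first attempt ran into.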

With these changes, Presto users can query fresher data in Hudi MOR tables.

4. Next steps

Here are some interesting tasks (RFCs) that may also need to be supported in Presto.

RFC-12: Bootstrapping Hudi tables efficiently

Apache Hudi maintains metadata for each record, which is what enables record-level updates, unique key semantics, and database-like change streams. However, this means that to take advantage of Hudi's upsert and incremental processing capabilities, users need to rewrite their entire dataset to turn it into a Hudi table. This RFC provides a mechanism to migrate datasets efficiently without rewriting the entire dataset, while still providing all the capabilities of Hudi.

This will be achieved by referencing the external data files (from the source table) in the new bootstrapped Hudi table. Because the data may reside in an external location (bootstrapped data) or under the base path of the Hudi table (the most recent data), FileSplits will need to store more metadata about those locations. This work will also leverage and build on the Presto MOR query support we are currently adding.

Support incremental and point-in-time (time travel) queries on Hudi tables

Incremental queries allow us to extract the change log from a source Hudi table. A point-in-time query allows you to get the state of a Hudi table between time T1 and T2. Both are already supported in Hive and Spark, and we are considering supporting them in Presto as well.

In Hive, incremental queries are supported by setting a few configurations in JobConf, such as the query mode (set to INCREMENTAL), the start commit time, and the maximum number of commits to consume. Spark has a dedicated implementation for incremental queries, IncrementalRelation. To support this in Presto, we need a way to identify incremental queries. If Presto does not pass session configuration to the Hadoop Configuration object, the initial idea is to register the same table as an incremental table in the metastore, and then use query predicates to obtain the other details, such as the start commit time and the maximum commit time.
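How a start commit time and a maximum number of commits bound an incremental query can be sketched in a few lines of Python. The function name and parameters are hypothetical, and the sketch assumes that the start commit itself is excluded and that commit times sort lexicographically.

```python
def incremental_window(commit_times, start_commit, max_commits):
    """Return the commits an incremental query would consume.

    Commits strictly after `start_commit` are eligible (assumption);
    at most `max_commits` of the earliest eligible ones are returned.
    """
    eligible = sorted(t for t in commit_times if t > start_commit)
    return eligible[:max_commits]


timeline = ["20200101", "20200102", "20200103", "20200104", "20200105"]
print(incremental_window(timeline, "20200102", 2))  # ['20200103', '20200104']
```

A consumer would typically store the last commit it received and pass it as the next start commit, so that each run picks up where the previous one stopped.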

RFC-15: Query planning and listing optimization

The Hudi write client and Hudi queries need to perform listStatus operations on the file system to get its current view. At Uber, the HDFS infrastructure is heavily optimized for listing, but this can be an expensive operation for large datasets with thousands of partitions, each holding thousands of files, on cloud/object storage. The RFC work above aims to eliminate listing operations and to provide better query performance and faster lookups by incrementally compacting Hudi's timeline metadata into a snapshot of the table state.

The solution aims to:

Store and maintain metadata about the latest files, along with statistics for all columns in the table, so that files can be pruned effectively before scanning. This information can be used during the engine's query planning phase.

Presto also needs some changes to take advantage of this. We are actively exploring ways to leverage this metadata during the query planning phase. It will be an important addition to the Presto-Hudi integration and will further reduce query latency.
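Pruning files with column statistics can be sketched as a range-overlap check. This Python example assumes per-file `(min, max)` statistics for a column are already available in memory; the function name, the layout of `stats`, and the column `ts` are made up for illustration and do not reflect the RFC's actual metadata format.

```python
def prune_files(file_stats, column, lower, upper):
    """Keep files whose [min, max] range for `column` overlaps [lower, upper]."""
    kept = []
    for name, stats in file_stats.items():
        col_min, col_max = stats[column]
        if col_max >= lower and col_min <= upper:
            kept.append(name)
    return kept


stats = {
    "f1.parquet": {"ts": (100, 199)},
    "f2.parquet": {"ts": (200, 299)},
    "f3.parquet": {"ts": (300, 399)},
}
# A predicate such as 250 <= ts <= 320 can skip f1.parquet entirely.
print(prune_files(stats, "ts", 250, 320))  # ['f2.parquet', 'f3.parquet']
```

Doing this during planning means the skipped files are never opened or scheduled as splits at all.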

Record-level index

Upsert is a popular write operation on Hudi tables, and it relies on an index to tag incoming records as upserts. HoodieIndex provides a mapping from record id to file id in both partitioned and non-partitioned datasets, with implementations backed by BloomFilters/key ranges (for temporal data) and Apache HBase (for random updates). Many users find Apache HBase (or any similar key-value-store-backed index) expensive and a source of operational overhead. This work proposes a new index format for record-level indexing, implemented natively in Hudi. Hudi will store and maintain the record-level index (backed by pluggable storage implementations such as HFile and RocksDB). The index will be used by both the writer (ingestion) and readers (ingestion/query) and should significantly improve upsert performance over join-based approaches, or even the Bloom index for random update workloads. This is another area where query engines can use this information to prune files before listing them. We are also considering ways to leverage this metadata in Presto at query time.
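The role of a record-level index in tagging incoming records can be sketched with a plain dictionary. This Python toy is an assumption-laden illustration: the class `RecordLevelIndex` and its methods are hypothetical, and a real index would live in persistent storage rather than in memory.

```python
class RecordLevelIndex:
    """Toy record-level index: record key -> (partition, file id)."""

    def __init__(self):
        self._locations = {}

    def record(self, key, partition, file_id):
        """Remember where a record key was written."""
        self._locations[key] = (partition, file_id)

    def tag(self, keys):
        """Split incoming keys into updates (known location) and inserts."""
        updates = {k: self._locations[k] for k in keys if k in self._locations}
        inserts = [k for k in keys if k not in self._locations]
        return updates, inserts


index = RecordLevelIndex()
index.record("user-1", "2020/01/01", "f1")
index.record("user-2", "2020/01/02", "f7")

updates, inserts = index.tag(["user-2", "user-3"])
print(updates)  # {'user-2': ('2020/01/02', 'f7')}
print(inserts)  # ['user-3']
```

An exact key lookup like this tells the writer which single file to touch for an update, and it could tell a reader which files can hold a given key.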
