What is the use of Apache Hudi

Updated 2025-01-19. From: SLTechnology News&Howtos, Shulou (Shulou.com), 06/01.

What is Apache Hudi used for? Many people new to it are unsure. This article summarizes the problem it addresses and how to apply it, in the hope that it helps you solve the same problem.

Real-time data processing and real-time data

"Real time" can refer either to real-time processing or to real-time data. Ad hoc analysis requires data to be processed in real time so that results come back immediately. Flink and Spark Streaming handle real-time processing of real-time data: the data must be fresh and the processing must be fast. The case where the data is not real time and the processing is not timely is the traditional data warehouse.

The scenario that Apache Hudi addresses in this article is real-time data, not real-time processing. The aim is to mirror the data in MySQL onto a big data platform, such as Hive, in near real time.

Business scenarios and technology selection

A traditional offline data warehouse usually provides T+1 data, which cannot meet the need for same-day analysis, while stream computing is generally window-based and its window logic is relatively fixed. The author's company has a particular kind of requirement: business analysts are familiar with the data structures of the existing transactional databases and want to run many ad hoc analyses that include the current day's real-time data. They are used to running these analyses directly in SQL against the MySQL replica (slave) database, but they often run into the following obstacles:

When the data volume is large and the analysis logic is complex, queries against the MySQL replica take a long time.

Some cross-database analyses cannot be done.

As a result, technical frameworks that bridge OLTP and OLAP have emerged. A typical example is TiDB, which supports both OLTP and OLAP. Apache Hudi and Apache Kudu, on the other hand, act as bridges between existing OLTP and OLAP technologies. They can store data in the existing OLTP data structures, support CRUD, and integrate with existing OLAP frameworks (such as Hive and Impala) for OLAP analysis.

Apache Kudu requires a separately deployed cluster. Apache Hudi does not: it can store its data files on an existing big data cluster such as HDFS and then run analysis through Hive, which makes it a better fit for resource-constrained environments.

Introduction to Apache Hudi

Overall approach to using Apache Hudi

Hudi provides the concept of a Hudi table, which supports CRUD operations. Based on this feature, we can replay the MySQL binlog into a Hudi table and then query and analyze the Hudi table through Hive. The data flow is: MySQL binlog, pulled by Canal into Kafka, consumed by Spark Streaming, written to Hudi tables on HDFS, and queried through Hive.
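The idea of replaying a change log into a table that supports CRUD can be sketched in a few lines of Python. This is only an illustration, not the author's implementation: the event shape (`type`, `row`) and the `replay` function are hypothetical and much simpler than real binlog messages.

```python
from typing import Any, Dict, List

# Hypothetical, simplified change event; real binlog messages carry more fields.
Event = Dict[str, Any]


def replay(events: List[Event]) -> Dict[Any, Dict[str, Any]]:
    """Apply insert/update/delete events in order and return the final table state."""
    table: Dict[Any, Dict[str, Any]] = {}
    for event in events:
        key = event["row"]["id"]
        if event["type"] in ("INSERT", "UPDATE"):
            table[key] = event["row"]   # upsert: the latest version of the row wins
        elif event["type"] == "DELETE":
            table.pop(key, None)        # remove the row if it exists
    return table


events = [
    {"type": "INSERT", "row": {"id": 1, "status": "new"}},
    {"type": "INSERT", "row": {"id": 2, "status": "new"}},
    {"type": "UPDATE", "row": {"id": 1, "status": "paid"}},
    {"type": "DELETE", "row": {"id": 2}},
]
state = replay(events)
print(state)
```

After the replay, the table holds only the latest version of each surviving row, which is the state a Hive query over the Hudi table is meant to see.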

Hudi data structure

The data files of a Hudi table can be stored on a local file system or on distributed storage such as HDFS. For the sake of later analysis performance and data reliability, HDFS is generally used. From the perspective of HDFS storage, the files of a Hudi table fall into two categories:

Paths containing _partition_key hold the actual data files, stored by partition. The partition path key can of course be configured; _partition_key is the one I use here.

.hoodie: because CRUD operations are fragmented, every operation generates a file. As these small files accumulate they seriously degrade HDFS performance, so Hudi provides a file merging mechanism. The log files related to file merge operations are stored in the .hoodie folder.

Data file

Hudi's actual data files are stored in the Parquet file format.

.hoodie file

Hudi calls the series of CRUD operations performed on a table over time the Timeline. A single operation in the Timeline is called an Instant. An Instant contains the following information:

Instant Action: records whether the operation is a data commit (COMMITS), a file merge (COMPACTION), or a file cleanup (CLEANS).

Instant Time: the time at which the operation occurred.

State: the status of the operation, which is requested (REQUESTED), in progress (INFLIGHT), or completed (COMPLETED).

The status records of these operations are stored in the .hoodie folder.
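As a rough mental model of the Timeline, an Instant can be pictured as a small record with the three fields listed above. The sketch below is illustrative only: the `Instant` class, its field values, and `latest_completed_commit` are hypothetical names and do not reflect Hudi's internal classes or its on-disk file naming.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Instant:
    action: str  # "commit", "compaction" or "clean"
    time: str    # sortable timestamp string (illustrative format)
    state: str   # "REQUESTED", "INFLIGHT" or "COMPLETED"


def latest_completed_commit(timeline: List[Instant]) -> Optional[Instant]:
    """Return the most recent commit that has fully completed, if any."""
    done = [i for i in timeline if i.action == "commit" and i.state == "COMPLETED"]
    return max(done, key=lambda i: i.time) if done else None


timeline = [
    Instant("commit", "20200601100000", "COMPLETED"),
    Instant("commit", "20200601101000", "COMPLETED"),
    Instant("compaction", "20200601101500", "REQUESTED"),
    Instant("commit", "20200601102000", "INFLIGHT"),
]
latest = latest_completed_commit(timeline)
print(latest)
```

In this model a reader only trusts completed instants, so the in-flight commit at the end of the timeline is ignored.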

Hudi record Id

To support CRUD on data, Hudi must be able to identify each record uniquely. Hudi combines the unique field in the dataset (the record key) with the partition in which the record resides (partitionPath) to form the record's unique key.
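A minimal sketch of this composite key, assuming a row with an `id` field as the record key and a `created_date` field as the partition path (both field names are assumptions chosen for illustration):

```python
from typing import Dict, Tuple

HudiKey = Tuple[str, str]  # (partition path, record key)


def unique_key(row: dict, record_key_field: str, partition_field: str) -> HudiKey:
    """Combine the partition path and the record key into one unique key."""
    return (str(row[partition_field]), str(row[record_key_field]))


def upsert(table: Dict[HudiKey, dict], row: dict,
           record_key_field: str = "id",
           partition_field: str = "created_date") -> None:
    """Insert the row, or overwrite the existing row that has the same unique key."""
    table[unique_key(row, record_key_field, partition_field)] = row


table: Dict[HudiKey, dict] = {}
upsert(table, {"id": 1, "created_date": "2020-06-01", "amount": 10})
upsert(table, {"id": 1, "created_date": "2020-06-01", "amount": 15})  # same key: overwrite
upsert(table, {"id": 1, "created_date": "2020-06-02", "amount": 99})  # other partition: new record
print(len(table))
```

The same `id` in two different partitions yields two distinct records, which is why the partition field of a row should not change between updates in this model.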

COW and MOR

Building on the concepts above, Hudi provides two table types, COW and MOR. They differ somewhat in write and query performance.

Copy On Write Table

COW for short. As the name implies, when data is written it makes a copy of the original data and adds the new data on top of it. A request that is reading data reads the most recent complete copy, which is similar in spirit to MVCC in MySQL.

In the figure from the original article, each color represents a file version that contains all the data as of its time. Old copies are deleted once they exceed a certain limit. For this type of table there is no compaction instant, because the data is already compact when it is written.

Advantage: when reading, only one data file per partition needs to be read, which is efficient.

Disadvantage: when writing, the previous copy must be copied and a new data file generated from it, which is time-consuming. Because of that delay, the data seen by read requests lags behind.
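The copy-on-write behavior can be sketched with an in-memory model in which every write produces a new full snapshot and only a limited number of old snapshots are retained. The `CowPartition` class and its `max_versions` parameter are hypothetical and only mirror the description above, not Hudi's actual file handling.

```python
from typing import Dict, List


class CowPartition:
    """Toy copy-on-write partition: each write creates a new full snapshot."""

    def __init__(self, max_versions: int = 2) -> None:
        self.max_versions = max_versions
        self.versions: List[Dict[int, str]] = []  # oldest first, newest last

    def write(self, updates: Dict[int, str]) -> None:
        # Copy the latest snapshot, apply the changes, and store it as a new version.
        snapshot = dict(self.versions[-1]) if self.versions else {}
        snapshot.update(updates)
        self.versions.append(snapshot)
        # Drop snapshots beyond the retention limit.
        del self.versions[:-self.max_versions]

    def read(self) -> Dict[int, str]:
        # A read touches only the newest snapshot; no merging is needed.
        return self.versions[-1] if self.versions else {}


cow = CowPartition(max_versions=2)
cow.write({1: "a"})
cow.write({2: "b"})
cow.write({1: "c"})
print(cow.read(), len(cow.versions))
```

The write path does all the work (copying the whole snapshot), which is the cost the text describes, while the read path stays trivial.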

Merge On Read Table

MOR for short. Newly written data is stored in a delta log, and the delta log is periodically merged into the Parquet data files. When data is read, the delta log is merged with the old data files to return the complete data. A MOR table can also, like a COW table, ignore the delta log and read only the most recent complete data file. The figure in the original article illustrates these two ways of reading and writing data in MOR.

Advantage: writes go to the delta log first, and the delta log is small, so the write cost is low.

Disadvantage: regular compaction is required, otherwise fragmented files accumulate. Read performance is poorer because the delta log and the old data files must be merged.
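The merge-on-read trade-off can be sketched in the same toy style: writes only append to a delta log, reads either merge the log with the base data or skip it, and compaction folds the log into the base. The `MorPartition` class and its method names are hypothetical and chosen for illustration.

```python
from typing import Dict, List


class MorPartition:
    """Toy merge-on-read partition: cheap appends, merge work deferred to reads."""

    def __init__(self) -> None:
        self.base: Dict[int, str] = {}             # compacted base data
        self.delta_log: List[Dict[int, str]] = []  # appended changes, oldest first

    def write(self, updates: Dict[int, str]) -> None:
        # Writing only appends to the delta log, so it is cheap.
        self.delta_log.append(dict(updates))

    def read_merged(self) -> Dict[int, str]:
        # Merge the base data with every delta to get the complete, fresh view.
        merged = dict(self.base)
        for delta in self.delta_log:
            merged.update(delta)
        return merged

    def read_base_only(self) -> Dict[int, str]:
        # Ignore the delta log: faster, but possibly stale.
        return dict(self.base)

    def compact(self) -> None:
        # Fold the delta log into the base data and clear the log.
        self.base = self.read_merged()
        self.delta_log = []


mor = MorPartition()
mor.write({1: "a"})
mor.write({1: "b", 2: "c"})
fresh, stale = mor.read_merged(), mor.read_base_only()
mor.compact()
print(fresh, stale, mor.read_base_only())
```

Before compaction the base-only read returns nothing while the merged read sees both writes; after compaction the two views agree.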

Code implementation based on Hudi

I have put a Hudi-based wrapper implementation on GitHub. The source is at https://github.com/wanqiufeng/hudi-learn.

Writing binlog data to a Hudi table

The binlog-consumer branch uses Spark Streaming to consume binlog data from Kafka and write it to a Hudi table. The binlog in Kafka is pulled from MySQL by Alibaba's Canal tool. The program entry point is CanalKafkaImport2Hudi, which provides a series of parameters to configure the program's behavior:

--base-save-path: base path under which Hudi tables are stored in HDFS, for example hdfs://192.168.16.181:8020/hudi_data/. Required: yes. Default: none.

--mapping-mysql-db-name: name of the MySQL database to process. Required: yes. Default: none.

--mapping-mysql-table-name: name of the MySQL table to process. Required: yes. Default: none.

--store-table-name: name of the Hudi table. Required: no. Default: generated from --mapping-mysql-db-name and --mapping-mysql-table-name. If --mapping-mysql-db-name is crm and --mapping-mysql-table-name is order, the final Hudi table name is crm__order.

--real-save-path: final HDFS path where the Hudi table is stored. Required: no. Default: generated from --base-save-path and --store-table-name in the form '--base-save-path'+'/'+'--store-table-name'. Keeping the default is recommended.

--primary-key: name of the field that uniquely identifies a record in the synchronized MySQL table. Required: no. Default: id.

--partition-key: time field in the MySQL table that can be used for partitioning. The field must be of type timestamp or datetime. Required: yes. Default: none.

--precombine-key: ultimately used to set Hudi's hoodie.datasource.write.precombine.field. Required: no. Default: id.

--kafka-server: Kafka cluster address. Required: yes. Default: none.

--kafka-topic: Kafka topic to consume. Required: yes. Default: none.

--kafka-group: Kafka consumer group. Required: no. Default: the storage table name prefixed with 'hudi', for example 'hudi_crm__order'.

--duration-seconds: because the program is built on Spark Streaming, this sets the duration of a Spark Streaming micro-batch. Required: no. Default: 10 seconds.
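The default-value rules for --store-table-name, --real-save-path, and --kafka-group can be sketched as follows. The function names are hypothetical, and stripping a trailing slash from the base path before joining is an assumption made here to avoid a double slash; the actual program may join the parts differently.

```python
def default_store_table_name(db_name: str, table_name: str) -> str:
    """Database and table name joined by a double underscore, e.g. crm__order."""
    return f"{db_name}__{table_name}"


def default_real_save_path(base_save_path: str, store_table_name: str) -> str:
    """base path + '/' + table name (trailing slash on the base removed: an assumption)."""
    return base_save_path.rstrip("/") + "/" + store_table_name


def default_kafka_group(store_table_name: str) -> str:
    """Storage table name prefixed with 'hudi', e.g. hudi_crm__order."""
    return "hudi_" + store_table_name


store_name = default_store_table_name("crm", "order")
save_path = default_real_save_path("hdfs://192.168.16.181:8020/hudi_data/", store_name)
group = default_kafka_group(store_name)
print(store_name, save_path, group)
```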

An example invocation:

    /data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \
      --name hudi__goods \
      --master yarn \
      --deploy-mode cluster \
      --driver-memory 512m \
      --executor-memory 512m \
      --executor-cores 1 \
      --num-executors 1 \
      --queue hudi \
      --conf spark.executor.memoryOverhead=2048 \
      --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \
      --conf spark.core.connection.ack.wait.timeout=300 \
      --conf spark.locality.wait=100 \
      --conf spark.streaming.backpressure.enabled=true \
      --conf spark.streaming.receiver.maxRate=500 \
      --conf spark.streaming.kafka.maxRatePerPartition=200 \
      --conf spark.ui.retainedJobs=10 \
      --conf spark.ui.retainedStages=10 \
      --conf spark.ui.retainedTasks=10 \
      --conf spark.worker.ui.retainedExecutors=10 \
      --conf spark.worker.ui.retainedDrivers=10 \
      --conf spark.sql.ui.retainedExecutions=10 \
      --conf spark.yarn.submit.waitAppCompletion=false \
      --conf spark.yarn.maxAppAttempts=4 \
      --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
      --conf spark.yarn.max.executor.failures=20 \
      --conf spark.yarn.executor.failuresValidityInterval=1h \
      --conf spark.task.maxFailures=8 \
      /data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar \
      --kafka-server local:9092 \
      --kafka-topic dt_streaming_canal_xxx \
      --base-save-path hdfs://192.168.2.1:8020/hudi_table/ \
      --mapping-mysql-db-name crm \
      --mapping-mysql-table-name order \
      --primary-key id \
      --partition-key createDate \
      --duration-seconds 1200

Historical data synchronization and table metadata synchronization to Hive

The history_import_and_meta_sync branch provides operations to synchronize historical data to Hudi tables and to synchronize the Hudi table structure to Hive meta.

Synchronizing historical data to the Hudi table

The approach taken here is:

Import the full MySQL data into a Hive table using a tool such as Sqoop.

Then import the data into the Hudi table using the HiveImport2HudiConfig tool in the branch code.

HiveImport2HudiConfig provides the following parameters to configure the program's behavior:

--base-save-path: base path under which Hudi tables are stored in HDFS, for example hdfs://192.168.16.181:8020/hudi_data/. Required: yes. Default: none.

--mapping-mysql-db-name: name of the MySQL database to process. Required: yes. Default: none.

--mapping-mysql-table-name: name of the MySQL table to process. Required: yes. Default: none.

--store-table-name: name of the Hudi table. Required: no. Default: generated from --mapping-mysql-db-name and --mapping-mysql-table-name. If --mapping-mysql-db-name is crm and --mapping-mysql-table-name is order, the final Hudi table name is crm__order.

--real-save-path: final HDFS path where the Hudi table is stored. Required: no. Default: generated from --base-save-path and --store-table-name in the form '--base-save-path'+'/'+'--store-table-name'. Keeping the default is recommended.

--primary-key: name of the field that uniquely identifies a record in the synchronized Hive history table. Required: no. Default: id.

--partition-key: time field in the Hive history table that can be used for partitioning. The field must be of type timestamp or datetime. Required: yes. Default: none.

--precombine-key: ultimately used to set Hudi's hoodie.datasource.write.precombine.field. Required: no. Default: id.

--sync-hive-db-name: name of the Hive database that holds the full historical data. Required: yes. Default: none.

--sync-hive-table-name: name of the Hive table that holds the full historical data. Required: yes. Default: none.

--hive-base-path: storage path of all Hive data files; check your Hive configuration for the actual value. Required: no. Default: /user/hive/warehouse.

--hive-site-path: path of the hive-site.xml configuration file. Required: yes. Default: none.

--tmp-data-path: path for temporary files created while the program runs. The usual default is /tmp. If the disk that holds /tmp is too small, the history import may fail; in that case use this parameter to set a custom path. Required: no. Default: the operating system's default temporary directory.

An example invocation:

    nohup java -jar hudi-learn-1.0-SNAPSHOT.jar \
      --sync-hive-db-name hudi_temp \
      --sync-hive-table-name crm__wx_user_info \
      --base-save-path hdfs://192.168.2.2:8020/hudi_table/ \
      --mapping-mysql-db-name crm \
      --mapping-mysql-table-name "order" \
      --primary-key "id" \
      --partition-key created_date \
      --hive-site-path /etc/lib/hive/conf/hive-site.xml \
      --tmp-data-path /data/tmp > order.log &

Synchronizing the Hudi table structure to Hive meta

The Hudi table structure and partitions need to be synchronized to Hive meta as a Hive external table so that Hive can see the Hudi data and query it through SQL. Hudi can synchronize the table metadata to Hive while it consumes the binlog. However, if every write to the Apache Hudi table also had to read and write Hive meta, the impact on Hive performance could be significant. So I developed a separate tool, HiveMetaSyncConfig, for synchronizing Hudi metadata to Hive. Because the program currently supports partitioning only by day, the synchronization tool needs to run only once a day. Its parameters are as follows:

--hive-db-name: Hive database to which the Hudi table is synchronized. Required: yes. Default: none.

--hive-table-name: Hive table to which the Hudi table is synchronized. Required: yes. Default: none.

--hive-jdbc-url: JDBC URL of Hive meta, for example jdbc:hive2://192.168.16.181:10000. Required: yes. Default: none.

--hive-user-name: user name for connecting to Hive meta. Required: no. Default: hive.

--hive-pwd: password for connecting to Hive meta. Required: no. Default: hive.

--hudi-table-path: HDFS path where the Hudi table's files are located. Required: yes. Default: none.

--hive-site-path: path of Hive's hive-site.xml. Required: yes. Default: none.

An example invocation:

    java -jar hudi-learn-1.0-SNAPSHOT.jar \
      --hive-db-name streaming \
      --hive-table-name crm__order \
      --hive-user-name hive \
      --hive-pwd hive \
      --hive-jdbc-url jdbc:hive2://192.168.16.181:10000 \
      --hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order \
      --hive-site-path /lib/hive/conf/hive-site.xml

Some pitfalls

Hive-related configuration

On some Hive clusters, hive.input.format defaults to org.apache.hadoop.hive.ql.io.CombineHiveInputFormat. With that setting, a Hive external table mounted on Hudi data reads all of Hudi's Parquet files, so the query results contain duplicates. The input format needs to be changed to org.apache.hadoop.hive.ql.io.HiveInputFormat. To avoid side effects on the other offline Hive SQL jobs across the cluster, it is recommended to apply the setting only to the current Hive session: set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat.

Spark Streaming tuning

Because the binlog is written to the Hudi table by Spark Streaming, here are some Spark and Spark Streaming settings that help the program run more stably.

spark.streaming.backpressure.enabled=true: enables backpressure, which lets Spark Streaming adjust its consumption rate based on the previous batch and helps avoid program crashes.

spark.ui.retainedJobs=10, spark.ui.retainedStages=10, spark.ui.retainedTasks=10, spark.worker.ui.retainedExecutors=10, spark.worker.ui.retainedDrivers=10, spark.sql.ui.retainedExecutions=10: by default, Spark keeps historical information about stages and tasks in the driver while a program runs. When driver memory is small, this can crash the driver. These parameters limit how much history is retained and so reduce memory usage.

spark.yarn.maxAppAttempts=4: the number of times to try restarting the driver when it crashes.

spark.yarn.am.attemptFailuresValidityInterval=1h: suppose the driver crashes only once a week. We would want it restarted every time, but with the setting above the driver is never restarted again once the restart count has accumulated to 4. This setting defines the interval after which the maxAppAttempts count is reset.

spark.yarn.max.executor.failures=20: executors may fail, and after a failure the cluster automatically allocates a new executor. This setting is the number of executor failures allowed. Once it is exceeded, the program reports (reason: Max number of executor failures (400) reached) and exits.

spark.yarn.executor.failuresValidityInterval=1h: the interval after which the executor failure count is reset.

spark.task.maxFailures=8: the number of task execution failures allowed.

Future improvements

Support non-partitioned tables and tables partitioned by something other than a date. Currently only date-partitioned tables are supported.

Support multiple data types. At present, for the sake of stability, all MySQL fields are stored in Hudi as String.
