
How to Deploy Apache Hudi Asynchronous Clustering

2025-04-06 Update From: SLTechnology News&Howtos


In this article I will walk you through how to deploy Apache Hudi asynchronous Clustering. The content is detailed, the steps are laid out clearly, and the details are handled carefully. I hope this article helps resolve your doubts; follow along step by step and let's learn something new together.

1. Abstract

The Clustering table service reorganizes data to provide better query performance without slowing down ingestion. We have already seen how to deploy synchronous Clustering; in this blog we will discuss some recent improvements made by the community and how to deploy asynchronous Clustering with the HoodieClusteringJob and DeltaStreamer tools.

2. Introduction

Typically, Clustering creates a plan based on configurable policies, groups eligible files according to specific rules, and then executes the plan. Hudi supports concurrent writes and provides snapshot isolation between multiple table services, allowing writers to continue ingesting while running Clustering in the background. For a more detailed overview of the architecture of Clustering, see the previous blog post.

3. Clustering Strategies

As mentioned earlier, Clustering planning and execution depend on pluggable configuration policies. These strategies can be roughly divided into three categories: planning strategy, execution strategy and update strategy.

3.1 Planning Strategy

This strategy comes into play when creating a Clustering plan: it helps decide which file groups should be clustered. Let's take a look at the different planning strategies Hudi provides. Note that these strategies can easily be plugged in and out via this configuration.

SparkSizeBasedClusteringPlanStrategy: selects file slices based on the small-file limit of the base files and creates Clustering groups up to the maximum file size allowed per group, which can be specified with this configuration. This strategy is useful for merging medium-sized files into larger ones, reducing the large number of files scattered across cold partitions.
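As a rough sketch, the size thresholds for this strategy could be tuned with properties like these (a minimal sketch; the byte values are illustrative assumptions, not recommendations):

# small files below this size (in bytes) become Clustering candidates
hoodie.clustering.plan.strategy.small.file.limit=314572800
# target maximum size (in bytes) for the files Clustering produces
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824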

SparkRecentDaysClusteringPlanStrategy: creates a plan covering the partitions of the previous N days and clusters the small file slices within those partitions. This is the default strategy, and it can be useful when the workload is predictable and the data is partitioned by time.
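A hedged sketch of enabling it explicitly might look like this; the strategy's fully qualified class name and the lookback key are taken from Hudi's clustering configs as I understand them, so verify both against the docs for your version:

# cluster the two most recent day-based partitions (assumed class name and key)
hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy
hoodie.clustering.plan.strategy.daybased.lookback.partitions=2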

SparkSelectedPartitionsClusteringPlanStrategy: useful if you only want to cluster specific partitions within a range, regardless of whether those partitions are new or old. To use this strategy you need to set the two configurations below (the start and end partitions); a hedged example follows them:

hoodie.clustering.plan.strategy.cluster.begin.partition
hoodie.clustering.plan.strategy.cluster.end.partition
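For instance, on a table with date-based partitions, the range could be set as follows (the partition values are hypothetical, and the strategy class name should be checked against your Hudi version):

hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy
hoodie.clustering.plan.strategy.cluster.begin.partition=2021/08/01
hoodie.clustering.plan.strategy.cluster.end.partition=2021/08/10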

Note: all strategies are partition-aware, and the latter two are still constrained by the size limits of the first strategy.

3.2 Execution Strategy

After the Clustering groups are built in the planning phase, Hudi applies an execution strategy to each group, based mainly on sort order and size. The strategy to use can be specified with this configuration.

SparkSortAndSizeExecutionStrategy is the default strategy. When clustering with this configuration, the user can specify the columns to sort the data by. Beyond that, we can also set the maximum file size for the Parquet files produced by Clustering. The strategy uses bulk_insert to write data into new files, in which case Hudi implicitly uses a partitioner that sorts by the specified columns. Changing the data layout this way not only improves query performance but also automatically balances the rewrite overhead.

This policy can now be executed as a single Spark job or as multiple jobs, depending on the number of Clustering groups created during the planning phase. By default, Hudi submits multiple Spark jobs and merges the results. If you want to force Hudi to use a single Spark job, set the execution policy class configuration to SingleSparkJobExecutionStrategy.
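As a sketch, forcing the single-job behavior is a one-line override; the fully qualified class name here is my assumption (the same package as the default execution strategy), so verify it for your Hudi version:

# assumed package for SingleSparkJobExecutionStrategy; confirm against your Hudi version
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SingleSparkJobExecutionStrategy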

3.3 Update Strategy

Currently, Clustering can only be scheduled for tables or partitions that are not receiving any concurrent updates. By default, the update strategy configuration is set to SparkRejectUpdateStrategy: if a file group receives an update while it is being clustered, the update is rejected and an exception is thrown. In some use cases, however, updates are very sparse and do not touch most file groups, so simply rejecting all updates seems unfair. In that case, the user can set the configuration to SparkAllowUpdateStrategy.
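A hedged sketch of that switch is below; the config key and the fully qualified class name follow Hudi's naming conventions as I understand them and should be verified for your version:

# allow sparse updates to proceed while Clustering is in flight (assumed key and package)
hoodie.clustering.updates.strategy=org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy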

We have discussed the key strategy configurations; all other Clustering-related configurations are listed below. Some of the most useful ones in that list include:

hoodie.clustering.async.enabled: enables running the Clustering service asynchronously on the table. Default: false.

hoodie.clustering.async.max.commits: controls the frequency of asynchronous Clustering by specifying after how many commits Clustering should be triggered. Default: 4.

hoodie.clustering.preserve.commit.metadata: when rewriting data, preserves the existing _hoodie_commit_time, which means users can run incremental queries on clustered data without any side effects. Default: false.

4. Asynchronous Clustering

We have already seen how users can set up synchronous Clustering. Additionally, users can use HoodieClusteringJob to set up asynchronous Clustering in two steps.

4.1 HoodieClusteringJob

With the release of Hudi 0.9.0, we can schedule and execute Clustering in the same step. We just need to specify the --mode (or -m) option. There are three modes (a two-step example follows this list):

schedule: makes a Clustering plan. This provides an instant that can be passed to execute mode.

execute: executes a Clustering plan at a given instant, which means an instant is required here.

scheduleAndExecute: makes a Clustering plan and executes it immediately.
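For the two-step variant, a hedged sketch is shown below: first schedule a plan, note the instant the run reports, then execute that instant. The --instant-time flag is my assumption about how the instant is passed, and the instant value is hypothetical; check the job's help output for your Hudi version.

spark-submit \
  --class org.apache.hudi.utilities.HoodieClusteringJob \
  /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
  --props /path/to/config/clusteringjob.properties \
  --mode schedule \
  --base-path /path/to/hudi_table/basePath \
  --table-name hudi_table_schedule_clustering \
  --spark-memory 1g

# then, using the instant reported by the schedule run (value below is hypothetical):
spark-submit \
  --class org.apache.hudi.utilities.HoodieClusteringJob \
  /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
  --props /path/to/config/clusteringjob.properties \
  --mode execute \
  --instant-time 20210902123456 \
  --base-path /path/to/hudi_table/basePath \
  --table-name hudi_table_schedule_clustering \
  --spark-memory 1g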

Note that to run the job while the original writer is still running, enable multiple writes:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider

An example of submitting a HoodieClusteringJob with spark-submit is as follows:

spark-submit \
  --class org.apache.hudi.utilities.HoodieClusteringJob \
  /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
  --props /path/to/config/clusteringjob.properties \
  --mode scheduleAndExecute \
  --base-path /path/to/hudi_table/basePath \
  --table-name hudi_table_schedule_clustering \
  --spark-memory 1g

An example of a clusteringjob.properties configuration file is as follows:

hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=column1,column2

4.2 HoodieDeltaStreamer

Let's take a look at how to use HoodieDeltaStreamer. We can now trigger asynchronous Clustering with DeltaStreamer: simply set hoodie.clustering.async.enabled to true and specify the other Clustering configurations in a properties file, whose location is passed via --props when starting DeltaStreamer (similar to the HoodieClusteringJob configuration). A hedged sketch of such a properties file follows.
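This sketch of clustering_kafka.properties assumes an Avro Kafka source with a schema registry; the Kafka and registry keys are my understanding of DeltaStreamer's common configs, and the topic, servers, and URL are placeholders:

# Kafka source and schema registry (assumed keys, placeholder values)
bootstrap.servers=localhost:9092
hoodie.deltastreamer.source.kafka.topic=impressions
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
# Clustering configs, in the same spirit as the HoodieClusteringJob example above
hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.sort.columns=column1,column2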

An example of submitting a HoodieDeltaStreamer with spark-submit is as follows:

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
  --props /path/to/config/clustering_kafka.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field impresssiontime \
  --table-type COPY_ON_WRITE \
  --target-base-path /path/to/hudi_table/basePath \
  --target-table impressions_cow_cluster \
  --op INSERT \
  --hoodie-conf hoodie.clustering.async.enabled=true \
  --continuous

4.3 Spark Structured Streaming

We can also enable asynchronous Clustering with Spark Structured Streaming, as shown below.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.spark.sql.streaming.OutputMode

val commonOpts = Map(
  "hoodie.insert.shuffle.parallelism" -> "4",
  "hoodie.upsert.shuffle.parallelism" -> "4",
  DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
  DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
  DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
  HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)

def getAsyncClusteringOpts(isAsyncClustering: String,
                           clusteringNumCommit: String,
                           executionStrategy: String): Map[String, String] = {
  commonOpts + (
    DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
    HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
    HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy
  )
}

def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = {
  val streamingInput = // define the source of streaming
  Future {
    println("streaming starting")
    streamingInput
      .writeStream
      .format("org.apache.hudi")
      .options(hudiOptions)
      // basePath: the Hudi table base path, defined elsewhere
      .option("checkpointLocation", basePath + "/checkpoint")
      .outputMode(OutputMode.Append())
      .start()
      .awaitTermination(10000)
    println("streaming ends")
  }
}

def structuredStreamingWithClustering(): Unit = {
  val df = // generate data frame
  val hudiOptions = getAsyncClusteringOpts("true", "1",
    "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
  val f1 = initStreamingWriteFuture(hudiOptions)
  Await.result(f1, Duration.Inf)
}

That's it for "How to Deploy Apache Hudi Asynchronous Clustering". To really master these points, you still need to practice and use them. If you want to read more articles like this, please follow the industry information channel.
