
How to use the RocksDB state backend in Apache Flink

2025-01-18 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 06/01

How do you use the RocksDB state backend in Apache Flink? This article walks through the analysis and a practical approach in detail, so that readers facing this question can find a simple, workable method.

Stream processing applications are usually stateful: they "remember" information from processed events and use it to influence further event processing. In Flink, this remembered information, the state, is stored locally in the configured state backend. To prevent data loss in the event of a failure, the state backend periodically persists snapshots of its contents to a preconfigured durable storage. The RocksDB state backend, or RocksDBStateBackend, is one of the three built-in state backends in Flink. This blog post walks you through the benefits of using RocksDB to manage application state, explains when and how to use it, and clears up some common misconceptions.

State in Flink

To better understand state and state backends in Flink, it is important to distinguish between in-flight state and state snapshots. In-flight state, also known as working state, is the state a Flink job is currently working on. It is always stored locally in memory (with the possibility of spilling to disk) and can be lost when a job fails without affecting the job's recoverability. State snapshots, that is, checkpoints and savepoints, are stored in remote durable storage and are used to restore the local state after a job failure. The appropriate state backend for a production deployment depends on scalability, throughput, and latency requirements.
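The distinction between in-flight state and state snapshots can be illustrated with a minimal Java sketch. It is not Flink code: the class `SnapshotRecoverySketch` and its methods are hypothetical names, and a plain in-memory map stands in for remote durable storage.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of in-flight state plus a snapshot used for recovery (not Flink's implementation). */
final class SnapshotRecoverySketch {
    // In-flight (working) state: local to the task, lost on failure.
    private Map<String, Long> workingState = new HashMap<>();
    // Stand-in for the last completed checkpoint in remote durable storage (assumption).
    private Map<String, Long> lastSnapshot = new HashMap<>();

    /** Processes one event by counting it under its key. */
    void process(String key) {
        workingState.merge(key, 1L, Long::sum);
    }

    /** Persists a copy of the working state as the latest snapshot. */
    void checkpoint() {
        lastSnapshot = new HashMap<>(workingState);
    }

    /** Simulates a failure: the working state is discarded and rebuilt from the snapshot. */
    void failAndRestore() {
        workingState = new HashMap<>(lastSnapshot);
    }

    long count(String key) {
        return workingState.getOrDefault(key, 0L);
    }
}
```

In this sketch, updates made after the last `checkpoint()` call disappear from the state on `failAndRestore()`. In a real job those events would be processed again after recovery, which is why losing in-flight state does not hurt recoverability.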

What is RocksDB?

It is a common misconception that RocksDB is a distributed database that needs to run on a cluster and be managed by a dedicated administrator. In fact, RocksDB is an embeddable persistent key-value store for fast storage. It interacts with Flink through the Java Native Interface (JNI). The following figure shows where RocksDB fits in a Flink cluster node. The following sections describe this in more detail.

RocksDB in Flink

Everything you need to use RocksDB as a state backend is bundled in the Apache Flink distribution, including the native shared library:

$ jar -tvf lib/flink-dist_2.12-1.12.0.jar | grep librocksdbjni-linux64

8695334 Wed Nov 27 02:27:06 CET 2019 librocksdbjni-linux64.so

At runtime, RocksDB is embedded in the TaskManager process. It runs in native threads and works with local files. For example, if you have a job configured with RocksDBStateBackend running in your Flink cluster, you will see something similar to the following, where 32513 is the TaskManager process ID.

$ ps -T -p 32513 | grep -i rocksdb
32513 32633 ?        00:00:00 rocksdb:low0
32513 32634 ?        00:00:00 rocksdb:high0

Note: This command applies only to Linux. For other operating systems, refer to their documentation.

When to use RocksDBStateBackend

In addition to RocksDBStateBackend, Flink has two other built-in state backends: MemoryStateBackend and FsStateBackend. Both are heap-based, because they store in-flight state in the JVM heap. For now, let's ignore MemoryStateBackend, because it is intended only for local development and debugging, not for production.

With RocksDBStateBackend, in-flight state is first written to off-heap (native) memory and then flushed to local disk when a configured threshold is reached. This means that RocksDBStateBackend can support state larger than the total configured heap capacity. The amount of state it can store is limited only by the available disk space across the cluster. In addition, because RocksDBStateBackend does not keep in-flight state on the JVM heap, it is not affected by JVM garbage collection and therefore has predictable latency.
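The write path described above (buffer in memory, flush once a threshold is reached) can be sketched in a few lines of Java. This is only a toy model, not RocksDB's implementation: `ToyWriteBuffer` and its `flushThreshold` are hypothetical, and in-memory sorted maps stand in for the files RocksDB would write to local disk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy write path: buffer writes in memory, flush a sorted run when a threshold is hit. */
final class ToyWriteBuffer {
    private final int flushThreshold; // max entries held in memory (hypothetical knob)
    private TreeMap<String, String> memtable = new TreeMap<>();
    // Each flushed run stands in for an immutable sorted file on local disk (assumption).
    private final List<TreeMap<String, String>> flushedRuns = new ArrayList<>();

    ToyWriteBuffer(int flushThreshold) {
        if (flushThreshold < 1) {
            throw new IllegalArgumentException("flushThreshold must be >= 1");
        }
        this.flushThreshold = flushThreshold;
    }

    void put(String key, String value) {
        memtable.put(key, value);
        if (memtable.size() >= flushThreshold) {
            flushedRuns.add(memtable); // "flush": the full buffer becomes an immutable run
            memtable = new TreeMap<>();
        }
    }

    /** Newest data wins: check the in-memory buffer first, then runs from newest to oldest. */
    String get(String key) {
        String value = memtable.get(key);
        if (value != null) {
            return value;
        }
        for (int i = flushedRuns.size() - 1; i >= 0; i--) {
            value = flushedRuns.get(i).get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    int flushedRunCount() {
        return flushedRuns.size();
    }

    int inMemoryEntries() {
        return memtable.size();
    }
}
```

Because only the small in-memory buffer has to fit in memory, the total amount of data is bounded by the space available for flushed runs, which mirrors why state size is limited by disk rather than heap.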

In addition to full, self-contained state snapshots, RocksDBStateBackend supports incremental checkpointing as a performance tuning option. An incremental checkpoint stores only the changes that have occurred since the last completed checkpoint. This greatly reduces checkpointing time compared to performing a full snapshot. RocksDBStateBackend is currently the only state backend that supports incremental checkpointing.
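The idea behind incremental checkpoints can be sketched as a set difference over local files: only files that were not part of the last completed checkpoint need to be uploaded. The Java below is an illustration under that assumption, not Flink's implementation; `IncrementalCheckpointSketch` and the file names used with it are hypothetical.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of incremental checkpointing: upload only files not covered by the last checkpoint. */
final class IncrementalCheckpointSketch {
    // Files already present in durable storage after the last completed checkpoint.
    private final Set<String> alreadyUploaded = new HashSet<>();

    /**
     * Returns the files that must be uploaded for this checkpoint.
     * All other current files are assumed to be referenced from the previous upload.
     * The sketch assumes every checkpoint completes successfully.
     */
    Set<String> checkpoint(Collection<String> currentLocalFiles) {
        Set<String> toUpload = new TreeSet<>(currentLocalFiles);
        toUpload.removeAll(alreadyUploaded);
        // Files that no longer exist locally (for example, merged away) are no longer tracked.
        alreadyUploaded.retainAll(currentLocalFiles);
        alreadyUploaded.addAll(toUpload);
        return toUpload;
    }
}
```

For example, if the first checkpoint sees `001.sst` and `002.sst` and the second sees the same two files plus `003.sst`, the second call returns only `003.sst`.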

RocksDB is a good choice in the following situations:

The state of your job is larger than can fit in local memory (for example, long windows or large keyed state)

You are looking into incremental checkpointing as a way to reduce checkpointing time

You expect more predictable latency that is not affected by JVM garbage collection

Otherwise, if the application's state is small or it requires very low latency, consider FsStateBackend. As a rule of thumb, RocksDBStateBackend is several times slower than the heap-based state backends, because it stores key/value pairs as serialized bytes. Every state access (read or write) must go through serialization or deserialization and cross the JNI boundary, which is more expensive than working directly with the on-heap representation of state. The upside is that, for the same amount of state, it has a lower memory footprint than the corresponding on-heap representation.
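To make the serialization cost concrete, here is a minimal Java sketch of a counter store that keeps keys and values as bytes, so every access has to encode or decode them. It only illustrates the principle: `SerializedCounterStore` is a hypothetical class, it uses a plain `HashMap` instead of RocksDB, and it does not cross JNI.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Toy byte-oriented store: values live as serialized bytes, not as Java objects. */
final class SerializedCounterStore {
    // ByteBuffer keys compare by content, so they can wrap the serialized key bytes.
    private final Map<ByteBuffer, byte[]> store = new HashMap<>();

    private static ByteBuffer keyBytes(String key) {
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)); // serialize the key
    }

    long get(String key) {
        byte[] raw = store.get(keyBytes(key));
        return raw == null ? 0L : ByteBuffer.wrap(raw).getLong(); // deserialize the value
    }

    void put(String key, long value) {
        byte[] raw = ByteBuffer.allocate(Long.BYTES).putLong(value).array(); // serialize the value
        store.put(keyBytes(key), raw);
    }

    /** A read-modify-write pays for one deserialization and one serialization. */
    long increment(String key) {
        long next = get(key) + 1;
        put(key, next);
        return next;
    }
}
```

A heap-based backend would instead update a `Long` in a map directly. The byte-oriented layout costs CPU on every access, but the stored form is compact, which is the trade-off described above.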

How to use RocksDBStateBackend

RocksDB is fully embedded in, and fully managed by, the TaskManager process. RocksDBStateBackend can be configured at the cluster level as the default for the entire cluster, or at the job level for individual jobs. Job-level configuration takes precedence over cluster-level configuration.

Cluster level

Add the following configuration to conf/flink-conf.yaml:

state.backend: rocksdb

state.backend.incremental: true

state.checkpoints.dir: hdfs:///flink-checkpoints # location to store checkpoints

Job level

After you create the StreamExecutionEnvironment, add the following to the code of the job:

// 'env' is the created StreamExecutionEnvironment
// 'true' is to enable incremental checkpointing
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints", true));

Note: in addition to HDFS, other local or cloud-based object stores can be used if the corresponding dependencies are added under FLINK_HOME/plugins.

Best practices and advanced configuration

We hope this overview has helped you better understand the role of RocksDB in Flink and how to run a job successfully with RocksDBStateBackend. To round things off, we will now explore some best practices and a few reference points for further troubleshooting and performance tuning.

State location in RocksDB

As mentioned earlier, the in-flight state in RocksDBStateBackend spills to files on disk. These files are located under the directory specified by the Flink configuration state.backend.rocksdb.localdir. Because disk performance directly affects RocksDB's performance, it is recommended that this directory be located on a local disk. Configuring it to a remote network-based location such as NFS or HDFS is discouraged, because writing to remote disks is usually slower. High availability is also not a requirement for in-flight state. If high disk throughput is required, local SSDs are preferred.

State snapshots are persisted to remote durable storage. During a state snapshot, the TaskManager takes a snapshot of the in-flight state and stores it remotely. Transferring the state snapshot to remote storage is handled entirely by the TaskManager itself, without the involvement of the state backend. As a result, state.checkpoints.dir, or the parameter you set in the code for a specific job, can point to different storage locations, such as an on-premises HDFS cluster or a cloud-based object store like Amazon S3, Azure Blob Storage, Google Cloud Storage, or Alibaba OSS.

Troubleshooting RocksDB

To check how RocksDB is behaving in production, look for the RocksDB log file named LOG. By default, this log file is located in the same directory as your data files, that is, the directory specified by the Flink configuration state.backend.rocksdb.localdir. When enabled, RocksDB statistics are also logged there to help diagnose potential problems. For more information, see the Troubleshooting Guide in the RocksDB Wiki. If you are interested in trends in RocksDB behavior over time, consider enabling RocksDB native metrics for your Flink job.

Note: Starting with Flink 1.10, RocksDB logging is effectively disabled because the log level is set to HEADER. To enable it, see How to get RocksDB's LOG file back for advanced troubleshooting.

Warning: Enabling RocksDB native metrics in Flink may have a negative impact on the performance of your job.

Starting with Flink 1.10, Flink by default configures RocksDB's memory allocation to the amount of managed memory of each task slot. The primary mechanism for improving memory-related performance issues is to increase Flink's managed memory through the Flink configuration taskmanager.memory.managed.size or taskmanager.memory.managed.fraction. For finer-grained control, first disable automatic memory management by setting state.backend.rocksdb.memory.managed to false, and then start with the following Flink configuration: state.backend.rocksdb.block.cache-size (corresponding to block_cache_size in RocksDB), state.backend.rocksdb.writebuffer.size (corresponding to write_buffer_size in RocksDB), and state.backend.rocksdb.writebuffer.count (corresponding to max_write_buffer_number in RocksDB). For more details, see the article on how to manage RocksDB memory size in Flink and the RocksDB Memory Usage wiki page.
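When tuning these three options by hand, a back-of-the-envelope estimate helps to see how they add up. The Java sketch below assumes that each state gets its own block cache and its own set of write buffers, and it ignores index blocks, filter blocks, and other overhead, so treat the result as a rough lower bound rather than an exact figure. The class `RocksDbMemoryEstimate` and the sizes in the usage note are illustrative, not taken from the source.

```java
/** Rough memory estimate for manually tuned RocksDB settings (illustrative assumptions only). */
final class RocksDbMemoryEstimate {
    private RocksDbMemoryEstimate() {
    }

    /**
     * Estimate for a single state: block cache plus all write buffers.
     * Assumes one block cache and one set of write buffers per state,
     * and ignores index and filter blocks.
     */
    static long perStateBytes(long blockCacheBytes, long writeBufferBytes, int writeBufferCount) {
        return blockCacheBytes + writeBufferBytes * writeBufferCount;
    }

    /** Estimate for a task slot that hosts the given number of states. */
    static long perSlotBytes(int stateCount, long blockCacheBytes,
                             long writeBufferBytes, int writeBufferCount) {
        return stateCount * perStateBytes(blockCacheBytes, writeBufferBytes, writeBufferCount);
    }
}
```

For instance, with an 8 MiB block cache, 64 MiB write buffers, and two write buffers, `perStateBytes` yields 136 MiB per state, and a slot with three such states would need roughly 408 MiB under these assumptions.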

When data is written or overwritten in RocksDB, RocksDB threads handle flushing from memory to local disk and data compaction in the background. On machines with many CPU cores, you should increase the parallelism of background flushing and compaction by setting the Flink configuration state.backend.rocksdb.thread.num (corresponding to max_background_jobs in RocksDB). The default configuration is usually too small for production setups. If your job frequently reads from RocksDB, you should also consider enabling Bloom filters.
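Why a Bloom filter helps read-heavy jobs is easiest to see in code: it can answer "definitely not present" from memory, so a lookup for a missing key can skip reading a file. The following Java is a minimal, generic Bloom filter sketch, not RocksDB's implementation; `TinyBloomFilter`, its sizing parameters, and its hash mixing are hypothetical choices for illustration.

```java
import java.util.BitSet;

/** Minimal Bloom filter: no false negatives, occasional false positives. */
final class TinyBloomFilter {
    private final BitSet bits;
    private final int size;      // number of bits in the filter
    private final int hashCount; // number of bit positions set per key

    TinyBloomFilter(int size, int hashCount) {
        if (size < 1 || hashCount < 1) {
            throw new IllegalArgumentException("size and hashCount must be >= 1");
        }
        this.size = size;
        this.hashCount = hashCount;
        this.bits = new BitSet(size);
    }

    /** Derives the i-th bit position from two hashes (simple double hashing). */
    private int index(String key, int i) {
        int h1 = key.hashCode();
        int h2 = (h1 * 0x9E3779B9) | 1; // second hash, forced odd
        return Math.floorMod(h1 + i * h2, size);
    }

    void add(String key) {
        for (int i = 0; i < hashCount; i++) {
            bits.set(index(key, i));
        }
    }

    /** False means the key was never added; true means it may have been added. */
    boolean mightContain(String key) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(index(key, i))) {
                return false;
            }
        }
        return true;
    }
}
```

A store would keep one such filter per file and consult it before touching the file. Only when `mightContain` returns true does the more expensive lookup have to happen.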

For other RocksDBStateBackend configurations, see the Flink documentation on Advanced RocksDB State Backends Options. For further tuning, see the RocksDB Tuning Guide in the RocksDB Wiki.

The RocksDB state backend, or RocksDBStateBackend, is one of the three state backends bundled in Flink and can be a powerful choice when configuring your streaming applications. It enables scalable applications to maintain up to several terabytes of state with exactly-once processing guarantees. If the state of your Flink job is too large to fit on the JVM heap, if you are interested in incremental checkpointing, or if you expect predictable latency, you should use RocksDBStateBackend. Because RocksDB is embedded in the TaskManager process as native threads and works with files on local disks, RocksDBStateBackend is supported out of the box without the need to set up and manage any external systems or processes.

That covers how to use the RocksDB state backend in Apache Flink. We hope the content above is helpful.
