Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the optimization when Flink uses large states?

2025-03-30 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article shows you what the optimization of Flink is when using large state, the content is concise and easy to understand, it can definitely brighten your eyes. I hope you can get something through the detailed introduction of this article.

First, why optimize it? (optimize the background)

Flink supports a variety of StateBackend, and when the status is large, only RocksDBStateBackend is currently available.

RocksDB is a KV database based on the principle of LSM tree. The problem of reading magnification of LSM tree is serious, so it requires high disk performance. It is strongly recommended that SSD be used as the storage medium of RocksDB in production environment. However, some clusters may not be equipped with SSD, just ordinary mechanical hard disk. When the Flink task is relatively large and the state access is more frequent, the disk IO of the mechanical hard disk may become a performance bottleneck. In this case, how to solve this bottleneck?

Use multiple hard drives to share the pressure

RocksDB uses memory plus disk to store data. When the state is relatively large, the disk takes up more space. If there are frequent read requests to the RocksDB, the disk IO will become the bottleneck of the Flink task.

It is strongly recommended that you configure the state.backend.rocksdb.localdir parameter in flink-conf.yaml to specify the directory where RocksDB is stored on disk. When a TaskManager contains three slot, then the three parallelism degrees on a single server cause frequent reads and writes to the disk, resulting in the three parallelism competing with each other for the same disk io, which will inevitably lead to a decline in the throughput of all three parallelism.

Fortunately, the state.backend.rocksdb.localdir parameter of Flink can specify multiple directories. Generally speaking, many hard disks are mounted on big data servers. We expect three slot of the same TaskManager to use different hard disks to reduce resource competition. The configuration of specific parameters is as follows:

State.backend.rocksdb.localdir: / data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb,/data4/flink/rocksdb,/data5/flink/rocksdb,/data6/flink/rocksdb,/data7/flink/rocksdb,/data8/flink/rocksdb,/data9/flink/rocksdb,/data10/flink/rocksdb,/data11/flink/rocksdb,/data12/flink/rocksdb

Note: be sure to configure directories on multiple different disks. Do not configure multiple directories on a single disk. Multiple directories are configured here to share the pressure on multiple disks.

The following figure shows the IO utilization of the disk during the test. We can see that the parallelism of the three large-state operators corresponds to three disks. The average IO utilization of these three disks is about 45%, and the highest IO utilization is almost 100%, while the average IO utilization of other disks is about 10%, which is much lower. Thus it can be seen that when using RocksDB as the status backend and frequent read and write operations with large states, it does consume a lot of disk IO performance.

Ideally, when setting up multiple RocksDB local disk directories, Flink randomly selects the directory to use, so it is possible for three parallelism to share the same directory.

As shown in the following figure, two parallelism shares sdb disks, and one parallelism uses sdj disks. You can see that the average IO utilization of sdb disks has reached 91.6%. At this time, the disk IO of sdb will certainly become the bottleneck of the entire Flink task, which will greatly reduce the throughput of the two parallelism corresponding to sdb disks, thus reducing the throughput of the entire Flink task.

This usually does not happen if there are a large number of hard disks mounted on the server, but if the throughput is low after the task is restarted, you can check whether multiple parallelism share the same disk.

Flink may have the problem of sharing the same disk with multiple parallelism, so how to solve it?

Second, commonly used load balancing strategies

In terms of phenomenon, 12 disks are allocated to RocksDB, only 3 parallelism requires 3 disks, but there is a certain chance that 2 parallelism will share the same disk, and there may even be a small chance that 3 parallelism will share the same disk. This makes it easy for our Flink task to become a bottleneck because of disk IO.

The above strategy of allocating disks is actually the load balancing strategy of the industry. The general load balancing strategies include hash, random and round robin.

Hash strategy

After some kind of hash strategy, the task itself distributes the pressure on multiple Worker. Corresponding to the above scenario, the pressure of RocksDB directories used by multiple slot is shared on multiple disks. However, there may be conflicts in hash. Hash conflicts represent multiple different Flink parallelism, and the hashCode obtained after hash is the same, or the hashCode is assigned to the same hard disk after making a balance on the number of hard drives.

Random strategy

The random strategy is to generate a random number for each Flink task and randomly assign the pressure to a Worker, that is, randomly assign the pressure to a disk. But random numbers can also conflict.

Round Robin strategy

The round-robin policy is easy to understand. Multiple Worker can take turns to receive data. The Flink task uses directory 1 when applying for a RocksDB directory for the first time, and directory 2 when applying for a directory the second time, and then apply in turn. This policy is the most uniform strategy for allocating tasks, and if used, it ensures that the number of tasks assigned to all hard drives varies by up to 1.

Minimum load policy / Least Response Time (minimum response time) policy

Tasks are assigned according to the response time of Worker. Short response time means strong load capacity, so more tasks should be assigned. Corresponding to the above scenario, the IO utilization of each disk is tested. A low utilization rate means that the disk IO is relatively idle and should be assigned more tasks.

Specify weight policy

Each Worker is assigned a different weight value, the task with high weight value assigns more tasks, and the number of tasks assigned is proportional to the weight value.

For example, if the Worker0 weight value is 2 and the weight of Worker1 1 is 1, then the number of tasks assigned by Worker0 will be twice the number of tasks assigned by Worker1 when assigning tasks. This strategy may not be suitable for the current business scenario. Generally speaking, the load capacity of each hard disk on the same server will not vary much, unless the local dir of RocksDB contains both SSD and HDD.

Third, how to allocate disks in the source code?

When the author uses Flink version 1.8.1 online, some hard drives are assigned multiple parallelism, and some hard drives are not assigned a parallelism at all. You can boldly guess that the probability of using hash or random in the source code is relatively high, because in most cases, each hard disk is assigned only one task, and there is a small chance of assigning multiple tasks (the problem of assigning multiple tasks with low probability is to be solved).

If you use a round-robin strategy, it is certain that a degree of parallelism will be assigned to each hard disk before two tasks are assigned to a single hard disk. And the round-robin strategy ensures that the allocated hard drives are continuous.

Look directly at part of the source code of the RocksDBStateBackend class:

/ * Base paths for RocksDB directory, as initialized. Here are the 12 rocksdb local dir * / private transient File [] initializedDbBasePaths that we set up above

/ * The index of the next directory to be used from {@ link # initializedDbBasePaths}. Next time, use dir's index. If nextDirectory = 2, use the directory with the subscript 2 in initializedDbBasePaths as the storage directory for rocksdb * / private transient int nextDirectory

/ / in the lazyInitializeForJob method, it is decided through this line of code that the index,// of dir will be used next time to generate random numbers based on initializedDbBasePaths.length, / / if initializedDbBasePaths.length = 12, the range of random numbers generated is 0-11nextDirectory = new Random () .nextInt (initializedDbBasePaths.length)

After analyzing the simple source code, we know that the random strategy is used in the source code to allocate dir, which matches what we have seen. There is a small probability of conflict in random distribution. (at the time of this writing, Flink's latest master branch code is still the same strategy as above, and no changes have been made.)

Fourth, which strategy is more reasonable?

(challenges posed by various strategies)

When the number of tasks is relatively large, random and hash policies can guarantee that each Worker will undertake basically the same amount of tasks, but if the amount of tasks is relatively small, for example, when 20 tasks are assigned to 10 Worker by random algorithm, there will be no tasks assigned to the existing Worker, and some Worker may be assigned 3 or 4 tasks. So random and hash strategies can not solve the pain point of uneven disk distribution in rocksdb, what about round robin strategy and minimum load strategy?

Round robin strategy

A round-robin strategy can solve the above problems in the following ways:

/ / private static final AtomicInteger DIR_INDEX = new AtomicInteger (0) is defined in the RocksDBStateBackend class

/ / the allocation policy of nextDirectory is changed to the following code, which adds DIR_INDEX + 1 each time, and then nextDirectory = DIR_INDEX.getAndIncrement ()% initializedDbBasePaths.length to the total number of dir.

Through the above, you can realize the round-robin strategy. When you apply for a disk, start from disk 0 and use the next disk at a time.

Problems caused by ■:

Static variables in Java belong to the JVM level, and each TaskManager belongs to a separate JVM, so the round robin policy is guaranteed internally in TaskManager. If multiple TaskManager are running on the same server, then multiple TaskManager will be used from disks with index 0, so disks with smaller index will be used frequently, while disks with larger index may not be used frequently.

■ solution 1:

During DIR_INDEX initialization, instead of initializing to 0 each time, you can generate a random number, which ensures that a disk with a smaller index will not be used every time. The implementation code is as follows:

/ / private static final AtomicInteger DIR_INDEX = new AtomicInteger (new Random () .nextInt (100) is defined in the RocksDBStateBackend class)

However, the above scheme cannot completely solve the problem of disk conflicts. For 12 disks on the same machine, TaskManager0 uses three disks with index of 0, 1 and 2, and TaskManager1 may use three disks with index of 1, 2 and 3. The result is that from an internal point of view of TaskManager, the round robin strategy is implemented to ensure load balancing, but from a global point of view, the load is not balanced.

■ solution 2:

In order to achieve global load balancing, multiple TaskManager must communicate with each other in order to achieve absolute load balancing. You can communicate with third-party storage. For example, in Zookeeper, generate a znode,znode name for each server that can be host or ip. Use the DistributedAtomicInteger of Curator to maintain the DIR_INDEX variable, which is stored in the corresponding znode of the current server. No matter which TaskManager applies for disk, you can use DistributedAtomicInteger to change the corresponding DIR_INDEX + 1 of the current server, thus realizing the global round-robin policy.

The idea of DistributedAtomicInteger's increment: first use the withVersion api of Zookeeper for the + 1 operation (that is, the CAS api provided by Zookeeper). If it succeeds, it succeeds; if it fails, it uses a distributed mutex for the + 1 operation.

Based on the above description, we have two strategies to achieve round robin. AtomicInteger can only guarantee the internal round robin of TaskManager, not global round robin. If it is based on global round robin, it needs to be implemented with the help of Zookeeper or other components. If you have strict requirements for a round-robin policy, you can use a Zookeeper-based round-robin policy, and if you do not want to rely on external components, you can only use AtomicInteger.

Minimum load policy

The idea is that when TaskManager starts, we should monitor the average IO utilization of all the disks corresponding to rocksdb local dir in the last 1 or 5 minutes, screen out the disks with high IO utilization, and give priority to the disks with low average IO utilization. At the same time, we still need to use the round-robin strategy in the disks with low average IO utilization.

The problems faced by ■

When the Flink task starts, you can only get the current IO utilization of the disk, which is an instantaneous value, isn't it unreliable?

The Flink task starts. It is impossible to wait for the task to collect IO utilization for 1 minute before starting it.

Do not want to rely on external monitoring system to get this IO utilization, to consider versatility.

Assuming that you have got the IO utilization of all the hard drives in the last 1 minute, how do you make a decision?

For disks with low average IO utilization, you still need to use a round-robin strategy.

The average utilization rate of IO is low, which is difficult to judge here, and the difference of 10% is low, or 20% or 30%.

And different new tasks have different requirements for disk utilization, so it is difficult to judge.

New ideas for ■ (discussing)

The load pressure of the hard disk is not collected during the startup phase, and the load balance of each hard disk can be basically guaranteed by using the previous DistributedAtomicInteger. However, for a period of time after the task starts, if the average utilization of one disk IO is very high compared to other disks because of the Flink task. We can choose to migrate data from high-load hard drives to low-load hard drives.

The above is what the optimization of Flink is when using large state. Have you learned any knowledge or skills? If you want to learn more skills or enrich your knowledge reserve, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report