Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to troubleshoot Flink Checkpoint problems

2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >

Share

Shulou(Shulou.com)05/31 Report--

This article introduces you how to conduct Flink Checkpoint problem troubleshooting, the content is very detailed, interested friends can refer to, hope to be helpful to you.

In Flink, state reliability assurance is supported by Checkpoint, and when a job has a failover, Flink will succeed from the most recent

Checkpoint recovery. In practice, we may encounter Checkpoint failure or slow Checkpoint. This article will talk about the exception of Checkpoint in Flink (including failure and slow), as well as the possible causes and troubleshooting ideas.

1. Brief introduction of Checkpoint process

First of all, we need to understand the whole process of Checkpoint in Flink. After understanding the whole process, we can better locate and analyze when something goes wrong.

As we can see from the above figure, the Checkpoint of Flink consists of the following parts:

1.JM trigger checkpoint

2.Source received trigger checkpoint's PRC, started to do snapshot himself, and sent barrier downstream.

3. Receive barrier downstream (you need all the barrier to start checkpoint)

4.Task start synchronization phase snapshot

5.Task starts asynchronous phase snapshot

6.Task snapshot completed, report to JM

If any of the above steps fail, the entire checkpoint will fail.

* * 2 troubleshooting of Checkpoint anomalies

2.1 Checkpoint failed * *

You can see the following figure in the Checkpoint interface, where Checkpoint 10423 failed.

Click on the details of Checkpoint 10423 and we can see the table shown in the following figure (the operator name is truncated in the following figure).

In the figure above, we see three lines representing three operator, each of which has the following meanings:

1. The Acknowledged column indicates how many subtask have ack the Checkpoint. From the figure, we can see that the third operator has a total of 5 subtask, but only 4 have ack.

two。 The second column Latest Acknowledgement represents the time of the last ack of all subtask of the operator

3.End to End Duration represents the longest time it takes to complete a snapshot among all the subtask of the entire operator

4.State Size represents the state size of the current Checkpoint-mainly here, if it is an incremental checkpoint, it represents the incremental size

5.Buffered During Alignment indicates how much data has been accumulated during the barrier alignment phase. If the data is too large, it indirectly indicates that the alignment is slow.)

Checkpoint failures can be roughly divided into two situations: Checkpoint Decline and Checkpoint Expire.

2.1.1 Checkpoint Decline

We can see a log similar to the following in jobmanager.log

Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178. Among them

10423 is checkpointID,0b60f08bf8984085b59f8d9bc74ce2e1, execution id,85d268e6fbc19411185f7e4868a44178 is job id. We can look for execution id in jobmanager.log to find the taskmanager to which it is scheduled, similar to the following:

019-09-02 16 org.apache.flink.runtime.executiongraph.ExecutionGraph 26 INFO [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph-XXXXXXXXXXX (100,289) (87b751b1fd90e32af55f02bb2f9a9892) switched from SCHEDULED to DEPLOYING.2019-09-02 16 Visual20972 INFO [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph-Deploying XXXXXXXXXXX (100Univer 289) (attempt # 0) to slot container_e24_1566836790522_8088_04_013155_1 on hostnameABCDE

From the log above, we know that the execution is dispatched to the container_e24_1566836790522_8088_04_013155_1 slot of hostnameABCDE, and then we can find the specific reason for the failure of Checkpoint in the taskmanager.log of container container_e24_1566836790522_8088_04_013155.

In addition, for the case of Checkpoint Decline, there is one situation that we separately extract here to introduce: Checkpoint Cancel.

In the current Flink, if the smaller Checkpoint is not aligned and a larger Checkpoint is received, the smaller Checkpoint will be cancelled. We can see logs similar to the following:

TaskNameWithSubTaskAndID: Received checkpoint barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

This log indicates that currently Checkpoint 19 is still in the alignment phase and we have received a barrier for Checkpoint 20. Then the downstream task checkpoint 19 will be notified step by step, and JM will also be notified that the current Checkpoint has been dropped by decline.

When the downstream task receives the cancelBarrier, it prints a log similar to the following:

DEBUG$taskNameWithSubTaskAndID: Checkpoint 19 canceled, aborting alignment. Or DEBUG$taskNameWithSubTaskAndID: Checkpoint 19 canceled, skipping alignment. Or WARN$taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

The above three logs all indicate that the current task received a barrierCancel message sent upstream, thus canceling the corresponding Checkpoint.

2.1.2 Checkpoint Expire

If the Checkpoint is too slow to complete the timeout, the entire Checkpoint will fail as well. When a Checkpoint fails due to a timeout, you will see the following log in jobmanager.log:

Checkpoint 1 of job 85d268e6fbc19411185f7e4868a44178 expired before completing.

Indicates that Chekpoint 1 failed due to a timeout. At this time, you can see whether there is a log similar to the following:

Received late message for now expired checkpoint attempt 1 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.

You can follow the method in 2.1.1 to find the corresponding taskmanager.log to view the specific information.

If the following log is DEBUG, we will mark DEBUG at the beginning

According to the following log, we divide the snapshot on the TM side into three stages: pre-snapshot, synchronous and asynchronous:

DEBUGStarting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4amp 4)

This log indicates that after the barrier on the TM side is aligned, you are ready to start Checkpoint.

DEBUG2019-08-06 13 DEBUG2019-08-06 13 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy-DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@70442baf, checkpointDirectory=xxxxxxxx, sharedStateDirectory=xxxxxxxx, taskOwnedStateDirectory=xxxxxx, metadataFilePath=xxxxxx, reference= (default), fileStateSizeThreshold=1024}, synchronous part) in thread Thread [Async calls on Source: xxxxxx_source-> Filter (27xpx 70), 5thecommerce Flink Task Threads] took 0 ms.

The above log indicates that the synchronization phase of the current backend is complete, using a total of 0 ms.

DEBUGDefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@7908affe, checkpointDirectory=xxxxxx, sharedStateDirectory=xxxxx, taskOwnedStateDirectory=xxxxx, metadataFilePath=xxxxxx, reference= (default), fileStateSizeThreshold=1024}, asynchronous part) in thread Thread [pool-48murthreadmilet 1450] took 369ms

The above log indicates that the asynchronous phase is complete, and the asynchronous phase uses 369 ms

In the case of existing logs, we use the above three logs to determine whether the snapshot starts late, the synchronous phase is slow, or the asynchronous phase is slow. And then continue to investigate the problem further according to the situation.

2.2 Checkpoint slow

In Section 2.1, we introduce the troubleshooting ideas for Checkpoint failures, and this section describes the slow Checkpoint situation by case.

The slow Checkpoint is as follows: for example, Checkpoint interval 1 minute, timeout 10 minutes, Checkpoint often takes 9 minutes (we hope to be able to do it in about 1 minute), and we don't expect the state size to be very large.

For slow Checkpoint cases, we can check them one by one in the following order.

2.2.0 Source Trigger Checkpoint slow

This is generally rare, but it is also possible, because when source does snapshot and sends barrier downstream, it needs to grab the lock (the community is now working on replacing the current lock grab with mailBox. For more information, please see [1]). If you can't get the lock all the time, it may lead to no chance for Checkpoint to proceed all the time. If you cannot find the log that started to do Checkpoint in the taskmanager.log where Source is located, you can consider whether this is the case, and you can further confirm the holding of the lock through jstack.

2.2.1 using incremental Checkpoint

Now there are two modes of Checkpoint in Flink, full Checkpoint and incremental Checkpoint, in which full Checkpoint backs up all the current state to persistent storage, while incremental Checkpoint backs up only the state that did not exist in the last Checkpoint, so the content uploaded by incremental Checkpoint is relatively better and has a greater advantage in speed.

Incremental Checkpoint is only supported in RocksDBStateBackend in Flink. If you have already used RocksDBStateBackend, you can accelerate it by enabling incremental Checkpoint. For more information, please see [2].

2.2.2 there is reverse pressure or data tilt in the job.

We know that task will only perform snapshot after receiving all barrier. If the job has reverse pressure or data skew, it will cause slow transmission of all channel or some channel barrier, thus affecting the time of Checkpoint as a whole. These two can be checked through the following page:

In the figure above, we selected a task to check the backpressure of all subtask, and found that it is high, which means that the backpressure is serious, which will cause the downstream to receive barrier later.

In the figure above, we select one of the operator, click on all the subtask, and then sort the Records Received/Bytes Received/TPS from largest to smallest. We can see that the first few subtask will have more data to process than the other subtask.

If there is backpressure or data skew, we need to first solve the backpressure or data tilt problem, and then check to see if the Checkpoint time is as expected.

2.2.2 slow Barrier alignment

We know that Checkpoint is divided into barrier alignment (collecting all barrier sent upstream) on the task side, and then starts the synchronization phase, and then does the asynchronous phase. If the barrier is not aligned all the time, you won't start to do snapshot.

After barrier alignment, the log is printed as follows:

DEBUGStarting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4amp 4)

If this log is not in taskmanager.log, it means that the barrier has not been aligned all the time. Next, we need to know which upstream barrier has not been sent. If you use At Least Once, you can observe the following log:

DEBUGReceived barrier for checkpoint 96508 from channel 5

Indicates that the task has received a barrier from channel 5, and then look at the corresponding Checkpoint to see which upstream barrier has not been received. For ExactlyOnce, you can consider adding it yourself or checking it with jmap.

2.2.3 the main thread is too busy to have a chance to do snapshot

On the task side, all processing is single-threaded, and data processing and barrier processing are handled by the main thread. If the main thread is too slow in processing (for example, slow RocksDBBackend,state operation leads to slow overall processing), it will also affect the progress of the overall Checkpoint. In this step, we need to be able to view the hotmethod corresponding to a PID. Here are two recommended methods:

1. Jstack several times in succession to see which threads are in the RUNNABLE state all the time

two。 Use the tool AsyncProfile dump to view a flame chart to view the stack that takes up the most CPU

Of course, it would be better if there are other more convenient ways, and you are welcome to recommend them.

2.2.4 the synchronization phase is slow

The synchronization phase is generally not too slow, but if we find that the synchronization phase is slow through the log, we can consider whether asynchronous snapshot is enabled for non-RocksDBBackend. If asynchronous snapshot is enabled or slow, we need to see what the whole JVM is doing, or we can use the tools in the previous section. For RocksDBBackend, we can use iostate to check the pressure on the disk. In addition, we can check the log of the log of the RocksDB on the tm side, and see how much time spent on SNAPSHOT.

The log of RocksDB starting snapshot is as follows:

2019-09-10-14 Started the snapshot process 22 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:83] 55.734684-creating snapshot in directory / tmp/flink-io-87c360ce-0b98-48f4-9629-2cf0528d5d53/XXXXXXXXXXX/chk-92729

The log at the end of snapshot is as follows:

2019-09-10-14 Snapshot DONE 22 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:145] 56.001275. All is good

2.2.5 the asynchronous phase is slow.

For the asynchronous phase, the tm mainly backs up the state to persistent storage. For non-RocksDBBackend, the main bottleneck comes from the network. At this stage, you can consider observing the metric of the network, or the situation where the network traffic can be observed on the corresponding machine (such as iftop).

For RocksDB, you need to read files locally and write to remote persistent storage, so you need to consider not only the bottleneck of the network, but also the performance of the local disk. In addition, for RocksDBBackend, if you feel that network traffic is not a bottleneck, but upload is relatively slow, you can also try to turn on multi-thread upload function [3].

3 Summary

In the second part, we introduce the main scenarios for troubleshooting some Checkpoint exceptions in the case of officially compiled packages, as well as the corresponding troubleshooting methods. If you have checked all the above situations and still do not find the bottleneck, you can consider adding more detailed logs, gradually narrowing the scope, and finally locating the cause.

For some of the DEBUG logs mentioned above, if the flink dist package is compiled by yourself, it is recommended to change some DEBUG in the whole step of the Checkpoint to INFO, so that you can know the overall phase of the Checkpoint through the log, when and what phase is completed, and how much time is spent in each phase when the Checkpoint is abnormal.

On how to conduct Flink Checkpoint troubleshooting to share here, I hope that the above content can be of some help to you, can learn more knowledge. If you think the article is good, you can share it for more people to see.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Database

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report