Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to parse Flink 1.11 Unaligned Checkpoint

2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

Today, I will talk to you about how to carry out Flink 1.11 Unaligned Checkpoint parsing, many people may not know much about it. In order to make you understand better, the editor has summarized the following content for you. I hope you can get something according to this article.

As the most basic and critical fault-tolerant mechanism of Flink, Checkpoint snapshot mechanism ensures the data accuracy of Flink applications after recovering from abnormal state. At the same time, Checkpoint-related metrics is also the most important indicator to diagnose the health status of Flink applications. Successful and time-consuming Checkpoint indicates that the job is running well and there is no abnormal or reverse pressure. However, due to the coupling of Checkpoint and backpressure, reverse pressure in turn will also act on Checkpoint, resulting in various problems of Checkpoint.

In view of this, Flink introduced Unaligned Checkpint to decouple the Checkpoint mechanism from the reverse pressure mechanism and optimize the Checkpoint performance in the case of high reverse pressure.

A brief introduction of current Checkpoint Mechanism

I believe that many readers are already familiar with Flink Checkpoint's distributed snapshot based on the Chandy-Lamport algorithm. This section briefly reviews the basic logic of the algorithm, and readers who are familiar with the algorithm can safely skip it.

The Chandy-Lamport algorithm abstracts the distributed system into DAG (not considering the graph with closed loop for the time being). The node represents the process and the edge represents the communication pipeline between the two processes. The purpose of distributed snapshot is to record the state of the whole system, that is, it can be divided into node state (process state) and edge state (channel state, that is, data in transmission). Because the system state is driven by the input message sequence, we can divide the input message sequence into several shorter sub-sequences, and each node or edge of the graph will enter the same stable global state after dealing with a certain sub-sequence. Using this feature, the process and channel of the system take local snapshots at the boundary points of the subsequence, and even if the snapshot time points of each part are different, they can finally be combined into a meaningful global snapshot.

Figure 1. Checkpoint Barrier

In terms of implementation, Flink divides the continuous data stream into multiple finite sequences corresponding to multiple Checkpoint cycles by injecting a special element called Barrier into the data stream at the time of the DAG data source. Whenever a Barrier is received, the operator takes a local Checkpoint snapshot, and asynchronously uploads the local snapshot after completion, while the Barrier is broadcast to the downstream. When all the Barrier of a Checkpoint reaches the end of the DAG and all operators complete the snapshot, it marks the success of the global snapshot.

Figure 2. Barrier Alignment

In the case of multiple input Channel, for the sake of data accuracy, the operator waits for the Barrier of all streams to arrive before starting the local snapshot, a mechanism called Barrier alignment. During the alignment process, the operator will only continue to process data from the non-Barrier Channel, while the rest of the Channel data will be written to the input queue until it is blocked after the queue is full. When all the Barrier arrives, the operator takes a local snapshot, outputs the Barrier downstream and returns to normal processing.

Compared with other distributed snapshots, the advantage of this algorithm is that it does not need "Stop The World" to affect the application throughput with the aid of Copy-On-Write technology, and basically does not need to persist the data in processing, but only saves the state information of the process, which greatly reduces the size of the snapshot.

Coupling of Checkpoint and reverse pressure

The current Checkpoint algorithm works well in most cases, but when the reverse pressure occurs in the job, the blocking Barrier alignment will aggravate the reverse pressure of the job, and even lead to the instability of the job.

First of all, the end of Chandy-Lamport distributed snapshots depends on the flow of Marker, while backpressure will limit the flow of Marker, resulting in longer completion time or even timeouts of snapshots. In either case, the point in time of the Checkpoint lags far behind the actual data flow. At this time, the computing progress of the job is not persisted and is in a fragile state. If the job is passively restarted due to an exception or actively restarted by the user, the job will roll back and lose a certain amount of progress. If the Checkpoint times out continuously and is not well monitored, the progress of rollback loss may be as high as one day, which is usually unacceptable for real-time services. To make matters worse, the backward Lag of the rolled-back assignment is larger, which usually brings greater reverse pressure, creating a vicious circle.

Secondly, Barrier alignment itself may become a source of reverse pressure, affecting the efficiency of upstream operators, which is unnecessary in some cases. For example, a typical job reads multiple Source, performs different aggregate calculations, and then writes the calculated results to different Sink. In general, these different Sink will reuse common operators to reduce double computation, but do not want different Source to interact with each other.

Figure 3. Barrier Alignment blocking upstream Task

Suppose a job needs to count the day-to-day granularity of two business lines An and B respectively, as well as the weekly indicators of all business lines. The topology is shown in the figure above. If the business volume of B business line soars one day, resulting in delay in Checkpoint Barrier, it will cause the common Window Aggregate to be Barrier aligned, which will block the FlatMap of business A, and eventually delay the calculation of business A.

Of course, this situation can be optimized by splitting jobs, but it is inevitable to introduce more development and maintenance costs, and more importantly, this is already in line with the conventional development ideas of Flink users, and the possibility of unexpected user behavior should be minimized within the framework.

Unaligned Checkpoint

To solve this problem, Flink introduced the Unaligned Checkpoint feature in version 1.11. To understand the principle of Unaligned Checkpoint, you first need to understand the description of Marker processing rules in the Chandy-Lamport paper:

Figure 4. Chandy-Lamport Marker processing

The key is if q has not recorded its state, that is, whether the operator has already taken a local snapshot when the Marker is received. Flink's Aligned Checkpoint has always delayed local snapshots until all Barrier arrives through Barrier alignment, so this condition is always true, thus cleverly avoiding snapshots of the state of the operator input queue, but at the cost of less controllable Checkpoint duration and throughput. In fact, this is different from the Chandy-Lamport algorithm.

For example, suppose we equal-join two data streams to output the matching elements. According to Flink Aligned Checkpoint, the state of the system changes as follows (the elements of different colors in the figure represent different Checkpoint cycles):

Figure 5. Aligned Checkpoint state change

Figure a: there are 3 elements for input Channel 1, 2 in front of Barrier, and 4 elements for Channel 2, with 2, 9, and 7 in front of Barrier. Figure b: the operator reads an element of Channel and outputs 2. Then the Barrier of Channel 1 is received, and the subsequent data of Channel 1 is stopped, and only the data of Channel 2 is processed. Figure c: the operator consumes two more elements from Channel 2, receives the Barrier, starts the local snapshot and outputs the Barrier.

For the same case, the state change of the Chandy-Lamport algorithm is as follows:

Figure 6. Chandy-Lamport state change

Figure a: ditto. Figure b: the operator processes two Channel and one element respectively, and outputs the result 2. After receiving the Barrier of Channel 1, the operator starts a local snapshot to record its own state and outputs the Barrier. Figure c: the operator continues to normally process two Channel inputs and outputs 9. What is special is that the subsequent elements of Channel 2 are preserved until the Barrier of Channel 2 appears (that is, 9 and 7 of Channel 2). The saved data becomes part of the snapshot as the state of the Channel.

The differences between the two can be summarized into two points:

Whether the snapshot is triggered when the first Barrier is received or the last Barrier is received. Whether it is necessary to block the calculation of the Channel that has received Barrier.

From these two points, the new Unaligned Checkpoint changes the trigger of the snapshot to the first Barrier and unblocks the calculation of Channel, which is basically the same as Chandy-Lamport in algorithm, and makes several improvements in terms of implementation details combined with the positioning of Flink.

First of all, different from the Chandy-Lamport model, we only need to consider the state of the input Channel of the operator, the operator of Flink has two kinds of Channel: input and output, both of which need to be considered in the snapshot.

Secondly, in both Chandy-Lamport and Flink Aligned Checkpoint algorithms, Barrier must follow its position in the data stream, and operators need to wait for Barrier to be actually processed before starting snapshots. Unaligned Checkpoint changes this setting to allow operators to take in and output Barrier first. In this way, the first to arrive at Barrier will jump forward some distance in the operator's cache data queue (including input Channel and output Channel), and the data that was "jumped in line" and other input Channel before its Barrier will be written to the snapshot (the yellow part of the figure).

Figure 7. Barrier crosses over data

The main benefit of this is that if the processing of the operator itself is a bottleneck, Chandy-Lamport 's Barrier will still be blocked, but Unaligned Checkpoint can start snapshots as soon as Barrier enters the input Channel. This can greatly speed up the flow of Barrier through the entire DAG, thereby reducing the overall duration of the Checkpoint.

Going back to the previous example, implemented with Unaligned Checkpoint, the state changes as follows:

Figure 8. Unaligned-Checkpoint state change

Figure a: there are 3 elements for input Channel 1, 2 in front of Barrier, and 4 elements for Channel 2, with 2, 9, and 7 in front of Barrier. Result data 1 already exists in the output Channel. Figure b: the operator gives priority to the Barrier input Channel 1, starts the local snapshot to record its own state, and inserts the Barrier into the output Channel end. Figure c: the operator continues to normally process the inputs of two Channel and outputs 2 and 9. At the same time, the operator writes data crossed by Barrier (that is, 2 of input Channel 1 and 1 of output Channel) to Checkpoint, and continues to write data of input Channel 2 that is earlier than Barrier (that is, 2, 9, 7) to Checkpoint.

Compared with the data of different Checkpoint periods in Aligned Checkpoint is clearly separated by operator snapshots, when Unaligned Checkpoint takes snapshots and outputs Barrier, part of the input data that belongs to the current Checkpoint has not been calculated (so it is not reflected in the current operator state), while part of the output data belonging to the current Checkpoint falls behind the Barrier (and therefore not reflected in the state of the downstream operators).

This is what Unaligned means: data misalignment for different Checkpoint cycles, including misalignment between different input Channel, and misalignment between input and output. This part of the misaligned data is recorded by the snapshot to be replayed when the state is restored. In other words, when recovering from Checkpoint, misaligned data cannot be calculated from the data replayed on the Source side and not reflected in the operator state, but because they will be restored by Checkpoint to the corresponding Channel, it can still provide accurate results that can only be calculated once.

Of course, Unaligned Checkpoint is not 100% better than Aligned Checkpoint, and the known problems it brings are:

Due to the persistence of cached data, the State Size will grow greatly and the disk load will increase. With the growth of State Size, the job recovery time may increase, and the difficulty of operation and maintenance management will increase.

At present, it seems that Unaligned Checkpoint is more suitable for complex operations which are easy to produce high reverse pressure and more important at the same time. For simple jobs such as data ETL synchronization, a lighter Aligned Checkpoint is obviously a better choice. The Unaligned Checkpoint of Flink 1.11 mainly solves the problem that it is difficult to complete Checkpoint in the case of high reverse pressure. at the same time, at the cost of disk resources, it avoids the blocking that may be caused by Checkpoint, and helps to improve the resource utilization of Flink. After reading the above, do you have any further understanding of how to do Flink 1.11 Unaligned Checkpoint parsing? If you want to know more knowledge or related content, please follow the industry information channel, thank you for your support.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report