The relationship between Checkpoint and state and the execution Mechanism of Checkpoint

2025-01-16 Update From: SLTechnology News&Howtos

This article explains the relationship between Checkpoint and state in Flink and how a Checkpoint is executed.

It is divided into four parts: the relationship between Checkpoint and state, what state is, how to use state in Flink, and the execution mechanism of Checkpoint.

The relationship between Checkpoint and state

A Checkpoint is a global operation that is triggered at the sources and propagates to all downstream nodes. The following figure gives an intuitive picture of Checkpoint: the red box shows that a total of 569K Checkpoints have been triggered and that all of them completed successfully, with no failures.

State is the main data that a Checkpoint persists as a backup. According to the statistics in the figure below, the state of this job is about 9 KB in size.

What is state?

Next, let's look at what state is. Consider the classic word count program: it listens on local port 9000 and counts the frequency of the words it receives. If we start netcat locally and type hello world in the terminal, what does the program output?

The answer is obvious: (hello, 1) and (world, 1).

So the question is: if you type hello world in the terminal again, what does the program output?

The answer is also obvious: (hello, 2) and (world, 2). How does Flink know that hello world has already been processed once? This is where state comes in. Here, keyed state stores the counts accumulated so far, so Flink knows that hello and world have each appeared once already.

Review the word count code just described. The call to the keyBy API creates a KeyedStream partitioned by key, which is a prerequisite for using keyed state. After that, the sum method invokes the built-in StreamGroupedReduce implementation.

What is keyed state?

Keyed state has two characteristics:

It can only be used in functions and operators on a KeyedStream, such as keyed UDFs and window state.

Keyed state is partitioned: each key belongs to exactly one keyed state partition.

To understand partitioning, look at the semantics of keyBy. In the figure, there are three parallel tasks on the left and three on the right. Words that arrive on the left are redistributed by keyBy. For example, in the input hello world hello, the word hello hashes to the same value every time, so it always goes to the parallel task at the lower right.
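The routing rule can be sketched in plain Java. This is a simplified model under a stated assumption: it maps a key to a task with hashCode modulo parallelism, whereas Flink itself routes keys through key groups. KeyRoutingSketch and subtaskFor are illustrative names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of keyBy routing: the same key always lands on the same parallel task.
// Assumption: plain hashCode modulo parallelism; Flink itself goes through key groups.
class KeyRoutingSketch {
    // Returns the index of the parallel task that receives this key.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups the incoming words by the task index they are routed to.
    static Map<Integer, List<String>> route(List<String> words, int parallelism) {
        Map<Integer, List<String>> byTask = new TreeMap<>();
        for (String word : words) {
            byTask.computeIfAbsent(subtaskFor(word, parallelism), i -> new ArrayList<>()).add(word);
        }
        return byTask;
    }
}
```

Calling KeyRoutingSketch.route with the words hello, world, hello and a parallelism of 3 puts both occurrences of hello into the same bucket, which is what makes per-key state possible.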

What is operator state?

Operator state is also known as non-keyed state. Each operator state is bound to exactly one operator instance.

A common operator state is source state, for example the current offset of a source.

Now look at a word count program that uses operator state:

Here fromElements invokes the FromElementsFunction class, which uses operator state of type list state. The figure below classifies state by type:

Besides this classification, state can also be classified by whether Flink manages it directly:

Managed State: state managed by Flink. All the state in the examples above is managed state.

Raw State: Flink only provides a stream to store the data. To Flink, raw state is just a sequence of bytes.

In production, only managed state is recommended, and it is the focus of this article.

How to use state in Flink

The following figure uses the StreamGroupedReduce class, which backs the sum call in word count, as an example of how to use keyed state in your code:
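The read, merge and write-back pattern of a keyed reduce can also be sketched in plain Java, without Flink on the classpath. This is a simplified model, not the actual StreamGroupedReduce source: KeyedReduceSketch and its processElement method are illustrative names, and a HashMap stands in for the keyed state that Flink would manage.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Simplified stand-in for a keyed reduce operator (not Flink's actual class).
// Assumption: a HashMap plays the role of the per-key value state managed by Flink.
class KeyedReduceSketch<K, V> {
    private final Map<K, V> stateByKey = new HashMap<>(); // one state slot per key
    private final BinaryOperator<V> reducer;

    KeyedReduceSketch(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    // Reads the state for the key, merges the new value, writes it back and emits the result.
    V processElement(K key, V value) {
        V current = stateByKey.get(key);
        V updated = (current == null) ? value : reducer.apply(current, value);
        stateByKey.put(key, updated);
        return updated;
    }
}
```

With new KeyedReduceSketch<String, Integer>(Integer::sum), processing ("hello", 1) twice returns 1 and then 2, matching the word count behavior described earlier.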

The following figure walks through the FromElementsFunction class from the word count example to show how to use operator state in your code:
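The idea of operator state on a source can be modeled in a few lines of plain Java. This is a hedged sketch rather than the FromElementsFunction source: ElementsSourceSketch, snapshot and restore are hypothetical names standing in for Flink's checkpoint hooks, and the state is a list with a single entry, the number of elements already emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a source whose operator state is the count of emitted elements.
// Assumption: snapshot() and restore() are hypothetical stand-ins for Flink's checkpoint hooks.
class ElementsSourceSketch {
    private final List<String> elements;
    private int numEmitted = 0; // the operator state of this source instance

    ElementsSourceSketch(List<String> elements) {
        this.elements = elements;
    }

    // Emits the next element, or null when the source is exhausted.
    String next() {
        return numEmitted < elements.size() ? elements.get(numEmitted++) : null;
    }

    // Checkpoint: store the state as a list with a single entry (list-state style).
    List<Integer> snapshot() {
        List<Integer> state = new ArrayList<>();
        state.add(numEmitted);
        return state;
    }

    // Recovery: read the entry back so that already emitted elements are skipped.
    void restore(List<Integer> state) {
        numEmitted = state.isEmpty() ? 0 : state.get(0);
    }
}
```

If a snapshot is taken after the first element and the job later fails, a fresh instance that restores this snapshot continues with the second element instead of starting over.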

The execution Mechanism of Checkpoint

Before introducing the execution mechanism of Checkpoint, we need to look at how state is stored, because state is the main data that a Checkpoint persists as a backup.

Classification of StateBackend

The following figure illustrates the three types of state backend built into Flink. MemoryStateBackend and FsStateBackend both keep state on the Java heap at run time, and FsStateBackend persists the data to remote storage as files only when a Checkpoint is executed. RocksDBStateBackend, by contrast, uses RocksDB (an LSM-tree database that combines memory and disk) to store state.

HeapKeyedStateBackend has two implementations:

Supports asynchronous Checkpoint (default): storage format CopyOnWriteStateMap

Supports only synchronous Checkpoint: storage format NestedStateMap

Note that when HeapKeyedStateBackend is used within MemoryStateBackend, the data serialized during a Checkpoint is limited to 5 MB by default.

In RocksDBKeyedStateBackend, each state is stored in a separate column family, and the keyGroup, key and namespace are serialized together and stored in the DB as the key.
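A composite key of this kind can be illustrated as follows. The byte layout here is an assumption made for the sketch (a 2-byte key group prefix followed by length-prefixed UTF-8 strings); the actual serialization in Flink depends on the configured serializers, and CompositeKeySketch is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative layout only: keyGroup + key + namespace concatenated into one byte array.
// Assumption: 2-byte key group prefix and length-prefixed UTF-8 strings; the real format differs in detail.
class CompositeKeySketch {
    static byte[] compositeKey(int keyGroup, String key, String namespace) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write((keyGroup >>> 8) & 0xFF); // high byte of the key group
        out.write(keyGroup & 0xFF);         // low byte of the key group
        writeString(out, key);
        writeString(out, namespace);
        return out.toByteArray();
    }

    // Writes a one-byte length followed by the UTF-8 bytes (enough for strings under 256 bytes).
    private static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.write(bytes.length);
        out.write(bytes, 0, bytes.length);
    }
}
```

Because the key group comes first, all entries of one key group share a common prefix and sit next to each other in a sorted store such as RocksDB.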

Detailed explanation of Checkpoint implementation Mechanism

This section walks through the Checkpoint execution process step by step. On the left of the figure is the Checkpoint Coordinator, which initiates the entire Checkpoint. In the middle is a Flink job made up of two sources and one sink, and on the far right is persistent storage, which is HDFS in most user scenarios.

In the first step, the Checkpoint Coordinator triggers a Checkpoint on all source nodes.

In the second step, the source nodes broadcast a barrier downstream. The barrier is the core of the Chandy-Lamport distributed snapshot algorithm: a downstream task executes its Checkpoint only after it has received the barrier from all of its inputs.

In the third step, when a task finishes backing up its state, it sends the address of the backup data (the state handle) to the Checkpoint Coordinator.

In the fourth step, after the downstream sink node has collected the barriers from its two upstream inputs, it takes its local snapshot. The figure shows the process of a RocksDB incremental Checkpoint: RocksDB first flushes all of its data to disk (the red triangle), and then the Flink framework selects the files that have not yet been uploaded and backs them up to persistent storage (the small purple triangle).

Similarly, after the sink node completes its own Checkpoint, it returns the state handle to the Coordinator.
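The file selection in an incremental Checkpoint amounts to a set difference. The sketch below models only that selection step: IncrementalUploadSketch and filesToUpload are hypothetical names, and the file names in the usage note are made up for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

// Models the selection step of an incremental Checkpoint as a set difference.
// Assumption: files are identified by name and never change once written.
class IncrementalUploadSketch {
    // Returns the files in the current local snapshot that earlier Checkpoints did not upload.
    static Set<String> filesToUpload(Set<String> currentFiles, Set<String> alreadyUploaded) {
        Set<String> toUpload = new TreeSet<>(currentFiles);
        toUpload.removeAll(alreadyUploaded);
        return toUpload;
    }
}
```

For example, if the local snapshot contains 1.sst, 2.sst and 3.sst and the first two were uploaded by a previous Checkpoint, only 3.sst is selected.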

Finally, when the Checkpoint Coordinator has collected the state handles of all tasks, the Checkpoint is considered globally complete, and the Coordinator backs up one more file, the Checkpoint meta file, to persistent storage.
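The Coordinator's bookkeeping can be modeled as a set of tasks that have not yet acknowledged. This is a minimal sketch of that idea, not Flink's implementation: PendingCheckpointSketch, acknowledge and the task names in the usage note are illustrative, and state handles are represented as plain strings.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal model of a Checkpoint that is waiting for acknowledgements from all tasks.
// Assumption: tasks and state handles are plain strings; a real handle points to stored data.
class PendingCheckpointSketch {
    private final Set<String> notYetAcknowledged;
    private final Map<String, String> stateHandles = new HashMap<>();

    PendingCheckpointSketch(Set<String> allTasks) {
        this.notYetAcknowledged = new HashSet<>(allTasks);
    }

    // Records the state handle of a task; returns true once every task has acknowledged.
    boolean acknowledge(String task, String stateHandle) {
        if (notYetAcknowledged.remove(task)) {
            stateHandles.put(task, stateHandle);
        }
        return notYetAcknowledged.isEmpty();
    }

    // The collected handles are what a meta file would reference after completion.
    Map<String, String> collectedHandles() {
        return new HashMap<>(stateHandles);
    }
}
```

With the tasks source-1, source-2 and sink, acknowledge returns false for the first two acknowledgements and true for the third, at which point the Checkpoint counts as globally complete.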

EXACTLY_ONCE semantics of Checkpoint

To implement EXACTLY_ONCE semantics, Flink buffers the data received during the alignment phase in an input buffer and processes it once the alignment is complete. With AT_LEAST_ONCE semantics, the received data is not buffered and is processed right away, so after a restore some data may be processed more than once. The following schematic diagram of Checkpoint alignment comes from the official documentation:
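The difference between the two modes can be simulated with a task that has several input channels. This is a simplified model with hypothetical names (BarrierAlignmentSketch, onRecord, onBarrier), in which the list of processed records stands in for operator state.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified simulation of barrier alignment on one task with several input channels.
// Assumption: the list of processed records stands in for the operator state.
class BarrierAlignmentSketch {
    private final int numChannels;
    private final boolean exactlyOnce;
    private final Set<Integer> barrierReceived = new HashSet<>();
    private final List<String> buffered = new ArrayList<>();
    final List<String> processed = new ArrayList<>();
    List<String> snapshot = null; // state captured by the most recent Checkpoint

    BarrierAlignmentSketch(int numChannels, boolean exactlyOnce) {
        this.numChannels = numChannels;
        this.exactlyOnce = exactlyOnce;
    }

    void onRecord(int channel, String record) {
        if (exactlyOnce && barrierReceived.contains(channel)) {
            buffered.add(record);  // arrived after the barrier: hold back until alignment ends
        } else {
            processed.add(record); // processed immediately
        }
    }

    void onBarrier(int channel) {
        barrierReceived.add(channel);
        if (barrierReceived.size() == numChannels) {
            snapshot = new ArrayList<>(processed); // all barriers arrived: take the snapshot
            processed.addAll(buffered);            // then release the held-back records
            buffered.clear();
            barrierReceived.clear();
        }
    }
}
```

Feeding record a and a barrier on channel 0, then record c on channel 0, then record b and a barrier on channel 1 yields a snapshot of [a, b] in exactly-once mode. In at-least-once mode the snapshot is [a, c, b]: it already contains c, which was sent after the barrier and will be replayed after a restore, so c is counted twice.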

Note that Flink's Checkpoint mechanism only guarantees EXACTLY_ONCE for the computation inside Flink. End-to-end EXACTLY_ONCE also requires support from the source and the sink.

The difference between Savepoint and Checkpoint

Both can be used to resume a job. The main differences are as follows:

Savepoint:

Triggered by the user with a command; the user manages its creation and deletion.

Stored in a standardized format, which allows job upgrades or configuration changes.

On restore, the user needs to provide the savepoint path from which to restore the job state.

Externalized Checkpoint:

Saved in the external persistent storage specified by the user.

When the job is FAILED (or CANCELED), the externally stored Checkpoint is retained.

On restore, the user needs to provide the Checkpoint path from which to restore the job state.

Thank you for reading. This concludes the discussion of the relationship between Checkpoint and state and the execution mechanism of Checkpoint. How to apply it in a specific job still needs to be verified in practice.
