2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains the Checkpoint mechanism of Flink: the concept behind it, how barriers work, and how checkpointing is configured.
1. The concept of Checkpoint
To support high availability and fast recovery with exactly-once guarantees, Flink provides a powerful checkpoint mechanism.
The checkpoint mechanism is the cornerstone of Flink's reliability. When an operator fails for some reason (such as an abnormal exit), it ensures that the state of the whole application flow graph can be restored to a state from before the failure, which keeps the state of the flow graph consistent. The principle behind Flink's checkpoint mechanism comes from the Chandy-Lamport algorithm for distributed snapshots.
2. The core element of Checkpoint: barriers
The core element of Flink's distributed snapshots is the stream barrier. Barriers are injected into the data stream and flow with the records as part of the stream. Barriers never overtake records; they flow strictly in line. A barrier separates the records in the data stream into the set of records that goes into the current snapshot and the set that goes into the next snapshot. Each barrier carries the ID of the snapshot whose records it pushed in front of it. Barriers do not interrupt the flow of the stream and are therefore very lightweight. Multiple barriers from different snapshots can be in the stream at the same time, which means that several snapshots may be in progress concurrently.
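To make the partitioning role of barriers concrete, here is a minimal sketch in plain Java. It is not Flink code: the Element type and the recordsPerSnapshot method are hypothetical names introduced only for illustration, and the stream is modeled as a simple in-memory list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: these types are hypothetical, not Flink classes.
class BarrierPartitionSketch {

    // A stream element is either a data record or a barrier carrying a snapshot ID.
    static final class Element {
        final String value;     // record payload, null for a barrier
        final long snapshotId;  // snapshot ID, -1 for a record

        private Element(String value, long snapshotId) {
            this.value = value;
            this.snapshotId = snapshotId;
        }

        static Element ofRecord(String value) { return new Element(value, -1L); }
        static Element ofBarrier(long snapshotId) { return new Element(null, snapshotId); }
        boolean isBarrier() { return value == null; }
    }

    // Assigns every record to the snapshot whose barrier follows it in the stream.
    // Records after the last barrier are left out: no barrier has closed them yet.
    static Map<Long, List<String>> recordsPerSnapshot(List<Element> stream) {
        Map<Long, List<String>> result = new LinkedHashMap<>();
        List<String> current = new ArrayList<>();
        for (Element e : stream) {
            if (e.isBarrier()) {
                result.put(e.snapshotId, current);
                current = new ArrayList<>();
            } else {
                current.add(e.value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Element> stream = List.of(
                Element.ofRecord("a"), Element.ofRecord("b"), Element.ofBarrier(1),
                Element.ofRecord("c"), Element.ofBarrier(2), Element.ofRecord("d"));
        System.out.println(recordsPerSnapshot(stream)); // prints {1=[a, b], 2=[c]}
    }
}
```

Record "d" does not appear in the result because it would belong to a later snapshot whose barrier has not yet been injected.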
Stream barriers are injected into the parallel data flow at the stream sources. The point where the barriers for snapshot n are injected (we call it Sn) is the position in the source stream up to which the snapshot covers the data. For example, in Apache Kafka, this position is the offset of the last record in the partition. The position Sn is reported to the checkpoint coordinator (Flink's JobManager).
The barriers then flow downstream. When an intermediate operator has received the barrier for snapshot n from all of its input streams, it emits a barrier for snapshot n into all of its output streams. Once a sink operator (the end of the streaming DAG) has received barrier n from all of its input streams, it acknowledges snapshot n to the checkpoint coordinator. After all sinks have acknowledged the snapshot, it is considered complete.
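The acknowledgment step can be sketched as a small bookkeeping class. This is a simplified model written for this article, not Flink's coordinator: the class name AckTrackingSketch, its methods, and the string sink names are all assumptions, and it tracks only sink acknowledgments as described above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: a hypothetical stand-in for the checkpoint coordinator's bookkeeping.
class AckTrackingSketch {
    private final Set<String> sinks;
    // For each snapshot in progress, the sinks that have not acknowledged it yet.
    private final Map<Long, Set<String>> pendingAcks = new HashMap<>();

    AckTrackingSketch(Set<String> sinks) {
        this.sinks = Set.copyOf(sinks);
    }

    // Called when the barriers for a snapshot are injected at the sources.
    void trigger(long snapshotId) {
        pendingAcks.put(snapshotId, new HashSet<>(sinks));
    }

    // Records one acknowledgment; returns true only if it completes the snapshot.
    boolean acknowledge(long snapshotId, String sink) {
        Set<String> waiting = pendingAcks.get(snapshotId);
        if (waiting == null || !waiting.remove(sink)) {
            return false; // unknown or already completed snapshot, or duplicate acknowledgment
        }
        if (waiting.isEmpty()) {
            pendingAcks.remove(snapshotId);
            return true;
        }
        return false;
    }
}
```

With two sinks, the first acknowledgment of snapshot 1 returns false and the second returns true, which mirrors the rule that a snapshot is complete only after all sinks have confirmed it.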
Barrier alignment
Operators that receive more than one input stream need to align the input streams on the snapshot barriers. The Flink official website illustrates this with a barrier alignment flow chart; the steps are as follows:
a. Once an operator receives snapshot barrier n from an input stream, it cannot process any further records from that stream until it has also received barrier n from its other inputs. Otherwise, it would mix records that belong to snapshot n with records that belong to snapshot n + 1.
b. Streams that have already delivered barrier n are temporarily put on hold. Records received from these streams are not processed but are placed in an input buffer.
c. Once the last stream has delivered barrier n, the operator emits all pending outgoing records and then emits barrier n itself.
d. After that, it resumes processing records from all input streams, processing the records in the input buffers before the records arriving from the streams.
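Steps a to d can be simulated in a few lines of plain Java. The sketch below is a toy model under stated assumptions: BarrierAlignmentSketch and its methods are hypothetical names, the operator simply forwards records unchanged, and only one checkpoint is in flight at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Illustrative sketch only: a toy multi-input operator, not Flink's implementation.
// Assumes a single checkpoint in flight at any time.
class BarrierAlignmentSketch {
    private final boolean[] blocked;                         // inputs that already delivered the barrier
    private final Queue<String> buffered = new ArrayDeque<>(); // records held back during alignment
    private final List<String> output = new ArrayList<>();   // everything emitted downstream, in order
    private int barriersSeen = 0;

    BarrierAlignmentSketch(int numInputs) {
        blocked = new boolean[numInputs];
    }

    void onRecord(int input, String record) {
        if (blocked[input]) {
            buffered.add(record); // step b: hold back records from inputs that delivered the barrier
        } else {
            output.add(record);   // record still belongs to the current snapshot
        }
    }

    void onBarrier(int input, long snapshotId) {
        if (blocked[input]) {
            throw new IllegalStateException("second barrier on input " + input + " during alignment");
        }
        blocked[input] = true;    // step a: stop processing this input
        barriersSeen++;
        if (barriersSeen == blocked.length) {
            output.add("barrier-" + snapshotId); // step c: forward the barrier downstream
            Arrays.fill(blocked, false);
            barriersSeen = 0;
            while (!buffered.isEmpty()) {
                output.add(buffered.poll());     // step d: drain the input buffer first
            }
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        BarrierAlignmentSketch op = new BarrierAlignmentSketch(2);
        op.onRecord(0, "a1");
        op.onBarrier(0, 1);
        op.onRecord(0, "a2"); // buffered: input 0 is ahead of input 1
        op.onRecord(1, "b1");
        op.onBarrier(1, 1);
        op.onRecord(1, "b2");
        System.out.println(op.output()); // prints [a1, b1, barrier-1, a2, b2]
    }
}
```

Record a2 arrives before b1 but is emitted after the barrier, so the downstream operator never sees a record of snapshot n + 1 ahead of barrier n.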
In short: once an operator has received barrier n from all of its upstream inputs, and before it forwards the barrier downstream, it takes a snapshot of its state (for example, source offsets). By default the snapshot is stored in the memory of the JobManager. Because it may be large, it can be stored in a state backend instead; HDFS is recommended in production.
In more detail, operators snapshot their state at the point in time when they have received all snapshot barriers from their input streams and before they emit the barriers to their output streams. At that point, all state updates from records before the barriers have been made, and no updates that depend on records after the barriers have been applied. Because the state of a snapshot can be large, it is stored in a configurable state backend. By default, this is JobManager memory, but for production use, distributed reliable storage (for example, HDFS) should be configured. After the state is stored, the operator acknowledges the checkpoint, emits the snapshot barrier to the output streams, and then continues.
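The timing of the state snapshot can be illustrated with a counting operator. This is a sketch under assumptions, not Flink's state API: SnapshotStateSketch is a hypothetical class, and an in-memory map stands in for the state backend.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a hypothetical counting operator with an in-memory "state backend".
class SnapshotStateSketch {
    private long count = 0;                                   // the operator's state
    private final Map<Long, Long> backend = new HashMap<>();  // snapshot ID -> saved state

    void onRecord(String record) {
        count++;
    }

    // Called once the barrier has arrived on all inputs and before it is forwarded,
    // so the saved value reflects exactly the records that came before the barrier.
    void onBarrier(long snapshotId) {
        backend.put(snapshotId, count);
    }

    // Resets the state to what it was when the given snapshot was taken.
    void restore(long snapshotId) {
        Long saved = backend.get(snapshotId);
        if (saved == null) {
            throw new IllegalArgumentException("no snapshot " + snapshotId);
        }
        count = saved;
    }

    long count() {
        return count;
    }
}
```

If two records are processed, barrier 1 arrives, and a third record follows, the live count is 3 while snapshot 1 holds 2. Restoring snapshot 1 after a failure returns the operator to 2, and the third record has to be replayed from the source.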
3. Checkpoint settings
1. Some related configurations in the code
Checkpointing is disabled by default. To use it, you need to enable it first. Once enabled, the default CheckpointingMode is exactly-once. Here is the sample configuration from the official website:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// This determines if a task will be failed if an error occurs in the execution of the task's checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
1) Checkpointing is disabled by default and is enabled with the enableCheckpointing method, which has two overloads:
enableCheckpointing(long interval)
enableCheckpointing(long interval, CheckpointingMode mode)
interval specifies the checkpoint trigger interval (in milliseconds).
mode defaults to CheckpointingMode.EXACTLY_ONCE and can also be set to CheckpointingMode.AT_LEAST_ONCE.
2) CheckpointingMode can also be set with the setCheckpointingMode method.
3) checkpointTimeout specifies the timeout for checkpoint execution (in milliseconds). A checkpoint that does not complete within the timeout is aborted.
4) minPauseBetweenCheckpoints specifies the minimum time that must pass between the completion of one checkpoint on the checkpoint coordinator and the start of the next. When this parameter is specified, maxConcurrentCheckpoints is effectively 1.
5) maxConcurrentCheckpoints specifies the maximum number of checkpoints that may be in progress at the same time. It has no effect when minPauseBetweenCheckpoints (item 4) is specified, in which case it should be set to 1.
6) enableExternalizedCheckpoints enables external persistence of checkpoints. Externalized checkpoints are not cleaned up automatically when the job fails, so their state has to be cleaned up manually.
7) ExternalizedCheckpointCleanup specifies how externalized checkpoints are cleaned up when the job is canceled. With DELETE_ON_CANCELLATION, the externalized state is deleted automatically when the job is canceled, but it is retained if the job ends in the FAILED state. With RETAIN_ON_CANCELLATION, the externalized checkpoint state is retained when the job is canceled.
8) failOnCheckpointingErrors specifies whether the task should fail when an exception occurs during checkpointing. The default is true. If set to false, the task declines the checkpoint and continues to run.
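Two of the rules in this list lend themselves to a small worked model: the cleanup behavior in item 7 and the interaction between the trigger interval and minPauseBetweenCheckpoints in item 4. The sketch below is plain Java, not Flink's scheduler. The class, the JobEnd enum, and both method names are hypothetical, and the trigger-time formula is a simplified reading of the description above rather than Flink's actual implementation.

```java
// Illustrative sketch only: hypothetical helpers that model the rules described in the list above.
class CheckpointOptionsSketch {

    // Mirrors the two cleanup choices named in item 7.
    enum Cleanup { DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION }

    // How the job ended (hypothetical enum for this sketch).
    enum JobEnd { CANCELED, FAILED }

    // Whether the externalized checkpoint is still present after the job ends.
    static boolean retained(Cleanup cleanup, JobEnd end) {
        if (end == JobEnd.FAILED) {
            return true; // on failure the checkpoint is kept under either setting
        }
        return cleanup == Cleanup.RETAIN_ON_CANCELLATION;
    }

    // Simplified model of item 4: the next checkpoint starts no earlier than one interval
    // after the previous trigger and no earlier than minPause after the previous completion.
    static long earliestNextTrigger(long lastTriggerMs, long lastCompletionMs,
                                    long intervalMs, long minPauseMs) {
        return Math.max(lastTriggerMs + intervalMs, lastCompletionMs + minPauseMs);
    }
}
```

With an interval of 1000 ms and a minimum pause of 500 ms, a checkpoint triggered at 0 ms that completes at 900 ms pushes the next trigger to 1400 ms; if it completes at 200 ms, the next trigger stays at 1000 ms.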
2. Some related configurations in flink-conf.yaml
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
1) state.backend specifies the backend used to store state and checkpoints. The default is none, in which case state is kept in TaskManager memory and checkpoints are stored in JobManager memory.
The values of state.backend can be as follows:
jobmanager (MemoryStateBackend)
filesystem (FsStateBackend)
rocksdb (RocksDBStateBackend)
2) state.backend.async specifies whether the backend uses asynchronous snapshots (default is true). State backends that do not support asynchronous snapshots, or that support only asynchronous snapshots, may ignore this option.
3) state.backend.fs.memory-threshold, default 1024, specifies the size threshold for storing state in files. State smaller than this value is stored in the root checkpoint metadata file instead of a separate file.
4) state.backend.incremental, default false, specifies whether to use incremental checkpoints. Backends that do not support incremental checkpoints ignore this option.
5) state.backend.local-recovery, default false, configures local recovery for the state backend. By default, local recovery is disabled. Local recovery currently covers only keyed state backends. MemoryStateBackend does not support local recovery and ignores this option.
6) state.checkpoints.dir, default none, specifies the directory where checkpoint data files and metadata are stored. It must be accessible from all participating TaskManagers and JobManagers.
7) state.checkpoints.num-retained, default 1, specifies the number of completed checkpoints to retain.
8) state.savepoints.dir, default none, specifies the default directory for savepoints. It is used by the state backends that write savepoints to a file system (MemoryStateBackend, FsStateBackend, RocksDBStateBackend).
9) taskmanager.state.local.root-dirs, default none, defines the root directories for storing file-based state for local recovery. Local recovery currently covers only keyed state backends. MemoryStateBackend does not support local recovery and ignores this option.
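Two of these options describe simple rules that can be modeled directly: the size threshold in item 3 and the retention count in item 7. The following plain Java sketch is not Flink code. StateOptionsSketch, storedInline, and RetainedCheckpoints are hypothetical names, and the behavior is a reading of the descriptions above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch only: hypothetical helpers modeling two of the options listed above.
class StateOptionsSketch {

    // Default of state.backend.fs.memory-threshold as given in item 3.
    static final int DEFAULT_MEMORY_THRESHOLD = 1024;

    // State smaller than the threshold goes into the checkpoint metadata file
    // instead of a separate data file.
    static boolean storedInline(int stateSizeBytes, int thresholdBytes) {
        return stateSizeBytes < thresholdBytes;
    }

    // Models state.checkpoints.num-retained: keep only the newest N completed checkpoints.
    static final class RetainedCheckpoints {
        private final int numRetained;
        private final Deque<Long> completed = new ArrayDeque<>();

        RetainedCheckpoints(int numRetained) {
            if (numRetained < 1) {
                throw new IllegalArgumentException("numRetained must be at least 1");
            }
            this.numRetained = numRetained;
        }

        // Registers a completed checkpoint and returns the IDs that were discarded as a result.
        List<Long> add(long checkpointId) {
            completed.addLast(checkpointId);
            List<Long> dropped = new ArrayList<>();
            while (completed.size() > numRetained) {
                dropped.add(completed.removeFirst());
            }
            return dropped;
        }

        // The checkpoints currently retained, oldest first.
        List<Long> retained() {
            return new ArrayList<>(completed);
        }
    }
}
```

With num-retained set to 2, completing checkpoints 1, 2, and 3 in order discards checkpoint 1 and leaves 2 and 3 available for recovery.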
Thank you for reading. That concludes this explanation of the Checkpoint mechanism of Flink. How to apply it in a specific job still needs to be verified in practice.