2025-03-29 Update From: SLTechnology News&Howtos
This article explains the Flink checkpoint mechanism: how snapshots are taken, where they are stored, how checkpointing is configured, and how restart policies work.
Checkpoint introduction
The checkpoint mechanism is the cornerstone of Flink's reliability. When an operator fails for some reason (such as an abnormal exit), it ensures that the state of the whole application flow graph can be restored to a state from before the failure, which keeps the state of the flow graph consistent. Flink's checkpoint mechanism is based on the Chandy-Lamport algorithm.
When an application that needs checkpointing starts, Flink's JobManager creates a CheckpointCoordinator (checkpoint coordinator) for it. The CheckpointCoordinator is solely responsible for taking snapshots of this application.
1) The CheckpointCoordinator periodically sends a barrier to all source operators of the streaming application.
2) When a source operator receives a barrier, it pauses data processing, takes a snapshot of its current state, and saves it to the specified persistent storage. It then reports the snapshot to the CheckpointCoordinator, broadcasts the barrier to all its downstream operators, and resumes data processing.
3) When a downstream operator receives the barrier, it pauses its own data processing, takes a snapshot of its state, and saves it to the specified persistent storage. It then reports the snapshot to the CheckpointCoordinator, broadcasts the barrier to all its downstream operators, and resumes data processing.
4) Each operator takes a snapshot and broadcasts the barrier downstream as in step 3, until the barrier reaches the sink operators and the snapshot is complete.
5) When the CheckpointCoordinator has received reports from all operators, the snapshot for this period is considered successful. If it does not receive reports from all operators within the specified time, the snapshot for this period is considered to have failed.
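The bookkeeping in step 5 can be sketched as a small model. This is only an illustration of the rule described above, not Flink's implementation: PendingCheckpoint, its fields, and the status objects are hypothetical names, and time is passed in explicitly as milliseconds.

```scala
// Toy model of step 5: a checkpoint completes when every operator has
// reported its snapshot, and fails if the timeout elapses first.
// All names here are hypothetical and are not Flink classes.
sealed trait CheckpointStatus
case object InProgress extends CheckpointStatus
case object Completed extends CheckpointStatus
case object Failed extends CheckpointStatus

final case class PendingCheckpoint(
    id: Long,
    expected: Set[String],        // operators that must report a snapshot
    startedAtMs: Long,
    timeoutMs: Long,
    acked: Set[String] = Set.empty
) {
  // True once the specified time has passed since the checkpoint started.
  def expired(nowMs: Long): Boolean = nowMs - startedAtMs > timeoutMs

  // Record a report from one operator. Reports from unknown operators,
  // or reports that arrive after the timeout, are ignored.
  def acknowledge(operator: String, nowMs: Long): PendingCheckpoint =
    if (expected.contains(operator) && !expired(nowMs)) copy(acked = acked + operator)
    else this

  // Completed when all reports are in; Failed when time ran out first.
  def status(nowMs: Long): CheckpointStatus =
    if (acked == expected) Completed
    else if (expired(nowMs)) Failed
    else InProgress
}
```

In this model, a checkpoint over the operators "source", "map" and "sink" stays InProgress until the third report arrives, and becomes Failed if the timeout passes while a report is still missing.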
If an operator has two input sources, it temporarily blocks the input whose barrier arrived first and waits until the barrier with the same number arrives from the second input. Only then does it take its snapshot and broadcast the barrier downstream. For example:
1) Assume that operator C has two input sources, A and B.
2) In the i-th snapshot cycle, for some reason (such as processing delay or network delay), the barrier sent by input source A arrives first. Operator C temporarily blocks the input channel of source A and only receives data from input source B.
3) When the barrier sent by input source B arrives, operator C takes its snapshot and reports it to the CheckpointCoordinator, then merges the two barriers into one and broadcasts it to all downstream operators.
4) When a failure occurs, the CheckpointCoordinator notifies all operators in the flow graph to roll back to the checkpoint state of a given cycle, and data flow processing then resumes. This distributed checkpoint mechanism ensures that data is processed exactly once (Exactly Once).
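The alignment of barriers at operator C can be illustrated with a small simulation. This is a minimal sketch under stated assumptions, not Flink code: the operator state is assumed to be a running sum, and Record, Barrier, AlignmentResult and BarrierAlignment are hypothetical names introduced for illustration.

```scala
// Toy simulation of barrier alignment for an operator with several inputs.
// Assumption: the operator's state is a running sum of record values.
// All names are hypothetical and are not Flink classes.
sealed trait Event { def channel: String }
final case class Record(channel: String, value: Int) extends Event
final case class Barrier(channel: String, checkpointId: Long) extends Event

final case class AlignmentResult(finalSum: Int, snapshots: Map[Long, Int])

object BarrierAlignment {
  def run(inputs: Set[String], events: Seq[Event]): AlignmentResult = {
    var sum = 0                           // operator state
    var blocked = Set.empty[String]       // inputs whose barrier already arrived
    var buffered = Vector.empty[Record]   // records held back from blocked inputs
    var snapshots = Map.empty[Long, Int]  // checkpoint id -> state at snapshot time

    events.foreach {
      case r: Record if blocked.contains(r.channel) =>
        buffered :+= r                    // belongs to the next snapshot cycle
      case r: Record =>
        sum += r.value                    // input still open: process normally
      case Barrier(channel, id) =>
        blocked += channel
        if (blocked == inputs) {          // barrier has arrived on every input
          snapshots += (id -> sum)        // snapshot excludes buffered records
          // at this point the merged barrier would be broadcast downstream
          buffered.foreach(b => sum += b.value)
          buffered = Vector.empty
          blocked = Set.empty             // unblock all inputs
        }
    }
    AlignmentResult(sum, snapshots)
  }
}
```

For instance, with records 1 (A) and 2 (B), then the barrier on A, then 10 (A) and 3 (B), then the barrier on B, the snapshot would hold 6: the record 10 from A arrived after A's barrier, so it is buffered and only applied after the snapshot.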
Persistent storage
MemoryStateBackend
This backend stores snapshot data in JobManager memory. It is only suitable for testing and for very small amounts of snapshot data, and is not recommended for large-scale commercial deployment.
Limitations of MemoryStateBackend:
By default, the size of each state is limited to 5 MB. You can increase this value in the constructor of MemoryStateBackend.
Regardless of the configured maximum state size, the state cannot be larger than the akka frame size (see configuration).
The aggregate state must fit into JobManager memory.
MemoryStateBackend is recommended for:
Local development and debugging.
Jobs with little state, such as jobs that consist only of record-at-a-time functions (Map, FlatMap, Filter, …). The Kafka consumer also needs very little state.
FsStateBackend
This backend saves snapshot data to a file system. HDFS and local files are currently supported. If you use HDFS, pass a path that starts with "hdfs://" when initializing FsStateBackend, for example new FsStateBackend("hdfs:///hacluster/checkpoint"). If you use a local file, pass a path that starts with "file://", for example new FsStateBackend("file:///Data"). Local files are not recommended in a distributed deployment: if an operator fails on node A and is restored on node B, the data on node A cannot be read from B, and state recovery fails.
FsStateBackend is recommended for:
Jobs with large state, long windows, or large key/value state.
All high-availability setups.
RocksDBStateBackend
RocksDBStateBackend sits between local files and HDFS. During normal operation it uses RocksDB to persist data to local files. When a snapshot is taken, the local data is snapshotted and persisted through FsStateBackend. The user does not need to specify FsStateBackend explicitly; just pass an HDFS or local path during initialization, such as new RocksDBStateBackend("hdfs:///hacluster/checkpoint") or new RocksDBStateBackend("file:///Data").
If the application uses a custom window, RocksDBStateBackend is not recommended. In a custom window, state is saved in the state backend as ListState, and if one key holds many values, RocksDB reads the ListState very slowly, which hurts performance. Users can choose FsStateBackend+HDFS or RocksDBStateBackend+HDFS depending on the application.
Syntax
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// advanced options:
// set the checkpointing mode: exactly-once or at-least-once
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// set the checkpoint timeout (ms)
env.getCheckpointConfig.setCheckpointTimeout(60000)
// whether the whole task fails if an error occurs during a snapshot: true = fail, false = do not fail
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
// set how many checkpoints may run at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
Two ways to modify the State Backend
First: per-job adjustment
Modify the code of the current job:
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"))
Or new MemoryStateBackend()
Or new RocksDBStateBackend(filebackend, true) [a third-party dependency needs to be added]
Second: global adjustment
Modify flink-conf.yaml:
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
Note: state.backend accepts the following values: jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
Advanced options for Checkpoint
Checkpointing is disabled by default. To use it, you must enable it first. Once enabled, the default checkpointing mode is Exactly-once.
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
// enable checkpointing, one checkpoint per second
env.enableCheckpointing(1000)
// specify the checkpointing mode
// two options:
// CheckpointingMode.EXACTLY_ONCE: default
// CheckpointingMode.AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
Generally select CheckpointingMode.EXACTLY_ONCE, unless the scenario requires very low latency (a few milliseconds).
Note: if you need an end-to-end EXACTLY_ONCE guarantee, the source and the sink must guarantee EXACTLY_ONCE as well.
// if the program is cancelled, keep the checkpoints taken so far
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
By default, checkpoints are not retained and are only used to resume jobs after a failure. Externalized checkpoints can be enabled together with a retention policy:
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep checkpoints when the job is cancelled. Note that in this case you must clean up the checkpoint state manually.
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete checkpoints when the job is cancelled. Checkpoints are then available only when the job fails.
// set the checkpoint timeout (ms); a checkpoint that does not complete in time is aborted
env.getCheckpointConfig.setCheckpointTimeout(60000)
// minimum pause between checkpoints: how long after the last checkpoint completes
// before another one may be triggered. When this parameter is specified,
// the value of maxConcurrentCheckpoints is 1
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// set the maximum number of checkpoints that may run at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// specify whether the task should fail when an exception occurs during a checkpoint; the default is true.
// If set to false, the task declines the checkpoint and continues to run
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
Flink restart policy
Flink supports different restart policies that control how a job is restarted after a failure. The cluster starts with a default restart policy, which is used whenever a job does not specify its own. If a restart policy is specified when the job is submitted, it overrides the cluster's default restart policy.
Overview
The default restart policy is specified in Flink's flink-conf.yaml, where the configuration parameter restart-strategy defines which policy is used. If checkpointing is not enabled, the no restart policy is used. If checkpointing is enabled but no restart policy is specified, the fixed-delay policy is used with Integer.MAX_VALUE restart attempts. See the available restart policies below for the supported values.
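The selection rule in the previous paragraph can be written down as a small function. This is one reading of that paragraph, not Flink's own code: DefaultPolicy, NoRestartPolicy, FixedDelayDefault and DefaultRestartPolicy are hypothetical names, and the fixed-delay default is reduced to its attempt count.

```scala
// Sketch of how the effective default policy follows from the configuration.
// All names are hypothetical and are not Flink classes.
sealed trait DefaultPolicy
case object NoRestartPolicy extends DefaultPolicy
final case class FixedDelayDefault(maxAttempts: Int) extends DefaultPolicy

object DefaultRestartPolicy {
  // configured: the policy from restart-strategy in flink-conf.yaml, if any.
  def resolve(checkpointingEnabled: Boolean, configured: Option[DefaultPolicy]): DefaultPolicy =
    configured.getOrElse(
      if (checkpointingEnabled) FixedDelayDefault(Int.MaxValue) // retry Integer.MAX_VALUE times
      else NoRestartPolicy                                      // no checkpointing: no restart
    )
}
```

An explicitly configured policy always wins in this sketch; the checkpointing flag only matters when nothing is configured.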
Each restart policy has its own parameters that control its behavior. These values can also be set in the configuration file, and the description of each restart policy below lists its configuration values.
In addition to the default restart policy, each job can specify its own restart policy programmatically by calling the setRestartStrategy() method on ExecutionEnvironment. This also applies to StreamExecutionEnvironment.
The following example shows how to set a fixed delay restart policy for a job. If the job fails, the system tries to restart it up to 3 times, waiting 10 seconds between attempts.
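import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
val env = ExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay between attempts
))
Fixed delay restart policy (Fixed Delay Restart Strategy)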
The fixed delay restart policy will attempt to restart Job a given number of times. If the maximum number of restarts is exceeded, Job will eventually fail. The restart policy waits for a fixed amount of time between two consecutive restart attempts.
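The behavior described above can be traced with a small simulation. This is a hedged sketch, not Flink's implementation: FixedDelayPolicy and FixedDelaySimulation are hypothetical names, and the job is assumed to fail after running for a fixed time on every run.

```scala
import scala.annotation.tailrec

// Toy model of a fixed delay policy. Names are hypothetical, not Flink classes.
final case class FixedDelayPolicy(maxAttempts: Int, delayMs: Long) {
  // A restart is allowed while fewer than maxAttempts restarts have been made.
  def canRestart(restartsSoFar: Int): Boolean = restartsSoFar < maxAttempts
}

object FixedDelaySimulation {
  // Assumes every run fails after runMs. Returns the time of each restart,
  // in milliseconds since the first launch, until the policy gives up.
  def restartTimes(policy: FixedDelayPolicy, runMs: Long): List[Long] = {
    @tailrec
    def loop(startedAt: Long, restarts: Int, acc: List[Long]): List[Long] = {
      val failedAt = startedAt + runMs
      if (!policy.canRestart(restarts)) acc.reverse // attempts exhausted: job fails
      else {
        val restartAt = failedAt + policy.delayMs   // wait the fixed delay
        loop(restartAt, restarts + 1, restartAt :: acc)
      }
    }
    loop(0L, 0, Nil)
  }
}
```

With 3 attempts, a 10-second delay, and runs that fail after 1 second, the model restarts three times and then stops, which corresponds to the job finally failing.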
This policy can be enabled as the default restart policy by setting the following parameter in flink-conf.yaml:
restart-strategy: fixed-delay
Example:
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
A fixed delay restart can also be set in the program:
val env = ExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restarts
  Time.of(10, TimeUnit.SECONDS) // restart interval
))
Failure rate restart policy
The failure rate restart policy restarts the job after a failure, but once the failure rate (the number of failures per time interval) is exceeded, the job is finally considered failed. Between two consecutive restart attempts, the policy waits for a fixed amount of time.
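The rate check can be sketched as a sliding window over failure timestamps. This is an illustrative model of the rule stated above, not Flink's implementation: FailureRateTracker and its fields are hypothetical names, timestamps are plain milliseconds, and the fixed delay between restart attempts is left out.

```scala
// Toy model of a failure rate check. Names are hypothetical, not Flink classes.
final case class FailureRateTracker(
    maxFailuresPerInterval: Int,
    intervalMs: Long,
    recentFailuresMs: Vector[Long] = Vector.empty
) {
  // Record a failure at nowMs and forget failures older than the interval.
  def recordFailure(nowMs: Long): FailureRateTracker =
    copy(recentFailuresMs = (recentFailuresMs :+ nowMs).filter(t => nowMs - t < intervalMs))

  // Restart is allowed while the failures inside the interval do not exceed the limit.
  def canRestart: Boolean = recentFailuresMs.size <= maxFailuresPerInterval
}
```

With a limit of 3 failures per 5 minutes, three failures one minute apart still allow a restart in this model, while a fourth failure inside the same 5 minutes does not. A failure that arrives after the earlier ones have aged out of the window is counted afresh.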
The failure rate restart policy can be enabled by setting the following parameter in flink-conf.yaml:
restart-strategy: failure-rate
Example:
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
The failure rate restart policy can also be set in the program:
val env = ExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // maximum number of failures per measurement interval
  Time.of(5, TimeUnit.MINUTES), // interval over which the failure rate is measured
  Time.of(10, TimeUnit.SECONDS) // delay between two consecutive restart attempts
))
No restart policy
The job fails directly and no restart is attempted.
restart-strategy: none
No restart policy can also be set in the program.
val env = ExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(RestartStrategies.noRestart())