
A Deep Dive into Fault Tolerance in Apache Flink

Updated 2025-03-26. From: SLTechnology News&Howtos (shulou)


Summary: In stream computing, data flows into Apache Flink continuously, and every record that enters the system triggers computation. What does Apache Flink do when a Task fails mid-computation because of a network, machine, or other problem? Apache Flink uses State to record the status of the computation, and during Failover a Task recovers from that State.

In stream computing, data flows into Apache Flink continuously, and every record that enters the system triggers computation. What does Apache Flink do when a Task fails mid-computation because of a network, machine, or other problem? In the article "Apache Flink talk Series-State", we explained that Apache Flink uses State to record the status of a computation and that, during Failover, a Task recovers from State. But how is the content of State recorded? How does Apache Flink guarantee Exactly-Once semantics? Both questions lead to Apache Flink's Fault Tolerance mechanism, which is the subject of this article.

Fault tolerance is a system's ability to detect a fault automatically when it occurs and to return to normal operation without manual intervention. When specified network failures, hardware failures, or software errors occur, the system can still execute its specified set of programs: the program is not terminated by the fault, and the results do not contain errors caused by it.

MySQL's binlog is an append-only log file. Active/standby replication is MySQL's main route to high availability, and the binlog is the core means of that replication. (MySQL high availability has many details and optimizations. For example, purely asynchronous replication can be tightened to semi-synchronous or synchronous replication, so that a network outage during asynchronous binlog shipping does not leave the primary and the standby inconsistent.) Active/standby replication is one part of MySQL's fault tolerance mechanism; transaction control is another. In traditional databases, transactions can run at different isolation levels that guarantee different data quality. From lowest to highest:

Read uncommitted: one transaction can read data that another transaction has not yet committed. This level has the lowest control cost, but it lets a transaction read dirty data. How do we avoid dirty reads? Use the Read committed level.

Read committed: a transaction can read data only after the transaction that wrote it has committed. This level prevents dirty reads, but it allows non-repeatable reads. Suppose a reading transaction T1 first reads field F1 and gets V1, and then another transaction T2 UPDATEs the field to V2 and commits. When T1 reads the field again it gets V2, so two reads inside the same transaction disagree. How do we avoid non-repeatable reads? Use the Repeatable read level.

Repeatable read: once a transaction has started reading data, other transactions may not modify that data until the transaction ends. Transactions have to wait for one another, so the higher data quality comes at a cost. Does this level still have a problem? Yes: phantom reads, which are caused by INSERT. How do we avoid them? Use the Serializable level.

Serializable: the highest isolation level. Transactions execute one after another, which avoids dirty reads, non-repeatable reads, and phantom reads. This level is inefficient and expensive for the database, so it is rarely used in practice.

Active/standby replication and transaction control are both fault tolerance mechanisms of traditional databases.

One of the great challenges for Fault Tolerance in stream computing is low latency. Many Apache Flink jobs run 24 x 7 without interruption, with end-to-end latency measured in seconds. Recovering quickly from unexpected problems such as a network glitch or a machine failure, without affecting the correctness of the results, is extremely difficult. Besides low latency, there is also the challenge of computing modes: how to support both Exactly-Once and At-Least-Once in Apache Flink, and how to avoid repeated computation during Failover so that Exactly-Once is actually achieved. These are the key problems that stream computing Fault Tolerance has to solve.

The core of Apache Flink's Fault Tolerance mechanism is to continuously create snapshots of the distributed data stream and its state. These snapshots serve as fallback points when the system fails. The snapshot mechanism in Apache Flink is called Checkpointing, and its theoretical basis is described in detail in Lightweight Asynchronous Snapshots for Distributed Dataflows. That mechanism derives from the paper Distributed Snapshots: Determining Global States of Distributed Systems by K. Mani Chandy and Leslie Lamport, which describes how to obtain a consistent global state in a distributed system.

Apache Flink uses Checkpointing for fault tolerance. Checkpointing produces data files, similar in role to the binlog, that can be used to restore the state of a task. Apache Flink also has semantic controls for computation that are similar to database transaction control: At-Least-Once and Exactly-Once.

As mentioned above, Checkpointing is the core Fault Tolerance mechanism in Apache Flink. A checkpoint is a snapshot of the state of every stateful Operator: timers, connectors, windows, and user-defined state. The Chandy-Lamport global-state algorithm focuses on the problem of aligning global state. Lightweight Asynchronous Snapshots for Distributed Dataflows describes how that alignment is done, and Apache Flink completes asynchronous snapshots of a DAG by inserting barriers into the data stream. The following figure (from Lightweight Asynchronous Snapshots for Distributed Dataflows) describes asynchronous barrier snapshots for acyclic graphs, which is the approach Apache Flink uses.

The figure above depicts the logic of an incremental word-count Job. The core steps are as follows:

Barriers are issued by the source node.

Barriers split the events on the stream into different checkpoints.

Where several streams converge on a node, their barriers must be aligned at that node.

After the barriers are aligned, the node performs Checkpointing and generates a snapshot.

After completing the snapshot, the node sends the barrier downstream, and so on until the Sink node.
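The first two steps can be pictured with a small plain-Java sketch. This is a toy model, not Flink code: the class `BarrierInjectionSketch`, the method `injectBarriers`, the `|n|` marker notation, and the fixed every-N-records spacing are all assumptions made for illustration (Flink itself triggers barriers on a time interval).

```java
import java.util.ArrayList;
import java.util.List;

final class BarrierInjectionSketch {
    // Returns the stream as a source might emit it: records interleaved with
    // barrier markers "|n|", one marker after every `every` records.
    // Everything before "|n|" belongs to checkpoint n.
    static List<String> injectBarriers(List<String> records, int every) {
        List<String> out = new ArrayList<>();
        long checkpointId = 0;
        int sinceLastBarrier = 0;
        for (String record : records) {
            out.add(record);
            sinceLastBarrier++;
            if (sinceLastBarrier == every) {
                checkpointId++;
                out.add("|" + checkpointId + "|");
                sinceLastBarrier = 0;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(injectBarriers(List.of("a", "b", "c", "d", "e"), 2));
    }
}
```

With the five records above and a barrier after every two, the emitted stream is `[a, b, |1|, c, d, |2|, e]`: records a and b fall into checkpoint 1, c and d into checkpoint 2, and e waits for the next barrier.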

In this way, Checkpointing is driven by barriers throughout the stream computation. As time passes, checkpoints are taken continuously over the whole computation, as shown in the following figure:

The generated snapshot is stored in the StateBackend; for an introduction to State, see "Apache Flink talk Series-State". This makes it possible to recover from the last successful checkpoint during Failover.

As described above, Checkpointing runs continuously and stores each snapshot in the StateBackend. So how often is a checkpoint taken, and how is the resulting snapshot persisted? With these questions in mind, let's look at how Apache Flink controls Checkpointing. The configurable parameters are (these parameters are defined in CheckpointCoordinator):

CheckpointMode: the checkpoint mode, either AT_LEAST_ONCE or EXACTLY_ONCE.

CheckpointInterval: the checkpoint interval, in milliseconds.

CheckpointTimeout: the checkpoint timeout, in milliseconds.

Apache Flink has some other settings as well, such as whether to delete checkpoint data held in external storage. If the data is retained, the checkpoint information survives even when the job is cancelled, and it can be used for state recovery when the job is restored. There are two options:

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: when the job is cancelled, externally stored checkpoints are not deleted.

ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: when the job is cancelled, externally stored checkpoints are deleted.

From the above, we know that Exactly-Once and At-Least-Once in Apache Flink are simply two Checkpointing modes, and that the principle of Checkpointing is the same in both. So what is the essential difference in implementation?

Semantics

At-Least-Once: every record on the stream is processed at least once (no data is lost).

Exactly-Once: every record on the stream is processed once and only once (no data is lost and none is repeated).

Semantically, Exactly-Once places stricter requirements on data processing than At-Least-Once. A stricter requirement means a higher cost, and the cost here is latency.

Implementation

So how do At-Least-Once and Exactly-Once differ in Apache Flink's implementation? The difference shows up with multiple inputs (such as a Join). While the barriers from some inputs have not yet arrived, Exactly-Once mode caches (does not process) events from the inputs whose barrier arrived early, whereas At-Least-Once mode goes on processing those events. In other words, in At-Least-Once mode a downstream node may already have processed, within checkpoint N-1, data that belongs to checkpoint N.

Let's take Exactly-Once as an example to show why it has higher latency than At-Least-Once mode. As shown below:

The figure above shows how a node performs Checkpointing:

The barrier alignment phase begins when the Operator receives a barrier from one of its upstream inputs.

During alignment, data from an input whose barrier has already arrived is cached in a buffer.

When the Operator has received the barriers from all upstream inputs, it performs Checkpointing: it generates a snapshot and persists it.

After Checkpointing, the Operator broadcasts the barrier to the downstream Operators.

While the barriers of a multi-input Operator are not yet aligned, data from the inputs whose barrier arrived first is cached in the buffer and not processed, so the more data sits in the buffer, the greater the delay downstream. What this delay buys is that the data (calculation results or events) recorded by adjacent checkpoints does not overlap. In At-Least-Once mode, by contrast, data is not buffered; the lower latency is bought at the cost of tolerating duplicates.
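The difference between the two modes can be sketched for a two-input operator in plain Java. This is a toy model under stated assumptions, not Flink's implementation: `AlignmentSketch`, `Arrival`, `snapshotContents`, and `demoArrivals` are hypothetical names, operator state is reduced to the list of records processed so far, and only a single checkpoint is simulated.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class AlignmentSketch {
    static final String BARRIER = "BARRIER";

    /** One arrival at the operator: the input channel and what it carries. */
    static final class Arrival {
        final int channel;
        final String payload;

        Arrival(int channel, String payload) {
            this.channel = channel;
            this.payload = payload;
        }
    }

    /** Returns the records that had been processed when the snapshot was taken. */
    static List<String> snapshotContents(List<Arrival> arrivals, int channels, boolean exactlyOnce) {
        List<String> processed = new ArrayList<>();
        List<String> buffered = new ArrayList<>();
        Set<Integer> barrierSeen = new HashSet<>();
        List<String> snapshot = new ArrayList<>();

        for (Arrival a : arrivals) {
            if (a.payload.equals(BARRIER)) {
                barrierSeen.add(a.channel);
                if (barrierSeen.size() == channels) {
                    // All barriers are in: snapshot, then release what was held back.
                    snapshot = new ArrayList<>(processed);
                    processed.addAll(buffered);
                    buffered.clear();
                    barrierSeen.clear();
                }
            } else if (exactlyOnce && barrierSeen.contains(a.channel)) {
                // Exactly-Once: this input already delivered its barrier, so hold the record.
                buffered.add(a.payload);
            } else {
                // At-Least-Once, or an input whose barrier has not arrived yet.
                processed.add(a.payload);
            }
        }
        return snapshot;
    }

    /** Input 0 delivers its barrier early; record a2 follows that barrier. */
    static List<Arrival> demoArrivals() {
        return List.of(
            new Arrival(0, "a1"), new Arrival(0, BARRIER), new Arrival(0, "a2"),
            new Arrival(1, "b1"), new Arrival(1, BARRIER), new Arrival(1, "b2"));
    }

    public static void main(String[] args) {
        System.out.println(snapshotContents(demoArrivals(), 2, true));
        System.out.println(snapshotContents(demoArrivals(), 2, false));
    }
}
```

In the Exactly-Once run the snapshot holds `[a1, b1]`, because a2 arrived behind input 0's barrier and was buffered. In the At-Least-Once run the snapshot holds `[a1, a2, b1]`: a2 has leaked into the checkpoint, so if the job later restores this snapshot and the source replays everything after the barrier, a2 is processed a second time.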

In Apache Flink's code, the CheckpointBarrierHandler interface is responsible for handling barriers. Its core method is:

public interface CheckpointBarrierHandler {
    ...
    BufferOrEvent getNextNonBlocked() throws Exception;
    ...
}

A BufferOrEvent may be a normal data event or a special event, such as a barrier event. There are two implementations, one for each mode:

Exactly-Once mode: BarrierBuffer

BarrierBuffer provides the Exactly-Once consistency guarantee. It blocks each input on which a barrier has arrived until barriers for the same checkpoint have arrived on all inputs; this is called alignment. To avoid backpressuring the input streams, BarrierBuffer keeps receiving buffers from the blocked channels and stores them internally until the block is released.

BarrierBuffer implements CheckpointBarrierHandler's getNextNonBlocked, which returns the next record to be processed. The call blocks until a record is available. Records come from two places: upstream inputs that are not marked as blocked, such as event (a) in the image above, and the buffer queue of a blocked input once it is released, such as the buffered events in the image above.

At-Least-Once mode: BarrierTracker

BarrierTracker tracks the checkpoint barriers received on each input. Once it observes that all barriers of a checkpoint have arrived, it notifies the listener that the checkpoint is complete, which triggers the corresponding callback. Unlike BarrierBuffer, BarrierTracker does not block an input that has already delivered its barrier; that is, it does not use the alignment mechanism. Data is therefore processed without delay, and data belonging to the next checkpoint may be processed before the current checkpoint completes. On recovery this can only guarantee At-Least-Once semantics.

BarrierTracker also implements CheckpointBarrierHandler's getNextNonBlocked, which returns the next record to be processed. Compared with BarrierBuffer the implementation is simple: it just blocks until an event is available.

The core difference between the two CheckpointBarrierHandler implementations is that BarrierBuffer maintains whether each input is blocked and caches the records of blocked inputs. As the saying goes, every gain has its cost, and that trade-off shows up here too.
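The barrier counting that the BarrierTracker description implies can be sketched in a few lines of plain Java. This is an illustration only, not the actual Flink class: `BarrierCountSketch`, `onBarrier`, and `completed` are hypothetical names, and the listener notification is replaced by appending to a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BarrierCountSketch {
    private final int channels;
    // Number of barriers seen so far, per checkpoint id.
    private final Map<Long, Integer> barriersSeen = new HashMap<>();
    // Checkpoints for which every input has delivered its barrier.
    private final List<Long> completed = new ArrayList<>();

    BarrierCountSketch(int channels) {
        this.channels = channels;
    }

    // Called for every barrier that arrives; data records are never held back.
    void onBarrier(long checkpointId) {
        int seen = barriersSeen.merge(checkpointId, 1, Integer::sum);
        if (seen == channels) {
            barriersSeen.remove(checkpointId);
            completed.add(checkpointId); // stand-in for notifying a listener
        }
    }

    List<Long> completed() {
        return completed;
    }

    public static void main(String[] args) {
        BarrierCountSketch tracker = new BarrierCountSketch(2);
        tracker.onBarrier(1); // input A, checkpoint 1
        tracker.onBarrier(2); // input A, checkpoint 2 arrives before 1 is complete
        tracker.onBarrier(1); // input B, checkpoint 1
        tracker.onBarrier(2); // input B, checkpoint 2
        System.out.println(tracker.completed());
    }
}
```

Because nothing is blocked, several checkpoints can be pending at once, which is why the sketch keeps one counter per checkpoint id rather than a single set of blocked inputs.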

In "Apache Flink talk Series-State", we introduced what Apache Flink stores in State; for example, a connector uses OperatorState to record the offset of its read position. The execution graph of a complete Apache Flink task is a DAG, and so far we have described what happens at a single node of that DAG. What does Checkpointing look like as a whole? How is a checkpoint generated and persisted in a distributed way to HDFS?

Overall Checkpointing process

The figure above shows the complete Checkpointing process of an Apache Flink Job. The JobManager (JM) triggers the Source to emit barriers. When an Operator receives a barrier from upstream, it starts barrier processing. Checkpointing proceeds node by node along the DAG, persisting to the StateBackend, all the way to the Sink node.

Incremental Checkpointing

In a stream computing task, data flows in continuously. Take a dual-stream join (covered in detail in the Apache Flink talk Series article on Join). Because events on the two sides arrive in no fixed order, both the left and the right data must be stored in state: an arriving Left event is joined against the data in Right State, and an arriving Right event is joined against the data in Left State. The data on both sides in the figure below is persisted to State:

Because data keeps flowing in, the snapshot files (RocksDB sst files) produced by each checkpoint grow very large over time. This increases network IO and lengthens checkpoint time until checkpoints can no longer complete, at which point Apache Flink loses its ability to Failover. To solve the problem of ever-growing checkpoints, Apache Flink implements Incremental Checkpointing internally. The incremental mechanism greatly reduces checkpoint time, and if the business data volume is steady, the duration of each checkpoint stays relatively stable. With the checkpoint interval set to suit the business, checkpoints complete quickly and reliably, so that an Apache Flink task can Failover smoothly when a failure occurs. For jobs with hundreds of task nodes, the benefit of this optimization is obvious.
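The idea behind the incremental mechanism can be sketched as a set difference over file names. This is a simplified model that assumes sst files are immutable once written, so a file already uploaded by the previous checkpoint never needs to be uploaded again; `IncrementalUploadSketch`, `filesToUpload`, and the file names are hypothetical and do not reflect Flink's internal bookkeeping.

```java
import java.util.Set;
import java.util.TreeSet;

final class IncrementalUploadSketch {
    // Files that must be uploaded for the current checkpoint: those that the
    // previous checkpoint has not already uploaded. A TreeSet keeps the result sorted.
    static Set<String> filesToUpload(Set<String> previous, Set<String> current) {
        Set<String> delta = new TreeSet<>(current);
        delta.removeAll(previous);
        return delta;
    }

    public static void main(String[] args) {
        Set<String> checkpoint1 = Set.of("001.sst", "002.sst");
        Set<String> checkpoint2 = Set.of("001.sst", "002.sst", "003.sst");

        // The first checkpoint has no predecessor, so everything is uploaded.
        System.out.println(filesToUpload(Set.of(), checkpoint1));
        // The second checkpoint uploads only the file that is new.
        System.out.println(filesToUpload(checkpoint1, checkpoint2));
    }
}
```

The first call yields `[001.sst, 002.sst]` and the second only `[003.sst]`, which is why checkpoint time tracks the rate of new data rather than the total size of the state.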

From the introduction above, we know that Apache Flink supports Exactly-Once semantics internally. Achieving end-to-end (Source-to-Sink) Exactly-Once additionally requires support from the external Source and Sink that Apache Flink connects to, as follows:

Fault tolerance requirements of external Source

Apache Flink needs the support of the external Source to achieve end-to-end Exactly-Once. As mentioned above, Apache Flink's Checkpointing mechanism records the read Position on the Source node, so the external Source must expose a Position for the data it serves and must support reading data starting from a given Position.
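The requirement on the Source can be sketched with an in-memory log that is read by position. This is a toy stand-in for an external system, not a Flink connector: `ReplayableSourceSketch` and its methods are hypothetical names, and the position is a plain list index.

```java
import java.util.List;

final class ReplayableSourceSketch {
    private final List<String> log; // external source that can be read by position
    private int position = 0;       // next position to read; this is what a checkpoint records

    ReplayableSourceSketch(List<String> log) {
        this.log = log;
    }

    boolean hasNext() {
        return position < log.size();
    }

    String next() {
        return log.get(position++);
    }

    // Taken at checkpoint time.
    int snapshotPosition() {
        return position;
    }

    // Applied during Failover: rewind to the position stored in the last checkpoint.
    void restore(int checkpointedPosition) {
        position = checkpointedPosition;
    }

    public static void main(String[] args) {
        ReplayableSourceSketch source = new ReplayableSourceSketch(List.of("e0", "e1", "e2", "e3"));
        source.next();                                 // e0
        source.next();                                 // e1
        int checkpointed = source.snapshotPosition();  // checkpoint taken here
        source.next();                                 // e2 is read, then the task fails
        source.restore(checkpointed);
        System.out.println(source.next());             // reads e2 again after recovery
    }
}
```

A source that cannot be rewound in this way would lose e2 after the failure, because the record was consumed but is not reflected in any completed checkpoint.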

Fault tolerance requirements of external Sink

End-to-end Exactly-Once is harder to achieve on the Sink side. Take Kafka as an example. When the Sink Operator node goes down, Apache Flink's internal Exactly-Once guarantee rolls the system back to the last successful checkpoint and resumes writing from there. However, between the last successful checkpoint and the failure, before the current checkpoint completed, some new data had already been written to Kafka. When Apache Flink resumes from the last successful checkpoint, Kafka receives that data from the Sink Operator a second time, which breaks end-to-end Exactly-Once semantics (repeated writes make it At-Least-Once). Apache Flink solves this problem with Two Phase Commit (two-phase commit). In essence, the Sink Operator needs to know when the overall checkpoint has completed, and it makes the calculation results visible in Kafka only at that point.
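The two-phase idea can be sketched with an in-memory sink. This is a minimal model under stated assumptions, not Flink's Kafka sink: `TwoPhaseSinkSketch` and its method names are hypothetical, a list stands in for the Kafka topic, and a failure is modelled as discarding everything that has not been committed.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoPhaseSinkSketch {
    private final List<String> external = new ArrayList<>(); // stand-in for the topic: committed data only
    private List<String> open = new ArrayList<>();           // current transaction, not yet visible
    private List<String> preCommitted = null;                // waiting for the overall checkpoint to complete

    void write(String record) {
        open.add(record);
    }

    // Phase 1: the sink takes its own snapshot and seals the current transaction.
    void preCommit() {
        preCommitted = open;
        open = new ArrayList<>();
    }

    // Phase 2: the overall checkpoint has completed, so the sealed data becomes visible.
    void commit() {
        if (preCommitted != null) {
            external.addAll(preCommitted);
            preCommitted = null;
        }
    }

    // Failure before the overall checkpoint completed: drop everything uncommitted.
    void abort() {
        open = new ArrayList<>();
        preCommitted = null;
    }

    List<String> visible() {
        return external;
    }

    public static void main(String[] args) {
        TwoPhaseSinkSketch sink = new TwoPhaseSinkSketch();
        sink.write("r1");
        sink.write("r2");
        sink.preCommit();
        sink.commit();     // checkpoint 1 completes: r1 and r2 become visible
        sink.write("r3");
        sink.abort();      // failure before checkpoint 2: r3 was never visible
        sink.write("r3");  // r3 is replayed from checkpoint 1
        sink.preCommit();
        sink.commit();     // checkpoint 2 completes
        System.out.println(sink.visible());
    }
}
```

The final visible contents are `[r1, r2, r3]`: r3 appears once even though it was written twice, because the first attempt was discarded along with its uncommitted transaction.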
