Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Apache Flink combined with Kafka to build end-to-end Exactly-Once processing

2025-03-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Author: Piotr Nowojski

Translation | Zhou Kaibo

Alibaba, a technical expert with a master's degree from Sichuan University, joined Ali search Division after graduating in 2010, engaged in the research and development of search offline platform, and participated in the reconstruction of search background data processing architecture from MapReduce to Flink. At present, in Ali Computing platform Division, we focus on the construction of one-stop computing platform based on Flink.

Table of contents:

Exactly-Once semantics in 1.Apache Flink applications

End-to-end Exactly-Once semantics of 2.Flink applications

3. The sample Flink application starts the pre-commit phase

4. Implement two-phase commit Operator in Flink

5. Summary

Since the release of version 1.4.0 in December 2017, Apache Flink has introduced an important milestone feature for stream computing: TwoPhaseCommitSinkFunction (related Jira). It extracts the general logic of the two-phase commit protocol, which makes it possible to build end-to-end Exactly-Once programs through Flink. Supports both data source (source) and output (sink), including Apache Kafka 0.11 and later. It provides an abstraction layer where users only need to implement a few methods to implement end-to-end Exactly-Once semantics.

For more information on the use of TwoPhaseCommitSinkFunction, see the document: TwoPhaseCommitSinkFunction. Or you can directly read the document of Kafka 0.11 sink: kafka.

Next, we will analyze this new feature and the implementation logic of Flink in detail, which can be divided into the following points.

Describe how the Flink checkpoint mechanism ensures the Exactly-Once of Flink program results

Shows how Flink interacts with the data source and data output side through the two-phase commit protocol to provide end-to-end Exactly-Once assurance

Through a simple example, learn how to use TwoPhaseCommitSinkFunction to achieve the file output of Exactly-Once

I. Exactly-Once semantics in Apache Flink applications

When we say "Exactly-Once", we mean that each input event affects the final result only once. Even if the machine or software fails, the data is neither duplicated nor lost.

Flink has provided Exactly-Once semantics for a long time. Over the past few years, we have given an in-depth description of Flink's checkpoint mechanism, which is at the core of Flink's ability to provide Exactly-Once semantics. The Flink documentation also provides a comprehensive overview of this feature.

Before moving on, take a look at a brief introduction to the checkpoint mechanism, which is critical to understanding the topics that follow.

A checkpoint is a consistent snapshot of the following:

The current state of the application

Location of the input stream

Flink can configure a fixed point in time to generate checkpoint periodically and write data from checkpoint to persistent storage systems such as S3 or HDFS. Writing checkpoint data to persistent storage occurs asynchronously, which means that Flink applications can continue to process data during the checkpoint process.

In the event of a machine or software failure, the Flink application resumes processing from the latest checkpoint point after reboot; Flink restores the application state, rolls back the input stream to the location where the last checkpoint saved, and then starts running again. This means that Flink can calculate the results as if there had never been a failure.

Prior to Flink 1.4.0, Exactly-Once semantics were limited to Flink applications and did not extend to most external systems sent after Flink data processing. Flink applications interact with various data outputs, and developers need to be able to maintain the context of the components themselves to ensure Exactly-Once semantics.

In order to provide end-to-end Exactly-Once semantics-that is, external systems written by Flink also need to be able to meet Exactly-Once semantics in addition to internal Flink applications-these external systems must provide methods for commit or rollback and then be coordinated through Flink's checkpoint mechanism.

In distributed systems, the common way to coordinate commit and rollback is the two-phase commit protocol. In the next section, we will discuss how Flink's TwoPhaseCommitSinkFunction leverages the two-phase commit protocol to provide end-to-end Exactly-Once semantics.

Second, end-to-end Exactly-Once semantics of Flink applications

We will introduce the two-phase commit protocol and how it implements end-to-end Exactly-Once semantics in a Flink program that reads and writes Kafka. Kafka is a popular messaging middleware that is often used with Flink. Kafka has added support for transactions in the most recent version 0.11. This means that you now have the necessary support for reading and writing Kafaka through Flink and providing end-to-end Exactly-Once semantics.

Flink's support for end-to-end Exactly-Once semantics is not limited to Kafka; you can use it with any source / output that provides the necessary coordination mechanism. For example, Pravega, an open source streaming media storage system from DELL/EMC, can also support end-to-end Exactly-Once semantics through Flink's TwoPhaseCommitSinkFunction.

In the example program discussed today, we have:

-data source read from Kafka (KafkaConsumer built into Flink)

-window aggregation

-write the data back to the data output of Kafka (KafkaProducer built into Flink)

For the data output side to provide an Exactly-Once guarantee, it must commit all data to Kafka through a single transaction. Commit bundles all the data to be written between the two checkpoint. This ensures that written data can be rolled back in the event of a failure. But in distributed systems, there are usually multiple write tasks running concurrently, and a simple commit or rollback is not enough, because all components must be "consistent" at commit or rollback to ensure consistent results. Flink uses a two-phase commit protocol and a pre-commit phase to solve this problem.

At the beginning of checkpoint, the "pre-commit" phase of the two-phase commit agreement. When the checkpoint starts, Flink's JobManager injects the checkpoint barrier (dividing the records in the data stream into the current checkpoint and the next checkpoint) into the data stream.

Brarrier is passed between operator. For each operator, it triggers a status snapshot of the operator to be written to the state backend.

The data source holds the offset of the consumption Kafka (offset), and then passes the checkpoint barrier to the next operator.

This approach applies only if operator has an "internal" state. The so-called internal state refers to the sum value saved and managed by Flink state backend-for example, the sume value aggregated by window in the second operator. When a process has its internal state, nothing needs to be done during the pre-commit phase except that data changes need to be written to state backend before checkpoint. Flink is responsible for committing these writes correctly if the checkpoint is successful, or aborting them in the event of a failure.

Third, the sample Flink application starts the pre-submission phase

However, when the process has an "external" state, some additional processing is required. The external state is usually written to an external system, such as Kafka. In this case, in order to provide Exactly-Once guarantees, the external system must support transactions in order to integrate with the two-phase commit protocol.

The data in the example in this article needs to be written to Kafka, so the data output (Data Sink) has an external state. In this case, during the pre-commit phase, in addition to writing its state to the state backend, the data output must also pre-commit its external transaction.

When the checkpoint barrier is passed through all the operator and the triggered checkpoint callback completes successfully, the pre-commit phase ends. All triggered status snapshots are considered part of the checkpoint. The checkpoint is a snapshot of the entire application state, including pre-committed external status. If a failure occurs, we can roll back to the point in time when the snapshot was successfully completed.

The next step is to notify all operator,checkpoint that they have succeeded. This is the commit phase of the two-phase commit protocol, and JobManager issues a checkpoint completed callback for each operator in the application.

The data source and widnow operator have no external state, so these operator do not have to do anything during the commit phase. However, the data output side (Data Sink) has an external state and the external transaction should be committed at this time.

Let's summarize the above knowledge points:

-once all operator are pre-submitted, submit a commit.

-if at least one pre-commit fails, all other commits will be aborted and we will roll back to the last successfully completed checkpoint.

-after a successful pre-submission, the submitted commit needs to be guaranteed to be ultimately successful-both operator and external systems need to guarantee this. If the commit fails (for example, due to intermittent network problems), the entire Flink application will fail, the application will restart according to the user's restart policy, and an attempt will be made to resubmit. This process is critical because if the commit does not succeed in the end, it will result in data loss.

Therefore, we can confirm that all operator agree with the final result of checkpoint: all operator agree that the data has been submitted, or the submission has been aborted and rolled back.

Implement two-phase commit Operator in Flink

The full implementation of the two-phase commit protocol can be a bit complicated, which is why Flink extracts its generic logic into the abstract class TwoPhaseCommitSinkFunction.

Next, based on a simple example of output to a file, show how to use TwoPhaseCommitSinkFunction. Users only need to implement four functions to implement Exactly-Once semantics for the data output side:

-beginTransaction-before the transaction starts, we create a temporary file in the temporary directory of the target file system. We can then write the data to this file as we process the data.

-preCommit-during the pre-commit phase, we refresh the file to storage, close the file, and no longer rewrite. We will also start a new transaction for any subsequent file writes that belong to the next checkpoint.

-commit-during the commit phase, we move the pre-commit phase files atomically to the real target directory. It is important to note that this increases the delay in the visibility of the output data.

-abort-during the abort phase, we delete temporary files.

We know that in the event of any failure, Flink will restore the state of the application to the latest checkpoint point. At one extreme, the pre-submission was successful, but a failure occurred before this commit notification reached operator. In this case, Flink restores the state of the operator to a state that has been pre-committed, but not actually committed.

We need to save enough information to the checkpoint state during the pre-commit phase so that the transaction can be aborted or committed correctly after restart. In this example, this information is the path to the temporary file and the target directory.

TwoPhaseCommitSinkFunction has taken this situation into account and will give priority to issuing a commit when restoring the state from the checkpoint point. We need to implement the commit in an idempotent way, which is generally not difficult. In this example, we can recognize that the temporary file is not in the temporary directory, but has been moved to the target directory.

There are other boundary cases that will be taken into account in TwoPhaseCommitSinkFunction. Please refer to the Flink documentation for more information.

Summary

Summarize some of the main points involved in this article:

Flink's checkpoint mechanism is the basis for supporting the two-phase commit protocol and providing end-to-end Exactly-Once semantics.

The advantage of this scheme is that Flink does not transfer and store data over the network like some other systems-it does not need to write every stage of the calculation to disk, as most batch programs do.

Flink's TwoPhaseCommitSinkFunction extracts the common logic of the two-phase commit protocol, based on which it is possible to build an end-to-end Exactly-Once by combining Flink with external systems that support transactions.

Since Flink 1.4.0, both Pravega and Kafka 0.11 producer have provided Exactly-Once semantics; Kafka introduced transactions for the first time in version 0.11, making it possible to provide Exactly-Once semantics for using Kafka producer in Flink programs.

The transaction of Kafaka 0.11 producer is implemented on the basis of TwoPhaseCommitSinkFunction, with very low overhead compared to at-least-once producer.

This is an exciting feature and it is expected that Flink TwoPhaseCommitSinkFunction will support more data receivers in the future.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report