A Brief Analysis of Flink's Exactly-Once Delivery Implementation
Author: Paul Lin. Source: https://www.whitewood.me
As more and more businesses migrate to Flink, the accuracy requirements for Flink jobs rise accordingly, and the most important of these is how to guarantee exactly-once delivery semantics across different business scenarios. Many real-time systems (e.g. real-time computation engines and message queues) claim to support exactly-once, so exactly-once delivery may look like a solved problem, but those claims usually concern message delivery between internal modules, such as Kafka's exactly-once for production (producer to Kafka broker) and consumption (broker to consumer). As a real-time computation engine, Flink interacts with many different components in actual business scenarios, and because these components differ in characteristics and positioning, Flink does not support exactly-once for all of them (see [1]). Moreover, different components require different exactly-once implementations, some of which bring side effects or usage restrictions. An in-depth understanding of how Flink implements exactly-once is therefore of great value when designing a stable and reliable architecture.
Below, the difficulties of exactly-once and the schemes for implementing it are analyzed in detail in the context of Flink. The conclusions also extend to other real-time systems, especially stream computing systems.

Exactly-Once Difficulty Analysis

Coordination between processes in a distributed system goes over the network, and the network is often unpredictable, so sending a message usually has three possible outcomes to consider: a normal response, an error response, and a timeout. Error responses can be further divided into retriable errors (e.g. a database temporarily unavailable for maintenance) and non-retriable errors (e.g. an authentication failure). Retriable errors and timeouts both cause messages to be resent, which may lead to the downstream receiving duplicates; this is at-least-once delivery semantics. Exactly-once builds on at-least-once by adding a mechanism that can either recognize retransmitted data or wrap messages as idempotent operations.

Exactly-once delivery of messages is not a new problem created by distributed systems (although the term nowadays usually refers to the distributed setting). As early as the beginnings of computer networking, the TCP protocol achieved reliable transmission. TCP implements exactly-once by making message delivery stateful: a connection is first established synchronously, every packet sent carries an incrementing sequence number, and the connection is released synchronously after sending completes. Because both the sender and the receiver keep state (the sequence numbers of sent / received packets), they can tell which packets are missing or duplicated.
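To make the role of this state concrete, here is a minimal sketch, in plain Java and independent of any particular protocol, of a receiver that upgrades at-least-once delivery to exactly-once by remembering the highest sequence number it has applied per sender (all names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class DedupReceiver {

    // Highest sequence number already applied, per sender.
    private final Map<String, Long> lastApplied = new HashMap<>();

    /** Returns true if the message was applied, false if it was a duplicate. */
    public boolean receive(String senderId, long sequenceNumber, Runnable sideEffect) {
        long last = lastApplied.getOrDefault(senderId, -1L);
        if (sequenceNumber <= last) {
            return false; // retransmission of something already processed
        }
        sideEffect.run();                          // apply the message
        lastApplied.put(senderId, sequenceNumber); // remember that we did
        return true;
    }
}
```

Note that the lastApplied map is itself state that must survive restarts, which is exactly where the difficulties below begin.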
In a distributed environment, exactly-once becomes more complex. The biggest difference is that a distributed system must tolerate process crashes and node loss, which raises many problems, such as the following common ones:

1. Process state must be persisted to reliable distributed storage, so that it is not lost together with a lost node.
2. Because sending a message is a two-phase operation (sending the message, then receiving the other side's acknowledgement), a process that restarts has no way to determine whether a message with the current sequence number was already sent before the crash, which may lead to sequence numbers being reused.
3. A process that is believed to have crashed may not actually have exited; such a zombie process may reconnect and continue to send data.
Points 2 and 3 are really the same problem: the original process must be distinguishable from its restarted successor. The industry has a mature solution for this: introduce an epoch to represent the generation of a process and manage it with a distributed coordination system. Some details remain to be handled, but on the whole it is not a big problem. Point 1, however, has a more far-reaching impact: to keep IO costs acceptable, state must be saved in micro-batches rather than continuously, so the saved state always lags behind the progress of the stream computation. To guarantee exactly-once, a stream computing engine must therefore be able to roll back, as in a transaction.

Stateful Exactly-Once and End-to-End Exactly-Once

Flink provides exactly-once state delivery semantics, which guarantees the accuracy of stateful computation. A point that is easy to confuse is the relation between these stateful delivery semantics and the more commonly discussed end-to-end delivery semantics: the former is a prerequisite for the latter.

Flink introduced the State API in version 0.9, marking the beginning of Flink's Stateful Streaming era. The State API is, simply put, a set of "process-independent" data structures whose naming follows common data-structure conventions, for example MapState and ListState. Both Flink's built-in operators (such as the Kafka source) and user-defined operators can use the State API to save state. Like most distributed systems, Flink uses snapshots to periodically synchronize the state of the entire job to external storage; that is, the information held in the State API is serialized and persisted, and on recovery the job can be restored to its state at an earlier point in time by reading it back. Because restoring from a snapshot also rolls back the processing progress of the data stream, state is naturally exactly-once.
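As an illustration, here is a minimal sketch (assuming Flink 1.x; the class and state names are illustrative) of a user-defined operator that keeps a per-key counter in the State API. Because the counter lives in Flink state, it is included in every checkpoint snapshot and rolled back together with the stream position on failure, so it never double-counts:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts events per key; the count survives failures because it lives in
// Flink state, which is part of every checkpoint snapshot.
public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> input, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();          // null on the first event for this key
        long next = (current == null ? 0L : current) + 1L;
        count.update(next);
        out.collect(Tuple2.of(input.f0, next));
    }
}
```

It would be applied on a keyed stream, e.g. stream.keyBy(0).flatMap(new CountPerKey()).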
End-to-end consistency, by contrast, requires the cooperation of the upstream and downstream external systems, because Flink cannot include their state in its snapshots and roll it back on their behalf; otherwise they would not be external systems. Generally, Flink's upstream is pull-based persistent storage that can be read repeatedly or re-consumed, so exactly-once on the source side only requires rolling back the source's read position (e.g. the Kafka offset). Exactly-once on the sink side is more complicated, because the sink is push-based: a message once sent is not easily taken back, since retracting it requires that every action the downstream has taken in response to it can be withdrawn as well. This in turn requires the State API to save metadata about sent messages, recording which data must be rolled back after a restart. Below we examine how Flink implements an exactly-once sink.

Exactly-Once Sink Principles

Flink's exactly-once sinks are built on the snapshot mechanism and, by implementation principle, fall into two kinds: idempotent sinks and transactional sinks.

Idempotent Sink

Idempotency is a very useful property in the distributed domain: an idempotent operation yields the same result whether it is executed once or many times, so for idempotent operations at-least-once is naturally equivalent to exactly-once. An idempotent sink therefore does not need to retract messages already sent to the external system when restoring from a snapshot, which sidesteps the problem of rolling back the external system's state. For example, a sink writing rows to a KV database can be stateless, because inserting a row is idempotent; it does not need to care about the external system's state during error recovery. In a sense, the TCP protocol mentioned above also relies on the idempotence of delivering a packet to guarantee exactly-once.
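A minimal sketch of such an idempotent write, assuming a MySQL-compatible table kv(k PRIMARY KEY, v); the table, class name, and JDBC URL are all illustrative. Replaying the same record any number of times leaves the database in the same state as writing it once:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class IdempotentKvWriter implements AutoCloseable {

    private final Connection connection;
    private final PreparedStatement upsert;

    public IdempotentKvWriter(String jdbcUrl) throws SQLException {
        connection = DriverManager.getConnection(jdbcUrl);
        // INSERT ... ON DUPLICATE KEY UPDATE makes the write idempotent:
        // a duplicate from an at-least-once upstream simply overwrites
        // the row with the same value.
        upsert = connection.prepareStatement(
                "INSERT INTO kv (k, v) VALUES (?, ?) "
              + "ON DUPLICATE KEY UPDATE v = VALUES(v)");
    }

    public void write(String key, String value) throws SQLException {
        upsert.setString(1, key);
        upsert.setString(2, value);
        upsert.executeUpdate();
    }

    @Override
    public void close() throws SQLException {
        upsert.close();
        connection.close();
    }
}
```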
However, whether an idempotent sink is applicable depends on the business logic: if the downstream business cannot be made idempotent, a transactional sink is needed.

Transactional Sink

A transactional sink, as the name implies, resembles a traditional DBMS transaction: it wraps a series of output records (usually those within one checkpoint interval) into a logical unit and, ideally, provides ACID transaction guarantees. "Ideally" because the sink depends on the transaction guarantees of the target output system, and transaction support in distributed systems is not always complete: HBase, for example, does not support cross-row transactions, and file systems such as HDFS provide no transactions at all. In such cases the sink can only wrap another layer on top of the client and provide a best-effort transaction guarantee. Moreover, the downstream system's own transaction guarantee is not sufficient for an exactly-once sink: the same sink has multiple subtasks, which from the downstream system's point of view live in different sessions and transactions, so their operations are not atomic as a whole. An exactly-once sink therefore also needs a distributed transaction so that all subtasks commit or roll back consistently. Because the life cycle of a sink transaction corresponds one-to-one to a checkpoint (indeed, a checkpoint can be seen as a distributed transaction that persists the job state), the sink's distributed transaction can be implemented through the hooks the checkpoint mechanism provides. Checkpointing exposes two hooks to operators: CheckpointedFunction, called when the operator takes a checkpoint snapshot, and CheckpointListener, called after the checkpoint has succeeded. For convenience, Flink combines these two interfaces into TwoPhaseCommitSinkFunction, an abstract class capturing the common logic of an exactly-once sink; as the name suggests, it is an implementation of the two-phase commit protocol. Its main methods are as follows:

beginTransaction: initializes a transaction; called when new data arrives and there is no current transaction.
preCommit: pre-commits the data, i.e. stops writing to the current transaction and prepares it for committing; called when the sink operator takes a snapshot.
commit: formally commits the data by committing the prepared transaction; called when the job's checkpoint completes.
abort: abandons the transaction; called when the job's checkpoint fails.
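The following minimal sketch, assuming Flink 1.4+ (where TwoPhaseCommitSinkFunction was introduced), fills in these four methods for a simple file sink: the transaction object is just the path of a hidden temporary file, commit is an atomic rename, and abort deletes the file. Paths, names, and error handling are illustrative only.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// IN = String record, TXN = path of the transaction's temp file, CONTEXT = none.
public class FileTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<String, String, Void> {

    private final String targetDir; // illustrative output directory

    public FileTwoPhaseCommitSink(String targetDir) {
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
        this.targetDir = targetDir;
    }

    @Override
    protected String beginTransaction() {
        // One hidden temporary file per transaction, i.e. per checkpoint interval.
        return targetDir + "/.tmp-" + UUID.randomUUID();
    }

    @Override
    protected void invoke(String transaction, String value, Context context) throws IOException {
        // Append the record to the transaction's temporary file.
        Files.write(Paths.get(transaction), (value + "\n").getBytes(),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    @Override
    protected void preCommit(String transaction) {
        // Stop writing to this transaction; a buffered implementation
        // would flush and close its writer here.
    }

    @Override
    protected void commit(String transaction) {
        // Commit may be retried after a restart, so it must be idempotent:
        // if the temp file is already gone, the rename has already happened.
        try {
            if (Files.exists(Paths.get(transaction))) {
                Files.move(Paths.get(transaction),
                        Paths.get(transaction.replace("/.tmp-", "/part-")),
                        StandardCopyOption.ATOMIC_MOVE);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(String transaction) {
        // Checkpoint failed: discard the uncommitted data.
        try {
            Files.deleteIfExists(Paths.get(transaction));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```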
Let us take the Bucketing File Sink as an example of how to implement a transactional sink on top of asynchronous checkpoints. The Bucketing File Sink is a FileSystem connector provided by Flink that writes a data stream to files of a bounded size. It distinguishes three file states, in-progress / pending / committed, which represent the file currently being written, a file ready to be committed, and a committed file, respectively.

At run time, the Bucketing File Sink first opens a temporary file and keeps writing received data into it (equivalent to the transaction's beginTransaction step); the file is then in the in-progress state. When the file exceeds a size threshold, or no new data has been written to it for some time, the file is closed and becomes pending (equivalent to the transaction's pre-commit step). Because Flink's checkpoints are asynchronous and several checkpoints may be in flight at once, the Bucketing File Sink records which checkpoint epoch each pending file belongs to; when the checkpoint of an epoch completes, the sink receives a callback and turns the corresponding files committed. This is done with an atomic rename operation, which guarantees that a pre-committed transaction either commits completely or fails completely, with no intermediate state.

An error during commit causes the job to restart automatically. After the restart, the Bucketing File Sink itself is restored to the state of the last checkpoint, but the state of the file system must also be brought back into consistency. On recovery from the checkpoint, the corresponding transaction retries its commit: pending files recorded in the checkpoint are turned committed, the recorded in-progress file is truncated to the offset recorded in the checkpoint, and any remaining pending and in-progress files not recorded in the checkpoint are deleted.
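For reference, here is a minimal configuration sketch of the Bucketing File Sink, assuming a Flink 1.x job with the flink-connector-filesystem dependency and checkpointing enabled; the path and thresholds are illustrative:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class BucketingSinkExample {

    public static void attachSink(DataStream<String> stream) {
        BucketingSink<String> sink = new BucketingSink<>("hdfs:///data/output");
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
        sink.setWriter(new StringWriter<>());
        // Roll to a new part file once the current one reaches ~128 MB ...
        sink.setBatchSize(128 * 1024 * 1024L);
        // ... or when it has been inactive for 60 seconds (checked every 10 s),
        // at which point the in-progress file becomes pending.
        sink.setInactiveBucketCheckInterval(10_000L);
        sink.setInactiveBucketThreshold(60_000L);
        stream.addSink(sink);
    }
}
```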
The discussion above centers on the A and C of the ACID transaction guarantees (atomicity and consistency); on I (isolation), Flink exactly-once sinks take different approaches. Because of Flink's streaming nature, the current transaction's uncommitted data keeps accumulating, and depending on where this uncommitted data is buffered, transactional sinks fall into two implementations:

1. Buffer the uncommitted data on the sink side and flush it downstream after the checkpoint completes. This provides read-committed transaction isolation, but because uncommitted data is not sent downstream until the checkpoint completes (it is synchronized with checkpointing), the sink-side buffering introduces some latency, effectively degenerating into a micro-batching mode synchronized with checkpoints.
2. Buffer the uncommitted data in the downstream system and notify the downstream to commit when the checkpoint completes. The advantage is that data flows downstream in a streaming fashion, there is no network IO spike after each checkpoint, and the transaction isolation level can be chosen on the downstream side: low-latency but weakly consistent read-uncommitted, or higher-latency but strongly consistent read-committed (see the Kafka-based sketch after this list).
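As a sketch of this second approach, Flink's Kafka 0.11 producer connector (assuming Flink 1.4+ with the flink-connector-kafka-0.11 dependency) buffers uncommitted records in Kafka itself using Kafka transactions and commits them when the checkpoint completes; the broker address and topic name are illustrative:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class TransactionalKafkaSinkExample {

    public static void attachSink(DataStream<String> stream) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // The Kafka transaction must outlive the checkpoint interval,
        // and must not exceed the broker's transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms", "900000");

        stream.addSink(new FlinkKafkaProducer011<>(
                "output-topic",
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
    }
}
```

The downstream consumer then picks its side of the trade-off via Kafka's isolation.level setting: read_committed (strongly consistent, higher latency) or read_uncommitted (weakly consistent, lower latency).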
In the case of the Bucketing File Sink, files in the in-progress and pending states are hidden by default (in practice, an underscore prefix in the file name causes HDFS's FileInputFormat to filter them out), and files become visible to users only after the commit succeeds; that is, it provides read-committed transaction isolation. Ideally, an exactly-once sink should buffer its uncommitted data in the downstream system, because that best fits the idea of streaming computation. The most typical case is a downstream system that already supports transactions, so uncommitted data is naturally cached there; otherwise, the sink can implement its own transactions at the user level of the downstream system, as the Bucketing File Sink above does, or fall back to the micro-batching mode of holding data until it can be sent out as committed.

Summary

Exactly-once is the most critical accuracy requirement for real-time systems, and it is also one of the problems that currently keep most distributed real-time systems out of business scenarios with the highest accuracy requirements (such as online transaction processing, OLTP). Exactly-once for stream computing has made great theoretical breakthroughs, and the Flink community is actively absorbing the most advanced ideas and practical experience. As Flink's exactly-once technology matures further, combined with Flink's inherent stream processing characteristics, it is reasonable to believe that in the near future Flink will, besides building data analytics and data pipeline applications, also win a place in the field of microservices.