Shulou (Shulou.com) - SLTechnology News&Howtos, 2025-01-30 update
This article analyzes how transaction messages work in Kafka. The content is fairly detailed; interested readers can use it as a reference, and I hope it is helpful to you.
1. Idempotent messages
To solve the message duplication and reordering caused by retries, Kafka introduced idempotent messages. Idempotent messages guarantee that, within a session, the messages a producer writes to a single partition are idempotent, so Exactly Once semantics for message publication can be ensured even in the presence of retries.
The implementation logic is simple:
Distinguish producer sessions
Each time the producer starts, it first requests a globally unique PID from the broker to identify this session.
Message detection
A sequence number field was added to message_v2, and the producer attaches a seq to each batch of messages it sends.
The broker maintains a (pid, seq) map in memory and checks the seq when a message arrives:
new_seq = old_seq + 1: normal message; new_seq <= old_seq: duplicate message, discarded; new_seq > old_seq + 1: a message was lost
Producer retry
The producer retries after receiving an explicit "message missing" ack, or when no ack arrives before the timeout.
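The broker-side seq check described above can be sketched as a toy simulation. This is not Kafka's actual implementation; the `Broker` class and its method names are illustrative only:

```python
class Broker:
    """Toy model of the broker-side (pid, seq) duplicate/loss check."""

    def __init__(self):
        self.last_seq = {}  # pid -> last accepted sequence number
        self.log = []       # accepted messages, in order

    def append(self, pid, seq, payload):
        old = self.last_seq.get(pid, -1)
        if seq == old + 1:            # normal message: accept and advance
            self.last_seq[pid] = seq
            self.log.append(payload)
            return "ok"
        if seq <= old:                # duplicate caused by a retry: discard
            return "duplicate"
        return "out_of_sequence"      # seq > old + 1: a message was lost

broker = Broker()
assert broker.append(pid=7, seq=0, payload="a") == "ok"
assert broker.append(pid=7, seq=0, payload="a") == "duplicate"       # retry
assert broker.append(pid=7, seq=1, payload="b") == "ok"
assert broker.append(pid=7, seq=3, payload="d") == "out_of_sequence" # gap
assert broker.log == ["a", "b"]
```

A retried batch carries the same seq it was first sent with, which is why the duplicate case can be detected and silently dropped.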
2. Transaction messages
Consider stream-processing scenarios that need atomic write semantics across multiple messages: either all succeed or all fail. This is the problem Kafka transaction messages set out to solve.
Transaction messages are implemented through the cooperation of the producer, the transaction coordinator, the brokers, the group coordinator, and the consumer.
1) producer
Assign the producer a fixed TransactionalId, which identifies the producer continuously across multiple sessions (producer restarts / reconnections).
Use an epoch to identify each "incarnation" of the producer, fencing off stale sessions of the same producer.
The producer follows the idempotent-message protocol and adds the transaction id and epoch to each RecordBatch it sends.
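The epoch fencing above can be sketched as a toy simulation (the `Coordinator` class is illustrative, not a Kafka internal): each restart of the same TransactionalId bumps the epoch, and any request carrying an older epoch is rejected as coming from a zombie session.

```python
class Coordinator:
    """Toy model of epoch fencing for a transactional producer."""

    def __init__(self):
        self.epochs = {}  # transactional.id -> current epoch

    def init_pid(self, tid):
        # Each new session of the same transactional.id bumps the epoch.
        self.epochs[tid] = self.epochs.get(tid, -1) + 1
        return self.epochs[tid]

    def validate(self, tid, epoch):
        # Requests carrying an older epoch come from a zombie session.
        return "ok" if epoch == self.epochs[tid] else "fenced"

coord = Coordinator()
e1 = coord.init_pid("order-service")   # first session  -> epoch 0
e2 = coord.init_pid("order-service")   # after restart  -> epoch 1
assert coord.validate("order-service", e2) == "ok"
assert coord.validate("order-service", e1) == "fenced"  # zombie rejected
```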
2) Transaction coordinator
The transaction coordinator is introduced to implement transactional commits of messages via two-phase commit.
The transaction coordinator records transaction commits in a special internal topic, the transaction log (__transaction_state).
The transaction coordinator drives the brokers and the group coordinator through RPC calls to carry out the two-phase commit of a transaction.
Every broker runs a transaction coordinator, and hash(TransactionalId) determines which coordinator serves a given producer, which balances load across the cluster.
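The coordinator lookup can be sketched as follows. The helper below is illustrative: Python's built-in `hash` stands in for Kafka's actual hash of the id against the partition count of the transaction-log topic.

```python
def find_coordinator(transactional_id: str, num_partitions: int = 50) -> int:
    """Map a transactional.id to a transaction-log partition; the leader
    of that partition acts as this producer's transaction coordinator."""
    # Python's hash() is a stand-in here; Kafka hashes the id bytes.
    return abs(hash(transactional_id)) % num_partitions

p = find_coordinator("order-service")
assert 0 <= p < 50
# The same id always maps to the same coordinator partition,
# so a producer keeps the same coordinator across requests.
assert p == find_coordinator("order-service")
```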
3) broker
The broker processes the commit/abort control messages from the transaction coordinator, writes these control messages into the topic like normal messages (interleaved with normal messages, which fixes the log offset of the transaction commit), and advances the commit offset (HW) accordingly.
4) Group Coordinator
If consumption offsets are committed during a transaction, the group coordinator writes the transactional consumption offsets into the offset log. When the transaction commits, a transaction offset confirmation message is written into the offset log.
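The "pending until commit" behavior of transactional offsets can be sketched as a toy simulation (the `GroupCoordinator` class and its methods are illustrative names, not Kafka's API):

```python
class GroupCoordinator:
    """Toy model: transactional offset commits stay pending until the
    transaction's control marker arrives."""

    def __init__(self):
        self.pending = {}    # (group, partition) -> offset awaiting txn end
        self.committed = {}  # (group, partition) -> externally visible offset

    def txn_offset_commit(self, group, partition, offset):
        # Written to the offset log, but not yet visible to the group.
        self.pending[(group, partition)] = offset

    def txn_marker(self, group, commit):
        # On COMMIT the pending offsets become effective; on ABORT they
        # are simply dropped.
        for (g, p), off in list(self.pending.items()):
            if g == group:
                if commit:
                    self.committed[(g, p)] = off
                del self.pending[(g, p)]

gc = GroupCoordinator()
gc.txn_offset_commit("app", "topic_a-0", 42)
assert gc.committed == {}               # invisible until the txn commits
gc.txn_marker("app", commit=True)
assert gc.committed == {("app", "topic_a-0"): 42}
```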
5) consumer
The consumer filters out uncommitted messages and transaction control messages so that they are invisible to users.
There are two ways to do this:
Consumer-side caching mode
Set isolation.level=read_uncommitted, so all messages in the topic are visible to the consumer.
The consumer caches these messages until it receives the transaction control message: if the transaction committed, the cached messages are delivered; if it aborted, they are discarded.
Broker-side filtering mode
Set isolation.level=read_committed, so uncommitted messages in the topic are invisible to the consumer; messages become visible only after the transaction ends.
The RecordBatch the broker returns to the consumer carries a list of aborted transactions, so the consumer can simply discard records that belong to them.
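The read_committed filtering step can be sketched as a toy simulation. This is not the Kafka consumer's real code path; records are modeled as plain dicts and an aborted-PID set stands in for the aborted-transactions list in the fetch response:

```python
def filter_read_committed(batch, aborted_pids):
    """Toy model of broker-filtering mode: drop control records and any
    records produced by aborted transactions."""
    visible = []
    for rec in batch:
        if rec.get("control"):           # COMMIT/ABORT markers are internal
            continue
        if rec["pid"] in aborted_pids:   # record belongs to an aborted txn
            continue
        visible.append(rec["value"])
    return visible

batch = [
    {"pid": 1, "value": "a"},
    {"pid": 2, "value": "b"},
    {"pid": 2, "control": True, "value": "ABORT"},
    {"pid": 1, "control": True, "value": "COMMIT"},
]
# Producer 2's transaction aborted: its record and both markers vanish.
assert filter_read_committed(batch, aborted_pids={2}) == ["a"]
```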
The transaction message processing flow is shown in Figure 1.
Figure 1: transaction message processing flow
Process description:
1. Find the transaction coordinator - FindCoordinatorRequest
The transaction coordinator is the core component that allocates PIDs and manages transactions. The producer first sends a FindCoordinatorRequest to any broker to locate its own transaction coordinator.
2. Apply for a PID - InitPidRequest
Next, the producer sends an InitPidRequest to the transaction coordinator to request a PID.
2a. When transactional.id is specified, the transaction coordinator assigns the producer a PID, bumps the epoch, and writes the (tid, pid) mapping to the transaction log. At the same time, it completes any outstanding transaction for this tid and fences uncommitted messages from older sessions.
3. Start a transaction
Starting a transaction is a local operation on the producer: it only updates the producer's internal state and does not involve the transaction coordinator.
From the coordinator's point of view, a transaction starts automatically; the coordinator simply moves through one transaction state machine after another.
4. Consume-transform-produce transaction loop
4.1. Register partitions - AddPartitionsToTxnRequest
For every topic partition the transaction will write to, the producer must register the partition with the transaction coordinator before sending the first message to it.
4.1a. The transaction coordinator writes the partitions associated with the transaction into the transaction log.
When committing or aborting the transaction, the coordinator needs this information to drive the leaders of all participating partitions to complete the commit or abort.
4.2. Write messages-- ProduceRequest
4.2a. The producer writes messages to the partition leader; each message carries tid, pid, epoch, and seq.
4.3. Commit consumption offsets - AddOffsetCommitsToTxnRequest
4.3a. The producer sends the consumption offsets to the transaction coordinator, which records the offset information in the transaction log and returns the group coordinator's address to the producer.
4.4. Commit consumption offsets - TxnOffsetCommitRequest
4.4a. The producer sends a TxnOffsetCommitRequest to the group coordinator, which writes the offset information into the offset log. However, these offsets do not take effect, and are not externally visible, until the transaction commits.
5. Commit or abort the transaction
5.1. EndTxnRequest
Upon receiving a request to commit or abort a transaction, the transaction coordinator does the following:
1. Writes a PREPARE_COMMIT or PREPARE_ABORT message to the transaction log (5.1a).
2. Sends transaction control messages to all brokers involved in the transaction via WriteTxnMarkerRequest (5.2).
3. Writes a COMMITTED or ABORTED message to the transaction log (5.3).
5.2. WriteTxnMarkerRequest
This message is sent by the transaction coordinator to the leaders of the partitions involved in the transaction.
On receiving it, the broker writes a COMMIT or ABORT control message to the partition log and advances the partition's transaction commit offset (HW).
If consumption offsets were committed within the transaction, the broker also writes a control message to the __consumer_offsets log and notifies the group coordinator to make those offsets effective.
5.3. Write the final commit or abort message
Once all commit or abort markers have been written to the data logs, the transaction coordinator writes a final message to the transaction log, marking the end of the transaction.
At this point, all state for this transaction can be deleted and a new transaction can begin.
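The two-phase commit sequence in step 5 can be sketched as a toy simulation. The `TxnCoordinator` class below is illustrative only: lists stand in for the transaction log and the partition logs, and a single method call stands in for the WriteTxnMarkerRequest fan-out:

```python
class TxnCoordinator:
    """Toy model of the two-phase commit sequence in step 5."""

    def __init__(self):
        self.txn_log = []         # the coordinator's transaction log
        self.partition_logs = {}  # partition -> list of records/markers

    def register_partition(self, partition):
        # Step 4.1: remember which partitions the transaction touches.
        self.partition_logs.setdefault(partition, [])
        self.txn_log.append(("ADD_PARTITION", partition))

    def end_txn(self, commit):
        marker = "COMMIT" if commit else "ABORT"
        # Phase 1 (5.1a): durably record the decision in the txn log.
        self.txn_log.append("PREPARE_" + marker)
        # Phase 2 (5.2): write a control marker to every participating
        # partition (the WriteTxnMarkerRequest fan-out).
        for log in self.partition_logs.values():
            log.append(marker)
        # 5.3: mark the transaction complete so its state can be purged.
        self.txn_log.append("COMMITTED" if commit else "ABORTED")

coord = TxnCoordinator()
coord.register_partition("topic_a-0")
coord.register_partition("topic_b-0")
coord.end_txn(commit=True)
assert coord.txn_log[-2:] == ["PREPARE_COMMIT", "COMMITTED"]
assert coord.partition_logs["topic_a-0"] == ["COMMIT"]
assert coord.partition_logs["topic_b-0"] == ["COMMIT"]
```

Because the decision is logged before the markers fan out, a coordinator that crashes between the two phases can finish delivering the markers on recovery.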
There are many more details in the implementation. For example, the transaction coordinator starts a timer to detect and abort transactions that have been inactive for too long after starting. For more information, refer to the Kafka community's design documentation.
We need to realize that although Kafka transaction messages guarantee atomic writes of multiple messages, they do not guarantee atomic reads.
For example:
1) A transaction writes messages to partitions of topic_a and topic_b, and at some point after the transaction commits, all replicas of topic_a fail. The messages in topic_b can still be consumed normally, but the messages in topic_a are lost.
2) If a consumer consumes only topic_a and not topic_b, it cannot read the complete set of transactional messages.
3) A typical Kafka Streams application consumes from multiple topics and writes to one or more topics. After a failure, the application reprocesses the stream data, and because there is no stable ordering between data read from multiple topics (even with a single topic, there is no stable ordering across its partitions), the outputs of the two runs may differ.
In other words, although the Kafka log persists data and can be consumed repeatedly from a given offset, the output of each processing run can differ because of the lack of ordering across partitions. This means a Kafka Streams job cannot, like a Hadoop batch job, be re-executed at any time with a guarantee of identical results, unless it reads from only a single topic partition.