Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to implement the transaction message process of RocketMQ and Kafka

2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

How is the transaction message process realized by RocketMQ and Kafka? in order to solve this problem, this article introduces the corresponding analysis and solution in detail, hoping to help more partners who want to solve this problem to find a more simple and feasible method.

Today we are going to talk about the transaction messages of message queues. When we talk about transactions, I believe we are all familiar with them. What pops up in our minds is ACID.

Usually we understand the transaction is for some update operations are either successful or failed, there will be no intermediate state, and ACID is a strict definition of transaction implementation, but in a single system generally do not strictly follow the constraints of ACID to implement transactions, let alone distributed systems.

Distributed systems often can only compromise to the final consistency, to ensure the ultimate integrity and consistency of data, the main reason is that the strength does not allow. Because usability is king.

And to ensure that the full version of the transaction implementation is very expensive, if you want to maintain so much system data, no intermediate state data is allowed to be read, and all operations must be inseparable, which means that the execution of a transaction is blocked and resources are locked for a long time.

In the case of high concurrency, resources are occupied for a long time, which is fatal injury. Take a delicious example, such as the peak period of going to the toilet.

By the way, if you don't know what ACID is, please check it out quickly. I won't talk about it here.

Distributed transaction

When it comes to distributed transactions, the common ones are 2PC, TCC, and transaction messages. The focus below is to introduce transaction messages, but I'll mention 2PC and TCC a little bit.

2PC

2PC is a two-stage submission, with two roles of coordinator and participant, and the two stages are the preparation phase and the submission phase respectively.

The preparation phase is when the coordinator sends the prepare command to each participant. In this stage, the participant does everything except the transaction commit, and the commit phase means that the coordinator sees that each participant's preparation phase is not ok, if there is an ok, then send a commit command to each participant, and if there is a non-ok, then send a rollback command.

The point here is that 2PC only applies to transactions at the database level. What does that mean? That is, you want to write a piece of data in the database and upload a picture at the same time, these two operations 2PC can not guarantee that the two operations meet the transaction constraints.

And 2PC is a highly consistent distributed transaction, it is synchronous blocking, that is, before receiving the commit or rollback command, all participants are waiting for each other, especially after the preparatory phase, when the resources are locked. If one participant is stuck for a long time, other participants have to wait for it, resulting in blocking in the resource locked state for a long time.

Generally speaking, the efficiency is low, and there is a single point of failure. The coordinator is that single point, and there is a risk of data inconsistency under extreme conditions. For example, a participant does not receive a commit order and goes down at this time. After recovery, the data is rolled back, and all other participants have actually executed the order to commit the transaction.

TCC

TCC can guarantee business-level transactions, that is, it is not just at the database level, it can also upload images above.

TCC is divided into three stages: try-confirm-cancel. To put it simply, every business needs these three methods. First, the try method is executed. At this stage, no real business operations will be done, only a hole will be occupied first. What does it mean? For example, if you plan to add 10 points, first add the 10 points in the pre-added field. At this time, the points on the user's account are actually not increased.

Then if all the try are successful, then execute the confirm method, and everyone will do the real business operation. If a try fails, then everyone will perform the cancel operation to undo the changes just made.

You can see that TCC is actually very coupled to the business, because the business needs to make certain changes to complete these three methods, which is actually the disadvantage of TCC, and confirm and cancel operations should pay attention to idempotency, because there is no retreat to implement these two steps, it must be completed, so there needs to be a retry mechanism, so you need to ensure that the method is idempotent.

Transaction message

Transaction message is the protagonist of today's article, it is mainly suitable for asynchronous update scenarios, and data real-time requirements are not high.

Its purpose is to solve the problem of data consistency between message producers and message consumers.

For example, if you order takeout, we first choose fried chicken to add to the shopping cart, then choose a bottle of Coke, and then place an order, and the process ends after payment.

On the other hand, the data in the shopping cart is very suitable to be deleted asynchronously by message, because generally speaking, we will not order this store's menu again, and even if we order these items in the shopping cart, it doesn't matter.

What we hope is that the items in the shopping cart will eventually be deleted after the order is placed successfully, so the point is that both placing the order and sending the message will either succeed or fail.

RocketMQ transaction message

Let's first take a look at how RocketMQ implements transaction messages.

RocketMQ's transaction message can also be thought of as a two-phase commit, which simply sends an one-and-a-half message to Broker at the beginning of the transaction.

Half-message means that the message is not visible to Consumer at this time, and it does not exist in the queue that is actually to be sent, but a special queue.

Execute the local transaction after sending half the message, and then decide whether to send the commit message or the rollback message to Broker according to the execution result of the local transaction.

What if someone says that this step failed to send a commit or rollback message?

The impact is small. Broker will regularly check with Producer whether the transaction is successful. Specifically, Producer needs to expose an interface through which Broker can know whether the transaction has been executed successfully or not. If it is not successful, the unknown will be returned, because it is possible that the transaction is still executing and will be queried multiple times.

If successful, the half-message is restored to the normal queue to be sent, so that consumers can consume the message.

Let's take a simple look at how to use it. I simplified it according to the sample code on the official website.

You can see that it is very simple and intuitive to use, just add one more method to check the result of the transaction, and then write the process of the local transaction execution in TransationListener.

At this point, the general flow of the RocketMQ transaction message is clear. Let's draw an overall flow chart to go over it. In fact, in the fourth step, the message is either a normal message or abandons nothing, and the transaction message has ended its life cycle.

Analysis of RocketMQ transaction message Source Code

Then let's look at how it is done from the point of view of the source code. first of all, let's take a look at the sendMessageInTransaction method, which is a bit long, but the structure is still very clear.

The process, which we analyzed above, stuffed the message into some attributes to indicate whether the message is still half-message at this time, then sent it to Broker, then executed the local transaction, and then sent the execution status of the local transaction to Broker. Now let's take a look at how Broker handles this message.

This semi-message request will be processed in Broker's SendMessageProcessor#sendMessage, because today we mainly analyze transaction messages, so other processes are not analyzed, so I'll outline the principle.

To put it simply, if you find that the MessageConst.PROPERTY_TRANSACTION_PREPARED in the attribute of the accepted message in sendMessage is true, then you can know that the message is a transaction message, and then determine whether the message exceeds the maximum number of consumption, whether to delay, and whether Broker accepts transaction messages and other operations, store the real topic and queue of the message in the attribute, and then reset the message's topic to RMQ_SYS_TRANS_HALF_TOPIC. And the queue is 0, making it impossible for consumers to read the message.

This is the overall process of dealing with half-message. Let's take a look at the source code.

Even if there is a beaver cat to change the prince, in fact, the delayed message is also realized in this way, and the news of the change of skin will eventually be put on the market.

The way Broker handles commit or rollback messages is EndTransactionProcessor#processRequest. Let's take a look at what it does.

As you can see, if the transaction is committed, the skin is swapped back and written back to the queue to which the real topic belongs for consumption by consumers. In the case of rollback, the half-message is recorded under a half_op topic. When the backend service scans the half-message, it will determine that the message has been processed.

The backend service is the TransactionalMessageCheckService service, which periodically scans the semi-message queue to request the interface to check whether the transaction is successful. The specific execution is the TransactionalMessageServiceImpl#check method.

I would like to talk about the process, this step actually involves a lot of code, I will not post the code, interested students understand. But I believe it can be explained clearly in language.

First of all, take the half-message topic, that is, all the queues under the RMQ_SYS_TRANS_HALF_TOPIC. If you still remember the above, you will know that the queue written by the half-message is the queue whose id is 0, and then take out the queue under the half_op topic corresponding to this queue, that is, the queue under the RMQ_SYS_TRANS_OP_HALF_TOPIC topic.

The main purpose of this half_op is to record that the transaction message has been processed, that is, a message that already knows whether the transaction message is committed or rolled back will be recorded in the half_op.

Then call the fillOpRemoveMap method, fetch a batch of processed messages from half_op to repeat, call putBackHalfMsgQueue for those semi-messages that are not recorded in half_op and write them to commitlog, and then send a transaction reverse check request, which is also oneWay, that is, it will not wait for a response. Of course, the consumption offset of semi-message queues will also be promoted at this time.

Then the ClientRemotingProcessor#processRequest in producer will process the request, throw the task into the thread pool of TransactionMQProducer, and eventually call the checkLocalTransactionState method defined above when we sent the message, and then send the transaction status to Broker, also using oneWay.

If you see this, I believe you will have some questions, such as why there is a half_op and why half a message is processed and then written into commitlog. Don't listen to me one by one.

First of all, the design of RocketMQ is to sequentially append writes, so it will not change the message that has been entered the disk, and the number of times that the transaction message needs to be updated and checked back. If the reverse check fails, the transaction will be rolled back.

Therefore, every time we have to check back, we will put the previous half-message back into the market again, and push forward the progress of consumption. On the other hand, half_op will record the result of each reverse check, whether it is a commit or a rollback, so the next time it loops to process this half of the message, you can know from half_op that the transaction is over, so it is filtered out and does not need to be processed.

If the result of the reverse check is UNKNOW, the result will not be recorded in the half_op, so it can be checked again and the number of times can be updated.

Now that the whole process is clear, I'll draw a diagram to summarize the transaction flow of Broker.

Kafka transaction message

The transaction message of Kafka is different from that of RocketMQ. RocketMQ solves the two actions of local transaction execution and sending messages that satisfy the transaction constraints.

Kafka transaction messages are used in situations where multiple messages need to be sent in a transaction to ensure transaction constraints between multiple messages, that is, multiple messages are either sent successfully or all failed, as shown in the following code.

The transaction of Kafka basically cooperates with its idempotent mechanism to implement Exactly Once semantics, so the transaction message of Kafka is not the kind of transaction message we think, but that of RocketMQ.

When it comes to this, I want to talk about it. When it comes to this Exactly Once, it is easy for students who are not clear about it to misunderstand.

We know that there are three kinds of message reliability, at most, just once, and at least once. I have already mentioned in the previous article in the message queue that we basically use at least once and then cooperate with the idempotent of the consumer side to implement exactly once.

The news happens to be consumed once, of course, all of us pursue it, but I have analyzed the article from all aspects before, and it is basically difficult to achieve.

And Kafka says it can achieve Exactly Once? Is it such a beef beer? This is actually a gimmick of Kafka, if you say he's wrong, he's right, you have to say he's right, but the Exactly Once he achieves is not the Exactly Once you think he is.

It happens to exist only one scenario at a time, that is, from the Kafka as the message source, and then do some operation, and then write to the Kafka.

So how did he do it just once? It is through idempotence, as we have implemented in business, through a unique Id, and then record it, and if it has already been recorded, it will not be written, so as to ensure that it happens once.

So what Kafka implements is just once in a particular scenario, not using Kafka to send a message as we thought, then the message will only be consumed once.

This is actually the same as when Redis said that he realized the transaction, and it is not what we thought.

Therefore, we blindly believe that what features have been developed in open source software, so they are often residual blood or can only be satisfied in special scenarios, do not be misled, can not believe the superficial description, but also have to look at the documentation or source code in detail.

But from another point of view is understandable, as an open source software must want more people to use, I am not lying ah, my document is very clear, this title is not deceptive, right?

Indeed, for example, when you click on an article that shocked xxxx's title, he didn't lie to you. He was really shocked.

Let's come back to the Kafka transaction news, so this transaction message is not the transaction news we want, in fact, it is not the topic of today, but I will say it briefly.

The transaction of Kafka has the role of transaction coordinator, who is actually part of Broker.

At the beginning of a transaction, the producer will initiate a request to the transaction coordinator to indicate that the transaction is open, and the transaction coordinator will record the message in a special log-transaction log, and then the producer will send the message that it really wants to send. Here Kafka and RocketMQ are not handled differently. Kafka will process these transaction messages like normal messages, and the consumer will filter the message.

Then, after sending, the producer will send a commit or rollback request to the transaction coordinator, who will commit in two phases. If it is a commit, the pre-commit will be performed first, that is, the state of the transaction will be set to pre-commit and then written to the transaction log, and then a message similar to the end of the transaction will be written to all the partitions related to the transaction, so that the consumer will know the transaction when it consumes the message. You can put the news out.

Finally, the coordinator will record another transaction end message in the transaction log, and the Kafka transaction is complete. Let me summarize the process with the diagram on confluent.io.

This is the answer to the question about how the transaction message process of RocketMQ and Kafka is realized. I hope the above content can be of some help to you. If you still have a lot of doubts to be solved, you can follow the industry information channel for more related knowledge.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report