Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to implement RocketMQ transaction message

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article will explain in detail how to implement RocketMQ transaction messages. The content of the article is of high quality, so the editor will share it for you as a reference. I hope you will have some understanding of the relevant knowledge after reading this article.

Summary: the implementation principle of transaction message commit or rollback is to find the message according to commitlogOffset. If it is a commit action, restore the topic and queue of the original message, store it in the commitlog file again, and then transfer it to the message consumption queue for consumer consumption, and then store the original preprocessed message into a new topic RMQ_SYS_TRANS_OP_HALF_TOPIC, indicating that the message has been processed. The rollback message differs from committing the transaction message in that the commit transaction message restores the message to the original topic and queue and stores it in the commitlog file again.

If you are interested in RocketMQ technology, please join the RocketMQ technology exchange group

The editor will focus on how RocketMQ Broker handles transaction message commit and rollback commands. According to the previous introduction, the entry EndTransactionProcessor#processRequest:

OperationResult result = new OperationResult (); if (MessageSysFlag.TRANSACTION_COMMIT_TYPE = = requestHeader.getCommitOrRollback ()) {/ / @ 1result = this.brokerController.getTransactionalMessageService () .commitMessage (requestHeader); / / @ 2 if (result.getResponseCode () = = ResponseCode.SUCCESS) {/ / @ 3 RemotingCommand res = checkPrepareMessage (result.getPrepareMessage (), requestHeader) / / @ 4 if (res.getCode () = = ResponseCode.SUCCESS) {MessageExtBrokerInner msgInner = endMessageTransaction (result.getPrepareMessage ()); / / @ 5 msgInner.setSysFlag (MessageSysFlag.resetTransactionValue (msgInner.getSysFlag (), requestHeader.getCommitOrRollback (); msgInner.setQueueOffset (requestHeader.getTranStateTableOffset ()); msgInner.setPreparedTransactionOffset (requestHeader.getCommitLogOffset ()) MsgInner.setStoreTimestamp (result.getPrepareMessage () .getStoreTimestamp ()); / / @ 6 RemotingCommand sendResult = sendFinalMessage (msgInner); / / @ 7 if (sendResult.getCode () = = ResponseCode.SUCCESS) {this.brokerController.getTransactionalMessageService () .deletePrepareMessage (result.getPrepareMessage ()) / / @ 8} return sendResult;} return res;}}

Code @ 1: if the request is a commit transaction, enter the transaction message commit process.

Code @ 2: submit messages. Don't be misled by this name. This method is mainly based on commitLogOffset to find messages from commitlog files and return OperationResult instances:

Cdn.com/14618120dcde45cd59aba1c5cb3ccd5cee5c9e61.png ">

Private MessageExt prepareMessage: message object.

Private int responseCode: find the results.

Private String responseRemark: error message.

Code @ 3: if the message is found successfully, the process continues, otherwise it is returned to the client, and the message does not find the error message.

Code @ 4: necessary fields for validating messages.

Verify that the production group of the message is consistent with the producer group in the request information.

Verify that the queue offset (queueOffset) of the message is consistent with the offset in the request information.

Verify that the commitLogOffset of the message is consistent with the CommitLogOffset in the request information.

Code @ 5: call the endMessageTransaction method, whose main purpose is to restore the real topic, queue, and set the transaction ID of the transaction message.

Code @ 6: set the relevant properties of the message, this step should be implemented directly in endMessageTransaction, uniformly restore the number of original messages, with special attention to the removal of transaction-related system tags.

Code @ 7: send the final message, the implementation principle is very simple, call MessageStore to store the message in the commitlog file, and the message will be forwarded to the consumption queue corresponding to the original message topic and consumed by the consumer.

Code @ 8: deleting preprocessed messages (prepare) actually stores messages in a topic with the theme: RMQ_SYS_TRANS_OP_HALF_TOPIC, indicating that the messages have been processed (submitted or rolled back).

The above is the process of transaction message commit, and the transaction rollback is similar. Next, we will roughly analyze the process of transaction message rollback.

EndTransactionProcessor#processRequest else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE = = requestHeader.getCommitOrRollback ()) {result = this.brokerController.getTransactionalMessageService () .rollbackMessage (requestHeader); / / @ 1 if (result.getResponseCode () = = ResponseCode.SUCCESS) {RemotingCommand res = checkPrepareMessage (result.getPrepareMessage (), requestHeader); if (res.getCode () = = ResponseCode.SUCCESS) {this.brokerController.getTransactionalMessageService () .deletePrepareMessage (result.getPrepareMessage ()) / / @ 2} return res;}}

Code @ 1: rollback the message, in fact, the internal is to find the message according to the commitlogOffset.

Code @ 2: storing the message in RMQ_SYS_TRANS_OP_HALF_TOPIC means that it has been processed. Unlike committing a transaction message, committing a transaction message restores the message to the original topic and queue and stores it in the commitlog file again.

This is the end of the commit and rollback process of transaction messages on the Broker server. Its core implementation is to find the message according to commitlogOffset, if it is a submission action, restore the topic and queue of the original message, store it in the commitlog file again, and then transfer it to the message consumption queue for consumer consumption, and then store the original preprocessed message into a new topic RMQ_SYS_TRANS_OP_HALF_TOPIC, indicating that the message has been processed. The rollback message differs from committing the transaction message in that the commit transaction message restores the message to the original topic and queue and stores it in the commitlog file again.

On how to implement the RocketMQ transaction message sharing here, I hope that the above content can be of some help to you, can learn more knowledge. If you think the article is good, you can share it for more people to see.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report