In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly introduces how to achieve the RocketMQ transaction message, has a certain reference value, interested friends can refer to, I hope you can learn a lot after reading this article, let the editor take you to understand it.
As described above, the entry to send a transaction message is:
TransactionMQProducer#sendMessageInTransaction:public TransactionSendResult sendMessageInTransaction (final Message msg, final Object arg) throws MQClientException {if (null = = this.transactionListener) {/ / @ 1 throw new MQClientException ("TransactionListener is null", null);} return this.defaultMQProducerImpl.sendMessageInTransaction (msg, transactionListener, arg); / / @ 2}
Code @ 1: if transactionListener is empty, an exception is thrown directly.
Code @ 2: call the sendMessageInTransaction method of defaultMQProducerImpl.
Next, focus on sharing the sendMessageInTransaction method.
DefaultMQProducerImpl#sendMessageInTransactionpublic TransactionSendResult sendMessageInTransaction (final Message msg, final TransactionListener tranExecuter, final Object arg) throws MQClientException {
Step1: first explain the meaning of the parameters.
Final Message msg: messa
TransactionListener tranExecuter: transaction listener
Object arg: other additional parameters
DefaultMQProducerImpl#sendMessageInTransactionSendResult sendResult = null;MessageAccessor.putProperty (msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty (msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup ()); try {sendResult = this.send (msg);} catch (Exception e) {throw new MQClientException ("send message Exception", e);}
Step2: add two attributes to the message property: TRAN_MSG, whose value is true, which is represented as a transactional message, and PGROUP: the sender group to which the message belongs, and then sends the message synchronously. The property TRAN_MSG of the message is checked before the message is sent, and if it exists and the value is true, the message is set to MessageSysFlag.TRANSACTION_PREPARED_TYPE by setting the message system flag.
DefaultMQProducerImpl#sendKernelImplfinal String tranMsg = msg.getProperty (MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg! = null & & Boolean.parseBoolean (tranMsg)) {sysFlag | = MessageSysFlag.TRANSACTION_PREPARED_TYPE;} SendMessageProcessor#sendMessageString traFlag = oriProps.get (MessageConst.PROPERTY_TRANSACTION_PREPARED); if (traFlag! = null & & Boolean.parseBoolean (traFlag)) {if (this.brokerController.getBrokerConfig (). IsRejectTransactionMessage ()) {response.setCode (ResponseCode.NO_PERMISSION) Response.setRemark ("the broker [" + this.brokerController.getBrokerConfig (). GetBrokerIP1 () + "] sending transaction message is forbidden"); return response;} putMessageResult = this.brokerController.getTransactionalMessageService (). PrepareMessage (msgInner);} else {putMessageResult = this.brokerController.getMessageStore (). PutMessage (msgInner);}
After receiving the request from the client to send a message, the Step3:Broker side judges the message type. If it is a transaction message, the TransactionalMessageService#prepareMessage method is called, otherwise, following the previous logic, the MessageStore#putMessage method is called to store the message on the Broker server.
This section focuses on the principle of the implementation of transaction messages, so next we will focus on the prepareMessage method. If you want to understand the relevance of RocketMQ message storage, you can follow the author's source code analysis RocketMQ series.
Org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessagepublic PutMessageResult prepareMessage (MessageExtBrokerInner messageInner) {return transactionalMessageBridge.putHalfMessage (messageInner);}
Step4: transaction message, which calls the TransactionalMessageServiceImpl#prepareMessage method, which in turn calls the TransactionalMessageBridge#prepareMessage method.
TransactionalMessageBridge#parseHalfMessageInnerpublic PutMessageResult putHalfMessage (MessageExtBrokerInner messageInner) {return store.putMessage (parseHalfMessageInner (messageInner));} private MessageExtBrokerInner parseHalfMessageInner (MessageExtBrokerInner msgInner) {MessageAccessor.putProperty (msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic (); MessageAccessor.putProperty (msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf (msgInner.getQueueId () MsgInner.setSysFlag (MessageSysFlag.resetTransactionValue (msgInner.getSysFlag (), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic (TransactionalMessageUtil.buildHalfTopic ()); msgInner.setQueueId (0); msgInner.setPropertiesString (MessageDecoder.messageProperties2String (msgInner.getProperties (); return msgInner;}
Step5: back up the original topic name of the message with the original queue ID, then untag the transaction message of the message, reset the theme of the message to: RMQ_SYS_TRANS_HALF_TOPIC, and the queue ID is fixed to 0. Then call the MessageStore#putMessage method to persist the message, where the TransactionalMessageBridge bridge class is the process that encapsulates the transaction message, and finally calls MessageStore to complete the persistence of the message. After the message is stored, it continues to go back to DefaultMQProducerImpl#sendMessageInTransaction, after the Step2 above, that is, the message is sent to the message server through synchronization.
Note: this is the processing logic of the Prepare status of the transaction message. The message is stored on the message server, but the storage is not the original topic, but the RMQ_SYS_TRANS_HALF_TOPIC, so the consumer cannot consume shen.
Of a message sent by the producer. Seeing here, if you are familiar with RocketMQ, there must be a "scheduled task" to fetch the message under this topic, and then restore the topic of the message at the "right" time.
DefaultMQProducerImpl#sendMessageInTransactionswitch (sendResult.getSendStatus ()) {case SEND_OK: {try {if (sendResult.getTransactionId ()! = null) {msg.putUserProperty ("_ _ transactionId__", sendResult.getTransactionId ()) } String transactionId = msg.getProperty (MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null! = transactionId & &! ".equals (transactionId)) {msg.setTransactionId (transactionId);} localTransactionState = tranExecuter.executeLocalTransaction (msg, arg) If (null = = localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;} if (localTransactionState! = LocalTransactionState.COMMIT_MESSAGE) {log.info ("executeLocalTransactionBranch return {}", localTransactionState); log.info (msg.toString ()) }} catch (Throwable e) {log.info ("executeLocalTransactionBranch exception", e); log.info (msg.toString ()); localException = e;}} break Case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break;}
Step6: if the message is sent successfully, the TransactionListener#executeLocalTransaction method will be called back, the local transaction will be executed, and the local transaction status will be returned as LocalTransactionState. The enumerated values are as follows:
COMMIT_MESSAGE
ROLLBACK_MESSAGE
UNKNOW
Note: TransactionListener#executeLocalTransaction executes the local transaction method after the sender successfully sends the PREPARED message, and then returns the local transaction status; if the PREPARED message fails, TransactionListener#executeLocalTransaction is not called, and the local transaction message is set to LocalTransactionState.ROLLBACK_MESSAGE, indicating that the message needs to be rolled back.
DefaultMQProducerImpl#sendMessageInTransactiontry {this.endTransaction (sendResult, localTransactionState, localException);} catch (Exception e) {log.warn ("local transaction execute" + localTransactionState + ", but end broker transaction failed", e);}
Step7: call the endTransaction method to end the transaction (commit or rollback).
DefaultMQProducerImpl#endTransactionEndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader (); requestHeader.setTransactionId (transactionId); requestHeader.setCommitLogOffset (id.getOffset ()); switch (localTransactionState) {case COMMIT_MESSAGE: requestHeader.setCommitOrRollback (MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback (MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break } requestHeader.setProducerGroup (this.defaultMQProducer.getProducerGroup ()); requestHeader.setTranStateTableOffset (sendResult.getQueueOffset ()); requestHeader.setMsgId (sendResult.getMsgId ())
Step8: assemble the end transaction request. The main parameters are: transaction ID, transaction operation (commitOrRollback), consumption group, message queue offset, message ID,fromTransactionCheck. The request sent from here defaults to false. The request handler on the broker side is: EndTransactionProcessor.
Step9:EndTransactionProcessor is based on the transaction commit type: TRANSACTION_COMMIT_TYPE (commit transaction), TRANSACTION_ROLLBACK_TYPE (rollback transaction), TRANSACTION_NOT_TYPE (ignore the request).
So far, the sending process of RocketMQ transaction messages has been combed in detail, more accurately, the message sending process of Prepare status. The specific process is shown in the figure:
Cdn.com/b0bfa9bc730e64d5d03312968f1a2b07132364f1.png ">
At this point, this article preliminarily shows the sending process of transaction messages. Generally speaking, RocketMQ uses the idea of two-phase commit to send transaction messages. First, when sending a message, send a message with a message type of Prepread, then after the message is successfully stored in the message server, it will call back TransactionListener#executeLocalTransaction, execute the local transaction status callback function, and then end the transaction according to the return value of this method:
1. COMMIT_MESSAGE: commit the transaction.
2. ROLLBACK_MESSAGE: roll back the transaction.
3. UNKNOW: unknown transaction status, when the message server (Broker) receives the EndTransaction command, the message will not be processed. The message is still in the Prepared type and stored in the queue with the topic: RMQ_SYS_TRANS_HALF_TOPIC. Then the message sending process will end, so how can these messages be submitted or rolled back?
In order to avoid the need for the client to send commit and rollback commands again, RocketMQ will take a scheduled task to take out the message in RMQ_SYS_TRANS_HALF_TOPIC and then go back to the client to determine whether the message needs to be submitted or rolled back to complete the declaration cycle of the transaction message.
Thank you for reading this article carefully. I hope the article "how to realize RocketMQ transaction News" shared by the editor will be helpful to everyone. At the same time, I also hope that you will support us and pay attention to the industry information channel. More related knowledge is waiting for you to learn!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.