2025-01-16 Update From: SLTechnology News&Howtos shulou
Shulou(Shulou.com)06/02 Report--
This article explains what RocketMQ transaction messages are and how they work. Many people are unsure what actually happens behind a transactional send, so the editor gathered the relevant material and walks through the official demo and the client source code step by step.
I. Background
Earlier open-source versions of Alibaba's RocketMQ had the transaction message check-back removed. Newer versions add it back, which solves a real problem for small companies that lack the ability to build their own reliable messaging middleware. RocketMQ also borrows from Kafka's implementation, and its performance is very good.
II. Version
org.apache.rocketmq:rocketmq-spring-boot-starter:2.0.3
III. Source code interpretation
Official demo
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    private static final String TX_PGROUP_NAME = "myTxProducerGroup";

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Value("${demo.rocketmq.transTopic}")
    private String springTransTopic;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Send transactional messages
        testTransaction();
    }

    private void testTransaction() throws MessagingException {
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i)
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)
                    .build();
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
                    springTransTopic + ":" + tags[i % tags.length], msg, null);
                System.out.printf("- send Transactional msg body = %s, sendResult=%s %n",
                    msg.getPayload(), sendResult.getSendStatus());
                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
        // Remembers the simulated local transaction status for each transaction id
        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            System.out.printf("# executeLocalTransaction is executed, msgTransactionId=%s %n", transId);
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(transId, status);
            if (status == 0) {
                // Return local transaction with success (commit), in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("# COMMIT # Simulating msg %s related local transaction exec succeeded! # %n",
                    msg.getPayload());
                return RocketMQLocalTransactionState.COMMIT;
            }
            if (status == 1) {
                // Return local transaction with failure (rollback), in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("# ROLLBACK # Simulating %s related local transaction exec failed! %n",
                    msg.getPayload());
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            // The payload argument was missing here, which makes printf throw at runtime
            System.out.printf("# UNKNOW # Simulating %s related local transaction exec UNKNOWN! %n",
                msg.getPayload());
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
            Integer status = localTrans.get(transId);
            if (null != status) {
                switch (status) {
                    case 0:
                        retState = RocketMQLocalTransactionState.UNKNOWN;
                        break;
                    case 1:
                        retState = RocketMQLocalTransactionState.COMMIT;
                        break;
                    case 2:
                        retState = RocketMQLocalTransactionState.ROLLBACK;
                        break;
                }
            }
            System.out.printf("- !!! checkLocalTransaction is executed once,"
                    + " msgTransactionId=%s, TransactionState=%s status=%s %n",
                transId, retState, status);
            return retState;
        }
    }
}
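To see which messages the demo should end up delivering, its listener bookkeeping can be replayed in plain Java without a broker. This is only an illustrative sketch: `DemoOutcomeSketch`, `State`, `execute`, `check`, and `deliveredIndexes` are hypothetical names that are not part of RocketMQ, and the sketch assumes messages are handled in order and that the broker checks back exactly once for each message left in the UNKNOWN state.

```java
import java.util.ArrayList;
import java.util.List;

public class DemoOutcomeSketch {
    // Hypothetical stand-in for RocketMQLocalTransactionState
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Mirrors executeLocalTransaction in the demo: the status is (call index) % 3
    static State execute(int status) {
        if (status == 0) {
            return State.COMMIT;
        }
        if (status == 1) {
            return State.ROLLBACK;
        }
        return State.UNKNOWN;
    }

    // Mirrors the switch in checkLocalTransaction in the demo
    static State check(int status) {
        switch (status) {
            case 0: return State.UNKNOWN;
            case 1: return State.COMMIT;
            case 2: return State.ROLLBACK;
            default: return State.COMMIT; // the demo's initial retState
        }
    }

    // Returns the indexes of the messages that would become visible to consumers
    static List<Integer> deliveredIndexes(int messageCount) {
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            int status = i % 3;
            State state = execute(status);
            if (state == State.UNKNOWN) {
                state = check(status); // assumption: a single check-back settles it
            }
            if (state == State.COMMIT) {
                delivered.add(i);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliveredIndexes(10)); // prints [0, 3, 6, 9]
    }
}
```

Under these assumptions only every third message is committed: status 1 is rolled back immediately, and status 2 is rolled back when it is checked.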
A transaction message is sent by calling RocketMQTemplate.sendMessageInTransaction(), so we start there.
// RocketMQTemplate
public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination,
        final Message message, final Object arg) throws MessagingException {
    try {
        // Get the producer for this producer group name from the local cache
        TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
        org.apache.rocketmq.common.message.Message rocketMsg =
            RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message);
        return txProducer.sendMessageInTransaction(rocketMsg, arg);
    } catch (MQClientException e) {
        throw RocketMQUtil.convert(e);
    }
}
Step into txProducer.sendMessageInTransaction(rocketMsg, arg):
// TransactionMQProducer
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
    // A transaction listener (local transaction execution and check-back query) must be set
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
Step into defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg):
// DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // Mark the message as a prepared (half) message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // Send the message
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        // Sent successfully. isWaitStoreMsgOK on the message defaults to true;
        // if it is false, SEND_OK is returned as long as no exception was caught
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    // Execute the local branch transaction passed in by the caller
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    // Execute the transaction listener registered by annotation or set on the producer
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        // Disk flush timed out
        case FLUSH_DISK_TIMEOUT:
        // Synchronizing data to the slave server timed out
        case FLUSH_SLAVE_TIMEOUT:
        // No slave server available
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // Send the second-phase confirmation message
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    // Wrap the execution result
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

public void endTransaction(final SendResult sendResult, final LocalTransactionState localTransactionState,
        final Throwable localException)
        throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        // Commit the transaction
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        // Roll back the transaction
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        // State unknown: neither commit nor roll back
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // Send the second-phase confirmation one-way, without waiting for a response from the server.
    // If it is lost, the message check-back compensates for it.
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
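The control flow of the method above can be condensed into a small broker-free sketch: store the half message, run the local transaction only when the send status is SEND_OK, roll back on the three storage failure statuses, and always finish with a second-phase confirmation. Everything here is an assumption made for illustration: `TwoPhaseSendSketch`, `FakeBroker`, `storeHalfMessage`, and the simplified enums are hypothetical and are not RocketMQ classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class TwoPhaseSendSketch {
    // Simplified, hypothetical copies of the statuses used in the source above
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Hypothetical in-memory broker that records what it was told
    static class FakeBroker {
        final SendStatus halfSendStatus;
        final List<String> log = new ArrayList<>();

        FakeBroker(SendStatus halfSendStatus) {
            this.halfSendStatus = halfSendStatus;
        }

        SendStatus storeHalfMessage(String body) {
            log.add("half:" + body);
            return halfSendStatus;
        }

        void endTransaction(LocalState state) {
            log.add("end:" + state);
        }
    }

    static LocalState sendInTransaction(FakeBroker broker, String body, Supplier<LocalState> localTx) {
        // Phase 1: send the prepared (half) message
        SendStatus status = broker.storeHalfMessage(body);
        LocalState state = LocalState.UNKNOW;
        switch (status) {
            case SEND_OK:
                try {
                    LocalState result = localTx.get();
                    if (result != null) {
                        state = result; // a null result is treated as UNKNOW
                    }
                } catch (RuntimeException e) {
                    // An exception leaves the state as UNKNOW, so the check-back decides later
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // The local transaction is never executed in these cases
                state = LocalState.ROLLBACK_MESSAGE;
                break;
        }
        // Phase 2: report the outcome to the broker
        broker.endTransaction(state);
        return state;
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker(SendStatus.SEND_OK);
        LocalState state = sendInTransaction(broker, "order-1", () -> LocalState.COMMIT_MESSAGE);
        System.out.println(state + " " + broker.log);
    }
}
```

The sketch leaves out the one-way network send and the broker's check-back; it only mirrors how the send status and the listener result are turned into the state reported in the second phase.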
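When we first looked at RocketMQ's transaction message example, the use of a listener to execute the local transaction suggested that the client sends a prepared message to the server and then runs the local transaction asynchronously, once the server's response arrives, which would mean the client could not respond in real time. The source code above shows otherwise: the listener's executeLocalTransaction() is called synchronously inside sendMessageInTransaction(), right after the prepared message is sent, and its result is returned to the caller in the TransactionSendResult.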
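A tiny sketch makes the timing concrete: when the callback is invoked inline, it runs on the caller's thread and finishes before the send method returns. The names `SyncListenerSketch`, `sendInTransaction`, and `trace` are hypothetical, and the sketch models only the call order, not the real client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SyncListenerSketch {
    // Hypothetical send method: the listener is invoked inline, before returning
    static String sendInTransaction(String body, Function<String, String> listener, List<String> events) {
        events.add("half message sent");
        String state = listener.apply(body); // executes on the calling thread
        events.add("end transaction: " + state);
        return state;
    }

    // Runs one send and returns the recorded event order
    static List<String> trace() {
        List<String> events = new ArrayList<>();
        Thread caller = Thread.currentThread();
        String state = sendInTransaction("order-1", body -> {
            boolean sameThread = Thread.currentThread() == caller;
            events.add("local transaction executed, same thread: " + sameThread);
            return "COMMIT";
        }, events);
        events.add("send returned: " + state);
        return events;
    }

    public static void main(String[] args) {
        for (String event : trace()) {
            System.out.println(event);
        }
    }
}
```

Because the local transaction has already run when the call returns, the caller can act on the reported state immediately instead of waiting for a later callback.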
That concludes this look at what RocketMQ transaction messages are and how they work. Theory is best learned together with practice, so go and try it!