2025-04-05 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains the basic idea behind the implementation of RocketMQ transaction messages, using the official demo as a starting point.
The basic idea of RocketMQ transaction message implementation.
RocketMQ has supported transaction messages since version 4.3.0. This and subsequent articles analyze how transaction messages are implemented, starting with the official demo as a way into the world of RocketMQ transaction messages.
Even before transaction messages were officially released, the first Apache RocketMQ release already contained transaction-related code, such as COMMIT, ROLLBACK, and PREPARED. Before the feature was open-sourced, the prevailing view online was that transaction messages work much like a two-phase commit. That view was based mainly on the definitions in the message system flag class MessageSysFlag:
TRANSACTION_PREPARED_TYPE
TRANSACTION_COMMIT_TYPE
TRANSACTION_ROLLBACK_TYPE
The message sender first sends a message of type TRANSACTION_PREPARED_TYPE and then, depending on the outcome of the local transaction, sends a commit or rollback request. If the commit/rollback request is lost, RocketMQ checks the transaction status after a specified timeout to decide whether to commit or roll back the message.
With that understanding and those guesses in mind, let's start by reading the demo program officially provided by RocketMQ and try to get a general picture.
The demo is located in the /rocketmq-example/src/main/java/org/apache/rocketmq/example/transaction package. The package contains no message consumer. To verify how transaction messages are consumed, copy a consumer from another package, run the producer first, and then run the consumer to observe the effects of prepare, commit, and rollback. Without further ado, run it and look at the result:
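The flow described above can be sketched with a toy in-memory model. This is only an illustration under stated assumptions: ToyTransactionBroker, its State enum, and all method names are hypothetical and are not RocketMQ classes. The sketch shows the idea only: a prepared message stays invisible until a commit arrives, a rollback drops it, and a check-back resolves messages whose second-phase request never arrived.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy model of the prepare / commit / rollback / check-back flow. Hypothetical, not RocketMQ code. */
class ToyTransactionBroker {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Prepared messages keyed by transaction id; consumers cannot see these.
    private final Map<String, String> prepared = new LinkedHashMap<>();
    // Messages that consumers are allowed to see.
    private final List<String> visible = new ArrayList<>();

    /** First phase: store the message without exposing it to consumers. */
    void prepare(String txId, String body) {
        prepared.put(txId, body);
    }

    /** Second phase: the producer reports the outcome of its local transaction. */
    void end(String txId, State state) {
        if (state == State.UNKNOWN) {
            return; // stays prepared until a later check resolves it
        }
        String body = prepared.remove(txId);
        if (body != null && state == State.COMMIT) {
            visible.add(body);
        }
    }

    /** Check-back: ask the producer about every message that is still prepared. */
    void checkPending(Function<String, State> producerCheck) {
        for (String txId : new ArrayList<>(prepared.keySet())) {
            end(txId, producerCheck.apply(txId));
        }
    }

    List<String> visibleMessages() {
        return visible;
    }

    int pendingCount() {
        return prepared.size();
    }

    public static void main(String[] args) {
        ToyTransactionBroker broker = new ToyTransactionBroker();
        broker.prepare("tx-1", "order created");
        broker.prepare("tx-2", "order cancelled");
        broker.prepare("tx-3", "order paid");
        broker.end("tx-1", State.COMMIT);
        broker.end("tx-2", State.ROLLBACK);
        // The commit request for tx-3 is "lost", so the broker checks back later.
        broker.checkPending(txId -> State.COMMIT);
        System.out.println(broker.visibleMessages());
    }
}
```

In this sketch only tx-1 and tx-3 become visible. A real broker would trigger the check-back from a timer rather than from an explicit call.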
The result of the message sender running:
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5767EC0000, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57680F0001, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57681E0002, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57682B0003, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768380004, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768490005, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768560006, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768640007, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768730008, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768800009, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=9]
The result of the message consumer running:
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715812, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749010, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001DE8, commitLogOffset=7656, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=5477, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY7, TRAN_MSG=true, CONSUME_START_TIME=1532746024360, UNIQ_KEY=C0A8010518DC6D06D69C8D5768640007, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagC, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='C0A8010518DC6D06D69C8D5768640007'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=1, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715768, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749008, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001B91, commitLogOffset=7057, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=4496, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY4, TRAN_MSG=true, CONSUME_START_TIME=1532746024361, UNIQ_KEY=C0A8010518DC6D06D69C8D5768380004, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagE, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='C0A8010518DC6D06D69C8D5768380004'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715727, bornHost=/192.168.1.5:55482, storeTimestamp=1532745748834, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F000000000000193A, commitLogOffset=6458, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=3515, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY1, TRAN_MSG=true, CONSUME_START_TIME=1532746024368, UNIQ_KEY=C0A8010518DC6D06D69C8D57680F0001, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagB, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='C0A8010518DC6D06D69C8D57680F0001'}]]
To sum up, the producer sent 10 messages, but the consumer received only 3. Presumably this is because of transaction rollback: only 3 messages were committed. To verify this more rigorously, you can install rocketmq-console and observe the result more intuitively.
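Every message in the consumer output carries sysFlag=8. As a hedged illustration of how such a flag can be read, the sketch below mirrors the transaction bits of MessageSysFlag as I understand them from the RocketMQ 4.x source (PREPARED = 0x1 << 2, COMMIT = 0x2 << 2, ROLLBACK = 0x3 << 2). These values are an assumption, so check them against the version you use. SysFlagSketch, transactionValue, and describe are hypothetical names.

```java
/** Hypothetical mirror of the transaction bits in MessageSysFlag; values assumed from the 4.x source. */
class SysFlagSketch {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // 4
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;   // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // 12, also covers both transaction bits

    /** Keeps only the two transaction bits of a sysFlag value. */
    static int transactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    /** Names the transaction type encoded in a sysFlag value. */
    static String describe(int sysFlag) {
        switch (transactionValue(sysFlag)) {
            case TRANSACTION_PREPARED_TYPE:
                return "PREPARED";
            case TRANSACTION_COMMIT_TYPE:
                return "COMMIT";
            case TRANSACTION_ROLLBACK_TYPE:
                return "ROLLBACK";
            default:
                return "NOT_TRANSACTIONAL";
        }
    }

    public static void main(String[] args) {
        // sysFlag=8 is the value shown for all three consumed messages.
        System.out.println(describe(8));
    }
}
```

Under these assumed values, sysFlag=8 decodes to the commit type, which is consistent with the consumer seeing only committed messages.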
Next, read the sample code:
1. Producer-side code interpretation:
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;

    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            TransactionListener transactionListener = new TransactionListenerImpl();    // @1
            TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("127.0.0.1:9876");
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });                                                                         // @2
            producer.setExecutorService(executorService);                               // @3
            producer.setTransactionListener(transactionListener);                       // @4
            producer.start();
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {                                              // @5
                try {
                    Message msg = new Message("transaction_topic_test", tags[i % tags.length], "KEY" …
                    // (the rest of the listing is cut off in the source)

Code @1: creates a TransactionListener instance, which reads literally as a transaction message event listener. It is covered in detail below.
Code @2: creates a thread pool (ExecutorService executorService) whose threads are named "client-transaction-msg-check-thread", which reads literally as the client-side transaction message status check thread. We can boldly guess that this thread pool invokes the TransactionListener methods to check the status of transaction messages. [This is only the author's guess and should not be taken as fact. If later articles show this view to be wrong, it will be corrected. It is written down here mainly to share my method of reading source code.]
Code @3: sets the thread pool for the transaction message sender.
Code @4: sets the transaction listener for the transaction message sender.
Code @5: sends 10 messages.
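The thread pool from step @2 can be tried in isolation with the JDK alone. The sketch below builds a pool with the same sizing and thread name as the demo and reports the name of the thread that runs a task. NamedCheckPool and its methods are hypothetical names, and whether RocketMQ actually runs the check-back on this pool remains the guess stated above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that reproduces the demo's pool settings using only the JDK. */
class NamedCheckPool {
    /** Builds a pool with 2 core threads, 5 max threads, and a bounded queue of 2000 tasks. */
    static ExecutorService create() {
        ThreadFactory factory = r -> {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        };
        return new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000), factory);
    }

    /** Runs one task on the pool and returns the name of the thread that executed it. */
    static String runAndReportThreadName() {
        ExecutorService pool = create();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // lets the JVM exit once the task has finished
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndReportThreadName());
    }
}
```

Naming the threads this way makes them easy to spot in a thread dump, which is a practical way to confirm which code path uses the pool.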
2. TransactionListener code interpretation
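    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;

    public class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
        // transaction id -> status (0, 1, or 2) recorded when the local transaction ran
        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }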
executeLocalTransaction method: records the state of the local transaction. In the demo, it assigns each transaction message a status of 0, 1, or 2 and stores it in a Map. It always returns LocalTransactionState.UNKNOW, so the final outcome of every message is decided by the check-back. In real applications, the transaction state is usually persisted, for example in a database or cache.
checkLocalTransaction method: the business-side implementation of the transaction check-back. It looks up the local transaction record and maps the stored status to a transaction state: 0 to UNKNOW, 1 to COMMIT_MESSAGE, and 2 to ROLLBACK_MESSAGE. This explains the result above: the producer sends 10 messages in a row, and because only 3 of them have a transaction status of COMMIT_MESSAGE, message consumers can consume only 3 messages.
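The count of 3 can be reproduced with a few lines of plain Java. This is a minimal sketch, assuming that executeLocalTransaction is called once per message in send order, so that the n-th message (counting from 0) gets status n % 3. DemoOutcome and committedIndexes are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that replays the demo's status rule without RocketMQ. */
class DemoOutcome {
    /** Returns the send indexes whose recorded status is 1, the status the check-back commits. */
    static List<Integer> committedIndexes(int messageCount) {
        List<Integer> committed = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            int status = i % 3; // same rule as executeLocalTransaction in the demo
            if (status == 1) {
                committed.add(i);
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(committedIndexes(10)); // prints [1, 4, 7]
    }
}
```

Indexes 1, 4, and 7 match the KEYS values KEY1, KEY4, and KEY7 in the consumer output. The remaining messages have status 0 (UNKNOW) or 2 (ROLLBACK_MESSAGE) and never become visible to the consumer in this run.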
At this point, we have a basic picture of how transaction messages are implemented, and it largely matches the prevailing online view described at the beginning of the article. The next article will analyze in detail how TransactionMQProducer sends transaction messages.
Important note: this article mainly demonstrates the basic use of transaction messages, and its conclusions are only the author's guesses. An important purpose of the article is to show readers a way to study source code, summarized as follows: first build an overall understanding (online material, official documentation), then think it through yourself, learn from the demo examples, break the learning task into parts, and write while you read.