In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
Today, I would like to talk to you about how to integrate RocketMQ transaction news, many people may not know much about it. In order to make you understand better, the editor summarized the following content for you. I hope you can get something according to this article.
First, the reason for choosing RocketMQ
Selection of ActiveMQ, RabbitMQ, ZeroMQ, Kafka and RocketMQ
Second, the idea of integration
RocketMQ provides transaction message review to check the official Demo.
@ SpringBootApplicationpublic class ProducerApplication implements CommandLineRunner {private static final String TX_PGROUP_NAME = "myTxProducerGroup"; @ Resource private RocketMQTemplate rocketMQTemplate; @ Value ("${demo.rocketmq.transTopic}") private String springTransTopic; public static void main (String [] args) {SpringApplication.run (ProducerApplication.class, args);} @ Override public void run (String...) Args) throws Exception {/ / Send transactional messages testTransaction ();} private void testTransaction () throws MessagingException {String [] tags = new String [] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int I = 0; I < 10) Try {Message msg = MessageBuilder. WithPayload ("Hello RocketMQ" + I) .setHeader (RocketMQHeaders.TRANSACTION_ID, "KEY_" + I) .build () SendResult sendResult = rocketMQTemplate.sendMessageInTransaction (TX_PGROUP_NAME, springTransTopic + ":" + tags [I% tags.length], msg Null) System.out.printf ("- send Transactional msg body =% s, sendResult=%s% n", msg.getPayload (), sendResult.getSendStatus ()); Thread.sleep (10);} catch (Exception e) {e.printStackTrace () }} @ RocketMQTransactionListener (txProducerGroup = TX_PGROUP_NAME) class TransactionListenerImpl implements RocketMQLocalTransactionListener {private AtomicInteger transactionIndex = new AtomicInteger (0); private ConcurrentHashMap localTrans = new ConcurrentHashMap (); @ Override public RocketMQLocalTransactionState executeLocalTransaction (Message msg, Object arg) {String transId = (String) msg.getHeaders () .get (RocketMQHeaders.TRANSACTION_ID) System.out.printf ("# executeLocalTransaction is executed, msgTransactionId=%s% n", transId); int value = transactionIndex.getAndIncrement (); int status = value% 3; localTrans.put (transId, status) If (status = = 0) {/ / Return local transaction with success (commit), in this case, / / this message will not be checked in checkLocalTransaction () System.out.printf ("# COMMIT # Simulating msg% s related local transaction exec succeeded! #% n", msg.getPayload ()); return RocketMQLocalTransactionState.COMMIT } if (status = = 1) {/ / Return local transaction with failure (rollback), in this case, / / this message will not be checked in checkLocalTransaction () System.out.printf ("# ROLLBACK # Simulating% s related local transaction exec failed! n", msg.getPayload ()); return RocketMQLocalTransactionState.ROLLBACK } System.out.printf ("# UNKNOW # Simulating% s related local transaction exec UNKNOWN!\ n"); return RocketMQLocalTransactionState.UNKNOWN;} @ Override public RocketMQLocalTransactionState checkLocalTransaction (Message msg) {String transId = (String) msg.getHeaders () .get (RocketMQHeaders.TRANSACTION_ID); RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT; Integer status = localTrans.get (transId) If (null! = status) {switch (status) {case 0: retState = RocketMQLocalTransactionState.UNKNOWN; break; case 1: retState = RocketMQLocalTransactionState.COMMIT; break Case 2: retState = RocketMQLocalTransactionState.ROLLBACK; break;}} System.out.printf ("-!! CheckLocalTransaction is executed once, "+" msgTransactionId=%s, TransactionState=%s status=%s n ", transId, retState, status); return retState;}
You need to send a message in testTransaction (), then implement the executeLocalTransaction () method in the TransactionListenerImpl class to execute the entire local transaction, and then implement transaction message lookback in checkLocalTransaction ().
Looking at the source code, you can see that the testTransaction () method and executeLocalTransaction () are in the same thread, but in the wrapper RocketMQTemplate.
Third, problems and solutions 3.1 several problems faced by transaction messages:
There is no strict sequence between the callback query of the transaction message sent by the message and the local transaction, so how to ensure that the transaction operation must have been completed when checking back.
The transaction message callback uses transaction_id to query where the transaction_id is stored, while ensuring the successful execution of the business operations associated with the transaction_id.
How to isolate the transaction callback query operation from the business and ensure that it does not invade the code.
Downstream consumers how to ensure the idempotency of the interface.
How to improve idempotent query performance for downstream consumers.
How to isolate idempotent operations from the business and ensure that they do not invade the code.
3.2 solution
Because there may be delays in database or other business operations, if there is no guarantee that the business operation has been completed during the review, you can check it again and again, and set the maximum number of rechecks. At the same time, you cannot discard the persistence of MQ messages to facilitate manual recovery.
You can use the local message table to send messages on the ground, and at the same time, you can use aspects, inheritance, and other methods to isolate the landing messages from the business code, so as to ensure that the local messages fall into the library and do not invade. Note that you must ensure that the local messages and the local business fall into the same transaction!
Transaction message backcheck can use the local message table at point 2 to judge the execution result of the local transaction according to the transaction_id query. Like point 2, you can use some ways to isolate the transaction message backcheck code from the business code to ensure non-intrusion.
The idempotent method:
Database unique constraint
State machine CAS unidirectional flow
The message is deduplicated.
Before executing the local business, determine whether the business id exists on the redis, and the existence will directly return the consumption success. After executing the local business, the consumption information can be asynchronously landed in the redis. Note: you need to ensure that the local business and the message idempotent operation are in the same transaction, while the redis landing operation is outside the transaction.
A better solution should be the database unique constraint + message de-duplicating table, setting unique constraints on the business id in the message deduplicated table, and isolating the message landing operation from the local business to ensure no intrusion.
Regularly clean up the history of the local message table (message to re-table).
After reading the above, do you have any further understanding of how to integrate RocketMQ transaction messages? If you want to know more knowledge or related content, please follow the industry information channel, thank you for your support.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.