Updated: 2025-10-26. From: SLTechnology News&Howtos
This article summarizes how Spring Boot integrates RocketMQ for normal, sequential, cluster/broadcast, and transactional messages, with the steps and code for each case.
Environment: Spring Boot 2.3.9.RELEASE + RocketMQ 4.8.0
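Dependencies

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.0</version>
    </dependency>

Configuration file

    server:
      port: 8080
    ---
    rocketmq:
      nameServer: localhost:9876
      producer:
        group: demo-mq

Normal message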
Send

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void send(String message) {
        // The destination has the form "topic:tag".
        rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build());
    }

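The destination passed to convertAndSend combines the topic and the tag in one string, separated by a colon, and a consumer filters on tags with a selector expression such as "tag1 || tag2". As a rough, library-independent sketch of these two conventions (the class and method names below are made up for illustration and are not part of rocketmq-spring-boot-starter):

```java
import java.util.Arrays;

// Illustrative only: mimics the "topic:tag" destination convention and
// "tagA || tagB" selector expressions. This is not the starter's own code.
class DestinationSketch {

    // Splits "topic:tag" into {topic, tag}; the tag is empty when absent.
    static String[] parse(String destination) {
        String[] parts = destination.split(":", 2);
        String tag = parts.length > 1 ? parts[1] : "";
        return new String[] {parts[0], tag};
    }

    // Returns true when the expression is "*" or lists the given tag.
    static boolean matches(String selectorExpression, String tag) {
        if ("*".equals(selectorExpression.trim())) {
            return true;
        }
        return Arrays.stream(selectorExpression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag::equals);
    }

    public static void main(String[] args) {
        String[] destination = parse("test-topic:tag2");
        System.out.println("topic=" + destination[0] + ", tag=" + destination[1]);
        System.out.println("delivered to listener: " + matches("tag1 || tag2", destination[1]));
    }
}
```

A message sent to "test-topic:tag2" therefore reaches a listener on test-topic whose selector expression includes tag2, and is skipped by listeners that only select other tags.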
Receive

    @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
    @Component
    public class ConsumerListener implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("received message: " + message);
        }
    }

Sequential messages
Send

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void sendOrder(String topic, String message, String tags, int id) {
        // The third argument is the hash key used to pick the queue.
        rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(), "order-" + id,
                new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message
                                + "\tqueueId: " + sendResult.getMessageQueue().getQueueId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        e.printStackTrace();
                    }
                });
    }

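The hash key (here "order-" + id) decides which queue a message is sent to. Messages with the same hash key always go to the same queue, which keeps them in order relative to each other.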
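To make the idea concrete, here is a minimal sketch of hash-based queue selection. It is a simplified illustration, not the selector shipped in the RocketMQ client, and the queue count of 4 is an assumed value:

```java
// Simplified illustration of hash-based queue selection. The selector used
// by the RocketMQ client may differ in detail.
class HashQueueSketch {

    // Maps a hash key to a queue index in the range [0, queueCount).
    static int selectQueue(String hashKey, int queueCount) {
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        String[] keys = {"order-1", "order-2", "order-1", "order-3"};
        for (String key : keys) {
            System.out.println(key + " -> queue " + selectQueue(key, queueCount));
        }
    }
}
```

The important property is determinism: the same key always maps to the same index, so all messages of one order land in one queue. Different keys may still share a queue.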
Receive

    @RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
    @Component
    public class ConsumerOrderListener implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println(Thread.currentThread().getName() + " received Order message: " + message);
        }
    }

consumeMode = ConsumeMode.ORDERLY sets the consume mode to sequential: each queue is consumed by one thread at a time.
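The "one queue, one thread" rule can be sketched in plain Java, without RocketMQ. In the hypothetical example below, each queue gets its own single-thread executor, so messages inside a queue are handled strictly in order while different queues still run in parallel:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java illustration of orderly consumption. No RocketMQ classes are used.
class OrderlyConsumeSketch {

    // Consumes every queue on its own single thread and returns, per queue,
    // the messages in the order they were processed.
    static List<List<String>> consume(List<List<String>> queues) {
        List<List<String>> processed = new ArrayList<>();
        List<ExecutorService> workers = new ArrayList<>();
        for (List<String> queue : queues) {
            List<String> seen = Collections.synchronizedList(new ArrayList<>());
            processed.add(seen);
            ExecutorService worker = Executors.newSingleThreadExecutor();
            workers.add(worker);
            for (String message : queue) {
                worker.execute(() -> seen.add(message)); // FIFO on one thread
            }
        }
        for (ExecutorService worker : workers) {
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<List<String>> queues = Arrays.asList(
                Arrays.asList("order-1:create", "order-1:pay", "order-1:ship"),
                Arrays.asList("order-2:create", "order-2:pay"));
        System.out.println(consume(queues));
    }
}
```

With a shared thread pool instead of one thread per queue, which is closer to what concurrent consumption does, the per-queue order would no longer be guaranteed.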
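Result

[Screenshot: console output with ConsumeMode.ORDERLY]

When consumeMode = ConsumeMode.CONCURRENTLY, the execution result is as follows:

[Screenshot: console output with ConsumeMode.CONCURRENTLY]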
Cluster / broadcast message mode
Sender

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void send(String topic, String message, String tags) {
        rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build());
    }

Cluster message mode
Consumer

    @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
    @Component
    public class ConsumerBroadListener implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("ConsumerBroadListener1 received message: " + message);
        }
    }

messageModel = MessageModel.CLUSTERING selects cluster mode.
Test
Start two instances of the service, on ports 8080 and 8081.
8080 service: [Screenshot: console output]
8081 service: [Screenshot: console output]
In cluster message mode, each instance receives only part of the messages, which achieves load balancing.
Broadcast message mode
Consumer

    @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
    @Component
    public class ConsumerBroadListener implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("ConsumerBroadListener1 received message: " + message);
        }
    }

messageModel = MessageModel.BROADCASTING selects broadcast mode.
Test
Start two instances of the service, on ports 8080 and 8081.
8080 service: [Screenshot: console output]
8081 service: [Screenshot: console output]
In broadcast message mode, every instance receives every message.
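The difference between the two message models can be sketched with a small plain-Java simulation. This is only an illustration: the round-robin split stands in for RocketMQ's real load balancing, which assigns whole queues to consumer instances, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy simulation of the two message models for one consumer group.
class MessageModelSketch {

    // CLUSTERING: each message is delivered to exactly one instance.
    // Round-robin is used here purely for illustration.
    static List<List<String>> clustering(List<String> messages, int instances) {
        List<List<String>> inboxes = emptyInboxes(instances);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % instances).add(messages.get(i));
        }
        return inboxes;
    }

    // BROADCASTING: every instance receives every message.
    static List<List<String>> broadcasting(List<String> messages, int instances) {
        List<List<String>> inboxes = emptyInboxes(instances);
        for (List<String> inbox : inboxes) {
            inbox.addAll(messages);
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int instances) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("clustering:   " + clustering(messages, 2));
        System.out.println("broadcasting: " + broadcasting(messages, 2));
    }
}
```

In the clustering case the two inboxes together contain each message once; in the broadcasting case each inbox contains all four messages.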
Transaction message
RocketMQ transactional messages have three states:
TransactionStatus.CommitTransaction: commits the transaction; consumers are allowed to consume the message.
TransactionStatus.RollbackTransaction: rolls back the transaction; the message is deleted and cannot be consumed.
TransactionStatus.Unknown: intermediate state; the broker needs to check back with the producer to determine the final status.
In rocketmq-spring-boot-starter these states are returned as RocketMQLocalTransactionState.COMMIT, ROLLBACK, and UNKNOWN.
RocketMQ implements transactional messages in two phases: the normal send-and-commit phase and the compensation (check-back) phase. The overall process is as follows:
Normal transaction send and commit phase
1. The producer sends a half message to the MQ server (a half message is a message that consumers temporarily cannot consume).
2. The server acknowledges that the message has been written, which means the half message was sent successfully.
3. Start executing local transactions
4. Perform Commit or Rollback operations according to the execution status of the local transaction
Transaction compensation phase
1. If the MQ server does not receive the result of the local transaction for a long time, it sends a check-back request to the producer.
2. After receiving the check-back request, the producer checks the status of the local transaction.
3. The producer performs a Commit or Rollback according to the result of the check.
The compensation phase mainly handles the case where the producer's Commit or Rollback request times out or fails.
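The two phases can be sketched as a toy, in-memory simulation. This is not RocketMQ code: the Broker class, its methods, and the State enum are hypothetical names chosen to mirror the steps listed above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy model of the half-message flow. All names are illustrative.
class TxMessageSketch {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // In-memory stand-in for the MQ server.
    static class Broker {
        private final Map<String, String> halfMessages = new LinkedHashMap<>();
        private final List<String> visible = new ArrayList<>();

        // Step 1 and 2: store the half message; consumers cannot see it yet.
        void sendHalf(String txId, String body) {
            halfMessages.put(txId, body);
        }

        // Step 4: commit makes the message visible, rollback deletes it,
        // unknown leaves it pending for a later check-back.
        void resolve(String txId, State state) {
            if (state == State.UNKNOWN) {
                return;
            }
            String body = halfMessages.remove(txId);
            if (body != null && state == State.COMMIT) {
                visible.add(body);
            }
        }

        // Compensation: ask the producer about every pending half message.
        void checkBack(Function<String, State> checker) {
            for (String txId : new ArrayList<>(halfMessages.keySet())) {
                resolve(txId, checker.apply(txId));
            }
        }

        List<String> visibleMessages() {
            return visible;
        }
    }

    // Normal phase: half message, then local transaction, then commit or rollback.
    static void sendInTransaction(Broker broker, String txId, String body, Supplier<State> localTx) {
        broker.sendHalf(txId, body);
        State state;
        try {
            state = localTx.get(); // step 3: run the local transaction
        } catch (RuntimeException e) {
            state = State.ROLLBACK;
        }
        broker.resolve(txId, state);
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        sendInTransaction(broker, "tx-1", "saved user 2", () -> State.COMMIT);
        sendInTransaction(broker, "tx-2", "failed user 1", () -> { throw new RuntimeException("data error"); });
        sendInTransaction(broker, "tx-3", "status lost", () -> State.UNKNOWN);
        System.out.println("before check-back: " + broker.visibleMessages());
        broker.checkBack(txId -> State.COMMIT); // producer confirms the pending transaction
        System.out.println("after check-back:  " + broker.visibleMessages());
    }
}
```

The check-back function plays the role of the producer's lookup of a transaction log record, which is why the real implementation needs a way to find out later whether the local transaction succeeded.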
Sender

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void sendTx(String topic, Long id, String tags) {
        rocketMQTemplate.sendMessageInTransaction(
                topic + ":" + tags,
                // Assumption: the payload is a Users object with an (id, name) constructor,
                // since the transaction listener deserializes the payload as Users.
                MessageBuilder.withPayload(new Users(id, UUID.randomUUID().toString().replaceAll("-", "")))
                        // BID is the unique business ID used later by the check-back.
                        .setHeader("BID", UUID.randomUUID().toString().replaceAll("-", ""))
                        .build(),
                UUID.randomUUID().toString().replaceAll("-", ""));
    }

Transaction listener on the producer side

    @RocketMQTransactionListener
    public class ProducerTxListener implements RocketMQLocalTransactionListener {

        @Resource
        private BusinessService bs;

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // Perform the local transaction here, for example saving data.
            try {
                // A log table stores the unique ID in the database, so that
                // checkLocalTransaction below can query whether the data exists.
                String id = (String) msg.getHeaders().get("BID");
                Users users = new JsonMapper().readValue((byte[]) msg.getPayload(), Users.class);
                System.out.println("message content: " + users + "\targument: " + arg + "\tunique serial number of this transaction: " + id);
                bs.save(users, new UsersLog(users.getId(), id));
            } catch (Exception e) {
                e.printStackTrace();
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            return RocketMQLocalTransactionState.COMMIT;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // Check whether the local transaction was executed successfully.
            String id = (String) msg.getHeaders().get("BID");
            System.out.println("checking whether data exists for ID: " + id);
            UsersLog usersLog = bs.queryUsersLog(id);
            if (usersLog == null) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            return RocketMQLocalTransactionState.COMMIT;
        }
    }

Consumer

    @RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
    @Component
    public class ConsumerTxListener implements RocketMQListener<Users> {
        @Override
        public void onMessage(Users users) {
            System.out.println("TX received message: " + users);
        }
    }

Service

    @Transactional
    public boolean save(Users users, UsersLog usersLog) {
        usersRepository.save(users);
        usersLogRepository.save(usersLog);
        // Simulate a failure for ID 1 so that the transaction rolls back.
        if (users.getId() == 1) {
            throw new RuntimeException("data error");
        }
        return true;
    }

    public UsersLog queryUsersLog(String bid) {
        return usersLogRepository.findByBid(bid);
    }

Controller

    @GetMapping("/tx/{id}")
    public Object sendTx(@PathVariable("id") Long id) {
        ps.sendTx("tx-topic", id, "tag10");
        return "send transaction success";
    }

Test
After calling the API, the console outputs:
[Screenshot: console output]
The printed log shows that the consumer receives the message only after the data has been saved.
Delete the data and test again with an ID of 1: the service throws an exception and the local transaction is rolled back.
There is no data in the database.
The mechanism is not very complicated: it is handled in two phases.