How SpringBoot integrates RocketMQ transactions, broadcasts, and sequential messages

2025-01-18 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

This article summarizes how Spring Boot integrates RocketMQ for normal, sequential, cluster/broadcast, and transaction messages, with the sending and consuming code for each case.

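Environment: Spring Boot 2.3.9.RELEASE + RocketMQ 4.8.0

Dependencies

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.0</version>
    </dependency>

Configuration file

    server:
      port: 8080
    ---
    rocketmq:
      nameServer: localhost:9876
      producer:
        group: demo-mq

Normal message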

Send

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void send(String message) {
        // The destination has the form "topic:tag".
        rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build());
    }

Receive

    @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
    @Component
    public class ConsumerListener implements RocketMQListener<String> {

        @Override
        public void onMessage(String message) {
            System.out.println("received message: " + message);
        }
    }

Sequential messages

Send

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void sendOrder(String topic, String message, String tags, int id) {
        // The third argument is the hash key used to pick the target queue.
        rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),
                "order-" + id, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message
                                + "\tqueueId: " + sendResult.getMessageQueue().getQueueId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        e.printStackTrace();
                    }
                });
    }

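Here the hash key ("order-" + id) determines which queue a message is sent to: messages with the same hash key always go to the same queue, while different keys may be spread over different queues.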
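To make the hash-key routing concrete, here is a minimal plain-Java sketch. It does not use the RocketMQ API; the class name HashKeyQueueSketch, the selectQueue method, and the queue count of 4 are assumptions for illustration, and the real client's selection logic may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: mimics hash-based queue selection, not RocketMQ's own classes.
class HashKeyQueueSketch {

    // Maps a hash key to a queue index in [0, queueCount).
    // floorMod keeps the result non-negative even when hashCode() is negative.
    static int selectQueue(String hashKey, int queueCount) {
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues in the topic
        Map<Integer, List<String>> queues = new TreeMap<>();
        for (int id = 1; id <= 3; id++) {
            for (int step = 1; step <= 3; step++) {
                String hashKey = "order-" + id;
                int queueId = selectQueue(hashKey, queueCount);
                queues.computeIfAbsent(queueId, k -> new ArrayList<>())
                        .add(hashKey + "/step-" + step);
            }
        }
        // All steps of one order end up in the same queue, in send order.
        queues.forEach((queueId, msgs) -> System.out.println("queue " + queueId + " -> " + msgs));
    }
}
```

Because the same key always maps to the same index, the relative order of one order's messages is preserved inside its queue.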

    @RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
    @Component
    public class ConsumerOrderListener implements RocketMQListener<String> {

        @Override
        public void onMessage(String message) {
            System.out.println(Thread.currentThread().getName() + " received Order message: " + message);
        }
    }

consumeMode = ConsumeMode.ORDERLY sets the consumption mode to sequential: each queue is consumed by one thread at a time.
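The "one queue, one thread" idea can be sketched in plain Java as follows. This is only an analogy built on single-thread executors, not the RocketMQ consumer implementation; OrderlyConsumeSketch and its consume method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: plain Java, not the RocketMQ consumer implementation.
class OrderlyConsumeSketch {

    // Consumes every queue on its own single worker thread and returns,
    // per queue, the messages in the order they were processed.
    static List<List<String>> consume(List<List<String>> queues) {
        List<List<String>> processed = new ArrayList<>();
        List<ExecutorService> workers = new ArrayList<>();
        for (List<String> queue : queues) {
            List<String> seen = Collections.synchronizedList(new ArrayList<>());
            processed.add(seen);
            // One thread per queue: tasks handed to it run strictly one after another.
            ExecutorService worker = Executors.newSingleThreadExecutor();
            workers.add(worker);
            for (String message : queue) {
                worker.execute(() -> seen.add(message));
            }
        }
        for (ExecutorService worker : workers) {
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<List<String>> queues = Arrays.asList(
                Arrays.asList("order-1/create", "order-1/pay", "order-1/ship"),
                Arrays.asList("order-2/create", "order-2/pay"));
        System.out.println(consume(queues));
    }
}
```

Different queues still run in parallel with each other; only the messages inside a single queue are serialized.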

Result

When consumeMode = ConsumeMode.CONCURRENTLY, the execution result is as follows:

Cluster / broadcast message mode

Sending end

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void send(String topic, String message, String tags) {
        rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build());
    }

Cluster message mode

Consumer end

    @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
    @Component
    public class ConsumerBroadListener implements RocketMQListener<String> {

        @Override
        public void onMessage(String message) {
            System.out.println("ConsumerBroadListener1 received message: " + message);
        }
    }

messageModel = MessageModel.CLUSTERING

Test

Start two instances of the service, on ports 8080 and 8081.

8080 service

8081 service

In cluster message mode, each service instance receives only part of the messages, which achieves load balancing across the consumer group.

Broadcast message mode

Consumer end

    @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
    @Component
    public class ConsumerBroadListener implements RocketMQListener<String> {

        @Override
        public void onMessage(String message) {
            System.out.println("ConsumerBroadListener1 received message: " + message);
        }
    }

messageModel = MessageModel.BROADCASTING

Test

Start two instances of the service, on ports 8080 and 8081.

8080 service

8081 service

In broadcast message mode, every service instance receives every message.
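The difference between the two message models can be illustrated with a small plain-Java simulation. It is a simplification under stated assumptions: MessageModelSketch and deliver are hypothetical names, and the clustering branch uses round-robin by message index, whereas the real client balances load by allocating queues to consumer instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: simulates who receives what, without any RocketMQ classes.
class MessageModelSketch {

    enum Model { CLUSTERING, BROADCASTING }

    // Returns, for each consumer instance of one group, the messages it receives.
    static List<List<String>> deliver(Model model, List<String> messages, int instances) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            received.add(new ArrayList<>());
        }
        for (int m = 0; m < messages.size(); m++) {
            String message = messages.get(m);
            if (model == Model.BROADCASTING) {
                // Broadcasting: every instance gets its own copy.
                for (List<String> inbox : received) {
                    inbox.add(message);
                }
            } else {
                // Clustering (simplified): each message goes to exactly one instance.
                received.get(m % instances).add(message);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("CLUSTERING:   " + deliver(Model.CLUSTERING, messages, 2));
        System.out.println("BROADCASTING: " + deliver(Model.BROADCASTING, messages, 2));
    }
}
```

With two instances, clustering splits the four messages between them, while broadcasting gives both instances all four.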

Transaction message

Three states of RocketMQ transaction

TransactionStatus.CommitTransaction: commit the transaction; consumers are allowed to consume the message. In the listener code of this article this corresponds to RocketMQLocalTransactionState.COMMIT.

TransactionStatus.RollbackTransaction: roll back the transaction; the message is deleted and cannot be consumed. This corresponds to RocketMQLocalTransactionState.ROLLBACK.

TransactionStatus.Unknown: intermediate state; the broker has to check back with the producer to determine the final status.

RocketMQ implements transaction messages in two phases: the normal send-and-commit phase, and the compensation (check-back) phase. The overall process is as follows:

Normal transaction send and commit phase

1. The producer sends a half message to the MQServer (a half message is a message that consumers cannot consume yet)

2. The server responds with the write result, confirming that the half message was stored successfully.

3. The producer starts executing the local transaction

4. The producer sends Commit or Rollback according to the result of the local transaction

Compensation process of transaction information

1. If the MQServer does not receive the result of the local transaction for a long time, it sends a check-back request to the producer.

2. After receiving the check-back request, the producer checks the execution status of the local transaction.

3. The producer sends Commit or Rollback according to the result of the check

The compensation phase mainly handles the case where the producer's Commit or Rollback request times out or fails.
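The two phases above can be sketched as a tiny plain-Java simulation. This is not RocketMQ code: TxMessageSketch, its State enum, and the maxChecks limit are hypothetical, and the sketch simply treats a message whose state never resolves as not delivered.

```java
import java.util.function.Supplier;

// Illustrative only: models the half-message flow without any RocketMQ classes.
class TxMessageSketch {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Returns true if the half message finally becomes visible to consumers.
    static boolean send(Supplier<State> executeLocalTransaction,
                        Supplier<State> checkLocalTransaction,
                        int maxChecks) {
        // Phase 1: the half message is stored (invisible to consumers),
        // then the producer runs its local transaction and reports the result.
        State state = executeLocalTransaction.get();

        // Phase 2 (compensation): while the result is unknown, the broker
        // checks back with the producer, up to an assumed limit.
        int checks = 0;
        while (state == State.UNKNOWN && checks < maxChecks) {
            state = checkLocalTransaction.get();
            checks++;
        }

        // Only a committed message is delivered; rollback or a still-unknown
        // state means consumers never see it.
        return state == State.COMMIT;
    }

    public static void main(String[] args) {
        System.out.println("commit directly:    " + send(() -> State.COMMIT, () -> State.ROLLBACK, 3));
        System.out.println("rollback directly:  " + send(() -> State.ROLLBACK, () -> State.COMMIT, 3));
        System.out.println("commit after check: " + send(() -> State.UNKNOWN, () -> State.COMMIT, 3));
    }
}
```

The two suppliers play the roles of executeLocalTransaction and checkLocalTransaction in the producer-side listener.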

Sending end

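    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void sendTx(String topic, Long id, String tags) {
        // Assumption: the payload is a Users object with an (id, name) constructor;
        // the producer-side listener deserializes the payload as Users.
        rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags,
                MessageBuilder.withPayload(new Users(id, UUID.randomUUID().toString().replaceAll("-", "")))
                        // "BID" carries the unique serial number of this transaction.
                        .setHeader("BID", UUID.randomUUID().toString().replaceAll("-", ""))
                        .build(),
                UUID.randomUUID().toString().replaceAll("-", ""));
    }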

Producer-side transaction listener

    @RocketMQTransactionListener
    public class ProducerTxListener implements RocketMQLocalTransactionListener {

        @Resource
        private BusinessService bs;

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // Perform the local transaction here, such as saving data.
            try {
                // A log table stores the unique ID in the same transaction, so that
                // checkLocalTransaction can later query whether the data exists.
                String id = (String) msg.getHeaders().get("BID");
                Users users = new JsonMapper().readValue((byte[]) msg.getPayload(), Users.class);
                System.out.println("message content: " + users + "\targument: " + arg
                        + "\tunique serial number of this transaction: " + id);
                bs.save(users, new UsersLog(users.getId(), id));
            } catch (Exception e) {
                e.printStackTrace();
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            return RocketMQLocalTransactionState.COMMIT;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // Check whether the local transaction executed successfully.
            String id = (String) msg.getHeaders().get("BID");
            System.out.println("checking whether data exists for ID: " + id);
            UsersLog usersLog = bs.queryUsersLog(id);
            if (usersLog == null) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            return RocketMQLocalTransactionState.COMMIT;
        }
    }

Consumer end

    @RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
    @Component
    public class ConsumerTxListener implements RocketMQListener<Users> {

        @Override
        public void onMessage(Users users) {
            System.out.println("TX received message: " + users);
        }
    }

Service

    @Transactional
    public boolean save(Users users, UsersLog usersLog) {
        usersRepository.save(users);
        usersLogRepository.save(usersLog);
        // Simulate a failure for ID 1 so that the transaction rolls back.
        if (users.getId() == 1) {
            throw new RuntimeException("data error");
        }
        return true;
    }

    public UsersLog queryUsersLog(String bid) {
        return usersLogRepository.findByBid(bid);
    }

Controller

    @GetMapping("/tx/{id}")
    public Object sendTx(@PathVariable("id") Long id) {
        ps.sendTx("tx-topic", id, "tag10");
        return "send transaction success";
    }

Test

After calling the API, the console outputs:

The printed log shows that the consumer does not receive the message until the local transaction has saved the data.

Delete the data and test again with ID 1: the service throws an error.

The local transaction is rolled back, so there is no data in the database.

Transaction messages are not very complicated: there are just two phases to handle.
