2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains how to handle messages with RocketMQ, covering both message sending (producer) and message processing (consumer).
Message sending (producer)
Taking a Maven + Spring Boot project as an example, first add the dependency to pom.xml.
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.1</version>
    </dependency>
Because this dependency is a starter, you can begin writing code to deliver messages as soon as it has been added. The starter registers a bean of type org.apache.rocketmq.spring.core.RocketMQTemplate, which can be used to deliver messages directly. The API is used like this:
    XXXEvent xxxDto = new XXXEvent();
    Message<XXXEvent> message = MessageBuilder.withPayload(xxxDto).build();
    // The destination has the form "topic:tag".
    String dest = String.format("%s:%s", "topic-name", "tag-name");
    // Default delivery is synchronous, so a successfully delivered message is not lost.
    // If a network exception occurs after the delivery has actually succeeded,
    // the client treats the delivery as failed and rolls back the local transaction.
    this.rocketMQTemplate.send(dest, message);
This delivery method ensures that a successfully delivered message will not be lost, but it cannot guarantee that the delivery succeeds. Suppose a single call proceeds as follows:
If an error occurs at step 3, the MQ client reports that message delivery failed and the local transaction is rolled back. If the message has nevertheless been delivered and consumed, the result is a business error: the consumer has acted on an event whose producer-side transaction no longer exists. Transactional messages solve this problem.
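The failure described above can be reproduced without a real broker. The following is a minimal sketch, assuming a hypothetical in-memory `FakeBroker` and `OrderService` (neither is a RocketMQ class): the broker stores the message and then the response is lost, so the producer rolls back even though the message is already available to consumers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a broker; not a RocketMQ class.
class FakeBroker {
    final List<String> stored = new ArrayList<>();
    boolean dropResponse; // simulate a network failure after the message is stored

    void send(String body) {
        stored.add(body); // the broker has accepted the message
        if (dropResponse) {
            throw new IllegalStateException("response lost");
        }
    }
}

// Hypothetical producer-side service whose "database" is a plain list.
class OrderService {
    final List<String> committedOrders = new ArrayList<>();

    // Returns true if the local transaction was committed.
    boolean createOrder(String orderId, FakeBroker broker) {
        List<String> pending = new ArrayList<>();
        pending.add(orderId); // local change, not yet committed
        try {
            broker.send("order-created:" + orderId);
        } catch (IllegalStateException e) {
            return false; // roll back: the pending change is discarded
        }
        committedOrders.addAll(pending); // commit
        return true;
    }
}

class LostResponseDemo {
    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        broker.dropResponse = true;
        OrderService service = new OrderService();

        boolean committed = service.createOrder("42", broker);
        System.out.println("committed=" + committed
                + ", messagesOnBroker=" + broker.stored.size());
    }
}
```

In this run the order is rolled back while one message remains on the broker, which is exactly the inconsistency that transactional messages are meant to prevent.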
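With transactional messages, the normal processing flow is as follows: the producer first sends a half (prepared) message that consumers cannot yet see, then executes the local transaction, and finally commits or rolls back the message according to the result.
When something goes wrong and the broker never receives a commit or a rollback, the broker checks back with the producer to ask for the state of the local transaction.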
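The two flows can be modelled in a few lines. This is a minimal sketch of the half-message idea only, assuming a hypothetical `TxBroker` class and `TxState` enum invented for illustration; it is not RocketMQ's implementation and ignores persistence, timeouts, and retry limits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the protocol; none of these types are RocketMQ classes.
enum TxState { COMMIT, ROLLBACK, UNKNOWN }

class TxBroker {
    private final Map<String, String> halfMessages = new HashMap<>();    // invisible to consumers
    private final Map<String, String> visibleMessages = new HashMap<>(); // deliverable

    // Step 1: the producer sends a half message before touching its own data.
    void sendHalf(String id, String body) {
        halfMessages.put(id, body);
    }

    // Step 2: the producer reports the outcome of the local transaction.
    void resolve(String id, TxState state) {
        if (state == TxState.UNKNOWN) {
            return; // keep the half message and ask again later
        }
        String body = halfMessages.remove(id);
        if (body != null && state == TxState.COMMIT) {
            visibleMessages.put(id, body);
        }
    }

    // Error path: the broker asks the producer about every unresolved half message.
    void checkBack(Function<String, TxState> producerCheck) {
        for (String id : new ArrayList<>(halfMessages.keySet())) {
            resolve(id, producerCheck.apply(id));
        }
    }

    boolean isVisible(String id) { return visibleMessages.containsKey(id); }
    boolean isPending(String id) { return halfMessages.containsKey(id); }
}

class TxFlowDemo {
    public static void main(String[] args) {
        TxBroker broker = new TxBroker();
        broker.sendHalf("tx-1", "order-created");
        broker.resolve("tx-1", TxState.UNKNOWN);   // outcome not known yet
        System.out.println("pending=" + broker.isPending("tx-1"));
        broker.checkBack(id -> TxState.COMMIT);    // the producer confirms on check-back
        System.out.println("visible=" + broker.isVisible("tx-1"));
    }
}
```

The point of the design is that consumers only ever see a message after the producer has confirmed its local transaction, either directly or through a check-back.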
Ordinary messages have no check-back, so the producer used for ordinary messages does not support check-back operations. Different businesses also need different check-back handling, so transactional messages need a separate producer. The sending code looks roughly like this:
    // Do not perform any operation that changes data before this call.
    XXXEvent xxxDto = new XXXEvent();
    Message<XXXEvent> message = MessageBuilder.withPayload(xxxDto).build();
    String dest = String.format("%s:%s", "topic-name", "tag-name");
    TransactionSendResult transactionSendResult =
            this.rocketMQTemplate.sendMessageInTransaction("producer-name", dest, message, "xxxid");
    if (LocalTransactionState.ROLLBACK_MESSAGE.equals(transactionSendResult.getLocalTransactionState())) {
        throw new RuntimeException("transaction message delivery failed");
    }
    // Following RocketMQ's approach, no other code should come after this point.

    // txProducerGroup should match the group name passed to sendMessageInTransaction.
    @RocketMQTransactionListener(txProducerGroup = "producer-name")
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {

        // Local transaction logic, executed after the half message has been sent.
        // Original text: "When send transactional prepare(half) message succeed,
        // this method will be invoked to execute local transaction."
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                xxxService.doSomething();
                return RocketMQLocalTransactionState.COMMIT;
            } catch (IOException e) {
                // It is uncertain whether the operation finally succeeded.
                return RocketMQLocalTransactionState.UNKNOWN;
            } catch (Exception e) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }

        // Check-back: report the execution state of the local transaction.
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            Boolean result = xxxService.isSuccess(msg);
            if (result != null) {
                if (result) {
                    return RocketMQLocalTransactionState.COMMIT;
                } else {
                    return RocketMQLocalTransactionState.ROLLBACK;
                }
            }
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

Message processing (consumer)
Ordinary messages and transactional messages differ only in how they are delivered. The consumer code is the same for both and is relatively simple.
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    @RocketMQMessageListener(consumerGroup = "xxx-consumer", topic = "topic-name", selectorExpression = "tag-name")
    public class XXXEventMQListener implements RocketMQListener<XXXEvent> {

        private String repeatCheckRedisKeyTemplate = "topic-name:tag:repeat-check:%s";

        @Autowired
        private StringRedisTemplate redisTemplate;

        @Override
        public void onMessage(XXXEvent message) {
            log.info("consumer message {}", message);
            // Process the message.
            try {
                xxxService.doSomething(message);
            } catch (Exception ex) {
                log.warn(String.format("message [%s] consumption failure", message), ex);
                // After the exception is thrown, the MQ client returns
                // ConsumeConcurrentlyStatus.RECONSUME_LATER and the message is consumed again later.
                throw new RuntimeException(ex);
            }
        }
    }
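The listener above declares repeatCheckRedisKeyTemplate and a StringRedisTemplate but does not show how they are used. Because a failed message is consumed again, a duplicate check is one plausible purpose. The sketch below illustrates that idea under stated assumptions: `IdempotentHandler` is a hypothetical class, the message id is supplied by the caller, and an in-memory set stands in for Redis.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper; the in-memory set is a stand-in for Redis.
class IdempotentHandler<T> {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final String keyTemplate;
    private final Consumer<T> businessLogic;

    IdempotentHandler(String keyTemplate, Consumer<T> businessLogic) {
        this.keyTemplate = keyTemplate;
        this.businessLogic = businessLogic;
    }

    // Returns true if the business logic ran, false if the message was a duplicate.
    boolean handle(String messageId, T payload) {
        String key = String.format(keyTemplate, messageId);
        if (!processedKeys.add(key)) {
            return false; // this key was already handled, skip
        }
        try {
            businessLogic.accept(payload);
            return true;
        } catch (RuntimeException e) {
            processedKeys.remove(key); // let the retry run the logic again
            throw e;
        }
    }
}

class IdempotentDemo {
    public static void main(String[] args) {
        IdempotentHandler<String> handler = new IdempotentHandler<>(
                "topic-name:tag:repeat-check:%s",
                payload -> System.out.println("processing " + payload));
        System.out.println(handler.handle("msg-1", "first delivery"));
        System.out.println(handler.handle("msg-1", "redelivery"));
    }
}
```

With Redis, the set insertion would typically become a set-if-absent operation with an expiry time so that keys do not accumulate. That choice, like the rest of this sketch, is an assumption and not something the original code shows.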
RocketMQ uses an ACK mechanism so that the broker knows whether a message has been consumed. In rocketmq-spring the acknowledgement is returned in org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer:
    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    rocketMQListener.onMessage(doConvertMessage(messageExt));
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}", messageExt, e);
                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

That covers how to handle messages with RocketMQ. If you have run into similar questions, the analysis above may help you work through them.