In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly introduces "how to ensure that messages are not consumed repeatedly in MQ". In daily operation, I believe that many people have doubts about how to ensure that messages are not consumed repeatedly in MQ. The editor consulted all kinds of materials and sorted out simple and easy-to-use methods of operation. I hope it will be helpful to answer the doubts of "how to ensure that messages are not consumed repeatedly in MQ". Next, please follow the editor to study!
one。 Repeat message
Why is the message duplicated? There are two reasons for duplicate messages: 1. The message is repeated during production, 2. The message is repeated during consumption.
1.1 duplicate messages during production
Because the producer sends a message to the MQ, there is a network fluctuation during the MQ confirmation, and the producer does not receive the acknowledgment. In fact, the MQ has already received the message. At this point, the producer will send the message again.
If the message is not acknowledged or fails in the producer, we can use scheduled task + (redis/db) to retry the message.
@ Component
@ Slf4J
Public class SendMessage {
@ Autowired
Private MessageService messageService
@ Autowired
Private RabbitTemplate rabbitTemplate
/ / maximum number of deliveries
Private static final int MAX_TRY_COUNT = 3
/ * *
* pull the failed message every 30s and re-deliver it.
, /
@ Scheduled (cron = "0ram 30 *?")
Public void resend () {
Log.info ("start scheduled tasks (resend messages)")
List msgLogs = messageService.selectTimeoutMsg ()
MsgLogs.forEach (msgLog-> {
String msgId = msgLog.getMsgId ()
If (msgLog.getTryCount () > = MAX_TRY_COUNT) {
MessageService.updateStatus (msgId, Constant.MsgLogStatus.DELIVER_FAIL)
Log.info ("message delivery failed after exceeding the maximum number of retries, msgId: {}", msgId)
} else {
MessageService.updateTryCount (msgId, msgLog.getNextTryTime ()); / / delivery times + 1
CorrelationData correlationData = new CorrelationData (msgId)
RabbitTemplate.convertAndSend (msgLog.getExchange (), msgLog.getRoutingKey (), MessageHelper.objToMsg (msgLog.getMsg ()), correlationData); / / re-delivery
Log.info ("1st" + (msgLog.getTryCount () + 1) + "resubmitted message")
}
});
Log.info ("scheduled task execution ends (resubmit messages)")
}
}
1.2 duplicate messages during consumption
After the consumer consumes successfully, there is a network fluctuation when confirming to MQ, and MQ does not receive the confirmation. In order to ensure that the message is consumed, MQ will continue to deliver the previous message to the consumer. At this point, consumers receive two identical messages.
Modify the consumer to simulate the exception
@ RabbitListener (queuesToDeclare = @ Queue (value = "javatrip", durable = "true"))
Public void receive (String message, @ Headers Map headers, Channel channel) throws Exception {
System.out.println (retry + System.currentTimeMillis ())
System.out.println (message)
Int I = 1 / 0
}
Configure yml retry policy
Spring:
Rabbitmq:
Listener:
Simple:
Retry:
Enabled: true # enables consumers to retry
Max-attempts: 5 # maximum number of retries
Initial-interval: 3000 # retry interval
Because duplicate messages are caused by network reasons, it is inevitable to repeat messages. But we need to ensure that the message is idempotent.
two。 How to ensure the idempotency of messages
Let each message carry a global unique ID to ensure the idempotency of the message. The specific consumption process is as follows:
After getting the message, consumers first query whether the message exists in redis/db according to id.
If it does not exist, it will be consumed normally and will be written to redis/db after consumption.
If it exists, it proves that the message has been consumed and discarded directly.
Producer
@ PostMapping ("/ send")
Public void sendMessage () {
JSONObject jsonObject = new JSONObject ()
JsonObject.put ("message", "Java Journey")
String json = jsonObject.toJSONString ()
Message message = MessageBuilder.withBody (json.getBytes ()) .setContentType (MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding ("UTF-8") .setMessageId (UUID.randomUUID () + ") .build ()
AmqpTemplate.convertAndSend ("javatrip", message)
}
Consumer
@ Component
@ RabbitListener (queuesToDeclare = @ Queue (value = "javatrip", durable = "true"))
Public class Consumer {
@ RabbitHandler
Public void receiveMessage (Message message) throws Exception {
Jedis jedis = new Jedis ("localhost", 6379)
String messageId = message.getMessageProperties () .getMessageId ()
String msg = new String (message.getBody (), "UTF-8")
System.out.println ("the message received is:" + msg+ "= = message id is:" + messageId)
String messageIdRedis = jedis.get ("messageId")
If (messageId = = messageIdRedis) {
Return
}
JSONObject jsonObject = JSONObject.parseObject (msg)
String email = jsonObject.getString ("message")
Jedis.set ("messageId", messageId)
}
}
If you need to store the db, you can directly set the ID as the primary key of the message. The next time you get a duplicate message for consumption, an exception will be thrown directly because of the uniqueness of the database primary key.
At this point, the study on "how to ensure that messages are not repeatedly consumed in MQ" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.