2025-02-05 Update. From: SLTechnology News&Howtos, Shulou (Shulou.com)
This article explains how to use the message confirmation mechanism of Spring Boot with RabbitMQ. Many people have questions about it in day-to-day work, so the material below gathers a simple, practical approach, covering both the sending side and the consuming side.
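1. Prepare the environment
Add the Spring Boot AMQP starter as a dependency.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>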
Message acknowledgement must be enabled in the configuration for both the sender and the consumer.
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# The sender enables the confirm mechanism
# (from Spring Boot 2.2 on, this property is deprecated in favor of spring.rabbitmq.publisher-confirm-type=correlated)
spring.rabbitmq.publisher-confirms=true
# The sender enables the return mechanism
spring.rabbitmq.publisher-returns=true
# The consumer side uses manual ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Enable retry
spring.rabbitmq.listener.simple.retry.enabled=true
Define the exchange confirmTestExchange and the queue confirm_test_queue, and bind the queue to the exchange.
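import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {

    // Durable, non-exclusive, non-auto-delete queue
    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    // Bind the queue to the fanout exchange
    @Bean
    public Binding confirmTestFanoutExchangeAndQueue(
            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }
}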
Message acknowledgement in RabbitMQ has two parts: confirmation that a message was sent, and confirmation that a message was received.
2. Message send confirmation
The ConfirmCallback is triggered as soon as the message is received by the RabbitMQ broker.
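import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null when the sender did not supply one
        String id = correlationData != null ? correlationData.getId() : null;
        if (!ack) {
            log.error("Message sending failed, correlationData={}, cause={}", id, cause);
        } else {
            log.info("Sender has received confirmation, correlationData={}, ack={}, cause={}", id, ack, cause);
        }
    }
}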
Implement the ConfirmCallback interface and override its confirm() method, which takes three parameters: correlationData, ack, and cause.
correlationData: the object carries a single id attribute that uniquely identifies the current message.
ack: the status of the delivery to the broker. true indicates success.
cause: the reason for a delivery failure.
However, a message being received by the broker only shows that it has arrived at the MQ server. It does not guarantee that the message will be delivered to the target queue, so a ReturnCallback is needed as well.
If the message is not delivered to the destination queue, the ReturnCallback is triggered. This is the place to record the delivery details of the failed message so that it can be retransmitted or compensated for later.
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={}, replyText={}, exchange={}, routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}
Implement the ReturnCallback interface and override the returnedMessage() method, which takes five parameters: message (the returned message), replyCode (response code), replyText (response text), exchange (the exchange), and routingKey (the routing key).
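The bookkeeping that these two callbacks make possible can be sketched in plain Java, without Spring or a broker. The class below is a hypothetical in-memory store, not part of Spring AMQP: PendingMessageStore and its method names are assumptions made for illustration, and it also assumes that the application can recover the correlation id of a returned message (for example from a message header). It relies on the broker sending the return before the confirm for an unroutable mandatory message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory record of published messages that still await a broker outcome. */
class PendingMessageStore {
    /** States a published message can be in within this sketch. */
    enum Status { PENDING, RETURNED }

    private final Map<String, String> payloads = new ConcurrentHashMap<>();
    private final Map<String, Status> statuses = new ConcurrentHashMap<>();

    /** Call just before publishing, with the same id that is given to CorrelationData. */
    void onPublish(String correlationId, String payload) {
        payloads.put(correlationId, payload);
        statuses.put(correlationId, Status.PENDING);
    }

    /** Mirrors returnedMessage(): the broker could not route the message to a queue. */
    void onReturned(String correlationId) {
        statuses.computeIfPresent(correlationId, (id, status) -> Status.RETURNED);
    }

    /**
     * Mirrors confirm(): returns true when the message is finished,
     * false when it was nacked, returned, or is unknown to this store.
     */
    boolean onConfirm(String correlationId, boolean ack) {
        if (ack && statuses.get(correlationId) == Status.PENDING) {
            payloads.remove(correlationId);
            statuses.remove(correlationId);
            return true;
        }
        return false; // keep the entry so it can be retransmitted or compensated later
    }

    /** Snapshot of the messages that still need attention, keyed by correlation id. */
    Map<String, String> needsRetry() {
        return new HashMap<>(payloads);
    }
}
```

In a real application the two maps would more likely be a database table or a Redis hash, so that pending messages survive a restart of the sender.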
The following is the actual message delivery. Set the confirm and return callbacks on rabbitTemplate. The message is made persistent through setDeliveryMode(), and a CorrelationData object with a unique id (a random UUID in this code) is created so that the message can be identified in later testing.
@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private ConfirmCallbackService confirmCallbackService;

@Autowired
private ReturnCallbackService returnCallbackService;

public void sendMessage(String exchange, String routingKey, Object msg) {

    /**
     * Have the broker return messages that cannot be routed to a queue.
     * Note: the configuration also needs publisher-returns=true.
     */
    rabbitTemplate.setMandatory(true);

    /**
     * Callback invoked when the broker confirms (acks or nacks) a published message.
     */
    rabbitTemplate.setConfirmCallback(confirmCallbackService);

    /**
     * Callback invoked when a message could not be delivered to a queue.
     */
    rabbitTemplate.setReturnCallback(returnCallbackService);

    /**
     * Send the message as persistent, with a CorrelationData id.
     */
    rabbitTemplate.convertAndSend(exchange, routingKey, msg,
            message -> {
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            },
            new CorrelationData(UUID.randomUUID().toString()));
}
3. Message receipt confirmation
@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {

    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
        try {
            log.info("Xiao Fu received message: {}", msg);
            // TODO specific business logic
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("Message already failed after redelivery, rejecting it without requeue.");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // reject the message
            } else {
                log.error("Message is about to return to the queue for another attempt.");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}
The consumer can settle a message with one of three methods. Let's look at what each one means.
basicAck: confirms success. After this acknowledgement, the RabbitMQ broker deletes the message.
void basicAck(long deliveryTag, boolean multiple)
deliveryTag: the sequence number of the message delivery. Each time a message is delivered or redelivered, the deliveryTag increases. In manual acknowledgement mode, ack, nack, and reject operate on the message with the specified deliveryTag.
multiple: whether to confirm in batch. When true, all unacknowledged messages with a deliveryTag less than or equal to the given one are acked at once.
For example: suppose three messages with deliveryTag 5, 6, and 7 have been delivered and none has been confirmed. When the fourth message arrives with deliveryTag 8 and it is acked with multiple set to true, messages 5, 6, 7, and 8 are all confirmed.
basicNack: confirms failure. It is usually used when the business logic that consumes the message throws an exception. The message can be put back on the queue.
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag: the sequence number of the message delivery.
multiple: whether to confirm in batch.
requeue: when true, the message is put back on the queue.
basicReject: rejects a message. Unlike basicNack, it cannot operate on a batch. Otherwise the usage is very similar.
void basicReject(long deliveryTag, boolean requeue)
deliveryTag: the sequence number of the message delivery.
requeue: when true, the message is put back on the queue.
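The semantics of deliveryTag, multiple, and requeue described above can be tried out with a small in-memory model. This is only a sketch in plain Java: AckSemanticsSketch is a hypothetical class, not the RabbitMQ client's Channel, and it models a single channel with a sorted map of unacknowledged deliveries.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory stand-in for one channel's unacknowledged deliveries (not the real client API). */
class AckSemanticsSketch {
    final TreeMap<Long, String> unacked = new TreeMap<>(); // deliveryTag -> message
    final Deque<String> requeued = new ArrayDeque<>();      // messages put back on the queue

    /** The broker hands a message to the consumer under the given delivery tag. */
    void deliver(long deliveryTag, String msg) {
        unacked.put(deliveryTag, msg);
    }

    /** Removes and returns the deliveries settled by one ack, nack, or reject call. */
    private List<String> settle(long deliveryTag, boolean multiple) {
        List<String> settled = new ArrayList<>();
        if (multiple) {
            // Every outstanding tag up to and including deliveryTag
            NavigableMap<Long, String> upTo = unacked.headMap(deliveryTag, true);
            settled.addAll(upTo.values());
            upTo.clear();
        } else {
            String msg = unacked.remove(deliveryTag);
            if (msg != null) settled.add(msg);
        }
        return settled;
    }

    void basicAck(long deliveryTag, boolean multiple) {
        settle(deliveryTag, multiple); // acknowledged messages are simply dropped
    }

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        List<String> settled = settle(deliveryTag, multiple);
        if (requeue) requeued.addAll(settled);
    }

    /** Single-message rejection: the same as basicNack with multiple=false. */
    void basicReject(long deliveryTag, boolean requeue) {
        basicNack(deliveryTag, false, requeue);
    }
}
```

Delivering tags 5 to 8 and then calling basicAck(8, true) leaves unacked empty, which matches the example with messages 5, 6, 7, and 8.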
4. Test
5. Pitfall log
This pitfall takes no skill to fall into, yet it is very easy to hit.
Once the message confirmation mechanism is enabled, don't forget to call channel.basicAck after consuming a message. Otherwise the message stays on the broker and is consumed repeatedly.
When I first came into contact with the message confirmation mechanism, I wrote the consumer code as follows. The idea is simple: confirm the message after the business logic has run, and put the message back on the queue when an exception occurs, simulated here by int a = 1 / 0.
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
    try {
        log.info("Consumer 2 received: {}", msg);
        int a = 1 / 0; // simulated business exception
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // Always requeue: this is what causes the endless loop described below
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}
The problem is that a bug in the business code will, 99.9% of the time, not fix itself. The message is redelivered to the queue indefinitely and the consumer runs it indefinitely, which results in an endless loop.
The local CPU was maxed out in an instant. You can imagine how panicked I was when the service crashed in the production environment.
Meanwhile, the RabbitMQ management console showed only one unacknowledged message.
Testing and analysis showed that when a message is requeued, it does not go to the end of the queue. It stays at the head of the queue.
The consumer immediately consumes the message again, the business logic throws an exception, the message rejoins the queue, and so on. This blocks the queue, so normal messages behind it are never processed.
Our solution at the time was to acknowledge the message first, so that the queue deletes it, and then publish the same message to the queue again. The failing message is then placed at the end of the queue, which ensures that it is not lost and that normal business continues.
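channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// Resend the message so that it lands at the end of the queue.
// MessageProperties.PERSISTENT_TEXT_PLAIN comes from com.rabbitmq.client.MessageProperties;
// JSON.toJSONBytes is presumably fastjson (com.alibaba.fastjson.JSON).
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
        message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
        JSON.toJSONBytes(msg));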
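The difference between requeueing at the head and republishing to the tail can be reproduced with a plain in-memory deque. The sketch below is an illustration only: RequeueSketch is a hypothetical class, the "bad" prefix is an assumed marker for a message whose business logic always fails, and maxDeliveries caps the loop that would otherwise never end.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simulates one consumer draining a queue that contains a message that always fails. */
class RequeueSketch {
    /**
     * Performs at most maxDeliveries deliveries. Messages starting with "bad" always fail.
     * Returns the messages that were processed successfully, in order.
     */
    static List<String> consume(List<String> initial, boolean requeueAtHead, int maxDeliveries) {
        Deque<String> queue = new ArrayDeque<>(initial);
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < maxDeliveries && !queue.isEmpty(); i++) {
            String msg = queue.pollFirst();
            if (msg.startsWith("bad")) {
                if (requeueAtHead) {
                    queue.addFirst(msg); // like basicNack with requeue=true
                } else {
                    queue.addLast(msg);  // like basicAck followed by a republish
                }
            } else {
                processed.add(msg);
            }
        }
        return processed;
    }
}
```

With the queue bad-1, ok-1, ok-2 and ten deliveries, requeueing at the head processes nothing, while republishing to the tail processes ok-1 and ok-2 and keeps retrying bad-1 in between.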
However, this approach does not solve the underlying problem: the failing message still produces errors every time it comes around. Later we improved it by limiting the number of retries. When the retry limit is reached, the message is acknowledged manually so that the queue deletes it, the message is persisted to MySQL, and an alert is pushed. Manual handling and scheduled tasks then take care of compensation.
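The retry limit can be sketched as a small counter keyed by message id. Everything here is an assumption made for illustration: RetryLimiter, its Action enum, and the parked list (a stand-in for the MySQL table) are hypothetical names, and a real system would keep the count somewhere that survives restarts, such as a message header or Redis.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Decides whether a failed message should be retried or parked (not thread-safe). */
class RetryLimiter {
    enum Action { RETRY, GIVE_UP }

    private final int maxRetries;
    private final Map<String, Integer> failures = new HashMap<>();
    final List<String> parked = new ArrayList<>(); // stand-in for the MySQL table

    RetryLimiter(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Records one failed attempt and says what the consumer should do next. */
    Action onFailure(String messageId) {
        int count = failures.merge(messageId, 1, Integer::sum);
        if (count <= maxRetries) {
            return Action.RETRY;   // caller republishes the message
        }
        failures.remove(messageId);
        parked.add(messageId);     // persist for manual handling or a scheduled job
        return Action.GIVE_UP;     // caller acks so the queue deletes it, then alerts
    }

    /** Clears the counter once a message has been processed successfully. */
    void onSuccess(String messageId) {
        failures.remove(messageId);
    }
}
```

With maxRetries set to 2, the first two failures of a message return RETRY and the third returns GIVE_UP, at which point the consumer would ack the message and push the alert.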
How to make MQ consumption idempotent depends on the specific business. One option is to persist the message with the help of MySQL or Redis and check a unique attribute of the message before processing it.
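A minimal sketch of that uniqueness check, in plain Java: the in-memory set below stands in for a unique key in MySQL or a set in Redis, and IdempotentConsumer with its handle method is a hypothetical name, not an API of any library. It assumes every message carries a unique id.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Processes each message id at most once; duplicates are skipped. */
class IdempotentConsumer {
    // Stand-in for a unique index in MySQL or a set in Redis
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    private final Consumer<String> handler;

    IdempotentConsumer(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Returns true if the payload was processed, false if messageId was a duplicate. */
    boolean handle(String messageId, String payload) {
        if (!seen.add(messageId)) {
            return false; // already processed or in progress
        }
        try {
            handler.accept(payload);
            return true;
        } catch (RuntimeException e) {
            seen.remove(messageId); // allow a redelivery to try again
            throw e;
        }
    }
}
```

A duplicate would still be acknowledged by the consumer so that the broker deletes it. Only the business logic is skipped.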
That concludes this look at how to use the message confirmation mechanism in Spring Boot with RabbitMQ. Combining theory with practice is the best way to learn it, so go and try it.