2025-03-28 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article looks at how RabbitMQ ensures message reliability. Many people are not familiar with the topic, so the editor has summarized the key points below.
A successfully consumed message travels producer -> MQ -> consumer, so it can be lost at any of these three steps.
1. The producer fails to deliver the message to MQ
1.1 Transaction mechanism
The AMQP protocol provides a transaction mechanism: the producer opens a transaction when publishing a message and rolls the transaction back if delivery fails.
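Custom transaction manager

    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitTranscation {
        // Transaction manager referenced by name from @Transactional on the producer
        @Bean
        public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
            return new RabbitTransactionManager(connectionFactory);
        }

        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            return new RabbitTemplate(connectionFactory);
        }
    }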
Modify yml

    spring:
      rabbitmq:
        # return messages that no queue accepted to the producer
        publisher-returns: true

Turn on transaction support

    rabbitTemplate.setChannelTransacted(true);

Call ReturnCallback when the message is not received by any queue

    rabbitTemplate.setMandatory(true);
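Producer delivery message

    import javax.annotation.PostConstruct;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    // Note: ReturnCallback is the API used in this article; Spring AMQP 2.3+ replaces it with ReturnsCallback.
    @Service
    public class ProviderTranscation implements RabbitTemplate.ReturnCallback {
        @Autowired
        RabbitTemplate rabbitTemplate;

        @PostConstruct
        public void init() {
            // put the channel into transactional mode
            rabbitTemplate.setChannelTransacted(true);
            rabbitTemplate.setReturnCallback(this);
        }

        // Called when a mandatory message could not be routed to any queue
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("this message failed to send: " + message + ", please process");
        }

        // An exception thrown here rolls back the RabbitMQ transaction
        @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
        public void publishMessage(String message) throws Exception {
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.convertAndSend("javatrip", message);
        }
    }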
However, few people do this, because transactions are synchronous: after each message is sent, the sender blocks until RabbitMQ responds, and only then moves on to the next message. This greatly reduces the producer's throughput.
1.2 Sender confirmation mechanism
The producer sets the channel to confirm mode. Every message published on that channel is then assigned a unique ID, and once the message has been delivered to the matching queue, RabbitMQ sends an acknowledgement carrying that ID back to the producer.
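The bookkeeping behind confirm mode can be sketched in plain Java, without a broker. This is only an illustration under stated assumptions: the class `OutstandingConfirms` and its methods are hypothetical names, not part of RabbitMQ or Spring AMQP, and the `multiple` flag mirrors the way a broker may confirm every message up to a given ID in one acknowledgement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: remembers published messages until they are confirmed. */
final class OutstandingConfirms {
    // ID assigned at publish time -> message body still awaiting confirmation
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();
    private long nextSeqNo = 1; // this sketch numbers messages from 1

    /** Records a message at publish time and returns the ID assigned to it. */
    synchronized long publish(String body) {
        long seqNo = nextSeqNo++;
        pending.put(seqNo, body);
        return seqNo;
    }

    /** Positive confirm: forget the message, or every message up to seqNo if multiple is set. */
    void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            pending.headMap(seqNo, true).clear();
        } else {
            pending.remove(seqNo);
        }
    }

    /** Negative confirm: remove the affected messages and return them for re-publishing. */
    List<String> handleNack(long seqNo, boolean multiple) {
        List<String> toRetry = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> nacked = pending.headMap(seqNo, true);
            toRetry.addAll(nacked.values());
            nacked.clear();
        } else {
            String body = pending.remove(seqNo);
            if (body != null) {
                toRetry.add(body);
            }
        }
        return toRetry;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        confirms.publish("a");
        confirms.publish("b");
        confirms.publish("c");
        confirms.handleAck(2, true); // confirms "a" and "b" together
        System.out.println("to retry: " + confirms.handleNack(3, false));
        System.out.println("still pending: " + confirms.pendingCount());
    }
}
```

Anything left in the map after a timeout has not been confirmed either way, so the producer can treat it as a candidate for re-delivery.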
Enable message confirmation mechanism

    spring:
      rabbitmq:
        # return messages that no queue accepted to the producer
        publisher-returns: true
        # enable the message confirmation mechanism
        publisher-confirm-type: correlated

Call ReturnCallback when the message is not received by any queue

    rabbitTemplate.setMandatory(true);
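Producer delivery message

    import javax.annotation.PostConstruct;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class ConfirmProvider implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
        @Autowired
        RabbitTemplate rabbitTemplate;

        @PostConstruct
        public void init() {
            rabbitTemplate.setReturnCallback(this);
            rabbitTemplate.setConfirmCallback(this);
        }

        // Called for every publish; correlationData is null unless one is passed to convertAndSend
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                System.out.println("confirmed this message: " + correlationData);
            } else {
                System.out.println("confirmation failed: " + correlationData + "; exception: " + cause);
            }
        }

        // Called when a mandatory message could not be routed to any queue
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("this message failed to send: " + message + ", please process");
        }

        public void publishMessage(String message) {
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.convertAndSend("javatrip", message);
        }
    }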
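If confirmation fails, we can compensate with a retry mechanism: the message is re-delivered when no positive confirmation is received. Note that the listener.simple.retry properties below control how the consumer-side listener container re-invokes a handler that throws; they do not by themselves re-publish a message whose confirmation failed. Re-publishing after a failed confirm has to be done in the ConfirmCallback, and send attempts that fail outright can be retried through the spring.rabbitmq.template.retry properties. The configuration from the original example is as follows.

    spring:
      rabbitmq:
        # return messages that no queue accepted to the producer
        publisher-returns: true
        # enable the message confirmation mechanism
        publisher-confirm-type: correlated
        listener:
          simple:
            retry:
              # enable retry
              enabled: true
              # maximum number of attempts
              max-attempts: 5
              # interval before the first retry, in milliseconds
              initial-interval: 3000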
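The idea behind a bounded retry (a maximum number of attempts with a fixed pause between them) can be sketched in plain Java. This is a minimal illustration, not Spring's retry implementation: `BoundedRetry` and `run` are hypothetical names, and the attempt is modelled as a `BooleanSupplier` that returns true once delivery has been confirmed.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: retries an action a bounded number of times. */
final class BoundedRetry {
    /**
     * Runs the attempt up to maxAttempts times, sleeping intervalMillis between tries.
     * Returns the number of attempts used, or -1 if all attempts failed or the thread was interrupted.
     */
    static int run(BooleanSupplier attempt, int maxAttempts, long intervalMillis) {
        for (int n = 1; n <= maxAttempts; n++) {
            if (attempt.getAsBoolean()) {
                return n;
            }
            if (n < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated delivery that is only confirmed on the third try
        int used = run(() -> ++calls[0] >= 3, 5, 10);
        System.out.println("attempts used: " + used);
    }
}
```

A message that exhausts its attempts should be logged or stored somewhere durable rather than dropped silently, otherwise the retry only postpones the loss.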
2. MQ goes down after receiving the message, and the messages held in memory are lost
Messages can be lost inside MQ itself, so both the queue and the message need to be persisted.
The @Queue annotation provides several queue-related attributes, as follows:
name: name of the queue
durable: whether the queue is persisted
exclusive: whether the queue is exclusive to the connection that declared it
autoDelete: whether the queue is deleted automatically
arguments: other optional queue parameters, listed below (see the arguments in figure 2):
x-message-ttl: expiration time of messages (in milliseconds)
x-expires: queue expiration time; the queue is deleted after it has gone unused for this long (in milliseconds)
x-max-length: maximum queue length. If it is exceeded, messages are dropped from the head of the queue.
x-max-length-bytes: maximum total size of message bodies in the queue, limited by memory size. Beyond this threshold, messages are dropped from the head of the queue.
x-overflow: sets the queue overflow behavior, which determines what happens to messages once the maximum length is reached. Valid values are drop-head, reject-publish, or reject-publish-dlx. Quorum queues do not support every value (drop-head is supported; check the documentation for your broker version).
x-dead-letter-exchange: name of the dead-letter exchange. Messages that expire or are dropped (because the queue is too long or exceeds the size threshold) can be routed to this exchange.
x-dead-letter-routing-key: routing key used when a message is sent to the dead-letter exchange. If not set, the message's original routing key is used.
x-single-active-consumer: whether the queue uses a single active consumer. When true, only one of the registered consumers receives messages and the others stand by; when false (the default), messages are dispatched to all consumers in turn.
x-max-priority: the maximum priority the queue supports; if not set, the queue does not support message priority
x-queue-mode (lazy mode): sets the queue to lazy mode, which keeps as many messages on disk as possible to reduce RAM use; if not set, the queue keeps an in-memory cache to deliver messages as quickly as possible
x-queue-master-locator: sets how the master node of a mirrored queue is located in cluster mode.
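Queue arguments are passed to the broker as a map from argument name to value. The following sketch builds such a map for a bounded queue with a dead-letter exchange. `QueueArguments`, the method name and the example values ("dlx.exchange", "dead") are hypothetical; only the `x-` keys come from the list above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: assembles optional queue arguments. */
final class QueueArguments {
    /** Builds arguments for a length-bounded queue whose expired or dropped messages are dead-lettered. */
    static Map<String, Object> boundedWithDeadLetter(long ttlMillis, int maxLength,
                                                     String deadLetterExchange, String deadLetterRoutingKey) {
        Map<String, Object> arguments = new LinkedHashMap<>();
        arguments.put("x-message-ttl", ttlMillis);        // message expiration, in milliseconds
        arguments.put("x-max-length", maxLength);         // maximum number of messages
        arguments.put("x-overflow", "drop-head");         // drop from the head once the limit is reached
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = boundedWithDeadLetter(60_000L, 1_000, "dlx.exchange", "dead");
        arguments.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A map like this can be handed to the queue declaration, for example as the last parameter of `Channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)` in the RabbitMQ Java client. Re-declaring an existing queue with different arguments is rejected by the broker, so the values should be settled before the queue is first created.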
Persistent queue
When creating the queue, set the durable attribute to true and autoDelete to false:

    @Queue(value = "javatrip", durable = "true", autoDelete = "false")

Persistent message
When sending a message, set its deliveryMode to 2 (persistent). In Spring Boot, messages are persisted by default.
3. An exception occurs on the consumer side before the message has been fully processed
The consumer has received the message, but an exception occurs before the business logic has finished. If the message had already been acknowledged at that point, it would be lost. To avoid this, turn off automatic acknowledgement and acknowledge messages manually instead.
Change yml to manual acknowledgement mode

    spring:
      rabbitmq:
        listener:
          simple:
            # manual acknowledgement mode
            acknowledge-mode: manual
            # deliver at most one unacknowledged message to the consumer at a time
            prefetch: 1
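Consumers acknowledge messages manually

    import java.util.Map;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.stereotype.Component;

    @Component
    @RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
    public class Consumer {
        @RabbitHandler
        public void receive(String message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            System.out.println(message);
            // delivery tag: identifies this delivery on the channel
            Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            // the success condition is elided in the original article
            if (...) {
                // processing succeeded: acknowledge this single message
                channel.basicAck(deliverTag, false);
            } else {
                // processing failed: reject the message and return it to the queue
                channel.basicNack(deliverTag, false, true);
            }
        }
    }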
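What manual acknowledgement buys can be shown with a small in-memory model of a queue. This is a simplified sketch, not the broker's implementation: `ManualAckQueue` and its methods are hypothetical names, and placing a requeued message back at the head of the queue is a modelling assumption. The point is that a delivered message stays tracked until it is acked, so a nack with requeue makes it available again instead of losing it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of a queue with manual acknowledgements. */
final class ManualAckQueue {
    private final Deque<String> ready = new ArrayDeque<>();     // messages waiting for delivery
    private final Map<Long, String> unacked = new HashMap<>();  // delivered but not yet acknowledged
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    /** Delivers the next message and returns its delivery tag, or -1 if the queue is empty. */
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** Processing succeeded: the message is removed for good. */
    void ack(long tag) {
        unacked.remove(tag);
    }

    /** Processing failed: drop the message, or put it back in the queue when requeue is true. */
    void nack(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (body != null && requeue) {
            ready.addFirst(body);
        }
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        ManualAckQueue queue = new ManualAckQueue();
        queue.publish("order-1");
        long tag = queue.deliver();
        queue.nack(tag, true);       // first attempt fails, message goes back to the queue
        long retryTag = queue.deliver();
        queue.ack(retryTag);         // second attempt succeeds
        System.out.println("ready=" + queue.readyCount() + ", unacked=" + queue.unackedCount());
    }
}
```

A handler that always requeues a message it can never process will loop forever, which is one reason to pair requeueing with a retry limit or a dead-letter exchange.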
4. Summary
Why are messages lost?
The producer, MQ and the consumer can each cause message loss.
How to ensure the reliability of the message?
The sender uses the sender confirmation mode
MQ persists both queues and messages
The consumer acknowledges the message manually after processing it successfully