How RabbitMQ Ensures Message Reliability

Updated 2025-03-28. Source: SLTechnology News&Howtos (Shulou.com)

This article looks at how RabbitMQ keeps messages from being lost and summarizes the measures available at each stage.

A message that is consumed successfully travels producer -> MQ -> consumer, so it can be lost at any of these three stages.

1. The producer fails to deliver the message to MQ

1.1 Transaction mechanism

The AMQP protocol provides a transaction mechanism: the producer opens a transaction before publishing a message and rolls the transaction back if delivery fails.

Custom transaction manager

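    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitTranscation {

        // Transaction manager referenced by name from @Transactional
        @Bean
        public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
            return new RabbitTransactionManager(connectionFactory);
        }

        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            return new RabbitTemplate(connectionFactory);
        }
    }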

Modify the yml

    spring:
      rabbitmq:
        # return messages to the producer when no queue receives them
        publisher-returns: true

Turn on transaction support

    rabbitTemplate.setChannelTransacted(true);

Call ReturnCallback when the message is not received by a queue

    rabbitTemplate.setMandatory(true);

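The producer delivers the message

    import javax.annotation.PostConstruct;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    // ReturnCallback is the older Spring AMQP interface; versions 2.3 and later replace it with ReturnsCallback.
    @Service
    public class ProviderTranscation implements RabbitTemplate.ReturnCallback {

        @Autowired
        RabbitTemplate rabbitTemplate;

        @PostConstruct
        public void init() {
            // make the channel transactional
            rabbitTemplate.setChannelTransacted(true);
            rabbitTemplate.setReturnCallback(this);
        }

        // Called when the broker returns a message that could not be routed to any queue
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("this message failed to send: " + message + ", please process");
        }

        @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
        public void publishMessage(String message) throws Exception {
            rabbitTemplate.setMandatory(true);
            // "javatrip" is the routing key; the message goes through the template's default exchange
            rabbitTemplate.convertAndSend("javatrip", message);
        }
    }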

However, this approach is rarely used because it is synchronous: after sending a message, the producer blocks until RabbitMQ responds before it can send the next one, which sharply reduces the producer's throughput.

1.2 Sender confirmation mechanism

Before sending, the producer puts the channel into confirm mode. Every message published on that channel is then assigned a unique ID (a sequence number). Once the message has been delivered to the matching queue, RabbitMQ sends an acknowledgement carrying that ID back to the producer.
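The bookkeeping behind confirm mode can be sketched without a broker. The class below is a simplified, hypothetical model (ConfirmTracker and its method names are illustrative and not part of RabbitMQ or Spring AMQP). It hands out sequence numbers starting at 1, remembers every unconfirmed message, and clears or returns entries when an ack or nack arrives. It assumes the broker may confirm several messages at once through a `multiple` flag, as the confirm callbacks of the RabbitMQ Java client do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Broker-free model of publisher-confirm bookkeeping (hypothetical, for illustration only). */
final class ConfirmTracker {
    // Sequence number -> message body for everything published but not yet confirmed.
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private long nextSeqNo = 1; // sequence numbers start at 1 on each channel

    /** Records a message as published and returns the sequence number assigned to it. */
    synchronized long publish(String body) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, body);
        return seqNo;
    }

    /** Ack: the message (or, if multiple is true, every message up to seqNo) is confirmed. */
    void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    /** Nack: removes the affected messages and returns their bodies so they can be re-published. */
    List<String> handleNack(long seqNo, boolean multiple) {
        List<String> toResend = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
            toResend.addAll(head.values());
            head.clear();
        } else {
            String body = outstanding.remove(seqNo);
            if (body != null) {
                toResend.add(body);
            }
        }
        return toResend;
    }

    int unconfirmedCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        tracker.publish("order-1");
        tracker.publish("order-2");
        long third = tracker.publish("order-3");
        tracker.handleAck(2, true); // confirms order-1 and order-2 together
        System.out.println("to resend: " + tracker.handleNack(third, false));
        System.out.println("still unconfirmed: " + tracker.unconfirmedCount());
    }
}
```

Keeping the outstanding messages in a sorted map is what makes the `multiple` case cheap: everything at or below the confirmed sequence number is dropped in one call.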

Enable the message confirmation mechanism

    spring:
      rabbitmq:
        # return messages to the producer when no queue receives them
        publisher-returns: true
        # enable the message confirmation mechanism
        publisher-confirm-type: correlated

Call ReturnCallback when the message is not received by a queue

    rabbitTemplate.setMandatory(true);

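The producer delivers the message

    import javax.annotation.PostConstruct;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class ConfirmProvider implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

        @Autowired
        RabbitTemplate rabbitTemplate;

        @PostConstruct
        public void init() {
            rabbitTemplate.setReturnCallback(this);
            rabbitTemplate.setConfirmCallback(this);
        }

        // Called with the broker's ack or nack; correlationData is null unless one is passed when sending
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                System.out.println("confirmed this message: " + correlationData);
            } else {
                System.out.println("confirmation failed: " + correlationData + "; exception: " + cause);
            }
        }

        // Called when the broker returns a message that could not be routed to any queue
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("this message failed to send: " + message + ", please process");
        }

        public void publishMessage(String message) {
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.convertAndSend("javatrip", message);
        }
    }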

If confirmation fails, we can compensate with a retry mechanism: the message is delivered again when no confirmation is received. Note that the listener.simple.retry settings shown here take effect on the consumer side, where they re-invoke a listener that threw an exception; retries for failed sends through RabbitTemplate are configured under spring.rabbitmq.template.retry. The configuration is as follows.

    spring:
      rabbitmq:
        # return messages to the producer when no queue receives them
        publisher-returns: true
        # enable the message confirmation mechanism
        publisher-confirm-type: correlated
        listener:
          simple:
            retry:
              # enable retry
              enabled: true
              # maximum number of attempts
              max-attempts: 5
              # retry interval (in milliseconds)
              initial-interval: 3000
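The retry settings amount to a bounded loop with a fixed pause between attempts. The sketch below models that behaviour in plain Java. FixedIntervalRetry is a hypothetical helper, not the Spring Retry implementation, and it assumes that the attempt limit counts the first call as well as the retries.

```java
import java.util.function.Supplier;

/** Plain-Java model of "max-attempts" plus a fixed retry interval (hypothetical helper). */
final class FixedIntervalRetry {

    /**
     * Calls the task until it succeeds or maxAttempts calls have failed.
     * The last failure is rethrown once the attempts are exhausted.
     */
    static <T> T call(Supplier<T> task, int maxAttempts, long intervalMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    pause(intervalMillis); // wait before the next attempt
                }
            }
        }
        throw last;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = call(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "delivered";
        }, 5, 100L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With the values from the configuration (5 attempts, 3000 ms), a message that keeps failing is given up on after roughly 12 seconds of waiting, so something still has to decide what happens to it afterwards.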

2. After the message reaches MQ, a broker outage loses the messages held in memory

Messages can also be lost inside MQ, so both the queue and the messages need to be persisted.

The @Queue annotation provides the following queue-related attributes:

name: name of the queue

durable: whether the queue is persistent

exclusive: whether the queue is exclusive to the connection that declared it

autoDelete: whether the queue is deleted automatically

arguments: other queue arguments, all optional, listed below (see the arguments in figure 2):

x-message-ttl: expiration time of a message (in milliseconds)

x-expires: expiration time of the queue; the queue is deleted after it has not been used for this long (in milliseconds)

x-max-length: maximum queue length. When it is exceeded, messages are dropped from the head of the queue.

x-max-length-bytes: maximum total size of the message content in the queue, limited by memory size. Beyond this threshold, messages are dropped from the head of the queue.

x-overflow: sets the queue overflow behavior, which determines what happens to messages once the maximum length of the queue is reached. Valid values are drop-head, reject-publish, and reject-publish-dlx. Quorum queues support only a subset of these values; older releases support only drop-head.

x-dead-letter-exchange: name of the dead-letter exchange. Messages that expire or are dropped (because the queue is too long or exceeds its size threshold) can be routed to this exchange.

x-dead-letter-routing-key: routing key used when a message is sent to the dead-letter exchange. If not set, the message's original routing key is used.

x-single-active-consumer: whether the queue has a single active consumer. When true, only one of the registered consumers receives messages and the others stay idle; when false (the default), messages are distributed among all consumers.

x-max-priority: the maximum priority level the queue supports; if not set, the queue does not support message priority.

x-queue-mode (lazy mode): puts the queue into lazy mode, which keeps as many messages on disk as possible to reduce RAM usage; if not set, the queue keeps an in-memory cache to deliver messages as quickly as possible.

x-queue-master-locator: sets how the master node of a mirrored queue is chosen in cluster mode.
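The difference between two of the x-overflow values is easiest to see on a tiny in-memory queue. The class below is a hypothetical model in plain Java (BoundedQueueModel is an illustrative name, not a RabbitMQ API) of a queue with a maximum length. It covers only drop-head and reject-publish, and it ignores dead-lettering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory model of a length-limited queue (hypothetical, for illustration only). */
final class BoundedQueueModel {
    enum Overflow { DROP_HEAD, REJECT_PUBLISH }

    private final Deque<String> messages = new ArrayDeque<>();
    private final int maxLength;
    private final Overflow overflow;

    BoundedQueueModel(int maxLength, Overflow overflow) {
        if (maxLength < 1) {
            throw new IllegalArgumentException("maxLength must be at least 1");
        }
        this.maxLength = maxLength;
        this.overflow = overflow;
    }

    /** Returns true if the message was accepted, false if the publish was rejected. */
    boolean publish(String body) {
        if (messages.size() >= maxLength) {
            if (overflow == Overflow.REJECT_PUBLISH) {
                return false;         // the new message is refused, the queue is unchanged
            }
            messages.pollFirst();     // drop-head: discard the oldest message to make room
        }
        messages.addLast(body);
        return true;
    }

    /** Copy of the queue content, oldest message first. */
    List<String> snapshot() {
        return new ArrayList<>(messages);
    }

    public static void main(String[] args) {
        BoundedQueueModel dropHead = new BoundedQueueModel(2, Overflow.DROP_HEAD);
        BoundedQueueModel reject = new BoundedQueueModel(2, Overflow.REJECT_PUBLISH);
        for (String body : new String[] {"a", "b", "c"}) {
            dropHead.publish(body);
            reject.publish(body);
        }
        System.out.println("drop-head keeps " + dropHead.snapshot());
        System.out.println("reject-publish keeps " + reject.snapshot());
    }
}
```

For reliability the choice matters: drop-head silently discards the oldest messages, while reject-publish pushes the problem back to the producer, which can then react.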

Persistent queue

Set the durable attribute to true and autoDelete to false when creating the queue

    @Queue(value = "javatrip", durable = "true", autoDelete = "false")

Persistent message

When sending a message, set its deliveryMode to 2 (persistent). With Spring Boot, messages are persistent by default.

3. The consumer hits an exception before it finishes consuming the message

The consumer has just received the message and throws an exception before the business logic has finished. To keep the message from being lost in this case, turn off automatic acknowledgement and acknowledge messages manually instead.

Change the yml to manual acknowledgement mode

    spring:
      rabbitmq:
        listener:
          simple:
            # manual acknowledgement mode
            acknowledge-mode: manual
            # receive one unacknowledged message at a time
            prefetch: 1

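The consumer acknowledges manually

    import java.util.Map;

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.stereotype.Component;

    @Component
    @RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
    public class Consumer {

        @RabbitHandler
        public void receive(String message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            System.out.println(message);
            // delivery tag: identifies this delivery on the channel
            Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            // acknowledge the message (the success condition is elided in the original)
            if (...) {
                // second argument false: acknowledge only this delivery
                channel.basicAck(deliverTag, false);
            } else {
                // consumption failed: third argument true returns the message to the queue
                channel.basicNack(deliverTag, false, true);
            }
        }
    }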
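Why manual acknowledgement protects the message can be shown with a small broker-free model. In the sketch below, ManualAckModel is a hypothetical class (not a RabbitMQ API) that removes a message only when the handler reports success. A failure or an exception is treated like basicNack with requeue set to true, so the message goes back to the queue for another delivery.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** In-memory model of manual acknowledgement with requeue (hypothetical, for illustration only). */
final class ManualAckModel {
    private final Deque<String> queue = new ArrayDeque<>();

    void publish(String body) {
        queue.addLast(body);
    }

    /**
     * Delivers the oldest message to the handler.
     * Returns true if the message was acknowledged, false if the queue was empty
     * or the message was requeued after a failure.
     */
    boolean deliverNext(Predicate<String> handler) {
        String body = queue.pollFirst(); // delivered, but not yet acknowledged
        if (body == null) {
            return false;
        }
        boolean processed;
        try {
            processed = handler.test(body);
        } catch (RuntimeException e) {
            processed = false;           // an exception in the business logic counts as a failure
        }
        if (processed) {
            return true;                 // ack: the message is gone for good
        }
        queue.addFirst(body);            // nack with requeue: the message returns to the queue
        return false;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        ManualAckModel model = new ManualAckModel();
        model.publish("order-1");
        boolean first = model.deliverNext(body -> { throw new IllegalStateException("database down"); });
        boolean second = model.deliverNext(body -> true);
        System.out.println("first delivery acked: " + first);
        System.out.println("second delivery acked: " + second);
        System.out.println("messages left: " + model.size());
    }
}
```

One consequence of this design is that a message which always fails is redelivered indefinitely, so a real consumer usually limits the number of requeues or routes such messages to a dead-letter exchange.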

4. Summary

Why are messages lost?

The producer, MQ, and the consumer can each lose a message.

How to ensure the reliability of the message?

The producer uses the sender confirmation mode

MQ persists both the queues and the messages

The consumer acknowledges the message manually after it has been consumed successfully
