Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to prevent data loss by RabbitMQ

2025-01-30 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

Editor to share with you RabbitMQ how to prevent data loss, I hope you will learn something after reading this article, let's discuss it together!

First, analyze the causes of data loss

To analyze the loss of RabbitMQ messages, take a look at the process of sending a message from the producer to the consumer:

As you can see, the whole process of a message goes through two network transmissions: from the producer to the RabbitMQ server, and from the RabbitMQ server to the consumer.

Stored in a queue (Queue) before the consumer consumes it.

So you can know that there are three scenarios where message loss can occur:

Stored in the queue, if the queue does not persist the message, the RabbitMQ server downtime restart will lose data. When the producer sends a message to the RabbitMQ server, if the RabbitMQ server goes down and stops service, the message will be lost. The consumer gets the data consumption stored in the queue from the RabbitMQ server, but the consumer program goes wrong or goes down and does not consume correctly, resulting in data loss.

For the above three scenarios, RabbitMQ provides three solutions, namely, message persistence, confirm mechanism and ACK transaction mechanism.

2. Persistence of messages

RabbitMQ supports message persistence, which needs to be set: Exchange is persistence and Queue persistence, so that when a message is sent to the RabbitMQ server, the message is persisted.

First, take a look at the class diagram of the Exchange switch:

The purpose of looking at this class diagram is to show that the four switches introduced in the previous article are all subclasses of the AbstractExchange abstract class, so according to the characteristics of java, creating an instance of a subclass will first call the constructor of the parent class. What is the constructor of the parent class, that is, AbstractExchange?

From the comments above, you can see that the durable parameter indicates whether to persist or not. The default is persistence (true). To create a persistent Exchange, you can write:

@ Bean

Public DirectExchange rabbitmqDemoDirectExchange () {

/ / Direct switch

Return new DirectExchange (RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false)

}

Then there is the Queue queue. Let's take a look at what the Queue constructor looks like:

It is also set whether to persist through the durable parameter. The default is true. So you can create without specifying:

@ Bean

Public Queue fanoutExchangeQueueA () {

/ / you only need to specify a name. It is persistent by default.

Return new Queue (RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A)

}

This completes the setting of message persistence, and then starts the project, sends a few messages, and we can see:

How to prove that it has been persisted? in fact, you can find the corresponding file: find the directory on the corresponding disk: message persistence can prevent messages from being lost in RabbitMQ Server due to downtime and restart.

III. Message confirmation mechanism 3.1 confirm mechanism

When the producer sends to the RabbitMQ Server, the delivery may fail due to network problems, resulting in data loss. We can use confirm mode to prevent data loss. What is the workflow? take a look at the following diagram: you can see from the above figure that the notification is made through two callback functions * * confirm () and returnedMessage () * *.

A message sent from the producer to RabbitMQ is first sent to Exchange, corresponding to the callback function confirm (). The second step is to assign the route from Exchange to Queue, and the corresponding callback function is returnedMessage ().

How to implement the code, please see the demo:

First, add the following configuration to the application.yml configuration file:

Spring:

Rabbitmq:

Publisher-confirms: true

# publisher-returns: true

Template:

Mandatory: true

# publisher-confirms: when set to true. When the message is delivered to Exchange, the confirm () method will be called back to notify the producer.

# publisher-returns: when set to true. When the message matches to the Queue and fails, the message is returned by calling back the returnedMessage () method

# spring.rabbitmq.template.mandatory: when set to true. Specifies that the message is returned through the callback returnedMessage () method when it is not received by the queue.

There is one small detail. If both publisher-returns and mandatory are set, the priority is mandatory. You can look at the source code: then we need to define the callback method:

@ Component

Public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

Private Logger logger = LoggerFactory.getLogger (RabbitmqConfirmCallback.class)

/ * *

* listen to whether the message reaches the Exchange

*

* @ param correlationData the object that contains the unique identity of the message

* @ param ack true ID ack,false ID nack

* @ reasons for the failure of param cause nack delivery

, /

@ Override

Public void confirm (CorrelationData correlationData, boolean ack, String cause) {

If (ack) {

Logger.info ("message delivered successfully ~ message Id: {}", correlationData.getId ())

} else {

Logger.error ("message delivery failed, Id: {}, error prompt: {}", correlationData.getId (), cause)

}

}

@ Override

Public void returnedMessage (Message message, int replyCode, String replyText, String exchange, String routingKey) {

Logger.info ("message is not routed to the queue, get the returned message")

Map map = byteToObject (message.getBody (), Map.class)

Logger.info ("message body: {}", map = = null? ": map.toString ())

Logger.info ("replyCode: {}", replyCode)

Logger.info ("replyText: {}", replyText)

Logger.info ("exchange: {}", exchange)

Logger.info ("routingKey: {}", exchange)

Logger.info ("- > end"

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report