Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Advanced Application method of RabbitMQ in java

2025-03-01 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly introduces the relevant knowledge of the advanced application method of RabbitMQ in java, the content is detailed and easy to understand, the operation is simple and fast, and it has a certain reference value. I believe everyone will gain something after reading this article on the advanced application method of RabbitMQ in java. Let's take a look at it.

1. Reliable delivery of messages

When   uses RabbitMQ, if the producer wants to know whether the message is successfully delivered to the corresponding switch and queue, there are two ways to control the reliability mode of message delivery.

  from the above figure of the whole message delivery process, the producer's message into the middleware will first arrive at the switch, and then transferred from the switch to the queue, that is, divided into two-step strategy. Then the loss of messages will occur in these two stages. RabbitMQ intimately provides us with a reliable new delivery mode for these two parts:

Confirm mode.

Return mode.

  uses these two callback patterns to ensure reliable message delivery.

1.1. Confirmation mode

The transfer of the   message from the producer to the switch returns a callback of confirmCallback. You can set the confirmation logic directly in the rabbitTemplate instance. If you are using XML configuration, you need to enable publisher-confirms= "true" in the factory configuration. The configuration of YAML is directly publisher-confirm-type: correlated. It defaults to NONE and needs to be enabled manually.

@ RunWith (SpringJUnit4ClassRunner.class) @ ContextConfiguration (locations = "classpath:spring-rabbitmq.xml") public class Producer {@ Autowired private RabbitTemplate rabbitTemplate; @ Test public void producer () throws InterruptedException {rabbitTemplate.setConfirmCallback (new RabbitTemplate.ConfirmCallback () {@ Override public void confirm (CorrelationData correlationData, boolean b, String s) {System.out.println () If (! B) {/ / processing such as message retransmission System.out.println (s);} else {System.out.println ("the switch successfully received the message");}) RabbitTemplate.convertAndSend ("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep (5);}}

The above confirmation of   is performed by a function of confirm, which contains three parameters, the first is the configuration information, the second indicates whether the switch successfully received the message, and the third parameter refers to the reason why the message was not received successfully.

1.2. Return mode

A returnCallback is returned if the   fails to deliver from the switch to the message queue. Turn on the fallback mode publisher-returns= "true" in the factory configuration, set the mode in which the switch fails to process the message (the default false discards the message directly), and add the logic of bounce processing.

@ RunWith (SpringJUnit4ClassRunner.class) @ ContextConfiguration (locations = "classpath:spring-rabbitmq.xml") public class Producer {@ Autowired private RabbitTemplate rabbitTemplate; @ Test public void producer () throws InterruptedException {rabbitTemplate.setMandatory (true) RabbitTemplate.setReturnCallback (new RabbitTemplate.ReturnCallback () {@ Override public void returnedMessage (Message message, int replyCode, String replyText, String exchange, String routingKey) {/ / resend logic processing System.out.println (message.getBody () + "delivery message queue failed");}}) RabbitTemplate.convertAndSend ("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep (5);}}

  returnedMessage carries five parameters, which refer to message object, error code, error message, switch, and routing key.

1.3. Confirmation mechanism

  will have an acknowledgement mechanism to confirm the message after consumers grab the data consumption in the message queue to prevent message loss caused by the capture of the message but not the success of the consumption. There are three ways to confirm:

Automatic confirmation: acknowledge= "none"

Manual confirmation: acknowledge= "manual"

Confirm according to the exception: acknowledge= "auto"

In  , automatic confirmation means that once the message is grabbed by the consumer, it automatically succeeds by default and removes the message from the message queue. If there is a problem with the consumer at this time, the default message consumption will also be successful, but in fact, the consumption is not successful, that is, the current message is lost. The default is the automatic confirmation mechanism.

If   sets the manual confirmation method, you need to call back to confirm channel.basicAck () after the normal consumption of messages, and sign for receipt manually. If an exception occurs during business processing, channel.basicNack () is called to resend the message.

First of all,   needs to configure the confirmation mechanism when the queue is bound, which is set to sign for receipt manually.

The   producer does not need to change the end, but only needs to change the consumer's implementation to sign the message automatically. If the business is executed normally, the message will be signed and received. If an error occurs in the business, the message will be rejected, retransmitted or discarded.

Public class ConsumerAck implements ChannelAwareMessageListener {@ Override public void onMessage (Message message, Channel channel) throws Exception {/ / message unique ID long tag = message.getMessageProperties () .getDeliveryTag (); try {String msg = new String (message.getBody (), "utf-8"); channel.basicAck (tag, true); System.out.println ("receive message:" + msg) } catch (Exception e) {System.out.println ("message received exception"); channel.basicNack (tag, true, true); e.printStackTrace ();}

  involves three simple signing and receiving functions, one is the correct signed basicAck, the second is the single rejected basicReject, and the third is the batch rejected basicNack.

The first parameter of basicAck indicates the unique ID of the message in the channel. Only the second parameter of the current Channel; indicates whether to agree in batches. If it is false, it will only agree to sign a message from the current ID and delete it from the message queue. If it is true, it will agree to sign and receive the messages before this ID.

The first parameter of basicReject still indicates the unique ID of the message, and the second parameter indicates whether to return to the queue to send the message. False means to discard the message directly or there is a dead letter queue to receive it. True means to return to the queue to send the message. All operations are only for the current message.

BasicNack has one more parameter than the second, that is, the Boolean value in the middle, indicating whether to proceed in batches.

2. Limit the flow at the consumer end

  increases the isolation of message middleware between user requests and DB service processing, so that all burst traffic is resisted by message queues and reduces the possibility that the server will be washed out. Let all requests be stored in the queue, and the consumer only needs to take out the message at a uniform speed for consumption, so as to ensure the running efficiency. it will not cause the client to get a normal response because of the blocking in the background (of course, it refers to some tasks that do not need synchronous echo).

  only needs to specify the rate at which the message is fetched when the consumer binds the message queue. You need to sign for it manually, and each time you sign for it, the next piece of data will be taken out of the queue.

3. Message expiration time

  message queue provides the expiration time of messages stored in the queue, which is implemented in two directions, one is for all messages in the queue, that is, the expiration time of the queue, and the other is for the expiration time of the current message, which is set separately for a single message.

Setting the expiration time of a   queue is very simple. You only need to specify the expiration time when you create the queue, or you can create the specified expiration time directly through the console. Once the queue expiration time is up, the messages in the queue that have not been consumed will expire and the queue expiration will be processed.

The expiration time of a single   message needs to be specified separately when it is sent, and the additional information of the configuration is specified when it is sent, and the configuration is written by the configuration class.

  if the expiration time of a message is up, but he is in the middle of the queue at this time, then he will not be processed, and it will only be processed later to determine whether it has expired or not.

MessagePostProcessor messagePostProcessor = new MessagePostProcessor () {@ Override public Message postProcessMessage (Message message) throws AmqpException {/ / set the expiration time of message message.getMessageProperties () .setExpiration ("5000"); / / return the message return message;}}; rabbitTemplate.convertAndSend ("exchange", "route", "msg", messagePostProcessor)

If   sets both the expiration time of the message and the expiration time of the queue, then the final expiration time is determined by the shortest time, that is, if the expiration time of the current message has not arrived, but the expiration time of the entire queue has arrived, then all messages in the queue will naturally expire and implement the expiration processing policy.

4. Dead letter queue 4.1, Dead letter concept

A dead-letter queue refers to a dead-letter switch, which can be re-sent to another switch for processing when a message becomes dead-letter, and the switch for processing is called a dead-letter switch.

There are several situations in which a message becomes a dead letter.

The message length of the queue reaches the limit

When consumers reject a message, they do not put the message back into the queue.

The message expiration setting exists in the queue, and the message timeout is not consumed

There is an expiration time for the message, which is found to have expired when it is delivered to the consumer.

When creating a queue,   can specify relevant information in the configuration, such as dead-letter switch, queue length, and so on. After that, a series of work is not performed by the programmer, and MQ will complete the configured event response on its own.

4.2. Delay queue

  delay queue means that messages will not be consumed immediately after entering the queue, but will not be consumed until they reach a specified time, that is, a judgment condition of time is required.

  message queue actually does not provide the implementation of delay queue, but it can be done by TTL + dead letter queue, set a queue, not be consumed by any consumers, all messages entering will be saved inside, set the expiration time of the queue, once the queue expires, all messages will be transferred to the bound dead letter queue.

In  , the messages in the dead-letter queue are consumed by specific consumers, thus realizing the function of delay queue.

 , for example, implements a function of issuing an order for overtime payment to cancel an order:

This is the end of the article on "Advanced Application methods of RabbitMQ in java". Thank you for reading! I believe you all have a certain understanding of the knowledge of "Advanced Application methods of RabbitMQ in java". If you want to learn more, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report