Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Example Analysis of RabbitMQ release confirmation High-level problem

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/02 Report--

RabbitMQ released to confirm the example analysis of high-level problems, many novices are not very clear about this, in order to help you solve this problem, the following editor will explain for you in detail, people with this need can come to learn, I hope you can gain something.

1. Release confirmation Advanced

1. Existing problems

In the reproduction environment, rabbitmq restart is caused by some unknown reasons, and the failure of producer message delivery during RabbitMQ restart will result in message loss.

1.1.release confirmation SpringBoot version 1.1.1, confirmation mechanism scheme

When the message cannot be received normally, we need to store the message in the cache.

1.1.2, code architecture diagram

1.1.3, configuration file spring.rabbitmq.host=192.168.123.129spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123spring.rabbitmq.publisher-confirm-type=correlated

NONE: disables the release confirmation mode, which is the default.

CORRELATED: successfully publishes the message to the switch to trigger the callback method.

CORRELATED: just release one and confirm one.

1.1.4. Configuration class import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String CONFIRM_ROUTING_KEY = "key1" @ Bean ("confirmExchange") public DirectExchange confirmExchange () {return new DirectExchange (CONFIRM_EXCHANGE_NAME);} @ Bean ("confirmQueue") public Queue confirmQueue () {return QueueBuilder.durable (CONFIRM_QUEUE_NAME) .build () } @ Bean public Binding queueBindingExchange (@ Qualifier ("confirmExchange") DirectExchange confirmExchange, @ Qualifier ("confirmQueue") Queue confirmQueue) {return BindingBuilder.bind (confirmQueue) .to (confirmExchange) .with (CONFIRM_ROUTING_KEY);} 1.1.5, callback interface import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate Import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/** * callback API * / @ Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback {@ Autowired RabbitTemplate rabbitTemplate; @ PostConstruct public void init () {rabbitTemplate.setConfirmCallback (this);} / * callback * 1 after the switch accepts the failure. Save the ID of the message and related messages * 2. Whether it has been successfully received * 3. Accept the reason for failure * @ param correlationData * @ param b * @ param s * / @ Override public void confirm (CorrelationData correlationData, boolean b, String s) {String id = correlationData! = null? CorrelationData.getId (): "; if (b = = true) {log.info (" the switch has received a message with id: {}, id);} else {log.info ("the switch has not received a message with id: {} due to reasons: {}", id,s);}} 1.1.6, producer import com.xiao.springbootrabbitmq.utils.MyCallBack Import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct @ RestController@RequestMapping ("/ confirm") @ Slf4jpublic class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; @ Autowired RabbitTemplate rabbitTemplate; @ GetMapping ("/ sendMessage/ {message}") public void sendMessage (@ PathVariable String message) {CorrelationData correlationData1 = new CorrelationData ("1"); String routingKey1 = "key1"; rabbitTemplate.convertAndSend (CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1); CorrelationData correlationData2 = new CorrelationData ("2") String routingKey2 = "key2"; rabbitTemplate.convertAndSend (CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2); log.info ("content sent is: {}", message);} 1.1.7, consumer import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component @ Component@Slf4jpublic class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; @ RabbitListener (queues = CONFIRM_QUEUE_NAME) public void receiveMessage (Message message) {String msg = new String (message.getBody ()); log.info ("received queue" + CONFIRM_QUEUE_NAME + "message: {}", msg);} 1.1.8, test results

1. The first case

The message with ID 1 is delivered normally, and the message with ID 2 cannot be consumed normally due to the error of RoutingKey, but the switch still receives the message normally, so the lost message cannot be received normally due to the reason that the switch receives it normally.

two。 The second case

In the previous case, we modified the name of the switch with a message with ID 1, so the callback function answers what caused the switch to fail to receive the success message.

1.2. fallback message 1.2.1, Mandatory parameters

When only the producer confirmation mechanism is enabled, after the switch receives the message, it will send an acknowledgement message directly to the message producer. If it is found that the message is not routable (that is, after the message is successfully received by the switch, it cannot reach the queue), then the message will be discarded directly, and the producer does not know that the message has been discarded.

By setting this parameter, the message can be returned to the producer when the destination is not reachable during message delivery.

1.2.2, configuration file spring.rabbitmq.publisher-returns=true

Return callback needs to be enabled in the configuration file.

1.2.3, producer code import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct @ RestController@RequestMapping ("/ confirm") @ Slf4jpublic class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; @ Autowired RabbitTemplate rabbitTemplate; @ GetMapping ("/ sendMessage/ {message}") public void sendMessage (@ PathVariable String message) {CorrelationData correlationData1 = new CorrelationData ("1"); String routingKey1 = "key1"; rabbitTemplate.convertAndSend (CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1) Log.info ("content sent is: {}", message + routingKey1); CorrelationData correlationData2 = new CorrelationData ("2"); String routingKey2 = "key2"; rabbitTemplate.convertAndSend (CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2); log.info ("content sent is: {}", message + routingKey2);}} 1.2.4, callback API code import lombok.extern.slf4j.Slf4j Import org.springframework.amqp.core.Message;import org.springframework.amqp.core.ReturnedMessage;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/** * callback API * / @ Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {@ Autowired RabbitTemplate rabbitTemplate @ PostConstruct public void init () {rabbitTemplate.setConfirmCallback (this); rabbitTemplate.setReturnsCallback (this);} / * callback * 1 after the switch accepts failure. Save the ID of the message and related messages * 2. Whether it has been successfully received * 3. Accept the reason for failure * @ param correlationData * @ param b * @ param s * / @ Override public void confirm (CorrelationData correlationData, boolean b, String s) {String id = correlationData! = null? CorrelationData.getId (): "; if (b = = true) {log.info (" the switch has received a message with id: {} ", id);} else {log.info (" the switch has not received a message with id: {} due to reasons: {} ", id,s) } @ Override public void returnedMessage (ReturnedMessage returnedMessage) {Message message = returnedMessage.getMessage (); String exchange = returnedMessage.getExchange (); String routingKey = returnedMessage.getRoutingKey (); String replyText = returnedMessage.getReplyText (); log.error ("message {}, returned by switch {}, reason for fallback: {}, routing Key: {}", new String (message.getBody ()), exchange,replyText,routingKey) } 1.2.5, test results

The code for the other classes is the same as in the previous section

A message with an ID of 2 is not routable because the RoutingKey is not routed, but it is still processed by the callback function.

1.3, backup switch 1.3.1, code architecture diagram

Here we add backup switch, backup queue and alarm queue. Their binding relationships are shown in the figure. If the message successfully received by the acknowledgement switch cannot be routed to the appropriate queue, it will be sent by the acknowledgement switch to the backup switch.

1.3.2. Configuration class code import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String BACKUP_EXCHANGE_NAME = "backup_exchange" Public static final String BACKUP_QUEUE_NAME = "backup_queue"; public static final String WARNING_QUEUE_NAME = "warning_queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @ Bean ("confirmExchange") public DirectExchange confirmExchange () {return ExchangeBuilder.directExchange (CONFIRM_EXCHANGE_NAME) .durable (true) .withargument ("alternate-exchange", BACKUP_EXCHANGE_NAME). Build () } @ Bean ("confirmQueue") public Queue confirmQueue () {return QueueBuilder.durable (CONFIRM_QUEUE_NAME). Build ();} @ Bean ("backupExchange") public FanoutExchange backupExchange () {return new FanoutExchange (BACKUP_EXCHANGE_NAME);} @ Bean ("backupQueue") public Queue backupQueue () {return QueueBuilder.durable (BACKUP_QUEUE_NAME). Build () } @ Bean ("warningQueue") public Queue warningQueue () {return QueueBuilder.durable (WARNING_QUEUE_NAME). Build ();} @ Bean public Binding queueBindingExchange (@ Qualifier ("confirmExchange") DirectExchange confirmExchange, @ Qualifier ("confirmQueue") Queue confirmQueue) {return BindingBuilder.bind (confirmQueue) .to (confirmExchange) .with (CONFIRM_ROUTING_KEY) } @ Bean public Binding queueBindingExchange1 (@ Qualifier ("backupExchange") FanoutExchange backupExchange, @ Qualifier ("backupQueue") Queue backupQueue) {return BindingBuilder.bind (backupQueue) .to (backupExchange) } @ Bean public Binding queueBindingExchange2 (@ Qualifier ("backupExchange") FanoutExchange backupExchange, @ Qualifier ("warningQueue") Queue warningQueue) {return BindingBuilder.bind (warningQueue) .to (backupExchange);} 1.3.3, Consumer Code import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component @ Component@Slf4jpublic class WarningConsumer {public static final String WARNING_QUEUE_NAME = "warning_queue"; @ RabbitListener (queues = WARNING_QUEUE_NAME) public void receiveMessage (Message message) {String msg = new String (message.getBody ()); log.info ("alarm discovers that non-routable messages are: {}", msg);} 1.3.4, test results

When the mandatory parameter can be used with the backup switch, if both are turned on at the same time, the backup switch has a high priority.

Is it helpful for you to read the above content? If you want to know more about the relevant knowledge or read more related articles, please follow the industry information channel, thank you for your support.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report