How to Implement a Message Reliability Mechanism with RabbitMQ in Spring Boot

2025-01-18 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/03 Report--

This article explains how to make message delivery reliable when using RabbitMQ from Spring Boot. It covers publisher confirms and returns on the producer side, and manual acknowledgement, local retry, and failure policies on the consumer side.

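1. The producer module achieves message reliability through the publisher confirm mechanism

1.1 Add the RabbitMQ-related dependencies to the producer module

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>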

1.2 Configure RabbitMQ in the configuration file

    spring.rabbitmq.host=10.128.240.183
    spring.rabbitmq.port=5672
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.publisher-confirm-type=correlated
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true

publisher-confirm-type: enables publisher confirms. The available values are as follows

simple: wait synchronously for the confirm result until the timeout expires

correlated: asynchronous callback. You define a ConfirmCallback, which is invoked when the broker returns the result.

publisher-returns: enables the publisher-return feature, which lets you define a ReturnCallback

template.mandatory: defines the policy applied when message routing fails

true: invoke the ReturnCallback

false: discard the message directly
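The interaction between these three settings is easier to see as a small truth table. The following is a minimal sketch, not Spring AMQP code: the class `PublishOutcomeSketch` and the method `callbacksFor` are hypothetical names, and the sketch assumes the usual broker behaviour in which an unroutable mandatory message is returned before its confirm arrives, and publishing to a missing exchange ends in a negative confirm.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not a Spring AMQP API): which producer callbacks
// would be observed for a single publish, given the broker state.
class PublishOutcomeSketch {

    /**
     * @param exchangeExists whether the target exchange exists on the broker
     * @param routable       whether a queue is bound with a matching routing key
     * @param mandatory      the value of spring.rabbitmq.template.mandatory
     * @return the callbacks observed by the producer, in order
     */
    static List<String> callbacksFor(boolean exchangeExists, boolean routable, boolean mandatory) {
        List<String> fired = new ArrayList<>();
        if (!exchangeExists) {
            // Assumption: the channel is closed and the pending confirm is reported as a nack.
            fired.add("confirm(ack=false)");
            return fired;
        }
        if (!routable && mandatory) {
            // Unroutable and mandatory: the message is returned to the producer.
            fired.add("return");
        }
        // The exchange accepted the message, so the confirm is positive
        // even when no queue received it.
        fired.add("confirm(ack=true)");
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("routed:               " + callbacksFor(true, true, true));
        System.out.println("unroutable, mandatory: " + callbacksFor(true, false, true));
        System.out.println("unroutable, optional:  " + callbacksFor(true, false, false));
        System.out.println("missing exchange:      " + callbacksFor(false, false, true));
    }
}
```

The third case is the one to watch: with mandatory=false an unroutable message is confirmed positively and then silently dropped, which is why both the confirm and the return settings are enabled together above.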

1.3 Define the ReturnCallback (triggered when a message fails to be delivered to a queue)

Only one ReturnCallback can be configured per RabbitTemplate.

When a message cannot be routed to a queue, the processing logic defined in the producer's ReturnCallback is invoked.

You can configure this callback when the container starts

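    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.context.annotation.Configuration;

    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContextAware {

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            // Get the RabbitTemplate object
            RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
            // Configure the ReturnCallback
            // (setReturnCallback is deprecated in newer Spring AMQP versions in favor of setReturnsCallback)
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                // Check whether this is a delayed message
                Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
                if (receivedDelay != null && receivedDelay > 0) {
                    // It is a delayed message, so ignore this error
                    return;
                }
                // Log the failure
                log.error("Message failed to reach the queue, reply code: {}, reason: {}, exchange: {}, routing key: {}, message: {}",
                        replyCode, replyText, exchange, routingKey, message.toString());
                // Resend the message if necessary
            });
        }
    }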

1.4 Define the ConfirmCallback (triggered when the broker confirms whether the message reached the exchange)

You can specify a unified confirm callback for the RabbitTemplate

    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContextAware {

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            // Get the RabbitTemplate object
            RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
            // Configure the ReturnCallback
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                // Check whether this is a delayed message
                Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
                if (receivedDelay != null && receivedDelay > 0) {
                    // It is a delayed message, so ignore this error
                    return;
                }
                // Log the failure
                log.error("Message failed to reach the queue, reply code: {}, reason: {}, exchange: {}, routing key: {}, message: {}",
                        replyCode, replyText, exchange, routingKey, message.toString());
                // Resend the message if necessary
            });
            // Set a unified confirm callback. ack is true as soon as the message arrives at the broker.
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("this is a unified callback");
                    System.out.println("correlationData: " + correlationData);
                    System.out.println("ack: " + ack);
                    System.out.println("cause: " + cause);
                }
            });
        }
    }
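A unified callback only receives the correlation data, the ack flag, and the cause, so resending a nacked message requires the producer to remember which payload belongs to which correlation ID. The following is a minimal sketch of that bookkeeping in plain Java. `PendingConfirmTracker` and its methods are hypothetical names, not part of Spring AMQP, and payloads are simplified to strings.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers payloads that still await a broker confirm.
class PendingConfirmTracker {

    // correlation ID -> payload awaiting a confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Remember the payload before publishing it with this correlation ID. */
    void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    /**
     * Intended to be called from the confirm callback.
     *
     * @return the payload to resend if the broker nacked it, otherwise empty
     */
    Optional<String> onConfirm(String correlationId, boolean ack) {
        String payload = pending.remove(correlationId);
        if (ack || payload == null) {
            // Confirmed, or an unknown ID: nothing to resend.
            return Optional.empty();
        }
        return Optional.of(payload);
    }

    /** Number of messages still waiting for a confirm. */
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirmTracker tracker = new PendingConfirmTracker();
        tracker.track("id-1", "hello,world");
        tracker.track("id-2", "second message");
        System.out.println("resend after ack:  " + tracker.onConfirm("id-1", true));
        System.out.println("resend after nack: " + tracker.onConfirm("id-2", false));
    }
}
```

In a real producer the resend would need a limit on attempts, otherwise a persistently failing broker produces an endless resend loop.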

You can also define a callback for an individual message

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testmq() throws InterruptedException {
        // Attach a unique ID so the confirm can be matched to this message
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(result -> {
            if (result.isAck()) {
                // ACK
                log.debug("Message delivered to the exchange successfully! Message ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("Message delivery to the exchange failed! Message ID: {}", correlationData.getId());
                // Resend the message
            }
        }, ex -> {
            // Log the error
            log.error("Message send failed!", ex);
            // Resend the message
        });
        rabbitTemplate.convertAndSend("example.direct", "blue", "hello,world", correlationData);
    }

2. The consumer module enables message acknowledgement

2.1 Add the configuration

    # Acknowledge messages manually instead of using the default consumer acknowledgement
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

none: acknowledgement is turned off. Delivery is unreliable and messages may be lost.

auto: similar to a transaction mechanism. If the listener throws an exception, nack is returned and the message is rolled back to the broker. If no exception is thrown, ack is returned.

manual: we decide ourselves when to return ack
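In manual mode the listener has to choose between acknowledging, requeueing, and discarding. Requeueing on every exception can make a permanently failing message loop forever. The following is a minimal sketch of one possible policy, written in plain Java: requeue a message the first time it fails, and discard it if it fails again after being redelivered. The names `AckDecisionSketch`, `Action`, and `decide` are hypothetical, and the policy itself is an assumption for illustration rather than something the original article prescribes.

```java
// Hypothetical decision table for a manually acknowledging consumer.
class AckDecisionSketch {

    enum Action {
        ACK,            // processing succeeded
        REJECT_REQUEUE, // first failure: put the message back on the queue
        REJECT_DISCARD  // failed again after redelivery: give up
    }

    /**
     * @param processedOk whether the business logic completed without an exception
     * @param redelivered the redelivered flag the broker sets on a redelivery
     */
    static Action decide(boolean processedOk, boolean redelivered) {
        if (processedOk) {
            return Action.ACK;
        }
        return redelivered ? Action.REJECT_DISCARD : Action.REJECT_REQUEUE;
    }

    public static void main(String[] args) {
        System.out.println("success:             " + decide(true, false));
        System.out.println("first failure:       " + decide(false, false));
        System.out.println("failure after retry: " + decide(false, true));
    }
}
```

REJECT_REQUEUE and REJECT_DISCARD would correspond to calling basicReject with the requeue flag set to true and false respectively.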

2.2 In manual mode, return ack explicitly in the listener

    @RabbitListener(queues = "order.release.order.queue")
    @Service
    public class OrderCloseListener {

        @Autowired
        private OrderService orderService;

        @RabbitHandler
        public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
            System.out.println("Received expired order information, ready to close order " + orderEntity.getOrderSn());
            try {
                orderService.closeOrder(orderEntity);
                // If the second parameter is false, only this message is acknowledged.
                // true acknowledges all messages up to and including this delivery tag.
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                // If the second parameter is true, the message is requeued
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            }
        }
    }

3. The consumer module enables the retry mechanism for failed messages

3.1 Add the configuration to the configuration file to enable local retry

    spring:
      rabbitmq:
        listener:
          simple:
            retry:
              enabled: true          # enable consumer retry on failure
              initial-interval: 1000 # initial wait time after a failure is 1 second
              multiplier: 1          # next wait time = multiplier * last interval
              max-attempts: 3        # maximum number of attempts
              stateless: true        # true: stateless; false: stateful. If the business logic uses transactions, change this to false

With local retry enabled, a message whose processing keeps throwing exceptions is not requeued to the queue. It is retried locally inside the consumer instead.

When the maximum number of retries is reached, Spring by default rejects the message without requeueing it, and the message is discarded.
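The wait times that result from initial-interval, multiplier, and max-attempts can be worked out by hand. The following is a minimal sketch of that arithmetic in plain Java, assuming the usual exponential back-off rule in which each wait is the previous wait times the multiplier. `RetryScheduleSketch` and `waits` are hypothetical names, and the `maxInterval` cap is an assumption that mirrors the separate max-interval property, which the configuration above does not set.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes the pauses between local retry attempts.
class RetryScheduleSketch {

    /**
     * @param initialInterval wait before the second attempt, in milliseconds
     * @param multiplier      factor applied to the previous wait
     * @param maxAttempts     total number of attempts, including the first
     * @param maxInterval     upper bound for a single wait, in milliseconds (assumed cap)
     * @return the waits between attempts; there is one fewer wait than attempts
     */
    static List<Long> waits(long initialInterval, double multiplier, int maxAttempts, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        double next = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add((long) Math.min(next, maxInterval));
            next *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // The values from the configuration above: two waits of one second each.
        System.out.println(waits(1000, 1, 3, 10_000));
        // A growing back-off for comparison.
        System.out.println(waits(1000, 2, 4, 10_000));
    }
}
```

With the configured values the consumer makes three attempts separated by two one-second pauses, so a message that always fails occupies the listener thread for roughly two seconds plus processing time.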

4. The consumer module adds a failure policy (used once local retry is enabled)

When local retry is enabled, the message is by default discarded after the maximum number of retries.

What happens at that point is decided by a recovery strategy. There are three strategies, all of which implement the MessageRecoverer interface

RejectAndDontRequeueRecoverer: when the retries are exhausted, reject the message and discard it. This is the default.

ImmediateRequeueMessageRecoverer: when the retries are exhausted, return nack and requeue the message.

RepublishMessageRecoverer: when the retries are exhausted, deliver the failed message to a specified exchange
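The relationship between local retry and a recovery strategy can be sketched in plain Java. This is a simplified model, not the Spring Retry implementation: `LocalRetrySketch`, `Recoverer`, and `deliver` are hypothetical names, waits between attempts are omitted, and the "error queue" is just an in-memory list standing in for the republish strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of "retry locally, then hand the message to a recoverer".
class LocalRetrySketch {

    /** Stand-in for a recovery strategy; called once after the last failed attempt. */
    interface Recoverer {
        void recover(String message, Exception cause);
    }

    /**
     * Invokes the handler up to maxAttempts times.
     *
     * @return the number of attempts that were made
     */
    static int deliver(String message, Consumer<String> handler, int maxAttempts, Recoverer recoverer) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return attempt; // success: no recovery needed
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again locally
            }
        }
        // Retries are exhausted: the strategy decides what happens to the message.
        recoverer.recover(message, last);
        return maxAttempts;
    }

    public static void main(String[] args) {
        List<String> errorQueue = new ArrayList<>();
        // Republish-style strategy: move the failed message to an error queue.
        Recoverer republish = (msg, cause) -> errorQueue.add(msg + " (" + cause.getMessage() + ")");

        int attempts = deliver("order-1", msg -> {
            throw new IllegalStateException("cannot close order");
        }, 3, republish);

        System.out.println("attempts: " + attempts);
        System.out.println("error queue: " + errorQueue);
    }
}
```

The point of the republish-style strategy is that the failed message is kept somewhere it can be inspected or replayed, instead of being dropped or looping on the original queue.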

4.2 Define the exchange and queue that handle failed messages

If the corresponding queue, exchange, and binding do not exist, they are created automatically. If they already exist, nothing is done.

    @Bean
    public DirectExchange errorMessageExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue", true);
    }

    // The routing key is "error"
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

4.3 Add the failure policy component to the container

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        // "error" is the routing key
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }

That covers how to implement a message reliability mechanism with RabbitMQ in Spring Boot. Thank you for reading!
