How to set up RabbitMQ delay queues

2025-03-28 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/02 Report--

This article shows how to set up a RabbitMQ delay queue using message TTL together with a dead letter exchange.

A delay queue defers the consumption of a message. For example, some time after a user places an order, the system needs to check the order's payment status and close the order promptly if it is still unpaid. Likewise, some time after a user registers (a week, say), the system can check the user's activity and, if it is low, send an email or text message as a reminder.

Combining RabbitMQ message TTL with a Dead Letter Exchange

1. TTL (Time To Live) of the message

The TTL of a message is its lifetime. RabbitMQ can set a TTL on a queue and on individual messages. A queue-level TTL, set with the x-message-ttl queue argument, applies to every message in that queue; a message-level TTL is set separately on each message through its expiration property. Both values are in milliseconds. Once the TTL has elapsed, the message is considered dead and is called a dead letter. If both the queue and the message carry a TTL, the smaller value applies. A message routed to several queues may therefore die at different times, depending on each queue's settings. Here we focus on the TTL of a single message, because it is the key to implementing delayed tasks.
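The "smaller value applies" rule can be sketched in plain Java. This is only a model of the rule, not RabbitMQ client code: the class TtlRule and its methods are hypothetical names, and the sketch assumes both TTLs are given in milliseconds, with an empty value meaning "no TTL set".

```java
import java.util.OptionalLong;

/** Hypothetical helper that models how a queue TTL and a message TTL combine. */
final class TtlRule {
    private TtlRule() {}

    /** Returns the TTL that applies: the smaller of the two when both are set. */
    static OptionalLong effectiveTtlMillis(OptionalLong queueTtl, OptionalLong messageTtl) {
        if (queueTtl.isPresent() && messageTtl.isPresent()) {
            return OptionalLong.of(Math.min(queueTtl.getAsLong(), messageTtl.getAsLong()));
        }
        // Only one (or neither) is set: that one applies as-is.
        return queueTtl.isPresent() ? queueTtl : messageTtl;
    }

    /** The per-message expiration property is a string holding milliseconds. */
    static String toExpirationProperty(long ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("TTL must not be negative");
        }
        return Long.toString(ttlMillis);
    }

    public static void main(String[] args) {
        OptionalLong ttl = effectiveTtlMillis(OptionalLong.of(30_000), OptionalLong.of(10_000));
        System.out.println("effective TTL (ms): " + ttl.getAsLong());
        System.out.println("expiration property: " + toExpirationProperty(ttl.getAsLong()));
    }
}
```

With a 30-second queue TTL and a 10-second message TTL, the model picks the 10-second value, which is the one that would decide when the message becomes a dead letter.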

2. Dead Letter Exchanges

The concept of an exchange will not be repeated here. A message is sent to a dead letter exchange if it meets any of the following conditions. Remember that this is an exchange rather than a queue, and that one exchange can be bound to many queues.

1. The message is rejected by a consumer with requeue set to false in the reject call. In other words, it is not put back in the queue for other consumers to pick up.

2. The TTL described above has elapsed and the message has expired.

3. The queue has reached its length limit. Messages at the head of the queue are dropped or sent to the dead letter exchange.

A dead letter exchange is an ordinary exchange and is created like any other. The only difference is that when a message expires in a queue that has a dead letter exchange configured, RabbitMQ automatically forwards the message to that exchange.
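The three conditions can be written down as a small decision function. This is a hypothetical model in plain Java, not broker code: the class DeadLetterRule is an invented name, the order in which the conditions are checked is an assumption made for the sketch, and the enum constants simply echo the reason names "rejected", "expired" and "maxlen" that RabbitMQ records on dead-lettered messages.

```java
import java.util.Optional;

/** Hypothetical model of the three dead-lettering conditions. */
final class DeadLetterRule {
    enum Reason { REJECTED, EXPIRED, MAXLEN }

    private DeadLetterRule() {}

    /**
     * Returns why a message would be dead-lettered, or empty if it stays in the queue.
     * The checking order is an assumption of this sketch.
     */
    static Optional<Reason> reasonFor(boolean rejectedWithoutRequeue,
                                      boolean ttlElapsed,
                                      boolean queueOverLimit) {
        if (rejectedWithoutRequeue) {
            return Optional.of(Reason.REJECTED);
        }
        if (ttlElapsed) {
            return Optional.of(Reason.EXPIRED);
        }
        if (queueOverLimit) {
            return Optional.of(Reason.MAXLEN);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A delay queue relies on the second condition: the TTL has elapsed.
        System.out.println(reasonFor(false, true, false));
    }
}
```

For a delay queue only the expiry case matters: nothing consumes from the holding queue, so messages leave it solely because their TTL runs out.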

3. Implement the delay queue

First, define the string constants used by the configuration.

public interface TestMq {
    /** Queue the consumer listens on */
    String TEST_QUEUE = "test";
    /** Routing key that binds the consumer queue */
    String ROUTING_KEY_TEST = "post.test";
    /** Dead letter queue */
    String DEAD_QUEUE = "dead";
    String ROURING_KEY_DEAD = "dead.routing.key";
    String MQ_EXCHANGE_DEAD = "dead.exchange";
}

Configuration information

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** RabbitMQ configuration */
@Configuration
public class RabbitmqConfig {

    /** Dead letter queue: holds messages until their TTL expires. */
    @Bean
    public Queue deadQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // Exchange that expired messages are forwarded to
        arguments.put("x-dead-letter-exchange", TestMq.MQ_EXCHANGE_DEAD);
        // Routing key of the consumer queue, not the dead letter queue's own key
        arguments.put("x-dead-letter-routing-key", TestMq.ROUTING_KEY_TEST);
        return new Queue(TestMq.DEAD_QUEUE, true, false, false, arguments);
    }

    /** Dead letter exchange */
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(TestMq.MQ_EXCHANGE_DEAD);
    }

    /** Bind the dead letter queue to the dead letter exchange. */
    @Bean
    public Binding bindingDeadExchange() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(TestMq.ROURING_KEY_DEAD);
    }

    /** Queue the consumer listens on */
    @Bean
    public Queue testQueue() {
        return new Queue(TestMq.TEST_QUEUE, true, false, false);
    }

    /** Bind the consumer queue to the same exchange, under a different routing key. */
    @Bean
    public Binding bindingTest() {
        return BindingBuilder.bind(testQueue()).to(deadExchange()).with(TestMq.ROUTING_KEY_TEST);
    }
}
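Because both queues hang off the same direct exchange, it can help to trace the path a message takes. The following is a plain-Java model of direct-exchange routing, not Spring AMQP or broker code; the class DirectExchangeModel and its methods are hypothetical, and only the queue names and routing keys are taken from the configuration above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of a direct exchange: routing key -> bound queues. */
final class DirectExchangeModel {
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
    }

    /** A direct exchange delivers to queues whose binding key equals the routing key. */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, new ArrayList<>());
    }

    /** Traces the two hops a delayed message makes with the bindings from the article. */
    static List<String> tracePath() {
        DirectExchangeModel deadExchange = new DirectExchangeModel();
        deadExchange.bind("dead", "dead.routing.key");
        deadExchange.bind("test", "post.test");

        List<String> path = new ArrayList<>();
        // Hop 1: the producer publishes with the dead letter queue's routing key.
        path.addAll(deadExchange.route("dead.routing.key"));
        // Hop 2: on expiry the message is re-published with x-dead-letter-routing-key.
        path.addAll(deadExchange.route("post.test"));
        return path;
    }

    public static void main(String[] args) {
        System.out.println(tracePath());
    }
}
```

The trace shows why x-dead-letter-routing-key must be the consumer queue's key: if it were the dead letter queue's own key, the expired message would be routed straight back into the holding queue.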

Message producer

import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class TestSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange, String routingKey, Object content) {
        log.info("send content=" + content);
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        MessagePostProcessor processor = message -> {
            // Expire the message after 10 seconds (milliseconds, passed as a string)
            message.getMessageProperties().setExpiration("10000");
            return message;
        };
        this.rabbitTemplate.convertAndSend(exchange, routingKey, serialize(content), processor);
    }

    /** Callback after the broker confirms or refuses the message. */
    @Override
    public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {
        if (!ack) {
            log.info("send ack fail, cause = " + cause);
        } else {
            log.info("send ack success");
        }
    }

    /** Callback when the message could not be routed and is returned. */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("send fail return-message = " + new String(message.getBody())
                + ", replyCode: " + replyCode + ", replyText: " + replyText
                + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }

    /** Binary serialization of the message object. */
    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Output output = new Output(stream);
        kryo.writeObject(output, o);
        output.close();
        return stream.toByteArray();
    }
}
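The producer gives every message the same 10-second expiration. If the delay varies per message, one caveat is worth keeping in mind: RabbitMQ only dead-letters an expired message once it reaches the head of the queue, so a short-TTL message queued behind a long-TTL one has to wait. The sketch below is a simplified plain-Java model of that behaviour, assuming all messages are published at time 0 and nothing consumes from the holding queue; HeadOfQueueExpiry is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of per-message TTL expiry in a FIFO holding queue. */
final class HeadOfQueueExpiry {
    private HeadOfQueueExpiry() {}

    /**
     * Given TTLs (ms) in queue order for messages published at time 0, returns the
     * time (ms) at which each message leaves the queue as a dead letter.
     */
    static List<Long> releaseTimesMillis(List<Long> ttlsInQueueOrder) {
        List<Long> releaseTimes = new ArrayList<>();
        long headFreeAt = 0L; // when the previous message left the head of the queue
        for (long ttl : ttlsInQueueOrder) {
            // A message cannot leave before its own TTL or before its predecessor.
            long releasedAt = Math.max(ttl, headFreeAt);
            releaseTimes.add(releasedAt);
            headFreeAt = releasedAt;
        }
        return releaseTimes;
    }

    public static void main(String[] args) {
        List<Long> ttls = new ArrayList<>();
        ttls.add(60_000L); // first in the queue, 60-second delay
        ttls.add(10_000L); // second in the queue, 10-second delay
        System.out.println(releaseTimesMillis(ttls));
    }
}
```

In this model the 10-second message is held until the 60-second message ahead of it expires. Using one holding queue per delay value, so that every message in a queue shares the same TTL, avoids the problem.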

Consumer

import java.io.ByteArrayInputStream;
import java.io.IOException;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(queues = TestMq.TEST_QUEUE)
public class TestConsumer {

    // Assumes the listener container runs in manual acknowledge mode.
    @RabbitHandler
    public void receive(byte[] data, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            Integer orderNo = unSerialize(data);
            log.info(orderNo + " is the received message");
            // Tell the broker the message has been consumed and can be deleted from
            // the queue; otherwise the broker will deliver it again.
            channel.basicAck(deliveryTag, false);
        } catch (RuntimeException e) {
            log.error("receiver fail", e);
            // Discard this message (no requeue)
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /** Deserialization */
    private Integer unSerialize(byte[] data) {
        Kryo kryo = new Kryo();
        // try-with-resources closes the input even if reading fails
        try (Input input = new Input(new ByteArrayInputStream(data))) {
            return kryo.readObject(input, Integer.class);
        }
    }
}

Let's write a quick test.

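import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TestService {

    @Autowired
    private TestSender sender;

    @PostConstruct
    public void test() {
        // Arguments: dead letter exchange, dead letter queue routing key, message
        sender.send(TestMq.MQ_EXCHANGE_DEAD, TestMq.ROURING_KEY_DEAD, 1);
    }
}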

Test output:

2019-10-11 17:26:18.079 INFO 879 --- [main] c.g.rabbitdelay.config.TestSender: send content=1
2019-10-11 17:26:18.098 INFO 879 --- [main] o.s.a.r.c.CachingConnectionFactory: Attempting to connect to: [xxx.xxx.xxx.xxx:5672]
2019-10-11 17:26:18.227 INFO 879 --- [main] o.s.a.r.c.CachingConnectionFactory: Created new connection: rabbitConnectionFactory#2301b75:0/SimpleConnection@243f003c [delegate=amqp://admin@xxx.xxx.xxx.xxx:5672/, localPort=52345]
2019-10-11 17:26:18.337 INFO 879 --- [39.9.225.2] c.g.rabbitdelay.config.TestSender: send ack success
2019-10-11 17:26:18.446 INFO 879 --- [main] o.s.s.concurrent.ThreadPoolTaskExecutor: Initializing ExecutorService 'applicationTaskExecutor'
2019-10-11 17:26:18.751 INFO 879 --- [main] o.s.b.a.e.web.EndpointLinksResolver: Exposing 2 endpoint(s) beneath base path '/actuator'
2019-10-11 17:26:18.959 INFO 879 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer: Tomcat started on port(s): 8080 (http) with context path ''
2019-10-11 17:26:18.962 INFO 879 --- [main] c.g.rabbitdelay.RabbitdelayApplication: Started RabbitdelayApplication in 17.093 seconds (JVM running for 27.45)
2019-10-11 17:26:28.342 INFO 879 --- [ntContainer#0-1] c.g.rabbitdelay.consumer.TestConsumer: 1 is the received message

As the log shows, the message was sent at second 18 and received at second 28 of the same minute, an interval of 10 seconds that matches the configured expiration.
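To check the interval precisely rather than by eye, the two timestamps can be subtracted. A small sketch, assuming the send and receive log lines carry the timestamps 17:26:18.079 and 17:26:28.342 in the "yyyy-MM-dd HH:mm:ss.SSS" layout; LogInterval is a hypothetical helper name and is not part of the project.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that measures the delay between two log timestamps. */
final class LogInterval {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    private LogInterval() {}

    /** Returns the number of milliseconds from the first timestamp to the second. */
    static long millisBetween(String sent, String received) {
        LocalDateTime from = LocalDateTime.parse(sent, FORMAT);
        LocalDateTime to = LocalDateTime.parse(received, FORMAT);
        return Duration.between(from, to).toMillis();
    }

    public static void main(String[] args) {
        long delay = millisBetween("2019-10-11 17:26:18.079", "2019-10-11 17:26:28.342");
        System.out.println("observed delay (ms): " + delay);
    }
}
```

Under these assumptions the observed delay is a little over 10 seconds, since the measurement also covers connecting to the broker, publishing and delivery on top of the 10-second TTL.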
