
How to Set Up RabbitMQ Message Middleware in Java

Updated: 2025-03-26. Source: SLTechnology News&Howtos (Development)


This article walks through setting up RabbitMQ as message middleware in a Java (Spring Boot) application, using a delayed order queue as the worked example.

Preface

A message queue is useful when the "production" and "consumption" sides of a system differ in speed or stability: the queue buffers messages so that neither side has to wait for the other.

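Terminology

Exchange: receives messages from producers and routes them to queues.

Routing key: the key an exchange uses to decide which queue receives a message.

Queue: holds messages until a consumer takes them.

Management console port: 15672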

An exchange and a queue must be bound together. A message is first sent to the exchange, which then uses the routing key to forward it to the matching queue.
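The binding rule described above can be sketched with a small in-memory model. This is only an illustration of how a direct exchange matches routing keys to bound queues; the class DirectExchangeSketch and its methods are hypothetical names made up for this example and are not part of the RabbitMQ or Spring AMQP APIs.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** In-memory model of a direct exchange; illustrative only, not the RabbitMQ client API. */
final class DirectExchangeSketch {
    // routing key -> bound queue (a real exchange can bind several queues to one key)
    private final Map<String, Queue<String>> bindings = new HashMap<>();

    /** Bind a queue to this exchange under the given routing key. */
    void bind(String routingKey, Queue<String> queue) {
        bindings.put(routingKey, queue);
    }

    /** Route a message by exact key match; returns false when no binding matches. */
    boolean publish(String routingKey, String message) {
        Queue<String> queue = bindings.get(routingKey);
        if (queue == null) {
            return false; // unroutable message
        }
        queue.add(message);
        return true;
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> orderQueue = new ArrayDeque<>();
        exchange.bind("order_key", orderQueue);

        System.out.println(exchange.publish("order_key", "order 1")); // routed to orderQueue
        System.out.println(exchange.publish("unknown_key", "order 2")); // no binding for this key
        System.out.println(orderQueue);
    }
}
```

A message only reaches a queue if a binding exists for its routing key, which is why the configuration later in this article declares a binding for every exchange and queue pair.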

Usage scenarios

1. A skill order is automatically cancelled after 3 minutes and its status is updated.

2. A reminder is sent 15 minutes before a live broadcast starts.

3. The live broadcast status is ended automatically.

Process flow

Producer sends message -> order_pre_exchange (exchange) -> order_pre_ttl_delay_queue (queue)

-> message TTL expires -> order_delay_exchange (dead-letter exchange) -> order_delay_process_queue (queue) -> Consumer
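The flow above can be sketched as a toy simulation: messages wait in a buffer queue with a uniform TTL and, once it elapses, are moved to the queue the consumer actually reads. This is a simplified model under stated assumptions, not how the broker is implemented; DelayFlowSketch, publish and tick are hypothetical names, and time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a TTL buffer queue that dead-letters expired messages to a process queue. */
final class DelayFlowSketch {
    /** A buffered message together with the time at which it expires. */
    private static final class Entry {
        final String body;
        final long expiresAt;

        Entry(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final long ttlMillis;                                 // plays the role of x-message-ttl
    private final Deque<Entry> bufferQueue = new ArrayDeque<>();  // order_pre_ttl_delay_queue
    private final List<String> processQueue = new ArrayList<>();  // order_delay_process_queue

    DelayFlowSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Producer side: the message enters the buffer queue at time nowMillis. */
    void publish(String body, long nowMillis) {
        bufferQueue.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    /** Move every message whose TTL has elapsed by nowMillis to the process queue. */
    void tick(long nowMillis) {
        // With one TTL for the whole queue, messages expire in arrival order,
        // so checking the head of the queue is enough.
        while (!bufferQueue.isEmpty() && bufferQueue.peekFirst().expiresAt <= nowMillis) {
            processQueue.add(bufferQueue.pollFirst().body);
        }
    }

    List<String> processQueue() {
        return processQueue;
    }

    public static void main(String[] args) {
        DelayFlowSketch flow = new DelayFlowSketch(1000);
        flow.publish("order-1", 0);
        flow.tick(500);
        System.out.println(flow.processQueue()); // still empty: TTL has not elapsed
        flow.tick(1000);
        System.out.println(flow.processQueue()); // order-1 is now available to the consumer
    }
}
```

No consumer ever reads the buffer queue directly. The delay comes entirely from the time a message spends there before it is dead-lettered.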

Step 1: add the dependency to the pom file

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

Step 2: add to the application.properties file

    spring.rabbitmq.host=172.xx.xx.xxx
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=rabbit
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    # Spring Boot 2.2 and later replace the next property with spring.rabbitmq.publisher-confirm-type
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true

Step 3: configure OrderQueueConfig

    package com.tuohang.platform.config;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * RabbitMQ queue settings (messages sent by producers always enter an exchange
     * first and are then routed to a queue).
     *
     * @author Administrator
     * @version 1.0
     * @Date September 18, 2018
     */
    @Configuration
    public class OrderQueueConfig {

        /** Name of the order buffer exchange. */
        public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";

        /**
         * Messages sent to this queue expire after a period of time and then move to
         * order_delay_process_queue. All messages in the queue share one expiration time.
         */
        public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";

        /** Name of the order dead-letter exchange (DLX). */
        final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";

        /** Queue that order messages enter after they expire, i.e. the queue that is actually consumed. */
        public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";

        /** Time an order stays in the buffer queue: 30 minutes, in milliseconds. */
        public final static int ORDER_QUEUE_EXPIRATION = 1800000;

        /**
         * Order buffer exchange.
         *
         * @return the exchange
         */
        @Bean
        public DirectExchange preOrderExange() {
            return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);
        }

        /**
         * Create the order_pre_ttl_delay_queue queue. Order messages sent to the
         * buffer exchange enter this queue.
         *
         * @return the buffer queue
         */
        @Bean
        public Queue delayQueuePerOrderTTLQueue() {
            return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)
                    .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX
                    .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // routing key carried by dead letters
                    .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // expiration time of messages in the order queue
                    .build();
        }

        /**
         * Bind order_pre_exchange to the order_pre_ttl_delay_queue queue.
         *
         * @param delayQueuePerOrderTTLQueue
         * @param preOrderExange
         * @return the binding
         */
        @Bean
        public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {
            return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);
        }

        /**
         * Create the DLX exchange.
         *
         * @return the exchange
         */
        @Bean
        public DirectExchange delayOrderExchange() {
            return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);
        }

        /**
         * Create the order_delay_process_queue queue, i.e. the actual order consumption queue.
         *
         * @return the process queue
         */
        @Bean
        public Queue delayProcessOrderQueue() {
            return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();
        }

        /**
         * Bind the DLX to the actual consumption queue.
         *
         * @param delayProcessOrderQueue
         * @param delayOrderExchange
         * @return the binding
         */
        @Bean
        public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {
            return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);
        }

        /**
         * Listen on the order_delay_process_queue queue.
         *
         * @param connectionFactory
         * @param processReceiver
         * @return the listener container
         */
        @Bean
        public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,
                OrderProcessReceiver processReceiver) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // listen on order_delay_process_queue
            container.setMessageListener(new MessageListenerAdapter(processReceiver));
            return container;
        }
    }
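The x-message-ttl argument is expressed in milliseconds, so a bare number such as 1800000 is easy to misread. As a minimal sketch, the value could be derived from a java.time.Duration instead; the TtlValues class and its ttlMillis method are hypothetical helpers introduced here for illustration and are not part of the original configuration.

```java
import java.time.Duration;

/** Hypothetical helper that turns a readable delay into the millisecond value used for x-message-ttl. */
final class TtlValues {
    private TtlValues() {
    }

    /** Convert a delay to whole milliseconds; throws ArithmeticException if it does not fit in an int. */
    static int ttlMillis(Duration delay) {
        return Math.toIntExact(delay.toMillis());
    }

    public static void main(String[] args) {
        // 30 minutes, matching ORDER_QUEUE_EXPIRATION in the configuration
        System.out.println(ttlMillis(Duration.ofMinutes(30)));
        // 3 minutes, as in the first usage scenario
        System.out.println(ttlMillis(Duration.ofMinutes(3)));
    }
}
```

Note that the configuration uses a 30-minute TTL, while the first usage scenario mentions 3 minutes, which would correspond to 180000 milliseconds.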

Consumer OrderProcessReceiver:

    package com.tuohang.platform.config;

    import java.util.Date;
    import java.util.Objects;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    // Spring AMQP 1.x location; in Spring AMQP 2.x this interface is in
    // org.springframework.amqp.rabbit.listener.api
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.stereotype.Component;

    import com.rabbitmq.client.Channel;

    /**
     * Consumer for delayed order processing.
     *
     * @author Administrator
     * @version 1.0
     * @Date September 18, 2018
     */
    @Component
    public class OrderProcessReceiver implements ChannelAwareMessageListener {

        private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class);

        String msg = "The failed message will auto retry after a certain delay";

        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            try {
                processMessage(message);
            } catch (Exception e) {
                // If an exception occurs, publish the retry marker (msg) to the buffer
                // queue so that it is delivered again after the delay
                channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME,
                        OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null, msg.getBytes());
            }
        }

        /**
         * Process the order message and cancel the order if it has not been paid
         * (if the message is FAIL_MESSAGE, an exception needs to be thrown).
         *
         * @param message
         * @throws Exception
         */
        public void processMessage(Message message) throws Exception {
            String realMessage = new String(message.getBody());
            logger.info("Received < " + realMessage + " >");
            // cancel order
            if (!Objects.equals(realMessage, msg)) {
                // SpringKit.getBean(ITestService.class).roomSexById(Long.valueOf(realMessage));
                System.out.println("Test 111111 -" + new Date());
                System.out.println(message);
            }
        }
    }

Alternatively, use an annotation-based consumer:

    import java.util.Date;
    import java.util.Objects;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    import com.rabbitmq.client.Channel;

    /**
     * Test RabbitMQ consumer. TestQueueConfig is the test counterpart of OrderQueueConfig.
     *
     * @author Administrator
     * @version 1.0
     * @Date September 25, 2018
     */
    @Component
    @RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)
    public class TestProcessReceiver {

        private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class);

        String msg = "The failed message will auto retry after a certain delay";

        @RabbitHandler
        public void onMessage(Message message, Channel channel) throws Exception {
            try {
                processMessage(message);
                // Tell the server that the message has been consumed and can be removed
                // from the queue; otherwise the server assumes it must be sent again.
                // This explicit ack assumes the listener uses manual acknowledgement mode.
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                // If an exception occurs, publish the retry marker (msg) to the buffer
                // queue so that it is delivered again after the delay
                channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME,
                        TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null, msg.getBytes());
            }
        }

        /**
         * Process the order message and cancel the order if it has not been paid
         * (if the message content is FAIL_MESSAGE, an exception needs to be thrown).
         *
         * @param message
         * @throws Exception
         */
        public void processMessage(Message message) throws Exception {
            String realMessage = new String(message.getBody());
            logger.info("Received < " + realMessage + " >");
            // cancel order
            if (!Objects.equals(realMessage, msg)) {
                System.out.println("Test 111111-" + new Date());
            } else {
                System.out.println("rabbit else...");
            }
        }
    }

Producer

    /**
     * Test RabbitMQ.
     *
     * @return
     */
    @RequestMapping(value = "/testrab")
    public String testraa() {
        GenericResult gr = null;
        try {
            String name = "test_pre_ttl_delay_queue";
            long expiration = 10000;
            // rabbitTemplate is an injected RabbitTemplate; with two arguments the
            // message goes to the default exchange using name as the routing key
            rabbitTemplate.convertAndSend(name, String.valueOf(123456));
            // set the expiration time on a single message
            // rabbitTemplate.convertAndSend(name, (Object) String.valueOf(123456), new ExpirationMessagePostProcessor(expiration));
        } catch (ServiceException e) {
            e.printStackTrace();
            gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage());
        }
        return getWrite(gr);
    }

That covers the process of setting up RabbitMQ message middleware in Java. If you have run into similar questions, the configuration and code above can serve as a reference.
