Shulou (Shulou.com) 06/02 report. Updated 2025-01-16, SLTechnology News&Howtos.
This article walks through a case study of the RPC function of RabbitMQ in SpringBoot: how the pattern works, how to configure it, and how to send a request and wait synchronously for the reply.
I. A brief introduction to RPC of RabbitMQ
In real business scenarios, the producer sometimes needs to wait for the consumer to return a result, or needs a function, method, or interface on the consumer side to return a value. In large systems, however, the producer and the consumer are usually two independent systems deployed on different machines, so the result cannot be obtained through a direct object or method call. In that case we need RPC (Remote Procedure Call).
RabbitMQ implements RPC in a simple way. The producer sends a message carrying two tags, a message ID (correlation_id) and the name of a callback queue (reply_to), to the sending queue. The consumer (also called the RPC server) takes the message from the sending queue, runs the business logic, reads the tags, and sends the result to the specified callback queue. The producer then picks its result out of the callback queue by matching the correlation_id.
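The flow described above can be sketched without a broker. The following is a minimal in-memory simulation, not RabbitMQ or Spring code: the class InMemoryRpcSketch, the Envelope type, and every method name are hypothetical, and plain BlockingQueue instances stand in for the sending queue and the callback queue. It only illustrates how a correlation ID and a reply queue name let the producer match a reply to its request.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for the broker; not part of any RabbitMQ API.
class InMemoryRpcSketch {

    /** A message body plus the two RPC tags: correlation ID and reply queue name. */
    static final class Envelope {
        final String correlationId;
        final String replyTo;
        final byte[] body;

        Envelope(String correlationId, String replyTo, byte[] body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    /** Minimal functional interface for a blocking step run on a background thread. */
    interface Step {
        void run() throws InterruptedException;
    }

    // Queues looked up by name, like queues declared on a broker.
    static final Map<String, BlockingQueue<Envelope>> QUEUES = new ConcurrentHashMap<>();
    // Calls still waiting for a reply, keyed by correlation ID.
    static final Map<String, CompletableFuture<byte[]>> PENDING = new ConcurrentHashMap<>();

    static BlockingQueue<Envelope> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    /** Producer: tag the request, publish it, then block until the matching reply arrives. */
    static String call(String requestQueue, String replyQueue, String payload, long timeoutMs)
            throws Exception {
        String id = UUID.randomUUID().toString();
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        PENDING.put(id, future);
        queue(requestQueue).put(new Envelope(id, replyQueue, payload.getBytes(StandardCharsets.UTF_8)));
        try {
            return new String(future.get(timeoutMs, TimeUnit.MILLISECONDS), StandardCharsets.UTF_8);
        } finally {
            PENDING.remove(id);
        }
    }

    /** Consumer (RPC server): handle one request and reply to the queue named in replyTo. */
    static void serveOne(String requestQueue) throws InterruptedException {
        Envelope request = queue(requestQueue).take();
        String result = new String(request.body, StandardCharsets.UTF_8) + " returned";
        queue(request.replyTo).put(
                new Envelope(request.correlationId, null, result.getBytes(StandardCharsets.UTF_8)));
    }

    /** Producer's reply listener: hand one reply to the caller waiting on its correlation ID. */
    static void dispatchOneReply(String replyQueue) throws InterruptedException {
        Envelope reply = queue(replyQueue).take();
        CompletableFuture<byte[]> waiting = PENDING.get(reply.correlationId);
        if (waiting != null) {
            waiting.complete(reply.body);
        }
    }

    static void runInBackground(Step step) {
        Thread thread = new Thread(() -> {
            try {
                step.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    /** One full round trip: request on topic.queue1, reply on topic.queue2. */
    static String demo() throws Exception {
        runInBackground(() -> serveOne("topic.queue1"));
        runInBackground(() -> dispatchOneReply("topic.queue2"));
        return call("topic.queue1", "topic.queue2", "message content", 5000);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

In the SpringBoot setup, the roles of PENDING and dispatchOneReply are played by the RabbitTemplate together with its reply listener container, so application code does not have to manage correlation IDs itself.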
II. Using the RPC function of RabbitMQ in SpringBoot
Note: when used in SpringBoot, correlation_id is generated automatically by the framework, and reply_to is set once, when the AmqpTemplate (RabbitTemplate) instance is configured.
Example:
Description: queue 1 (topic.queue1) is the sending queue and queue 2 (topic.queue2) is the return queue.
1. Configure RabbitMQ first
package com.ws.common;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration class
 */
@Configuration
public class RabbitMQConfig {

    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;

    @Bean(name = "connectionFactory")
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        // Calling connectionFactory() inside a @Configuration class returns the singleton bean,
        // so the class does not need to inject its own ConnectionFactory through a field.
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        // set reply_to (the return queue; it can only be set here)
        rabbitTemplate.setReplyAddress(TOPIC_QUEUE2);
        // wait up to 60 seconds for a reply
        rabbitTemplate.setReplyTimeout(60000);
        return rabbitTemplate;
    }

    // return queue listener (required): hands replies on queue 2 back to the RabbitTemplate
    @Bean(name = "replyMessageListenerContainer")
    public SimpleMessageListenerContainer createReplyListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory());
        listenerContainer.setQueueNames(TOPIC_QUEUE2);
        listenerContainer.setMessageListener(rabbitTemplate());
        return listenerContainer;
    }

    // create the queues
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2);
    }

    // create the exchange
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    // bind the queues to the exchange, using each queue name as its routing key
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_QUEUE1);
    }

    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_QUEUE2);
    }
}
2. Send a message and wait for the return value synchronously
@Autowired
private RabbitTemplate rabbitTemplate;

// The statements below belong inside a method of the calling class (for example a controller).

// message body
String sss = "message content";
// wrap it in a Message
Message msg = this.con(sss);
log.info("client -" + msg.toString());
// use the sendAndReceive method to complete the RPC call
Message message = rabbitTemplate.sendAndReceive(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE1, msg);
// sendAndReceive returns null if no reply arrives before the reply timeout
if (message != null) {
    // extract the body of the RPC response
    String response = new String(message.getBody(), Charset.forName("UTF-8"));
    log.info("response:" + response);
} else {
    log.warn("rpc timed out, no reply received");
}
log.info("rpc complete -");

public Message con(String s) {
    MessageProperties mp = new MessageProperties();
    byte[] src = s.getBytes(Charset.forName("UTF-8"));
    // mp.setReplyTo("adsdas");      // set when the AmqpTemplate is loaded; setting it here has no effect
    // mp.setCorrelationId("2222");  // generated by the system; setting it here has no effect
    mp.setContentType("application/json");
    mp.setContentEncoding("UTF-8");
    // content length is counted in bytes, not characters
    mp.setContentLength((long) src.length);
    return new Message(src, mp);
}
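The synchronous wait is bounded by the reply timeout (60000 ms in the configuration above), and the Spring AMQP documentation describes sendAndReceive as returning null when no reply arrives in time, so the caller should handle that case. The following is a minimal sketch of the same "wait, then check for no reply" shape using only the JDK: the class ReplyTimeoutSketch and the method awaitReply are hypothetical, and a BlockingQueue stands in for the reply queue.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; it does not use or represent any Spring AMQP class.
class ReplyTimeoutSketch {

    /**
     * Waits up to timeoutMs for a reply body. poll() returns null on timeout,
     * which is mapped to an empty Optional so the caller has to handle it.
     */
    static Optional<String> awaitReply(BlockingQueue<byte[]> replies, long timeoutMs)
            throws InterruptedException {
        byte[] body = replies.poll(timeoutMs, TimeUnit.MILLISECONDS);
        return Optional.ofNullable(body).map(b -> new String(b, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<byte[]> replies = new LinkedBlockingQueue<>();

        // Nothing has been published yet, so this wait times out.
        System.out.println(awaitReply(replies, 50).orElse("no reply (timed out)"));

        // Once a reply is available, the same call returns its decoded body.
        replies.put("message content returned".getBytes(StandardCharsets.UTF_8));
        System.out.println(awaitReply(replies, 50).orElse("no reply (timed out)"));
    }
}
```

Decoding with an explicit charset, as here, also avoids depending on the platform default when the body is turned back into a String.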
3. Write the consumer
package com.ws.listener.mq;

import java.nio.charset.Charset;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.ws.common.RabbitMQConfig;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class Receiver {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // RPC server: consumes requests from queue 1 and sends the result to queue 2
    @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE1)
    public void receiveTopic1(Message msg) {
        log.info("queue 1:" + msg.toString());
        String msgBody = new String(msg.getBody(), Charset.forName("UTF-8"));
        // process the data and build the returned Message, echoing the request's correlation ID
        Message repMsg = con(msgBody + " returned", msg.getMessageProperties().getCorrelationId());
        rabbitTemplate.send(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE2, repMsg);
    }

    // Note: this listener is a second consumer on queue 2 and competes with the reply
    // listener container for replies. Remove it if RPC calls time out unexpectedly.
    @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE2)
    public void receiveTopic2(Message msg) {
        log.info("queue 2:" + msg.toString());
    }

    public Message con(String s, String id) {
        MessageProperties mp = new MessageProperties();
        byte[] src = s.getBytes(Charset.forName("UTF-8"));
        mp.setContentType("application/json");
        mp.setContentEncoding("UTF-8");
        mp.setCorrelationId(id);
        return new Message(src, mp);
    }
}
Log output:
2019-06-26 17:11:16.607 [http-nio-8080-exec-4] client -(Body:'message content' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=5, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2019-06-26 17:11:16.618 [SimpleAsyncTaskExecutor-1] queue 1:(Body:'message content' MessageProperties [headers={}, correlationId=1, replyTo=topic.queue2, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=topic.queue1, deliveryTag=1, consumerTag=amq.ctag-8IzlhblYmTebqUYd-uferw, consumerQueue=topic.queue1])
2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - response:message content returned
2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - rpc complete -
Thank you for reading. That concludes the case study of the RPC function of RabbitMQ in SpringBoot. The specific usage still needs to be verified in practice.