Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the method of implementing Topic theme pattern by integrating RabbitMq with springboot2.5.6?

2025-01-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/02 Report--

This article focuses on "what is the method of springboot2.5.6 integration RabbitMq to achieve Topic theme pattern", interested friends may wish to have a look. The method introduced in this paper is simple, fast and practical. Next, let the editor take you to learn "what is the method of implementing the Topic theme pattern with springboot2.5.6 integrated RabbitMq?"

1.application.yml

Server: port: 8184spring: application: name: rabbitmq-demo rabbitmq: host: 127.0.0.1 # ip address port: 5672 username: admin # connection account password: 123456 # connection password template: retry: enabled: true # failed retry initial-interval: 10000ms # interval between the first retry max-interval: 300000ms # maximum retry interval Beyond this interval, multiplier: 2 # multiple of the next retry interval will not be retried. Here is 2, that is, the next retry interval is twice the default switch name of the last time. After being configured here, if the switch is not specified, the publisher-confirm-type: correlated # producer confirmation mechanism will be used to send the message to ensure that the message will be sent correctly. If it fails, there will be an error receipt. This triggers a retry of publisher-returns: true listener: type: simple simple: acknowledge-mode: manual prefetch: 1 # to limit sending one piece of data at a time. Concurrency: 3 # launch several consumers in the same queue max-concurrency: 3 # launch maximum number of consumers # retry policy-related configuration retry: enabled: true # support retry max-attempts: 5 stateless: false multiplier: 1.0 # time policy multiplier factor initial-interval: 1000ms Max-interval: 10000ms default-requeue-rejected: true

2.pom.xml introduces dependency

Org.springframework.boot spring-boot-starter-amqp

3. Constant class creation

/ * @ author kkp * @ ClassName RabbitMqConstants * @ date, 2021-11-3 14:16 * @ Description * / public class RabbitMqConstants {public final static String TEST1_QUEUE = "test1-queue"; public final static String TEST2_QUEUE = "test2-queue"; public final static String EXCHANGE_NAME = "test.topic.exchange"; / * * routingKey1 * / public final static String TOPIC_TEST1_ROUTINGKEY = "topic.test1.*" Public final static String TOPIC_TEST1_ROUTINGKEY_TEST = "topic.test1.test"; / * routingKey1 * / public final static String TOPIC_TEST2_ROUTINGKEY = "topic.test2.*"; public final static String TOPIC_TEST2_ROUTINGKEY_TEST = "topic.test2.test";}

4. Configure Configuration

Import com.example.demo.common.RabbitMqConstants;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Scope / * @ author kkp * @ ClassName RabbitMqConfig * @ date, 2021-11-3 14:16 * @ Description * / @ Slf4j@Configurationpublic class RabbitMqConfig {@ Autowired private CachingConnectionFactory connectionFactory / * declare that the switch * / @ Bean (RabbitMqConstants.EXCHANGE_NAME) public Exchange exchange () {/ / durable (true) is persistent, and the switch is still in / / Topic mode / / return ExchangeBuilder.topicExchange (RabbitMqConstants.EXCHANGE_NAME) .durable (true) .build () after mq restart. / / publish and subscribe mode return ExchangeBuilder.fanoutExchange (RabbitMqConstants.EXCHANGE_NAME) .durable (true) .build () } / * declare queue * new Queue (QUEUE_EMAIL,true,false,false) * durable= "true" persistent rabbitmq restart does not need to create a new queue * auto-delete indicates that the message queue will be automatically deleted when it is not in use. The default is false * exclusive indicates whether the message queue is only valid in the current connection. The default is false * / @ Bean (RabbitMqConstants.TEST1_QUEUE) public Queue esQueue () {return new Queue (RabbitMqConstants.TEST1_QUEUE) } / * declaration queue * / @ Bean (RabbitMqConstants.TEST2_QUEUE) public Queue gitalkQueue () {return new Queue (RabbitMqConstants.TEST2_QUEUE) } / * TEST1_QUEUE queue binds the switch and specifies routingKey * / @ Bean public Binding bindingEs (@ Qualifier (RabbitMqConstants.TEST1_QUEUE) Queue queue, @ Qualifier (RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {return BindingBuilder.bind (queue) .to (exchange) .with (RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY) .noargs () } / * TEST2_QUEUE queue binds the switch and specifies routingKey * / @ Bean public Binding bindingGitalk (@ Qualifier (RabbitMqConstants.TEST2_QUEUE) Queue queue, @ Qualifier (RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {return BindingBuilder.bind (queue) .to (exchange) .with (RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY) .noargs () } / * if you need a callback after the message is sent by the producer, * you need to set the ConfirmCallback object to the rabbitTemplate. * because different producers need to correspond to different ConfirmCallback, * if the rabbitTemplate is set to singleton bean, * the actual ConfirmCallback of all rabbitTemplate is the last declared ConfirmCallback. * @ return * / @ Bean @ Scope (ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate () {RabbitTemplate template = new RabbitTemplate (connectionFactory); return template;}}

5.Rabbit utility class creation

Import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.UUID;/** * @ author kkp * @ ClassName RabbitMqUtils * @ date, 2021-11-3 14:21 * @ Description * / @ Slf4j@Componentpublic class RabbitMqUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {private RabbitTemplate rabbitTemplate / * Constructor injection * / @ Autowired public RabbitMqUtils (RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate; / / this is to set the callback to receive and send to the response rabbitTemplate.setConfirmCallback (this); / / if the backup queue is set, it does not work rabbitTemplate.setMandatory (true); rabbitTemplate.setReturnCallback (this) } / * callback confirmation * / @ Override public void confirm (CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info ("message sent successfully: correlationData ({}), ack ({}), cause ({})", correlationData,ack,cause) } else {log.info ("message delivery failed: correlationData ({}), ack ({}), cause ({})", correlationData,ack,cause) }} / * message sent to the converter without alignment, if backup is configured, the callback will not take effect * @ param message * @ param replyCode * @ param replyText * @ param exchange * @ param routingKey * / @ Override public void returnedMessage (Message message, int replyCode, String replyText, String exchange) String routingKey) {log.info ("message loss: exchange ({}), route ({}), replyCode ({}), replyText ({}), message: {}", exchange,routingKey,replyCode,replyText,message) } / * send to the specified Queue * @ param queueName * @ param obj * / public void send (String queueName, Object obj) {CorrelationData correlationId = new CorrelationData (UUID.randomUUID (). ToString ()); this.rabbitTemplate.convertAndSend (queueName, obj, correlationId) } / * 1, switch name * 2, routingKey * 3, message content * / public void sendByRoutingKey (String exChange, String routingKey, Object obj) {CorrelationData correlationId = new CorrelationData (UUID.randomUUID (). ToString ()); this.rabbitTemplate.convertAndSend (exChange, routingKey, obj, correlationId);}}

6.service creation

Public interface TestService {String sendTest1 (String content); String sendTest2 (String content);}

7.impl implementation

Import com.example.demo.common.RabbitMqConstants;import com.example.demo.util.RabbitMqUtils;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;/** * @ author kkp * @ ClassName TestServiceImpl * @ date on 2021-11-3 14:24 * @ Description * / @ Service@Slf4jpublic class TestServiceImpl implements TestService {@ Autowired private RabbitMqUtils rabbitMqUtils @ Override public String sendTest1 (String content) {rabbitMqUtils.sendByRoutingKey (RabbitMqConstants.EXCHANGE_NAME, RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST, content); log.info (RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST+ "* sent successfully *"); return "sent successfully!" ;} @ Override public String sendTest2 (String content) {rabbitMqUtils.sendByRoutingKey (RabbitMqConstants.EXCHANGE_NAME, RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST, content); log.info (RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST+ "* sent successfully *"); return "sent successfully!" ;}}

8. Monitoring class

Import com.example.demo.common.RabbitMqConstants;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel / * @ author kkp * @ ClassName RabbitMqListener * @ date, 2021-11-3 14:22 * @ Description * / @ Slf4j@Componentpublic class RabbitMqListener {@ RabbitListener (queues = RabbitMqConstants.TEST1_QUEUE) public void test1Consumer (Message message, Channel channel) {try {/ / manually confirm that the message has been consumed channel.basicAck (message.getMessageProperties (). GetDeliveryTag (), false) Log.info ("Counsoum1 consumption message:" + message.toString () + ". Success! ") ;} catch (Exception e) {e.printStackTrace (); log.info ("Counsoum1 consumption message:" + message.toString () + ". Failed!) ;} @ RabbitListener (queues = RabbitMqConstants.TEST2_QUEUE) public void test2Consumer (Message message, Channel channel) {try {/ / manually confirm that the message has been consumed channel.basicAck (message.getMessageProperties (). GetDeliveryTag (), false); log.info ("Counsoum2 consumption message:" + message.toString () + ". Success!) ;} catch (Exception e) {e.printStackTrace (); log.info ("Counsoum2 consumption message:" + message.toString () + ". Failed!) ;}

9.Controller test

Import com.example.demo.server.TestService;import jdk.nashorn.internal.objects.annotations.Getter;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;import java.util.Map;/** * @ author kkp * @ ClassName TestController * @ date on 2021-11-3 14:25 * @ Description * / @ Slf4j@RestController@RequestMapping ("/ enterprise") public class TestController {@ Autowired private TestService testService @ GetMapping ("/ finance") public String hello3 (@ RequestParam (required = false) Map params) {return testService.sendTest2 (params.get ("entId"). ToString ());} / * * send the message test2 * @ param content * @ return * / @ PostMapping (value = "/ finance2") public String sendTest2 (@ RequestBody String content) {return testService.sendTest2 (content) }} at this point, I believe you have a deeper understanding of "what is the method of springboot2.5.6 integrating RabbitMq to implement Topic theme pattern". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report