2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains how to implement priority queues with RabbitMQ in Spring Boot. It covers running the broker, declaring the queue and exchange, publishing messages with a priority, and consuming them.
Run RabbitMQ locally
docker run -d \
  --name rabbitmq \
  --restart always \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=user \
  -e RABBITMQ_DEFAULT_PASS=password \
  rabbitmq:3-management
Access the visualization panel
Address: 127.0.0.1:15672/
Account: user
Password: password
Spring Boot With RabbitMQ
Integrating RabbitMQ with Spring Boot
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Basic parameter configuration
# host & port
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
Queue / Exchange / Routing Configuration
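import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration
 */
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE = "priority-exchange";

    public static final String QUEUE = "priority-queue";

    private static final String ROUTING_KEY = "priority.queue.#";

    /**
     * Define the priority queue
     */
    @Bean
    Queue queue() {
        Map<String, Object> args = new HashMap<>();
        // x-max-priority makes this a priority queue; 100 is the highest priority it supports
        args.put("x-max-priority", 100);
        // Arguments: name, durable, exclusive, autoDelete, arguments
        return new Queue(QUEUE, false, false, false, args);
    }

    /**
     * Define the exchange
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE);
    }

    /**
     * Bind the queue to the exchange
     */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}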
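The binding above relies on topic-exchange wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The following is a minimal sketch of that matching rule in plain Java. It is an illustrative re-implementation, not RabbitMQ's own code, and the class name `TopicPatternDemo` is hypothetical.

```java
// Illustrative only: a simplified re-implementation of topic matching,
// not the broker's actual code.
final class TopicPatternDemo {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: the key must be exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' stands for zero or more words: try every possible split.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;
        }
        // '*' stands for exactly one word; any other word must match literally.
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("priority.queue.#", "priority.queue.test"));
        System.out.println(matches("priority.queue.*", "priority.queue.a.b"));
    }
}
```

Under this rule, a routing key such as `priority.queue.test` is accepted by the `priority.queue.#` binding, while a key with a different prefix is not.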
For details on defining priority queues, see the official documentation: www.rabbitmq.com/priority.html
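To make the ordering rules concrete, here is a small in-memory model of a queue declared with `x-max-priority`. It is only a sketch under the rules described in the official documentation: messages without a priority are treated as priority 0, priorities above the queue's maximum are treated as the maximum, and messages of equal priority keep their publish order. The class `PriorityOrderDemo` and its methods are hypothetical and do not talk to a broker.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative in-memory model only; this is not how the broker is implemented.
final class PriorityOrderDemo {

    /** A message body plus the data needed to order it. */
    private static final class Entry {
        final String body;
        final int priority;
        final long seq;

        Entry(String body, int priority, long seq) {
            this.body = body;
            this.priority = priority;
            this.seq = seq;
        }
    }

    private final int maxPriority;
    private long nextSeq = 0;

    // Highest priority first; publish order breaks ties.
    private final PriorityQueue<Entry> waiting = new PriorityQueue<>(
            Comparator.<Entry>comparingInt(e -> e.priority).reversed()
                    .thenComparingLong(e -> e.seq));

    PriorityOrderDemo(int maxPriority) {
        this.maxPriority = maxPriority;
    }

    /** Enqueues a message; a null priority counts as 0, larger values are capped. */
    void publish(String body, Integer priority) {
        int effective = (priority == null) ? 0 : Math.min(priority, maxPriority);
        waiting.add(new Entry(body, effective, nextSeq++));
    }

    /** Removes all waiting messages and returns their bodies in delivery order. */
    List<String> drain() {
        List<String> delivered = new ArrayList<>();
        while (!waiting.isEmpty()) {
            delivered.add(waiting.poll().body);
        }
        return delivered;
    }

    public static void main(String[] args) {
        PriorityOrderDemo queue = new PriorityOrderDemo(100);
        queue.publish("low", 1);
        queue.publish("high", 50);
        System.out.println(queue.drain());
    }
}
```

The reordering only applies to messages that are waiting in the queue. If consumers keep up with the publisher, each message is delivered as soon as it arrives and its priority has no visible effect.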
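Once the Spring Boot application has started, it automatically creates the Queue and the Exchange and binds them to each other. In the management panel, the priority queue is flagged as such in the queue list.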
RabbitMQ Publisher
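Spring Boot configuration
# Whether to trigger a callback once a message has reached the exchange
# (deprecated since Spring Boot 2.2 in favor of spring.rabbitmq.publisher-confirm-type)
spring.rabbitmq.publisher-confirms=false
# Whether to trigger a callback when a message cannot be routed to a queue
spring.rabbitmq.publisher-returns=false
# Retry settings for failed sends
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=3000ms
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=1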
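The retry settings can be read as a simple schedule: wait `initial-interval` before the second attempt, multiply the wait by `multiplier` after each failure, and never wait longer than `max-interval`. The sketch below only reproduces that arithmetic, assuming this geometric growth with a cap; it does not call Spring Retry, and `RetryScheduleDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Arithmetic sketch only; it does not use Spring's retry classes.
final class RetryScheduleDemo {

    /**
     * Returns the waits (in milliseconds) between consecutive attempts.
     * With maxAttempts attempts there are maxAttempts - 1 waits.
     */
    static List<Long> waits(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double interval = initialMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            // Cap the wait at the configured maximum interval.
            waits.add((long) Math.min(interval, maxMs));
            interval *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from the configuration above: 3000ms, multiplier 1, 10000ms cap, 3 attempts.
        System.out.println(waits(3000, 1, 10000, 3));
    }
}
```

With the values configured above, the multiplier of 1 keeps the wait constant, so a send is tried at most three times with a 3000 ms pause between attempts. A multiplier greater than 1 would be needed for `max-interval` to come into play.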
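Sending messages
import lombok.AllArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class FileMessageSender {

    private static final String EXCHANGE = "priority-exchange";

    private static final String ROUTING_KEY_PREFIX = "priority.queue.";

    private final RabbitTemplate rabbitTemplate;

    /**
     * Send a message with a priority set
     *
     * @param content  message body
     * @param priority message priority
     */
    public void sendPriorityMessage(String content, Integer priority) {
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,
                message -> {
                    // Set the priority on the outgoing message's properties
                    message.getMessageProperties().setPriority(priority);
                    return message;
                });
    }
}
RabbitMQ Consumer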
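Spring Boot configuration
# Acknowledgement mode: NONE (no acknowledgement), AUTO (automatic), MANUAL (manual)
spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
# Minimum number of consumer threads
spring.rabbitmq.listener.simple.concurrency=10
# Maximum number of consumer threads
spring.rabbitmq.listener.simple.max-concurrency=10
# Maximum number of unacknowledged messages that may be outstanding at each consumer
spring.rabbitmq.listener.simple.prefetch=1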
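If the consumer takes a long time to process each message, set spring.rabbitmq.listener.simple.prefetch to a small value so that higher-priority messages reach the consumer threads sooner.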
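The effect of the prefetch value can be illustrated with a toy simulation. It assumes a single consumer that works through its prefetched messages in arrival order, a backlog of low-priority messages, and one urgent message published after the consumer's buffer has been filled. Messages already handed to the consumer cannot be reordered, so the urgent message has to wait behind them. The class `PrefetchDemo`, the priorities 1 and 9, and the scenario itself are assumptions for illustration, not measurements from a broker.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.PriorityQueue;

// Toy model: one consumer, prefetch >= 1, no real broker involved.
final class PrefetchDemo {

    private static final int LOW = 1;
    private static final int URGENT = 9;

    /** Counts the low-priority messages handled before the urgent one. */
    static int lowHandledBeforeUrgent(int prefetch, int backlog) {
        // Broker side: waiting messages, highest priority first.
        PriorityQueue<Integer> waiting = new PriorityQueue<>(Comparator.reverseOrder());
        for (int i = 0; i < backlog; i++) {
            waiting.add(LOW);
        }

        // Consumer side: messages already delivered but not yet acknowledged.
        Deque<Integer> buffer = new ArrayDeque<>();
        while (buffer.size() < prefetch && !waiting.isEmpty()) {
            buffer.add(waiting.poll());
        }

        // The urgent message arrives after the buffer has been filled.
        waiting.add(URGENT);

        int lowHandled = 0;
        while (!buffer.isEmpty()) {
            int priority = buffer.poll();
            if (priority == URGENT) {
                return lowHandled;
            }
            lowHandled++;
            // Acknowledging frees one slot; the broker sends its best waiting message.
            if (!waiting.isEmpty()) {
                buffer.add(waiting.poll());
            }
        }
        return lowHandled;
    }

    public static void main(String[] args) {
        System.out.println(lowHandledBeforeUrgent(1, 20));
        System.out.println(lowHandledBeforeUrgent(10, 20));
    }
}
```

In this model the urgent message waits behind as many low-priority messages as the prefetch value, which is why a prefetch of 1 suits slow consumers that need priorities to take effect quickly.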
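Listening for messages
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageListener {

    /**
     * Handle a message
     */
    @RabbitListener(queues = "priority-queue")
    public void listen(String message) {
        log.info(message);
    }
}
Additional notes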
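1. Custom callbacks for publish confirmation
Configuration:
# Trigger a callback once a message has reached the exchange
spring.rabbitmq.publisher-confirms=true
# Trigger a callback when a message cannot be routed to a queue
spring.rabbitmq.publisher-returns=true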
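Custom RabbitTemplate.ConfirmCallback implementation
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

@Slf4j
public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("Correlation data: {}", correlationData);
        log.info("Acknowledged: {}", ack);
        log.info("Cause: {}", cause);
    }
}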
Custom RabbitTemplate.ReturnCallback implementation
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("Message: {}", message);
        log.info("Reply code: {}", replyCode);
        log.info("Reply text: {}", replyText);
        log.info("Exchange: {}", exchange);
        log.info("Routing key: {}", routingKey);
    }
}
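Configuring RabbitTemplate
import lombok.AllArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class RabbitTemplateInitializingBean implements InitializingBean {

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void afterPropertiesSet() {
        // Register the custom callbacks once the bean's dependencies are set
        rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());
        rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
    }
}
That concludes this walkthrough of implementing priority queues with RabbitMQ in Spring Boot. I hope it proves helpful. If you found the article useful, feel free to share it so that more people can see it.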