In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces how to achieve persistence and release confirmation of Java RabbitMQ. It is very detailed and has a certain reference value. Friends who are interested must read it!
1. Persistence
Messages sent by message producers are not lost when the RabbitMQ service is stopped. By default, queues and messages are ignored when RabbitMQ exits or crashes. To ensure that the message is not lost, both the queue and the message need to be marked as persistent.
1.1 implementation of persistence
1. Queue persistence: channel.queueDeclare () when creating the queue; the second parameter is changed to true.
two。 Message persistence: channel.basicPublish () when sending a message using the channel; change the third parameter to: MessageProperties.PERSISTENT_TEXT_PLAIN indicates persistent message.
/ * * @ Description persistence MQ * @ date 9:14 on 2022-3-7 * / public class Producer3 {private static final String LONG_QUEUE = "long_queue"; public static void main (String [] args) throws Exception {Channel channel = RabbitMQUtils.getChannel (); / / persistence queue channel.queueDeclare (LONG_QUEUE,true,false,false,null); Scanner scanner = new Scanner (System.in); int I = 0 While (scanner.hasNext ()) {imessages; String msg = scanner.next () + I; / / persistent messages channel.basicPublish ("", LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes (StandardCharsets.UTF_8)); System.out.println ("send messages:'" + msg + "'success");}
But there is also a cached interval for storing messages, there is no real write to disk, and the persistence guarantee is not strong enough, but it is more than sufficient for simple queues.
1.2 unfair distribution
The method of polling distribution is not applicable when consumers have different processing efficiency. So the real fairness should follow the premise that those who can do more work.
Modify channel.basicQos (1) at the consumer to enable unfair distribution
/ * * @ Description unfair distribution of consumers * @ date 9:27 on 2022-3-7 * / public class Consumer2 {private static final String LONG_QUEUE = "long_queue"; public static void main (String [] args) throws Exception {Channel channel = RabbitMQUtils.getChannel () DeliverCallback deliverCallback = (consumerTag, message)-> {/ / simulate concurrent sleep for 30 seconds try {Thread.sleep (30000); System.out.println ("Thread B receives messages:" + new String (message.getBody (), StandardCharsets.UTF_8)); channel.basicAck (message.getEnvelope () .getDeliveryTag (), false) } catch (InterruptedException e) {e.printStackTrace ();}}; / / set unfair distribution channel.basicQos (1); channel.basicConsume (LONG_QUEUE,false,deliverCallback, consumerTag-> {System.out.println (consumerTag + "consumers cancel consumption") });}} 1.3 Test unfair distribution
The purpose of the test: whether those who can do more work can be achieved.
Test method: two consumers sleep different events to simulate different processing events, if the processing time (sleep time) is short enough to process multiple messages, it represents the goal.
First start the producer to create the queue, and then start two consumers respectively.
The producer sends four messages in order:
Thread A with a short sleep time received three messages
Thread B, which has a long sleep time, only receives the second message:
Because thread B takes a long time to process messages, other messages are assigned to thread A.
The experiment was a success!
1.4 Prefetching value
Message sending and manual acknowledgement are done asynchronously, so there is a buffer for unacknowledged messages, and developers want to be able to limit the size of the buffer to avoid unlimited unacknowledged message problems in the buffer.
The expected value here is worth the above method channel.basicQos (); if there is a message equal to the parameter on the current channel, the message will not be consumed in the current channel.
1.4.1 Code testing
Test method:
1. Create two different consumers each with an expected value of 5 2s.
two。 5 for those who sleep for a long time and 2 for those who sleep for a short time.
3. If the message is obtained according to the specified expected value, it indicates that the test is successful, but it does not mean that it will be assigned according to 5 and 2, which is similar to the discrimination of weight.
The code can modify the expected value according to the above code.
two。 Release confirmation
Release confirmation is the process that after the producer publishes the message to the queue, the queue confirmation is persisted and then notified to the producer. This is the only way to ensure that the message will not be lost.
It is important to note that queue persistence needs to be turned on in order to use confirmation publishing.
Enable method: channel.confirmSelect ()
2.1 single confirmation release
Is a synchronous publishing method, that is, after sending a message, only after confirming it to confirm the release, subsequent messages will continue to be released, and an exception will be thrown without confirmation within a specified period of time. The disadvantage is that it is very slow.
/ * * @ Description confirm release-single acknowledgement * @ date 14:49 on 2022-3-7 * / public class SoloProducer {private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_solo"; public static void main (String [] args) throws Exception {Channel channel = RabbitMQUtils.getChannel (); / / generate queue channel.queueDeclare (QUEUE_NAME,true,false,false,null) / / enable confirm release channel.confirmSelect (); / / record start time long beginTime = System.currentTimeMillis (); for (int I = 0; I
< MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 单个发布确认 boolean flag = channel.waitForConfirms(); if (flag){ System.out.println("发送消息:" + i); } } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); }}2.2 批量确认发布 一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。 /** * @Description 确认发布--批量确认 * @date 2022/3/7 14:49 */public class BatchProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_batch"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 设置一个多少一批确认一次。 int batchSize = MESSAGE_COUNT / 10; // 记录开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 批量发布确认 if (i % batchSize == 0){ if (channel.waitForConfirms()){ System.out.println("发送消息:" + i); } } } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); }} 显然效率要比单个确认发布的高很多。 2.3 异步确认发布 在编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。 /** * @Description 确认发布--异步确认 * @date 2022/3/7 14:49 */public class AsyncProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 记录开始时间 long beginTime = System.currentTimeMillis(); // 确认成功回调 ConfirmCallback ackCallback = (deliveryTab,multiple) ->{System.out.println ("confirm success message:" + deliveryTab);}; / confirm failure callback ConfirmCallback nackCallback = (deliveryTab,multiple)-> {System.out.println ("unacknowledged message:" + deliveryTab);} / / message listener / * addConfirmListener: * 1. Confirm the message of success; * 2. Confirm the failed message. * / channel.addConfirmListener (ackCallback,nackCallback); for (int I = 0; I
< MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); }}2.4 处理未确认的消息 最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。 例如:ConcurrentLinkedQueue可以在确认队列confirm callbacks与发布线程之间进行消息的传递。 处理方式: 1.记录要发送的全部消息; 2.在发布成功确认处删除; 3.打印未确认的消息。 使用一个哈希表存储消息,它的优点: 可以将需要和消息进行关联;轻松批量删除条目;支持高并发。 ConcurrentSkipListMap map = new ConcurrentSkipListMap();/** * @Description 异步发布确认,处理未发布成功的消息 * @date 2022/3/7 18:09 */public class AsyncProducerRemember { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async_remember"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 线程安全有序的一个hash表,适用与高并发 ConcurrentSkipListMap< Long, String >Map = new ConcurrentSkipListMap (); / / record start time long beginTime = System.currentTimeMillis (); / / confirm successful callback ConfirmCallback ackCallback = (deliveryTab, multiple)-> {/ / 2. Delete at the confirmation of successful release / / bulk delete if (multiple) {ConcurrentNavigableMap confirmMap = map.headMap (deliveryTab); confirmMap.clear ();} else {/ / delete map.remove (deliveryTab) alone;} System.out.println ("confirm success message:" + deliveryTab) }; / / callback ConfirmCallback nackCallback = (deliveryTab,multiple)-> {/ / 3 to confirm the failure. Print unacknowledged messages. System.out.println ("unacknowledged message:" + map.get (deliveryTab) + ", tag:" + deliveryTab);}; / / message listener / * addConfirmListener: * 1. Confirm the message of success; * 2. Confirm the failed message. * / channel.addConfirmListener (ackCallback,nackCallback); for (int I = 0; I < MESSAGE_COUNT; iTunes +) {String msg = "" + I; channel.basicPublish (", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes (StandardCharsets.UTF_8)); / / 1. Record all messages to be sent Map.put (channel.getNextPublishSeqNo (), msg);} / record end time long endTime = System.currentTimeMillis (); System.out.println ("send" + MESSAGE_COUNT + "message consumption:" + (endTime-beginTime) + "millisecond");}} above is all the content of the article "how Java RabbitMQ achieves persistence and release confirmation". Thank you for reading! Hope to share the content to help you, more related knowledge, welcome to follow the industry information channel!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.