Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to realize Sequential consumption in Message Queue Selector

2025-02-25 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

Message Queue Selector how to achieve sequential consumption, I believe that many inexperienced people do not know what to do, so this paper summarizes the causes of the problem and solutions, through this article I hope you can solve this problem.

Definition of sequential messages:

Sequential messages means that the consumption order and production order of messages are the same. In some scenarios, sequential messages must be guaranteed. Such as order generation, payment, delivery. Sequential messages are divided into global sequential messages and partial sequential messages. Global sequential messages mean that all messages under a certain topic should be ordered; some sequential messages only need to ensure that a certain group of messages are consumed sequentially. For order messages, as long as you ensure that the generation, payment and delivery messages of the same order ID are consumed sequentially.

Part of the principle of sequential consumption:

1. Sender: ensure that all messages of the same order ID are sent to the same MessageQueue (a certain queue under the same Topic)

two。 Consumer side: ensure that messages in the same MessageQueue are not processed concurrently (different MessageQueue of the same Topic can be consumed at the same time)

DefaultMQProducer producer = new DefaultMQProducer ("local-test-producer"); producer.setNamesrvAddr ("10.76.0.38 producer.start 9876"); producer.start (); for (int I = 0; I < 1000; iTunes +) {Order order = new Order (); order.orderId = I Order.status = "generate"; Message msg1 = new Message ("local-test-producer", "TagA", JsonUtils.toJson (order) .getBytes ()) SendResult sendResult1 = producer.send (msg1, new MessageQueueSelector () {@ Override public MessageQueue select (List mqs, Message msg, Object arg) {return null }, order.orderId); log.info ("sendResult1= {}", sendResult1); Uninterruptibles.sleepUninterruptibly (3, TimeUnit.SECONDS); order.status= "payment" Message msg2 = new Message ("local-test-producer", "TagA", JsonUtils.toJson (order) .getBytes () SendResult sendResult2 = producer.send (msg2, new MessageQueueSelector () {@ Override public MessageQueue select (List mqs, Message msg, Object arg) {return null }, order.orderId); log.info ("sendResult2= {}", sendResult2); Uninterruptibles.sleepUninterruptibly (3, TimeUnit.SECONDS); order.status= "Shipping" Message msg3 = new Message ("local-test-producer", "TagA", JsonUtils.toJson (order) .getBytes () Producer.send (msg2, new MessageQueueSelector () {@ Override public MessageQueue select (List mqs, Message msg, Object arg) {return null;}}, order.orderId) Uninterruptibles.sleepUninterruptibly (3, TimeUnit.SECONDS); SendResult sendResult3 = producer.send (msg3, new MessageQueueSelector () {@ Override public MessageQueue select (List mqs, Message msg, Object arg) {Integer id = (Integer) arg Int index = id% mqs.size (); return mqs.get (index);} / / MessageQueueSelector ensures that messages of the same orderId are stored in the same MessageQueue. }, order.orderId); log.info ("sendResult3= {}", sendResult1);}

The main logic of the consumer side is as follows. The main MessageListenerOrderly callback ensures that messages in the same MessageQueue will not be consumed concurrently:

/ / messages in the same MessageQueue should be consumed sequentially, not concurrently. / / but different MessageQueue of the same Topic are DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("local-test-consumer2"); consumer.setNamesrvAddr ("10.76.0.38 local-test-consumer2"); consumer.subscribe ("test", "); consumer.setPullBatchSize (1); consumer.setConsumeThreadMin (1). Consumer.setConsumeThreadMax (1); / / consumer.registerMessageListener (new MessageListenerConcurrently () {consumer.registerMessageListener (new MessageListenerOrderly () {@ Override public ConsumeOrderlyStatus consumeMessage (List msgs, ConsumeOrderlyContext context) {List messages = new ArrayList () For (MessageExt msg: msgs) {messages.add (new String (msg.getBody ()) + "\ tbroker:" + msg.getStoreHost ()) } System.out.printf ("% s Receive New Messages:% s% n", Thread.currentThread () .getName (), messages); return ConsumeOrderlyStatus.SUCCESS;}}); consumer.start () Thread.currentThread () .join ()

Source code analysis:

We know that it is possible to set multiple threads to consume concurrently for a consumer instance in RocketMQ. Consumer.setConsumeThreadMin and setConsumeThreadMax

So how does MessageListenerOrderly ensure that only one thread of a consumer is consuming a MessageQueue at a certain time?

In the ConsumeMessageOrderlyService of the Client module, the consumer side does not simply prohibit concurrent processing, but locks each Consumer Queue.

Private final MessageQueueLock messageQueueLock = new MessageQueueLock ()

Before consuming each message, you need to obtain the lock corresponding to the Consumer Queue corresponding to this message to ensure that messages of the same Consumer Queue will not be consumed concurrently, but messages of different Consumer Queue can be processed concurrently.

After reading the above, have you mastered how Message Queue Selector can achieve sequential consumption? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report