What is the RocketMQ consumption pattern?

2025-03-28 Update From: SLTechnology News&Howtos

This article explains the RocketMQ consumption patterns in detail, with source code and examples, and is shared here as a reference.

RocketMQ provides two consumption modes: CLUSTERING (cluster consumption, the default) and BROADCASTING (broadcast consumption). You specify the mode when creating a consumer. The two modes use different internal mechanisms, and they differ in how messages are consumed, how consumption progress is recorded, and how message consumption status is tracked. Let's discuss them in detail.

1. Cluster consumption (default)

Cluster mode: the queues of a topic are allocated to the Consumer instances of a ConsumerGroup according to a queue allocation policy, and by default the queues are shared out evenly. For example, 100 messages are sent to topic Test and land in its different queues, and one ConsumerGroup has 3 Consumer instances. The queue allocation algorithm then assigns every queue to a consumer. Each consumer instance only consumes the messages in its own queues, and a message that has been consumed cannot be consumed by another instance of the group.

Each consumption queue is allocated to exactly one consumer.

A consumer may consume zero or more queues. For example, a topic has four consumption queues but the group has five consumers. By the first rule each queue can have only one consumer, so one consumer is left without a queue.
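To make the four-queue, five-consumer example concrete, here is a minimal sketch of the default average allocation arithmetic. It is a standalone illustration, not the RocketMQ class itself: the class name AverageAllocationSketch and the queue names q0..q3 are hypothetical, and plain strings stand in for MessageQueue objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AverageAllocationSketch {
    // Returns the queues assigned to the consumer at position `index`
    // out of `consumerCount` consumers (hypothetical helper, not RocketMQ API).
    public static List<String> allocate(List<String> queues, int consumerCount, int index) {
        List<String> result = new ArrayList<>();
        int mod = queues.size() % consumerCount;
        int averageSize;
        if (queues.size() <= consumerCount) {
            averageSize = 1;                               // at most one queue each
        } else if (mod > 0 && index < mod) {
            averageSize = queues.size() / consumerCount + 1; // takes one leftover queue
        } else {
            averageSize = queues.size() / consumerCount;
        }
        int startIndex = (mod > 0 && index < mod)
                ? index * averageSize
                : index * averageSize + mod;
        // range is zero or negative when no queue is left for this consumer
        int range = Math.min(averageSize, queues.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(queues.get((startIndex + i) % queues.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        for (int c = 0; c < 5; c++) {
            System.out.println("consumer" + c + " -> " + allocate(queues, 5, c));
        }
        // consumer0..consumer3 each get one queue; consumer4 prints []
    }
}
```

With eight queues and three consumers the same arithmetic gives the first two consumers three queues each and the last consumer two.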

1.1. Create cluster consumers

Create two cluster consumers, Consumer1 and Consumer2. The code for Consumer1 is shown below; Consumer2 is the same apart from its name.

// Imports assume the Apache RocketMQ 4.x client (org.apache.rocketmq); adjust to your version.
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer1 {
    public static void main(String[] args) {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("consumer_test_clustering");
            consumer.setNamesrvAddr("10.10.12.203:9876");
            consumer.subscribe("TopicTest", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt msg : msgs) {
                            String msgbody = new String(msg.getBody(), StandardCharsets.UTF_8);
                            System.out.println("Consumer1=== MessageBody: " + msgbody); // output message content
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; // retry later
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // consumed successfully
                }
            });
            consumer.start();
            System.out.println("Consumer1=== started successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Nothing in this code explicitly selects cluster consumption, so how do we know it is cluster consumption? Let's look at the source code.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); sets many built-in default parameters when the consumer is created, including the consumption mode, which defaults to CLUSTERING.

Let's start Consumer1 and Consumer2 and send 10 messages to observe the consumption. RocketMQ messaging is based on the publish-subscribe model.

All 10 messages were consumed and none was consumed twice. In cluster mode each message is consumed once by the consumers of the group as a whole, and is not consumed repeatedly.

View the client information in the visual console.

After sending the 10 messages, check the message status in the console.

1.2. Queue allocation strategy

1.2.1. Average allocation policy (default) (AllocateMessageQueueAveragely)

Let's first look at its schematic.

The core method of the AllocateMessageQueueAveragely class is allocate. Let's analyze its source code.

public List<MessageQueue> allocate(String consumerGroup, String currentCID,
        List<MessageQueue> mqAll, List<String> cidAll) {
    // Check that the current consumer ID is present
    if ((currentCID == null) || (currentCID.length() < 1)) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    // Check that the message queues exist
    if ((mqAll == null) || (mqAll.isEmpty())) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    // Check that the collection of all consumer IDs is not null or empty
    if ((cidAll == null) || (cidAll.isEmpty())) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    // Check that the current consumer ID belongs to the collection of consumer IDs
    if (!(cidAll.contains(currentCID))) {
        this.log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                new Object[] { consumerGroup, currentCID, cidAll });
        return result;
    }
    // Position of the current consumer ID in the collection
    int index = cidAll.indexOf(currentCID);
    int mod = mqAll.size() % cidAll.size();
    int averageSize = ((mod > 0) && (index < mod))
            ? mqAll.size() / cidAll.size() + 1
            : (mqAll.size() <= cidAll.size()) ? 1 : mqAll.size() / cidAll.size();
    int startIndex = ((mod > 0) && (index < mod))
            ? index * averageSize
            : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; ++i) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}

Analyze the source code

mod: the remainder of the total number of message queues divided by the total number of consumers.

averageSize: the number of message queues allocated to the current consumer.

(1) If the remainder is greater than 0 and the current consumer's index is less than the remainder, (mod > 0) && (index < mod), the consumer falls within the remainder and receives the integer quotient plus one: mqAll.size() / cidAll.size() + 1.

(2) If the division is exact, or the consumer does not fall within the remainder, check whether the number of message queues is less than or equal to the number of consumers, mqAll.size() <= cidAll.size(). If so the value is 1; otherwise it is the integer quotient mqAll.size() / cidAll.size().

startIndex: the index of the first queue for the current consumer.

(1) If (mod > 0) && (index < mod), it is the current index multiplied by the average queue count: index * averageSize.

(2) If (1) does not hold, it is index * averageSize + mod.

range: Math.min() computes the number of queues the consumer finally consumes.

The for loop collects the queues of the current consumer.

1.2.2. Circular allocation policy (AllocateMessageQueueAveragelyByCircle)

Let's first look at its schematic.

The core method of the AllocateMessageQueueAveragelyByCircle class is allocate. Let's analyze its source code.

public List<MessageQueue> allocate(String consumerGroup, String currentCID,
        List<MessageQueue> mqAll, List<String> cidAll) {
    if ((currentCID == null) || (currentCID.length() < 1)) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    if ((mqAll == null) || (mqAll.isEmpty())) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if ((cidAll == null) || (cidAll.isEmpty())) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!(cidAll.contains(currentCID))) {
        this.log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                new Object[] { consumerGroup, currentCID, cidAll });
        return result;
    }
    // The validation checks above are the same as before, so we skip them
    int index = cidAll.indexOf(currentCID);
    for (int i = index; i < mqAll.size(); ++i) {
        if (i % cidAll.size() == index) {
            result.add(mqAll.get(i));
        }
    }
    return result;
}

Analyze the source code

The for loop iterates over the queue indexes and takes each index modulo the number of consumers. If the result equals index, the queue is stored in the result collection.

1.2.3. Manual configuration allocation policy (AllocateMessageQueueByConfig)

Let's analyze the source code of the AllocateMessageQueueByConfig class.

public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
    private List<MessageQueue> messageQueueList;

    public List<MessageQueue> allocate(String consumerGroup, String currentCID,
            List<MessageQueue> mqAll, List<String> cidAll) {
        return this.messageQueueList;
    }

    public String getName() {
        return "CONFIG";
    }

    public List<MessageQueue> getMessageQueueList() {
        return this.messageQueueList;
    }

    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
        this.messageQueueList = messageQueueList;
    }
}

The message queues are allocated through configuration.

1.2.4. Machine room allocation policy (AllocateMessageQueueByMachineRoom)

The core method of the AllocateMessageQueueByMachineRoom class is allocate. Let's analyze its source code.

public List<MessageQueue> allocate(String consumerGroup, String currentCID,
        List<MessageQueue> mqAll, List<String> cidAll) {
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    int currentIndex = cidAll.indexOf(currentCID);
    if (currentIndex < 0) {
        return result;
    }
    List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
    for (MessageQueue mq : mqAll) {
        String[] temp = mq.getBrokerName().split("@");
        if ((temp.length == 2) && (this.consumeridcs.contains(temp[0]))) {
            premqAll.add(mq);
        }
    }
    int mod = premqAll.size() / cidAll.size();
    int rem = premqAll.size() % cidAll.size();
    int startIndex = mod * currentIndex;
    int endIndex = startIndex + mod;
    for (int i = startIndex; i < endIndex; ++i) {
        result.add(mqAll.get(i)); // indexes mqAll, as in the quoted source
    }
    if (rem > currentIndex) {
        result.add(premqAll.get(currentIndex + mod * cidAll.size()));
    }
    return result;
}

Analyze the source code

currentIndex: the index of the current consumer in the consumer collection. If the index is less than 0, the method returns immediately.

premqAll: the valid queues, filtered by brokerName. A queue is kept when its brokerName has the form room@broker and the room is contained in the consumeridcs set.

mod: the integer quotient of the number of filtered queues divided by the number of consumers, that is, how many queues every consumer receives.

rem: the remainder of that division, that is, how many queues are left over after the even split.

startIndex: the index at which the current consumer starts consuming.

endIndex: the index at which the current consumer stops consuming.

The final part divides the queues into two parts: the first part holds cidAll.size() * mod queues and the second part holds the remaining premqAll.size() - cidAll.size() * mod queues. The consumer takes all queues between startIndex and endIndex from the first part and, if rem > currentIndex, the queue at index currentIndex + mod * cidAll.size() from the second part, and finally returns result.
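The two-part split described above can be sketched with plain strings in place of MessageQueue objects. This is an illustration under assumptions: MachineRoomSplitSketch and the queue names are hypothetical, the list passed in is taken to be already filtered by machine room, and the sketch indexes that filtered list in both parts (the quoted source reads mqAll.get(i) in its first loop).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MachineRoomSplitSketch {
    // `filteredQueues` plays the role of premqAll; `currentIndex` is the
    // consumer's position in the consumer list (hypothetical helper).
    public static List<String> allocate(List<String> filteredQueues, int consumerCount, int currentIndex) {
        List<String> result = new ArrayList<>();
        if (currentIndex < 0) {
            return result;
        }
        int mod = filteredQueues.size() / consumerCount; // queues every consumer gets
        int rem = filteredQueues.size() % consumerCount; // leftover queues
        int startIndex = mod * currentIndex;
        int endIndex = startIndex + mod;
        // First part: an equal slice for each consumer
        for (int i = startIndex; i < endIndex; i++) {
            result.add(filteredQueues.get(i));
        }
        // Second part: the first `rem` consumers take one leftover queue each
        if (rem > currentIndex) {
            result.add(filteredQueues.get(currentIndex + mod * consumerCount));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3", "q4");
        System.out.println(allocate(queues, 2, 0)); // prints [q0, q1, q4]
        System.out.println(allocate(queues, 2, 1)); // prints [q2, q3]
    }
}
```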

1.2.5. Consistent hash allocation policy (AllocateMessageQueueConsistentHash)

The core method of the AllocateMessageQueueConsistentHash class is allocate. Let's analyze its source code.

public List<MessageQueue> allocate(String consumerGroup, String currentCID,
        List<MessageQueue> mqAll, List<String> cidAll) {
    if ((currentCID == null) || (currentCID.length() < 1)) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    if ((mqAll == null) || (mqAll.isEmpty())) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if ((cidAll == null) || (cidAll.isEmpty())) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!(cidAll.contains(currentCID))) {
        this.log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                new Object[] { consumerGroup, currentCID, cidAll });
        return result;
    }
    // One hash-ring node per consumer ID
    Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
    for (String cid : cidAll) {
        cidNodes.add(new ClientNode(cid));
    }
    ConsistentHashRouter<ClientNode> router;
    if (this.customHashFunction != null) {
        router = new ConsistentHashRouter<ClientNode>(cidNodes, this.virtualNodeCnt, this.customHashFunction);
    } else {
        router = new ConsistentHashRouter<ClientNode>(cidNodes, this.virtualNodeCnt);
    }
    // Keep the queues that the ring routes to the current consumer
    List<MessageQueue> results = new ArrayList<MessageQueue>();
    for (MessageQueue mq : mqAll) {
        ClientNode clientNode = (ClientNode) router.routeNode(mq.toString());
        if ((clientNode != null) && (currentCID.equals(clientNode.getKey()))) {
            results.add(mq);
        }
    }
    return results;
}

2. Broadcast consumption

Broadcast consumption: a message is consumed by multiple consumers. Even if these consumers belong to the same ConsumerGroup, the message is consumed once by every Consumer in the group. In broadcast consumption the ConsumerGroup concept can be considered meaningless for dividing messages among consumers.
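The difference between the two modes can be illustrated with a small counting simulation. This is only a sketch: ConsumptionModeSketch is a hypothetical name, no RocketMQ classes are involved, and in the cluster branch a simple round robin stands in for the real queue allocation.

```java
import java.util.Arrays;

public class ConsumptionModeSketch {
    // Counts how many messages each consumer of one group receives.
    public static int[] deliver(int messageCount, int consumerCount, boolean broadcasting) {
        int[] received = new int[consumerCount];
        for (int m = 0; m < messageCount; m++) {
            if (broadcasting) {
                // Broadcast: every consumer in the group gets the message
                for (int c = 0; c < consumerCount; c++) {
                    received[c]++;
                }
            } else {
                // Cluster: exactly one consumer gets the message
                // (round robin is a simplification of queue allocation)
                received[m % consumerCount]++;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(deliver(10, 2, true)));  // prints [10, 10]
        System.out.println(Arrays.toString(deliver(10, 2, false))); // prints [5, 5]
    }
}
```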

2.1. Create broadcast consumers

Create two broadcast consumers, ConsumerGB1 and ConsumerGB2. The code for ConsumerGB1 is shown below; ConsumerGB2 is the same apart from its name.

// Imports assume the Apache RocketMQ 4.x client (org.apache.rocketmq); adjust to your version.
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class ConsumerGB1 {
    public static void main(String[] args) {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("consumer_test_broadcasting");
            // Set broadcast consumption
            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.setNamesrvAddr("10.10.12.203:9876");
            consumer.subscribe("TopicTest", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt msg : msgs) {
                            String msgbody = new String(msg.getBody(), StandardCharsets.UTF_8);
                            System.out.println("ConsumerGB1=== MessageBody: " + msgbody); // output message content
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; // retry later
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // consumed successfully
                }
            });
            consumer.start();
            System.out.println("ConsumerGB1=== started successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

consumer.setMessageModel(MessageModel.BROADCASTING); sets the consumer to broadcast mode.

The consumption progress of messages in broadcast mode is stored locally on the client, but not on the server.

The consumption progress of messages in cluster mode is stored on the server side.

Broadcast mode consumption progress folder: C:/Users/gumx/.rocketmq_offsets

View the consumption progress file:

{"offsetTable": {
    {"brokerName": "broker-a", "queueId": 3, "topic": "TopicTest"}: 14,
    {"brokerName": "broker-a", "queueId": 2, "topic": "TopicTest"}: 14,
    {"brokerName": "broker-b", "queueId": 3, "topic": "TopicTest"}: 14,
    {"brokerName": "broker-b", "queueId": 0, "topic": "TopicTest"}: 14,
    {"brokerName": "broker-a", "queueId": 1, "topic": "TopicTest"}: 14,
    {"brokerName": "broker-b", "queueId": 2, "topic": "TopicTest"}: 14,
    {"brokerName": "broker-a", "queueId": 0, "topic": "TopicTest"}: 13,
    {"brokerName": "broker-b", "queueId": 1, "topic": "TopicTest"}: 13
}}

From this file we can see that topic TopicTest has 8 consumption queues distributed over two Broker nodes, broker-a and broker-b. Each broker holds 4 queues with IDs 0 to 3. The queues currently have different message offsets: two are at 13 and six are at 14.

View the consumer information in the console's client view.

Send 10 messages.

Both consumers consumed all 10 messages.

Once again, view the consumer information in the console's client view.

In broadcast mode the consumption offset reported for the consumer group is still 0 both before and after the messages are sent. This is because the server-side state does not change when a message is consumed in broadcast mode.

In cluster mode, by contrast, once the consumer has consumed a message successfully the consumption offset advances and the message status of the consumer group changes.

Let's check the local consumption progress file.

{"offsetTable": {
    {"brokerName": "broker-a", "queueId": 3, "topic": "TopicTest"}: 15,
    {"brokerName": "broker-a", "queueId": 2, "topic": "TopicTest"}: 15,
    {"brokerName": "broker-b", "queueId": 3, "topic": "TopicTest"}: 15,
    {"brokerName": "broker-b", "queueId": 0, "topic": "TopicTest"}: 15,
    {"brokerName": "broker-a", "queueId": 1, "topic": "TopicTest"}: 15,
    {"brokerName": "broker-b", "queueId": 2, "topic": "TopicTest"}: 16,
    {"brokerName": "broker-a", "queueId": 0, "topic": "TopicTest"}: 14,
    {"brokerName": "broker-b", "queueId": 1, "topic": "TopicTest"}: 15
}}

The local consumption offsets have advanced.
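As a quick check, the two offset snapshots shown above can be compared queue by queue. The sketch below is only an illustration: OffsetDeltaSketch and the "broker/queueId" key format are hypothetical, and the values are copied by hand from the two files rather than parsed from them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OffsetDeltaSketch {
    // Sums the offsets of all queues in one snapshot.
    public static long total(Map<String, Long> offsets) {
        long sum = 0;
        for (long offset : offsets.values()) {
            sum += offset;
        }
        return sum;
    }

    // Builds a snapshot for broker-a queues 0..3 followed by broker-b queues 0..3.
    public static Map<String, Long> snapshot(long... values) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i++) {
            String broker = i < 4 ? "broker-a" : "broker-b";
            offsets.put(broker + "/" + (i % 4), values[i]);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Values copied from the progress file before and after sending 10 messages
        Map<String, Long> before = snapshot(13, 14, 14, 14, 14, 13, 14, 14);
        Map<String, Long> after = snapshot(14, 15, 15, 15, 15, 15, 16, 15);
        System.out.println("consumed = " + (total(after) - total(before))); // prints consumed = 10
    }
}
```

The offsets advance by 10 in total, which matches the 10 messages that were sent.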

2.2. Precautions

Broadcast mode has some pitfalls. Let's analyze them in detail.

Looking at the client view, you may wonder: we started two consumers, so why is the consumer ID the same for both? Is the picture wrong? It is not. What is shown is the consumer's client ID. Let's look into it further.

consumer.start(); leads to DefaultMQPushConsumerImpl.start(), the core method of consumer startup. By analyzing the source code, let's see how the consumer's ID is generated.

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance = new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }
    return instance;
}

String clientId = clientConfig.buildMQClientId();

private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());
    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }
    return sb.toString();
}

The clientId consists of two parts, the client IP address and the InstanceName, joined with "@" (plus the unitName when one is set). The InstanceName can be set explicitly and defaults to DEFAULT. This explains why the two consumers we started on the same machine show only one client ID.
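The ID construction can be reproduced outside the client library to see the collision. The following is a minimal sketch that mirrors the logic of buildMQClientId quoted above; ClientIdSketch is a hypothetical standalone class, and the IP address is the one that appears in this article's progress file path.

```java
public class ClientIdSketch {
    // Mirrors the quoted logic: ip@instanceName, plus @unitName when it is not blank.
    public static String buildClientId(String clientIp, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder();
        sb.append(clientIp);
        sb.append("@");
        sb.append(instanceName);
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append("@");
            sb.append(unitName);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Two consumers on the same host that both keep the default instance name
        String first = buildClientId("192.168.1.102", "DEFAULT", null);
        String second = buildClientId("192.168.1.102", "DEFAULT", null);
        System.out.println(first);                // prints 192.168.1.102@DEFAULT
        System.out.println(first.equals(second)); // prints true
    }
}
```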

In broadcast mode, when one client machine needs to start multiple consumers, it is recommended to set the InstanceName of each one manually. If it is not set, all of them share a single consumption progress file.

In broadcast mode, if the client is migrated to another machine, its consumption progress file is generated anew. By default consumption then starts at the end of the queue and the earlier messages are skipped. The starting position of consumption can be configured (PS: described in the next chapter).

In broadcast mode, the offsets.json file stored under the client ID / consumer group name folder inside the .rocketmq_offsets folder is very important. It must not be deleted or damaged; otherwise the consumption progress is affected and messages may be consumed twice or lost.

C:/Users/gumx/.rocketmq_offsets/192.168.1.102@DEFAULT/consumer_test_broadcasting/offsets.json
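The layout of that path (user home, .rocketmq_offsets, client ID, consumer group, offsets.json) can be sketched as follows. This is an illustration based only on the example path above: OffsetPathSketch is a hypothetical class, not part of the RocketMQ client, and the instance names in main are example values.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class OffsetPathSketch {
    // Assembles <home>/.rocketmq_offsets/<clientId>/<group>/offsets.json
    public static Path offsetFile(String userHome, String clientId, String consumerGroup) {
        return Paths.get(userHome, ".rocketmq_offsets", clientId, consumerGroup, "offsets.json");
    }

    public static void main(String[] args) {
        String home = System.getProperty("user.home");
        String group = "consumer_test_broadcasting";
        // Same client ID: both consumers would read and write the same file
        Path shared1 = offsetFile(home, "192.168.1.102@DEFAULT", group);
        Path shared2 = offsetFile(home, "192.168.1.102@DEFAULT", group);
        // Distinct instance names: each consumer gets its own file
        Path own1 = offsetFile(home, "192.168.1.102@00001", group);
        Path own2 = offsetFile(home, "192.168.1.102@00002", group);
        System.out.println(shared1.equals(shared2)); // prints true
        System.out.println(own1.equals(own2));       // prints false
    }
}
```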

Modify the consumers ConsumerGB1 and ConsumerGB2 to set their InstanceName to 00001 and 00002 respectively.

// Imports assume the Apache RocketMQ 4.x client (org.apache.rocketmq); adjust to your version.
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class ConsumerGB1 {
    public static void main(String[] args) {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("consumer_test_broadcasting");
            // Set broadcast consumption
            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.setNamesrvAddr("10.10.12.203:9876");
            consumer.subscribe("TopicTest", "*");
            // Set its InstanceName
            consumer.setInstanceName("00001");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt msg : msgs) {
                            String msgbody = new String(msg.getBody(), StandardCharsets.UTF_8);
                            System.out.println("ConsumerGB1=== MessageBody: " + msgbody); // output message content
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; // retry later
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // consumed successfully
                }
            });
            consumer.start();
            System.out.println("ConsumerGB1=== started successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Let's check its consumption progress folder.

In broadcast mode, suppose two consumers GB1 and GB2 have the same InstanceName and consumer group and are both running, so that both consume the messages being sent. If GB1 stops abnormally while GB2 keeps consuming normally, the messages sent during the outage will not be consumed by GB1 after it is restarted manually, because the two consumers share one consumption progress file.

That is all on the RocketMQ consumption patterns. I hope the content above is of some help to you.
