Why should RocketMQ ensure the consistency of subscription relationships?

Updated 2025-01-15, from SLTechnology News&Howtos (shulou)

WeChat official account "back-end advanced", focusing on back-end technology sharing: Java, Golang, web frameworks, distributed middleware, service governance, and so on.

Some time ago, a friend asked me about a problem with consumer subscriptions that he ran into while building a RocketMQ cluster. He sent me the error from the log:

The consumer's subscription not exist

I immediately found the location of the error in the source code:

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:

    subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
    if (null == subscriptionData) {
        log.warn("the consumer's subscription not exist, group: {}, topic: {}",
            requestHeader.getConsumerGroup(), requestHeader.getTopic());
        response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
        response.setRemark("the consumer's subscription not exist"
            + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
        return response;
    }

This code looks up the subscription information of the topic for the consumer group. The lookup returns nothing, so the broker reports that the consumer's subscription does not exist.

My friend also told me that each consumer in his consumer group subscribes to its own topic. The group contains consumers C1 and C2: C1 subscribes to topicA and C2 subscribes to topicB.

At this point I already knew the cause. On the broker, consumer subscription information is stored per consumer group. The data structure is as follows:

org.apache.rocketmq.broker.client.ConsumerManager:

    private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable =
        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

This means that consumers in the same group overwrite each other's subscription information when they register it with the broker. That is why every consumer in a consumer group must have exactly the same subscription relationship. My friend's consumers were in the same group but had different subscriptions, so they kept overwriting each other's subscription information.

My friend was still puzzled. Each consumer subscribes to its own topic, which seems logically fine, so why does RocketMQ not allow it? To answer that, I will analyze RocketMQ's subscription registration, message pull, and message queue load and redistribution mechanisms from the source code, so that the subscription mechanism becomes fully clear.

Registration of consumer subscription information

When a consumer starts, it registers its subscription information with all brokers and starts a heartbeat mechanism that refreshes the subscription information periodically. Each consumer has an MQClientInstance, which is started together with the consumer. Its startup method launches a series of scheduled tasks, among them:

org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask:

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

This scheduled task sends heartbeats carrying subscription information to all brokers in the cluster. Following the source code further, we find that the client sends its own HeartbeatData to every broker. HeartbeatData is the heartbeat data of a client and contains the following fields:

    // client ID
    private String clientID;
    // producer information
    private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
    // consumer information
    private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();

The consumer information contains the topic information subscribed by the client.

Next, let's look at how the broker handles HeartbeatData. When the client sends HeartbeatData, the request type is HEART_BEAT, so we go straight to the broker logic that handles HEART_BEAT requests:

org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat:

    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        // decode the request body to get the HeartbeatData
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),
            heartbeatData.getClientID(),
            request.getLanguage(),
            request.getVersion());

        // register the consumer subscription information in a loop
        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            // get the subscription group configuration by consumer group
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager()
                    .findSubscriptionGroupConfig(data.getGroupName());
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                int topicSysFlag = 0;
                if (data.isUnitMode()) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                }
                String newTopic = MixAll.getRetryTopic(data.getGroupName());
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,
                    subscriptionGroupConfig.getRetryQueueNums(),
                    PermName.PERM_WRITE | PermName.PERM_READ,
                    topicSysFlag);
            }

            // register the consumer subscription information
            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable);
            // ...
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

Here we can see that after receiving a HEART_BEAT request, the broker decodes the request body into HeartbeatData and registers each consumer's subscription information from it in a loop:

org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer:

    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

        // get the consumer information of the consumer group
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        // if the consumer group has no consumer information yet, create it
        if (null == consumerGroupInfo) {
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }

        boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
        // update the subscription information; it is stored per consumer group, so this step
        // makes consumer clients in the same group overwrite each other's subscriptions
        boolean r2 = consumerGroupInfo.updateSubscription(subList);

        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group,
                    consumerGroupInfo.getAllChannel());
            }
        }

        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

        return r1 || r2;
    }

This is the core method the broker uses to update consumer subscription information. If the consumer group has no ConsumerGroupInfo yet, a new one is created. As the name suggests, subscription information is stored per consumer group, so updating it causes the consumer clients in the same group to overwrite each other's subscription information.
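
The overwrite effect can be illustrated with a small stand-alone model. This is a simplified sketch, not broker code: the class GroupSubscriptionModel and its methods are hypothetical names, and topics are plain strings rather than SubscriptionData objects. It assumes that applying a heartbeat adds the topics the heartbeat carries and drops the topics it does not mention, which matches the overwrite behaviour described above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, single-threaded model of a broker-side table keyed by consumer group.
final class GroupSubscriptionModel {
    // consumer group name -> topics currently registered for that group
    private final Map<String, Set<String>> table = new HashMap<>();

    // Applies one heartbeat: topics in the heartbeat are added and topics
    // absent from it are removed. Returns true if the group's set changed.
    boolean register(String group, Set<String> heartbeatTopics) {
        Set<String> current = table.computeIfAbsent(group, g -> new HashSet<>());
        boolean added = current.addAll(heartbeatTopics);
        boolean removed = current.retainAll(heartbeatTopics);
        return added || removed;
    }

    // Returns a sorted copy of the topics registered for the group.
    Set<String> topicsOf(String group) {
        return new TreeSet<>(table.getOrDefault(group, new HashSet<>()));
    }

    public static void main(String[] args) {
        GroupSubscriptionModel broker = new GroupSubscriptionModel();
        broker.register("G1", Set.of("topicA")); // heartbeat from C1
        broker.register("G1", Set.of("topicB")); // heartbeat from C2
        System.out.println(broker.topicsOf("G1")); // prints [topicB]
    }
}
```

Because the table has one entry per group rather than per client, whichever consumer sent the latest heartbeat determines what the group appears to subscribe to.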

Message pull

When MQClientInstance starts, a thread is started to handle the message pull task:

org.apache.rocketmq.client.impl.factory.MQClientInstance#start:

    // Start pull service
    this.pullMessageService.start();

PullMessageService extends ServiceThread, and ServiceThread implements the Runnable interface. Its run method is implemented as follows:

org.apache.rocketmq.client.impl.consumer.PullMessageService#run:

    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
                // take a pull request object from pullRequestQueue
                PullRequest pullRequest = this.pullRequestQueue.take();
                // execute the message pull
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
    }

The consumer takes a PullRequest object and uses it to pull messages. pullRequestQueue is a blocking queue: if it is empty, take() blocks until a new pull task arrives. This raises a key question: when is a PullRequest created and put into pullRequestQueue? It is created in RebalanceImpl, which implements RocketMQ's message queue load and redistribution mechanism.
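
The blocking behaviour of take() can be seen in a stand-alone sketch that uses only java.util.concurrent. The class PullLoopSketch, the method runOnce, and the string "requests" are hypothetical stand-ins for PullMessageService and PullRequest; the sketch only shows that the worker does nothing until another thread puts a request into the queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the pull loop: a worker blocks on take() until a request arrives.
final class PullLoopSketch {
    static String runOnce(String request) {
        BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> handled = new LinkedBlockingQueue<>();

        Thread worker = new Thread(() -> {
            try {
                // Blocks here for as long as the queue is empty.
                String next = pullRequestQueue.take();
                handled.put("pulled:" + next);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        try {
            // Nothing has been queued yet, so the worker cannot have produced a result.
            if (handled.poll(100, TimeUnit.MILLISECONDS) != null) {
                throw new IllegalStateException("worker ran without a request");
            }
            // Plays the role of the rebalance step that creates pull requests.
            pullRequestQueue.put(request);
            String result = handled.poll(5, TimeUnit.SECONDS);
            worker.join(1000);
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("topicA-q0")); // prints pulled:topicA-q0
    }
}
```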

Message queue load and redistribution

From the message pull analysis above, we know that when pullMessageService starts, it blocks because pullRequestQueue contains no PullRequest objects. When MQClientInstance starts, however, it also starts a thread that handles the message queue load and redistribution task:

org.apache.rocketmq.client.impl.factory.MQClientInstance#start:

    // Start rebalance service
    this.rebalanceService.start();

RebalanceService also extends ServiceThread, and its run method is as follows:

    @Override
    public void run() {
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }

Continue to follow:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance:

    public void doRebalance(final boolean isOrder) {
        // get all subscription information of this consumer
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    // message queue load and redistribution
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

The main purpose here is to get the topics the client subscribes to and to rebalance the message queues topic by topic. subTable stores the consumer's subscription information, which is filled in when the consumer subscribes. Let's move on:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

The rebalanceByTopic method is the core of consumer-side load balancing. Here we look at message queue load and redistribution in cluster mode. It first gets the queue information of the subscribed topic from topicSubscribeInfoTable, then fetches the list of consumer client IDs for the consumer group from a randomly chosen broker in the cluster. Note that this list can be obtained from any broker in the cluster because, as analyzed earlier, every consumer client sends heartbeats to all brokers once it has started.

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

    // if the topic's queue set mqSet and the client ID list cidAll are both non-null,
    // perform message queue load and redistribution
    if (mqSet != null && cidAll != null) {
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
        mqAll.addAll(mqSet);

        // sort so that each message queue is assigned to only one consumer
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        // message queue allocation algorithm
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

        // run the algorithm and get the result, allocateResult
        List<MessageQueue> allocateResult = null;
        try {
            allocateResult = strategy.allocate(
                this.consumerGroup,
                this.mQClientFactory.getClientId(),
                mqAll,
                cidAll);
        } catch (Throwable e) {
            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
                strategy.getName(), e);
            return;
        }
        // ...
    }

The above is the core logic of message load balancing. RocketMQ itself provides five allocation algorithms, and the average allocation algorithm AllocateMessageQueueAveragely is used by default. An example shows how it behaves:

Suppose consumer group G1 contains consumers C1 and C2; C1 subscribes to topicA and C2 subscribes to topicB. The cluster has two brokers, broker_a and broker_b, and topicA has eight message queues: broker_a (q0/q1/q2/q3) and broker_b (q0/q1/q2/q3). As we saw above, the findConsumerIdList method returns all consumer clients in the consumer group, so after the average allocation algorithm runs, the queues of topicA are assigned as follows:

C1:broker_a (q0/q1/q2/q3)

C2:broker_b (q0/q1/q2/q3)

Here is the problem. C2 does not subscribe to topicA at all, but the allocation algorithm still includes C2 in the allocation. As a result, half of topicA's message queues are assigned to C2, and the messages in those queues are delayed by ten seconds or more before they are consumed. The same applies to topicB.
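
The allocation in this example can be reproduced with a short stand-alone sketch. It is an illustrative implementation of contiguous average allocation under the assumptions of the example above, not the source of AllocateMessageQueueAveragely: the class name AverageAllocationSketch, the string queue names, and the exact index arithmetic are hypothetical. The point it shows is that the split is computed over every client ID in the group, whether or not that client subscribes to the topic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of contiguous average allocation over sorted queues and client IDs.
final class AverageAllocationSketch {
    // Returns the slice of mqAll that belongs to currentCid.
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result;
        }
        int base = mqAll.size() / cidAll.size();
        int extra = mqAll.size() % cidAll.size();
        // The first `extra` consumers receive one additional queue each.
        int size = base + (index < extra ? 1 : 0);
        int start = index * base + Math.min(index, extra);
        for (int i = 0; i < size; i++) {
            result.add(mqAll.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        // topicA's eight queues, already sorted.
        List<String> mqAll = List.of(
            "broker_a-q0", "broker_a-q1", "broker_a-q2", "broker_a-q3",
            "broker_b-q0", "broker_b-q1", "broker_b-q2", "broker_b-q3");
        // Every client in group G1, including C2, which does not subscribe to topicA.
        List<String> cidAll = List.of("C1", "C2");

        System.out.println("C1 -> " + allocate("C1", mqAll, cidAll));
        System.out.println("C2 -> " + allocate("C2", mqAll, cidAll));
    }
}
```

In this sketch C2 is handed the four broker_b queues of topicA even though it never subscribed to that topic, which is the situation described above.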

[Figure: consumption of topicA and topicB after rebalance]

As for why the "the consumer's subscription not exist" error is reported, let's continue:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

    if (mqSet != null && cidAll != null) {
        // ...
        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
        if (allocateResult != null) {
            allocateResultSet.addAll(allocateResult);
        }

        // use the allocation result to update processQueueTable, the cache of message queues
        // assigned to this consumer, and generate the pullRequestList that is put into the
        // pullRequestQueue blocking queue
        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        if (changed) {
            log.info(
                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(),
                mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet);
            this.messageQueueChanged(topic, mqSet, allocateResultSet);
        }
    }

The code above uses mqSet and cidAll to rebalance the message queues and obtain allocateResult, a list of MessageQueue objects. It then uses allocateResult to update processQueueTable, the consumer's cache of assigned message queues, generates pullRequestList, and puts it into the pullRequestQueue blocking queue:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    // loop over mqSet, wrap each queue in a PullRequest object and add it to pullRequestList
    for (MessageQueue mq : mqSet) {
        // if the queue is not in the cache table, it was newly assigned by this redistribution
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    // add pullRequestList to the pullRequestQueue blocking queue in PullMessageService,
    // which wakes up the PullMessageService thread to pull messages
    this.dispatchPullRequest(pullRequestList);

We mentioned earlier that message pull works by taking a pullRequest from the pullRequestQueue blocking queue. The method above is where that pullRequest is created.

With the source code analysis complete, we can now explain why the "the consumer's subscription not exist" error was reported:

Suppose consumers C1 and C2 belong to consumer group G1; C1 subscribes to topicA and C2 subscribes to topicB. C2 starts first and registers G1's subscription as topicB. C1 then starts and overwrites G1's subscription with topicA. C1's rebalance adds a pullRequest for topicA to pullRequestQueue. Just then, C2's heartbeat arrives and overwrites G1's subscription with topicB again. When C1's PullMessageService thread takes the topicA pullRequest from pullRequestQueue and pulls messages, the broker cannot find a subscription to topicA under consumer group G1, because C2's heartbeat has just overwritten it, so it reports that the consumer's subscription does not exist.
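
This sequence of events can be replayed deterministically in a small model. The sketch below is hypothetical and greatly simplified: SubscriptionRaceSketch, heartbeat, pull, and replayTimeline are illustrative names, the broker is reduced to one map from consumer group to topics, and the result strings only echo the response code seen in the source code earlier.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the timeline that ends in SUBSCRIPTION_NOT_EXIST.
final class SubscriptionRaceSketch {
    // consumer group -> topics from the most recent heartbeat of any client in the group
    private final Map<String, Set<String>> subscriptionsByGroup = new HashMap<>();

    // A heartbeat replaces whatever the group had registered before.
    void heartbeat(String group, Set<String> topics) {
        subscriptionsByGroup.put(group, new HashSet<>(topics));
    }

    // Mimics the broker-side lookup performed when a pull request arrives.
    String pull(String group, String topic) {
        Set<String> topics = subscriptionsByGroup.getOrDefault(group, new HashSet<>());
        return topics.contains(topic) ? "SUCCESS" : "SUBSCRIPTION_NOT_EXIST";
    }

    static String replayTimeline() {
        SubscriptionRaceSketch broker = new SubscriptionRaceSketch();
        broker.heartbeat("G1", Set.of("topicB")); // C2 starts first
        broker.heartbeat("G1", Set.of("topicA")); // C1 starts and overwrites G1's subscription
        // C1 rebalances and queues a pull request for topicA (not modelled here).
        broker.heartbeat("G1", Set.of("topicB")); // C2's next heartbeat overwrites it again
        return broker.pull("G1", "topicA");       // C1's pull for topicA arrives now
    }

    public static void main(String[] args) {
        System.out.println(replayTimeline()); // prints SUBSCRIPTION_NOT_EXIST
    }
}
```

If both consumers subscribed to the same set of topics, every heartbeat would write the same set and the lookup in pull would always succeed, which is why the subscription relationship within a group has to be consistent.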
