In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article is to share with you about the changes in the number of consumer, the editor thinks it is very practical, so I share it with you to learn. I hope you can get something after reading this article.
ConsumerManagerpublic boolean registerConsumer (final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set subList, boolean isNotifyConsumerIdsChangedEnable) {ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get (group); if (null = = consumerGroupInfo) {ConsumerGroupInfo tmp = new ConsumerGroupInfo (group, consumeType, messageModel, consumeFromWhere); ConsumerGroupInfo prev = this.consumerTable.putIfAbsent (group, tmp); consumerGroupInfo = prev! = null? Prev: tmp;} boolean R1 = consumerGroupInfo.updateChannel (clientChannelInfo, consumeType, messageModel, consumeFromWhere); boolean R2 = consumerGroupInfo.updateSubscription (subList); if (R1 | | R2) {if (isNotifyConsumerIdsChangedEnable) {/ / notify other consumer this.consumerIdsChangeListener.handle (ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel ()) in the same group;} this.consumerIdsChangeListener.handle (ConsumerGroupEvent.REGISTER, group, subList) Return R1 | | R2;} public void unregisterConsumer (final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) {ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get (group); if (null! = consumerGroupInfo) {consumerGroupInfo.unregisterChannel (clientChannelInfo); if (consumerGroupInfo.getChannelInfoTable (). IsEmpty ()) {ConsumerGroupInfo remove = this.consumerTable.remove (group) If (remove! = null) {log.info ("unregister consumer ok, no any connection, and remove consumer group, {}", group); this.consumerIdsChangeListener.handle (ConsumerGroupEvent.UNREGISTER, group);}} if (isNotifyConsumerIdsChangedEnable) {/ / one-way notification channel this.consumerIdsChangeListener.handle (ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel ()) } DefaultConsumerIdsChangeListener@Overridepublic void handle (ConsumerGroupEvent event, String group, Object... Args) {case CHANGE: if (args = = null | | args.length < 1) {return;} List channels = (List) args [0] If (channels! = null & & brokerController.getBrokerConfig (). IsNotifyConsumerIdsChangedEnable ()) {/ / A pair of channel connections of other consumer in the group sends an one-way notification (regardless of whether the other party receives it or not) for (Channel chl: channels) {this.brokerController.getBroker2Client (). NotifyConsumerIdsChanged (chl, group);} break } Broker2Clientpublic void notifyConsumerIdsChanged (final Channel channel, final String consumerGroup) {if (null = = consumerGroup) {log.error ("notifyConsumerIdsChanged consumerGroup is null"); return;} NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader (); requestHeader.setConsumerGroup (consumerGroup); RemotingCommand request = RemotingCommand.createRequestCommand (RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); try {this.brokerController.getRemotingServer (). InvokeOneway (channel, request, 10) } catch (Exception e) {/ / send an exception, just print log log.error ("notifyConsumerIdsChanged exception," + consumerGroup, e.getMessage ());}}
The notification channel is one-way, that is, regardless of whether the other party has replied or not, the notification is considered to be successful, so two things will happen:
Channel receives the message: after receiving the message, channel will trigger rebalance, normal logic
Channel did not receive the message: the consumer will not trigger rebalance, there is a problem!
Register: the consumer does not know that a new consumer has been added, resulting in multiple consumer consumption for the same mq
Unregister: the consumer does not know that there is a consumer offline, resulting in some mq without consumer responsible for consumption
Let's take a look at unregister's situation first.
When consumer starts, a RebalanceService thread will be started at the same time. What this thread does is to initiate rebalance every 20 seconds, so that the influence of unregister can be reduced, and messages leading to the mq will be delayed for up to 20 seconds before consumer is responsible for consumption.
RebalanceServiceprivate static long waitInterval = Long.parseLong (System.getProperty ("rocketmq.client.rebalance.waitInterval", "20000")); @ Overridepublic void run () {log.info (this.getServiceName () + "service started"); while (! this.isStopped ()) {this.waitForRunning (waitInterval); this.mqClientFactory.doRebalance ();} log.info (this.getServiceName () + "service end");}
Next, analyze and compare the large Register.
The same mq has different consumer consumption in the same group, which is a big problem in clustering mode, which will lead to repeated consumption, consumption schedule errors and other problems. With the idea that rocketmq should not make such a low-level mistake, it is really different to look at the code.
RebalanceImplprivate void rebalanceByTopic (final String topic, final boolean isOrder) {/ / rebalance process / / the key point is here. After rebalance above, you can know which mq consumption boolean changed = this.updateProcessQueueTableInRebalance (topic, allocateResultSet, isOrder) you are responsible for. } private boolean updateProcessQueueTableInRebalance (final String topic, final Set mqSet, final boolean isOrder) {for (MessageQueue mq: mqSet) {/ / if it is a new mq, it attempts to call the remote broker lock mq and fails to acquire the lock, which means that another consumer has acquired the lock. You should give up consuming the mq if (! this.processQueueTable.containsKey (mq)) {if (isOrder & &! this.lock (mq)) {log.warn ("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq) Continue;} above is how the number of consumer changes. The editor believes that there are some knowledge points that we may see or use in our daily work. I hope you can learn more from this article. For more details, please follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.