How does the RocketMQ master and slave synchronize the progress of message consumption?

Updated 2025-04-06. From: SLTechnology News&Howtos (shulou)

Shulou (Shulou.com), 06/01 report

I covered the rules of RocketMQ read-write separation earlier, but you may ask: how is consumption progress kept in sync between the master and slave servers? Here is the answer.

Consumption progress is saved differently depending on the consumer's consumption mode. On the consumer side, progress is saved through OffsetStore, which has two implementation classes:

    org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore     // local consumption progress store
    org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore  // remote (Broker) consumption progress store

In broadcast mode, consumption progress is saved locally on the consumer. In cluster mode, it is saved to the Broker. Either way, the consumer also keeps a copy in a local in-memory cache. Let's look at how that cache is updated in cluster consumption mode.
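The mapping from consumption mode to store can be sketched as below. This is a minimal illustration, not the client's actual wiring: `OffsetStoreChooser`, its nested `MessageModel` enum, and `storeFor` are hypothetical stand-ins, and only the two returned class names come from the article.

```java
// Hypothetical sketch: picks the OffsetStore implementation name by consumption mode.
final class OffsetStoreChooser {
    // Local stand-in for the two consumption modes discussed in the article.
    enum MessageModel { BROADCASTING, CLUSTERING }

    private OffsetStoreChooser() {}

    /** Returns the simple name of the store implementation used for the given mode. */
    static String storeFor(MessageModel model) {
        switch (model) {
            case BROADCASTING:
                return "LocalFileOffsetStore";    // progress kept on the consumer's machine
            case CLUSTERING:
                return "RemoteBrokerOffsetStore"; // progress kept on the Broker
            default:
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }
}
```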

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateOffset:

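    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            // Look up the cached progress for this queue.
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                // First update for this queue: putIfAbsent returns null if we inserted.
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }

            if (null != offsetOld) {
                if (increaseOnly) {
                    // Only move the cached progress forward, never backward.
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    offsetOld.set(offset);
                }
            }
        }
    }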
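To make the "increase only" behaviour concrete, here is a self-contained sketch of the same caching pattern. It is an illustration under assumptions: `OffsetCacheSketch` is a hypothetical class, queues are keyed by plain strings instead of MessageQueue, and the local `compareAndIncreaseOnly` is a compare-and-set loop written for this example rather than a copy of the MixAll helper.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of an in-memory consumption progress cache.
final class OffsetCacheSketch {
    // Keyed by a queue name here; the article's cache is keyed by MessageQueue.
    private final ConcurrentMap<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    /** Raises target to value only if value is larger; returns true if it changed. */
    static boolean compareAndIncreaseOnly(AtomicLong target, long value) {
        long prev = target.get();
        while (value > prev) {
            if (target.compareAndSet(prev, value)) {
                return true;
            }
            prev = target.get(); // lost a race with another thread; re-check
        }
        return false;
    }

    void updateOffset(String queue, long offset, boolean increaseOnly) {
        AtomicLong old = offsetTable.putIfAbsent(queue, new AtomicLong(offset));
        if (old != null) {
            if (increaseOnly) {
                compareAndIncreaseOnly(old, offset);
            } else {
                old.set(offset);
            }
        }
    }

    /** Returns the cached progress, or -1 when the queue has no entry. */
    long readOffset(String queue) {
        AtomicLong value = offsetTable.get(queue);
        return value == null ? -1L : value.get();
    }
}
```

With `increaseOnly` set, a late update carrying a smaller offset is ignored, so out-of-order completions cannot move the cached progress backward.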

After consuming a message, the consumer calls the method above to put the consumption progress into the offsetTable cache. When Rebalance redistributes the queues and creates a PullRequest object, the RemoteBrokerOffsetStore.readOffset method is called to read the corresponding progress from the offsetTable cache and put it into the PullRequest object. The cached progress is then sent to the Broker along with the pull request, so next we look at the processing logic on the Broker side.
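The idea of a pull request carrying the cached progress can be sketched as follows. Everything here is hypothetical: `PullRequestSketch` and `fromCache` are illustrative names, and the rule that only a positive cached value sets the flag is an assumption made for this example.

```java
// Hypothetical sketch: a pull request that carries the consumer's cached progress.
final class PullRequestSketch {
    final long commitOffset;           // progress value sent to the Broker
    final boolean hasCommitOffsetFlag; // true only when the cache held a usable value

    private PullRequestSketch(long commitOffset, boolean hasCommitOffsetFlag) {
        this.commitOffset = commitOffset;
        this.hasCommitOffsetFlag = hasCommitOffsetFlag;
    }

    /** cachedOffset is what the in-memory cache returned; -1 stands for "no entry" here. */
    static PullRequestSketch fromCache(long cachedOffset) {
        boolean hasValue = cachedOffset > 0; // assumption: only positive offsets are sent
        return new PullRequestSketch(hasValue ? cachedOffset : 0L, hasValue);
    }
}
```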

When I walked through the Broker startup process earlier, I found that a scheduled task is set up when the Broker starts:

org.apache.rocketmq.broker.BrokerController#initialize:

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

This scheduled task is started only if the Broker is a slave server. As the arguments show, it first runs 10 seconds after it is scheduled and then repeats every 60 seconds.
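The "only on a slave" condition and the timing can be sketched in isolation. This is a hedged illustration: `SlaveSyncScheduler`, its nested `Role` enum, and `scheduleIfSlave` are hypothetical names, and the `Runnable` stands in for the call to syncAll.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: schedule the periodic sync only when the broker is a slave.
final class SlaveSyncScheduler {
    enum Role { MASTER, SLAVE }

    private SlaveSyncScheduler() {}

    /** Returns the scheduled task's handle, or null when the role is not SLAVE. */
    static ScheduledFuture<?> scheduleIfSlave(ScheduledExecutorService executor,
                                              Role role,
                                              Runnable syncAll) {
        if (role != Role.SLAVE) {
            return null; // a master has nothing to pull from
        }
        return executor.scheduleAtFixedRate(() -> {
            try {
                syncAll.run();
            } catch (Throwable e) {
                // Swallow and report: an exception escaping the task would
                // suppress all later executions of a fixed-rate task.
                System.err.println("syncAll failed: " + e);
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); // 10 s delay, 60 s period
    }
}
```

Catching `Throwable` inside the task matters because `scheduleAtFixedRate` stops rescheduling a task once one run throws.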

org.apache.rocketmq.broker.slave.SlaveSynchronize#syncAll:

    public void syncAll() {
        this.syncTopicConfig();
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
    }

While the master server is up, the slave server periodically synchronizes consumption progress and other information from the master. This raises a problem: the synchronization is one-way, that is, only the slave synchronizes from the master, never the reverse. Suppose the master goes down and the consumer switches to pulling messages from the slave. When the master starts again later, the slave will synchronize from it offsets that lag behind what has already been consumed. Won't that cause messages to be consumed again?
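The concern can be illustrated with a toy model of the one-way copy. This is only a sketch under assumptions: `OneWaySyncSketch` is hypothetical, offsets are held in plain maps keyed by a queue name, and the copy is modelled as a simple overwrite of the slave's entries.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: progress flows from master to slave only.
final class OneWaySyncSketch {
    final Map<String, Long> masterOffsets = new HashMap<>();
    final Map<String, Long> slaveOffsets = new HashMap<>();

    /** Runs on the slave: copy the master's entries over the slave's. */
    void syncConsumerOffset() {
        slaveOffsets.putAll(masterOffsets);
    }
}
```

Nothing in this model writes from the slave back to the master, so if the slave held a newer value than a restarted master, the next sync would overwrite it with the older one.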

In fact, when the consumer pulls messages, if its cache holds consumption progress, it sends that progress to the Broker along with the pull request. So even if the master goes down, once it restarts the consumer's progress is not lost: the consumer's next pull updates the master's consumption progress. As long as the consumer and the master do not go down at the same time, messages will not be consumed again. The specific code logic is as follows:

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:

    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
    if (storeOffsetEnable) {
        this.brokerController.getConsumerOffsetManager().commitOffset(
            RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(),
            requestHeader.getTopic(),
            requestHeader.getQueueId(),
            requestHeader.getCommitOffset());
    }

brokerAllowSuspend indicates whether the Broker is allowed to suspend the pull request; its default value is true. hasCommitOffsetFlag indicates whether the consumer has consumption progress cached in memory. As the code shows, if the Broker is the master server and both brokerAllowSuspend and hasCommitOffsetFlag are true, the Broker updates its own record of the consumer's consumption progress with the value carried in the request.
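The whole failover scenario can be walked through with a small model of the three conditions above. This is a sketch under stated assumptions: `BrokerOffsetSketch`, its nested `Role` enum, and `onPull` are hypothetical names, offsets live in a plain map keyed by a queue name, and brokerAllowSuspend is fixed to its default of true.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the Broker-side decision to store a carried offset.
final class BrokerOffsetSketch {
    enum Role { MASTER, SLAVE }

    final Role role;
    final Map<String, Long> offsets = new HashMap<>();

    BrokerOffsetSketch(Role role) {
        this.role = role;
    }

    /** Mirrors the three conditions: suspend allowed, flag set, and not a slave. */
    static boolean storeOffsetEnable(boolean brokerAllowSuspend,
                                     boolean hasCommitOffsetFlag,
                                     Role role) {
        return brokerAllowSuspend && hasCommitOffsetFlag && role != Role.SLAVE;
    }

    /** Handles a pull; records the offset the consumer carried if the conditions hold. */
    void onPull(String queue, long commitOffset, boolean hasCommitOffsetFlag) {
        // Assumption: brokerAllowSuspend stays at its default value of true.
        if (storeOffsetEnable(true, hasCommitOffsetFlag, role)) {
            offsets.put(queue, commitOffset);
        }
    }
}
```

In this model, a master that restarts holding offset 100 for a queue is brought up to 150 by the first pull from a consumer whose cache holds 150, while the same pull sent to a slave leaves the slave's table untouched.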
