Source: Shulou (Shulou.com), 06/02 report; updated 2025-01-17
This article explains in detail how to resolve several questions about master-slave synchronization in RocketMQ. After reading it, you should have a working understanding of the topic.
1. A first look at master-slave synchronization
The basic flow of RocketMQ master-slave synchronization is as follows:
1. The Master starts and listens on the designated port.
2. The client (here, the slave) starts, actively connects to the Master, and establishes a TCP connection.
3. The client pulls messages from the server every 5 seconds. On the first pull, it first obtains the maximum offset of its local commitlog file and then pulls from the server starting at that offset.
4. The server parses the request and returns a batch of data to the client.
5. After receiving the batch, the client writes the messages to its local commitlog file, reports its pull progress to the Master, and updates the next pull offset.
6. Repeat from step 3.
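The pull loop in the steps above can be sketched with a toy in-memory model. This is only an illustration under simplifying assumptions: the class and method names (HaPullSketch, masterRead, pullOnce) are hypothetical and are not RocketMQ classes, byte arrays stand in for commitlog files, and the TCP connection and the 5-second wait are omitted.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Toy model of the slave pull loop; every name here is hypothetical, not a RocketMQ class. */
final class HaPullSketch {
    /** Master side: return at most batchSize bytes of the commitlog starting at offset. */
    static byte[] masterRead(byte[] masterLog, long offset, int batchSize) {
        int from = (int) Math.min(offset, masterLog.length);
        int to = Math.min(from + batchSize, masterLog.length);
        return Arrays.copyOfRange(masterLog, from, to);
    }

    /** Slave side: one pull round. Appends the batch locally and returns the next pull offset. */
    static long pullOnce(byte[] masterLog, ByteArrayOutputStream slaveLog, int batchSize) {
        long offset = slaveLog.size();                 // maximum offset of the local commitlog
        byte[] batch = masterRead(masterLog, offset, batchSize);
        slaveLog.write(batch, 0, batch.length);        // append the batch to the local commitlog
        return slaveLog.size();                        // progress that would be reported to the master
    }

    public static void main(String[] args) {
        byte[] masterLog = "msg-1|msg-2|msg-3|".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream slaveLog = new ByteArrayOutputStream();
        long next = 0;
        while (next < masterLog.length) {              // a real slave waits between rounds
            next = pullOnce(masterLog, slaveLog, 8);
            System.out.println("slave reported offset " + next);
        }
    }
}
```

Because each round starts from the slave's own maximum offset, a restarted slave would resume where its local log ends rather than from the beginning.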
An important characteristic of RocketMQ master-slave synchronization: it provides no master-slave failover. When the master node goes down, the slave never takes over message writes, but it can still serve message reads.
Tip: this article does not analyze the implementation details of RocketMQ master-slave synchronization. If you are interested, refer to the author's "Inside RocketMQ Technology" or the author's blog post "RocketMQ master-slave synchronization details".
2. Questions
When both the master and the slave are running, does the message consumer pull messages from the master or from the slave?
In the RocketMQ master-slave architecture, if the master goes down and the slave takes over message consumption, how is the consumption progress maintained? When the master recovers, does the consumer pull from the master or from the slave, and how is the consumption progress synchronized between the two?
With these questions in mind, let's explore how it works.
3. Exploring the principle
3.1 RocketMQ master-slave read-write separation mechanism
By default, RocketMQ consumers pull messages from the master server first, which is not read-write separation in the usual sense. So when do they pull from the slave?
> Tip: this section does not walk through the whole process; it only points out the key points. To learn more about core processes such as message pull and message consumption, refer to the author's "Inside RocketMQ Technology".
The core code that decides whether to pull from the master or from the slave is in DefaultMessageStore#getMessage:
    long diff = maxOffsetPy - maxPhyOffsetPulling; // @1
    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); // @2
    getResult.setSuggestPullingFromSlave(diff > memory); // @3
Code @1: first, the meaning of the local variables:
maxOffsetPy: the current maximum physical offset of the commitlog. Data up to this offset has already been written to the operating system's PageCache.
maxPhyOffsetPulling: the maximum physical offset of the messages returned by this pull. Because messages are pulled in order, the physical offset of the next pull will be greater than this value and close to it.
diff: the gap between maxOffsetPy and maxPhyOffsetPulling. Since getMessage is mainly called when consuming messages, this gap can be understood as the total size of messages not yet consumed.
Code @2: compute the maximum amount of RocketMQ message data that can stay in PageCache. Message data beyond this amount is evicted from memory, and accessing messages that are no longer in PageCache requires reading from disk.
StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE: the total physical memory of the current system.
accessMessageInMemoryMaxRatio: the maximum percentage of physical memory that messages may occupy, 40 by default. Combining the two, the maximum amount of message data that RocketMQ can keep mapped in memory is 40% * (machine physical memory).
Code @3: set the flag that suggests whether the next pull should go to the slave. The flag is set when diff, the amount of message data not yet pulled, exceeds this threshold, which defaults to 40% of physical memory.
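A worked example may make the threshold concrete. The sketch below repeats the arithmetic of code @1 to @3 as a standalone function; the class name SuggestSlaveSketch and the 16 GiB machine are assumptions chosen for illustration, not values taken from RocketMQ.

```java
/** Worked example of the @1-@3 check; class and method names are illustrative only. */
final class SuggestSlaveSketch {
    static boolean suggestPullingFromSlave(long maxOffsetPy, long maxPhyOffsetPulling,
                                           long totalPhysicalMemory, int accessMessageInMemoryMaxRatio) {
        long diff = maxOffsetPy - maxPhyOffsetPulling;                                        // @1
        long memory = (long) (totalPhysicalMemory * (accessMessageInMemoryMaxRatio / 100.0)); // @2
        return diff > memory;                                                                 // @3
    }

    public static void main(String[] args) {
        long gib = 1024L * 1024 * 1024;
        long physical = 16 * gib; // assumed machine size; the threshold is then about 6.4 GiB

        // Backlog of 5 GiB stays under the threshold: keep pulling from the master.
        System.out.println(suggestPullingFromSlave(10 * gib, 5 * gib, physical, 40)); // false

        // Backlog of 7 GiB exceeds the threshold: suggest the slave for the next pull.
        System.out.println(suggestPullingFromSlave(10 * gib, 3 * gib, physical, 40)); // true
    }
}
```

The check depends only on how far the consumer lags behind the end of the commitlog, not on the total amount of data the broker stores.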
So where is the suggestPullingFromSlave property of getResult used?
PullMessageProcessor#processRequest
    if (getMessageResult.isSuggestPullingFromSlave()) { // @1
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
    } else {
        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
    }

    switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) { // @2
        case ASYNC_MASTER:
        case SYNC_MASTER:
            break;
        case SLAVE:
            if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
            break;
    }

    if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) { // @3
        // consume too slow, redirect to another machine
        if (getMessageResult.isSuggestPullingFromSlave()) {
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
        }
        // consume ok
        else {
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
        }
    } else {
        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
    }
Code @1: if the lookup in the commitlog finds that the message backlog is too large (by default, more than 40% of physical memory), the broker suggests that the next pull go to the slave server.
Code @2: if the current server is a slave and slaveReadEnable=false, the value set by code @1 is ignored, the response code is set to PULL_RETRY_IMMEDIATELY, and the next pull switches to the master.
Code @3: if slaveReadEnable=true (reading from the slave is allowed) and pulling from the slave is suggested, the suggested broker is the one the consumer group should use when consumption is slow, given by the subscription group attribute whichBrokerWhenConsumeSlowly. If consumption speed is normal, the brokerId configured for the subscription group is used, which defaults to the master server. If reading from the slave is not allowed, the master is always suggested.
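Since code @3 always overwrites the suggested broker ID set by the earlier branches, the net effect of the three branches on that one field can be condensed into a small pure function. This is a simplified sketch, not the broker's code: the class name SuggestBrokerSketch is hypothetical, and the PULL_RETRY_IMMEDIATELY response code set in code @2 is deliberately not modeled.

```java
/** Condensed view of which broker ID ends up in the pull response; names are illustrative only. */
final class SuggestBrokerSketch {
    static final long MASTER_ID = 0L;

    static long suggestWhichBrokerId(boolean slaveReadEnable, boolean suggestPullingFromSlave,
                                     long groupBrokerId, long whichBrokerWhenConsumeSlowly) {
        if (!slaveReadEnable) {
            return MASTER_ID;                      // slave reads disabled: always the master
        }
        return suggestPullingFromSlave
            ? whichBrokerWhenConsumeSlowly         // backlog too large: redirect
            : groupBrokerId;                       // normal speed: the group's configured broker
    }

    public static void main(String[] args) {
        // Default subscription group values described in this article: brokerId=0, whichBrokerWhenConsumeSlowly=1.
        System.out.println(suggestWhichBrokerId(false, true, 0L, 1L));  // 0
        System.out.println(suggestWhichBrokerId(true, true, 0L, 1L));   // 1
        System.out.println(suggestWhichBrokerId(true, false, 0L, 1L));  // 0
    }
}
```

The last call shows why the default brokerId of 0 sends a consumer back to the master as soon as the backlog drops below the threshold.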
> Note: pay attention to the broker parameter slaveReadEnable and to two values in the subscription group configuration: whichBrokerWhenConsumeSlowly and brokerId. In a production environment, you can change the subscription group configuration dynamically with the updateSubGroup command.
If the subscription group configuration is left at its defaults, then after a pull request has been sent to the slave, the next pull goes back to the master regardless of whether slaveReadEnable is enabled.
In the steps above, the broker returns the suggested broker for the next pull in a field of the pull response, and the client pulls from that broker according to the value.
On the client, the pull implementation PullAPIWrapper records the brokerId suggested by the server in its broker pull cache table when it processes a pull result.
Before initiating a pull request, the client first selects the Broker to pull from by consulting this cache table.
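The client-side cache described above can be pictured as a map from message queue to the last suggested broker ID, falling back to the master when nothing has been suggested yet. The following is a minimal sketch under that assumption; PullBrokerCacheSketch, updateSuggestion, brokerToPullFrom, and the string queue key are hypothetical names, not the PullAPIWrapper API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of a per-queue "pull from which broker" cache; all names are hypothetical. */
final class PullBrokerCacheSketch {
    static final long MASTER_ID = 0L;

    // key: a message queue identifier, value: broker ID suggested by the last pull response
    private final ConcurrentMap<String, AtomicLong> suggestedBroker = new ConcurrentHashMap<>();

    /** Called when a pull result arrives: remember the broker the server suggested. */
    void updateSuggestion(String queueKey, long brokerId) {
        suggestedBroker.computeIfAbsent(queueKey, k -> new AtomicLong()).set(brokerId);
    }

    /** Called before the next pull: use the cached suggestion, or the master if there is none. */
    long brokerToPullFrom(String queueKey) {
        AtomicLong suggest = suggestedBroker.get(queueKey);
        return suggest != null ? suggest.get() : MASTER_ID;
    }

    public static void main(String[] args) {
        PullBrokerCacheSketch cache = new PullBrokerCacheSketch();
        System.out.println(cache.brokerToPullFrom("TopicA-0")); // no suggestion yet: master
        cache.updateSuggestion("TopicA-0", 1L);                 // server suggested the slave
        System.out.println(cache.brokerToPullFrom("TopicA-0"));
    }
}
```

Keeping the suggestion per queue means one lagging queue can be redirected to the slave while other queues of the same consumer keep reading from the master.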
3.2 Synchronization mechanism of message consumption progress
As shown above, master-slave synchronization mainly serves this purpose: when the message backlog exceeds 40% of physical memory (the default), message reads are taken over by the slave server, which separates reads from writes and avoids severe IO jitter on the master. This raises new questions. After the master goes down and the slave takes over message consumption, where is the consumption progress stored? When the master recovers, are messages pulled from the master or from the slave? And how does the master learn the latest consumption progress?
RocketMQ message consumption progress management (cluster mode): in cluster mode, the consumption progress file is stored on the broker at ${ROCKETMQ_HOME}/store/config/consumerOffset.json. The message consumer pulls a batch of messages from the broker and submits them to the consumer group's thread pool for processing. When messages are consumed successfully, the consumer sends an ACK to the Broker to report which messages have been consumed. After receiving this progress feedback, the Broker first stores it in memory and then periodically persists it to the consumerOffset.json file. Note: for more details on how consumption progress is managed, consult the author's "Inside RocketMQ Technology".
Let's first look at how the client selects a Broker when reporting consumption progress. Because the brokerId of the master is 0, the master is chosen first while it is alive; a slave is chosen only when the master is down.
Since consumption progress in cluster mode is stored on the broker side, and the progress file lives on the master while the master is healthy, two questions follow:
1) While the master is alive, the consumer reports its consumption progress to the master first. How, then, does the slave obtain the consumption progress?
2) When the master goes down, the consumer reports its consumption progress to the slave. How is that progress stored, and how does the master learn the latest progress once it recovers?
To answer these two questions, let's look at how consumption progress is submitted and how the Broker handles it.
The client periodically sends a request to update its consumption progress to the broker. The entry point is RemoteBrokerOffsetStore#updateConsumeOffsetToBroker. The key point in this method is the logic for choosing the broker:
If the master is alive, the master is chosen; if the master is down, the slave is chosen. In other words, regardless of whether messages are pulled from the master or from the slave, the request that submits consumption progress goes to the master first. The broker that receives the offset updates it in memory and then periodically persists it to ${ROCKETMQ_HOME}/store/config/consumerOffset.json.
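The "master first, slave only as a fallback" rule can be sketched as a small lookup over the brokers that are currently reachable. This is an assumption-laden illustration: OffsetReportTargetSketch and chooseBroker are hypothetical names, the map of broker ID to address stands in for the client's routing data, and picking the lowest-numbered slave is a choice made here only to keep the example deterministic.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Sketch of choosing the broker that receives a consumption progress report; names are hypothetical. */
final class OffsetReportTargetSketch {
    static final long MASTER_ID = 0L;

    /** aliveBrokers maps brokerId to address for one broker group; the master has ID 0. */
    static Optional<String> chooseBroker(Map<Long, String> aliveBrokers) {
        String master = aliveBrokers.get(MASTER_ID);
        if (master != null) {
            return Optional.of(master);            // master alive: always report to it
        }
        if (aliveBrokers.isEmpty()) {
            return Optional.empty();               // nobody to report to
        }
        // Master down: fall back to a slave (lowest ID, chosen here for determinism).
        return Optional.of(new TreeMap<>(aliveBrokers).firstEntry().getValue());
    }

    public static void main(String[] args) {
        System.out.println(chooseBroker(Map.of(0L, "master:10911", 1L, "slave:10911")));
        System.out.println(chooseBroker(Map.of(1L, "slave:10911")));
    }
}
```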
With this analysis in hand, consider the following scenario. The consumer first pulls messages from the master and submits its consumption progress there. If the master then goes down and the slave takes over message pulls, the consumption progress is stored on the slave. Won't the progress on the master and on the slave become inconsistent? And when the master recovers, how is the progress synchronized between the two?
3.2.1 The slave periodically synchronizes progress from the master
If the Broker's role is slave, a scheduled task calls syncAll to synchronize topic routing information, message consumption progress, delay queue processing progress, and consumer group subscription information from the master.
This raises a question: if the slave synchronizes consumption progress from the master right after the master restarts, won't messages have to be consumed again?
In most cases, no. Even if the slave synchronizes a long-outdated consumption progress from the master, nothing needs to be re-consumed as long as the message consumer itself has not restarted. RocketMQ provides two mechanisms that keep the consumption progress from being lost.
First, the message consumer holds the latest consumption progress in memory. It keeps pulling from the broker at that progress, and after processing messages it periodically reports its progress to the Broker. As described above, the report goes to the master first, so the master's progress is quickly brought up to date, and the slave only needs to synchronize it from the master periodically.
Second, when the consumer pulls messages from the master, the master also updates the consumption progress while handling the pull request.
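The scenario discussed in this section can be replayed with a toy model of two offset tables. It is only a sketch: OffsetSyncSketch, Broker, commitOffset, and syncFrom are hypothetical names, the queue key format is made up, and the sync round is modeled as copying the master's table over the slave's, which is the one-way behavior this article describes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy replay of one-way progress sync from master to slave; all names are hypothetical. */
final class OffsetSyncSketch {
    /** Minimal stand-in for a broker's in-memory consumption progress table. */
    static final class Broker {
        final Map<String, Long> offsets = new ConcurrentHashMap<>();

        void commitOffset(String queueKey, long offset) {
            offsets.put(queueKey, offset);
        }

        /** Slave-side sync round: copy the master's entries over the local ones. */
        void syncFrom(Broker master) {
            offsets.putAll(master.offsets);
        }
    }

    public static void main(String[] args) {
        Broker master = new Broker();
        Broker slave = new Broker();
        String queue = "TopicA@GroupA#0";

        master.commitOffset(queue, 100L);  // progress on the master before it went down
        slave.commitOffset(queue, 120L);   // consumer reported to the slave during the outage

        slave.syncFrom(master);            // master is back: the slave now holds the stale value
        System.out.println("slave after first sync: " + slave.offsets.get(queue));

        master.commitOffset(queue, 120L);  // the consumer's in-memory progress reaches the master
        slave.syncFrom(master);            // the next sync round repairs the slave
        System.out.println("slave after second sync: " + slave.offsets.get(queue));
    }
}
```

The stale value on the slave is harmless only because the consumer keeps its own progress in memory and pushes it to the master; a consumer that restarts inside that window would read the older progress.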
3.2.2 Update the progress of message consumption when pulling messages from the master server
When the master server processes the message pull command, it will trigger the update of the message consumption progress. The code entry is: PullMessageProcessor#processRequest
    boolean storeOffsetEnable = brokerAllowSuspend; // @1
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; // @2
    if (storeOffsetEnable) {
        this.brokerController.getConsumerOffsetManager().commitOffset(
            RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId(), requestHeader.getCommitOffset());
    }
Code @1: first, the meaning of the local variables:
brokerAllowSuspend: whether the broker is allowed to suspend the request. For a message pull, this value defaults to true.
hasCommitOffsetFlag: whether the message consumer has a consumption progress cached in memory; if so, this flag is true.
Code @2: if the Broker's role is master and both variables above are true, the consumption progress is first updated with commitOffset.
At this point, the questions about master-slave synchronization of consumption progress should have their answers.
The explanation of the implementation above is a bit dry, so let's now answer the questions directly:
1. When both the master and the slave are running, does the message consumer pull from the master or from the slave?
A: By default, RocketMQ consumers pull from the master. When the message backlog on the master exceeds 40% of physical memory, the master suggests pulling from the slave. However, if slaveReadEnable is false, the slave is not readable and will not take over message pulls.
2. Once a consumer pulls messages from the slave, will it always pull from the slave?
A: No. There are three cases:
1) If slaveReadEnable on the slave is false, the next pull goes to the master.
2) If the slave allows reads and its message backlog does not exceed 30% of its physical memory, the Broker for the next pull is the one specified by the subscription group's brokerId. This value defaults to 0, which is the master.
3) If the slave allows reads and its message backlog exceeds 30% of its physical memory, the Broker for the next pull is the one specified by the subscription group's whichBrokerWhenConsumeSlowly. This value defaults to 1, which is the slave.
3. How is consumption progress synchronized between master and slave?
A: Synchronization of consumption progress is one-way. The slave runs a scheduled task that synchronizes the consumption progress from the master. Whether the consumer pulls from the master or from the slave, it reports its consumption progress to the master first. In addition, when the consumer pulls from the master and has a consumption progress in memory, the master updates its stored progress with it.
The correct way to use read-write separation:
1. Set slaveReadEnable to true on the master and slave Brokers.
2. Use the updateSubGroup command to update whichBrokerWhenConsumeSlowly and brokerId for the consumer group. In particular, do not leave brokerId at 0; otherwise, after one pull from the slave, the next pull goes back to the master.
That concludes this look at several RocketMQ master-slave synchronization questions. I hope the content above is of some help.