2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how a single line of RocketMQ code can cause a large number of messages to be lost, and how to solve the problem.
1. Problem phenomenon
We first received feedback from a project team that the following error occurred when using RocketMQ:
Error message key: MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 205ms, size of queue: 880.
Because the project team did nothing to compensate for failed sends, every message that failed to send was lost. The problem therefore needs to be analyzed in depth and solved.
2. Problem analysis
First, let's search the RocketMQ source code for the keyword TIMEOUT_CLEAN_QUEUE to find out when this error is thrown. The result of the full-text search is shown in the following figure:
The method that produces this error is defined in BrokerFastFailure, and the class name reveals its design purpose: a fast-failure mechanism on the Broker side.
The schematic diagram of fast failure on the Broker side is as follows:
The message sender sends a message write request to the Broker. After receiving the request, the Broker first puts it into a queue (SendThreadPoolQueue), whose default capacity is 10000.
The Broker uses a dedicated thread pool (SendMessageExecutor) to take tasks from the queue and execute the message write requests. To ensure that messages are processed sequentially, the default number of threads in this pool is 1.
If writes on a Broker jitter because of garbage collection or similar factors, the requests backed up on that Broker cannot be processed in time, which greatly prolongs the client's send time.
Imagine that, under increased load, the Broker takes 500ms or even more than 1 second to write one message while 5000 messages are backed up in the queue, and the sender's default timeout is 3 seconds. With a single write thread at 500ms per message, only about six requests can complete within 3 seconds. By the time the Broker gets to the remaining requests, the client has already timed them out, so the Broker does a lot of useless work and the client still sees send timeouts.
To solve this problem, RocketMQ introduces a fast-failure mechanism on the Broker side. A scheduled thread checks the first queued node in the queue every 10 milliseconds. If that node has been queued for more than 200ms, the thread cancels all requests in the queue that have waited more than 200ms and immediately returns a failure to the client, so that the client can retry as soon as possible. Because Brokers are deployed as a cluster, the retry can be sent to another Broker, so the message can still be delivered within the default 3-second timeout. This avoids send unavailability caused by momentary pressure on a single Broker and thus gives message sending high availability.
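The check described above can be modeled in a few lines. The following is a minimal sketch, not RocketMQ's actual implementation: the class FastFailureSketch, the type QueuedRequest and the method cleanExpired are hypothetical names chosen for illustration, and time is passed in as a parameter so the logic is easy to follow.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simplified model of a broker-side fast-failure check (illustrative only). */
final class FastFailureSketch {

    /** A queued send request, stamped with the time it entered the queue. */
    static final class QueuedRequest {
        final String id;
        final long enqueuedAtMillis;

        QueuedRequest(String id, long enqueuedAtMillis) {
            this.id = id;
            this.enqueuedAtMillis = enqueuedAtMillis;
        }
    }

    /**
     * Removes requests from the head of the queue while they have waited
     * longer than maxWaitMillis, and returns the ids of the removed requests.
     * A real broker would answer each of them with a "broker busy" response.
     */
    static List<String> cleanExpired(Deque<QueuedRequest> queue,
                                     long nowMillis,
                                     long maxWaitMillis) {
        List<String> rejected = new ArrayList<>();
        while (!queue.isEmpty()) {
            QueuedRequest head = queue.peekFirst();
            long waited = nowMillis - head.enqueuedAtMillis;
            if (waited <= maxWaitMillis) {
                break; // FIFO queue: everything behind the head is younger
            }
            queue.pollFirst();
            rejected.add(head.id);
        }
        return rejected;
    }
}
```

In this model a scheduler would call cleanExpired every 10 milliseconds with maxWaitMillis set to 200, matching the interval and threshold described in the article.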
Given the original intention behind the Broker's fast-failure mechanism, a retry should be initiated after a fast failure. Unless all Brokers in the cluster are busy at the same time, the message should be sent successfully and users should never notice this error. So why do users see it? Is it possible that no retry happens after a TIMEOUT_CLEAN_QUEUE error?
To solve this mystery, we turn to the source code. We take synchronous sending as an example and walk through the key points of the send process.
The MQ client first sends the request to the Broker over the network channel, then receives the result and calls the processSendResponse method to parse the response, as shown in the following figure:
The code returned here is RemotingSysResponseCode.SYSTEM_BUSY.
From the processSendResponse method we can see that if the code is SYSTEM_BUSY, the method throws an MQBrokerException whose response code is SYSTEM_BUSY and whose description is the error message shown at the beginning.
Following the call chain upward, we find its direct caller, the sendKernelImpl method of DefaultMQProducerImpl, and we focus on what happens when the underlying method throws an MQBrokerException.
The key code is shown in the following figure:
You can see that the sendKernelImpl method first catches the exception and executes the registered hook functions. In other words, even if the send fails, the after-send hooks are still executed, and then the exception is rethrown unchanged.
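The catch, run hooks, rethrow pattern described here can be sketched as follows. This is a simplified illustration rather than the code of sendKernelImpl: SendHookSketch, AfterSendHook, Transport and sendOnce are hypothetical names introduced for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of "run the after-send hooks even on failure, then rethrow". */
final class SendHookSketch {

    /** Hypothetical hook type; receives the failure, or null on success. */
    interface AfterSendHook {
        void sendMessageAfter(Exception failureOrNull);
    }

    /** Hypothetical stand-in for the network call to the broker. */
    interface Transport {
        String send(String message) throws Exception;
    }

    private final List<AfterSendHook> hooks = new ArrayList<>();

    void registerHook(AfterSendHook hook) {
        hooks.add(hook);
    }

    private void notifyHooks(Exception failureOrNull) {
        for (AfterSendHook hook : hooks) {
            hook.sendMessageAfter(failureOrNull);
        }
    }

    /** Performs one send attempt; no retry happens at this level. */
    String sendOnce(Transport transport, String message) throws Exception {
        String result;
        try {
            result = transport.send(message);
        } catch (Exception e) {
            notifyHooks(e); // hooks still run when the send fails
            throw e;        // rethrown unchanged; the caller decides about retries
        }
        notifyHooks(null);
        return result;
    }
}
```

The point of the sketch is that this layer never swallows the exception, so whether a retry happens is decided entirely by the caller.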
The sendKernelImpl method is called by the sendDefaultImpl method of DefaultMQProducerImpl. The following is a screenshot of its core implementation:
From this we can see a key point in RocketMQ's high-availability design for message sending: the retry mechanism is implemented by wrapping the call to sendKernelImpl in a try/catch inside a for loop, so that the loop can continue with a retry after an exception is thrown. However, when an MQBrokerException is caught, only a fixed set of response codes leads to a retry. For any other response code the exception is rethrown, the for loop is interrupted, and no retry takes place.
What is surprising is that even SYSTEM_ERROR is retried, while SYSTEM_BUSY is not. This clearly goes against the original intention of the fast-failure design, so the author concludes that this is a bug in RocketMQ: SYSTEM_BUSY was simply omitted. The author plans to submit a PR that adds one line of code so that SYSTEM_BUSY is retried as well.
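The effect of the omission can be reproduced with a small model of the retry loop. This is a sketch under stated assumptions, not the code of sendDefaultImpl: RetryLoopSketch, BrokerException, BrokerCall and sendWithRetry are hypothetical names, and the set of retryable codes is passed in as a parameter so that the behavior with and without SYSTEM_BUSY can be compared. The value 2 for SYSTEM_BUSY matches the CODE: 2 in the error message; the value 1 for SYSTEM_ERROR is the value defined in RemotingSysResponseCode.

```java
import java.util.Set;

/** Illustrative model of a retry loop that only retries selected response codes. */
final class RetryLoopSketch {

    static final int SYSTEM_ERROR = 1;
    static final int SYSTEM_BUSY = 2;

    /** Hypothetical stand-in for MQBrokerException. */
    static final class BrokerException extends Exception {
        final int responseCode;

        BrokerException(int responseCode) {
            super("CODE: " + responseCode);
            this.responseCode = responseCode;
        }
    }

    /** Hypothetical stand-in for one send attempt (the role of sendKernelImpl). */
    interface BrokerCall {
        String send(int attempt) throws BrokerException;
    }

    static String sendWithRetry(BrokerCall call,
                                int maxAttempts,
                                Set<Integer> retryableCodes) throws BrokerException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        BrokerException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return call.send(attempt);
            } catch (BrokerException e) {
                last = e;
                if (retryableCodes.contains(e.responseCode)) {
                    continue; // retry, ideally against another broker
                }
                throw e; // any other code leaves the loop: no retry
            }
        }
        throw last; // all attempts failed with retryable codes
    }
}
```

With a retryable set that contains SYSTEM_ERROR but not SYSTEM_BUSY, a call that fails once with SYSTEM_BUSY propagates the exception immediately. Adding SYSTEM_BUSY to the set lets the second attempt succeed, which is the one-line change the author describes.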
At this point the cause of the problem should be clear.
3. Solution
If you search the Internet for TIMEOUT_CLEAN_QUEUE, the commonly proposed solution is to increase the value of waitTimeMillsInSendQueue, which defaults to 200ms, for example to 1000ms. I used to oppose this, because I believed the client would retry after the Broker failed fast. Now that I have found that no retry happens, I think raising this value moderately is an effective mitigation until the bug is fixed.
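As a rough illustration, the mitigation is a one-line change in the Broker's configuration file. The file name broker.conf and the value 1000 are assumptions for this example; choose a value that suits your own latency budget.

```properties
# broker.conf (assumed file name)
# Maximum time in milliseconds a send request may wait in the send queue
# before the Broker fails it fast; the default is 200.
waitTimeMillsInSendQueue=1000
```

A larger value makes fast failures rarer, but it also means requests wait longer in the queue before the client learns that the Broker is overloaded.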
But this is not a good solution. I will submit a PR to the official project in the near future to fix the problem. In the meantime, I suggest that, where possible, you patch the version used in your company and repackage it, because the current behavior goes against the original intention of the Broker's fast-failure design.
In addition, on the business side, try to implement your own retry mechanism for message sending rather than relying solely on the one provided by RocketMQ. Because of network and other factors, message sending can never be 100% successful. It is recommended that you catch exceptions when sending a message. If the send fails, store the message in a database and resend it with a scheduled task, so that as few messages as possible are lost.
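The recommendation above can be sketched as follows. This is a minimal illustration, not production code: CompensatingSender and Sender are hypothetical names, Sender stands in for whatever wraps the real producer call, and an in-memory list stands in for the database table of failed messages.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Illustrative business-side compensation: store failed messages, resend them later. */
final class CompensatingSender {

    /** Hypothetical stand-in for the real producer call. */
    interface Sender {
        void send(String message) throws Exception;
    }

    private final Sender sender;

    // Stand-in for a database table; a real system would persist failed messages.
    private final List<String> failedStore = new ArrayList<>();

    CompensatingSender(Sender sender) {
        this.sender = sender;
    }

    /** Tries to send; on any failure the message is stored. Returns true if sent. */
    synchronized boolean sendOrStore(String message) {
        try {
            sender.send(message);
            return true;
        } catch (Exception e) {
            failedStore.add(message);
            return false;
        }
    }

    /** Meant to be called by a scheduled task; returns how many messages were resent. */
    synchronized int retryFailed() {
        int resent = 0;
        Iterator<String> it = failedStore.iterator();
        while (it.hasNext()) {
            String message = it.next();
            try {
                sender.send(message);
                it.remove();
                resent++;
            } catch (Exception e) {
                // Still failing: keep the message for the next round.
            }
        }
        return resent;
    }

    synchronized int pendingCount() {
        return failedStore.size();
    }
}
```

A scheduled task, for example one created with ScheduledExecutorService.scheduleWithFixedDelay, could call retryFailed periodically. Because a message may be resent after a send whose outcome was uncertain, consumers should be prepared to handle duplicates.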
That is how a single line of RocketMQ code can cause a large number of messages to be lost, and how to solve the problem.