2025-04-02 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains why the system busy and broker busy errors appear when sending messages with RocketMQ, and how to resolve them.
1. Phenomenon
Recently, many RocketMQ users have reported that one of the following four error messages occasionally occurs when sending messages in the production environment:
[REJECTREQUEST] system busy, start flow control for a while
too many requests and system thread pool busy, RejectedExecutionException
[PC_SYNCHRONIZED] broker busy, start flow control for a while
[PCBUSY_CLEAN_QUEUE] broker busy, start flow control for a while, period in queue: %sms, size of queue: %d
2. Principle interpretation
When selecting message middleware, if several candidates satisfy the business in terms of function and performance, it is worth taking the implementation language into account: a middleware implemented in a language you are good at is more controllable. When an exception occurs, we can take a keyword from the error message, such as system busy, and search for it directly in the RocketMQ source code to find the code that throws the error.
The code entry is the processRequestCommand method of org.apache.rocketmq.remoting.netty.NettyRemotingAbstract. The two key triggers for the errors above are the rejectRequest method of pair.getObject1() returning true, and a RejectedExecutionException being thrown when the task is submitted to the thread pool.
Note: this article is oriented toward practice. The source code is used only as key evidence for the analysis, so only the key parts are pointed out and the full implementation is not traced in detail. If you want to know more about the implementation, you can consult the book "Inside RocketMQ Technology" written by the author.
2.1 Overview of RocketMQ network processing mechanism
The network design of RocketMQ is well worth studying. The client identifies each kind of request with a request command code. The server classifies client requests by that code: each command, or each class of commands, is assigned a NettyRequestProcessor, and each NettyRequestProcessor is bound to its own thread pool. Different types of requests are therefore processed by different thread pools, which achieves thread isolation.
For the convenience of the following description, let's take a brief look at NettyRequestProcessor, Pair, and RequestCode. The key points are as follows:
NettyRequestProcessor: the request processor on the RocketMQ server side. For example, SendMessageProcessor handles message sending and PullMessageProcessor handles message pulling.
RequestCode: the request code, used to distinguish the type of request. For example, SEND_MESSAGE indicates a message send request and PULL_MESSAGE indicates a message pull request.
Pair: encapsulates the binding between a NettyRequestProcessor and an ExecutorService. In RocketMQ's network processing model, each NettyRequestProcessor is bound to a specific thread pool, and all the processing logic of that NettyRequestProcessor runs in that thread pool.
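To make the dispatch model concrete, here is a minimal sketch in plain Java. It is not the RocketMQ implementation: the class DispatchSketch, its Processor interface, its Pair class, and the numeric request codes are all simplified stand-ins introduced for illustration. It only shows the shape of the logic: look up the pair by request code, ask the processor whether to reject, then hand the task to the bound thread pool.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical, simplified model of per-request-code dispatch with thread isolation. */
final class DispatchSketch {
    // Illustrative request codes; treat the numeric values as assumptions.
    static final int SEND_MESSAGE = 10;
    static final int PULL_MESSAGE = 11;

    /** Stand-in for NettyRequestProcessor (not the real interface). */
    interface Processor {
        boolean rejectRequest();
        void process(String request);
    }

    /** Stand-in for the Pair that binds a processor to its own thread pool. */
    static final class Pair {
        final Processor processor;
        final ExecutorService executor;

        Pair(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Pair> processorTable = new HashMap<>();

    void register(int requestCode, Processor processor, ExecutorService executor) {
        processorTable.put(requestCode, new Pair(processor, executor));
    }

    /** Returns "OK" if the task was handed to the pool, otherwise an error text. */
    String dispatch(int requestCode, String request) {
        Pair pair = processorTable.get(requestCode);
        if (pair == null) {
            return "request code " + requestCode + " not supported";
        }
        // First rejection path: the processor itself asks for flow control.
        if (pair.processor.rejectRequest()) {
            return "[REJECTREQUEST] system busy, start flow control for a while";
        }
        try {
            // The processing logic runs only in the pool bound to this processor.
            pair.executor.execute(() -> pair.processor.process(request));
            return "OK";
        } catch (RejectedExecutionException e) {
            // Second rejection path: the bound thread pool refuses the task.
            return "too many requests and system thread pool busy, RejectedExecutionException";
        }
    }
}
```

Because each processor has its own pool, a backlog of send requests in this model cannot consume the threads that serve pull requests.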
2.2 pair.getObject1().rejectRequest()
Since the problems reported by readers all occur while sending messages, this article focuses on the SendMessageProcessor#rejectRequest method.
SendMessageProcessor#rejectRequest
    public boolean rejectRequest() {
        return this.brokerController.getMessageStore().isOSPageCacheBusy()              // @1
            || this.brokerController.getMessageStore().isTransientStorePoolDeficient(); // @2
    }
There are two conditions for rejecting a request, and true is returned as soon as either of them is satisfied.
Code @1: checks whether the operating system PageCache is busy, and returns true if it is. You are probably as curious as I was: how can RocketMQ tell whether the PageCache is busy? This is analyzed in detail below.
Code @2: checks whether the transientStorePool has run out of buffers.
2.2.1 isOSPageCacheBusy()
DefaultMessageStore#isOSPageCacheBusy()
    public boolean isOSPageCacheBusy() {
        long begin = this.getCommitLog().getBeginTimeInLock(); // @1 start
        long diff = this.systemClock.now() - begin;            // @1 end
        return diff < 10000000
            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills(); // @2
    }
Code @1: first, the meaning of the two local variables, begin and diff:
begin: put simply, the time at which the current message append acquired the CommitLog write lock, that is, the timestamp at which the message body started being appended to the off-heap buffer (DirectByteBuffer) or to the PageCache (the buffer created by FileChannel#map). For the specific code, see CommitLog#putMessage.
diff: how long the lock has been held during the current message append, that is, the time spent so far appending one message to the off-heap buffer or the PageCache.
Code @2: if a single message append takes longer than the Broker configuration parameter osPageCacheBusyTimeOutMills, the PageCache is considered busy. The default value of osPageCacheBusyTimeOutMills is 1000, which means 1s.
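The check can be reduced to a pure function, which makes it easy to see which inputs count as busy. This is a sketch only: PageCacheBusyCheck and its parameter names are hypothetical. It also assumes that the begin timestamp is reset to 0 when no append is in progress, which would explain the upper bound of 10000000: in that case diff equals the current epoch time in milliseconds, far above the bound, so the result is "not busy".

```java
/** Hypothetical pure-function version of the PageCache-busy test. */
final class PageCacheBusyCheck {
    // Same literal as in the snippet above; it filters out implausibly large differences.
    static final long MAX_PLAUSIBLE_DIFF_MILLIS = 10_000_000L;

    /**
     * @param beginTimeInLock   when the current append took the lock (assumed 0 when idle)
     * @param now               current time in milliseconds
     * @param busyTimeoutMillis value of osPageCacheBusyTimeOutMills (default 1000)
     */
    static boolean isOsPageCacheBusy(long beginTimeInLock, long now, long busyTimeoutMillis) {
        long diff = now - beginTimeInLock;
        return diff < MAX_PLAUSIBLE_DIFF_MILLIS && diff > busyTimeoutMillis;
    }
}
```

For example, an append that has held the lock for 1500 ms with the default timeout of 1000 ms is reported as busy, while one that has held it for 200 ms is not.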
2.2.2 isTransientStorePoolDeficient()
DefaultMessageStore#isTransientStorePoolDeficient
    public boolean isTransientStorePoolDeficient() {
        return remainTransientStoreBufferNumbs() == 0;
    }

    public int remainTransientStoreBufferNumbs() {
        return this.transientStorePool.remainBufferNumbs();
    }
Finally, the TransientStorePool#remainBufferNumbs method is called.
    public int remainBufferNumbs() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
    }
If the transientStorePoolEnable mechanism is enabled, the number of currently available ByteBuffers is returned. In other words, the purpose of the whole isTransientStorePoolDeficient method is to check whether a ByteBuffer is still available; if none is, the request is rejected in the same way as when the PageCache is busy. So what is the transientStorePoolEnable mechanism?
2.3 The transientStorePoolEnable mechanism
Java NIO's memory mapping mechanism maps files in the file system into memory, so that operations on a file become operations on memory addresses, which greatly improves IO performance. However, this memory is not resident and can be swapped out to swap space (virtual memory). To improve the performance of message sending, RocketMQ introduces a memory locking mechanism: the commitlog files that are currently being operated on are mapped into memory, and memory locking ensures that they always stay in memory. The control parameter of this mechanism is transientStorePoolEnable.
2.3.1 MappedFile
Focus on the initialization of two attributes of MappedFile, ByteBuffer writeBuffer and MappedByteBuffer mappedByteBuffer, because these two attributes are the data structures that message writes and message queries operate on directly.
Two key points are as follows:
ByteBuffer writeBuffer: if transientStorePoolEnable is enabled, it is an off-heap buffer created with ByteBuffer.allocateDirect(fileSize). If the mechanism is not enabled, it is null.
MappedByteBuffer mappedByteBuffer: created with the FileChannel#map method, and backed by the actual PageCache.
Message write: MappedFile#appendMessagesInner
When a message is written, if writeBuffer is not null, which means the transientStorePoolEnable mechanism is turned on, the message is first written to writeBuffer. If it is null, the message is written to mappedByteBuffer.
Message pull (read message): MappedFile#selectMappedBuffer
When a message is read, it is read from mappedByteBuffer (PageCache).
Did you notice something interesting? With the transientStorePoolEnable mechanism turned on, reads and writes are effectively separated: messages are first written to writeBuffer, but read from mappedByteBuffer.
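The read-write separation can be sketched with two plain buffers. This is an illustration under simplifying assumptions, not the MappedFile code: ReadWriteSplitSketch is a hypothetical class, a heap ByteBuffer stands in for the MappedByteBuffer, and the commit step that RocketMQ performs in the background is an explicit commit() call here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical model of "write to writeBuffer, read from the mapped buffer". */
final class ReadWriteSplitSketch {
    private final ByteBuffer writeBuffer;   // off-heap buffer, null when the pool is disabled
    private final ByteBuffer mappedBuffer;  // stand-in for the MappedByteBuffer (PageCache)
    private int wrotePosition = 0;          // end of the data appended so far
    private int committedPosition = 0;      // end of the data copied into mappedBuffer

    ReadWriteSplitSketch(int fileSize, boolean transientStorePoolEnable) {
        this.writeBuffer = transientStorePoolEnable ? ByteBuffer.allocateDirect(fileSize) : null;
        this.mappedBuffer = ByteBuffer.allocate(fileSize);
    }

    /** Appends to writeBuffer when it exists, otherwise directly to mappedBuffer. */
    void append(byte[] message) {
        ByteBuffer target = (writeBuffer != null) ? writeBuffer : mappedBuffer;
        ByteBuffer view = target.duplicate();
        view.position(wrotePosition);
        view.put(message);
        wrotePosition += message.length;
    }

    /** Copies the not-yet-committed bytes from writeBuffer into mappedBuffer. */
    void commit() {
        if (writeBuffer == null || committedPosition == wrotePosition) {
            return;
        }
        ByteBuffer src = writeBuffer.duplicate();
        src.position(committedPosition);
        src.limit(wrotePosition);
        ByteBuffer dst = mappedBuffer.duplicate();
        dst.position(committedPosition);
        dst.put(src);
        committedPosition = wrotePosition;
    }

    /** Readers only ever see data that has reached mappedBuffer. */
    String readAll() {
        int readable = (writeBuffer == null) ? wrotePosition : committedPosition;
        byte[] out = new byte[readable];
        ByteBuffer view = mappedBuffer.duplicate();
        view.position(0);
        view.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }
}
```

With the pool enabled, a message appended but not yet committed is invisible to readAll(), which is also the window in which data can be lost if the process dies.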
To clarify the intention behind transientStorePoolEnable, here are the views of Hu Zongtang, a contributor to the RocketMQ community, on this issue.
There are usually two ways to read and write:
First, the Mmap + PageCache way: both reading and writing messages go through the PageCache, so lock contention is inevitable. Under concurrent reads and writes there will be page faults, memory locking, and write-back of dirty pages.
Second, the two-tier architecture of DirectByteBuffer (off-heap memory) + PageCache, which separates message reads from message writes. Messages are written to the DirectByteBuffer (off-heap memory) and read from the PageCache. (For the DirectByteBuffer, flushing takes two steps: first commit to the PageCache, then flush to the disk file.) The advantage is that many operations in memory that easily block are avoided and latency is reduced, for example page faults, memory locking, and write-back of dirty pages.
Tip: if you want to discuss this further with Hu Zongtang, you can follow his GitHub account: https://github.com/zongtanghu
You may have another worry: if transientStorePoolEnable and its memory locking mechanism are turned on, will memory eventually overflow as the number of commitlog files keeps increasing?
2.3.2 TransientStorePool initialization
By default, TransientStorePool initializes 5 DirectByteBuffers (off-heap memory) and locks them in memory, that is, this memory will not be swapped out. The number of buffers is controlled by the transientStorePoolSize parameter.
When a message is written, a DirectByteBuffer is first taken from the pool and the message is appended to it. What happens when all 5 DirectByteBuffers are full? By design, RocketMQ writes only one commitlog file at a time, sequentially, and creates a new commitlog file only after the current one is full. The design idea of TransientStorePool is therefore to recycle these 5 DirectByteBuffers: a DirectByteBuffer can be reused once its content has been committed to the PageCache. The corresponding code is as follows: TransientStorePool#returnBuffer
    public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }
The call stack is as follows:
From the above analysis, the pool does not cause a memory overflow as messages keep being written, because the same fixed set of buffers is reused.
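The recycling idea fits in a few lines. The following is a sketch, not the TransientStorePool source: BufferPoolSketch is a hypothetical class, the method borrowBuffer is named only for illustration, and the memory locking step is omitted because it needs native calls. It shows why the off-heap footprint stays fixed and when the pool counts as deficient.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

/** Hypothetical fixed-size pool of off-heap buffers that are borrowed and returned. */
final class BufferPoolSketch {
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers = new ConcurrentLinkedDeque<>();

    BufferPoolSketch(int poolSize, int fileSize) {
        this.fileSize = fileSize;
        // All off-heap memory is allocated once, up front; the pool never grows.
        for (int i = 0; i < poolSize; i++) {
            availableBuffers.offer(ByteBuffer.allocateDirect(fileSize));
        }
    }

    /** Returns a buffer, or null when every buffer is currently in use. */
    ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst();
    }

    /** Resets the buffer and makes it available again, as in the returnBuffer snippet above. */
    void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        availableBuffers.offerFirst(byteBuffer);
    }

    int remainBufferNumbs() {
        return availableBuffers.size();
    }

    /** Mirrors the deficiency check: no buffer left means new sends are rejected. */
    boolean isDeficient() {
        return remainBufferNumbs() == 0;
    }
}
```

In this model the pool becomes deficient only when buffers are borrowed faster than their content is committed and returned, which is a throughput problem rather than a memory leak.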
3. Answer the phenomenon
3.1 [REJECTREQUEST] system busy
The source code entry point where it is thrown: NettyRemotingAbstract#processRequestCommand. The principle analysis above has already covered the implementation in detail; it is summarized as follows.
The error is thrown when the Broker PageCache is busy, and the PageCache is judged busy when the lock has been held for more than 1 second while a message is being appended. When the transientStorePoolEnable mechanism is enabled, the error is also thrown if no off-heap buffer is available in the TransientStorePool.
3.2 too many requests and system thread pool busy, RejectedExecutionException
The source code entry point where it is thrown: NettyRemotingAbstract#processRequestCommand. The check follows immediately after the one in 3.1: the error is thrown when the thread pool rejects the task that is submitted to it. While we are here, it is worth looking at how the thread pool used by the Broker for message sending is set up: BrokerController#registerProcessor
The queue length of this thread pool is 10000 by default, and the default value can be changed through sendThreadPoolQueueCapacity.
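The effect of a bounded queue can be reproduced with the JDK alone. The sketch below is not Broker code: BoundedSendPoolSketch and its parameters are hypothetical, and the tiny capacities are chosen only to make the behaviour visible. Tasks block on a latch so that the worker threads stay occupied; once the queue is full, further submissions are refused with RejectedExecutionException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of a fixed thread pool whose work queue has a capacity limit. */
final class BoundedSendPoolSketch {

    /** Submits blocking tasks and returns how many of them the pool rejected. */
    static int countRejected(int threads, int queueCapacity, int submissions) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(queueCapacity)); // plays the role of sendThreadPoolQueueCapacity
        int rejected = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // simulate a slow append that keeps the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // this is the point where the Broker would report the error
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }
}
```

With 1 thread and a queue capacity of 2, five submissions leave one task running, two queued, and two rejected. A larger queue only postpones the rejection while requests wait longer.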
3.3 [PC_SYNCHRONIZED] broker busy
The source code entry point where it is thrown: DefaultMessageStore#putMessage. When the message is appended, this method checks once more whether the PageCache is busy, and throws the above error if it is.
3.4 [PCBUSY_CLEAN_QUEUE] broker busy, period in queue: %sms, size of queue: %d
The source code entry point where it is thrown: BrokerFastFailure#cleanExpiredRequest. This method is run periodically by a scheduled task (the schedule in the RocketMQ 4.x source uses a period of 10 milliseconds), on the precondition that fast failure is enabled on the Broker side. It is enabled by default and can be set with the parameter brokerFastFailureEnable. The key point of this method is the periodic check: if it detects that the PageCache is busy and there are queued tasks in the send queue, those tasks no longer wait. A broker busy error is returned for them directly, so that the queued requests fail fast instead of continuing to wait.
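The fast-failure loop can be sketched as follows. It is a simplified model, not BrokerFastFailure itself: FastFailureSketch is a hypothetical class, the queue holds only the enqueue timestamp of each pending send, and the failure responses are collected in a list instead of being written back to clients.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.BooleanSupplier;

/** Hypothetical model of draining the send queue while the PageCache is busy. */
final class FastFailureSketch {

    /**
     * @param sendQueue     enqueue timestamps (ms) of the send requests still waiting
     * @param pageCacheBusy supplies the current result of the PageCache-busy check
     * @param now           current time in milliseconds
     * @return one error text per request that was failed fast
     */
    static List<String> cleanExpiredRequest(BlockingQueue<Long> sendQueue,
                                            BooleanSupplier pageCacheBusy,
                                            long now) {
        List<String> failures = new ArrayList<>();
        // Keep failing queued requests for as long as the PageCache stays busy.
        while (pageCacheBusy.getAsBoolean()) {
            Long enqueuedAt = sendQueue.poll();
            if (enqueuedAt == null) {
                break; // nothing is waiting, so there is nothing to fail
            }
            failures.add(String.format(
                    "[PCBUSY_CLEAN_QUEUE] broker busy, start flow control for a while, "
                            + "period in queue: %sms, size of queue: %d",
                    now - enqueuedAt, sendQueue.size()));
        }
        return failures;
    }
}
```

The two values in the message are therefore how long the failed request had been waiting and how many requests were still queued behind it.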
4. Practical suggestions
After the principle explanation and phenomenon analysis above, the reason why system busy and broker busy are thrown when sending messages is clear: the PageCache is busy. Can the errors be avoided by adjusting some of the parameters mentioned above? For example, the following parameters:
osPageCacheBusyTimeOutMills: the timeout used to judge that the PageCache is busy, which defaults to 1000, meaning 1s. Is it acceptable to increase this value, for example to 2000 or 3000? Author's point of view: very undesirable.
sendThreadPoolQueueCapacity: the capacity of the queue of the Broker's send thread pool, which defaults to 10000. If 10000 requests are already backlogged in the queue, a RejectedExecutionException is thrown. Is it acceptable to increase this value? Author's point of view: not advisable.
brokerFastFailureEnable: whether fast failure is enabled; the default is true. It means that if the PageCache of the Broker server is found to be busy and the sendThreadPoolQueue is not empty, that is, there are still queued send requests waiting for execution, the waiting is terminated and broker busy is returned. Turning off fast failure would also avoid this error. Author's point of view: very undesirable.
It is not advisable to modify the above parameters just to make the system busy and broker busy errors go away. The essence of these errors is that the system PageCache is busy: appending a single message to the PageCache is taking more than 1 second. If the Broker kept accepting messages and made them wait, its TPS could not meet requirements, which defeats the purpose of a high-performance message middleware. Therefore the fast failure mechanism returns an error directly to the message sender. By default, the sender retries 2 times and sends the message to another Broker, which preserves high availability.
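The client-side behaviour described above can be pictured with a small retry loop. This is a sketch of the idea only, not the RocketMQ producer: RetrySendSketch, the broker names, and the trySend predicate are hypothetical, and the real client chooses message queues rather than iterating over a broker list.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of "fail fast on one Broker, retry on another". */
final class RetrySendSketch {

    /**
     * Tries the first broker, then up to retryTimes further brokers in order.
     *
     * @param brokers    candidate brokers, in preference order (must not be empty)
     * @param trySend    returns true if the given broker accepted the message
     * @param retryTimes number of retries after the first attempt (2 by default in the article)
     * @return the broker that accepted the message, or null if every attempt failed
     */
    static String sendWithRetry(List<String> brokers, Predicate<String> trySend, int retryTimes) {
        int attempts = 1 + retryTimes;
        for (int i = 0; i < attempts; i++) {
            // Move on to the next broker after each failure instead of waiting on a busy one.
            String broker = brokers.get(i % brokers.size());
            if (trySend.test(broker)) {
                return broker;
            }
        }
        return null;
    }
}
```

If the first broker answers with system busy, the second attempt lands on a different broker, so the caller normally never sees the error as long as at least one broker in the cluster has spare capacity.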
Based on my personal opinion, the following solutions are proposed:
4.1 Enable transientStorePoolEnable
In the Broker configuration file, set transientStorePoolEnable=true.
Basis of the solution: it enables "read-write" separation. When a message is sent, it is first appended to a DirectByteBuffer (off-heap memory); then, under the asynchronous flushing mechanism, the contents of the DirectByteBuffer are committed to the PageCache and afterwards flushed to disk. When a message is pulled, it is read directly from the PageCache. Reads and writes are thus separated, the pressure on the PageCache is reduced, and the problem is solved at its root.
Disadvantage of the solution: it increases the possibility of data loss. If the Broker JVM process exits abnormally, messages already committed to the PageCache are not lost, but messages that exist only in off-heap memory (DirectByteBuffer) and have not yet been committed to the PageCache are lost. In general, however, the RocketMQ process is unlikely to exit abnormally.
4.2 Expand Broker server capacity
Basis of the solution:
When a Broker server is busy, it fails fast, and senders avoid that Broker for a period of time afterwards, which gives the Broker time to recover. The Broker architecture itself supports distributed horizontal expansion: adding Brokers and increasing the number of queues of a Topic reduces the load on each single Broker server, and so keeps the PageCache from becoming busy.
Tip: when expanding Brokers, you can copy ${ROCKETMQ_HOME}/store/config/topics.json from any Broker in the cluster to the corresponding directory of the new Broker server, which avoids having to create the Topic queues on the new Broker by hand. Message senders and message consumers then obtain the routing information of the Topic dynamically.
As an alternative to scaling out, you can also upgrade the existing Brokers, for example by adding memory or replacing mechanical disks with SSDs. In that case, however, you usually need to restart the Broker server, which is less convenient than scaling out.
That concludes the analysis of the causes of, and solutions to, system busy and broker busy when sending messages with RocketMQ. How well each solution works in a specific environment needs to be verified in practice.