In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly introduces "the brushing strategy of RocketMQ and the example code of realizing synchronous and asynchronous brushing". In daily operation, I believe that many people have doubts about the brushing strategy of RocketMQ and the example code of realizing synchronous and asynchronous brushing. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful for you to answer the doubts of "RocketMQ's flushing strategy and the example code to realize synchronous and asynchronous flushing". Next, please follow the editor to study!
1. The strategy of brushing disk
RocketMQ provides two strategies for synchronous and asynchronous flushing.
Synchronous flushing: after the message arrives at MQ, RocketMQ needs to persist the data. Synchronous flushing means that after the data reaches memory, it must be flushed to the commitlog log before it is considered successful, and then the producer data is sent successfully.
Asynchronous flushing: synchronous flushing means that after the data arrives in memory, it returns to producer to say that the data has been sent successfully. And then write to the commitlog log
The advantages and disadvantages of replication mode adapt to the scenario synchronous flushing ensures that the message throughput is low compared to asynchronous flushing, the higher message reliability requirements are required, the throughput of asynchronous brushing system increases, the system power outage and other anomalies, there will be partial loss corresponding to scenarios with higher throughput requirements.
Let's analyze the logic of its implementation from the point of view of source code.
2. Synchronously brushing disk
The core method of brushing disk in CommitLog.putMessage () method handleDiskFlush ()
Public void handleDiskFlush (AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {/ / Synchronization flush synchronous flushing if (FlushDiskType.SYNC_FLUSH = = this.defaultMessageStore.getMessageStoreConfig () .getFlushDiskType ()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService / / client confirms to wait for successful flushing if (messageExt.isWaitStoreMsgOK ()) {/ / encapsulates the request object nextoffset: the location of the current memory write + the number of bytes to be written this time GroupCommitRequest request = new GroupCommitRequest (result.getWroteOffset () + result.getWroteBytes ()); / / add the flushing request (scheduled tasks in the background perform flushing every 10 milliseconds. If there are multiple requests in 10 milliseconds, then multiple requests for a flush disk) service.putRequest (request); / / wait for the result of the flushing request (wait up to 5 seconds, and the result can be obtained immediately after the flushing is successful. ) boolean flushOK = request.waitForFlush (this.defaultMessageStore.getMessageStoreConfig (). GetSyncFlushTimeout ()); if (! flushOK) {log.error ("do groupcommit, wait for flush failed, topic:" + messageExt.getTopic () + "tags:" + messageExt.getTags () + "client address:" + messageExt.getBornHostString ()); putMessageResult.setPutMessageStatus (PutMessageStatus.FLUSH_DISK_TIMEOUT) }} else {service.wakeup ();}} else {/ / Asynchronous flush asynchronous flash disk if (! this.defaultMessageStore.getMessageStoreConfig (). IsTransientStorePoolEnable ()) {/ / wake up FlushRealTimeService service thread flushCommitLogService.wakeup ();} else {/ / wake up CommitRealTimeService service thread commitLogService.wakeup () }}}
View the core properties in the core class GroupCommitService of synchronous flash disk
Private volatile List requestsWrite = new ArrayList (); private volatile List requestsRead = new ArrayList (); requestsWrite: write queue, which is mainly used to add brushing tasks to this thread requestsRead: read queue, which is mainly used to perform specific brushing tasks. This is a bright spot of GroupCommitService design, which separates reading and writing, and swaps these two queues every time the tasks in requestsRead are processed.
Let's look at its run () method
Public void run () {CommitLog.log.info (this.getServiceName () + "service started"); while (! this.isStopped ()) {try {/ / wait for notification. If the data comes, end in advance and wait for the onWaitEnd () method to exchange the requestsWrite- of the read / write swapRequests () / / refresh request > requestsRead this.waitForRunning (10). / / execute this.doCommit ();} catch (Exception e) {CommitLog.log.warn (this.getServiceName () + "service has exception. ", e);}} / / omit the code.}
The swapRequests () method is executed in the waitForRunning method
Private void swapRequests () {List tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp;}
The refresh request received by GroupCommitService is added to the requestsWrite collection through the putRequest () method. The swapRequests () method exchanges the requestsWrite request collection to the requestsRead collection for use. Let's focus on the doCommit () method.
Private void doCommit () {synchronized (this.requestsRead) {if (! this.requestsRead.isEmpty ()) {/ / loop each refresh request for (GroupCommitRequest req: this.requestsRead) {/ / There may be a message in the next file, so a maximum of / / two times the flush boolean flushOK = false For (int I = 0; I
< 2 && !flushOK; i++) { //判断是否已经刷盘过了,刷盘的位置和当前消息下次刷盘需要的位置比较 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset (); if (! flushOK) {/ / 0 code flashes immediately, no matter how many messages there are in the cache CommitLog.this.mappedFileQueue.flush (0) }} / / return the result req.wakeupCustomer (flushOK) of flushing disk;} long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp (); / / set the time point of flushing disk if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint () .setPhysicMsgTimestamp (storeTimestamp) } / / clear requestsRead object this.requestsRead.clear ();} else {/ / Because of individual messages is set to not sync flush, it / / will come to this process CommitLog.this.mappedFileQueue.flush (0);}
MappedFileQueue.flush (0) swipe the disk immediately.
Public boolean flush (final int flushLeastPages) {boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset (this.flushedWhere, this.flushedWhere = = 0); if (mappedFile! = null) {long tmpTimeStamp = mappedFile.getStoreTimestamp (); / / Brush disk, return to write to disk pointer int offset = mappedFile.flush (flushLeastPages) / / calculate the current refresh pointer. All previous data has been persisted to disk long where = mappedFile.getFileFromOffset () + offset; result = where = = this.flushedWhere; this.flushedWhere = where; if (0 = = flushLeastPages) {this.storeTimestamp = tmpTimeSt}} return result;}
MappedFile.flush (0); ensure that the mappedFile.flush () method will also be called when the disk is flushed immediately and asynchronously.
3. Asynchronous flash disk if (! this.defaultMessageStore.getMessageStoreConfig (). IsTransientStorePoolEnable ()) {/ / wake up FlushRealTimeService service thread flushCommitLogService.wakeup ();} else {/ / wake up CommitRealTimeService service thread commitLogService.wakeup ();}
We found that there are two ways to flush the disk asynchronously, one is to start the CommitRealTimeService service thread when the out-of-heap memory pool is turned on, and the other is the default execution of the FlushRealTimeService service thread to perform the flushing operation. TransientStorePoolEnable is introduced in * * "creating a mapping file MappedFile" in the chapter "RocketMQ memory Mapping".
Figure 3-1
1 、 FlushRealTimeService
View its run () method
Public void run () {CommitLog.log.info (this.getServiceName () + "service started"); while (! this.isStopped ()) {/ / interval between each flush. Default 200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig () .getCommitIntervalCommitLog (); / / the minimum number of pages per commit defaults to 4 pages int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig () .getCommitCommitLogLeastPages () / / if the last refresh time + this value is less than the current time, then change flushPhysicQueueLeastPages = 0 to 200 int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig () .getCommitCommitLogThoroughInterval (); long begin = System.currentTimeMillis () / / if the last flush time exceeds 200ms, immediately flush the disk. The minimum number of pages in commit is set to 0 if (begin > = (this.lastCommitTimestamp + commitDataThoroughInterval)) {this.lastCommitTimestamp = begin; commitDataLeastPages = 0;} try {/ / flush disk boolean result = CommitLog.this.mappedFileQueue.commit (commitDataLeastPages); long end = System.currentTimeMillis () If (! result) {this.lastCommitTimestamp = end; / / result = false means some data committed. / / now wake up flush thread. FlushCommitLogService.wakeup ();} if (end-begin > 500) {log.info ("Commit data to file costs {} ms", end-begin);} this.waitForRunning (interval);} catch (Throwable e) {CommitLog.log.error (this.getServiceName () + "service has exception. ", e);}} boolean result = false; for (int I = 0; I
< RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); }} 这种方式和同步刷盘一样就是mappedFileQueue.commit(commitDataLeastPages)参数有限制,数据达到一定量的时候才进行刷盘操作提高数据的刷盘性能。 2、CommitRealTimeService 查看其run()方法 public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 每次刷盘的间隔时间,默认 200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 每次commit最少的页数 默认4页 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); // 如果上次刷新的时间+该值 小于当前时间,则改变flushPhysicQueueLeastPages =0 默认为200 int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); //距离上一次刷盘时间超过200ms则立刻刷盘,commit最少的页数置为0 if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {this.lastCommitTimestamp = begin; commitDataLeastPages = 0;} try {/ / boolean result = CommitLog.this.mappedFileQueue.commit (commitDataLeastPages); long end = System.currentTimeMillis () / / what is returned is that false indicates that the data has been commit to fileChannel if (! result) {this.lastCommitTimestamp = end; / / result = false means some data committed. / / now wake up flush thread. FlushCommitLogService.wakeup ();} if (end-begin > 500) {log.info ("Commit data to file costs {} ms", end-begin);} this.waitForRunning (interval);} catch (Throwable e) {CommitLog.log.error (this.getServiceName () + "service has exception. ", e);}} boolean result = false; for (int I = 0; I
< RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); }} 我们发现其刷盘方法不一样mappedFileQueue.commit()调用MappedFile.commit()方法 public int commit(final int commitLeastPages) { if (writeBuffer == null) { //no need to commit data to file channel, so just regard wrotePosition as committedPosition. return this.wrotePosition.get(); } //如果提交的数据不满commitLeastPages则不执行本次的提交,待下一次提交 if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { commit0(commitLeastPages); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } } // All dirty data has been committed to FileChannel. if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; } return this.committedPosition.get();} 查看其核心刷盘方法 protected void commit0(final int commitLeastPages) { int writePos = this.wrotePosition.get(); int lastCommittedPosition = this.committedPosition.get(); if (writePos - this.committedPosition.get() >0) {try {/ / create writeBuffer's shared cache ByteBuffer byteBuffer = writeBuffer.slice (); / / return the pointer to the last committed location byteBuffer.position (lastCommittedPosition); / / set limit to writePos byteBuffer.limit (writePos); this.fileChannel.position (lastCommittedPosition) / / copy (write) data from committedPosition pointer to wrotePosition to this.fileChannel.write (byteBuffer) in fileChannel; / / update committedPosition pointer to writePos this.committedPosition.set (writePos);} catch (Throwable e) {log.error ("Error occurred when commit data to FileChannel.", e);}
Commit0 () just adds the cached data to the fileChannel. In the CommitRealTimeService.run () method, we see that the wake-up flushCommitLogService thread needs to flush the data in fileChannel to disk. We find that both methods need to use the flushCommitLogService.run () method to execute MappedFile.flush (int).
Public int flush (final int flushLeastPages) {if (this.isAbleToFlush (flushLeastPages)) {if (this.hold ()) {int value = getReadPosition (); try {/ / We only append data to fileChannel or mappedByteBuffer, never both. If (writeBuffer! = null | | this.fileChannel.position ()! = 0) {this.fileChannel.force (false);} else {this.mappedByteBuffer.force ();}} catch (Throwable e) {log.error ("Error occurred when force data to disk.", e) } / / set the pointer this.flushedPosition.set (value); this.release ();} else {log.warn ("in flush, hold failed, flush offset =" + this.flushedPosition.get ()); this.flushedPosition.set (getReadPosition ());}} return this.getFlushedPosition ();}
The flushing logic of the two cache methods is also different. You can see the processing flow chart of * * "figure 3-1" * *.
We also found a method isAbleToFlush () to determine whether the disk needs to be cleaned or not.
Private boolean isAbleToFlush (final int flushLeastPages) {int flush = this.flushedPosition.get (); int write = getReadPosition (); if (this.isFull ()) {return true;} if (flushLeastPages > 0) {return ((write / OS_PAGE_SIZE)-(flush / OS_PAGE_SIZE)) > = flushLeastPages;} return write > flush;}
FlushLeastPages=0 immediately flushes the disk when synchronizing.
When flushLeastPages=4 is used for asynchronous flushing, the default is 4. It will only be flushed when the number of pages of PageCache is 4 times that of the data needed, or if the time from the last flush > = 200ms, set flushLeastPages=0 to flush the disk immediately.
When synchronizing the flushing disk, regardless of the size of the message, the thread blocks and waits for the result of the flushing disk.
There are two ways of asynchronous flushing, but its logic is 4 times the OS_PAGE_SIZE of the data that needs to be flushed, that is, (1024 * 4) * 4x16k or when the time from the last flushing is > = 200ms, to improve the performance of the data.
At this point, the study on the "RocketMQ flushing strategy and the example code to achieve synchronous and asynchronous flushing" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.