In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces the example analysis of synchronous disk brushing of RocketMQ design, which is very detailed and has certain reference value. Friends who are interested must finish it!
In synchronous flush mode, when the message is written to memory, it waits for the data to be written to the CommitLog file on disk.
The handleDiskFlush method of CommitLog:
Public void handleDiskFlush (AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {/ / Synchronization flush if (FlushDiskType.SYNC_FLUSH = = this.defaultMessageStore.getMessageStoreConfig () .getFlushDiskType ()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK ()) {GroupCommitRequest request = new GroupCommitRequest (result.getWroteOffset () + result.getWroteBytes ()); service.putRequest (request) Boolean flushOK = request.waitForFlush (this.defaultMessageStore.getMessageStoreConfig (). GetSyncFlushTimeout ()); if (! flushOK) {log.error ("do groupcommit, wait for flush failed, topic:" + messageExt.getTopic () + "tags:" + messageExt.getTags () + "client address:" + messageExt.getBornHostString ()); putMessageResult.setPutMessageStatus (PutMessageStatus.FLUSH_DISK_TIMEOUT) }} else {service.wakeup ();}} / / Asynchronous flush else {if (! this.defaultMessageStore.getMessageStoreConfig (). IsTransientStorePoolEnable ()) {flushCommitLogService.wakeup ();} else {commitLogService.wakeup ();}} class GroupCommitService extends FlushCommitLogService {private volatile List requestsWrite = new ArrayList () Private volatile List requestsRead = new ArrayList (); / / submit the task to the task list public synchronized void putRequest (final GroupCommitRequest request) {synchronized (this.requestsWrite) {this.requestsWrite.add (request);} if (hasNotified.compareAndSet (false, true)) {waitPoint.countDown () / / notify}} private void swapRequests () {List tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp } private void doCommit () {synchronized (this.requestsRead) {if (! this.requestsRead.isEmpty ()) {for (GroupCommitRequest req: this.requestsRead) {/ / There may be a message in the next file So a maximum of / / two times the flush boolean flushOK = false For (int I = 0; I
< 2 && !flushOK; i++) { flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset (); if (! flushOK) {CommitLog.this.mappedFileQueue.flush (0);}} req.wakeupCustomer (flushOK) } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp (); if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint () .setPhysicMsgTimestamp (storeTimestamp);} this.requestsRead.clear () } else {/ / Because of individual messages is set to not sync flush, it / / will come to this process CommitLog.this.mappedFileQueue.flush (0);} public void run () {CommitLog.log.info (this.getServiceName () + "service started") While (! this.isStopped ()) {try {this.waitForRunning (10); this.doCommit ();} catch (Exception e) {CommitLog.log.warn (this.getServiceName () + "service has exception. ", e);} / / Under normal circumstances shutdown, wait for the arrival of the / / request, and then flush try {Thread.sleep (10);} catch (InterruptedException e) {CommitLog.log.warn (" GroupCommitService Exception, ", e) } synchronized (this) {this.swapRequests ();} this.doCommit (); CommitLog.log.info (this.getServiceName () + "service end");} @ Override protected void onWaitEnd () {this.swapRequests () } @ Override public String getServiceName () {return GroupCommitService.class.getSimpleName ();} @ Override public long getJointime () {return 1000 * 60 * 5;}}
GroupCommitRequest is a brushing task. After submitting the brushing task, it will wait in the brushing queue and wait for the brushing thread.
GroupCommitService writes data to disk every 10 milliseconds. The reason why we do not write directly is that the disk io pressure is high and the write performance is low. Writing every 10 milliseconds can improve the disk io efficiency and write performance.
PutRequest (request) submit the brushing task to the task list
Request.waitForFlush synchronously waits for GroupCommitService to complete the tasks in the task list.
The read and write of the two queues are separated. RequestsWrite is the write queue. The user saves the added brushing task. RequestsRead is the read queue. The data of the write queue is put into the read queue before brushing.
The doCommit method of CommitLog:
Private void doCommit () {synchronized (this.requestsRead) {if (! this.requestsRead.isEmpty ()) {for (GroupCommitRequest req: this.requestsRead) {/ / There may be a message in the next file, so a maximum of / / two times the flush boolean flushOK = false For (int I = 0; I
< 2 && !flushOK; i++) { //根据offset确定是否已经刷盘 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset (); if (! flushOK) {CommitLog.this.mappedFileQueue.flush (0);}} req.wakeupCustomer (flushOK) } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp (); if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint () .setPhysicMsgTimestamp (storeTimestamp);} / / clear the list of brushed disks this.requestsRead.clear () } else {/ / Because of individual messages is set to not sync flush, it / / will come to this process CommitLog.this.mappedFileQueue.flush (0);}
When brushing the disk, read the data in requestsRead and write to the disk in turn.
Clear the requestsRead after the write is complete.
The purpose of the read-write separation design is that it does not affect task submission to the list when brushing the disk.
CommitLog.this.mappedFileQueue.flush (0); is the disk refresh operation:
Public boolean flush (final int flushLeastPages) {boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset (this.flushedWhere, this.flushedWhere = = 0); if (mappedFile! = null) {long tmpTimeStamp = mappedFile.getStoreTimestamp (); int offset = mappedFile.flush (flushLeastPages); long where = mappedFile.getFileFromOffset () + offset; result = where = = this.flushedWhere; this.flushedWhere = where If (0 = = flushLeastPages) {this.storeTimestamp = tmpTimeSt}} return result;}
Write to disk through CommitLog files mapped by MappedFile
This is the basic situation of synchronous flushing in RocketMQ's highly available design. The general idea is that a read-write separation queue is used to flush the disk. After the synchronous brushing task is submitted, it will wait in the flushing queue to complete and then return, while GroupCommitService writes a batch of data to disk every 10 milliseconds.
The above is all the contents of the article "sample Analysis of synchronous disk flushing of RocketMQ Design". Thank you for reading! Hope to share the content to help you, more related knowledge, welcome to follow the industry information channel!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.