Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to integrate DLedger in RocketMQ

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

RocketMQ in how to integrate DLedger, many novices are not very clear about this, in order to help you solve this problem, the following editor will explain in detail for you, people with this need can come to learn, I hope you can gain something.

1. Thinking before reading the source code

The message storage files of RocketMQ mainly include commitlog files, consumequeue files and Index files. Commitlog files store a full amount of messages, and consumequeue and index files are all built on commitlog files. To use DLedger to achieve the consistency of message storage, the key should be to achieve the consistency of commitlog files, that is, the object to be integrated by DLedger should be commitlog files, that is, it is only necessary to ensure that the commitlog files of each node in the replication group of raft protocol are consistent.

We know that using files to store messages is based on a certain storage format. An entry in rocketmq's commitlog contains magic number, message length, message attributes, message body, and so on. Let's review the storage format of DLedger logs:

If DLedger wants to integrate commitlog files, is it possible to treat rocketmq messages, that is, individual commitlog entries as a whole, as body fields of DLedger?

What are you waiting for? come with me to see the source code! Don't worry, throw one more question, can DLedger integrate RocketMQ commitlog, can it be upgraded smoothly?

With these thoughts and questions, let's explore how DLedger integrates RocketMQ.

2. Look at DLedger from Broker start-up process

> Tip: this article will not introduce the startup process of Broker in detail, but will only point out the codes related to DLedger during startup. If you want to know more about the startup process of Broker, you are advised to pay attention to the author's book "inside RocketMQ Technology".

The key points related to DLedger related to Broker are as follows:

2.1 build DefaultMessageStore

DefaultMessageStore construction method

If (messageStoreConfig.isEnableDLegerCommitLog ()) {/ / [@ 1] (https://my.oschina.net/u/1198) this.commitLog = new DLedgerCommitLog (this); else {this.commitLog = new CommitLog (this); / / @ 2}

Code @ 1: if DLedger is enabled, the implementation class of commitlog is DLedgerCommitLog, which is also the key point of this article.

Code @ 2: if DLedger is not enabled, the old Commitlog implementation class is used.

2.2 add node state change event listeners

BrokerController#initialize

If (messageStoreConfig.isEnableDLegerCommitLog ()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler (this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog) ((DefaultMessageStore) messageStore). GetCommitLog (). GetdLedgerServer (). GetdLedgerLeaderElector (). AddRoleChangeHandler (roleChangeHandler);}

The addRoleChanneHandler method of LedgerLeaderElector is mainly called to increase the node role change event listener, and DLedgerRoleChangeHandler is another key point to achieve master-slave switching.

2.3 call the load method of DefaultMessageStore

DefaultMessageStore#load

/ / load Commit Logresult = result & & this.commitLog.load (); / / [@ 1] (https://my.oschina.net/u/1198)// loadConsume Queueresult = result & & this.loadConsumeQueue (); if (result) {this.storeCheckpoint = new StoreCheckpoint (StorePathConfigHelper.getStoreCheckpoint (this.messageStoreConfig.getStorePathRootDir (); this.indexService.load (lastExitOK); this.recover (lastExitOK) / / @ 2 log.info ("load over, and the max phy offset = {}", this.getMaxPhyOffset ());}

The code @ 1 and @ 2 are ultimately executed by delegating the commitlog object, and the key here is that if DLedger is turned on, the final call is DLedgerCommitLog.

After laying the groundwork above, the protagonist DLedgerCommitLog "shining debut".

3. Detailed explanation of DLedgerCommitLog

> Tip: since most of the methods of Commitlog have been described in detail in the book "RocketMQ Technology Insider", and the implementation principle of DLedgerCommitLog is similar to that of Commitlog files, this article will go through the implementation details of the storage part.

3.1 Core Class Diagram

DLedgerCommitlog inherits from Commitlog. Let's take a look at its core attributes one by one.

A node in a cluster implemented by DLedgerServer dLedgerServer based on the raft protocol, represented by an DLedgerServer instance.

Configuration information for DLedgerConfig dLedgerConfig DLedger.

DLedgerMmapFileStore dLedgerFileStore DLedger is a storage implementation based on file mapping.

The collection of storage files managed by MmapFileList dLedgerFileList DLedger, compared to the MappedFileQueue in RocketMQ.

Int id node ID,0 represents the master node, and non-zero represents the slave node

MessageSerializer messageSerializer message Sequencer.

Long beginTimeInDledgerLock = 0 is used to record the time spent on message appends (the lock time held by log appends).

Long dividedCommitlogOffset =-1 the maximum offset in the old commitlog file recorded, and if the accessed offset is greater than it, the file managed by dledger is accessed.

Boolean isInrecoveringOldCommitlog = whether the old commitlog file is being restored by false.

Next, we will introduce the core methods of DLedgerCommitlog and their implementation points in detail.

3.2 Construction method public DLedgerCommitLog (final DefaultMessageStore defaultMessageStore) {super (defaultMessageStore); / / @ 1 dLedgerConfig = new DLedgerConfig (); dLedgerConfig.setEnableDiskForceClean (defaultMessageStore.getMessageStoreConfig (). IsCleanFileForciblyEnable ()); dLedgerConfig.setStoreType (DLedgerConfig.FILE); dLedgerConfig.setSelfId (defaultMessageStore.getMessageStoreConfig (). GetdLegerSelfId ()); dLedgerConfig.setGroup (defaultMessageStore.getMessageStoreConfig (). GetdLegerGroup ()); dLedgerConfig.setPeers (defaultMessageStore.getMessageStoreConfig (). GetdLegerPeers ()) DLedgerConfig.setStoreBaseDir (defaultMessageStore.getMessageStoreConfig (). GetStorePathRootDir ()); dLedgerConfig.setMappedFileSizeForEntryData (defaultMessageStore.getMessageStoreConfig (). GetMapedFileSizeCommitLog ()); dLedgerConfig.setDeleteWhen (defaultMessageStore.getMessageStoreConfig (). GetDeleteWhen ()); dLedgerConfig.setFileReservedHours (defaultMessageStore.getMessageStoreConfig (). GetFileReservedTime () + 1); id = Integer.valueOf (dLedgerConfig.getSelfId (). Substring (1)) + 1; / / @ 2 dLedgerServer = new DLedgerServer (dLedgerConfig) / / @ 3 dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore (); DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset)-> {assert bodyOffset = = DLedgerEntry.BODY_OFFSET; buffer.position (buffer.position () + bodyOffset + MessageDecoder.PHY_POS_POSITION); buffer.putLong (entry.getPos () + bodyOffset);}; dLedgerFileStore.addAppendHook (appendHook) / / @ 4 dLedgerFileList = dLedgerFileStore.getDataFileList (); this.messageSerializer = new MessageSerializer (defaultMessageStore.getMessageStoreConfig () .getMaxMessageSize ()); / / @ 5}

Code @ 1: call the constructor of the parent class, CommitLog, and load the commitlog file under ${ROCKETMQ_HOME} / store/ comitlog to be compatible with the upgrade DLedger message. Let's take a look at the constructor of CommitLog:

Code @ 2: build the configuration properties related to DLedgerConfig. The main attributes are as follows:

Whether enableDiskForceClean forcefully deletes files is taken from the broker configuration attribute cleanFileForciblyEnable, which defaults to true.

StoreType DLedger storage type, fixed to file-based storage mode.

The id name of the dLegerSelfId leader node, example configuration: N0, whose configuration requires that the second character must be followed by a number.

The name of dLegerGroup DLeger group, which is recommended to be consistent with the broker configuration property brokerName.

All the node information in dLegerPeers DLeger Group, and its configuration example N0-127.0.0.1 40911 position N1-127.0.0.1 40912 position N2-127.0.0.1 40913. Multiple nodes are separated by semicolons.

StoreBaseDir sets the root directory of the log file of DLedger, which is taken from the storePathRootDir in the borker fitting file, that is, the data storage root path of RocketMQ.

MappedFileSizeForEntryData sets the size of a single log file for DLedger, which is taken from the-mapedFileSizeCommitLog in the broker configuration file, which is the same as the single file size of the commitlog file.

The deletion time of the deleteWhen DLedger log file is taken from the deleteWhen in the broker configuration file, which defaults to 4 a.m.

The fileReservedHours DLedger log file is retained for how long, taken from the fileReservedHours in the broker configuration file, and defaults to 72 hours.

Code @ 3: create a DLedgerServer based on the DLedger configuration information, that is, create a DLedger cluster node. After each node in the cluster starts, the selection will be triggered.

Code @ 4: build the appendHook append hook function, which is a critical step in compatibility with Commitlog files, which will be described in more detail later.

Code @ 5: build message serialization.

According to the above flow chart, after the DefaultMessageStore implementation is built, the load method is called. After the DLedger mechanism is enabled, the load and recover methods of DLedgerCommitlog are called in turn.

3.3 loadpublic boolean load () {boolean result = super.load (); if (! result) {return false;} return true;}

The laod method implementation of DLedgerCommitLog is relatively simple, which is to call the load method of its parent class Commitlog, that is, to be compatible with previous messages when DLedger is enabled.

3.4 recover

Commitlog, consumequeue and other files will be loaded when Broker starts, and the data structure that needs to be restored is related to the data structure, especially the pointers such as writing, brushing, submitting, etc., which specifically calls the recover method. DLedgerCommitLog#recover

Public void recoverNormally (long maxPhyOffsetOfConsumeQueue) {/ / @ 1 recover (maxPhyOffsetOfConsumeQueue);}

The consumequeue is first restored, the maximum valid physical offset recorded in the consumequeue is obtained, and then restored based on that physical offset. Next, let's take a look at the processing flow and key points of this method.

DLedgerCommitLog#recover

DLedgerFileStore.load ()

Step1: load the storage files related to DLedger, and build the corresponding MmapFile one by one. The three important pointers wrotePosition, flushedPosition and committedPosition are initialized to the size of the file.

DLedgerCommitLog#recover

If (dLedgerFileList.getMappedFiles (). Size () > 0) {dLedgerFileStore.recover (); / / @ 1 dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile () .getFileFromOffset (); / / @ 2 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile () If (mappedFile! = null) {/ / @ 3 disableDeleteDledger ();} long maxPhyOffset = dLedgerFileList.getMaxWrotePosition () / / Clear ConsumeQueue redundant data if (maxPhyOffsetOfConsumeQueue > = maxPhyOffset) {/ / @ 4 log.warn ("[TruncateCQ] maxPhyOffsetOfConsumeQueue ({}) > = processOffset ({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset); this.defaultMessageStore.truncateDirtyLogicFiles (maxPhyOffset);} return;}

Step2: if a DLedger data file already exists, you only need to restore the DLedger-related data artifacts, because its important data pointer was set to the maximum value when the old commitlog file was loaded. The key implementation points are as follows:

First, the recover method of the DLedger file storage implementation class DLedgerFileStore is called to restore the relevant pointers of the MMapFile object under the jurisdiction (a file corresponds to a MMapFile instance). The implementation method is similar to the recovery process of RocketMQ's DefaultMessageStore.

Set the value of dividedCommitlogOffset to the minimum offset of all physical files in DLedger. If the physical offset of the operation message is less than this value, it is looked up from the commitlog file; if the physical offset is greater than or equal to this value, the message is found from the DLedger-related file.

If an old commitlog file exists, the deletion of the DLedger file is prohibited by forbidding the forced deletion of the file and setting the valid storage time of the file to 10 years.

If the maximum physical offset stored in consumequeue is greater than the maximum physical offset in DLedger, the redundant consumequeue file is deleted.

> warm reminder: why can't the log files related to commitlog be deleted when there are DLedger files?

Because in this case, if the physical file in the DLedger is deleted, the physical offset will be faulted.

Normally, maxCommitlogPhyOffset and dividedCommitlogOffset are contiguous, so it is very convenient to access commitlog or DLedger, but if some files in DLedger are deleted, the two values become discontiguous, resulting in a hole in the middle of the file, which cannot be accessed continuously.

DLedgerCommitLog#recover

IsInrecoveringOldCommitlog = true;super.recoverNormally (maxPhyOffsetOfConsumeQueue); isInrecoveringOldCommitlog = false

Step3: if DLedger is enabled and this is the first time to start (no DLedger-related log files have been generated), you need to restore the old commitlog files.

DLedgerCommitLog#recover

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile (); if (mappedFile = = null) {/ / @ 1 return;} ByteBuffer byteBuffer = mappedFile.sliceByteBuffer (); byteBuffer.position (mappedFile.getWrotePosition ()); boolean needWriteMagicCode = true;// 1 TOTAL SIZEbyteBuffer.getInt (); / / sizeint magicCode = byteBuffer.getInt (); if (magicCode = = CommitLog.BLANK_MAGIC_CODE) {/ / @ 2 needWriteMagicCode = false;} else {log.info ("Recover old commitlog found an illegal magic code= {}", magicCode) } dLedgerConfig.setEnableDiskForceClean (false); dividedCommitlogOffset= mappedFile.getFileFromOffset () + mappedFile.getFileSize (); / / @ 3log.info ("Recover old commitlog needWriteMagicCode= {} pos= {} file= {} dividedCommitlogOffset= {}", needWriteMagicCode, mappedFile.getFileFromOffset () + mappedFile.getWrotePosition (), mappedFile.getFileName (), dividedCommitlogOffset); if (needWriteMagicCode) {/ / @ 4 byteBuffer.position (mappedFile.getWrotePosition ()); byteBuffer.putInt (mappedFile.getFileSize ()-mappedFile.getWrotePosition ()); byteBuffer.putInt (BLANK_MAGIC_CODE) MappedFile.flush (0);} mappedFile.setWrotePosition (mappedFile.getFileSize ()); / / @ 5mappedFile.setCommittedPosition (mappedFile.getFileSize ()); mappedFile.setFlushedPosition (mappedFile.getFileSize ()); dLedgerFileList.getLastMappedFile (dividedCommitlogOffset); log.info ("Will set the initial commitlog offset= {} for dledger", dividedCommitlogOffset);}

Step4: if there is an old commitlog file, the rest of the last file needs to be filled, that is, no new data writes are accepted, and all the new data is written to the DLedger data file. The key implementation points are as follows:

Try to find the last commitlog file, or end if it is not found.

From the last write point of the last file (the original commitlog file to be written point) to try to find the number of magic written, if there is a magic number and equal to CommitLog.BLANK_MAGIC_CODE, then do not need to write magic number, in the upgrade DLedger the first time to start, the magic number is empty, so you need to write magic number.

Initialize dividedCommitlogOffset, which is equal to the start offset of the last file plus the size of the file, that is, the pointer points to the end of the last file.

Write all the data that is not fully written in the last commitlog by setting the size and magic number of the message body.

Setting the wrotePosition, flushedPosition, and committedPosition of the last file to the size of the file also means that the last file is full and the next message will be written to DLedger.

When the DLedger mechanism is enabled, the startup process of Broker is introduced here. I believe you are already aware of the efforts made by DLedger in integrating RocketMQ. Next, we will discuss how DLedger seamlessly integrates RocketMQ from two aspects of message appending and message reading to achieve smooth upgrade.

4. How to realize seamless compatibility of DLedger integration RocketMQ from message appending

> warm reminder: this section also does not describe the entire message appending (storage process) in detail, but only points out the core key points related to DLedger (multi-copy, master-slave switching). If you want to know more about the process of message addition, you can read the author's book "inside RocketMQ Technology".

DLedgerCommitLog#putMessage

AppendEntryRequest request = new AppendEntryRequest (); request.setGroup (dLedgerConfig.getGroup ()); request.setRemoteId (dLedgerServer.getMemberState (). GetSelfId ()); request.setBody (encodeResult.data); dledgerFuture = (AppendFuture) dLedgerServer.handleAppend (request); if (dledgerFuture.getPos () = =-1) {return new PutMessageResult (PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult (AppendMessageStatus.UNKNOWN_ERROR));}

Key point 1: when the message is appended, it is no longer written to the original commitlog file, but the handleAppend of DLedgerServer is called to append the message. In this method, the Leader nodes in the cluster are responsible for message appending and message replication. Only when more than half of the nodes in the cluster successfully write the message, the write success will be returned. If the append is successful, the starting offset after the successful append is returned, that is, the pos attribute, which is similar to the offset of commitlog in rocketmq, that is, the physical offset.

DLedgerCommitLog#putMessage

Long wroteOffset = dledgerFuture.getPos () + DLedgerEntry.BODY_OFFSET;ByteBuffer buffer = ByteBuffer.allocate (MessageDecoder.MSG_ID_LENGTH); String msgId = MessageDecoder.createMessageId (buffer, msg.getStoreHostBytes (), wroteOffset); eclipseTimeInLock = this.defaultMessageStore.getSystemClock (). Now ()-beginTimeInDledgerLock;appendResult = new AppendMessageResult (AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis (), queueOffset, eclipseTimeInLock)

Key point 2: calculate the physical offset of the real message according to the starting offset of DLedger. We know from the beginning that DLedger itself has its own storage protocol, and its body field stores the real message, that is, the storage structure of commitlog entries. The message offset returned to the client is the start offset of the body field, that is, the physical offset returned through putMessage has the same meaning as the physical offset returned without Dledger. That is, starting from the open offset, the message can be read correctly, so that DLedger is perfectly compatible with RocketMQ Commitlog. The illustration of pos and wroteOffset is as follows:

5. See how DLedger integrates RocketMQ to achieve seamless compatibility from message reading.

DLedgerCommitLog#getMessage

Public SelectMappedBufferResult getMessage (final long offset, final int size) {if (offset < dividedCommitlogOffset) {/ / @ 1 return super.getMessage (offset, size);} int mappedFileSize = this.dLedgerServer.getdLedgerConfig (). GetMappedFileSizeForEntryData (); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset (offset, offset = = 0); / / @ 2 if (mappedFile! = null) {int pos = (int) (offset% mappedFileSize) Return convertSbr (mappedFile.selectMappedBuffer (pos, size)); / / @ 3} return null;}

Message lookup is relatively simple, because the physical offset of the message returned to the client and forwarded to consumequeue is not the offset of the DLedger entry, but the starting offset of the real message. The key points of its implementation are as follows:

If the physical offset of the lookup is less than dividedCommitlogOffset, look for it from the previous commitlog file.

Then the specific physical file is found according to the dichotomy according to the physical offset.

Take the module of the physical offset, get the absolute offset in the physical file, and look up the message, because only the physical offset is known, the length of the message can be read out first, and then a complete message can be read out.

5. Summary

Based on the detailed introduction above, I think it is not difficult for readers to draw the following conclusions:

When DLedger integrates, it uses DLedger entries to wrap commitlog entries in RocketMQ, that is, to store the entire commitlog entry in the body field of DLedger entries.

The dividedCommitlogOffset variable is introduced to indicate that the message whose physical offset is less than this value exists in the old commitlog file, so that the old data can be accessed after upgrading the DLedger cluster.

When the new DLedger cluster starts, the last commitlog is populated, that is, the new data is no longer written to the original commitlog file.

When the message is appended to the DLedger data log file, the returned offset is not the starting offset of the DLedger entry, but the starting offset of the body field in the DLedger entry, that is, the starting offset of the real message, ensuring that the semantics of the physical offset of the message is the same as that of RocketMQ Commitlog.

Is it helpful for you to read the above content? If you want to know more about the relevant knowledge or read more related articles, please follow the industry information channel, thank you for your support.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report