Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the log append process in RocketMQ DLedger?

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

This article shows you what the log addition process is like in RocketMQ DLedger, which is concise and easy to understand, which will definitely brighten your eyes. I hope you can get something through the detailed introduction of this article.

We have analyzed the source code analysis RocketMQ DLedger multi-copy Leader selection in detail, and will analyze the implementation of log replication in detail.

According to the raft protocol, when the Leader master is selected in the whole cluster, the master node in the cluster can accept the request from the client, while the slave node in the cluster is only responsible for synchronizing data from the master node, but will not deal with read-write requests, which is very different from the read-write separation of Mmurs structure.

With the foundation of the previous article, this article will start directly with the Leader processing client request entry, which is: the handleAppend method of DLedgerServer.

1. Basic process of log replication

Before formally analyzing RocketMQ DLedger multi-replica replication, let's first take a look at the request protocol field in which the client sends the log. The class diagram is as follows:

Let's first introduce the meaning of each field one by one:

String group the group name to which the cluster belongs.

String remoteId requests the destination node ID.

String localId node ID.

The int code request response field indicates that the response code is returned.

String leaderId = Leader Id in the null cluster.

The current election round of the long term cluster.

Byte [] body the data to be sent.

The request processing entry for the log is the handleAppend method of DLedgerServer.

DLedgerServer#handleAppend

PreConditions.check (memberState.getSelfId (). Equals (request.getRemoteId ()), DLedgerResponseCode.UNKNOWN_MEMBER, "% s! =% s", request.getRemoteId (), memberState.getSelfId ()); reConditions.check (memberState.getGroup (). Equals (request.getGroup ()), DLedgerResponseCode.UNKNOWN_GROUP, "% s! =% s", request.getGroup (), memberState.getGroup ()); PreConditions.check (memberState.isLeader (), DLedgerResponseCode.NOT_LEADER)

Step1: first verify the reasonableness of the request:

If the requested node ID is not the current processing node, an exception is thrown.

If the requested cluster is not the one in which the current node resides, an exception is thrown.

If the current node is not the primary node, an exception is thrown.

DLedgerServer#handleAppend

Long currTerm = memberState.currTerm (); if (dLedgerEntryPusher.isPendingFull (currTerm)) {/ / [@ 1] (https://my.oschina.net/u/1198) AppendEntryResponse appendEntryResponse = new AppendEntryResponse (); appendEntryResponse.setGroup (memberState.getGroup ()); appendEntryResponse.setCode (DLedgerResponseCode.LEADER_PENDING_FULL.getCode ()); appendEntryResponse.setTerm (currTerm); appendEntryResponse.setLeaderId (memberState.getSelfId ()); return AppendFuture.newCompletedFuture (- 1, appendEntryResponse) } else {/ / @ 2 DLedgerEntry dLedgerEntry = new DLedgerEntry (); dLedgerEntry.setBody (request.getBody ()); DLedgerEntry resEntry = dLedgerStore.appendAsLeader (dLedgerEntry); return dLedgerEntryPusher.waitAck (resEntry);}

Step2: if the preprocessing queue is full, reject the client request and return a LEADER_PENDING_FULL error code; if not, encapsulate the request as DledgerEntry, call the dLedgerStore method to append the log, and use the waitAck method of dLedgerEntryPusher to synchronously wait for the replication response of the replica node, and finally return the result to the calling method.

Code @ 1: if the push queue for dLedgerEntryPusher is full, an append is returned with the error code LEADER_PENDING_FULL.

Code @ 2: appends the message to the Leader server and broadcasts it to the slave node. If the acknowledgement from the slave node is not received within a specified period of time, the append is considered to have failed.

Next, follow the above three main points:

Determine whether the Push queue is full

Leader node stores messages

The master node waits for the slave node to copy the ACK

1.1 how to determine whether the Push queue is full

DLedgerEntryPusher#isPendingFull

Public boolean isPendingFull (long currTerm) {checkTermForPendingMap (currTerm, "isPendingFull"); / / [@ 1] (https://my.oschina.net/u/1198) return pendingAppendResponsesByTerm.get (currTerm). Size () > dLedgerConfig.getMaxPendingRequestsNum (); / / @ 2}

It is mainly divided into two steps: code @ 1: check whether the current voting round is in PendingMap. If not, initialize it. Its structure is: Map

< Long/* 投票轮次*/, ConcurrentMap>

>.

Code @ 2: to detect whether the number of results currently waiting to be returned from the node exceeds its maximum number of requests, it can be configured through maxPendingRequests Num. The default value is 10000.

The above logic is relatively simple, but the question arises, where does the data in ConcurrentMap > come from? We might as well move on.

1.2 Leader nodes store data

The data storage of Leader nodes is mainly realized by DLedgerStore's appendAsLeader method. DLedger implements memory-based storage and file-based storage respectively. This paper focuses on file-based storage implementation. In fact, the current class is: DLedgerMmapFileStore.

Let's focus on the analysis of the data storage process, whose entry is the appendAsLeader method of DLedgerMmapFileStore.

DLedgerMmapFileStore#appendAsLeader

PreConditions.check (memberState.isLeader (), DLedgerResponseCode.NOT_LEADER); PreConditions.check (! isDiskFull, DLedgerResponseCode.DISK_FULL)

Step1: first of all, we can judge whether additional data can be added. The judgment is mainly based on the following two points:

Whether the current node's state is Leader, and if not, an exception is thrown.

Whether the current disk is full is based on the fact that the utilization of the root directory or data file directory of DLedger exceeds the maximum allowed, and the default value is 85%.

ByteBuffer dataBuffer = localEntryBuffer.get (); ByteBuffer indexBuffer = localIndexBuffer.get ()

Step2: gets a data and index buffer from the local thread variable. The ByteBuffer used to store data has a fixed capacity of 4m, and the ByteBuffer of the index is the length of two index entries, fixed at 64 bytes.

DLedgerEntryCoder.encode (entry, dataBuffer); public static void encode (DLedgerEntry entry, ByteBuffer byteBuffer) {byteBuffer.clear (); int size = entry.computSizeInBytes (); / / always put magic on the first position byteBuffer.putInt (entry.getMagic ()); byteBuffer.putInt (size); byteBuffer.putLong (entry.getIndex ()); byteBuffer.putLong (entry.getTerm ()); byteBuffer.putLong (entry.getPos ()); byteBuffer.putInt (entry.getChannel ()) ByteBuffer.putInt (entry.getChainCrc ()); byteBuffer.putInt (entry.getBodyCrc ()); byteBuffer.putInt (entry.getBody () .length); byteBuffer.put (entry.getBody ()); byteBuffer.flip ();}

Step3: write DLedgerEntry, that is, data to ByteBuffer. From this, we can see that each write will call the clear method of ByteBuffer to empty the data. From this, we can see that only 4m of data can be stored each time data is appended.

DLedgerMmapFileStore#appendAsLeader

Synchronized (memberState) {PreConditions.check (memberState.isLeader (), DLedgerResponseCode.NOT_LEADER, null); / / Omit the code}

Step4: lock the state machine and once again check whether the state of the node is a Leader node.

DLedgerMmapFileStore#appendAsLeader

Long nextIndex = ledgerEndIndex + 1 nextIndex entry.setIndex (nextIndex); entry.setTerm (memberState.currTerm ()); entry.setMagic (CURRENT_MAGIC); DLedgerEntryCoder.setIndexTerm (dataBuffer, nextIndex, memberState.currTerm (), CURRENT_MAGIC)

Step5: sets the sequence number for the current log entry, namely entryIndex and entryTerm (voting round). And write the magic number, entryIndex, entryTerm, and so on to the bytebuffer.

DLedgerMmapFileStore#appendAsLeader

Long prePos = dataFileList.preAppend (dataBuffer.remaining ()); entry.setPos (prePos); PreConditions.check (prePos! =-1, DLedgerResponseCode.DISK_ERROR, null); DLedgerEntryCoder.setPos (dataBuffer, prePos)

Step6: calculates the starting offset of the new message, describes its implementation in detail later on the preAppend of dataFileList, and then writes the offset to the bytebuffer of the log.

DLedgerMmapFileStore#appendAsLeader

For (AppendHook writeHook: appendHooks) {writeHook.doHook (entry, dataBuffer.slice (), DLedgerEntry.BODY_OFFSET);}

Step7: executes the hook function.

DLedgerMmapFileStore#appendAsLeader

Long dataPos = dataFileList.append (dataBuffer.array (), 0, dataBuffer.remaining ()); PreConditions.check (dataPos! =-1, DLedgerResponseCode.DISK_ERROR, null); PreConditions.check (dataPos = = prePos, DLedgerResponseCode.DISK_ERROR, null)

Step8: appends data to pagecache. This method will be described in more detail later.

DLedgerMmapFileStore#appendAsLeader

DLedgerEntryCoder.encodeIndex (dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm (), indexBuffer); long indexPos = indexFileList.append (indexBuffer.array (), 0, indexBuffer.remaining (), false); PreConditions.check (indexPos = = entry.getIndex () * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null)

Step9: build the item index and append the index data to the pagecache.

DLedgerMmapFileStore#appendAsLeader

LedgerEndIndex++;ledgerEndTerm = memberState.currTerm (); if (ledgerBeginIndex = =-1) {ledgerBeginIndex = ledgerEndIndex;} updateLedgerEndIndexAndTerm ()

Step10:ledgerEndeIndex plus the serial number of the next entry. And set the ledgerEndIndex and ledgerEndTerm of the state machine of the leader node.

This is the end of the Leader node data append, which will focus on the implementation details of storage-related methods later.

1.3 Master node waits for ACK replication from slave node

Its implementation entry is the waitAck method of dLedgerEntryPusher.

DLedgerEntryPusher#waitAck

Public CompletableFuture waitAck (DLedgerEntry entry) {updatePeerWaterMark (entry.getTerm (), memberState.getSelfId (), entry.getIndex ()); / / @ 1 if (memberState.getPeerMap (). Size () = = 1) {/ / @ 2 AppendEntryResponse response = new AppendEntryResponse (); response.setGroup (memberState.getGroup ()) Response.setLeaderId (memberState.getSelfId ()); response.setIndex (entry.getIndex ()); response.setTerm (entry.getTerm ()); response.setPos (entry.getPos ()); return AppendFuture.newCompletedFuture (entry.getPos (), response);} else {checkTermForPendingMap (entry.getTerm (), "waitAck") AppendFuture future = new AppendFuture (dLedgerConfig.getMaxWaitAckTimeMs ()); / / @ 3 future.setPos (entry.getPos ()); CompletableFuture old = pendingAppendResponsesByTerm.get (entry.getTerm ()) .put (entry.getIndex (), future) / / @ 4 if (old! = null) {logger.warn ("[MONITOR] get old wait at index= {}", entry.getIndex ());} wakeUpDispatchers (); / / @ 5 return future;}}

Code @ 1: updates the push water mark of the current node. Code @ 2: if the number of nodes in the cluster is 1, the successful result is returned without forwarding. Code @ 3: build the append response Future and set the timeout. The default value is 2500 ms, which can be changed through the maxWaitAckTimeMs configuration. Code @ 4: put the built Future into the waiting result collection. Code @ 5: wake up the Entry forwarding thread, that is, push the data from the master node to each slave node.

Next, we interpret the above key points respectively.

1.3.1 updatePeerWaterMark method

DLedgerEntryPusher#updatePeerWaterMark

Private void updatePeerWaterMark (long term, String peerId, long index) {/ / Code @ 1 synchronized (peerWaterMarksByTerm) {checkTermForWaterMark (term, "updatePeerWaterMark"); / / Code @ 2 if (peerWaterMarksByTerm.get (term) .get (peerId)

< index) { // 代码@3 peerWaterMarksByTerm.get(term).put(peerId, index); } }} 代码@1:先来简单介绍该方法的两个参数: long term 当前的投票轮次。 String peerId 当前节点的ID。 long index 当前追加数据的序号。 代码@2:初始化 peerWaterMarksByTerm 数据结构,其结果为 < Long /** term */, Map< String /** peerId */, Long /** entry index*/>

.

Code @ 3: if the index stored by peerWaterMarksByTerm is less than the index of the current data, update.

1.3.2 wakeUpDispatchers detailed explanation

DLedgerEntryPusher#updatePeerWaterMark

Public void wakeUpDispatchers () {for (EntryDispatcher dispatcher: dispatcherMap.values ()) {dispatcher.wakeup ();}}

The main method is to traverse the transponder and wake up. The core of this method is EntryDispatcher, and let's take a look at the initialization of the collection before we cover it in detail.

DLedgerEntryPusher construction method

For (String peer: memberState.getPeerMap (). KeySet ()) {if (! peer.equals (memberState.getSelfId () {dispatcherMap.put (peer, new EntryDispatcher (peer, logger);}}

Originally, an EntryDispatcher object was created for each slave node when the DLedgerEntryPusher was built.

Obviously, log replication is implemented by DLedgerEntryPusher. Because of the space, this part will continue in the next article.

The above explanation of Leader additional log does not analyze the storage-related implementation in detail. For the sake of the completeness of the knowledge system, let's analyze its core implementation.

2. Log storage implementation details

This section mainly explains the preAppend and append methods of MmapFileList in detail.

> for the design of the storage section, please refer to the author's blog: source code analysis, RocketMQ DLedger multi-copy storage implementation, and MmapFileList MappedFileQueue for standard RocketMQ.

2.1the preAppend explanation of MmapFileList

This method will eventually call the two-parameter preAppend method, so let's look directly at the two-parameter preAppend method.

MmapFileList#preAppend

Public long preAppend (int len, boolean useBlank) {/ / @ 1 MmapFile mappedFile = getLastMappedFile (); / / @ 2 start if (null = = mappedFile | | mappedFile.isFull ()) {mappedFile = getLastMappedFile (0);} if (null = = mappedFile) {logger.error ("Create mapped file for {}", storePath); return-1 } / / @ 2 end int blank = useBlank? MIN_BLANK_LEN: 0; if (len + blank > mappedFile.getFileSize ()-mappedFile.getWrotePosition ()) {/ / @ 3 if (blank < MIN_BLANK_LEN) {logger.error ("Blank {} should ge {}", blank, MIN_BLANK_LEN); return-1;} else {ByteBuffer byteBuffer = ByteBuffer.allocate (mappedFile.getFileSize ()-mappedFile.getWrotePosition ()) / / @ 4 byteBuffer.putInt (BLANK_MAGIC_CODE); / / @ 5 byteBuffer.putInt (mappedFile.getFileSize ()-mappedFile.getWrotePosition ()) / / @ 6 if (mappedFile.appendMessage (byteBuffer.array () {/ / @ 7 / / need to set the wrote position mappedFile.setWrotePosition (mappedFile.getFileSize ()) } else {logger.error ("Append blank error for {}", storePath); return-1;} mappedFile = getLastMappedFile (0); if (null = = mappedFile) {logger.error ("Create mapped file for {}", storePath); return-1 } return mappedFile.getFileFromOffset () + mappedFile.getWrotePosition (); / / @ 8}

Code @ 1: first introduce the meaning of its parameters:

Int len requires the length of the application.

Whether boolean useBlank needs to be populated, the default is true.

Code @ 2: get the last file, that is, the file that is currently being written.

Code @ 3: the logic to deal with if the requested resource exceeds the writable bytes of the current file. The code @ 4Mutual 7 is its processing logic.

Code @ 4: request a bytebuffer of the size of the remaining bytes of the current file.

Code @ 5: write the magic number first.

Code @ 6: write byte length, equal to the total remaining size of the current file.

Code @ 7: write null bytes, code @ 4muric7 is intended to write an empty Entry, fill in the magic number and size, easy to parse.

Code @ 8: if the current file is large enough to hold the log to be written, its physical offset is returned directly.

After the above code interpretation, it is easy to see the effect of this method, which is to return the initial physical offset to be written to the log.

2.2 append details of MmapFileList

Finally, the append method with four parameters is called with the following code: MmapFileList#append

Public long append (byte [] data, int pos, int len, boolean useBlank) {/ / @ 1 if (preAppend (len, useBlank) = =-1) {return-1;} MmapFile mappedFile = getLastMappedFile (); / / @ 2 long currPosition = mappedFile.getFileFromOffset () + mappedFile.getWrotePosition () / / @ 3 if (! mappedFile.appendMessage (data, pos, len)) {/ / @ 4 logger.error ("Append error for {}", storePath); return-1;} return currPosition;}

Code @ 1: first introduce the parameters:

Byte [] data the data to be written, that is, the log to be appended.

Where does int pos start reading from the data byte array.

The number of bytes to be written to the int len.

Whether boolean useBlank uses padding or not, the default is true.

Code @ 2: get the last file, the currently writable file.

Code @ 3: gets the current write pointer.

Code @ 4: append messages.

Finally, let's take a look at appendMessage, where the specific message appends the implementation logic.

DefaultMmapFile#appendMessage

Public boolean appendMessage (final byte [] data, final int offset, final int length) {int currentPos = this.wrotePosition.get (); if ((currentPos + length))

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report