
The implementation of RocketMQ DLedger Log replication

2025-04-01 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/02 Report--

This article walks through the implementation of log replication in RocketMQ DLedger, following the source code step by step.

> Tip: this article is long. To follow the implementation more easily, read it with these questions in mind: 1. Raft has an important concept, the committed log index. How is it implemented? 2. A log entry that a client sends to the DLedger cluster counts as successful only after a majority of the nodes in the cluster have acknowledged it. How is this enforced? 3. How are the append and commit actions of the Raft protocol implemented?

Log replication (log forwarding) is implemented by DLedgerEntryPusher. Its class diagram consists mainly of the following four classes:

DLedgerEntryPusher: the core class for DLedger log forwarding and processing. It starts the following three objects, each of which runs as a thread.

EntryHandler: the thread that receives and processes pushed logs. It is active when the current node is a follower.

QuorumAckChecker: the thread that tallies the ACKs for appended logs. It is active when the current node is the leader.

EntryDispatcher: the log forwarding thread. It is active when the current node is the leader.

Next, we will introduce the above four classes in detail to reveal the core implementation principle of log replication.

1. DLedgerEntryPusher

1.1 Core class diagram

DLedgerEntryPusher is the core implementation class for pushing logs to the replicas in DLedger. It creates the three core threads EntryDispatcher, QuorumAckChecker and EntryHandler. Its core attributes are as follows:

DLedgerConfig dLedgerConfig: the multi-replica configuration.

DLedgerStore dLedgerStore: the storage implementation.

MemberState memberState: the node state machine.

DLedgerRpcService dLedgerRpcService: the RPC service implementation, used for network communication with the other nodes in the cluster.

Map peerWaterMarksByTerm: the current watermark of each node, organized by term. The key is the term and the value is a ConcurrentMap that holds the watermark (log index) of each node.

Map pendingAppendResponsesByTerm: holds the pending response of each append request (Future pattern).

EntryHandler entryHandler: the thread started on a follower to receive push requests (such as append and commit) from the leader.

QuorumAckChecker quorumAckChecker: the thread on the leader that tallies the votes for append requests.

Map dispatcherMap: the log forwarders on the leader, which replicate entries to the followers.

Next, let's introduce the implementation of its core method.

1.2 Constructor

    public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
        DLedgerRpcService dLedgerRpcService) {
        this.dLedgerConfig = dLedgerConfig;
        this.memberState = memberState;
        this.dLedgerStore = dLedgerStore;
        this.dLedgerRpcService = dLedgerRpcService;
        // One dispatcher per remote peer; the local node is skipped.
        for (String peer : memberState.getPeerMap().keySet()) {
            if (!peer.equals(memberState.getSelfId())) {
                dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
            }
        }
    }

The key point of the constructor is that it builds one EntryDispatcher object for every other node in the cluster.
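The per-peer construction can be tried out in isolation. The following is a minimal sketch, not DLedger code: `DispatcherMapSketch`, `Dispatcher` and `buildDispatchers` are hypothetical names, and the peer list is passed in directly instead of being read from MemberState.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DispatcherMapSketch {
    /** Hypothetical stand-in for EntryDispatcher; it only remembers its target peer. */
    static final class Dispatcher {
        final String peerId;

        Dispatcher(String peerId) {
            this.peerId = peerId;
        }
    }

    /** Builds one dispatcher per peer and skips the local node, as the constructor does. */
    static Map<String, Dispatcher> buildDispatchers(List<String> peerIds, String selfId) {
        Map<String, Dispatcher> dispatchers = new HashMap<>();
        for (String peer : peerIds) {
            if (!peer.equals(selfId)) {
                dispatchers.put(peer, new Dispatcher(peer));
            }
        }
        return dispatchers;
    }
}
```

For a three-node cluster the leader therefore ends up with two dispatchers, one for each follower.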

1.3 startup

DLedgerEntryPusher#startup

    public void startup() {
        entryHandler.start();
        quorumAckChecker.start();
        for (EntryDispatcher dispatcher : dispatcherMap.values()) {
            dispatcher.start();
        }
    }

Start the EntryHandler, QuorumAckChecker, and EntryDispatcher threads in turn.

> Note: other core methods of DLedgerEntryPusher will be introduced in the process of analyzing the principle of log replication in detail.

Next, we will explain the implementation principle of RocketMQ DLedger (multiple copies) from EntryDispatcher, QuorumAckChecker and EntryHandler.

2. EntryDispatcher in detail

2.1 Core class diagram

Its core attributes are as follows.

AtomicReference<PushEntryRequest.Type> type = new AtomicReference<>(PushEntryRequest.Type.COMPARE): the type of command to send to the follower. Possible values: PushEntryRequest.Type.COMPARE, TRUNCATE, APPEND, COMMIT, as detailed below.

long lastPushCommitTimeMs = -1: timestamp of the last commit-type request sent.

String peerId: ID of the target node.

long compareIndex = -1: the log index that has been compared.

long writeIndex = -1: the log index of the next entry to write to the follower.

int maxPendingSize = 1000: the maximum number of pending logs allowed.

long term = -1: the current term of the leader.

String leaderId = null: ID of the leader.

long lastCheckLeakTimeMs = System.currentTimeMillis(): the time of the last leak check. A leak here means that the number of pending log requests has exceeded maxPendingSize.

ConcurrentMap<Long, Long> pendingMap = new ConcurrentHashMap<>(): records when each pending log was sent. Key: the log index (entryIndex); value: the send timestamp.

Quota quota = new Quota(dLedgerConfig.getPeerPushQuota()): the push quota.

2.2 Push request type

The DLedger leader replicates logs to its followers with four request types, defined by the enumeration PushEntryRequest.Type: COMPARE, TRUNCATE, APPEND, and COMMIT.

COMPARE: when the leader changes, the new leader compares its log entries with those of each follower in order to find the excess data that the follower must truncate.

TRUNCATE: once the leader has finished comparing logs by index, it sends TRUNCATE to the follower.

APPEND: appends log entries to the follower.

COMMIT: normally the leader piggybacks the committed index on append requests, but when append requests are few and sporadic, the leader sends a separate request to tell the follower the committed index.
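How a dispatcher moves between these types can be summarized as a small state machine. The sketch below is an illustration under assumptions: the `Event` enum and the `next` function are hypothetical and only condense the transitions that the rest of this article walks through (leader change and INCONSISTENT_STATE lead to COMPARE, a matching compare leads to APPEND, a mismatch leads to TRUNCATE, and a finished truncate leads to APPEND). COMMIT is a request type only; the dispatcher never stays in it.

```java
final class PushStateSketch {
    enum Type { COMPARE, TRUNCATE, APPEND, COMMIT }

    /** Hypothetical event names, introduced only for this sketch. */
    enum Event { LEADER_CHANGED, COMPARE_MATCHED, COMPARE_MISMATCHED, TRUNCATE_DONE, INCONSISTENT_STATE }

    /** Returns the dispatcher type after the given event; unrelated events leave it unchanged. */
    static Type next(Type current, Event event) {
        switch (event) {
            case LEADER_CHANGED:
            case INCONSISTENT_STATE:
                return Type.COMPARE;
            case COMPARE_MATCHED:
                return current == Type.COMPARE ? Type.APPEND : current;
            case COMPARE_MISMATCHED:
                return current == Type.COMPARE ? Type.TRUNCATE : current;
            case TRUNCATE_DONE:
                return current == Type.TRUNCATE ? Type.APPEND : current;
            default:
                return current;
        }
    }
}
```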

With this overview of the request types exchanged between leader and follower, we start with doWork, the entry point of EntryDispatcher's processing.

2.3 doWork in detail

    public void doWork() {
        try {
            if (!checkAndFreshState()) { // @1
                waitForRunning(1);
                return;
            }
            if (type.get() == PushEntryRequest.Type.APPEND) { // @2
                doAppend();
            } else {
                doCompare(); // @3
            }
            waitForRunning(1);
        } catch (Throwable t) {
            DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);
            DLedgerUtils.sleep(500);
        }
    }

Code @1: check the state to decide whether append or compare requests can still be sent.

Code @2: if the push type is APPEND, the leader propagates log entries to the follower.

Code @3: otherwise the leader sends the follower a request to compare their data (this is usually the first step after a node is newly elected leader).

2.3.1 checkAndFreshState in detail

EntryDispatcher#checkAndFreshState

    private boolean checkAndFreshState() {
        if (!memberState.isLeader()) { // @1
            return false;
        }
        if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) { // @2
            synchronized (memberState) {
                if (!memberState.isLeader()) {
                    return false;
                }
                PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
                term = memberState.currTerm();
                leaderId = memberState.getSelfId();
                changeState(-1, PushEntryRequest.Type.COMPARE);
            }
        }
        return true;
    }

Code @1: if the node is not the leader, return false directly, which ends the current doWork call, because only the leader needs to forward logs to the followers.

Code @2: the node is the leader, but the term recorded by the dispatcher differs from the state machine's term, or leaderId is not set, or leaderId differs from the state machine's leaderId. This usually means that the cluster has gone through a re-election. The dispatcher then synchronizes its term and leaderId with the state machine and switches to COMPARE, so that a COMPARE request is sent next.

Next, take a look at changeState (change state).

    private synchronized void changeState(long index, PushEntryRequest.Type target) {
        logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
        switch (target) {
            case APPEND: // @1
                compareIndex = -1;
                updatePeerWaterMark(term, peerId, index);
                quorumAckChecker.wakeup();
                writeIndex = index + 1;
                break;
            case COMPARE: // @2
                if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {
                    compareIndex = -1;
                    pendingMap.clear();
                }
                break;
            case TRUNCATE: // @3
                compareIndex = -1;
                break;
            default:
                break;
        }
        type.set(target);
    }

Code @1: if the target type is APPEND, reset compareIndex, update the follower's watermark to index, wake up quorumAckChecker, and set writeIndex to index + 1.

Code @2: if the target type is COMPARE and the current type changes from APPEND to COMPARE, reset compareIndex to -1 and clear the pending requests in pendingMap. A COMPARE request is then sent to the follower on the next round.

Code @3: if the target type is TRUNCATE, reset compareIndex to -1.

Next, let's take a look at APPEND, COMPARE, TRUNCATE, and other requests.

2.3.2 Append request in detail

EntryDispatcher#doAppend

    private void doAppend() throws Exception {
        while (true) {
            if (!checkAndFreshState()) { // @1
                break;
            }
            if (type.get() != PushEntryRequest.Type.APPEND) { // @2
                break;
            }
            if (writeIndex > dLedgerStore.getLedgerEndIndex()) { // @3
                doCommit();
                doCheckAppendResponse();
                break;
            }
            if (pendingMap.size() >= maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000)) { // @4
                long peerWaterMark = getPeerWaterMark(term, peerId);
                for (Long index : pendingMap.keySet()) {
                    if (index < peerWaterMark) {
                        pendingMap.remove(index);
                    }
                }
                lastCheckLeakTimeMs = System.currentTimeMillis();
            }
            if (pendingMap.size() >= maxPendingSize) { // @5
                doCheckAppendResponse();
                break;
            }
            doAppendInner(writeIndex); // @6
            writeIndex++;
        }
    }

Code @1: check the state, as described in detail above.

Code @2: if the request type is not APPEND, exit and end the current round of doWork.

Code @3: writeIndex is the index of the entry currently being appended to the follower. Normally, when the leader sends an append request to a follower, it piggybacks its committed index. When writeIndex is greater than ledgerEndIndex, however, there is nothing new to append. This can happen when append requests are infrequent, or when appending was held back because the pending requests reached maxPendingSize. In that case a separate COMMIT request is sent.

Code @4: check whether pendingMap (the pending requests) has leaked, that is, whether the number of pending requests has reached the maximum allowed. The check also runs when more than 1 s has passed since the last one. It gets the follower's current watermark for the current term (the index of the last successfully appended log) and discards every pending request whose index is below the watermark.

Code @5: if the number of pending requests (those still waiting for the follower's append result) still reaches maxPendingSize, check whether an append request has timed out and resend it, then exit the loop.

Code @6: send the actual append request.
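The leak check at @4 can be exercised on its own. The following is a minimal sketch under assumptions: `PendingLeakCheckSketch` and `prune` are hypothetical names, and the watermark is passed in directly instead of being looked up by term and peer.

```java
import java.util.concurrent.ConcurrentMap;

final class PendingLeakCheckSketch {
    /**
     * Removes every pending entry whose index is below the follower's watermark
     * and returns how many entries were removed. Iterating over the key set of a
     * concurrent map while removing from it is safe because its iterators are
     * weakly consistent.
     */
    static int prune(ConcurrentMap<Long, Long> pendingMap, long peerWaterMark) {
        int removed = 0;
        for (Long index : pendingMap.keySet()) {
            if (index < peerWaterMark && pendingMap.remove(index) != null) {
                removed++;
            }
        }
        return removed;
    }
}
```

Note that the comparison is strict, as in the original loop: an entry whose index equals the watermark stays in the map until its own response arrives.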

2.3.2.1 doCommit: sending a commit request

EntryDispatcher#doCommit

    private void doCommit() throws Exception {
        if (DLedgerUtils.elapsed(lastPushCommitTimeMs) > 1000) { // @1
            PushEntryRequest request = buildPushRequest(null, PushEntryRequest.Type.COMMIT); // @2
            //Ignore the results
            dLedgerRpcService.push(request); // @3
            lastPushCommitTimeMs = System.currentTimeMillis();
        }
    }

Code @1: if less than 1 s has passed since the last separate commit request was sent, this commit request is skipped.

Code @2: build the commit request.

Code @3: send the commit request to the follower over the network.

Let's take a look at how the commit request is built.

EntryDispatcher#buildPushRequest

    private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {
        PushEntryRequest request = new PushEntryRequest();
        request.setGroup(memberState.getGroup());
        request.setRemoteId(peerId);
        request.setLeaderId(leaderId);
        request.setTerm(term);
        request.setEntry(entry);
        request.setType(target);
        request.setCommitIndex(dLedgerStore.getCommittedIndex());
        return request;
    }

The request mainly contains the following fields: the group that the DLedger node belongs to, the ID of the follower, the ID of the leader, the current term, the log content, the request type, and committedIndex (the log index that the leader has committed).

2.3.2.2 doCheckAppendResponse: checking and resending append requests

EntryDispatcher#doCheckAppendResponse

    private void doCheckAppendResponse() throws Exception {
        long peerWaterMark = getPeerWaterMark(term, peerId); // @1
        Long sendTimeMs = pendingMap.get(peerWaterMark + 1);
        if (sendTimeMs != null && System.currentTimeMillis() - sendTimeMs > dLedgerConfig.getMaxPushTimeOutMs()) { // @2
            logger.warn("[Push-{}]Retry to push entry at {}", peerId, peerWaterMark + 1);
            doAppendInner(peerWaterMark + 1);
        }
    }

The purpose of this method is to check whether an append request has timed out. The key points are as follows:

Code @1: get the index of the last successfully appended log.

Code @2: get the send time of the next entry from the pending requests. If it is not empty and the append timeout has been exceeded, the append request is sent again. The timeout is 1 second by default and can be changed through maxPushTimeOutMs.
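The timeout decision itself is a pure function of the pending map, the watermark and the clock. Here is a minimal sketch of it, assuming hypothetical names (`AppendTimeoutSketch`, `shouldRetry`) and an explicit `nowMs` parameter in place of System.currentTimeMillis() so that it can be tested deterministically.

```java
import java.util.Map;

final class AppendTimeoutSketch {
    /**
     * Returns true when the entry right after the watermark has been pending
     * for longer than the allowed push timeout and should therefore be resent.
     */
    static boolean shouldRetry(Map<Long, Long> pendingMap, long peerWaterMark,
                               long nowMs, long maxPushTimeOutMs) {
        Long sendTimeMs = pendingMap.get(peerWaterMark + 1);
        return sendTimeMs != null && nowMs - sendTimeMs > maxPushTimeOutMs;
    }
}
```

Only the entry at watermark + 1 is examined, so a single retry targets the oldest unacknowledged entry rather than every pending one.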

2.3.2.3 doAppendInner: sending the append request

Send an append request to the follower.

EntryDispatcher#doAppendInner

    private void doAppendInner(long index) throws Exception {
        DLedgerEntry entry = dLedgerStore.get(index); // @1
        PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);
        checkQuotaAndWait(entry); // @2
        PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND); // @3
        CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request); // @4
        pendingMap.put(index, System.currentTimeMillis()); // @5
        responseFuture.whenComplete((x, ex) -> {
            try {
                PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
                DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
                switch (responseCode) {
                    case SUCCESS: // @6
                        pendingMap.remove(x.getIndex());
                        updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
                        quorumAckChecker.wakeup();
                        break;
                    case INCONSISTENT_STATE: // @7
                        logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
                        changeState(-1, PushEntryRequest.Type.COMPARE);
                        break;
                    default:
                        logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
                        break;
                }
            } catch (Throwable t) {
                logger.error("", t);
            }
        });
        lastPushCommitTimeMs = System.currentTimeMillis();
    }

Code @1: first look up the log entry by its index.

Code @2: check the quota. If the quota is exceeded, the push is throttled. The key points are:

First, the trigger conditions: the number of pending append requests has exceeded the maximum allowed; or, with file-based storage, the follower lags the leader by more than 300 MB (configurable through peerPushThrottlePoint).

Then, if more than 20 MB of logs have been appended within one second (configurable through peerPushQuota), the thread sleeps for 1 s before appending further.

Code @3: build the PUSH request.

Code @4: send the network request to the follower through Netty. The follower processes the request when it receives it (the network-related implementation details are not discussed in this article).

Code @5: record in pendingMap the send time of the log being appended, so that the sender can later determine whether the request has timed out.

Code @6: the handling of a successful response. The key points are as follows:

Remove the send time of this log from pendingMap.

Update the index of the last successfully appended log (organized by term, with one key-value pair per follower).

Wake up the quorumAckChecker thread (mainly used to arbitrate the append result), which is described in more detail later.

Code @7: if the push response reports an inconsistent state, switch to COMPARE so that a COMPARE request is sent to check whether the data of the leader and the follower is consistent.
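The per-second quota check at @2 can be illustrated with a simple fixed-window byte counter. This is a sketch under assumptions and not DLedger's Quota class, whose internals are not shown in this article: `PushQuotaSketch`, `recordAndCheck` and `millisUntilNextWindow` are hypothetical names, and the window is aligned to whole seconds of the supplied clock value.

```java
final class PushQuotaSketch {
    private final long maxBytesPerSecond;
    private long windowSecond = -1;
    private long bytesInWindow = 0;

    PushQuotaSketch(long maxBytesPerSecond) {
        this.maxBytesPerSecond = maxBytesPerSecond;
    }

    /** Records bytes sent at nowMs and reports whether the current one-second window is over quota. */
    boolean recordAndCheck(long bytes, long nowMs) {
        long second = nowMs / 1000;
        if (second != windowSecond) {
            // A new second has started: reset the counter.
            windowSecond = second;
            bytesInWindow = 0;
        }
        bytesInWindow += bytes;
        return bytesInWindow > maxBytesPerSecond;
    }

    /** Milliseconds left until the current one-second window ends. */
    static long millisUntilNextWindow(long nowMs) {
        return 1000 - (nowMs % 1000);
    }
}
```

A caller that gets `true` back would pause before pushing more entries. With a 20 MB quota, pushing 15 MB and then 10 MB within the same second trips the check, while the same 10 MB in the following second does not.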

That concludes the APPEND request type of log forwarding. Next, we move on to another request type: COMPARE.

2.3.3 Compare request in detail

Requests of type COMPARE are sent by the doCompare method. Its body runs inside while (true), so when reading the following code, pay attention to the conditions under which it exits the loop.

EntryDispatcher#doCompare

    if (!checkAndFreshState()) {
        break;
    }
    if (type.get() != PushEntryRequest.Type.COMPARE && type.get() != PushEntryRequest.Type.TRUNCATE) {
        break;
    }
    if (compareIndex == -1 && dLedgerStore.getLedgerEndIndex() == -1) {
        break;
    }

Step1: verify whether the comparison should run. The key points are as follows:

Check whether the node is the leader; if it is not, exit the loop directly.

If the request type is neither COMPARE nor TRUNCATE, exit the loop directly.

If compareIndex and ledgerEndIndex are both -1, which indicates a brand-new DLedger cluster, exit the loop directly.

EntryDispatcher#doCompare

    if (compareIndex == -1) {
        compareIndex = dLedgerStore.getLedgerEndIndex();
        logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
    } else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex < dLedgerStore.getLedgerBeginIndex()) {
        logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());
        compareIndex = dLedgerStore.getLedgerEndIndex();
    }

Step2: if compareIndex is -1 or is outside the valid range, reset the index to compare to the largest log index currently stored: ledgerEndIndex.

    DLedgerEntry entry = dLedgerStore.get(compareIndex);
    PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
    PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
    CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
    PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);

Step3: look up the log entry by its index and send a COMPARE request to the follower, with a timeout of 3 s.

EntryDispatcher#doCompare

    long truncateIndex = -1;
    if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) { // @1
        if (compareIndex == response.getEndIndex()) {
            changeState(compareIndex, PushEntryRequest.Type.APPEND);
            break;
        } else {
            truncateIndex = compareIndex;
        }
    } else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex() || response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) { // @2
        truncateIndex = dLedgerStore.getLedgerBeginIndex();
    } else if (compareIndex < response.getBeginIndex()) { // @3
        truncateIndex = dLedgerStore.getLedgerBeginIndex();
    } else if (compareIndex > response.getEndIndex()) { // @4
        compareIndex = response.getEndIndex();
    } else { // @5
        compareIndex--;
    }
    if (compareIndex < dLedgerStore.getLedgerBeginIndex()) { // @6
        truncateIndex = dLedgerStore.getLedgerBeginIndex();
    }

Step4: compute the log index to truncate from, based on the response. The key points are as follows:

Code @1: if the two log indexes are the same, no truncation is needed and the next round sends append requests to the follower directly; otherwise truncateIndex is set to compareIndex.

Code @2: if the largest log index stored on the follower is smaller than the smallest index on the leader, or the smallest log index on the follower is larger than the largest index on the leader, the two ranges do not overlap. This usually happens when the follower has been down for a long time and the leader has deleted expired entries. truncateIndex is set to the leader's ledgerBeginIndex, that is, the smallest offset the leader currently holds.

Code @3: if the compared log index is smaller than the follower's begin index, the follower's disk has most likely been damaged; synchronization starts from the leader's smallest log index.

Code @4: if the compared log index is larger than the follower's largest log index, compareIndex is set to the follower's largest log index, and the comparison continues from there.

Code @5: if the compared log index lies within the follower's range (between its begin index and its largest index), compareIndex is decremented by one.

Code @6: if the resulting compareIndex is smaller than the leader's smallest log index, truncateIndex is set to the leader's smallest index.

    if (truncateIndex != -1) {
        changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
        doTruncate(truncateIndex);
        break;
    }

Step5: if truncateIndex is not -1, send a TRUNCATE request to the follower.

2.3.3.1 doTruncate in detail

    private void doTruncate(long truncateIndex) throws Exception {
        PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
        DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);
        PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
        logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, truncateEntry.getPos());
        PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
        PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3, TimeUnit.SECONDS);
        PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
        PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);
        lastPushCommitTimeMs = System.currentTimeMillis();
        changeState(truncateIndex, PushEntryRequest.Type.APPEND);
    }

This method mainly builds a TRUNCATE request and sends it to the follower.

That concludes message replication and forwarding on the leader side. The leader pushes requests to the followers, and the followers naturally have to process them. Next, following the requests that the leader sends, we analyze how a follower responds.

3. EntryHandler in detail

EntryHandler is also a thread, and it is active when the node is a follower.

3.1 Core class diagram

Its core attributes are as follows:

long lastCheckFastForwardTimeMs: timestamp of the last check for whether the leader has pushed messages.

ConcurrentMap<Long, Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> writeRequestMap: the processing queue for append requests.

BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> compareOrTruncateRequests: the queue for COMMIT, COMPARE, and TRUNCATE requests.

3.2 handlePush

From the above, we know that the leader actively propagates logs to the followers, and a follower receives the request data over the network and processes it. At the end of the call chain, the handlePush method of EntryHandler is called.

EntryHandler#handlePush

    public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
        //The timeout should smaller than the remoting layer's request timeout
        CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>(1000); // @1
        switch (request.getType()) {
            case APPEND: // @2
                PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                long index = request.getEntry().getIndex();
                Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));
                if (old != null) {
                    logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());
                    future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
                }
                break;
            case COMMIT: // @3
                compareOrTruncateRequests.put(new Pair<>(request, future));
                break;
            case COMPARE:
            case TRUNCATE: // @4
                PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                writeRequestMap.clear();
                compareOrTruncateRequests.put(new Pair<>(request, future));
                break;
            default:
                logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());
                future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
                break;
        }
        return future;
    }

This method handles the push requests of the leader on the follower. The key points of its implementation are as follows.

Code @1: first build a response Future with a default timeout of 1 s.

Code @2: if it is an APPEND request, put it into writeRequestMap. If an entry for the same index already exists, the leader has pushed it repeatedly, so a response with the status code REPEATED_PUSH is built and returned. The requests placed in writeRequestMap are processed periodically by the doWork method.

Code @3: if it is a COMMIT request, the request is put into the compareOrTruncateRequests queue and processed asynchronously by the doWork method.

Code @4: if it is a COMPARE or TRUNCATE request, the pending write queue writeRequestMap is cleared and the request is put into the compareOrTruncateRequests queue, which is processed asynchronously by the doWork method.
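The duplicate detection at @2 relies on the atomicity of putIfAbsent. The following minimal sketch isolates that step; `RepeatedPushSketch` and its methods are hypothetical names, and a plain String stands in for the PushEntryResponse that the real handler completes the future with.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class RepeatedPushSketch {
    private final ConcurrentMap<Long, CompletableFuture<String>> writeRequestMap = new ConcurrentHashMap<>();

    /**
     * Queues an append for the given index. If an append for the same index is
     * already queued, the new future is completed immediately with REPEATED_PUSH
     * and the queued request is left untouched.
     */
    CompletableFuture<String> handleAppend(long index) {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture<String> old = writeRequestMap.putIfAbsent(index, future);
        if (old != null) {
            future.complete("REPEATED_PUSH");
        }
        return future;
    }

    int queuedCount() {
        return writeRequestMap.size();
    }
}
```

Because putIfAbsent is a single atomic operation, two network threads delivering the same index concurrently cannot both believe they queued it first.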

Next, we focus on the implementation of the doWork method.

3.3 doWork in detail

EntryHandler#doWork

    public void doWork() {
        try {
            if (!memberState.isFollower()) { // @1
                waitForRunning(1);
                return;
            }
            if (compareOrTruncateRequests.peek() != null) { // @2
                Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll();
                PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);
                switch (pair.getKey().getType()) {
                    case TRUNCATE:
                        handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                        break;
                    case COMPARE:
                        handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                        break;
                    case COMMIT:
                        handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());
                        break;
                    default:
                        break;
                }
            } else { // @3
                long nextIndex = dLedgerStore.getLedgerEndIndex() + 1;
                Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex);
                if (pair == null) {
                    checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
                    waitForRunning(1);
                    return;
                }
                PushEntryRequest request = pair.getKey();
                handleDoAppend(nextIndex, request, pair.getValue());
            }
        } catch (Throwable t) {
            DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
            DLedgerUtils.sleep(100);
        }
    }

Code @1: if the current node is not a follower, return.

Code @2: if the compareOrTruncateRequests queue is not empty, there are COMMIT, COMPARE, or TRUNCATE requests, and they are processed first. Note that the non-blocking methods peek and poll are used here. The handler that matches the request type is then called, as described in more detail below.

Code @3: if there are only append requests, take the largest log index stored on the current node, add 1, and use the result (ledgerEndIndex + 1) as the key to look up the next replication request in writeRequestMap. If a request is found, handleDoAppend is executed; if not, checkAbnormalFuture is called to handle the abnormal case.
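The lookup at @3 enforces strictly sequential appends: only the request for ledgerEndIndex + 1 is ever taken, no matter what else is queued. A minimal sketch of that rule, with the hypothetical names `NextAppendSketch` and `takeNext` and a generic request type `R`:

```java
import java.util.Map;
import java.util.Optional;

final class NextAppendSketch {
    /**
     * Removes and returns the request for the entry right after ledgerEndIndex,
     * or an empty Optional when that request has not arrived yet. Requests for
     * later indexes stay queued until the gap is filled.
     */
    static <R> Optional<R> takeNext(Map<Long, R> writeRequestMap, long ledgerEndIndex) {
        return Optional.ofNullable(writeRequestMap.remove(ledgerEndIndex + 1));
    }
}
```

If the follower has stored up to index 5 and only the request for index 7 is queued, nothing is taken; this is the situation in which doWork falls through to checkAbnormalFuture.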

Next, let's focus on the details of each processing.

3.3.1 handleDoCommit

Handling a commit request is relatively simple: it calls updateCommittedIndex of DLedgerStore to update the committed offset. So let's take a look at the updateCommittedIndex method of DLedgerStore.

DLedgerMmapFileStore#updateCommittedIndex

    public void updateCommittedIndex(long term, long newCommittedIndex) { // @1
        if (newCommittedIndex == -1
            || ledgerEndIndex == -1
            || term < memberState.currTerm()
            || newCommittedIndex == this.committedIndex) { // @2
            return;
        }
        if (newCommittedIndex < this.committedIndex
            || newCommittedIndex < this.ledgerBeginIndex) { // @3
            logger.warn("[MONITOR]Skip update committed index for new={} < old={} or new={} < beginIndex={}", newCommittedIndex, this.committedIndex, newCommittedIndex, this.ledgerBeginIndex);
            return;
        }
        long endIndex = ledgerEndIndex;
        if (newCommittedIndex > endIndex) { // @4
            //If the node fall behind too much, the committedIndex will be larger than enIndex.
            newCommittedIndex = endIndex;
        }
        DLedgerEntry dLedgerEntry = get(newCommittedIndex); // @5
        PreConditions.check(dLedgerEntry != null, DLedgerResponseCode.DISK_ERROR);
        this.committedIndex = newCommittedIndex;
        this.committedPos = dLedgerEntry.getPos() + dLedgerEntry.getSize(); // @6
    }

Code @1: first, the parameters of the method:

long term: the current term of the leader.

long newCommittedIndex: the committed log index that the leader attached when it sent the log replication request.

Code @2: if the index to commit is -1, or the follower has no log yet (ledgerEndIndex is -1), or the term is smaller than the follower's current term, or the leader's committed index equals the follower's committed index, the commit is ignored.

Code @3: if the leader's committed index is smaller than the follower's committed index, or smaller than the smallest valid log index of the follower (ledgerBeginIndex), a [MONITOR] warning is logged and the commit is ignored.

Code @4: if the follower lags too far behind the leader, the index to commit is reset to the largest valid log index currently on the follower.

Code @5: look up the entry for the index to commit on the follower, and throw a DISK_ERROR error if it does not exist.

Code @6: update the committedIndex and committedPos pointers. DLedgerStore periodically flushes the committed pointer into the checkpoint file, which persists committedIndex.
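The guards at @2 to @4 determine which committed index the follower ends up with. The sketch below condenses them into one pure function; `CommittedIndexSketch` and `resolve` are hypothetical names, all store fields are passed in as parameters, and the entry lookup of @5 is left out.

```java
final class CommittedIndexSketch {
    /**
     * Returns the committed index the follower should hold after receiving
     * newCommittedIndex from the leader, or the unchanged committedIndex when
     * the update must be ignored.
     */
    static long resolve(long requestTerm, long currTerm, long newCommittedIndex,
                        long committedIndex, long ledgerBeginIndex, long ledgerEndIndex) {
        // @2: nothing to commit, empty log, stale term, or no change.
        if (newCommittedIndex == -1 || ledgerEndIndex == -1
                || requestTerm < currTerm || newCommittedIndex == committedIndex) {
            return committedIndex;
        }
        // @3: never move the committed index backwards or before the first stored entry.
        if (newCommittedIndex < committedIndex || newCommittedIndex < ledgerBeginIndex) {
            return committedIndex;
        }
        // @4: a lagging follower can only commit what it has actually stored.
        return Math.min(newCommittedIndex, ledgerEndIndex);
    }
}
```

For example, a follower that has stored entries up to index 8 and receives a committed index of 10 commits only up to 8, and catches up on later requests.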

3.3.2 handleDoCompare

Handling the COMPARE request sent by the leader is also relatively simple: in the end the buildResponse method is called to construct the response.

EntryHandler#buildResponse

    private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
        PushEntryResponse response = new PushEntryResponse();
        response.setGroup(request.getGroup());
        response.setCode(code);
        response.setTerm(request.getTerm());
        if (request.getType() != PushEntryRequest.Type.COMMIT) {
            response.setIndex(request.getEntry().getIndex());
        }
        response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
        response.setEndIndex(dLedgerStore.getLedgerEndIndex());
        return response;
    }

It mainly returns the follower's current ledgerBeginIndex and ledgerEndIndex together with the term, for the leader to judge and compare.

3.3.3 handleDoTruncate

The implementation of handleDoTruncate is relatively simple: it deletes all logs after truncateIndex on the follower by calling the truncate method of dLedgerStore. Because the storage design is basically similar to that of RocketMQ, this article does not cover it in detail and only outlines the main points: locate the log file by the log index, adjust the corresponding read/write pointer, flush pointer and so on, and delete all physical files after that file. If you are interested, you can consult Chapter 4 of the author's "RocketMQ Technology Insider", which covers RocketMQ storage.

3.3.4 handleDoAppend

    private void handleDoAppend(long writeIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
        try {
            PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
            DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());
            PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
            future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
            dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
        } catch (Throwable t) {
            logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t);
            future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
        }
    }

Its implementation is also relatively simple: it calls the appendAsFollower method of DLedgerStore to append the log. This works the same way as appendAsLeader in the log storage part, except that a follower does not need to forward the log.

3.3.5 checkAbnormalFuture

This method is the focus of this section. DoWork's maximum valid log sequence number (ledgerEndIndex) + 1 stored on the server is called when it tries to get no corresponding request from the request to be written. For example, the master node does not PUSH the latest data to the slave node. Next, let's take a look at the implementation details of this method. EntryHandler#checkAbnormalFuture

    if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) < 1000) {
        return;
    }
    lastCheckFastForwardTimeMs = System.currentTimeMillis();
    if (writeRequestMap.isEmpty()) {
        return;
    }

Step1: if less than 1s has passed since the last check, return. If there are no backlogged append requests, return as well, because in that case it is equally clear that the master node has not pushed any logs yet.

EntryHandler#checkAbnormalFuture

    for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) {
        long index = pair.getKey().getEntry().getIndex(); // @1
        // Fall behind
        if (index ...

...

QuorumAckChecker#doWork

    if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) {
        logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",
            memberState.getSelfId(), memberState.getRole(), memberState.currTerm(),
            dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(),
            dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
        lastPrintWatermarkTimeMs = System.currentTimeMillis();
    }

Step1: if more than 3s have passed since the watermark was last printed, log the current term, ledgerBegin, ledgerEnd, committed, and peerWaterMarksByTerm values.

QuorumAckChecker#doWork

    if (!memberState.isLeader()) { // @2
        waitForRunning(1);
        return;
    }

Step2: if the current node is not the master node, return immediately and do nothing.

QuorumAckChecker#doWork

    if (pendingAppendResponsesByTerm.size() > 1) { // @1
        for (Long term : pendingAppendResponsesByTerm.keySet()) {
            if (term == currTerm) {
                continue;
            }
            for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {
                AppendEntryResponse response = new AppendEntryResponse();
                response.setGroup(memberState.getGroup());
                response.setIndex(futureEntry.getKey());
                response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
                response.setLeaderId(memberState.getLeaderId());
                logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);
                futureEntry.getValue().complete(response);
            }
            pendingAppendResponsesByTerm.remove(term);
        }
    }
    if (peerWaterMarksByTerm.size() > 1) {
        for (Long term : peerWaterMarksByTerm.keySet()) {
            if (term == currTerm) {
                continue;
            }
            logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);
            peerWaterMarksByTerm.remove(term);
        }
    }

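Step3: clean up the entries in pendingAppendResponsesByTerm and peerWaterMarksByTerm that do not belong to the current voting round (term), to avoid unnecessary memory use. Pending requests from other terms are completed with TERM_CHANGED before they are removed.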
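The cleanup in Step3 can be sketched as follows. This is a minimal illustration, not DLedger code: TermCleanupSketch and clearStaleTerms are hypothetical names, and a plain String stands in for AppendEntryResponse.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: completes and drops pending futures that belong to older terms. */
final class TermCleanupSketch {

    /** Returns how many pending futures were completed with the term-changed result. */
    static int clearStaleTerms(Map<Long, Map<Long, CompletableFuture<String>>> pendingByTerm, long currTerm) {
        int cleared = 0;
        Iterator<Map.Entry<Long, Map<Long, CompletableFuture<String>>>> it = pendingByTerm.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<Long, CompletableFuture<String>>> byTerm = it.next();
            if (byTerm.getKey() == currTerm) {
                continue; // keep the current term's pending requests
            }
            for (CompletableFuture<String> future : byTerm.getValue().values()) {
                future.complete("TERM_CHANGED"); // tell the waiting client the term has changed
                cleared++;
            }
            it.remove(); // drop the whole stale term
        }
        return cleared;
    }

    public static void main(String[] args) {
        Map<Long, Map<Long, CompletableFuture<String>>> pending = new ConcurrentHashMap<>();
        pending.computeIfAbsent(1L, t -> new ConcurrentHashMap<>()).put(7L, new CompletableFuture<>());
        pending.computeIfAbsent(2L, t -> new ConcurrentHashMap<>()).put(8L, new CompletableFuture<>());
        System.out.println(clearStaleTerms(pending, 2L)); // 1
        System.out.println(pending.keySet());             // [2]
    }
}
```

The sketch removes entries through the iterator so that it works with any Map; the article's code removes keys while iterating over keySet(), which is only safe for concurrent map implementations.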

    Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
    long quorumIndex = -1;
    for (Long index : peerWaterMarks.values()) { // @1
        int num = 0;
        for (Long another : peerWaterMarks.values()) { // @2
            if (another >= index) {
                num++;
            }
        }
        if (memberState.isQuorum(num) && index > quorumIndex) { // @3
            quorumIndex = index;
        }
    }
    dLedgerStore.updateCommittedIndex(currTerm, quorumIndex); // @4

Step4: arbitrate the committed sequence number based on the replication progress reported by each slave node. To make this code easier to follow, first consider the role of peerWaterMarks, which stores the log sequence number that each slave node has successfully appended. For example, in a three-node DLedger cluster, peerWaterMarks looks roughly like this:

    {"dledger_group_01_0": 100, "dledger_group_01_1": 101}

Here dledger_group_01_0 is the ID of slave node 1, whose replicated sequence number is 100, and dledger_group_01_1 is the ID of slave node 2, whose replicated sequence number is 101. Counting the master node as well, how is the committable sequence number determined?

Code @1: first iterate over the values of peerWaterMarks, that is, {100, 101} in the example above, using the temporary variable index as the log sequence number to be voted on. If more than half of the nodes in the cluster have replicated up to at least this value, the log entry can be confirmed as committed.

Code @2: iterate over all replicated sequence numbers in peerWaterMarks and compare each one with index. If a node's replicated sequence number is greater than or equal to index, increment num, which counts as a vote in favor.

Code @3: arbitrate on index. If a quorum of nodes voted in favor and index is greater than quorumIndex, update quorumIndex to index. After the traversal, quorumIndex holds the largest log sequence number that can currently be committed.

Code @4: update committedIndex so that DLedgerStore can periodically write it to the checkpoint.
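The arbitration can be tried out in isolation with the sketch below. QuorumIndexSketch is a hypothetical class, the third map entry (dledger_group_01_2 at 102, standing in for the master node's own watermark) is invented for the example, and the isQuorum rule of "more than half of the cluster" is an assumption about what memberState.isQuorum checks.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch of the quorum arbitration; names and the majority rule are assumptions. */
final class QuorumIndexSketch {

    /** Assumed majority rule: strictly more than half of the cluster members. */
    static boolean isQuorum(int num, int clusterSize) {
        return num >= clusterSize / 2 + 1;
    }

    /** Returns the largest index replicated by a quorum of nodes, or -1 if there is none. */
    static long quorumIndex(Map<String, Long> peerWaterMarks, int clusterSize) {
        long quorumIndex = -1;
        for (long index : peerWaterMarks.values()) {       // candidate index to vote on
            int num = 0;
            for (long another : peerWaterMarks.values()) { // count nodes that have reached it
                if (another >= index) {
                    num++;
                }
            }
            if (isQuorum(num, clusterSize) && index > quorumIndex) {
                quorumIndex = index;
            }
        }
        return quorumIndex;
    }

    public static void main(String[] args) {
        Map<String, Long> marks = new LinkedHashMap<>();
        marks.put("dledger_group_01_0", 100L);
        marks.put("dledger_group_01_1", 101L);
        marks.put("dledger_group_01_2", 102L); // hypothetical entry for the master node itself
        System.out.println(quorumIndex(marks, 3)); // 101
    }
}
```

With watermarks 100, 101 and 102, index 101 is reached by two of three nodes, while 102 is reached by only one, so 101 is the largest committable index. The nested loop is quadratic in the number of nodes, which is negligible for clusters of this size.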

    ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
    boolean needCheck = false;
    int ackNum = 0;
    if (quorumIndex >= 0) {
        for (Long i = quorumIndex; i >= 0; i--) { // @1
            try {
                CompletableFuture<AppendEntryResponse> future = responses.remove(i); // @2
                if (future == null) { // @3
                    needCheck = lastQuorumIndex != -1 && lastQuorumIndex != quorumIndex && i != lastQuorumIndex;
                    break;
                } else if (!future.isDone()) { // @4
                    AppendEntryResponse response = new AppendEntryResponse();
                    response.setGroup(memberState.getGroup());
                    response.setTerm(currTerm);
                    response.setIndex(i);
                    response.setLeaderId(memberState.getSelfId());
                    response.setPos(((AppendFuture) future).getPos());
                    future.complete(response);
                }
                ackNum++; // @5
            } catch (Throwable t) {
                logger.error("Error in ack to index={} term={}", i, currTerm, t);
            }
        }
    }

Step5: handle the pending requests at or before quorumIndex and send the responses back to the clients. The implementation steps are as follows:

Code @1: starting from quorumIndex, handle one entry at a time and then decrement the sequence number, until it drops below 0 or the loop exits early. See the exit logic below.

Code @2: remove the pending request for this log entry from responses.

Code @3: if no pending request is found, all pending requests have been handled and the loop is about to exit. Before exiting, needCheck is set; it is true only if all three of the following conditions hold:

The log sequence number of the last arbitration is not equal to -1.

The log sequence number of the last arbitration is not equal to the log sequence number of this arbitration.

The log sequence number currently being processed (i) is not equal to the log sequence number of the last arbitration. Conditions one and two are usually true, but this third condition is most likely false, because the loop normally stops exactly at the previous quorum index, whose request was already removed in the previous round.

Code @4: return the result to the client.

Code @5: ackNum is the number of entries acknowledged in this round.
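A minimal sketch of this backward acknowledgement loop follows. AckLoopSketch is a hypothetical class, CompletableFuture<Long> stands in for the TimeoutFuture of AppendEntryResponse, and updating lastQuorumIndex inside the method is a simplification (the article does it at the end of doWork).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative sketch of Step5; types and names are simplified stand-ins. */
final class AckLoopSketch {
    long lastQuorumIndex = -1;
    boolean needCheck;

    /** Completes pending futures from quorumIndex downwards and returns how many were handled. */
    int ackUpTo(ConcurrentMap<Long, CompletableFuture<Long>> responses, long quorumIndex) {
        int ackNum = 0;
        needCheck = false;
        for (long i = quorumIndex; i >= 0; i--) {
            CompletableFuture<Long> future = responses.remove(i);
            if (future == null) {
                // Normally the loop stops exactly at the previous quorum index,
                // so the third condition is false and no leak check is requested.
                needCheck = lastQuorumIndex != -1 && lastQuorumIndex != quorumIndex && i != lastQuorumIndex;
                break;
            } else if (!future.isDone()) {
                future.complete(i); // respond to the waiting client
            }
            ackNum++;
        }
        lastQuorumIndex = quorumIndex; // simplification: done at the end of doWork in the article
        return ackNum;
    }

    public static void main(String[] args) {
        AckLoopSketch checker = new AckLoopSketch();
        ConcurrentMap<Long, CompletableFuture<Long>> responses = new ConcurrentHashMap<>();
        for (long i = 0; i <= 5; i++) {
            responses.put(i, new CompletableFuture<>());
        }
        System.out.println(checker.ackUpTo(responses, 2)); // 3 (indexes 2, 1, 0)
        System.out.println(checker.ackUpTo(responses, 4)); // 2 (indexes 4, 3)
        System.out.println(checker.needCheck);             // false
    }
}
```

Walking downwards and stopping at the first missing entry keeps each round proportional to the number of newly committed entries rather than to the total size of the log.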

    if (ackNum == 0) {
        for (long i = quorumIndex + 1; i < Integer.MAX_VALUE; i++) {
            TimeoutFuture<AppendEntryResponse> future = responses.get(i);
            if (future == null) {
                break;
            } else if (future.isTimeOut()) {
                AppendEntryResponse response = new AppendEntryResponse();
                response.setGroup(memberState.getGroup());
                response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
                response.setTerm(currTerm);
                response.setIndex(i);
                response.setLeaderId(memberState.getSelfId());
                future.complete(response);
            } else {
                break;
            }
        }
        waitForRunning(1);
    }

Step6: if nothing was acknowledged in this round, check whether the requests beyond the quorum index have timed out; for those that have, return a timeout response.

    if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 || needCheck) {
        updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
        for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : responses.entrySet()) {
            if (futureEntry.getKey() < quorumIndex) {
                AppendEntryResponse response = new AppendEntryResponse();
                response.setGroup(memberState.getGroup());
                response.setTerm(currTerm);
                response.setIndex(futureEntry.getKey());
                response.setLeaderId(memberState.getSelfId());
                response.setPos(((AppendFuture) futureEntry.getValue()).getPos());
                futureEntry.getValue().complete(response);
                responses.remove(futureEntry.getKey());
            }
        }
        lastCheckLeakTimeMs = System.currentTimeMillis();
    }

Step7: check whether any pending requests have leaked. A pending request is considered leaked if its log sequence number is less than the committed sequence number; such a request is completed with a response and removed.
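The leak check can be sketched as below. LeakSweepSketch and sweep are hypothetical names, and CompletableFuture<Long> again stands in for the article's TimeoutFuture of AppendEntryResponse.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative sketch of Step7: answer and drop pending requests below the committed index. */
final class LeakSweepSketch {

    /** Returns the number of leaked requests that were completed and removed. */
    static int sweep(ConcurrentMap<Long, CompletableFuture<Long>> responses, long quorumIndex) {
        int swept = 0;
        // Iterators of concurrent maps are weakly consistent, so removing
        // entries during the traversal does not throw.
        for (Map.Entry<Long, CompletableFuture<Long>> entry : responses.entrySet()) {
            if (entry.getKey() < quorumIndex) {
                entry.getValue().complete(entry.getKey()); // already committed, so answer the client
                responses.remove(entry.getKey());
                swept++;
            }
        }
        return swept;
    }

    public static void main(String[] args) {
        ConcurrentMap<Long, CompletableFuture<Long>> responses = new ConcurrentHashMap<>();
        responses.put(3L, new CompletableFuture<>());
        responses.put(9L, new CompletableFuture<>());
        responses.put(10L, new CompletableFuture<>());
        System.out.println(sweep(responses, 10)); // 2
        System.out.println(responses.keySet());   // [10]
    }
}
```

Requiring a ConcurrentMap in the signature is deliberate: the same loop over a plain HashMap would fail with a ConcurrentModificationException on removal.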

Step8: this round of log arbitration is complete; finally, update lastQuorumIndex to the newly committed value from this arbitration.

