How to implement the Raft distributed consistency algorithm

Updated 2025-04-04, from SLTechnology News&Howtos (Shulou)


This article explains how to implement the Raft distributed consensus algorithm, starting with the underlying theory and then walking through a Java implementation.

CAP theorem

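Consistency: every node sees the same data at the same time, so a read returns the most recent write

Availability: every request receives a (non-error) response

Partition tolerance: the system keeps working even when the network splits into partitions that cannot reach each other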

The CAP theorem states that in an asynchronous network model, no system can satisfy all three properties at the same time; a distributed system has to give up one of them. Because network partitions cannot be ruled out in practice, the real choice during a partition is between consistency and availability. For any system meant to run in a distributed setting, the first design question is therefore how to trade off consistency, availability and partition tolerance, that is, which property to weaken.

Systems that prioritize high availability usually settle for weaker consistency. For systems that require strong consistency, there is a class of algorithms dedicated to the problem: consensus algorithms. "Consensus" means ensuring that all participants reach the same view of the data (which can be understood as strong consistency). Consensus algorithms fall into two categories depending on whether malicious (Byzantine) nodes are assumed. Most of the time, "consensus algorithm" refers to the kind without malicious nodes, where nodes never send malicious requests, such as spoofed requests, to other nodes. The best-known consensus algorithm is Paxos, followed by Raft and ZAB (the algorithm implemented in ZooKeeper).

Raft core algorithm

The core of the Raft algorithm is leader election and log replication.

When several servers provide a service at the same time, keeping their state changes synchronized becomes a problem. The usual approach is the master-slave model: one master server (Leader) and several slave servers (Followers). All requests are handled by the Leader, and the Followers only back up the data. But if the Leader goes down, which Follower should become the new Leader? In theory every Follower's replica should be identical to the Leader's, but because of replication delay, out-of-order delivery and similar problems, the Followers' data may differ at any given moment. These problems are addressed from two directions.

Use log writes instead of direct modifications, so that synchronization requests to the Followers are ordered and the current state can be recomputed from the log: this is the log state machine model.

A write counts as successful overall once more than half of the servers have written it: this is the Quorum mechanism.

Log state machine model

Log index | Operation | Current state
1 | X = 1 | {X=1}
2 | Y = 2 | {X=1, Y=2}
3 | X = 3 | {X=3, Y=2}
4 | Z = 4 | {X=3, Y=2, Z=4}

In the state machine model, log entries are appended from top to bottom, and the state at any point in time can be computed by replaying the log starting from index 1. With this model, the distributed consistency problem becomes the problem of making sure that all participating nodes write the log in the same order.
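To make the replay idea concrete, here is a minimal Java sketch (Java 16+, for the record type) that rebuilds every state of the example log above by replaying it from index 1. The `Assign` record and `stateAt` method are hypothetical names for illustration, not part of the implementation later in this article.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LogReplay {
    // Hypothetical minimal log entry: "assign value to key"
    record Assign(String key, int value) {}

    // Rebuilds the state right after log index upToIndex (1-based) by replaying from index 1
    static Map<String, Integer> stateAt(List<Assign> log, int upToIndex) {
        Map<String, Integer> state = new LinkedHashMap<>(); // keeps first-insertion order for printing
        for (int i = 0; i < upToIndex; i++) {
            Assign op = log.get(i);
            state.put(op.key(), op.value()); // a later write overwrites an earlier one
        }
        return state;
    }

    public static void main(String[] args) {
        List<Assign> log = List.of(
                new Assign("X", 1), new Assign("Y", 2), new Assign("X", 3), new Assign("Z", 4));
        for (int i = 1; i <= log.size(); i++) {
            System.out.println(i + " " + stateAt(log, i));
        }
    }
}
```

Running `main` prints `1 {X=1}`, `2 {X=1, Y=2}`, `3 {X=3, Y=2}` and `4 {X=3, Y=2, Z=4}`, one per line: any intermediate state is recovered simply by stopping the replay early, which is why nodes only need to agree on the order of the log.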

Quorum-based writes

In some master/slave setups, the master does not care about the slaves' replication progress: it keeps writing its own log and ships the changes to the slaves over some transport. In strict full replication, by contrast, the master only continues writing after all slaves have synchronized. Plain master-slave replication can lose data when the master goes down, while full replication performs very poorly. The Quorum mechanism, which requires a write to succeed on more than half of the servers, reduces the risk of data loss without costing too much performance.
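The "more than half" rule reduces to one integer comparison, the same `count > n / 2` shape that the vote-counting code later in this article uses. A minimal sketch; the class and method names are hypothetical:

```java
final class Quorum {
    // A write (or an election) succeeds once strictly more than half of the servers agree
    static boolean isMajority(int acks, int clusterSize) {
        return acks > clusterSize / 2; // integer division: 3 -> 2 needed, 4 -> 3, 5 -> 3
    }

    // Smallest number of servers that forms a majority
    static int majoritySize(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            int majority = majoritySize(n);
            System.out.println(n + " servers: majority " + majority
                    + ", tolerates " + (n - majority) + " failure(s)");
        }
    }
}
```

The printed table shows why clusters usually have an odd size: 4 servers need 3 acknowledgements and still tolerate only 1 failure, the same as 3 servers, while 5 servers tolerate 2.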

Now suppose there are three servers, nodes A, B and C, and a value is being written to them: node A already holds the latest value 2, while nodes B and C still hold the old value 1. If a client reads from any two nodes of the cluster, the versions it sees can be:

Nodes A and B: 2 and 1

Nodes A and C: 2 and 1

Nodes B and C: 1 and 1

So when the client reads from B and C, it does not see the latest data.
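Quorum writes fix this because any two majorities of the same cluster overlap. The sketch below (hypothetical names) enumerates every 2-node read set of the 3-node example and checks whether each one contains a node that holds the latest value.

```java
import java.util.List;
import java.util.Set;

final class QuorumIntersection {
    // True if every read set of readSize nodes contains at least one node in writtenNodes
    static boolean everyReadSeesLatest(List<String> nodes, Set<String> writtenNodes, int readSize) {
        int n = nodes.size();
        for (int mask = 0; mask < (1 << n); mask++) { // each bitmask is one subset of nodes
            if (Integer.bitCount(mask) != readSize) {
                continue; // only read sets of the requested size
            }
            boolean seesLatest = false;
            for (int i = 0; i < n && !seesLatest; i++) {
                seesLatest = (mask & (1 << i)) != 0 && writtenNodes.contains(nodes.get(i));
            }
            if (!seesLatest) {
                return false; // this read set sees only stale nodes
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> cluster = List.of("A", "B", "C");
        System.out.println(everyReadSeesLatest(cluster, Set.of("A"), 2));      // false: {B, C} misses it
        System.out.println(everyReadSeesLatest(cluster, Set.of("A", "B"), 2)); // true: majorities overlap
    }
}
```

With the write only on A (1 of 3 nodes), the read set {B, C} misses it; once the write has reached the majority {A, B}, every 2-node read overlaps it.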

// ... (the beginning of the message decoder is missing in the source)
        // Wait until the 8-byte header (message type + payload length) is readable
        if (in.readableBytes() < 8) return;
        // Mark the ByteBuf's reader index
        in.markReaderIndex();
        int messageType = in.readInt();
        int payloadLength = in.readInt();
        // Payload not fully received yet: rewind to the mark and wait for more data
        if (in.readableBytes() < payloadLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] payload = new byte[payloadLength];
        in.readBytes(payload);
        switch (messageType) {
            case MessageConstants.MSG_TYPE_NODE_ID:
                out.add(new NodeId(new String(payload)));
                break;
            case MessageConstants.MSG_TYPE_REQUEST_VOTE_RPC:
                Protos.RequestVoteRpc protoRVRpc = Protos.RequestVoteRpc.parseFrom(payload);
                RequestVoteRpc rpc = new RequestVoteRpc();
                rpc.setTerm(protoRVRpc.getTerm());
                rpc.setCandidateId(new NodeId(protoRVRpc.getCandidateId()));
                rpc.setLastLogIndex(protoRVRpc.getLastLogIndex());
                rpc.setLastLogTerm(protoRVRpc.getLastLogTerm());
                out.add(rpc);
                break;
            case MessageConstants.MSG_TYPE_REQUEST_VOTE_RESULT:
                Protos.RequestVoteResult protoRVResult = Protos.RequestVoteResult.parseFrom(payload);
                out.add(new RequestVoteResult(protoRVResult.getTerm(), protoRVResult.getVoteGranted()));
                break;
            case MessageConstants.MSG_TYPE_APPEND_ENTRIES_RPC:
                Protos.AppendEntriesRpc protoAERpc = Protos.AppendEntriesRpc.parseFrom(payload);
                AppendEntriesRpc aeRpc = new AppendEntriesRpc();
                aeRpc.setMessageId(protoAERpc.getMessageId());
                aeRpc.setTerm(protoAERpc.getTerm());
                aeRpc.setLeaderId(new NodeId(protoAERpc.getLeaderId()));
                aeRpc.setLeaderCommit(protoAERpc.getLeaderCommit());
                aeRpc.setPrevLogIndex(protoAERpc.getPrevLogIndex());
                aeRpc.setPrevLogTerm(protoAERpc.getPrevLogTerm());
                aeRpc.setEntries(protoAERpc.getEntriesList().stream().map(e ->
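The decoder above frames each message as a 4-byte type, a 4-byte payload length and the payload, and rewinds its reader index until a whole frame has arrived. Here is a Netty-free sketch of the same framing (Java 16+), using `java.nio.ByteBuffer`, whose `mark()`/`reset()` play the role of `markReaderIndex()`/`resetReaderIndex()`. `Framing`, `Frame` and `TYPE_DEMO` are hypothetical names; the real message-type constants live in the article's `MessageConstants`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class Framing {
    static final int TYPE_DEMO = 1; // hypothetical message type

    // A decoded frame: message type plus raw payload
    record Frame(int type, byte[] payload) {}

    // Encodes [type:int][length:int][payload] (ByteBuffer is big-endian by default)
    static byte[] encode(int type, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(2 * Integer.BYTES + payload.length);
        buf.putInt(type).putInt(payload.length).put(payload);
        return buf.array();
    }

    // Tries to read one frame; returns null (position unchanged) if the frame is incomplete
    static Frame tryDecode(ByteBuffer in) {
        if (in.remaining() < 2 * Integer.BYTES) {
            return null; // header not complete yet
        }
        in.mark(); // like ByteBuf.markReaderIndex()
        int type = in.getInt();
        int length = in.getInt();
        if (in.remaining() < length) {
            in.reset(); // like ByteBuf.resetReaderIndex(): rewind and wait for more bytes
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return new Frame(type, payload);
    }

    public static void main(String[] args) {
        byte[] frame = encode(TYPE_DEMO, "A".getBytes(StandardCharsets.UTF_8));
        System.out.println(tryDecode(ByteBuffer.wrap(frame, 0, 6))); // null: only 6 of 9 bytes arrived
        Frame f = tryDecode(ByteBuffer.wrap(frame));
        System.out.println(f.type() + " " + new String(f.payload(), StandardCharsets.UTF_8)); // 1 A
    }
}
```

Returning `null` and restoring the position mirrors the decoder's early `return`: the bytes stay in the buffer, and decoding is retried once more data has arrived.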

        // ... (the start of the RequestVote handler is missing in the source)
        // If the message's term is smaller than the current node's term, do not vote
        if (rpc.getTerm() < role.getTerm()) {
            log.debug("term from rpc < current term, don't vote ({} < {})", rpc.getTerm(), role.getTerm());
            return new RequestVoteResult(role.getTerm(), false);
        }
        // Decide whether to vote
        boolean voteForCandidate = true;
        // If the message's term is greater than the current node's term
        if (rpc.getTerm() > role.getTerm()) {
            // Step down to follower and vote for the candidate
            becomeFollower(rpc.getTerm(), voteForCandidate ? rpc.getCandidateId() : null, null, true);
            return new RequestVoteResult(rpc.getTerm(), voteForCandidate);
        }
        // The terms are equal: handle the message according to the current node's role
        switch (role.getName()) {
            case FOLLOWER:
                FollowerNodeRole follower = (FollowerNodeRole) role;
                // Id of the node already voted for in this term (if any)
                NodeId votedFor = follower.getVotedFor();
                // Vote if we have not voted yet, or if we already voted for this same candidate
                if ((votedFor == null && voteForCandidate) || Objects.equals(votedFor, rpc.getCandidateId())) {
                    // Stay a follower, record the vote and grant it
                    becomeFollower(role.getTerm(), rpc.getCandidateId(), null, true);
                    return new RequestVoteResult(rpc.getTerm(), true);
                }
                // Otherwise do not vote
                return new RequestVoteResult(rpc.getTerm(), false);
            case CANDIDATE: // a candidate or a leader does not vote
            case LEADER:
                return new RequestVoteResult(role.getTerm(), false);
            default:
                throw new IllegalArgumentException("unexpected node role [" + role.getName() + "]");
        }
    }

    /**
     * Become a follower
     * @param term current term
     * @param votedFor Id of the node voted for
     * @param leaderId Id of the leader
     * @param scheduleElectionTimeout whether to schedule the election timeout task
     */
    private void becomeFollower(int term, NodeId votedFor, NodeId leaderId, boolean scheduleElectionTimeout) {
        // Cancel the current node's timeout task
        role.cancelTimeoutOrTask();
        // If there is a new leader, log the leader and the term
        if (leaderId != null && !leaderId.equals(role.getLeaderId(context.getSelfId()))) {
            log.info("current leader is {}, term {}", leaderId, term);
        }
        // Schedule the election timeout only if requested
        ElectionTimeout electionTimeout = scheduleElectionTimeout ? scheduleElectionTimeout() : ElectionTimeout.NONE;
        // Switch the current node's role to follower
        changeToRole(new FollowerNodeRole(term, votedFor, leaderId, electionTimeout));
    }

    /**
     * Handle a received vote result message
     * @param result
     */
    @Subscribe
    public void onReceiveRequestVoteResult(RequestVoteResult result) {
        context.getTaskExecutor().submit(() -> processRequestVoteResult(result));
    }

    /**
     * Process a vote result message asynchronously
     * @param result
     */
    private void processRequestVoteResult(RequestVoteResult result) {
        context.getTaskExecutor().submit(() -> doProcessRequestVoteResult(result));
    }

    /**
     * Vote result processing
     * @param result
     */
    private void doProcessRequestVoteResult(RequestVoteResult result) {
        // If the message's term is greater than the current node's term,
        // the current node steps down to follower
        if (result.getTerm() > role.getTerm()) {
            becomeFollower(result.getTerm(), null, null, true);
            return;
        }
        // If the current node is not a candidate, stop here
        if (role.getName() != RoleName.CANDIDATE) {
            log.debug("receive request vote result and current role is not candidate, ignore");
            return;
        }
        // If the vote was not granted, stop here
        if (!result.isVoteGranted()) {
            return;
        }
        // Add this vote to the candidate's vote count
        int currentVotesCount = ((CandidateNodeRole) role).getVotesCount() + 1;
        // Number of major (voting) members in the cluster
        int countOfMajor = context.getGroup().getCountOfMajor();
        log.debug("votes count {}, major node count {}", currentVotesCount, countOfMajor);
        // Cancel the election timeout task
        role.cancelTimeoutOrTask();
        // The candidate has more than half of the votes
        if (currentVotesCount > countOfMajor / 2) {
            log.info("become leader, term {}", role.getTerm());
            // Reset the replication progress of the other nodes
            resetReplicatingStates();
            // Become leader and start sending log replication (or heartbeat) messages
            changeToRole(new LeaderNodeRole(role.getTerm(), scheduleLogReplicationTask()));
            context.getLog().appendEntry(role.getTerm());
            // Reset all inbound channels
            context.getConnector().resetChannels();
        } else {
            // Not more than half yet: stay a candidate with the new count and restart the election timeout
            changeToRole(new CandidateNodeRole(role.getTerm(), currentVotesCount, scheduleElectionTimeout()));
        }
    }

    /**
     * Reset the replication progress of the other nodes
     */
    private void resetReplicatingStates() {
        context.getGroup().resetReplicatingStates(context.getLog().getNextIndex());
    }

    /**
     * Handle a log replication (heartbeat) request
     * @param rpcMessage
     */
    @Subscribe
    public void onReceiveAppendEntriesRpc(AppendEntriesRpcMessage rpcMessage) {
        context.getTaskExecutor().submit(() ->
                context.getConnector().replyAppendEntries(doProcessAppendEntriesRpc(rpcMessage),
                        context.getGroup().getMember(rpcMessage.getSourceNodeId()).getEndpoint()));
    }

    /**
     * Log replication (heartbeat) request processing
     * @param rpcMessage
     * @return
     */
    private AppendEntriesResult doProcessAppendEntriesRpc(AppendEntriesRpcMessage rpcMessage) {
        AppendEntriesRpc rpc = rpcMessage.getRpc();
        // Reject a message from an older term
        if (rpc.getTerm() < role.getTerm()) {
            return new AppendEntriesResult(rpc.getMessageId(), role.getTerm(), false);
        }
        // A newer term: step down to follower, with the sender as leader
        if (rpc.getTerm() > role.getTerm()) {
            becomeFollower(rpc.getTerm(), null, rpc.getLeaderId(), true);
            return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), appendEntries(rpc));
        }
        assert rpc.getTerm() == role.getTerm();
        switch (role.getName()) {
            case FOLLOWER:
                // Keep the recorded vote, update the leader and reset the election timeout
                becomeFollower(rpc.getTerm(), ((FollowerNodeRole) role).getVotedFor(), rpc.getLeaderId(), true);
                return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), appendEntries(rpc));
            case CANDIDATE:
                // Another node has already become leader in this term: step down
                becomeFollower(rpc.getTerm(), null, rpc.getLeaderId(), true);
                return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), appendEntries(rpc));
            case LEADER:
                log.warn("receive append entries rpc from another leader {}, ignore", rpc.getLeaderId());
                return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), false);
            default:
                throw new IllegalArgumentException("unexpected node role [" + role.getName() + "]");
        }
    }

    /**
     * Append log entries
     * @param rpc
     * @return
     */
    private boolean appendEntries(AppendEntriesRpc rpc) {
        // Stub for now: always reports success
        return true;
    }

    /**
     * Handle a log replication response
     * @param resultMessage
     */
    @Subscribe
    public void onReceiveAppendEntriesResult(AppendEntriesResultMessage resultMessage) {
        context.getTaskExecutor().submit(() -> doProcessAppendEntriesResult(resultMessage));
    }

    /**
     * Log replication response processing
     * @param resultMessage
     */
    private void doProcessAppendEntriesResult(AppendEntriesResultMessage resultMessage) {
        AppendEntriesResult result = resultMessage.getResult();
        // A newer term in the response: step down to follower
        if (result.getTerm() > role.getTerm()) {
            becomeFollower(result.getTerm(), null, null, true);
            return;
        }
        if (role.getName() != RoleName.LEADER) {
            log.warn("receive append entries result from node {} but current node is not leader, ignore",
                    resultMessage.getSourceNodeId());
        }
    }
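The vote-granting branches of the RequestVote handler above can be condensed into a pure function that is easy to test in isolation. This is a hypothetical helper, not part of the article's NodeImpl; `candidateLogOk` is an assumed parameter standing in for the log comparison that full Raft requires and that the handler currently replaces with `voteForCandidate = true`.

```java
import java.util.Objects;

final class VoteDecision {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    // Pure version of the handler's branches (hypothetical helper)
    static boolean grantVote(int currentTerm, Role role, String votedFor,
                             int rpcTerm, String candidateId, boolean candidateLogOk) {
        if (rpcTerm < currentTerm) {
            return false; // stale candidate: reject
        }
        if (rpcTerm > currentTerm) {
            return candidateLogOk; // newer term: step down to follower first, then vote if the log check passes
        }
        if (role != Role.FOLLOWER) {
            return false; // same term: candidates and leaders do not vote for others
        }
        // Same term, follower: vote if not voted yet, or re-grant to the same candidate
        return (votedFor == null && candidateLogOk) || Objects.equals(votedFor, candidateId);
    }

    public static void main(String[] args) {
        System.out.println(grantVote(1, Role.FOLLOWER, null, 1, "B", true)); // true: first vote this term
        System.out.println(grantVote(1, Role.FOLLOWER, "C", 1, "B", true));  // false: already voted for C
        System.out.println(grantVote(2, Role.FOLLOWER, null, 1, "B", true)); // false: stale term
        System.out.println(grantVote(1, Role.LEADER, null, 2, "B", true));   // true: newer term wins
    }
}
```

Keeping the decision pure like this separates "should I vote?" from the side effects (changing role, rescheduling timeouts) that the real handler performs through `becomeFollower`.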

Consensus (core) component builder class

/**
 * Consensus (core) component builder
 */
public class NodeBuilder {
    private final NodeGroup group;
    private final NodeId selfId;
    private final EventBus eventBus;
    private Scheduler scheduler = null;
    private Connector connector = null;
    private TaskExecutor taskExecutor = null;
    private NodeStore nodeStore = null;
    private Log log = null;
    private NodeConfig config = new NodeConfig();

    public NodeBuilder(Collection<NodeEndpoint> endpoints, NodeId selfId) {
        group = new NodeGroup(endpoints, selfId);
        this.selfId = selfId;
        eventBus = new EventBus(selfId.getValue());
    }

    public NodeBuilder(NodeEndpoint endpoint) {
        this(Collections.singletonList(endpoint), endpoint.getId());
    }

    /**
     * Set the communication component
     */
    public NodeBuilder setConnector(Connector connector) {
        this.connector = connector;
        return this;
    }

    /**
     * Set the scheduler (timer)
     */
    public NodeBuilder setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
        return this;
    }

    /**
     * Set the task executor
     */
    public NodeBuilder setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
        return this;
    }

    /**
     * Set the node store
     */
    public NodeBuilder setNodeStore(NodeStore nodeStore) {
        this.nodeStore = nodeStore;
        return this;
    }

    /**
     * Set the log
     */
    public NodeBuilder setLog(Log log) {
        this.log = log;
        return this;
    }

    /**
     * Build the Node instance
     */
    public Node build() {
        return new NodeImpl(buildContext());
    }

    /**
     * Build the context, falling back to default components where none were set
     */
    private NodeContext buildContext() {
        return NodeContext.builder()
                .group(group)
                .selfId(selfId)
                .eventBus(eventBus)
                .scheduler(scheduler != null ? scheduler : new DefaultScheduler(config))
                .connector(connector)
                .taskExecutor(taskExecutor != null ? taskExecutor : new SingleThreadTaskExecutor("node"))
                .store(nodeStore != null ? nodeStore : new FileNodeStore(new File("./example/node.bin")))
                .log(log != null ? log : new MemoryLog())
                .build();
    }
}

The log part used here will be completed later, in the Log section.

Unit testing

public class NodeImplTest {

    private NodeBuilder newNodeBuilder(NodeId selfId, NodeEndpoint... endpoints) {
        return new NodeBuilder(Arrays.asList(endpoints), selfId)
                .setScheduler(new NullScheduler())
                .setConnector(new MockConnector())
                .setTaskExecutor(new DirectTaskExecutor())
                .setNodeStore(new MemoryNodeStore());
    }

    /**
     * Test start
     */
    @Test
    public void testStart() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333))
                .build();
        node.start();
        FollowerNodeRole role = (FollowerNodeRole) node.getRole();
        assertEquals(role.getTerm(), 0);
        assertEquals(role.getVotedFor(), null);
    }

    /**
     * Test receiving a vote result message
     */
    @Test
    public void testOnReceiveRequestVoteResult() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        node.electionTimeout();
        node.onReceiveRequestVoteResult(new RequestVoteResult(1, true));
        LeaderNodeRole role = (LeaderNodeRole) node.getRole();
        assertEquals(role.getTerm(), 1);
    }

    /**
     * Test log replication
     */
    @Test
    public void testReplicateLog() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        node.electionTimeout();
        node.onReceiveRequestVoteResult(new RequestVoteResult(1, true));
        node.replicateLog();
        MockConnector mockConnector = (MockConnector) node.getContext().getConnector();
        assertEquals(mockConnector.getMessageCount(), 3);
        List<MockConnector.Message> messages = mockConnector.getMessages();
        Set<NodeId> destinationNodeIds = messages.subList(1, 3).stream()
                .map(MockConnector.Message::getDestinationNodeId)
                .collect(Collectors.toSet());
        assertEquals(destinationNodeIds.size(), 2);
        assertTrue(destinationNodeIds.contains(NodeId.of("B")));
        assertTrue(destinationNodeIds.contains(NodeId.of("C")));
        AppendEntriesRpc rpc = (AppendEntriesRpc) messages.get(2).getRpc();
        assertEquals(rpc.getTerm(), 1);
    }

    /**
     * Test handling of a log replication request (as follower)
     */
    @Test
    public void testOnReceiveAppendEntriesRpcFollower() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        AppendEntriesRpc rpc = new AppendEntriesRpc();
        rpc.setTerm(1);
        rpc.setLeaderId(NodeId.of("B"));
        node.onReceiveAppendEntriesRpc(new AppendEntriesRpcMessage(rpc, NodeId.of("B"), null));
        MockConnector connector = (MockConnector) node.getContext().getConnector();
        AppendEntriesResult result = (AppendEntriesResult) connector.getResult();
        assertEquals(result.getTerm(), 1);
        assertTrue(result.isSuccess());
        FollowerNodeRole role = (FollowerNodeRole) node.getRole();
        assertEquals(role.getTerm(), 1);
        assertEquals(NodeId.of("B"), role.getLeaderId());
    }

    /**
     * Test handling of a log replication response
     */
    @Test
    public void testOnReceiveAppendEntriesNormal() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        node.electionTimeout();
        node.onReceiveRequestVoteResult(new RequestVoteResult(1, true));
        node.replicateLog();
        node.onReceiveAppendEntriesResult(new AppendEntriesResultMessage(
                new AppendEntriesResult("", 1, true), NodeId.of("B"), new AppendEntriesRpc()));
    }
}

The testStart results are as follows

The testOnReceiveRequestVoteResult results are as follows

The testReplicateLog results are as follows

The testOnReceiveAppendEntriesRpcFollower results are as follows

The testOnReceiveAppendEntriesNormal results are as follows

Log implementation

The log has always been a fundamental component of distributed consensus algorithms, whether in Paxos, the algorithm Raft is usually compared with, or in the Paxos variants. The log these algorithms need differs from an ordinary database WAL (Write-Ahead Log), which only ever appends: in a consensus log, entries that have already been written may be discarded or overwritten when they conflict. The log also does not care what the upper-level service is, and what it stores has nothing to do with that service: a service request is converted into a generic storage format, such as binary.
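In Raft, the rule that lets a follower discard conflicting entries is: if an existing entry conflicts with a new one from the leader (same index, different term), delete it and everything after it, then append the new entries. A minimal sketch of that rule follows. As a simplifying assumption, the log is modeled as a list of terms where position `i` holds the term of entry `i + 1`, and `prevLogIndex == 0` means "before the first entry"; the class and method names are hypothetical, though the parameters mirror `appendEntriesFromLeader` in the Log interface below.

```java
import java.util.ArrayList;
import java.util.List;

final class ConflictTruncation {
    // log.get(i) is the term of the entry with index i + 1 (terms stand in for whole entries)
    static boolean appendFromLeader(List<Integer> log, int prevLogIndex, int prevLogTerm,
                                    List<Integer> entryTerms) {
        // Consistency check: the follower must hold the entry at prevLogIndex with prevLogTerm
        if (prevLogIndex > 0
                && (log.size() < prevLogIndex || log.get(prevLogIndex - 1) != prevLogTerm)) {
            return false;
        }
        for (int k = 0; k < entryTerms.size(); k++) {
            int pos = prevLogIndex + k; // 0-based position of the k-th new entry
            if (pos < log.size()) {
                if (log.get(pos).equals(entryTerms.get(k))) {
                    continue; // same index and term: already present, keep it
                }
                log.subList(pos, log.size()).clear(); // conflict: drop it and everything after it
            }
            log.add(entryTerms.get(k));
        }
        return true;
    }

    public static void main(String[] args) {
        // Follower entries 1..4 have terms [1, 1, 2, 2]; the leader sends entry 3 with term 3
        List<Integer> follower = new ArrayList<>(List.of(1, 1, 2, 2));
        boolean ok = appendFromLeader(follower, 2, 1, List.of(3));
        System.out.println(ok + " " + follower); // true [1, 1, 3]
    }
}
```

Here the follower's entries 3 and 4 (term 2) conflict with the leader's entry 3 (term 3), so both are dropped before the new entry is appended. This is exactly the overwrite behaviour that an append-only WAL never needs.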

Log entry interface

/**
 * Log entry
 */
public interface Entry {
    // Log entry kinds
    int KIND_NO_OP = 0;   // the empty entry a newly elected Leader appends first
    int KIND_GENERAL = 1; // ordinary entry, produced by the upper-layer service

    /**
     * Get the kind
     * @return
     */
    int getKind();

    /**
     * Get the index
     * @return
     */
    int getIndex();

    /**
     * Get the term
     * @return
     */
    int getTerm();

    /**
     * Get the meta information (kind, term and index)
     * @return
     */
    EntryMeta getMeta();

    /**
     * Get the log payload
     * @return
     */
    byte[] getCommandBytes();
}

/**
 * Log entry meta information
 */
@AllArgsConstructor
@Getter
public class EntryMeta {
    private final int kind;
    private final int index;
    private final int term;
}

Log entry abstract class

/**
 * Log entry abstract class
 */
@AllArgsConstructor
public abstract class AbstractEntry implements Entry {
    // log kind
    private final int kind;
    // log index
    protected final int index;
    // log term
    protected final int term;

    @Override
    public int getKind() {
        return this.kind;
    }

    @Override
    public int getIndex() {
        return index;
    }

    @Override
    public int getTerm() {
        return term;
    }

    @Override
    public EntryMeta getMeta() {
        return new EntryMeta(kind, index, term);
    }
}

Normal log entry

/**
 * General log entry
 */
public class GeneralEntry extends AbstractEntry {
    // log payload
    private final byte[] commandBytes;

    public GeneralEntry(int index, int term, byte[] commandBytes) {
        super(KIND_GENERAL, index, term);
        this.commandBytes = commandBytes;
    }

    @Override
    public byte[] getCommandBytes() {
        return this.commandBytes;
    }

    @Override
    public String toString() {
        return "GeneralEntry{" +
                "index=" + index +
                ", term=" + term +
                '}';
    }
}

Empty log entry

/**
 * Empty log entry
 */
public class NoOpEntry extends AbstractEntry {

    public NoOpEntry(int index, int term) {
        super(KIND_NO_OP, index, term);
    }

    @Override
    public byte[] getCommandBytes() {
        return new byte[0];
    }

    @Override
    public String toString() {
        return "NoOpEntry{" +
                "index=" + index +
                ", term=" + term +
                '}';
    }
}

Log

Log interface

/**
 * Log
 */
public interface Log {
    // all entries
    int ALL_ENTRIES = -1;

    /**
     * Get the meta information of the last log entry.
     * Generally used when an election starts and vote messages are sent.
     */
    EntryMeta getLastEntryMeta();

    /**
     * Create an AppendEntries message,
     * used when the Leader sends a log replication message to a Follower.
     *
     * @param term       current term
     * @param selfId     Id of this node
     * @param nextIndex  next index
     * @param maxEntries maximum number of entries
     */
    AppendEntriesRpc createAppendEntriesRpc(int term, NodeId selfId, int nextIndex, int maxEntries);

    /**
     * Get the index of the next log entry
     */
    int getNextIndex();

    /**
     * Get the current commit index
     */
    int getCommitIndex();

    /**
     * Determine whether the given lastLogIndex and lastLogTerm (of another node)
     * are newer than this node's own log
     *
     * @param lastLogIndex last log index
     * @param lastLogTerm  last log term
     */
    boolean isNewerThan(int lastLogIndex, int lastLogTerm);

    /**
     * Append an empty log entry,
     * the first entry appended after the current node becomes Leader
     *
     * @param term
     */
    NoOpEntry appendEntry(int term);

    /**
     * Append an ordinary log entry, for an operation of the upper-layer service
     *
     * @param term
     * @param command
     */
    GeneralEntry appendEntry(int term, byte[] command);

    /**
     * Append log entries from the Leader,
     * used when a log replication request from the Leader is received
     *
     * @param prevLogIndex index of the entry preceding the new entries
     * @param prevLogTerm  term of the entry preceding the new entries
     * @param entries      log entries to append
     * @return true if success, false if previous log check failed
     */
    boolean appendEntriesFromLeader(int prevLogIndex, int prevLogTerm, List<Entry> entries);

    /**
     * Advance commitIndex,
     * used when a log replication request from the Leader is received
     *
     * @param newCommitIndex the new commitIndex
     * @param currentTerm    current term
     */
    void advanceCommitIndex(int newCommitIndex, int currentTerm);

    /**
     * Close
     */
    void close();
}
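`isNewerThan` compares last log index and term. Here is a minimal sketch of the comparison underneath, assuming the standard Raft "at least as up-to-date" rule: compare the terms of the last entries first, and only when those are equal does the longer log win. The class and method names are hypothetical; depending on which side it is called from, `isNewerThan` could be built on this check or on its negation.

```java
final class UpToDate {
    // Raft's comparison: the later last term wins; with equal last terms, the longer log wins
    static boolean candidateAtLeastAsUpToDate(int candLastIndex, int candLastTerm,
                                              int localLastIndex, int localLastTerm) {
        if (candLastTerm != localLastTerm) {
            return candLastTerm > localLastTerm;
        }
        return candLastIndex >= localLastIndex;
    }

    public static void main(String[] args) {
        // Candidate's last entry: index 5, term 2; local last entry: index 7, term 1
        System.out.println(candidateAtLeastAsUpToDate(5, 2, 7, 1)); // true: higher last term
        // Same last term 2, but the local log is longer
        System.out.println(candidateAtLeastAsUpToDate(5, 2, 7, 2)); // false
    }
}
```

Note that the term is checked before the index: a shorter log whose last entry has a later term still counts as more up to date, because entries from an older term may yet be overwritten.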

Log entry sequence

/**
 * Log entry sequence
 */
public interface EntrySequence {
    /**
     * Whether the sequence is empty
     */
    boolean isEmpty();

    /**
     * Get the index of the first log entry
     */
    int getFirstLogIndex();

    /**
     * Get the index of the last log entry
     */
    int getLastLogIndex();

    /**
     * Get the index of the next log entry
     */
    int getNextLogIndex();

    /**
     * Get a sub-view of the sequence, from fromIndex to the last log entry
     */
    List<Entry> subView(int fromIndex);

    /**
     * Get a sub-list of the sequence in the range [fromIndex, toIndex)
     */
    List<Entry> subList(int fromIndex, int toIndex);

    /**
     * Check whether a log entry exists at index
     */
    boolean isEntryPresent(int index);

    /**
     * Get the meta information of a log entry
     */
    EntryMeta getEntryMeta(int index);

    /**
     * Get a log entry
     */
    Entry getEntry(int index);

    /**
     * Get the last log entry
     */
    Entry getLastEntry();

    /**
     * Append a log entry
     */
    void append(Entry entry);

    /**
     * Append multiple log entries
     */
    void append(List<Entry> entries);

    /**
     * Advance commitIndex
     */
    void commit(int index);

    /**
     * Get the current commitIndex
     */
    int getCommitIndex();

    /**
     * Remove the log entries after index
     */
    void removeAfter(int index);

    /**
     * Close the log sequence
     */
    void close();
}

Log entry sequence abstract class

/**
 * Log entry sequence abstract class
 */
public abstract class AbstractEntrySequence implements EntrySequence {
    // log index offset
    protected int logIndexOffset;
    // index of the next log entry
    protected int nextLogIndex;

    public AbstractEntrySequence(int logIndexOffset) {
        this.logIndexOffset = logIndexOffset;
        this.nextLogIndex = logIndexOffset;
    }

    /**
     * The log index offset matters when the sequence does not start at 1;
     * it is the index of the first entry, whether or not that entry exists.
     * Initially: log index offset = index of the next log entry = 1
     */
    @Override
    public boolean isEmpty() {
        return logIndexOffset == nextLogIndex;
    }

    @Override
    public int getFirstLogIndex() {
        if (isEmpty()) {
            throw new EmptySequenceException();
        }
        return doGetFirstLogIndex();
    }

    /**
     * Get the log index offset
     */
    protected int doGetFirstLogIndex() {
        return logIndexOffset;
    }

    @Override
    public int getLastLogIndex() {
        if (isEmpty()) {
            throw new EmptySequenceException();
        }
        return doGetLastLogIndex();
    }

    /**
     * Get the index of the last log entry
     */
    protected int doGetLastLogIndex() {
        return nextLogIndex - 1;
    }

    @Override
    public boolean isEntryPresent(int index) {
        return !isEmpty() && index >= doGetFirstLogIndex() && index <= doGetLastLogIndex();
    }

    // ... (code missing in the source)

    @Override
    public List<Entry> subView(int fromIndex) {
        if (isEmpty() || fromIndex > doGetLastLogIndex()) {
            return Collections.emptyList();
        }
        return subList(Math.max(fromIndex, doGetFirstLogIndex()), nextLogIndex);
    }

    // [fromIndex, toIndex)
    @Override
    public List<Entry> subList(int fromIndex, int toIndex) {
        if (isEmpty()) {
            throw new EmptySequenceException();
        }
        if (fromIndex < doGetFirstLogIndex() || toIndex > doGetLastLogIndex() + 1 || fromIndex > toIndex) {
            throw new IllegalArgumentException("illegal from index " + fromIndex + " or to index " + toIndex);
        }
        return doSubList(fromIndex, toIndex);
    }

    protected abstract List<Entry> doSubList(int fromIndex, int toIndex);

    @Override
    public int getNextLogIndex() {
        return nextLogIndex;
    }

    @Override
    public void append(List<Entry> entries) {
        for (Entry entry : entries) {
            append(entry);
        }
    }

    @Override
    public void append(Entry entry) {
        if (entry.getIndex() != nextLogIndex) {
            throw new IllegalArgumentException("entry index must be " + nextLogIndex);
        }
        doAppend(entry);
        nextLogIndex++;
    }

    protected abstract void doAppend(Entry entry);

    @Override
    public void removeAfter(int index) {
        if (isEmpty() || index >= doGetLastLogIndex()) {
            return;
        }
        doRemoveAfter(index);
    }

    protected abstract void doRemoveAfter(int index);
}

Memory-based log entry sequence

/**
 * Memory-based log entry sequence
 */
public class MemoryEntrySequence extends AbstractEntrySequence {
    private final List<Entry> entries = new ArrayList<>();
    private int commitIndex = 0;

    public MemoryEntrySequence() {
        this(1);
    }

    public MemoryEntrySequence(int logIndexOffset) {
        super(logIndexOffset);
    }

    @Override
    protected List<Entry> doSubList(int fromIndex, int toIndex) {
        return entries.subList(fromIndex - logIndexOffset, toIndex - logIndexOffset);
    }

    @Override
    protected Entry doGetEntry(int index) {
        return entries.get(index - logIndexOffset);
    }

    @Override
    protected void doAppend(Entry entry) {
        entries.add(entry);
    }

    @Override
    public void commit(int index) {
        commitIndex = index;
    }

    @Override
    public int getCommitIndex() {
        return commitIndex;
    }

    @Override
    protected void doRemoveAfter(int index) {
        if (index < doGetFirstLogIndex()) {
            entries.clear();
            nextLogIndex = logIndexOffset;
        } else {
            entries.subList(index - logIndexOffset + 1, entries.size()).clear();
            nextLogIndex = index + 1;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public String toString() {
        return "MemoryEntrySequence{" +
                "logIndexOffset=" + logIndexOffset +
                ", nextLogIndex=" + nextLogIndex +
                ", entries.size=" + entries.size() +
                '}';
    }
}

File-based log entry sequence

Log entry file structure

The log entry file is organized as a sequence of records. Each record contains the log kind (4 bytes), the log index (4 bytes), the log term (4 bytes), the command length (4 bytes) and the command content itself (variable length).

/**
 * Log entries file
 */
@AllArgsConstructor
public class EntriesFile {
    // seekable file
    private final SeekableFile seekableFile;

    public EntriesFile(File file) throws FileNotFoundException {
        this(new RandomAccessFileAdapter(file));
    }

    /**
     * Append a log entry
     * @param entry
     * @return the offset at which the entry was written
     * @throws IOException
     */
    public long appendEntry(Entry entry) throws IOException {
        long offset = seekableFile.size();
        seekableFile.seek(offset);
        seekableFile.writeInt(entry.getKind());
        seekableFile.writeInt(entry.getIndex());
        seekableFile.writeInt(entry.getTerm());
        byte[] commandBytes = entry.getCommandBytes();
        seekableFile.writeInt(commandBytes.length);
        seekableFile.write(commandBytes);
        return offset;
    }

    /**
     * Load a log entry from the given offset
     * @param offset
     * @param factory
     * @return
     * @throws IOException
     */
    public Entry loadEntry(long offset, EntryFactory factory) throws IOException {
        if (offset > seekableFile.size()) {
            throw new IllegalArgumentException("offset > size");
        }
        seekableFile.seek(offset);
        int kind = seekableFile.readInt();
        int index = seekableFile.readInt();
        int term = seekableFile.readInt();
        int length = seekableFile.readInt();
        byte[] bytes = new byte[length];
        seekableFile.read(bytes);
        return factory.create(kind, index, term, bytes);
    }

    public long size() throws IOException {
        return seekableFile.size();
    }

    public void clear() throws IOException {
        truncate(0L);
    }

    public void truncate(long offset) throws IOException {
        seekableFile.truncate(offset);
    }

    public void close() throws IOException {
        seekableFile.close();
    }
}

/**
 * Log entry factory
 */
public class EntryFactory {
    /**
     * Create a log entry object
     * @param kind
     * @param index
     * @param term
     * @param commandBytes
     * @return
     */
    public Entry create(int kind, int index, int term, byte[] commandBytes) {
        switch (kind) {
            case Entry.KIND_NO_OP:
                return new NoOpEntry(index, term);
            case Entry.KIND_GENERAL:
                return new GeneralEntry(index, term, commandBytes);
            default:
                throw new IllegalArgumentException("unexpected entry kind " + kind);
        }
    }
}
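The record layout can be tried out without a real file. The sketch below appends records to a `ByteArrayOutputStream`, remembers each record's starting offset (the value `EntriesFile.appendEntry` returns), and reads one back by offset. `EntryRecords`, `append` and `load` are hypothetical names, and `"set x 1"` is a made-up command payload.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class EntryRecords {
    // Appends one record [kind][index][term][length][command] and returns its starting offset
    static long append(ByteArrayOutputStream file, int kind, int index, int term, byte[] command) {
        long offset = file.size(); // like seekableFile.size() before writing
        ByteBuffer record = ByteBuffer.allocate(4 * Integer.BYTES + command.length);
        record.putInt(kind).putInt(index).putInt(term).putInt(command.length).put(command);
        file.write(record.array(), 0, record.capacity());
        return offset;
    }

    // Reads the record that starts at offset and renders it as "kind/index/term/command"
    static String load(byte[] file, long offset) {
        ByteBuffer buf = ByteBuffer.wrap(file);
        buf.position((int) offset);
        int kind = buf.getInt();
        int index = buf.getInt();
        int term = buf.getInt();
        byte[] command = new byte[buf.getInt()];
        buf.get(command);
        return kind + "/" + index + "/" + term + "/" + new String(command, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        long first = append(file, 0, 1, 1, new byte[0]); // no-op entry: empty command
        long second = append(file, 1, 2, 1, "set x 1".getBytes(StandardCharsets.UTF_8));
        System.out.println(first + " " + second);             // 0 16
        System.out.println(load(file.toByteArray(), second)); // 1/2/1/set x 1
    }
}
```

A no-op entry has an empty command, so its record is exactly 16 bytes and the next record starts right after it. Because records are variable-length, finding entry N requires knowing its offset, which is what the entry index file stores.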

Log entry index file

An EntryIndexFile begins with the minimum entry index (minEntryIndex) and the maximum entry index (maxEntryIndex), followed by the meta information of each log entry. The log index itself is not stored in each item, because it can be computed: the first item belongs to minEntryIndex, the next to minEntryIndex + 1, and so on, up to maxEntryIndex for the last item.
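With this layout, the position of any item can be computed from minEntryIndex alone. Here is a sketch that assumes an 8-byte header (the two int indices; `OFFSET_MAX_ENTRY_INDEX = Integer.BYTES` in the code below places maxEntryIndex right after minEntryIndex) and 16-byte items holding an offset (long), kind (int) and term (int). That is what `LENGTH_ENTRY_INDEX_ITEM = 16` and the variables declared in `load()` suggest, but the field order and all names here are assumptions.

```java
final class EntryIndexLayout {
    // Assumed header: minEntryIndex (int) then maxEntryIndex (int)
    static final int HEADER_LENGTH = 2 * Integer.BYTES;            // 8
    // Assumed item: offset in the entries file (long), kind (int), term (int)
    static final int ITEM_LENGTH = Long.BYTES + 2 * Integer.BYTES; // 16, as LENGTH_ENTRY_INDEX_ITEM

    // Byte position of the item describing entryIndex; the index itself is not stored
    static long offsetOfItem(int minEntryIndex, int entryIndex) {
        return HEADER_LENGTH + (long) (entryIndex - minEntryIndex) * ITEM_LENGTH;
    }

    // Number of entries described by the header
    static int entryCount(int minEntryIndex, int maxEntryIndex) {
        return maxEntryIndex - minEntryIndex + 1;
    }

    public static void main(String[] args) {
        int min = 1, max = 4;
        System.out.println(entryCount(min, max)); // 4
        System.out.println(offsetOfItem(min, 1)); // 8: first item right after the header
        System.out.println(offsetOfItem(min, 3)); // 40 = 8 + 2 * 16
    }
}
```

The same computation gives the truncation point used when removing entries after newMaxEntryIndex: everything from the item for newMaxEntryIndex + 1 onward is cut off.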

```java
/**
 * Log entry index file
 */
public class EntryIndexFile implements Iterable {

    // offset of the maximum entry index
    private static final long OFFSET_MAX_ENTRY_INDEX = Integer.BYTES;
    // length of the meta-information of a single log entry
    private static final int LENGTH_ENTRY_INDEX_ITEM = 16;
    // seekable file
    private final SeekableFile seekableFile;
    // number of log entries
    @Getter
    private int entryIndexCount;
    // minimum log index
    private int minEntryIndex;
    // maximum log index
    private int maxEntryIndex;
    // container of log entry meta-information
    private Map entryIndexMap = new HashMap<>();

    public EntryIndexFile(File file) throws IOException {
        this(new RandomAccessFileAdapter(file));
    }

    public EntryIndexFile(SeekableFile seekableFile) throws IOException {
        this.seekableFile = seekableFile;
        load();
    }

    /**
     * Load all log meta-information
     * @throws IOException
     */
    private void load() throws IOException {
        if (seekableFile.size() == 0L) {
            entryIndexCount = 0;
            return;
        }
        minEntryIndex = seekableFile.readInt();
        maxEntryIndex = seekableFile.readInt();
        updateEntryIndexCount();
        // load the log meta-information into the container one by one
        long offset;
        int kind;
        int term;
        for (int i = minEntryIndex; i <= maxEntryIndex; i++) {
            // ...
        }
    }

    // ...

    public void removeAfter(int newMaxEntryIndex) throws IOException {
        if (/* ... */ newMaxEntryIndex >= maxEntryIndex) {
            return;
        }
        // if the new maxEntryIndex is smaller than minEntryIndex, remove everything
        if (newMaxEntryIndex < minEntryIndex) {
            clear();
            return;
        }
        // update maxEntryIndex
        seekableFile.seek(OFFSET_MAX_ENTRY_INDEX);
        seekableFile.writeInt(newMaxEntryIndex);
        // truncate the file
        seekableFile.truncate(getOffsetOfEntryIndexItem(newMaxEntryIndex + 1));
        // remove the meta-information from the container
        for (int i = newMaxEntryIndex + 1; i <= maxEntryIndex; i++) {
            // ...
        }
        // ...
    }

    // ...
        if (entryIndex < minEntryIndex || entryIndex > maxEntryIndex) {
            throw new IllegalArgumentException("index < min or index > max");
        }
        return entryIndexMap.get(entryIndex);
    }

    /**
     * Iterate over all log entry meta-information in the file
     */
    @Override
    public Iterator iterator() {
        if (isEmpty()) {
            return Collections.emptyIterator();
        }
        return new EntryIndexIterator(entryIndexCount, minEntryIndex);
    }

    public void close() throws IOException {
        seekableFile.close();
    }

    /**
     * Log entry index iterator
     */
    @AllArgsConstructor
    private class EntryIndexIterator implements Iterator {

        // total number of entries
        private final int entryIndexCount;
        // current index
        private int currentEntryIndex;

        /**
         * Whether there is a next item
         */
        @Override
        public boolean hasNext() {
            checkModification();
            return currentEntryIndex /* ... */;
        }

        // ...

            if (index >= toIndex) {
                break;
            }
            if (index >= fromIndex) {
                result.add(entry);
            }
        }
        return result;
    }

    /**
     * Get a log entry
     */
    @Override
    protected Entry doGetEntry(int index) {
        if (!pendingEntries.isEmpty()) {
            int firstPendingEntryIndex = pendingEntries.getFirst().getIndex();
            if (index >= firstPendingEntryIndex) {
                return pendingEntries.get(index - firstPendingEntryIndex);
            }
        }
        assert !entryIndexFile.isEmpty();
        return getEntryInFile(index);
    }

    /**
     * Get log entry meta-information
     */
    @Override
    public EntryMeta getEntryMeta(int index) {
        if (!isEntryPresent(index)) {
            return null;
        }
        // uncommitted entries live in the pending buffer, committed ones in the index file
        if (!pendingEntries.isEmpty()) {
            int firstPendingEntryIndex = pendingEntries.getFirst().getIndex();
            if (index >= firstPendingEntryIndex) {
                return pendingEntries.get(index - firstPendingEntryIndex).getMeta();
            }
        }
        return entryIndexFile.get(index).toEntryMeta();
    }

    /**
     * Get a log entry from the file by index
     */
    private Entry getEntryInFile(int index) {
        long offset = entryIndexFile.getOffset(index);
        try {
            return entriesFile.loadEntry(offset, entryFactory);
        } catch (IOException e) {
            throw new LogException("failed to load entry " + index, e);
        }
    }

    /**
     * Get the last log entry
     */
    @Override
    public Entry getLastEntry() {
        if (isEmpty()) {
            return null;
        }
        if (!pendingEntries.isEmpty()) {
            return pendingEntries.getLast();
        }
        assert !entryIndexFile.isEmpty();
        return getEntryInFile(entryIndexFile.getMaxEntryIndex());
    }

    /**
     * Append a log entry (to the pending buffer)
     */
    @Override
    protected void doAppend(Entry entry) {
        pendingEntries.add(entry);
    }

    /**
     * Commit entries up to index (advance commitIndex)
     */
    @Override
    public void commit(int index) {
        // check commitIndex
        if (index < commitIndex) {
            throw new IllegalArgumentException("commit index < " + commitIndex);
        }
        if (index == commitIndex) {
            return;
        }
        if (pendingEntries.isEmpty() || pendingEntries.getLast().getIndex() < index) {
            throw new IllegalArgumentException("no entry to commit or commit index exceed");
        }
        long offset;
        Entry entry = null;
        try {
            for (int i = commitIndex + 1; i <= index; i++) {
                // ...

    // ...
        if (/* ... */ index >= pendingEntries.getFirst().getIndex() - 1) {
            // remove the specified number of log entries:
            // the loop counts upward, but entries are removed from the tail,
            // so in the end exactly that many entries are removed
            for (int i = index + 1; i /* ... */) {
                // ...
            }
            // ...
        }
        try {
            if (index >= doGetFirstLogIndex()) {
                // the index is smaller than the first entry in the pending buffer
                pendingEntries.clear();
                entriesFile.truncate(entryIndexFile.getOffset(index + 1));
                entryIndexFile.removeAfter(index);
                nextLogIndex = index + 1;
                commitIndex = index;
            } else {
                // the index is smaller than even the first entry's index: clear all data
                pendingEntries.clear();
                entriesFile.clear();
                entryIndexFile.clear();
                nextLogIndex = logIndexOffset;
                commitIndex = logIndexOffset - 1;
            }
        } catch (IOException e) {
            throw new LogException(e);
        }
    }

    /**
     * Close the file sequence
     */
    @Override
    public void close() {
        try {
            entriesFile.close();
            entryIndexFile.close();
        } catch (IOException e) {
            throw new LogException("failed to close", e);
        }
    }
}
```

Log implementation

Abstract log class

```java
/**
 * Abstract log class
 */
@Slf4j
public abstract class AbstractLog implements Log {

    // log entry sequence
    protected EntrySequence entrySequence;

    @Override
    public EntryMeta getLastEntryMeta() {
        if (entrySequence.isEmpty()) {
            return new EntryMeta(Entry.KIND_NO_OP, 0, 0);
        }
        return entrySequence.getLastEntry().getMeta();
    }

    @Override
    public AppendEntriesRpc createAppendEntriesRpc(int term, NodeId selfId, int nextIndex, int maxEntries) {
        int nextLogIndex = entrySequence.getNextLogIndex();
        if (nextIndex > nextLogIndex) {
            throw new IllegalArgumentException("illegal next index " + nextIndex);
        }
        AppendEntriesRpc rpc = new AppendEntriesRpc();
        rpc.setMessageId(UUID.randomUUID().toString());
        rpc.setTerm(term);
        rpc.setLeaderId(selfId);
        rpc.setLeaderCommit(entrySequence.getCommitIndex());
        Entry entry = entrySequence.getEntry(nextIndex - 1);
        if (entry != null) {
            rpc.setPrevLogIndex(entry.getIndex());
            rpc.setPrevLogTerm(entry.getTerm());
        }
        if (!entrySequence.isEmpty()) {
            int maxIndex = (maxEntries == ALL_ENTRIES ? nextLogIndex : Math.min(nextLogIndex, nextIndex + maxEntries));
            rpc.setEntries(entrySequence.subList(nextIndex, maxIndex));
        }
        return rpc;
    }

    @Override
    public int getNextIndex() {
        return entrySequence.getNextLogIndex();
    }

    @Override
    public int getCommitIndex() {
        return entrySequence.getCommitIndex();
    }

    @Override
    public boolean isNewerThan(int lastLogIndex, int lastLogTerm) {
        EntryMeta lastEntryMeta = getLastEntryMeta();
        log.debug("last entry ({}, {}), candidate ({}, {})", lastEntryMeta.getIndex(), lastEntryMeta.getTerm(), lastLogIndex, lastLogTerm);
        // Raft up-to-date rule: compare last terms first, compare indexes only when the terms are equal
        return lastEntryMeta.getTerm() > lastLogTerm
                || (lastEntryMeta.getTerm() == lastLogTerm && lastEntryMeta.getIndex() > lastLogIndex);
    }

    @Override
    public NoOpEntry appendEntry(int term) {
        NoOpEntry entry = new NoOpEntry(entrySequence.getNextLogIndex(), term);
        entrySequence.append(entry);
        return entry;
    }

    @Override
    public GeneralEntry appendEntry(int term, byte[] command) {
        GeneralEntry entry = new GeneralEntry(entrySequence.getNextLogIndex(), term, command);
        entrySequence.append(entry);
        return entry;
    }

    /**
     * Append log entries from the leader.
     * Inconsistent log entries must be removed before appending: starting after the
     * last matching entry, all conflicting entries are removed.
     * @param prevLogIndex index of the entry preceding the new entries
     * @param prevLogTerm term of the entry preceding the new entries
     * @param leaderEntries entries sent by the leader
     * @return whether the append succeeded
     */
    @Override
    public boolean appendEntriesFromLeader(int prevLogIndex, int prevLogTerm, List<Entry> leaderEntries) {
        // check whether the previous entry matches
        if (!checkIfPreviousLogMatches(prevLogIndex, prevLogTerm)) {
            return false;
        }
        // the leader sent no entries
        if (leaderEntries.isEmpty()) {
            return true;
        }
        assert prevLogIndex + 1 == leaderEntries.get(0).getIndex();
        // remove conflicting entries and return the entries that still need to be appended (if any)
        EntrySequenceView newEntries = removeUnmatchedLog(new EntrySequenceView(leaderEntries));
        // append only
        appendEntriesFromLeader(newEntries);
        return true;
    }

    /**
     * Append all entries
     */
    private void appendEntriesFromLeader(EntrySequenceView leaderEntries) {
        if (leaderEntries.isEmpty()) {
            return;
        }
        log.debug("append entries from leader from {} to {}", leaderEntries.getFirstLogIndex(), leaderEntries.getLastLogIndex());
        Iterator<Entry> leaderEntriesIterator = leaderEntries.iterator();
        while (leaderEntriesIterator.hasNext()) {
            entrySequence.append(leaderEntriesIterator.next());
        }
    }

    /**
     * Remove conflicting log entries
     */
    private EntrySequenceView removeUnmatchedLog(EntrySequenceView leaderEntries) {
        // entries from the leader should not be empty
        assert !leaderEntries.isEmpty();
        // find the index of the first mismatched entry
        int firstUnmatched = findFirstUnmatchedLog(leaderEntries);
        // no mismatched entry
        if (firstUnmatched < 0) {
            return new EntrySequenceView(Collections.emptyList());
        }
        // remove every entry starting from the first mismatched index
        removeEntriesAfter(firstUnmatched - 1);
        // return the entries to append afterwards
        return leaderEntries.subView(firstUnmatched);
    }

    /**
     * Find the first mismatched log entry
     */
    private int findFirstUnmatchedLog(EntrySequenceView leaderEntries) {
        // entries from the leader should not be empty
        assert !leaderEntries.isEmpty();
        int logIndex;
        EntryMeta followerEntryMeta;
        Iterator<Entry> entryIterator = leaderEntries.iterator();
        while (entryIterator.hasNext()) {
            Entry leaderEntry = entryIterator.next();
            logIndex = leaderEntry.getIndex();
            // look up the local entry meta-information by index
            followerEntryMeta = entrySequence.getEntryMeta(logIndex);
            // the entry does not exist or the terms differ
            if (followerEntryMeta == null || followerEntryMeta.getTerm() != leaderEntry.getTerm()) {
                return logIndex;
            }
        }
        return -1;
    }

    /**
     * Check whether the previous entry matches
     */
    private boolean checkIfPreviousLogMatches(int prevLogIndex, int prevLogTerm) {
        // look up the entry at the given index
        Entry entry = entrySequence.getEntry(prevLogIndex);
        // the entry does not exist
        if (entry == null) {
            log.debug("previous log {} not found", prevLogIndex);
            return false;
        }
        int term = entry.getTerm();
        if (term != prevLogTerm) {
            log.debug("different term of previous log, local {}, remote {}", term, prevLogTerm);
            return false;
        }
        return true;
    }

    /**
     * Remove the entries after the given index
     */
    private void removeEntriesAfter(int index) {
        if (entrySequence.isEmpty() || index >= entrySequence.getLastLogIndex()) {
            return;
        }
        log.debug("remove entries after {}", index);
        entrySequence.removeAfter(index);
    }

    /**
     * Advance commitIndex
     * @param newCommitIndex the new commitIndex
     * @param currentTerm the current term
     */
    @Override
    public void advanceCommitIndex(int newCommitIndex, int currentTerm) {
        if (!validateNewCommitIndex(newCommitIndex, currentTerm)) {
            return;
        }
        log.debug("advance commit index from {} to {}", entrySequence.getCommitIndex(), newCommitIndex);
        entrySequence.commit(newCommitIndex);
    }

    /**
     * Check the new commitIndex
     */
    private boolean validateNewCommitIndex(int newCommitIndex, int currentTerm) {
        // smaller than the current commitIndex
        if (newCommitIndex /* ... */

    // ...
        if (/* ... */ index > lastLogIndex) {
            return null;
        }
        return entries.get(index - firstLogIndex);
    }

    boolean isEmpty() {
        return entries.isEmpty();
    }

    EntrySequenceView subView(int fromIndex) {
        if (entries.isEmpty() || fromIndex > lastLogIndex) {
            return new EntrySequenceView(Collections.emptyList());
        }
        return new EntrySequenceView(entries.subList(fromIndex - firstLogIndex, entries.size()));
    }

    @Override
    public Iterator<Entry> iterator() {
        return entries.iterator();
    }
}
```
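The follower-side steps of `appendEntriesFromLeader` (check the previous entry, find the first conflicting entry, truncate from there, append the rest) and the candidate comparison in `isNewerThan` can be exercised without any files. The following is a simplified sketch, assuming a log that starts at index 1 with no snapshot; `SimpleFollowerLog` and `IndexTerm` are hypothetical names. It treats prevLogIndex 0 as always matching (the Raft paper's convention for "before the first entry"), and its `isNewerThan` compares indexes only when the last terms are equal (Raft §5.4.1).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical (index, term) pair; the command is irrelevant to the matching rules. */
final class IndexTerm {
    final int index;
    final int term;

    IndexTerm(int index, int term) {
        this.index = index;
        this.term = term;
    }
}

/** Hypothetical in-memory follower log: list position i holds log index i + 1 (no snapshots). */
final class SimpleFollowerLog {
    private final List<IndexTerm> entries = new ArrayList<>();

    int lastIndex() {
        return entries.size();
    }

    int lastTerm() {
        return entries.isEmpty() ? 0 : entries.get(entries.size() - 1).term;
    }

    IndexTerm get(int index) {
        return (index >= 1 && index <= entries.size()) ? entries.get(index - 1) : null;
    }

    /** Consistency check, find the first conflict, truncate from there, append the rest. */
    boolean appendFromLeader(int prevLogIndex, int prevLogTerm, List<IndexTerm> leaderEntries) {
        // prevLogIndex 0 means "before the first entry" and always matches
        if (prevLogIndex > 0) {
            IndexTerm prev = get(prevLogIndex);
            if (prev == null || prev.term != prevLogTerm) {
                return false;
            }
        }
        for (int i = 0; i < leaderEntries.size(); i++) {
            IndexTerm leaderEntry = leaderEntries.get(i);
            IndexTerm local = get(leaderEntry.index);
            if (local == null || local.term != leaderEntry.term) {
                // drop the conflicting entry and everything after it, then append the remainder
                while (entries.size() >= leaderEntry.index) {
                    entries.remove(entries.size() - 1);
                }
                entries.addAll(leaderEntries.subList(i, leaderEntries.size()));
                return true;
            }
        }
        return true; // every leader entry is already present with the same term
    }

    /** The log with the later last term is newer; on equal last terms, the longer log is newer. */
    boolean isNewerThan(int candidateLastIndex, int candidateLastTerm) {
        if (lastTerm() != candidateLastTerm) {
            return lastTerm() > candidateLastTerm;
        }
        return lastIndex() > candidateLastIndex;
    }
}
```

Searching for the first mismatch, rather than truncating everything after prevLogIndex, means a duplicated or delayed AppendEntries request cannot erase entries that a later request already appended.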

Memory-based log

```java
/**
 * Memory-based log
 */
public class MemoryLog extends AbstractLog {

    public MemoryLog(EntrySequence entrySequence) {
        this.entrySequence = entrySequence;
    }

    public MemoryLog() {
        this(new MemoryEntrySequence());
    }
}
```
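`MemoryLog` only supplies an entry sequence; the index arithmetic lives in `MemoryEntrySequence`, whose `removeAfter` tail appears at the start of this section (list position = log index - logIndexOffset). Below is a stripped-down sketch that stores only terms. `MemorySequenceSketch` is hypothetical, and the early-return guard in its `removeAfter` is an assumption, because that part of the original method is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, stripped-down memory entry sequence that keeps only each entry's term. */
final class MemorySequenceSketch {
    private final int logIndexOffset;            // log index stored at list position 0
    private final List<Integer> terms = new ArrayList<>();
    private int nextLogIndex;

    MemorySequenceSketch(int logIndexOffset) {
        this.logIndexOffset = logIndexOffset;
        this.nextLogIndex = logIndexOffset;
    }

    int firstLogIndex() {
        return logIndexOffset;
    }

    int nextLogIndex() {
        return nextLogIndex;
    }

    boolean isEmpty() {
        return terms.isEmpty();
    }

    void append(int term) {
        terms.add(term);
        nextLogIndex++;
    }

    /** List position = log index - logIndexOffset. */
    int termAt(int index) {
        return terms.get(index - logIndexOffset);
    }

    /** Keeps entries up to and including index, mirroring the removeAfter tail shown earlier. */
    void removeAfter(int index) {
        // assumed guard: nothing to do if empty or index already at/after the last entry
        if (isEmpty() || index >= nextLogIndex - 1) {
            return;
        }
        if (index < firstLogIndex()) {
            terms.clear();
            nextLogIndex = logIndexOffset;
        } else {
            terms.subList(index - logIndexOffset + 1, terms.size()).clear();
            nextLogIndex = index + 1;
        }
    }
}
```

Clearing a `subList` view removes that range from the backing list in place, which is why the original code can truncate with a single call.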

File-based log

```java
/**
 * Abstract log directory
 */
@AllArgsConstructor
public abstract class AbstractLogDir implements LogDir {

    protected final File dir;

    @Override
    public void initialize() {
        if (!dir.exists() && !dir.mkdir()) {
            throw new LogException("failed to create directory " + dir);
        }
        try {
            Files.touch(getEntriesFile());
            Files.touch(getEntryOffsetIndexFile());
        } catch (IOException e) {
            throw new LogException("failed to create file", e);
        }
    }

    @Override
    public boolean exists() {
        return dir.exists();
    }

    @Override
    public File getEntriesFile() {
        return new File(dir, RootDir.FILE_NAME_ENTRIES);
    }

    @Override
    public File getEntryOffsetIndexFile() {
        return new File(dir, RootDir.FILE_NAME_ENTRY_OFFSET_INDEX);
    }

    @Override
    public File get() {
        return dir;
    }

    @Override
    public boolean renameTo(LogDir logDir) {
        return dir.renameTo(logDir.get());
    }
}

/**
 * Log generation
 * log-root
 *  |-log-1
 *  |   |-entries.bin
 *  |   /-entries.idx
 *  /-log-100
 *      |-entries.bin
 *      /-entries.idx
 * log-1 and log-100 above are two log generations; the number is the log index offset lastIncludedIndex
 */
public class LogGeneration extends AbstractLogDir implements Comparable<LogGeneration> {

    // regular expression matching generation directory names
    private static final Pattern DIR_NAME_PATTERN = Pattern.compile("log-(\\d+)");

    // log index offset of this generation
    @Getter
    private final int lastIncludedIndex;

    public LogGeneration(File baseDir, int lastIncludedIndex) {
        super(new File(baseDir, generateDirName(lastIncludedIndex)));
        this.lastIncludedIndex = lastIncludedIndex;
    }

    public LogGeneration(File dir) {
        super(dir);
        Matcher matcher = DIR_NAME_PATTERN.matcher(dir.getName());
        if (!matcher.matches()) {
            throw new IllegalArgumentException("not a directory name of log generation, [" + dir.getName() + "]");
        }
        lastIncludedIndex = Integer.parseInt(matcher.group(1));
    }

    /**
     * Whether the name is a valid log generation directory name
     */
    public static boolean isValidDirName(String dirName) {
        return DIR_NAME_PATTERN.matcher(dirName).matches();
    }

    /**
     * Build the directory name from lastIncludedIndex
     */
    private static String generateDirName(int lastIncludedIndex) {
        return "log-" + lastIncludedIndex;
    }

    /**
     * Compare log generations by lastIncludedIndex
     */
    @Override
    public int compareTo(LogGeneration o) {
        return Integer.compare(lastIncludedIndex, o.lastIncludedIndex);
    }
}

/**
 * Normal log directory
 */
@ToString
public class NormalLogDir extends AbstractLogDir {

    public NormalLogDir(File dir) {
        super(dir);
    }
}

/**
 * Root directory
 */
@Slf4j
public class RootDir {

    // log entry file name
    public static final String FILE_NAME_ENTRIES = "entries.bin";
    // log entry index file name
    public static final String FILE_NAME_ENTRY_OFFSET_INDEX = "entries.idx";
    // name of the subdirectory used while a new generation is being generated
    private static final String DIR_NAME_GENERATING = "generating";
    // root directory
    private final File baseDir;

    public RootDir(File baseDir) {
        if (!baseDir.exists()) {
            throw new IllegalArgumentException("dir " + baseDir + " not exists");
        }
        this.baseDir = baseDir;
    }

    public LogDir getLogDirForGenerating() {
        return getOrCreateNormalLogDir(DIR_NAME_GENERATING);
    }

    /**
     * Get or create a normal log directory
     */
    private NormalLogDir getOrCreateNormalLogDir(String name) {
        NormalLogDir logDir = new NormalLogDir(new File(baseDir, name));
        if (!logDir.exists()) {
            logDir.initialize();
        }
        return logDir;
    }

    /**
     * Rename a log directory to a log generation
     */
    public LogDir rename(LogDir dir, int lastIncludedIndex) {
        LogGeneration destDir = new LogGeneration(baseDir, lastIncludedIndex);
        if (destDir.exists()) {
            throw new IllegalStateException("failed to rename, dest dir " + destDir + " exists");
        }
        log.info("rename dir {} to {}", dir, destDir);
        if (!dir.renameTo(destDir)) {
            throw new IllegalStateException("failed to rename " + dir + " to " + destDir);
        }
        return destDir;
    }

    /**
     * Create the first log generation
     */
    public LogGeneration createFirstGeneration() {
        LogGeneration generation = new LogGeneration(baseDir, 0);
        generation.initialize();
        return generation;
    }

    /**
     * Get the latest log generation
     */
    public LogGeneration getLatestGeneration() {
        File[] files = baseDir.listFiles();
        if (files == null) {
            return null;
        }
        LogGeneration latest = null;
        String fileName;
        LogGeneration generation;
        for (File file : files) {
            if (!file.isDirectory()) {
                continue;
            }
            fileName = file.getName();
            if (DIR_NAME_GENERATING.equals(fileName) || !LogGeneration.isValidDirName(fileName)) {
                continue;
            }
            generation = new LogGeneration(file);
            if (latest == null || generation.compareTo(latest) > 0) {
                latest = generation;
            }
        }
        return latest;
    }
}

/**
 * File-based log
 */
public class FileLog extends AbstractLog {

    private final RootDir rootDir;

    public FileLog(File baseDir) {
        rootDir = new RootDir(baseDir);
        LogGeneration latestGeneration = rootDir.getLatestGeneration();
        if (latestGeneration != null) {
            entrySequence = new FileEntrySequence(latestGeneration, latestGeneration.getLastIncludedIndex());
        } else {
            LogGeneration firstGeneration = rootDir.createFirstGeneration();
            entrySequence = new FileEntrySequence(firstGeneration, 1);
        }
    }
}
```
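`RootDir.getLatestGeneration` picks the directory whose `log-N` suffix is largest. The same selection on plain directory names, without touching the file system, is sketched below; `GenerationPicker` and `latestLastIncludedIndex` are hypothetical names, while the regular expression and the skipped `generating` name come from `LogGeneration` and `RootDir`.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper mirroring RootDir.getLatestGeneration on plain directory names. */
final class GenerationPicker {
    private static final Pattern DIR_NAME_PATTERN = Pattern.compile("log-(\\d+)");
    private static final String DIR_NAME_GENERATING = "generating";

    private GenerationPicker() {
    }

    /** Returns the largest lastIncludedIndex among "log-N" names, or -1 when there is none. */
    static int latestLastIncludedIndex(List<String> dirNames) {
        int latest = -1;
        for (String name : dirNames) {
            if (DIR_NAME_GENERATING.equals(name)) {
                continue; // a generation still being written is never the latest
            }
            Matcher matcher = DIR_NAME_PATTERN.matcher(name);
            if (!matcher.matches()) {
                continue; // ignore unrelated files and directories
            }
            // compare numerically: as strings, "log-20" would sort after "log-100"
            latest = Math.max(latest, Integer.parseInt(matcher.group(1)));
        }
        return latest;
    }
}
```

Comparing the parsed numbers rather than the names is the same choice `LogGeneration.compareTo` makes with `Integer.compare`.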
