Implementation of Fast Election FastLeaderElection in zk

2025-01-22 Update From: SLTechnology News&Howtos

This article explains the implementation of FastLeaderElection, the fast leader election algorithm in ZooKeeper (zk). It covers the concepts involved in an election, the data structures that carry votes, and the main election loop, lookForLeader.

Concepts involved in the election

Server status

Vote

How is a vote chosen?

Protocol

Election

How is the election conducted?

Epoch

Sender

Receiver

Send queue

Receive queue

Server status

public enum ServerState {
    // Looking for a leader. A server in this state believes the cluster
    // currently has no leader, so it must enter leader election.
    LOOKING,
    // Follower state: this server is currently a follower.
    FOLLOWING,
    // Leader state: this server is currently the leader.
    LEADING,
    // Observer state: this server is an observer.
    OBSERVING
}

Vote

id

sid of the proposed leader

zxid

Transaction id of the proposed leader

electionEpoch

Used to determine whether several votes belong to the same election round. Each server keeps it as a self-incrementing counter and adds 1 to it every time it enters a new round of voting.

peerEpoch

Epoch of the proposed leader

state

State of the current server

Inner classes

FastLeaderElection has three inner classes: Messenger, ToSend and Notification.

A Notification lets other nodes know that the vote of a given node has changed, either because that node has joined the election or because it learned of a vote with a higher zxid, or with the same zxid and a higher server id.

The ToSend class wraps the information to be sent.

Messenger is divided into

WorkerReceiver and WorkerSender

Messenger mainly sets up these two worker objects, which communicate with the election logic through two queues:

LinkedBlockingQueue<ToSend> sendqueue

LinkedBlockingQueue<Notification> recvqueue
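The role of the two queues can be illustrated with a minimal sketch. This is not ZooKeeper's Messenger: the class QueueSketch, the Message type and the loopback "delivery" are all hypothetical. A single worker thread simply moves messages from sendqueue to recvqueue, which shows how the election thread only ever touches queues and never does I/O itself.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not ZooKeeper's Messenger: one worker thread drains
// sendqueue and hands each message to recvqueue, standing in for the network.
class QueueSketch {
    // Hypothetical message type: the sender's sid and the leader it proposes.
    static final class Message {
        final long sid;
        final long leader;
        Message(long sid, long leader) { this.sid = sid; this.leader = leader; }
    }

    final LinkedBlockingQueue<Message> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Message> recvqueue = new LinkedBlockingQueue<>();
    private volatile boolean stop = false;
    private final Thread worker;

    QueueSketch() {
        worker = new Thread(() -> {
            while (!stop) {
                try {
                    // Poll with a timeout so the loop can notice the stop flag.
                    Message m = sendqueue.poll(100, TimeUnit.MILLISECONDS);
                    if (m != null) {
                        recvqueue.offer(m); // loopback "delivery"
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    void shutdown() {
        stop = true;
        worker.interrupt();
    }

    // Sends one message and waits up to timeoutMs for it to come back.
    // Returns null on timeout or interruption.
    static Message roundTrip(long sid, long leader, long timeoutMs) {
        QueueSketch q = new QueueSketch();
        try {
            q.sendqueue.offer(new Message(sid, leader));
            return q.recvqueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            q.shutdown();
        }
    }

    public static void main(String[] args) {
        Message m = roundTrip(1L, 3L, 2000);
        System.out.println(m == null ? "timed out" : "received vote for sid " + m.leader);
    }
}
```

The election loop polls recvqueue with a timeout in the same way, which is why a missing notification shows up there as a null result rather than as a blocked thread.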

public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        // Votes received in the current round from servers that are LOOKING.
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();
        // Votes received from servers that are already FOLLOWING or LEADING.
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            // Enter a new round and initially vote for ourselves.
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid
                                + ", proposed leader=" + n.leader
                                + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                                + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    if (voteSet.hasAllQuorums()) {
                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}

Vote-related functions

Update the vote fields

synchronized void updateProposal(long leader, long zxid, long epoch) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
                + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
                + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
    }
    proposedLeader = leader;
    proposedZxid = zxid;
    proposedEpoch = epoch;
}

Generate a vote

public synchronized Vote getVote() {
    return new Vote(proposedLeader, proposedZxid, proposedEpoch);
}

Get the state this server takes once the election ends

private ServerState learningState() {
    if (self.getLearnerType() == LearnerType.PARTICIPANT) {
        LOG.debug("I am a participant: {}", self.getId());
        return ServerState.FOLLOWING;
    } else {
        LOG.debug("I am an observer: {}", self.getId());
        return ServerState.OBSERVING;
    }
}

Get the id of this server if it takes part in voting

private long getInitId() {
    if (self.getQuorumVerifier().getVotingMembers().containsKey(self.getId())) {
        return self.getId();
    } else {
        return Long.MIN_VALUE;
    }
}

Get the latest logged transaction id

private long getInitLastLoggedZxid() {
    if (self.getLearnerType() == LearnerType.PARTICIPANT) {
        return self.getLastLoggedZxid();
    } else {
        return Long.MIN_VALUE;
    }
}

Get the current epoch saved in the file

public long getCurrentEpoch() throws IOException {
    if (currentEpoch == -1) {
        currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
    }
    return currentEpoch;
}

Election-related functions

Determine whether a new vote, given as a pair (server id, zxid) plus its epoch, beats the current vote. In short, the current vote and the new vote are compared by epoch, then by zxid, then by server id, and the larger one is chosen.

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("id: " + newId + ", proposed id: " + curId
                + ", zxid: 0x" + Long.toHexString(newZxid)
                + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    }
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid) && (newId > curId)))));
}

Check whether the proposed server really is the leader; if it is not, the vote is rejected.

protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {
    boolean predicate = true;

    /*
     * If everyone else thinks I'm the leader, I must be the leader.
     * The other two checks are just for the case in which I'm not the
     * leader. If I'm not the leader and I haven't received a message
     * from leader stating that it is leading, then predicate is false.
     */
    if (leader != self.getId()) {
        if (votes.get(leader) == null) {
            predicate = false;
        } else if (votes.get(leader).getState() != ServerState.LEADING) {
            predicate = false;
        }
    } else if (logicalclock.get() != electionEpoch) {
        predicate = false;
    }
    return predicate;
}
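The hasAllQuorums() check used in lookForLeader decides whether enough servers back the same vote. As a rough illustration only, assuming a plain majority quorum (the real check goes through the configured quorum verifier, which may use weights), the counting could look like the sketch below. QuorumSketch and hasMajority are hypothetical names, and the map holds only the proposed leader sid for each voter rather than a full Vote.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a simple-majority check, not ZooKeeper's
// SyncedLearnerTracker or QuorumVerifier.
class QuorumSketch {
    // votes maps each voter's sid to the sid of the leader it proposes.
    static boolean hasMajority(Map<Long, Long> votes, long candidate, int ensembleSize) {
        int count = 0;
        for (long proposed : votes.values()) {
            if (proposed == candidate) {
                count++;
            }
        }
        // Strictly more than half of the voting members must agree.
        return count > ensembleSize / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> votes = new HashMap<>();
        votes.put(1L, 3L);
        votes.put(2L, 3L);
        System.out.println(hasMajority(votes, 3L, 5)); // false: 2 of 5
        votes.put(3L, 3L);
        System.out.println(hasMajority(votes, 3L, 5)); // true: 3 of 5
    }
}
```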

Starting a new election round

public Vote lookForLeader() throws InterruptedException

Data structures involved in the election

The difference between electionEpoch and peerEpoch

electionEpoch is the election round. It is used to determine whether votes belong to the same round of election, and it counts up from 0.

peerEpoch is the epoch of the proposed leader.

Rules for comparing two votes

Compare peerEpoch, zxid and sid in turn.

peerEpoch represents the epoch. The larger the peerEpoch, the newer the vote.

When the peerEpoch is the same, zxid represents the transaction record within that epoch. The larger the zxid, the newer the vote.

When both peerEpoch and zxid are the same, the vote with the larger sid wins.
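How electionEpoch is used within a round can be condensed into a small sketch. It mirrors the three branches of the LOOKING case in lookForLeader (newer round, older round, same round), but it is a simplification: EpochSketch, Action and onNotification are hypothetical names, and only the round bookkeeping is shown.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of how an incoming electionEpoch is classified
// against the local logical clock.
class EpochSketch {
    enum Action { ADOPT_NEWER_ROUND, IGNORE_STALE, COMPARE_VOTES }

    // Local election round counter, playing the role of logicalclock.
    final AtomicLong logicalclock = new AtomicLong();

    Action onNotification(long electionEpoch) {
        if (electionEpoch > logicalclock.get()) {
            // The sender is in a newer round: jump to it. The caller would
            // also clear the votes collected so far.
            logicalclock.set(electionEpoch);
            return Action.ADOPT_NEWER_ROUND;
        } else if (electionEpoch < logicalclock.get()) {
            // The sender is in an older round: its vote is ignored.
            return Action.IGNORE_STALE;
        }
        // Same round: the votes themselves are compared.
        return Action.COMPARE_VOTES;
    }

    public static void main(String[] args) {
        EpochSketch s = new EpochSketch();
        s.logicalclock.set(2);
        System.out.println(s.onNotification(1)); // IGNORE_STALE
        System.out.println(s.onNotification(2)); // COMPARE_VOTES
        System.out.println(s.onNotification(5)); // ADOPT_NEWER_ROUND
    }
}
```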
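A few concrete comparisons make the ordering easier to see. The sketch below restates the rule as a standalone function. It leaves out the quorum-weight check that totalOrderPredicate performs first, and VoteOrderSketch and beats are hypothetical names.

```java
// Hypothetical sketch of the vote ordering: peerEpoch first, then zxid,
// then sid. Returns true when the new vote should replace the current one.
class VoteOrderSketch {
    static boolean beats(long newSid, long newZxid, long newEpoch,
                         long curSid, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;
        }
        return newSid > curSid;
    }

    public static void main(String[] args) {
        // Higher epoch wins even with a lower zxid and a lower sid.
        System.out.println(beats(1, 0x10L, 2, 3, 0x99L, 1)); // true
        // Same epoch: higher zxid wins even with a lower sid.
        System.out.println(beats(1, 0x20L, 1, 3, 0x10L, 1)); // true
        // Same epoch and zxid: higher sid wins.
        System.out.println(beats(3, 0x10L, 1, 1, 0x10L, 1)); // true
        // An identical vote does not beat the current one.
        System.out.println(beats(1, 0x10L, 1, 1, 0x10L, 1)); // false
    }
}
```

Because the comparison is strict, two servers holding identical votes never keep replacing each other's proposal.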

That concludes this walkthrough of the FastLeaderElection implementation in zk. The best way to consolidate it is to trace lookForLeader in the source yourself.
