2025-01-22 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains the implementation of FastLeaderElection, the fast leader election algorithm in ZooKeeper (zk). It first lists the concepts involved in an election, then walks through the source code of the election loop and its helper functions.
Concepts involved in elections
Server state
Vote
How is a vote chosen?
Protocol
Election
How is the election conducted?
Epoch
Sender
Receiver
Send queue
Receive queue
Server state
public enum ServerState {
    // Looking for a leader. A server in this state believes the cluster
    // currently has no leader, so it must enter leader election.
    LOOKING,
    // Follower state: this server is currently a follower.
    FOLLOWING,
    // Leader state: this server is currently the leader.
    LEADING,
    // Observer state: this server is an observer.
    OBSERVING
}
Vote
id
The sid of the server recommended as leader
zxid
The transaction id of the recommended leader
electionEpoch
Used to determine whether votes belong to the same election round. Each server keeps it as a self-incrementing counter and adds 1 when it enters a new round of voting.
peerEpoch
The epoch of the recommended leader
state
The current state of the server
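The fields above can be pictured as a small value class. The following is only a minimal sketch: SimpleVote and sameRound are hypothetical names invented for illustration, not ZooKeeper's own Vote class, and the state field is left out for brevity.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for a vote (illustration only).
final class SimpleVote {
    final long id;            // sid of the server recommended as leader
    final long zxid;          // transaction id of the recommended leader
    final long electionEpoch; // election round in which the vote was cast
    final long peerEpoch;     // epoch of the recommended leader

    SimpleVote(long id, long zxid, long electionEpoch, long peerEpoch) {
        this.id = id;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.peerEpoch = peerEpoch;
    }

    // Two votes belong to the same election round when their electionEpoch matches.
    static boolean sameRound(SimpleVote a, SimpleVote b) {
        return a.electionEpoch == b.electionEpoch;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleVote)) {
            return false;
        }
        SimpleVote other = (SimpleVote) o;
        return id == other.id && zxid == other.zxid
                && electionEpoch == other.electionEpoch && peerEpoch == other.peerEpoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, zxid, electionEpoch, peerEpoch);
    }
}
```

Making the class immutable with value equality means two servers that recommend the same leader in the same round produce equal votes, which is what a tally needs.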
Inner classes
FastLeaderElection has three inner classes: Notification, ToSend, and Messenger.
A Notification lets other nodes know that a given node has changed its vote, either because it has joined leader election or because it learned of another vote with a higher zxid, or with the same zxid and a higher server id.
The ToSend class wraps a message that is to be sent.
Messenger is divided into
WorkerReceiver and WorkerSender
Its main job is to set up these two worker objects.
LinkedBlockingQueue<ToSend> sendqueue
LinkedBlockingQueue<Notification> recvqueue
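The role of the two queues can be illustrated with a small, self-contained sketch. It assumes a hypothetical loopback worker that stands in for the network: QueueSketch and roundTrip are invented names, and plain strings replace the ToSend and Notification types. The election thread puts a message on the send queue and then polls the receive queue with a timeout, much as lookForLeader does.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the send/receive queue hand-off (not ZooKeeper code).
final class QueueSketch {
    // Sends one message through a loopback worker and returns what comes back,
    // or null if nothing arrives within the timeout.
    static String roundTrip(String message) {
        LinkedBlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
        LinkedBlockingQueue<String> recvqueue = new LinkedBlockingQueue<>();

        // Stand-in for the sender and receiver workers plus the network:
        // take one outgoing message and deliver it back as an incoming one.
        Thread worker = new Thread(() -> {
            try {
                String outgoing = sendqueue.take();
                recvqueue.put("echo:" + outgoing);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();

        try {
            sendqueue.put(message);
            // The election loop blocks here for a bounded time only.
            return recvqueue.poll(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Decoupling the election logic from the I/O through blocking queues lets the election thread wait with a timeout instead of blocking on a socket.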
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        // Votes received in the current round, keyed by sender sid
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();
        // Votes from servers that are already FOLLOWING or LEADING
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            // Start a new round and vote for ourselves first
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid
                                + ", proposed leader=" + n.leader
                                + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                                + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset,
                            new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    if (voteSet.hasAllQuorums()) {
                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        voteSet = getVoteTracker(recvset,
                                new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     */
                    outofelection.put(n.sid,
                            new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection,
                            new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}

Vote-related functions

Update the vote fields
synchronized void updateProposal(long leader, long zxid, long epoch) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
                + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
                + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
    }
    proposedLeader = leader;
    proposedZxid = zxid;
    proposedEpoch = epoch;
}

Generate a vote from the current proposal
public synchronized Vote getVote() {
    return new Vote(proposedLeader, proposedZxid, proposedEpoch);
}

Get the state this server takes when it is not the leader
private ServerState learningState() {
    if (self.getLearnerType() == LearnerType.PARTICIPANT) {
        LOG.debug("I am a participant: {}", self.getId());
        return ServerState.FOLLOWING;
    } else {
        LOG.debug("I am an observer: {}", self.getId());
        return ServerState.OBSERVING;
    }
}

Get the id of this server if it takes part in voting
private long getInitId() {
    if (self.getQuorumVerifier().getVotingMembers().containsKey(self.getId())) {
        return self.getId();
    } else {
        return Long.MIN_VALUE;
    }
}

Get the latest logged transaction id
private long getInitLastLoggedZxid() {
    if (self.getLearnerType() == LearnerType.PARTICIPANT) {
        return self.getLastLoggedZxid();
    } else {
        return Long.MIN_VALUE;
    }
}

Get the current epoch saved in the file
public long getCurrentEpoch() throws IOException {
    if (currentEpoch == -1) {
        currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
    }
    return currentEpoch;
}

Election-related functions

Determine whether the new vote beats the current vote. In short, the current vote and the new vote are compared by epoch, then zxid, then server id, and the larger one wins.
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
        long curId, long curZxid, long curEpoch) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("id: " + newId + ", proposed id: " + curId
                + ", zxid: 0x" + Long.toHexString(newZxid)
                + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    }
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid) && (newId > curId)))));
}

Check whether the proposed server really is the leader; the check fails if it is not.
protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {
    boolean predicate = true;

    /*
     * If everyone else thinks I'm the leader, I must be the leader.
     * The other two checks are just for the case in which I'm not the
     * leader. If I'm not the leader and I haven't received a message
     * from leader stating that it is leading, then predicate is false.
     */
    if (leader != self.getId()) {
        if (votes.get(leader) == null) {
            predicate = false;
        } else if (votes.get(leader).getState() != ServerState.LEADING) {
            predicate = false;
        }
    } else if (logicalclock.get() != electionEpoch) {
        predicate = false;
    }
    return predicate;
}
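The election loop finishes once enough servers agree on the same proposal. The sketch below illustrates that idea under a simplifying assumption: it counts a plain majority of an ensemble of known size, whereas the real code delegates the check to a vote tracker and the configured quorum verifier (which can also use weights). MajorityTally and hasMajority are hypothetical names, and the map stores only the sid each server voted for.

```java
import java.util.Map;

// Hypothetical simple-majority tally (illustration only, not ZooKeeper code).
final class MajorityTally {
    // recvset maps the sid of each voter to the sid it voted for.
    // Returns true when more than half of the ensemble backs proposedLeader.
    static boolean hasMajority(Map<Long, Long> recvset, long proposedLeader, int ensembleSize) {
        int supporters = 0;
        for (long votedFor : recvset.values()) {
            if (votedFor == proposedLeader) {
                supporters++;
            }
        }
        return supporters > ensembleSize / 2;
    }
}
```

With three servers, two matching votes are enough; with five servers, two matching votes are not.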
Start a new round of election
public Vote lookForLeader() throws InterruptedException
Data structures and classes involved in the election
The difference between electionEpoch and peerEpoch
electionEpoch is the election round. It is used to determine whether votes belong to the same election round; it starts at 0 and is incremented each time a new round begins.
peerEpoch is the epoch of the recommended leader.
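How a server in the LOOKING state reacts to the electionEpoch of an incoming notification can be summarized in a small sketch. RoundCheck, Action, and classify are hypothetical names used only for illustration; the three outcomes mirror the three branches in lookForLeader.

```java
// Hypothetical sketch of the electionEpoch comparison (illustration only).
final class RoundCheck {
    enum Action {
        ADOPT_NEWER_ROUND, // notification is from a later round: adopt its round and clear received votes
        IGNORE_STALE,      // notification is from an earlier round: ignore it
        COMPARE_VOTES      // same round: compare the notification's vote with our own proposal
    }

    static Action classify(long notificationElectionEpoch, long logicalClock) {
        if (notificationElectionEpoch > logicalClock) {
            return Action.ADOPT_NEWER_ROUND;
        } else if (notificationElectionEpoch < logicalClock) {
            return Action.IGNORE_STALE;
        }
        return Action.COMPARE_VOTES;
    }
}
```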
Rules for comparing two votes
Compare peerEpoch, zxid, and sid in turn.
peerEpoch represents the epoch. The larger it is, the newer the vote.
When peerEpoch is the same, zxid represents the transaction record within an epoch. The larger it is, the newer the vote.
When both peerEpoch and zxid are the same, the vote with the larger sid wins.
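The comparison rules above can be sketched as a standalone function. VoteOrder and beats are hypothetical names, and the sketch leaves out the quorum weight check that the real totalOrderPredicate performs first.

```java
// Hypothetical standalone version of the vote ordering (illustration only).
final class VoteOrder {
    // Returns true when the new vote should replace the current one.
    static boolean beats(long newId, long newZxid, long newEpoch,
                         long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch; // rule 1: higher peerEpoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;   // rule 2: same epoch, higher zxid wins
        }
        return newId > curId;           // rule 3: same epoch and zxid, higher sid wins
    }
}
```

For example, a vote with a higher peerEpoch wins even when its zxid and sid are both smaller, and two identical votes never beat each other.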
At this point, you should have a deeper understanding of the implementation of FastLeaderElection in zk. The best way to consolidate it is to try it out in practice.