2025-01-28 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article walks through the source code of FastLeaderElection, the default leader election algorithm in ZooKeeper.
ZooKeeper is a widely used distributed coordinator and registry. To keep the cluster available, it re-elects a leader when the cluster starts up, when the leader goes down, or when the heartbeat with the leader times out. The default election algorithm is FastLeaderElection, which this article analyzes.
Leader election entry point
The QuorumPeer.run() method executes the main loop while (running). If the current state is LOOKING, it calls lookForLeader, the method that runs a new leader election.
    // Main loop
    while (running) {
        ...
        switch (getPeerState()) {
        case LOOKING:
            ...
            // makeLEStrategy() returns the election strategy (FastLeaderElection by default).
            // lookForLeader() runs the election.
            // The vote returned by lookForLeader() is cached by calling setCurrentVote().
            setCurrentVote(makeLEStrategy().lookForLeader());
            break;
        case OBSERVING:
            ...
            break;
        case FOLLOWING:
            ...
            break;
        case LEADING:
            break;
        }
    }

Election process
After entering FastLeaderElection#lookForLeader, the code proceeds as follows:
1. Increment the election epoch (logicalclock) and cast the first vote, which by default is for the node itself.
2. Enter a loop that receives vote notifications from the other ZooKeeper nodes.
2.1 If no notification arrives before the timeout, double the wait time (up to a maximum) and repeat the loop in step 2.
2.2 When a notification arrives, check the state of the node that sent it.
2.2.1 The sender is LOOKING.
2.2.1.1 Compare the notification with the locally cached vote: first the election epoch, then the epoch, zxid, and server id of the proposed leader, to decide which vote is better.
2.2.1.2 Based on the comparison, update this node's vote (the proposed leader id, that leader's zxid, and its epoch) and broadcast it to the other nodes in the cluster.
2.2.1.3 Count how many of the notifications cached on this node match its latest vote, and check whether that count exceeds half of the cluster.
2.2.1.3.1 If it does not exceed half, repeat the loop in step 2.
2.2.1.3.2 If it exceeds half, keep polling the receive queue for any remaining notifications.
2.2.1.3.2.1 If a better vote is pulled, put it back in the queue and repeat the loop in step 2.
2.2.1.3.2.2 If nothing further is pulled, set the state of the current node (LEADING, FOLLOWING, or OBSERVING), clear the receive queue, and return the final vote.
2.2.2 The sender is OBSERVING. Nodes in this state cannot vote, so discard the notification and continue with the loop in step 2.
2.2.3 The sender is FOLLOWING, which means the cluster has already elected a leader. Give up the local vote, consult the votes cached in outofelection, and determine the leader directly. Set the current node's state to FOLLOWING or OBSERVING and return the resulting vote.
2.2.4 The sender is LEADING, which also means the cluster has already elected a leader. Give up the local vote, consult the votes cached in outofelection, and determine the leader directly. Set the current node's state to FOLLOWING or OBSERVING and return the resulting vote.
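The majority check in step 2.2.1.3 can be sketched in isolation. This is a minimal, simplified model and not ZooKeeper's implementation: the class VoteTally, its nested Vote type, and the hasMajority helper are hypothetical names introduced for illustration, and every server is assumed to carry equal weight, whereas ZooKeeper delegates the decision to its configured quorum verifier.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the vote tally in step 2.2.1.3.
// Assumption: every voting member has weight 1 (plain majority quorum).
final class VoteTally {

    // Minimal stand-in for a vote: the proposed leader, its zxid, and its epoch.
    static final class Vote {
        final long leader;
        final long zxid;
        final long peerEpoch;

        Vote(long leader, long zxid, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
        }

        boolean sameAs(Vote other) {
            return leader == other.leader && zxid == other.zxid && peerEpoch == other.peerEpoch;
        }
    }

    // Returns true when strictly more than half of the voting members
    // have sent a vote identical to the local proposal.
    static boolean hasMajority(Map<Long, Vote> recvset, Vote proposal, int votingMembers) {
        long supporters = recvset.values().stream().filter(proposal::sameAs).count();
        return supporters > votingMembers / 2;
    }

    public static void main(String[] args) {
        Map<Long, Vote> recvset = new HashMap<>();   // sender sid -> vote received from it
        Vote proposal = new Vote(3L, 0x100L, 1L);    // this node currently proposes server 3

        recvset.put(1L, new Vote(3L, 0x100L, 1L));   // server 1 agrees
        recvset.put(2L, new Vote(2L, 0x0ffL, 1L));   // server 2 still votes for itself
        System.out.println(hasMajority(recvset, proposal, 3)); // false: 1 of 3

        recvset.put(3L, new Vote(3L, 0x100L, 1L));   // server 3 agrees
        System.out.println(hasMajority(recvset, proposal, 3)); // true: 2 of 3
    }
}
```

The strict comparison matters for even-sized ensembles: with four voting members, two matching votes are not enough, and three are required.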
    /**
     * Starts a new round of leader election. Whenever our QuorumPeer changes
     * its state to LOOKING, this method is invoked, and it sends notifications
     * to all other peers.
     */
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }

        self.start_fle = Time.currentElapsedTime();
        try {
            /*
             * Votes for the current leader election are stored in recvset. In other
             * words, a vote v in recvset must satisfy v.electionEpoch == logicalclock
             * (it belongs to this election round). The current participant uses
             * recvset to deduce whether a majority of participants has voted for it.
             */
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();

            /*
             * Votes from previous leader elections, as well as votes from the current
             * one, are stored in outofelection. Note that notifications in the LOOKING
             * state are not stored in outofelection.
             *
             * If the other nodes have already elected a leader by the time this node
             * joins the election, it follows that leader directly based on this map.
             */
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
                // Increment the logical clock of the election to start a new round.
                logicalclock.incrementAndGet();
                // Update the local proposal. The first argument is the proposed leader
                // (this node by default), the second is the latest zxid in the local
                // log, and the third is the epoch.
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info(
                "New election. My id = {}, proposed zxid=0x{}",
                self.getId(),
                Long.toHexString(proposedZxid));
            // Broadcast the vote to the other nodes.
            sendNotifications();

            SyncedLearnerTracker voteSet = null;

            /*
             * Loop in which we exchange notifications with the other nodes
             * until we find a leader.
             */
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                // Pull a vote notification from the receive queue.
                // The timeout is initially 200 ms.
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                // If no notification arrived within the timeout, send more notifications.
                // Otherwise, process the new notification.
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff of the wait time
                     */
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);

                    self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval);
                    if (self.getQuorumVerifier() instanceof QuorumOracleMaj
                            && voteSet != null
                            && voteSet.hasAllQuorums()
                            && notTimeout != minNotificationInterval) {
                        setPeerState(proposedLeader, voteSet);
                        Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }

                    LOG.info("Notification time out: {} ms", notTimeout);

                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    // Check the state of the node that sent this notification.
                    switch (n.state) {
                    // The sender is still electing.
                    case LOOKING:
                        // The latest zxid in the local log is -1: abnormal, ignore.
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        // The zxid in the notification is -1: also abnormal, ignore.
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                            break;
                        }
                        // Is the election epoch in the notification newer than the local one?
                        if (n.electionEpoch > logicalclock.get()) {
                            // The local election epoch is stale; adopt the newer one.
                            logicalclock.set(n.electionEpoch);
                            // Clear the votes collected for the old round.
                            recvset.clear();
                            // Core method totalOrderPredicate: compare the notification
                            // with the local initial vote to decide which one wins.
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                // Adopt the vote carried by the notification.
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                // Keep the local initial vote.
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            // Broadcast the new vote.
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            // The election epoch in the notification is older than the
                            // local one; discard the notification.
                            LOG.debug(
                                "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                Long.toHexString(n.electionEpoch),
                                Long.toHexString(logicalclock.get()));
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            // Same election epoch: call totalOrderPredicate again to compare
                            // the notification with the current local proposal. This branch
                            // is taken when the vote in the notification is better.
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        LOG.debug(
                            "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                            n.sid,
                            n.leader,
                            Long.toHexString(n.zxid),
                            Long.toHexString(n.electionEpoch));

                        // Store the vote in the vote map.
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        // proposedLeader, proposedZxid, and proposedEpoch describe the leader
                        // this node currently votes for, based on the votes received so far.
                        // getVoteTracker tallies the votes for that leader.
                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                        // Has the proposed leader received more than half of the votes?
                        if (voteSet.hasAllQuorums()) {

                            // Pull the remaining notifications from the receive queue.
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                // Check whether this vote is better than the current proposal.
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    // A better vote exists; put it back in the queue.
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * No new notification was pulled, which means
                             * no server is changing its vote any more.
                             */
                            if (n == null) {
                                // Set the state of the current node according to the result:
                                // LEADING, FOLLOWING, or OBSERVING.
                                setPeerState(proposedLeader, voteSet);
                                // Create the vote object for the final election result.
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                // Clear the receive queue.
                                leaveInstance(endVote);
                                // Return the final election result.
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                    case FOLLOWING:
                        // The sender is FOLLOWING, so the cluster has already elected a leader.
                        // Give up the local vote, consult the votes in outofelection,
                        // and determine the leader directly.
                        Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                        if (resultFN == null) {
                            break;
                        } else {
                            return resultFN;
                        }
                    case LEADING:
                        // The sender is LEADING, so the cluster has already elected a leader.
                        // Give up the local vote, consult the votes in outofelection,
                        // and determine the leader directly.
                        Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                        if (resultLN == null) {
                            break;
                        } else {
                            return resultLN;
                        }
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }

Core vote comparison logic
The main election flow calls FastLeaderElection#totalOrderPredicate several times to compare votes. This method is the core of deciding which vote wins. It compares the epoch first; if the epochs are equal, it compares the zxid; if both are equal, the vote for the larger server id wins.

    /**
     * Checks whether a received vote is better than the current one.
     *
     * @param newId    leader id in the new vote
     * @param newZxid  zxid in the new vote
     * @param newEpoch epoch in the new vote
     * @param curId    leader id in the current local vote
     * @param curZxid  zxid in the current local vote
     * @param curEpoch epoch in the current local vote
     * @return true if the new vote is better, false otherwise
     */
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug(
            "id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
            newId,
            curId,
            Long.toHexString(newZxid),
            Long.toHexString(curZxid));

        if (self.getQuorumVerifier().getWeight(newId) == 0) {
            return false;
        }

        /*
         * Return true if one of the following three cases holds:
         * 1- The epoch in the new vote is higher than the current one.
         * 2- The epochs are the same, but the zxid in the new vote is higher.
         * 3- The epochs and zxids are both the same, but the server id in the
         *    new vote is higher than the current one.
         */
        return ((newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                        || ((newZxid == curZxid)
                            && (newId > curId)))));
    }

That concludes this walkthrough of the FastLeaderElection source code. I hope it helps you understand how ZooKeeper elects a leader.
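As a supplementary note, the three-level ordering applied by totalOrderPredicate can be exercised on its own. The following is a minimal standalone sketch: the class VoteOrder and the method isBetter are hypothetical names, the sample values are made up, and the weight check that the real method performs through the quorum verifier is left out.

```java
// Hypothetical standalone version of the ordering used by totalOrderPredicate.
// Assumption: the quorum-weight check of the original method is omitted.
final class VoteOrder {

    // Returns true when the new vote (newId, newZxid, newEpoch) beats the current one.
    static boolean isBetter(long newId, long newZxid, long newEpoch,
                            long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;   // 1. higher epoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;     // 2. same epoch: higher zxid wins
        }
        return newId > curId;             // 3. same epoch and zxid: higher server id wins
    }

    public static void main(String[] args) {
        // Higher epoch wins even with a lower zxid and a lower server id.
        System.out.println(isBetter(1, 0x05, 2, 3, 0x09, 1)); // true
        // Same epoch: the higher zxid wins regardless of server id.
        System.out.println(isBetter(1, 0x09, 1, 3, 0x05, 1)); // true
        // Same epoch and zxid: the higher server id wins.
        System.out.println(isBetter(3, 0x05, 1, 1, 0x05, 1)); // true
        // Identical votes: the new vote is not better.
        System.out.println(isBetter(1, 0x05, 1, 1, 0x05, 1)); // false
    }
}
```

Because ties fall through to the server id, two different votes are never considered equal, which is what lets all nodes converge on the same proposal.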