In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/01 Report--
This article introduces the knowledge of "how to realize the Curator distributed lock of ZooKeeper". Many people will encounter this dilemma in the operation of actual cases, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
There is a more standard and standardized distributed lock implementation in Curator. Instead of implementing it ourselves, we should use Curator directly. By learning the source code of Curator, we can also understand the best practices for implementing distributed locks.
There are all kinds of distributed locks in Curator. This article chooses one of them-InterProcessMutex to explain.
Let's first take a look at the comments on InterProcessMutex in the Curator code:
Reentrant mutex that works across JVM. Use ZooKeeper to control locks. Any process in all JVM will become part of the cross-process as long as it uses the same lock path. In addition, the exclusive lock is "fair" and each user gets the exclusive lock in the order in which they apply.
It can be seen that both InterProcessMutex and our own implementation examples are an exclusive lock and can also be reentered.
How to use InterProcessMutex
Before analyzing the InterProcessMutex code, let's take a look at how it works. The following code briefly shows the use of InterProcessMutex:
Public static void soldTickWithLock (CuratorFramework client) throws Exception {/ / creates a distributed lock. The root node path of the lock space is / curator/lock InterProcessMutex mutex = new InterProcessMutex (client, "/ curator/locks"); mutex.acquire (); / / the lock is obtained and the business process / / executes for a period of time on behalf of the complex logic int sleepMillis = (int) (Math.random () * 2000) Thread.sleep (sleepMillis); / / complete the business process and release the lock mutex.release ();}
The lock is used in the same way as the lock we wrote ourselves, first acquiring the lock through mutex.acquire (), which blocks the process until the lock is acquired, then executes your business method, and finally releases the lock through mutex.release ().
Next, let's get down to business and analyze Curator's implementation of distributed locks:
Realization idea
The way Curator is designed is similar to the way we implemented it ourselves:
1. Create ordered temporary nodes
2. Trigger the "try to take the lock logic". If you are the first in the sequence of temporary locking nodes, you will acquire the lock and get the lock successfully.
3. If you are not the first in the sequence, listen for the change of the previous lock node. Block threads at the same time.
4. When the current lock node changes, resume the thread through watcher, and then go to step 2 "try to get the lock logic" again.
As shown in the following figure:
Overview of Code implementation
Curator's top-level implementation logic for exclusive locks is in the InterProcessMutex class, which exposes the use of locks to the client, such as acquiring and releasing locks. However, the above implementation logic of the lock is specifically implemented by the LockInternals object he holds. LockInternals uses the methods in the StandardLockInternalsDriver class to do some processing.
To put it simply, let's take an example. Curator is like a company that undertakes all kinds of business. InterProcessMutex is the boss. After receiving the needs of his own customers (client), he assigns them to his subordinate LockInternals to complete them. At the same time, he is given a tool StandardLockInternalsDriver so that he can use it in the process of doing tasks. As shown in the following figure:
Next we'll take a closer look at the InterProcessMutex, LockInternals, and StandardLockInternalsDriver classes.
InterProcessMutex source code analysis
The InterProcessMutex class is an exclusive lock class in curator, and the client is directly dealing with InterProcessMutex. So let's start at the top level and analyze InterProcessMutex first.
Implementation interface
InterProcessMutex implements two interfaces:
Public class InterProcessMutex implements InterProcessLock, Revocable
InterProcessLock is a distributed lock interface. Distributed locks must implement the following methods in the interface:
1. Acquire the lock until the lock is available
Public void acquire () throws Exception
2. Acquire the lock within the specified waiting time.
Public boolean acquire (long time, TimeUnit unit) throws Exception
3. Release the lock
Public void release () throws Exception
4. Whether the current thread has acquired the lock
Boolean isAcquiredInThisProcess ()
The above methods are also exposed by InterProcessMutex and can be called by clients when using distributed locks.
Revocable, which implements the lock of the interface, and the lock can be undone. This article focuses on the implementation mechanism of the lock and does not discuss the revocation part.
Attribute
The InterProcessMutex attribute is as follows:
The type name indicates that the implementation of the LockInternalsinternals lock is all in this class. InterProcessMutex uses this method to implement the lock StringbasePath lock node's root path in zk, the ConcurrentMapThreadDatathread and its own lock-related data mapping StringLOCK_NAME constant with a value of "lock-". Represents the prefix of the lock node
It also has an internal static class LockData, which is also the value stored in threadData, which defines the data related to the lock, including the thread to which the lock belongs, the full path of the lock, and the number of times the thread is locked (InterProcessMutex is a reentrant lock). The code is as follows:
Private static class LockData {final Thread owningThread; final String lockPath; final AtomicInteger lockCount = new AtomicInteger (1); private LockData (Thread owningThread, String lockPath) {this.owningThread = owningThread; this.lockPath = lockPath;} Construction method
There are three construction methods for InterProcessMutex. Nested calls are made according to the input parameters. The final construction method is as follows:
InterProcessMutex (CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {basePath = PathUtils.validatePath (path); internals = new LockInternals (client, driver, path, lockName, maxLeases);}
You can see that the constructor finally initializes two properties, and basePath is set to the value we passed in "/ curator/lock", which is the root node of the lock. In addition, the internals is initialized, and as mentioned earlier, internals is the object that really implements the locking function. The one who really works is internals.
After constructing the InterProcessMutex object, let's see how it works.
Method
InterProcessMutex implements the InterProcessLock interface, and several methods for distributed locks are in this interface. Let's take a look at how InterProcessMutex is implemented.
Get the lock
There are two ways to acquire a lock, and the difference is whether the length of time to wait for the lock is limited. In fact, it all ends up with a private method called internalLock (). The code that does not limit the waiting time is as follows:
Public void acquire () throws Exception {if (! internalLock (- 1, null)) {throw new IOException ("Lost connection while trying to acquire lock:" + basePath);}}
You can see that when internalLock () returns false, it is only possible that the connection times out, otherwise it will be waiting for the lock to be acquired.
The internalLock logic is as follows:
Gets the lockData of the current thread in threadData
If there is lock data for this thread, it means lock reentry, lockData.lockCount plus 1, and return true directly. Lock acquired successfully
If the lock data for the thread does not exist, the lock is acquired through internals.attemptLock (), and the thread is blocked until the lock is obtained
After the lock is acquired successfully, the lock information is saved to threadData.
If the lock is not acquired, false is returned.
The complete code is as follows:
Private boolean internalLock (long time, TimeUnit unit) throws Exception {/ * Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary * / Thread currentThread = Thread.currentThread (); LockData lockData = threadData.get (currentThread); if (lockData! = null) {/ / re-entering lockData.lockCount.incrementAndGet (); return true } String lockPath = internals.attemptLock (time, unit, getLockNodeBytes ()); if (lockPath! = null) {LockData newLockData = newLockData (currentThread, lockPath); threadData.put (currentThread, newLockData); return true;} return false;}
You can see that the core code for acquiring the lock is internals.attemptLock
Release lock
The method to release the lock is release (), and the logic is as follows:
To get the lock data of the current thread from threadData, there are the following situations:
Does not exist, throw an exception without this lock
Exists, and is greater than zero after lockCount-1, indicating that the thread lock has been reentered, so it is returned directly and is not released in the zk.
It exists, and it is less than zero after lockCount-1, which means that some kind of exception occurs, and the exception is thrown directly.
Exists, and lockCount-1 equals zero, which is the correct state with no reentry. All you need to do is delete the temporary node from the zk and remove the thread's data from the threadData through internals.releaseLock (), regardless of the result.
Summary of InterProcessMutex
Distributed lock is mainly used in the above two methods, InterProcessMutex and some other methods, here do not do specific explanation, you can see for yourself, the implementation is not complex.
Through the explanation of InterProcessMutex, I believe that we have learned about the acquisition and release of locks, and we should also realize that it is the LockInternals class that really implements locks. Next we will focus on LockInternals.
LockInternals source code analysis
The core logic of Curator to realize distributed lock through zk is in LockInternals. We analyze the source code of LockInternals step by step under the guidance of the process of acquiring lock to releasing lock.
Acquire lock
In the code analysis of InterProcessMutex acquiring the lock, we can see that it acquires the lock through internals.attemptLock (time, unit, getLockNodeBytes ());, so we use this method as the entry point. The logic of this method is relatively simple, as follows:
The lock node path is obtained by creating the lock node on the zk through driver.
Block the process through the internalLockLoop () method until the lock is successfully acquired.
The core code is as follows:
OurPath = driver.createsTheLock (client, path, localLockNodeBytes); hasTheLock = internalLockLoop (startMillis, millisToWait, ourPath)
Let's move on to the internalLockLoop method, where the core logic of acquiring the lock is in this method.
Through while spin in internalLockLoop, it is determined that if the lock is not acquired, it will keep trying to acquire the lock.
The logic in the while loop is as follows:
Check whether the sequence number of the current lock node is in the first place through driver. If it is in the first place, it means that the lock was successfully taken and jumped out of the loop.
If it is not in the first place, listen to its own prelock node and then block the thread.
The current sequence node releases the lock, the listener is triggered, the thread is restored, and the main thread returns to the first step in the while.
Repeat the above logic until you get the lock (the serial number of your own lock comes first).
The core code of the internalLockLoop method is as follows:
While ((client.getState () = = CuratorFrameworkState.STARTED) & &! haveTheLock) {List children = getSortedChildren (); String sequenceNodeName = ourPath.substring (basePath.length () + 1); / / + 1 to include the slash PredicateResults predicateResults = driver.getsTheLock (client, children, sequenceNodeName, maxLeases); if (predicateResults.getsTheLock ()) {haveTheLock = true } else {String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch (); synchronized (this) {try {/ / use getData () instead of exists () to avoid leaving unneeded watchers which is a type of resource leak client.getData () .usingWatcher (watcher) .forPath (previousSequencePath) If (millisToWait! = null) {millisToWait-= (System.currentTimeMillis ()-startMillis); startMillis = System.currentTimeMillis (); if (millisToWait
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.