2025-01-21 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how Curator's InterProcessMutex implements a ZooKeeper distributed lock. It first summarizes the principle and then walks through the relevant source code step by step, as a reference for anyone who wants to understand how the lock works.
Introduction to the principle:
ZooKeeper implements a distributed lock by having every client create an ephemeral sequential node under one designated parent node. The client whose node has the smallest sequence number holds the lock. Every other client watches the node whose sequence number immediately precedes its own. When that node is deleted, the watching client receives the event and checks whether its own node now has the lowest sequence number; if so, it acquires the lock.
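The ownership check described above can be sketched in plain Java without a ZooKeeper connection. This is a minimal illustration, not Curator's code: the class `LockOrderSketch`, the helper `sequenceOf`, and the node names are assumptions made up for the example. It only assumes that each child name ends in a dash followed by the sequence number.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch only: decides lock ownership from child node names, with no ZooKeeper connection.
final class LockOrderSketch {
    // Hypothetical helper: extracts the numeric sequence suffix, e.g. "lock-0000000003" -> 3.
    static long sequenceOf(String nodeName) {
        int dash = nodeName.lastIndexOf('-');
        return Long.parseLong(nodeName.substring(dash + 1));
    }

    // Returns null if ourNode has the lowest sequence number (it holds the lock),
    // otherwise the name of the node immediately before it, which it should watch.
    static String nodeToWatch(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(LockOrderSketch::sequenceOf));
        int ourIndex = sorted.indexOf(ourNode);
        if (ourIndex < 0) {
            throw new IllegalStateException("Our node is missing: " + ourNode);
        }
        return ourIndex == 0 ? null : sorted.get(ourIndex - 1);
    }

    public static void main(String[] args) {
        List<String> children = List.of("lock-0000000002", "lock-0000000000", "lock-0000000001");
        System.out.println(nodeToWatch(children, "lock-0000000000")); // prints "null": this node holds the lock
        System.out.println(nodeToWatch(children, "lock-0000000002")); // prints "lock-0000000001"
    }
}
```

Watching only the immediate predecessor, rather than the lowest node, means a deletion wakes a single waiter instead of all of them.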
Zookeeper Node Graph Analysis
The lock implemented by InterProcessMutex is fair and mutually exclusive. It is fair in that waiting requests are queued and served in the order in which they arrived.
InterProcessMutex implements the InterProcessLock interface, which mainly defines the following methods:
    // acquire the mutex, blocking until it is available
    public void acquire() throws Exception;

    // acquire the mutex, waiting at most the given time
    public boolean acquire(long time, TimeUnit unit) throws Exception;

    // release the lock
    public void release() throws Exception;

    // return true if a thread in this JVM holds the mutex
    boolean isAcquiredInThisProcess();
Next, let's take a look at the implementation in InterProcessMutex, what properties it has, and the implementation details.
    public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
        // LockInternals is the class that actually performs the ZooKeeper operations.
        // It holds the CuratorFramework client that connects to ZooKeeper.
        // LockInternals is covered later.
        private final LockInternals internals;

        // basePath is the root node of the lock; all ephemeral sequential nodes are children of basePath
        private final String basePath;

        // maps each thread to its lock information
        private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

        // LockData encapsulates the requesting thread (owningThread), the re-entrant
        // hold count (lockCount), and the ephemeral node created for the thread (lockPath)
        private static class LockData {
            final Thread owningThread;
            final String lockPath;
            // atomic counter
            final AtomicInteger lockCount = new AtomicInteger(1);

            private LockData(Thread owningThread, String lockPath) {
                this.owningThread = owningThread;
                this.lockPath = lockPath;
            }
        }

        private static final String LOCK_NAME = "lock-";

        // acquire the mutex, blocking [implementation of InterProcessLock]
        @Override
        public void acquire() throws Exception {
            // acquire the lock, waiting indefinitely
            if (!internalLock(-1, null)) {
                throw new IOException("Lost connection while trying to acquire lock: " + basePath);
            }
        }

        // acquire the mutex, waiting at most the given time [implementation of InterProcessLock]
        @Override
        public boolean acquire(long time, TimeUnit unit) throws Exception {
            return internalLock(time, unit);
        }

        // whether a thread in this JVM holds the lock [implementation of InterProcessLock]
        @Override
        public boolean isAcquiredInThisProcess() {
            return (threadData.size() > 0);
        }

        // Performs one release of the mutex if the calling thread is the thread that acquired it.
        // If the thread called acquire multiple times, the mutex is still held when this
        // method returns. [implementation of InterProcessLock]
        @Override
        public void release() throws Exception {
            Thread currentThread = Thread.currentThread();      // current thread
            LockData lockData = threadData.get(currentThread);  // lock information for this thread
            if (lockData == null) {
                throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
            }

            // the lock is re-entrant, so it is not released until lockCount,
            // decremented by 1, reaches 0
            int newLockCount = lockData.lockCount.decrementAndGet();
            if (newLockCount > 0) {
                return;
            }
            if (newLockCount < 0) {
                throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
            }
            try {
                // lockCount == 0 at this point; the actual release is delegated to
                // the releaseLock method of LockInternals
                internals.releaseLock(lockData.lockPath);
            } finally {
                threadData.remove(currentThread);
            }
        }

        // get the sorted collection of all ephemeral nodes under the basePath root node
        public Collection<String> getParticipantNodes() throws Exception {
            return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver());
        }

        boolean isOwnedByCurrentThread() {
            LockData lockData = threadData.get(Thread.currentThread());
            return (lockData != null) && (lockData.lockCount.get() > 0);
        }

        protected String getLockPath() {
            LockData lockData = threadData.get(Thread.currentThread());
            return lockData != null ? lockData.lockPath : null;
        }

        // the internalLock() method called by acquire()
        private boolean internalLock(long time, TimeUnit unit) throws Exception {
            Thread currentThread = Thread.currentThread();
            LockData lockData = threadData.get(currentThread);
            if (lockData != null) {
                // the current thread already holds the lock: re-enter by
                // incrementing lockCount and return true
                lockData.lockCount.incrementAndGet();
                return true;
            }

            // attemptLock is where the lock is really acquired. lockPath is the node the
            // current thread created under basePath; a non-null lockPath means the lock
            // was acquired successfully
            String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
            if (lockPath != null) {
                // store lockPath in the lock information for the current thread
                LockData newLockData = new LockData(currentThread, lockPath);
                threadData.put(currentThread, newLockData);
                return true;
            }
            return false;
        }
    }
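The re-entrancy bookkeeping in internalLock() and release() can be isolated in a small sketch that needs no ZooKeeper. This is an illustration under stated assumptions rather than Curator's implementation: the class `ReentrantCountSketch` and the counters `remoteAcquires` and `remoteReleases` are hypothetical stand-ins for the points where LockInternals would create or delete the lock node.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: per-thread re-entrancy counting in the style of threadData and lockCount.
final class ReentrantCountSketch {
    private final ConcurrentMap<Thread, AtomicInteger> holds = new ConcurrentHashMap<>();

    // Hypothetical counters standing in for calls that would reach the lock service.
    final AtomicInteger remoteAcquires = new AtomicInteger();
    final AtomicInteger remoteReleases = new AtomicInteger();

    void acquire() {
        Thread current = Thread.currentThread();
        AtomicInteger count = holds.get(current);
        if (count != null) {
            count.incrementAndGet();          // re-entering: no remote call needed
            return;
        }
        remoteAcquires.incrementAndGet();     // first acquisition: the lock node would be created here
        holds.put(current, new AtomicInteger(1));
    }

    void release() {
        Thread current = Thread.currentThread();
        AtomicInteger count = holds.get(current);
        if (count == null) {
            throw new IllegalMonitorStateException("You do not own the lock");
        }
        if (count.decrementAndGet() > 0) {
            return;                           // still held by an outer acquire()
        }
        remoteReleases.incrementAndGet();     // count reached 0: the lock node would be deleted here
        holds.remove(current);
    }

    boolean isHeldByCurrentThread() {
        return holds.containsKey(Thread.currentThread());
    }
}
```

Because the map is keyed by thread, nested acquire() calls from the same thread only touch the local counter, and the lock service is contacted once on the first acquire and once on the final release.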
Next, let's look at the implementation details of the LockInternals class used by InterProcessMutex:
    public class LockInternals {
        private final CuratorFramework client;     // the client that connects to ZooKeeper
        private final String path;                 // lock node path prefix: basePath + "/" + lockName
        private final String basePath;             // root node, the basePath passed in by InterProcessMutex
        private final LockInternalsDriver driver;  // the driver that operates on the ZooKeeper nodes
        private final String lockName;             // "lock-"
        private final AtomicReference<RevocationSpec> revocable = new AtomicReference<RevocationSpec>(null);

        private final CuratorWatcher revocableWatcher = new CuratorWatcher() {
            @Override
            public void process(WatchedEvent event) throws Exception {
                if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                    checkRevocableWatcher(event.getPath());
                }
            }
        };

        // watcher for the watched node. When anything happens to the watched node,
        // wake the waiting threads: notifyFromWatcher() => notifyAll()
        private final Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                notifyFromWatcher();
            }
        };

        private volatile int maxLeases;

        // get the child nodes of basePath, sorted
        public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
            List<String> children = client.getChildren().forPath(basePath);
            List<String> sortedList = Lists.newArrayList(children);
            Collections.sort(sortedList, new Comparator<String>() {
                @Override
                public int compare(String lhs, String rhs) {
                    return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
                }
            });
            return sortedList;
        }

        // attempt to acquire the lock [internalLock => attemptLock]
        String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
            // start time
            final long startMillis = System.currentTimeMillis();
            // how long to wait
            final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
            final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
            // number of retries
            int retryCount = 0;
            // node created for the current thread
            String ourPath = null;
            // whether the lock has been acquired
            boolean hasTheLock = false;
            // whether to stop trying
            boolean isDone = false;

            // keep trying to acquire the lock
            while (!isDone) {
                isDone = true;
                try {
                    // create the node for the current thread
                    ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                    // try to acquire the lock in internalLockLoop
                    hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
                } catch (KeeperException.NoNodeException e) {
                    // retry if the retry policy allows it
                    if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                        isDone = false;
                    } else {
                        throw e;
                    }
                }
            }

            // after acquiring the lock, return the path of the node created by the current thread
            if (hasTheLock) {
                return ourPath;
            }
            return null;
        }

        // loop until the lock is acquired [attemptLock => internalLockLoop]
        private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
            boolean haveTheLock = false;  // whether we hold the distributed lock
            boolean doDelete = false;     // whether to delete the current node
            try {
                if (revocable.get() != null) {
                    client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
                }

                // loop, trying to acquire the lock
                while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
                    // get the sorted ephemeral child nodes under basePath
                    List<String> children = getSortedChildren();
                    // get the name of the child node created earlier for the current thread
                    String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                    // determine whether the lock is acquired; if not, the result carries the path to watch
                    PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                    if (predicateResults.getsTheLock()) {
                        // successfully acquired
                        haveTheLock = true;
                    } else {
                        // lock not acquired: watch the previous ephemeral sequential node
                        String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                        synchronized (this) {
                            try {
                                // if the previous ephemeral sequential node is deleted, the watcher
                                // wakes the current thread so it can contend for the lock again
                                client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                                if (millisToWait != null) {
                                    millisToWait -= (System.currentTimeMillis() - startMillis);
                                    startMillis = System.currentTimeMillis();
                                    // lock acquisition timed out
                                    if (millisToWait
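The waiting pattern that the comments above describe (a watcher callback calls notifyAll(), while the waiting thread tracks how much of its time budget remains) can be sketched with plain Java monitors. This is a stdlib-only illustration and an assumption about the general shape, not Curator's code: the class `WatchWaitSketch`, the flag `predecessorGone`, and the method `awaitPredecessor` are hypothetical. The flag stands in for re-reading the sorted children after a wake-up, and it also guards against spurious wake-ups.

```java
// Sketch only: a waiter blocks until a watcher-style callback calls notifyAll(),
// or until its time budget runs out.
final class WatchWaitSketch {
    private boolean predecessorGone = false; // guarded by "this"

    // Called by the (hypothetical) watcher callback when the watched node is deleted.
    synchronized void notifyFromWatcher() {
        predecessorGone = true;
        notifyAll();
    }

    // Returns true if the predecessor disappeared in time, false on timeout or interruption.
    // A null millisToWait means wait indefinitely.
    synchronized boolean awaitPredecessor(Long millisToWait) {
        long startMillis = System.currentTimeMillis();
        try {
            while (!predecessorGone) {
                if (millisToWait != null) {
                    // subtract the time already spent, then restart the clock
                    long now = System.currentTimeMillis();
                    millisToWait -= (now - startMillis);
                    startMillis = now;
                    if (millisToWait <= 0) {
                        return false;        // budget exhausted
                    }
                    wait(millisToWait);
                } else {
                    wait();
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            return false;
        }
    }
}
```

A caller would invoke `awaitPredecessor(500L)` after registering the watch and, on a `true` result, re-check whether its own node is now first in line.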