In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-27 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly introduces "how to understand ReentrantLock". In daily operation, I believe many people have doubts about how to understand ReentrantLock. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful for you to answer the doubts about "how to understand ReentrantLock"! Next, please follow the editor to study!
ReentrantLock is a reentrant lock
What is a reentrant lock? For example, after thread 1 acquires the lock by calling the lock () method, and then calls lock, it will no longer block to acquire the lock, but will directly increase the number of retries.
Remember synchronized? It has two instructions, monitorenter and monitorexit, to guarantee the lock, and their function can be understood as that each lock object has a lock counter, that is, if the lock () method is called again, the counter will be added by one.
So, both synchronized and ReentrantLock are reentrant locks
The difference between ReentrantLock and synchronized
Since both synchronized and ReentrantLock are reentrant locks, what's the difference between ReentrantLock and synchronized?
Synchronized is the syntax provided at the Java language level, so there is no need to consider exceptions; ReentrantLock is a lock implemented by Java code, so you must first acquire the lock and then release the lock correctly
Synchronized must wait all the time to acquire the lock; there is no additional attempt mechanism; ReentrantLock can try to acquire the lock (as you will see later when analyzing the source code)
ReentrantLock supports fair and unfair choices when acquiring locks
No more BB, just go to the source code
Detailed explanation of lock & NonfairSync & FairSync
Public void lock () {sync.lock ();}
Among them, sync is the static inner class of ReentrantLock, which inherits AQS to implement the logic of reentering locks. Sync has two concrete implementation classes: NonfairSync and FairSync.
NonfairSync
Let's take a look at NonfairSync:
Static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L; / * * Performs lock. Try immediate barge, backing up to normal * acquire on failure. * / / rewrite the lock method of Sync final void lock () {/ / regardless of other things, CAS first, try to preempt lock if (compareAndSetState (0,1)) / / if preemption succeeds, you get lock setExclusiveOwnerThread (Thread.currentThread ()) Else / / did not preempt successfully, call the acquire () method, go to the logic acquire (1);} / / rewrite AQS's tryAcquire method protected final boolean tryAcquire (int acquires) {return nonfairTryAcquire (acquires);}}
FairSync
Next, take a look at FairSync:
Static final class FairSync extends Sync {private static final long serialVersionUID =-3000897897090466540L; / / override the lock method final void lock () {acquire (1);} / * Fair version of tryAcquire of Sync. Don't grant access unless * recursive call or no waiters or is first. * / / overridden Sync's tryAcquire method protected final boolean tryAcquire (int acquires) {/ / get the currently executing thread final Thread current = Thread.currentThread (); / / get the value of state int c = getState () / / in lock-free state if (c = = 0) {/ / if (! hasQueuedPredecessors () & & compareAndSetState (0, acquires)) {/ / saves the thread that currently acquired the lock when there is no precursor node and the value of state is successfully replaced. Next time, there is no need to try a competitive lock. SetExclusiveOwnerThread (current) can be reentered directly. Return true;}} else if (current = = getExclusiveOwnerThread ()) {/ / if the lock is acquired by the same thread, int nextc = c + acquires; / / nextc is less than 0 and if (nextc) is thrown by directly increasing the number of reentrants.
< 0) throw new Error("Maximum lock count exceeded"); setState(nextc); // 获取锁成功 return true; } // 获取锁失败 return false; } } 总结 NonfairSync 与 FairSync 到这里,应该就比较清楚了, Sync 有两个具体的实现类,分别是: NonfairSync :可以抢占锁,调用 NonfairSync 时,不管当前队列上有没有其他线程在等待,上来我就先 CAS 操作一番,成功了就获得了锁,没有成功就走 acquire 的逻辑;在释放锁资源时,走的是 Sync.nonfairTryAcquire 方法 FairSync :所有线程按照 FIFO 来获取锁,在 lock 方法中,没有 CAS 尝试,直接就是 acquire 的逻辑;在释放资源时,走的是自己的 tryAcquire 逻辑 接下来咱们看看 NonfairSync 和 FairSync 是如何获取锁的 ReentrantLock 获取锁 NonfairSync.lock() 在 NonfairSync 中,获取锁的方法是: final void lock() { // 不管别的,上来就先 CAS 操作,尝试抢占一下锁 if (compareAndSetState(0, 1)) // 如果抢占成功,就获得了锁 setExclusiveOwnerThread(Thread.currentThread()); else // 没有抢占成功,调用 acquire() 方法,走里面的逻辑 acquire(1); } if 里面没啥说的,咱们来看看 acquire() 方法 AQS.acquire() acquire 是 AQS 的核心方法: public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 在这里,会先 tryAcquire 去尝试获取锁,如果获取成功,那就返回 true ,如果失败就通过 addWaiter 方法,将当前线程封装成 Node 插入到等待队列中 先来看 tryAcquire 方法: NonfairSync.tryAcquire(arg) 在 AQS 中 tryAcquire 方法没有具体实现,只是抛出了异常: protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } NonfairSync 中的 tryAcquire() 方法,才是我们想要看的: final boolean nonfairTryAcquire(int acquires) { // 获取当前执行的线程 final Thread current = Thread.currentThread(); // 获取 state 的值 int c = getState(); // 当 state 为 0 是,说明此时为无锁状态 if (c == 0) { // CAS 替换 state 的值,如果 CAS 成功,则获取锁成功 if (compareAndSetState(0, acquires)) { // 保存当前获得锁的线程,当该线程再次获得锁时,直接重入即可 setExclusiveOwnerThread(current); return true; } } // 判断是否是同一个线程来竞争锁 else if (current == getExclusiveOwnerThread()) { // 如果是,直接增加重入次数 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); // 获取锁成功 return true; } // 获取锁失败 return false; } 有没有一种似曾相识的赶脚?在 FairSync 那里,分析过 90% 的代码(好像说分析过 99% 的代码也不过分),只是 FairSync 多了一个判断就是,是否有前驱节点 tryAcquire 分析完毕了,接下来看 addWaiter 方法 AQS.addWaiter 如果 tryAcquire() 方法获取锁成功,那就直接执行线程的任务就可以了,执行完毕释放锁 如果获取锁失败,就会调用 addWaiter 方法,将当前线程插入到等待队列中,插入的逻辑大概是这样的: 将当前线程封装成 Node 节点 当前链表中 tail 节点(也就是下面的 pred )是否为空,如果不为空,则 CAS 操作将当前线程的 node 添加到 AQS 队列 如果为空,或者 CAS 操作失败,则调用 enq 方法,再次自旋插入 咱们看具体的代码实现: private Node addWaiter(Node mode) { // 生成该线程所对应的 Node 节点 Node node = new Node(Thread.currentThread(), mode); // 将 Node 插入队列中 Node pred = tail; // 如果 pred 不为空 if (pred != null) { node.prev = pred; // 使用 CAS 操作,如果成功就返回 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果 pred == null 或者 CAS 操作失败,则调用 enq 方法再次自旋插入 enq(node); return node; } // 自旋 CAS 插入等待队列 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 必须初始化,使用 CAS 操作进行初始化 if (compareAndSetHead(new Node())) // 初始化状态时,头尾节点指向同一节点 tail = head; } else { node.prev = t; // 如果刚开始就是初始化好的,直接 CAS 操作,将 Node 插入到队尾即可 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } AQS.acquireQueued 通过 addWaiter 将当前线程加入到队列中之后,会走 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法 acquireQueued 方法实现的主要逻辑是: 获取当前节点的前驱节点 p 如果节点 p 为 head 节点,说明当前节点为第二个节点,那么它就可以尝试获取锁,调用 tryAcquire 方法尝试进行获取 调用 tryAcquire 方法获取锁成功之后,就将 head 指向自己,原来的节点 p 就需要从队列中删除 如果获取锁失败,则调用 shouldParkAfterFailedAcquire 或者 parkAndCheckInterrupt 方法来决定后面操作 最后,通过 cancelAcquire 方法取消获得锁 看具体的代码实现: final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 如果 Node 的前驱节点 p 是 head,说明 Node 是第二个节点,那么它就可以尝试获取锁 if (p == head && tryAcquire(arg)) { // 如果锁获取成功,则将 head 指向自己 setHead(node); // 锁获取成功之后,将 next 指向 null ,即将节点 p 从队列中移除 p.next = null; // help GC failed = false; return interrupted; } // 节点进入等待队列后,调用 shouldParkAfterFailedAcquire 或者 parkAndCheckInterrupt 方法 // 进入阻塞状态,即只有头结点的线程处于活跃状态 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } shouldParkAfterFailedAcquire 线程获取锁失败之后,会通过调用 shouldParkAfterFailedAcquire 方法,来决定这个线程要不要挂起 shouldParkAfterFailedAcquire 方法实现的主要逻辑: 首先判断 pred 的状态是否为 SIGNAL ,如果是,则直接挂起即可 如果 pred 的状态大于 0 ,说明该节点被取消了,那么直接从队列中移除即可 如果 pred 的状态不是 SIGNAL 也不大于 0 ,进行 CAS 操作修改节点状态为 SIGNAL ,返回 false ,也就是不需要挂起 看一下代码是如何实现的: private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取 pred 的状态 int ws = pred.waitStatus; // 如果状态为 SIGNAL ,那么直接返回 true ,挂起线程即可 if (ws == Node.SIGNAL) return true; // 如果状态大于 0 ,说明线程被取消 if (ws >0) {/ / remove the cancel thread from the linked list and use a loop to ensure that the removal is successful do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0); pred.next = node;} else {/ / CAS operation modifies the pred node status to SIGNAL compareAndSetWaitStatus (pred, ws, Node.SIGNAL) } / / there is no need to suspend thread return false;}
At this point, the acquisition lock for NonfairSync is over.
Let's take a look at the difference between FairSync's acquisition lock and it.
FairSync.lock ()
In the FairSync.lock () method, it looks like this:
Final void lock () {acquire (1);}
Because FairSync is a fair lock, there is no CAS operation to compete, just calling the acquire method
The logic is the same as above, so I'm not going to repeat it here.
Let's see how ReentrantLock releases the lock.
ReentrantLock release lock
When ReentrantLock releases the lock, the sync.release () method is called:
Public void unlock () {sync.release (1);}
Click in and find that the release method of AQS is called.
AQS.release ()
AQS's release method is easy to understand, so let's just look at the source code:
Public final boolean release (int arg) {/ / if the lock is released successfully if (tryRelease (arg)) {/ / get the header node of the AQS queue Node h = head; / / if the header node is not empty and the status! = 0 if (h! = null & & h.waitStatus! = 0) / / call the unparkSuccessor method to wake up the subsequent node unparkSuccessor (h) Return true;} return false;}
ReentrantLock.tryRelease ()
The tryRelease method in AQS only throws an exception, indicating that the specific implementation is implemented by the subclass ReentrantLock
Just look at the tryRelease method in ReentrantLock.
The main logic of implementing the tryRelease method in ReentrantLock is:
First of all, if the same lock is acquired by the same thread, it may be reentered multiple times, so you need to get the number of reentrants of the thread to be released, namely getState (), and then determine whether the thread is the thread that acquired the lock. Only the thread that acquired the lock will release the lock.
Release the lock by unlock, that is, if the value of state is reduced to 0, the lock can be released. Only then can owner be set to null and return true.
Take a look at the implementation:
Protected final boolean tryRelease (int releases) {int c = getState ()-releases; / / determines whether the current thread is the thread that acquired the lock, and if not, throws an exception / / only the thread that acquired the lock releases the lock if (Thread.currentThread ()! = getExclusiveOwnerThread ()) throw new IllegalMonitorStateException (); boolean free = false / / the number of times is 0, saying that the lock has been released if (c = = 0) {free = true; / / after the release, the current thread is set to null setExclusiveOwnerThread (null);} / / update the number of reentrants setState (c); return free;}
AQS.unparkSuccessor
After the lock is successfully released, the next thing to do is to wake up the following process, which is implemented in AQS
The main logic is:
Gets the current node status, which is set to 0 if it is less than 0
Gets the next node of the current node, and wakes up directly if it is not empty
If it is empty, or if the node state is greater than 0, look for the next node whose state is less than 0
The concrete implementation of the code
Private void unparkSuccessor (Node node) {/ / get the status of the current node int ws = node.waitStatus; / / if the node state is less than 0, set the CAS operation to 0 if (ws < 0) compareAndSetWaitStatus (node, ws, 0); / / get the next node of the current node s Node s = node.next / / if s is empty, start from the tail node, or the s.waitStatus is greater than 0, indicating that the node is canceled / / starting from the tail node, find the nearest waitStatus 0 to the head node) {s = null; for (Node t = tail; t! = null & & t! = node; t = t.prev) if (t.waitStatus)
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.