2025-03-04 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article shares a sample analysis of the exclusive mode in the AbstractQueuedSynchronizer (AQS) source code from Java's concurrency package. The editor finds it very practical and shares it here for reference; follow along to have a look.
It covers how nodes are queued in the synchronization queue in exclusive mode and what they do before leaving the queue. In both exclusive mode and shared mode, AQS provides three ways to acquire a lock: acquisition that does not respond to thread interrupts, acquisition that responds to thread interrupts, and acquisition with a timeout. The overall steps of the three are roughly the same, with only a few differences, so once you understand one, the others are easy to follow. This article focuses on acquisition that does not respond to thread interrupts and points out where the other two differ.
1. How do I acquire a lock without responding to thread interrupts?
    // Acquire the lock without responding to interrupts (exclusive mode)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
            selfInterrupt();
        }
    }

Although it looks simple, this code performs four steps in order: tryAcquire, addWaiter, acquireQueued, and selfInterrupt. We walk through them one at a time.
Step 1: !tryAcquire(arg)

    // Attempt to acquire the lock (exclusive mode)
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
Picture a man arriving at a room. He first tries the door. If it is unlocked (tryAcquire(arg) returns true), he walks straight in. If it is locked (tryAcquire(arg) returns false), he moves on to the next step. The tryAcquire method decides when the lock is open and when it is closed. The base implementation only throws UnsupportedOperationException, so a subclass must override it and supply the judgment logic.
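To make the role of tryAcquire concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and the convention that state 0 means unlocked and 1 means locked are assumptions chosen for illustration; they are not part of the AQS source discussed in this article.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex built on AQS.
class SimpleMutex {
    // Assumed convention: state 0 = door unlocked, state 1 = door locked.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Succeeds only if the state flips from 0 to 1 atomically.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    // Blocks (queues and parks) until the lock is acquired.
    public void lock() { sync.acquire(1); }

    // Single attempt: the "knock on the door" from Step 1.
    public boolean tryLock() { return sync.tryAcquire(1); }

    public void unlock() { sync.release(1); }

    public static void main(String[] args) {
        SimpleMutex m = new SimpleMutex();
        System.out.println(m.tryLock()); // door was unlocked
        System.out.println(m.tryLock()); // door is now locked
        m.unlock();
        System.out.println(m.tryLock()); // unlocked again
    }
}
```

Because the mutex is non-reentrant, a second tryLock() from the same thread fails until unlock() is called.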
Step 2: addWaiter(Node.EXCLUSIVE)

    // Wrap the current thread in a node and add it to the tail of the synchronization queue
    private Node addWaiter(Node mode) {
        // Record the mode in which the lock is requested
        Node node = new Node(Thread.currentThread(), mode);
        // Get a reference to the tail of the synchronization queue
        Node pred = tail;
        // A non-null tail means the queue already contains nodes
        if (pred != null) {
            // 1. Point the new node's prev at the current tail
            node.prev = pred;
            // 2. Try to make the new node the tail
            if (compareAndSetTail(pred, node)) {
                // 3. Point the old tail's next at the new tail
                pred.next = node;
                return node;
            }
        }
        // Otherwise the queue is not initialized yet, or the CAS lost a race
        enq(node);
        return node;
    }

    // Enqueue a node, retrying until it succeeds
    private Node enq(final Node node) {
        for (;;) {
            // Get a reference to the tail of the synchronization queue
            Node t = tail;
            // A null tail means the queue has not been initialized
            if (t == null) {
                // Initialize the queue with an empty head node
                if (compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                // 1. Point the new node's prev at the current tail
                node.prev = t;
                // 2. Try to make the new node the tail
                if (compareAndSetTail(t, node)) {
                    // 3. Point the old tail's next at the new tail
                    t.next = node;
                    return t;
                }
            }
        }
    }
Reaching this step means the first attempt to acquire the lock failed. The man now takes a number plate, enters the queuing area, and declares how he wants to occupy the room (exclusive mode or shared mode). Note that he has not yet sat down to rest (the thread has not yet been suspended).
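The enqueue logic can be illustrated outside AQS with a simplified sketch. The class CasQueueSketch, its Node type, and the use of AtomicReference in place of the CAS helpers inside AQS are all assumptions made for this example; it only mirrors the three-step pattern (set prev, CAS the tail, then set next) and is not the JDK implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch class, not the JDK implementation.
class CasQueueSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends a node and returns its predecessor, retrying until the CAS wins.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazily install an empty dummy head; only one thread wins this CAS.
                if (head.compareAndSet(null, new Node())) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                       // 1. link backwards first
                if (tail.compareAndSet(t, node)) {   // 2. swing the tail
                    t.next = node;                   // 3. link forwards last
                    return t;
                }
            }
        }
    }

    // Counts queued nodes by walking prev links from the tail, skipping the dummy head.
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) {
            n++;
        }
        return n;
    }

    // Enqueues one node from each of the given number of threads.
    static int enqueueFromThreads(int threads) {
        CasQueueSketch q = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> q.enq(new Node()));
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return q.size();
    }

    public static void main(String[] args) {
        System.out.println(enqueueFromThreads(8));
    }
}
```

Setting prev before the CAS means the backward chain from the tail is always complete, even while a next pointer is momentarily unset. That is why the size() walk here goes from the tail backwards.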
Step 3: acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

    // Acquire the lock for a queued node without responding to interrupts (exclusive mode)
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // Get the predecessor of the given node
                final Node p = node.predecessor();
                // If the predecessor is the head, this node is first in line, so try to acquire the lock
                if (p == head && tryAcquire(arg)) {
                    // Make the given node the new head
                    setHead(node);
                    // Clear the old head's next reference to help garbage collection
                    p.next = null;
                    // Record that the acquisition succeeded
                    failed = false;
                    // The only exit of the loop: report whether an interrupt was seen
                    return interrupted;
                }
                // The lock is still unavailable, so decide whether the thread may be suspended.
                // If so, park it; otherwise loop again. Interrupts are only recorded here.
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    interrupted = true;
                }
            }
        } finally {
            // If the acquisition failed, cancel it
            if (failed) {
                cancelAcquire(node);
            }
        }
    }

    // Decide whether the current node may be suspended
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Get the wait status of the predecessor
        int ws = pred.waitStatus;
        // SIGNAL means the predecessor will wake this node, so it is safe to suspend
        if (ws == Node.SIGNAL) {
            return true;
        }
        if (ws > 0) {
            // Skip over all cancelled predecessors in the synchronization queue
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // The predecessor's status is not SIGNAL (most likely 0), so it would not wake this node.
            // Set it to SIGNAL first; only then can this node safely suspend.
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    // Suspend the current thread and report whether it was interrupted
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
After taking a number plate and entering the queuing area, the thread runs this method immediately. When a node first enters the queue there are two cases. In the first, he finds that the person in front of him has already left his seat and entered the room (the predecessor is the head node). He does not sit down yet; he knocks on the door again to see whether the person inside is done. If that person happens to come out, he goes straight in. Otherwise he has to consider sitting down to rest, but he worries that nobody will wake him once he falls asleep. So he leaves a note on the seat of the person in front of him (sets the predecessor's status to SIGNAL) so that whoever comes out of the room will wake him. In the second case, he finds several people waiting ahead of him, so he can sit down and nap. Before doing so, he likewise leaves a note on the seat of the person in front (who is already asleep) asking to be woken when that person leaves. With that done, he sleeps in peace. Note that the for loop has only one exit: the return after the thread successfully acquires the lock. Until then, the thread stays suspended in the parkAndCheckInterrupt() method inside the loop, and that is also where it resumes the loop after being woken.
Step 4: selfInterrupt()

    // Interrupt the current thread (re-assert its interrupt flag)
    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

Because the thread stays suspended in parkAndCheckInterrupt() inside the for loop, it does not react to thread interrupts before it acquires the lock. parkAndCheckInterrupt() calls Thread.interrupted(), which clears the interrupt flag, so the loop only records that an interrupt occurred. Only after the thread acquires the lock and leaves the loop does it check whether anyone asked to interrupt it in the meantime, and if so it calls selfInterrupt() to set its interrupt flag again.
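This deferred handling can be observed from outside with ReentrantLock, whose lock() method is built on the uninterruptible acquire path. The following is a small sketch; the class and method names are made up for this example. A waiting thread is interrupted while queued, yet lock() does not throw, and the interrupt flag is visible once the lock is finally acquired.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for the uninterruptible acquisition path.
class UninterruptibleLockDemo {
    // Returns the waiter's interrupt flag as observed right after lock() returns.
    static boolean interruptFlagSurvivesLock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] flagAfterAcquire = new boolean[1];
        Thread waiter = new Thread(() -> {
            lock.lock(); // queues and parks; an interrupt does not abort this call
            try {
                flagAfterAcquire[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        lock.lock(); // the calling thread holds the lock first
        try {
            waiter.start();
            // Wait until the waiter is in the synchronization queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1);
            }
            waiter.interrupt(); // interrupt it while it is still waiting
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock(); // only now can the waiter acquire the lock
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flagAfterAcquire[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptFlagSurvivesLock());
    }
}
```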
2. How do I acquire a lock in response to a thread interrupt?

    // Acquire the lock in interruptible mode (exclusive mode)
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        // Wrap the current thread in a node and add it to the synchronization queue
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                // Get the predecessor of the current node
                final Node p = node.predecessor();
                // If p is the head node, the current thread tries to acquire the lock again
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // Return once the lock has been acquired
                    return;
                }
                // Suspend the current thread if the condition is met
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    // The thread woke up and found an interrupt request: throw an exception
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }
The acquisition process is roughly the same whether or not thread interrupts are responded to. The only difference lies in what happens when the thread wakes up in parkAndCheckInterrupt() and finds that it was interrupted. In interruptible mode it throws an InterruptedException immediately, which ends the acquisition. In uninterruptible mode it only records the interrupt and keeps trying; only after the node has acquired the lock does it decide, based on that record, whether to interrupt itself.
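For contrast, here is a sketch of the interruptible path using ReentrantLock.lockInterruptibly(). The class and method names are again made up for illustration. The waiter is interrupted while queued and leaves with an InterruptedException, without ever acquiring the lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for the interruptible acquisition path.
class InterruptibleLockDemo {
    // Returns true if the waiting thread left lockInterruptibly() via InterruptedException.
    static boolean waiterThrowsOnInterrupt() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean threw = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                lock.unlock(); // not expected to be reached in this demo
            } catch (InterruptedException e) {
                threw.set(true);
            }
        });
        lock.lock(); // hold the lock for the whole demo
        try {
            waiter.start();
            // Wait until the waiter is in the synchronization queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1);
            }
            waiter.interrupt();
            // The waiter terminates even though the lock is still held here.
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return threw.get();
    }

    public static void main(String[] args) {
        System.out.println(waiterThrowsOnInterrupt());
    }
}
```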
3. How do I set the timeout to acquire the lock?
    // Acquire the lock with a limited timeout (exclusive mode)
    private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        // Get the current system time
        long lastTime = System.nanoTime();
        // Wrap the current thread in a node and add it to the synchronization queue
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                // Get the predecessor of the current node
                final Node p = node.predecessor();
                // If the predecessor is the head node, the current thread tries to acquire the lock again
                if (p == head && tryAcquire(arg)) {
                    // Update the head node
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return true;
                }
                // Exit the loop directly when the timeout has run out
                if (nanosTimeout <= 0) {
                    return false;
                }
                // If the remaining timeout exceeds the spin threshold, suspend the thread for that long
                if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) {
                    LockSupport.parkNanos(this, nanosTimeout);
                }
                // Get the current system time
                long now = System.nanoTime();
                // Subtract the time spent on this attempt from the timeout
                nanosTimeout -= now - lastTime;
                // Update lastTime
                lastTime = now;
                // Throw an exception if an interrupt request arrived during acquisition
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }
A timed acquisition first tries to acquire the lock once. If that fails, what happens next depends on the remaining timeout: if it is greater than the spin threshold (spinForTimeoutThreshold), the thread is suspended for that long; otherwise it spins. After each attempt, the time that attempt took is subtracted from the timeout. Once the timeout drops to 0 or below, the time has run out, the acquisition ends, and the method returns false. Note that a timed acquisition does respond to thread interrupt requests.
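The timed path is reachable through ReentrantLock.tryLock(long, TimeUnit). Below is a brief sketch, with class and method names chosen only for this example. A second thread asks for a lock that stays held for the whole timeout, so the call gives up and returns false.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for the timed acquisition path.
class TimedLockDemo {
    // Returns the result of a timed tryLock attempted while another thread holds the lock.
    static boolean tryLockWhileHeld(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(true);
        Thread waiter = new Thread(() -> {
            try {
                boolean ok = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                acquired.set(ok);
                if (ok) {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                // Timed acquisition responds to interrupts; nothing interrupts it in this demo.
                Thread.currentThread().interrupt();
            }
        });
        lock.lock(); // hold the lock until the waiter has given up
        try {
            waiter.start();
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return acquired.get();
    }

    public static void main(String[] args) {
        System.out.println(tryLockWhileHeld(50));
    }
}
```

A separate thread is needed because ReentrantLock is reentrant: a timed tryLock from the thread that already holds the lock would simply succeed.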
4. How does a thread release the lock and leave the synchronization queue?
    // Release the lock (exclusive mode)
    public final boolean release(int arg) {
        // Turn the combination lock to see whether it can be unlocked
        if (tryRelease(arg)) {
            // Get the head node
            Node h = head;
            // If the head node is not null and its wait status is not 0, wake up the successor
            if (h != null && h.waitStatus != 0) {
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }

    // Wake up the successor node
    private void unparkSuccessor(Node node) {
        // Get the wait status of the given node
        int ws = node.waitStatus;
        // Reset the wait status to 0
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        // Get the successor of the given node
        Node s = node.next;
        // The successor is null or has been cancelled
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Traverse the queue from back to front to find the first node that is not cancelled
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        // Wake up the thread of the node that was found
        if (s != null) {
            LockSupport.unpark(s.thread);
        }
    }
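The wake-up itself relies on LockSupport.park and LockSupport.unpark. The sketch below shows that primitive in isolation; the class name and the released flag are assumptions for illustration and stand in for the lock state. The sleeper re-checks its condition in a loop, as the acquire loop does, because park may return without a matching unpark.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class for the park/unpark handshake.
class ParkUnparkDemo {
    // Returns true once the parked thread has been woken and observed the release.
    static boolean wakeSleeper() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean woke = new AtomicBoolean(false);
        Thread sleeper = new Thread(() -> {
            // Re-check the condition after every wake-up, like the for loop in acquireQueued.
            while (!released.get()) {
                LockSupport.park();
            }
            woke.set(true);
        });
        sleeper.start();
        released.set(true);          // stands in for a successful tryRelease
        LockSupport.unpark(sleeper); // stands in for unparkSuccessor
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return woke.get();
    }

    public static void main(String[] args) {
        System.out.println(wakeSleeper());
    }
}
```

The order of park and unpark does not matter here: if unpark runs first, the permit is kept and the next park returns immediately, so the wake-up is not lost.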
© 2024 shulou.com SLNews company. All rights reserved.