In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/02 Report--
Editor to share with you how to implement Condition's await and signal wait / notify mechanism. I hope you will get something after reading this article. Let's discuss it together.
Introduction to 1.Condition
Any java object inherits from the Object class, and several methods of Object are often applied to realize communication between threads, such as wait (), wait (long timeout), wait (long timeout, int nanos) and notify (), notifyAll (). Similarly, there are still the same methods to implement wait / notification mechanism in java Lock system. On the whole, wait and notify/notify of Object cooperate with object monitor to complete the waiting / notification mechanism between threads, while Condition cooperates with Lock to complete the waiting notification mechanism, the former is at the bottom level of java, and the latter is at the language level and has higher controllability and expansibility. Apart from the differences in the way they are used, there are still many differences in functional features:
Condition can support not responding to interrupts, but not by using Object
Condition can support multiple waiting queues (new multiple Condition objects), while Object mode can only support one
Condition supports the setting of timeout, but Object does not
Referring to Object's wait and notify/notifyAll methods, Condition provides the same method:
Wait method for Object
Void await () throws InterruptedException: the current thread enters the waiting state. If another thread calls the signal or signalAll method of condition and the current thread gets Lock and returns from the await method, an interrupted exception will be thrown if it is interrupted in the waiting state.
Long awaitNanos (long nanosTimeout): the current thread enters a waiting state until notified, interrupted, or timed out
Boolean await (long time, TimeUnit unit) throws InterruptedException: same as the second, supports custom time units
Boolean awaitUntil (Date deadline) throws InterruptedException: the current thread enters a waiting state until notified, interrupted, or at a certain time
Notify/notifyAll method for Object
Void signal (): wakes up a thread waiting on the condition, transfers the thread from the waiting queue to the synchronization queue, and returns from the wait method if it can compete for Lock in the synchronization queue.
Void signalAll (): differs from 1 in that it wakes up all threads waiting on the condition.
Analysis of 2.Condition implementation principle 2.1 waiting queue
If you want to have an in-depth grasp of condition, you should still know how to implement it. Now let's take a look at the source code of condiiton. A condition object is created through lock.newCondition (), and this method actually new a ConditionObject object, which is an inner class of AQS, which you can take a look at if you are interested. As we said earlier, condition is to be used with lock, that is, condition and Lock are bound together, and the implementation of lock depends on AQS, so there is nothing wrong with ConditionObject as an inner class of AQS.
We know that in the implementation of the lock mechanism, AQS maintains a synchronization queue internally. If it is an exclusive lock, the tail of all threads that failed to acquire the lock is inserted into the synchronization queue. Similarly, a waiting queue is maintained inside condition in the same way, and all threads calling the condition.await method join the waiting queue and the thread state changes to waiting state. Also notice that there are two member variables in ConditionObject:
/ * First node of condition queue. * / private transient Node firstWaiter;/** Last node of condition queue. * / private transient Node lastWaiter
So we can see that ConditionObject manages the waiting queue by holding the header and tail pointers of the waiting queue. The main thing to note is that the Node class reuses the Node class in AQS, and its node status and related properties can be seen. If you read this article carefully, it will be easy to understand condition, and there will be a qualitative improvement in the implementation of the lock system. The Node class has this property:
/ / successor node Node nextWaiter
Further, the waiting queue is an one-way queue, while when I said AQS before, I knew that the synchronization queue was a two-way queue. Next, we use a demo to go in through debug to see if it fits our guess:
Public static void main (String [] args) {for (int I = 0; I)
< 10; i++) { Thread thread = new Thread(() ->{lock.lock (); try {condition.await ();} catch (InterruptedException e) {e.printStackTrace ();} finally {lock.unlock ();}}); thread.start ();}}
This code doesn't make any practical sense, even smelly, just to illustrate what we were just thinking. Ten new threads are created. No thread first acquires the lock, and then calls the condition.await method to release the lock to add the current thread to the waiting queue. Through debug control, when you go to the 10th thread, you can view the firstWaiter, that is, the head node in the waiting queue. The scenario in debug mode is as follows:
From this picture, we can see clearly the following points:
1. After calling the condition.await method, the thread is inserted into the waiting queue in turn. As shown in the figure, the thread reference in the queue is Thread-0,Thread-1,Thread-2....Thread-8.
two。 The waiting queue is an one-way queue. Through our conjecture and experimental verification, we can get the schematic diagram of the waiting queue as follows:
It is also important to note that we can call the lock.newCondition () method multiple times to create multiple condition objects, that is, a lock can hold multiple waiting queues. The previous way of using Object actually meant that there was only one synchronization queue and one wait queue on the object Object object monitor, while the Lock in the concurrent package had one synchronization queue and multiple wait queues. The schematic diagram is as follows:
As shown in the figure, ConditionObject is an inner class of AQS, so each ConditionObject can access the methods provided by AQS, as if each Condition has a reference to its own synchronizer.
2.2 principle of await implementation
When the condition.await () method is called, the thread that currently obtains the lock enters the waiting queue, and if the thread can return from the await () method, it must have obtained the lock associated with the condition. Next, we still look at it from the perspective of the source code. Only when we are familiar with the logic of the source code can we have the deepest understanding. The source code of the await () method is:
Public final void await () throws InterruptedException {if (Thread.interrupted ()) throw new InterruptedException (); / / 1\. Wrap the current thread as Node and insert the tail into the waiting queue Node node = addConditionWaiter (); / / 2\. Release the lock occupied by the current thread, which will wake up the next node in the synchronization queue int savedState = fullyRelease (node); int interruptMode = 0; while (! isOnSyncQueue (node)) {/ / 3\. The current thread enters the waiting state LockSupport.park (this); if ((interruptMode = checkInterruptWhileWaiting (node))! = 0) break;} / / 4\. Spin waits for getting the synchronization status (i.e. lock) if (acquireQueued (node, savedState) & & interruptMode! = THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter! = null) / / clean up if cancelled unlinkCancelledWaiters (); / / 5\. Processing interrupted cases if (interruptMode! = 0) reportInterruptAfterWait (interruptMode);}
The main logic of the code please see the comments, we all know that when the front-line program calls the condition.await () method, it will make the current thread release lock and then join the waiting queue, until it is signal/signalAll, it will make the current thread move from the waiting queue to the synchronous queue, and will not return from the await method until it has obtained the lock, or will do interrupt handling when it is interrupted while waiting. Then we will have the following questions about this implementation process: 1. How is the current thread added to the waiting queue? two。 The process of releasing the lock? 3. How can I exit from the await method? And the logic of this code is to tell us the answers to these three questions. For more information, call addConditionWaiter in step 1 to add the current thread to the waiting queue. The source code for this method is:
Private Node addConditionWaiter () {Node t = lastWaiter; / / If lastWaiter is cancelled, clean out. If (t! = null & & t.waitStatus! = Node.CONDITION) {unlinkCancelledWaiters (); t = lastWaiter;} / / wraps the current thread as Node Node node = new Node (Thread.currentThread (), Node.CONDITION); if (t = null) firstWaiter = node; else / / insert t.nextWaiter = node; / / Update lastWaiter lastWaiter = node; return node;}
This code is easy to understand, wrapping the current node as Node, and if the firstWaiter of the waiting queue is null (the waiting queue is listed as an empty queue), point the firstWaiter to the current Node, otherwise, update the lastWaiter (tail node). Is to insert the Node encapsulated by the current thread into the waiting queue by tail insertion. At the same time, you can see that the waiting queue is a chain queue without a lead node. Before we learned AQS, we knew that the synchronization queue was a chain queue with a lead node, which is a difference between the two. Inserting the current node after waiting for alignment causes the current thread to release lock, which is implemented by the fullyRelease method. The fullyRelease source code is:
Final int fullyRelease (Node node) {boolean failed = true; try {int savedState = getState (); if (release (savedState)) {/ / successfully released synchronization status failed = false; return savedState;} else {/ / failed to release synchronization status and threw an exception throw new IllegalMonitorStateException () }} finally {if (failed) node.waitStatus = Node.CANCELLED;}}
This code is easy to understand, calling the template method release of AQS releases the synchronization state of AQS and wakes up the thread referenced by the successor node of the header node in the synchronization queue, returns normally if the release is successful, and throws an exception if it fails. So far, these two pieces of code have solved the answers to the previous two questions, leaving the third question, how to exit from the await method? Looking back now, the await method has this logic:
While (! isOnSyncQueue (node)) {/ / 3. The current thread enters the waiting state LockSupport.park (this); if ((interruptMode = checkInterruptWhileWaiting (node))! = 0) break;}
Obviously, when a thread calls the condition.await () method for the first time, it enters the while () loop, and then puts the current thread into a waiting state through the LockSupport.park (this) method, so the first prerequisite for exiting the await method is naturally to exit the while loop first, leaving only two places for exit: 1. The logic goes to break and exits the while loop; 2. The logic judgment in the while loop is false.
The first condition of the code is that the current waiting thread is interrupted and the descendant code will go to break to exit, and the second case is that the current node is moved to the synchronization queue (that is, the signal or signalAll method of condition called by another thread), and the while loop ends after the logic in while is judged to be false. To sum up, after the current thread is interrupted or the current node is moved to the synchronization queue by calling the condition.signal/condition.signalAll method, this is a prerequisite for the current thread to exit the await method.
AcquireQueued (node, savedState) is called after exiting the while loop. As mentioned in the introduction to the underlying implementation of AQS, you can go if you are interested. The function of this method is that the thread keeps trying to get the synchronization state during the spin process until it succeeds (the thread gets the lock). This also indicates that the exit await method must be a lock that has already obtained a condition reference (association). So far, we have completely found the answers to the first three questions by reading the source code, and we have a deeper understanding of the await method. The schematic diagram of the await method is shown below:
As shown in the figure, the thread calling the condition.await method must have obtained the lock, that is, the current thread is the header node in the synchronization queue. Calling this method inserts the end of the Node encapsulated by the current thread into the waiting queue.
Support for timeout mechanism
Condition also supports a timeout mechanism that allows consumers to call the method awaitNanos,awaitUtil. The implementation principle of these two methods is basically the same as the tryAcquire method in AQS, about tryAcquire can be read carefully.
Support that does not respond to interrupts
To not respond to interrupts, you can call the condition.awaitUninterruptibly () method, whose source code is:
Public final void awaitUninterruptibly () {Node node = addConditionWaiter (); int savedState = fullyRelease (node); boolean interrupted = false; while (! isOnSyncQueue (node)) {LockSupport.park (this); if (Thread.interrupted ()) interrupted = true;} if (acquireQueued (node, savedState) | | interrupted) selfInterrupt ();}
This method is basically the same as the await method above, except that it reduces the handling of interrupts and omits the reportInterruptAfterWait method to throw interrupted exceptions.
2.3 principle of signal/signalAll implementation
Call the signal or signalAll method of condition to move the node with the longest waiting time in the waiting queue to the synchronous queue, so that the node can have a chance to get the lock. The waiting queue is first-in, first-out (FIFO), so the head node of the waiting queue must be the node with the longest waiting time, that is, every time the signal method of condition is called, the head node is moved to the synchronous queue. Let's look at the source code to see if this conjecture is correct. The source code of the signal method is:
Public final void signal () {/ 1\. First check whether the current thread has obtained lock if (! isHeldExclusively ()) throw new IllegalMonitorStateException (); / / 2\. Get the first node in the waiting queue, and all subsequent operations are for this node Node first = firstWaiter; if (first! = null) doSignal (first);}
The signal method first detects whether the current thread has obtained the lock, throws an exception directly if it does not get the lock, and gets the node referenced by the header pointer of the waiting queue if it is obtained, and the doSignal method of the subsequent operation is also based on this node. Let's take a look at what the doSignal method does. The source code for the doSignal method is:
Private void doSignal (Node first) {do {if ((firstWaiter = first.nextWaiter) = null) lastWaiter = null; / / 1\. Remove the header node from the waiting queue first.nextWaiter = null; / / 2\. The transferForSignal method in while does real processing on the header node} while (! transferForSignal (first) & & (first = firstWaiter)! = null);}
For the specific logic, please see the notes. The logic to really deal with the header node is put in transferForSignal. The source code of this method is:
Final boolean transferForSignal (Node node) {/ * * If cannot change waitStatus, the node has been cancelled. * / 1. Update status is 0 if (! compareAndSetWaitStatus (node, Node.CONDITION, 0)) return false; / * * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). * / / 2. Move the node to the synchronization queue to Node p = enq (node); int ws = p.waitStatus; if (ws > 0 | |! compareAndSetWaitStatus (p, ws, Node.SIGNAL) LockSupport.unpark (node.thread); return true;}
For the key logic, please see the comments. This code mainly does two things. Change the state of the header node to CONDITION;2. Call the enq method to insert the end of the node into the synchronization queue. For the enq method, see the article on the underlying implementation of AQS. Now we can conclude that the prerequisite for calling the signal of condition is that the current thread has obtained the lock, and this method will make the head node in the waiting queue, that is, the node with the longest waiting time, be moved to the synchronous queue, and only after moving to the synchronous queue will there be a chance to wake up the waiting thread, that is, to return from the LockSupport.park (this) method in the await method, so as to make the thread calling the await method exit successfully. The schematic diagram of signal execution is shown below:
SignalAll
The difference between the sigllAll method and the sigal method is reflected in the doSignalAll method. We already know that the doSignal method only operates on the head node of the waiting queue, while the source code of doSignalAll is:
Private void doSignalAll (Node first) {lastWaiter = firstWaiter = null; do {Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal (first); first = next;} while (first! = null);}
This method simply waits for each node in the queue to be moved to the synchronization queue, that is, to "notify" every thread currently calling the condition.await () method.
3. Thinking about the combination of await and signal/signalAll
At the beginning of the article, the wait / notification mechanism is mentioned, which can be realized by using the await and signal/signalAll methods provided by condition, and this mechanism can solve the most classic problem is "producer and consumer problem". After that, a separate article will be used to explain the "producer consumer problem", which is also a high-frequency test site for the interview. The await and signal and signalAll methods are like a switch that controls thread A (waiting party) and thread B (notifying party). The relationship between them can be shown more appropriately with the following diagram:
As shown in the figure, thread awaitThread first successfully acquires the lock through the lock.lock () method, then calls the condition.await method to enter the waiting queue, while another thread signalThread calls the condition.signal or signalAll method after successfully acquiring the lock through the lock.lock () method, so that the thread awaitThread can have the opportunity to move to the synchronous queue, and when other threads release the lock, the thread awaitThread can have the opportunity to obtain the lock. This allows the thread awaitThread to exit from the await method to perform subsequent operations. If awaitThread fails to get lock, it will go directly to the synchronization queue.
3. An example
Let's talk about the use of condition with a very simple example:
Public class AwaitSignal {private static ReentrantLock lock = new ReentrantLock (); private static Condition condition = lock.newCondition (); private static volatile boolean flag = false; public static void main (String [] args) {Thread waiter = new Thread (new waiter ()); waiter.start (); Thread signaler = new Thread (new signaler ()); signaler.start () } static class waiter implements Runnable {@ Override public void run () {lock.lock (); try {while (! flag) {System.out.println (Thread.currentThread (). GetName () + "current condition does not meet waiting"); try {condition.await () } catch (InterruptedException e) {e.printStackTrace ();}} System.out.println (Thread.currentThread () .getName () + "received notification condition met");} finally {lock.unlock () Static class signaler implements Runnable {@ Override public void run () {lock.lock (); try {flag = true; condition.signalAll ();} finally {lock.unlock ();}
The output is as follows:
The current condition of Thread-0 does not meet waiting for Thread-0 to receive notification. The condition is satisfied.
Open two threads waiter and signaler,waiter threads start execution because the conditions are not met, execute the condition.await method to make the thread enter the waiting state and release the lock, the explorer thread changes the condition after acquiring the lock, and notifies all waiting threads to release the lock. At this point, the waiterthread acquires the lock and continues to execute because the handler thread has changed the condition and the condition is satisfied relative to the waiter.
After reading this article, I believe you have a certain understanding of "how to achieve Condition await and signal waiting / notification mechanism". If you want to know more about it, you are welcome to follow the industry information channel. Thank you for reading!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.