Updated 2025-02-23. Source: SLTechnology News&Howtos (Shulou.com), Internet Technology.
This article explains how to use AQS, the queue synchronizer at the core of JUC (java.util.concurrent), and how it is designed and implemented.
AbstractQueuedSynchronizer, or AQS for short, is rarely used directly in application code, but it is the core building block of JUC: Java locks and synchronizers such as ReentrantLock, ReentrantReadWriteLock, CountDownLatch, and Semaphore are all implemented on top of it. When designing the JUC package, Doug Lea wanted a basic, general-purpose component that could support the implementation of the higher-level classes, and AQS was the result.
AQS is essentially a FIFO doubly linked queue. Threads are wrapped in nodes and wait in the queue, retrying in a loop (spinning), until they acquire the resource (the resource here can be understood simply as a lock). AQS implements two kinds of queue: the synchronization queue and the condition queue. The synchronization queue serves threads that are blocked waiting for a resource, while the condition queue serves threads that are waiting because some condition is not yet satisfied. A thread in a condition queue had in fact already acquired the resource but could not continue, so it was placed in the condition queue and released the resource so that other threads could run. When the condition is satisfied at some later point, the thread is transferred from the condition queue to the synchronization queue, where it competes for the resource again before continuing.
This article analyzes the design and implementation of AQS: the LockSupport utility class, the synchronization queue, the condition queue, and the general process by which AQS acquires and releases resources. AQS follows the template method design pattern, so the actual logic for acquiring and releasing a resource is implemented by subclasses. Those methods are left to later articles that analyze the specific subclasses.
LockSupport utility class
The LockSupport utility class is a basic component of JUC used to block and wake up threads; underneath, it relies on the Unsafe class. LockSupport defines two kinds of methods, park and unpark: park blocks the current thread, and unpark wakes up a specified thread that is blocked.
The following example demonstrates the basic use of the park and unpark methods:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

Thread thread = new Thread(() -> {
    System.out.println("Thread start: " + Thread.currentThread().getName());
    LockSupport.park(); // block the current thread
    System.out.println("Thread end: " + Thread.currentThread().getName());
});
thread.setName("A");
thread.start();
System.out.println("Main thread sleep 3 second: " + Thread.currentThread().getId());
TimeUnit.SECONDS.sleep(3);
LockSupport.unpark(thread); // wake up thread A
After it starts, thread A calls LockSupport#park to block itself; the main thread sleeps for 3 seconds and then calls LockSupport#unpark to wake thread A. Running result:
Thread start: A
Main thread sleep 3 second: 1
Thread end: A
LockSupport provides several overloads of the park method, as follows:
public static void park()
public static void park(Object blocker)
public static void parkNanos(long nanos)
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(long deadline)
public static void parkUntil(Object blocker, long deadline)
As the names suggest, parkNanos and parkUntil are the timed versions of park. parkNanos takes a duration in nanoseconds that bounds how long the thread blocks; for example, with nanos=3000000000 the thread wakes up after blocking for at most 3 seconds. parkUntil takes a timestamp instead: the deadline parameter is the absolute time, in milliseconds since the epoch, at which blocking ends.
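The difference between the relative and the absolute timeout can be sketched as follows. This is a minimal illustration, not code from the JDK: the class name TimedParkDemo and its two helper methods are hypothetical, and because park may return early, the elapsed times are only printed, not relied upon.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: compares the relative and absolute timed park variants.
class TimedParkDemo {
    /** Parks for at most the given milliseconds using parkNanos; returns elapsed nanoseconds. */
    static long parkRelative(long millis) {
        long start = System.nanoTime();
        // Relative timeout, expressed in nanoseconds.
        LockSupport.parkNanos(TimedParkDemo.class, TimeUnit.MILLISECONDS.toNanos(millis));
        return System.nanoTime() - start;
    }

    /** Parks until an absolute deadline using parkUntil; returns elapsed nanoseconds. */
    static long parkAbsolute(long millis) {
        long start = System.nanoTime();
        // Absolute deadline, expressed in milliseconds since the epoch.
        LockSupport.parkUntil(TimedParkDemo.class, System.currentTimeMillis() + millis);
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        System.out.println("parkNanos returned after (ns): " + parkRelative(100));
        System.out.println("parkUntil returned after (ns): " + parkAbsolute(100));
    }
}
```

Both calls pass a blocker object (here simply the demo class itself) so that the parked thread can be identified by monitoring tools.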
Every park method has an overload that takes an Object blocker parameter. The blocker is the lock object that the current thread is blocked waiting on, and recording it helps with troubleshooting and system monitoring. This was overlooked in the original LockSupport design, so thread dumps could not show what a parked thread was blocked on; the blocker overloads were added in Java 6 to fix this. If you use LockSupport in real code, prefer the overloads that take a blocker.
Let's take the LockSupport#park(java.lang.Object) method as an example and look at the implementation:
public static void park(Object blocker) {
    // get the current thread
    Thread t = Thread.currentThread();
    // record the object the thread is blocked on (stored in the thread's parkBlocker field)
    setBlocker(t, blocker);
    // block the thread
    UNSAFE.park(false, 0L);
    // the thread has resumed; clear the recorded blocker
    setBlocker(t, null);
}
The implementation is simple, and the actual blocking is delegated to the Unsafe class. The method calls LockSupport#setBlocker, which uses Unsafe to write the given blocker into the Thread#parkBlocker field of the current thread, then blocks, and clears the Thread#parkBlocker field once the thread is woken up.
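To see the recorded blocker from the outside, one can read it back with LockSupport.getBlocker. The sketch below is an illustration under stated assumptions: BlockerDemo and blockerVisibleWhileParked are hypothetical names, and the polling loop with a two-second budget is just one simple way to wait until the worker has parked.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: observes the blocker of a parked thread.
class BlockerDemo {
    /** Returns true if the blocker passed to park was visible from another thread. */
    static boolean blockerVisibleWhileParked() {
        final Object blocker = new Object();
        Thread worker = new Thread(() -> {
            // Loop because park may return without a reason; exit once interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park(blocker);
            }
        });
        worker.setDaemon(true);
        worker.start();

        boolean seen = false;
        long deadline = System.nanoTime() + 2_000_000_000L; // polling budget of 2 seconds
        while (System.nanoTime() < deadline) {
            if (LockSupport.getBlocker(worker) == blocker) {
                seen = true;
                break;
            }
            Thread.yield();
        }
        worker.interrupt(); // an interrupt makes park return, ending the worker loop
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("blocker visible: " + blockerVisibleWhileParked());
    }
}
```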
After a thread calls park and blocks, it returns from the blocked state when any one of the following three things happens:
Another thread calls unpark with the blocked thread as the target.
Another thread interrupts the blocked thread.
The park call returns spuriously, that is, for no reason at all.
A thread returning from park is given no indication of why it returned, so the caller has to work it out itself, typically by re-checking whether the condition that led it to call park still holds.
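Because park reports no reason for returning, callers usually wrap it in a loop that re-checks the condition. A minimal sketch of that pattern follows; the class name ParkLoopDemo and the ready flag are assumptions made for the illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: the "park in a loop and re-check" idiom.
class ParkLoopDemo {
    /** Returns true if the waiter left its loop only after the flag was set. */
    static boolean waitForFlag() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean sawReady = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // Re-check the condition after every return from park.
            while (!ready.get()) {
                LockSupport.park(ready);
            }
            sawReady.set(ready.get());
        });
        waiter.start();

        // Publish the condition first, then wake the waiter.
        ready.set(true);
        LockSupport.unpark(waiter);
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawReady.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter observed the flag: " + waitForFlag());
    }
}
```

Setting the flag before calling unpark matters: whichever way the two threads interleave, the waiter either sees the flag directly or finds a permit waiting for it.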
LockSupport#unpark is also implemented on top of the Unsafe class. Unlike park, which has several overloads, unpark has a single implementation:
public static void unpark(Thread thread) {
    if (thread != null) {
        UNSAFE.unpark(thread);
    }
}
Note that if unpark is called on a thread before that thread calls park, the later park call does not block but returns immediately. The permit granted by unpark does not accumulate: however many times unpark is called in advance, only one subsequent park call returns immediately.
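The permit behaviour described above can be tried out with a short sketch. PermitDemo is a hypothetical name, and the timing printed by main is indicative only, since a timed park may also return early.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: an early unpark makes the next park return at once.
class PermitDemo {
    /** Calls unpark twice on the current thread, then parks; returns the park duration in ns. */
    static long parkAfterEarlyUnpark() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);
        LockSupport.unpark(self); // does not add a second permit
        long start = System.nanoTime();
        LockSupport.park(PermitDemo.class); // a permit is available, so this does not block
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        System.out.println("first park took (ns): " + parkAfterEarlyUnpark());
        // The single permit has been consumed, so a further park would block.
        // A timed park is used here so that the demo always terminates.
        long start = System.nanoTime();
        LockSupport.parkNanos(PermitDemo.class, TimeUnit.MILLISECONDS.toNanos(200));
        System.out.println("second park took (ns): " + (System.nanoTime() - start));
    }
}
```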
Synchronization queue
The synchronization queue manages the threads that are competing for a resource. When a thread fails to acquire the resource, it is appended to the tail of the synchronization queue and then checks in a loop whether the resource can be acquired. The AQS synchronization queue is based on the idea of the CLH (Craig, Landin, and Hagersten) lock. A CLH lock is a scalable, high-performance, fair spin lock built on a linked list. Threads are organized as linked-list nodes; while waiting, each thread spins independently, polling the status of its predecessor node, and attempts to acquire the resource once it sees that the thread on the predecessor node has released it.
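To make the CLH idea concrete, here is a textbook-style CLH spin lock. It is a sketch of the classic algorithm, not the AQS code: the names ClhSpinLock, QNode, and countWithLock are hypothetical, and the loop calls Thread.yield() while spinning only to keep the demo friendly on machines with few cores.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a classic CLH spin lock (not the AQS implementation).
class ClhSpinLock {
    private static final class QNode {
        volatile boolean locked; // true while the owner holds or waits for the lock
    }

    private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    void lock() {
        QNode node = myNode.get();
        node.locked = true;
        // Atomically append this node to the tail and learn the predecessor.
        QNode pred = tail.getAndSet(node);
        myPred.set(pred);
        // Spin on the predecessor's status only.
        while (pred.locked) {
            Thread.yield();
        }
    }

    void unlock() {
        QNode node = myNode.get();
        node.locked = false;      // lets the successor proceed
        myNode.set(myPred.get()); // reuse the predecessor's node next time
    }

    /** Increments a plain counter from several threads under the lock. */
    static int countWithLock(int threads, int perThread) {
        ClhSpinLock lock = new ClhSpinLock();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("counter = " + countWithLock(4, 10_000));
    }
}
```

Each thread watches only its predecessor's node, which is what makes the lock fair (FIFO) and keeps contention local to one node per waiter.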
The CLH lock is the basis of the AQS queue synchronizer. AQS defines synchronization queue nodes with the inner class Node; the nodes of the condition queue introduced in the next section are defined by Node as well. The fields of Node are as follows:
static final class Node {
    /** mode markers */
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    /** wait status values */
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /** wait status of the thread on this node */
    volatile int waitStatus;
    /** predecessor node */
    volatile Node prev;
    /** successor node */
    volatile Node next;
    /** thread object */
    volatile Thread thread;
    /** in exclusive mode, points to the next node waiting on the condition; in shared mode, the special SHARED node */
    Node nextWaiter;

    // ... method definitions omitted
}
As the field definitions show, threads in the CLH list wait for resources in one of two modes, SHARED (shared mode) and EXCLUSIVE (exclusive mode). The main difference is that in exclusive mode only one thread can hold the resource at a time, whereas in shared mode several threads can hold it simultaneously. A typical example is a read-write lock: multiple threads can hold the read lock at the same time, but only one thread can hold the write lock, and every other thread that tries to acquire the lock in the meantime is blocked.
AQS defines four states for threads in the CLH list, CANCELLED, SIGNAL, CONDITION, and PROPAGATE, recorded in the Node#waitStatus field. Their meanings are:
CANCELLED: the thread on this node has cancelled its wait, usually because the wait timed out or the thread was interrupted. A cancelled node no longer competes for the resource and never leaves this state.
SIGNAL: the thread on the successor of this node is waiting to be woken up. When the thread on this node releases the resource or is cancelled, it must wake the thread on the successor node.
CONDITION: the thread on this node is waiting on a condition. When another thread calls Condition#signal, the node is transferred from the condition queue to the synchronization queue to compete for the resource.
PROPAGATE: used in shared mode. When a node in this state releases a shared resource, or learns that a shared resource has been released, the release must be propagated to the successor nodes so that the notification is not lost.
When a node is created, Node#waitStatus is initially 0, meaning that the thread on the node is in none of the states above.
Apart from its constructors, the Node class defines only two methods, Node#isShared and Node#predecessor. The former reports whether the node is waiting in shared mode, and the latter returns the predecessor of the node.
With the node definition in place, how is the synchronization queue itself implemented? It relies on two fields of the AbstractQueuedSynchronizer class:
private transient volatile Node head;
private transient volatile Node tail;
Here head points to the head node of the synchronization queue and tail points to its tail node; the nodes in between are doubly linked through their prev and next fields.
When the acquire method of AQS is called and the resource cannot be acquired, the current thread is wrapped in a Node and appended to the tail of the synchronization queue. The head node represents the thread that currently holds the resource, and the successor of head is the next thread node to be scheduled. When the release method is called, the thread on that successor node is woken up and tries again to acquire the resource.
How nodes enter and leave the synchronization queue is not covered here; it is analyzed later together with the process of acquiring and releasing AQS resources.
Condition queue
Besides the synchronization queue described above, AQS also defines a condition queue. The inner class ConditionObject implements it with a first node (firstWaiter) and a last node (lastWaiter), and reuses the Node class described above for its nodes:
public class ConditionObject implements Condition, Serializable {
    /** points to the first node of the condition queue */
    private transient Node firstWaiter;
    /** points to the last node of the condition queue */
    private transient Node lastWaiter;

    // ... method definitions omitted
}
As seen earlier in the analysis of the Node inner class, Node also defines a Node#nextWaiter field that points to the next waiting node in the condition queue. The condition queue is therefore a singly linked list that starts at firstWaiter, follows nextWaiter from node to node, and ends at lastWaiter.
The ConditionObject class implements the Condition interface, which defines the thread communication methods used together with Lock. They fall into two groups: await and signal.
When a thread calls await, it is wrapped in a node that is appended to the tail of the condition queue, and the resources it holds are released. When the condition is satisfied, the signal methods transfer one or all of the thread nodes from the condition queue to the synchronization queue to compete for the resource. An application can create several ConditionObject instances, each with its own condition queue. All threads in the same condition queue wait for the same condition.
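A common way to use one condition queue per condition is a bounded buffer with separate "not full" and "not empty" conditions. The sketch below is an illustration built on ReentrantLock and Condition from the JDK; the class BoundedBuffer and the helper transferSum are hypothetical names chosen for the example.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: a bounded buffer with two conditions on one lock.
class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.addLast(item);
            notEmpty.signal(); // move one waiting consumer to the synchronization queue
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal(); // move one waiting producer to the synchronization queue
            return item;
        } finally {
            lock.unlock();
        }
    }

    /** Sends 1..n through a small buffer from a producer thread and returns the sum received. */
    static int transferSum(int n) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum = " + transferSum(100));
    }
}
```

Producers and consumers wait in different condition queues, so a signal from one side only moves a thread that is waiting for that particular condition.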
The Condition interface is defined as follows:
public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}
Wait: await
Now let's look at how ConditionObject implements the methods of the Condition interface, starting with ConditionObject#await. This method adds the current thread to the condition queue to wait, and it responds to interrupts. The implementation is as follows:
public final void await() throws InterruptedException {
    if (Thread.interrupted()) {
        // respond to the interrupt immediately
        throw new InterruptedException();
    }
    // append the current thread to the tail of the condition queue with wait status CONDITION
    Node node = this.addConditionWaiter();
    // release the resources held by the current thread
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // loop while the node is still in the condition queue
    while (!isOnSyncQueue(node)) {
        // block the current thread
        LockSupport.park(this);
        // leave the loop if the thread was interrupted while blocked
        if ((interruptMode = this.checkInterruptWhileWaiting(node)) != 0) {
            break;
        }
    }
    // if the thread was interrupted while waiting in the synchronization queue and the recorded mode is not THROW_IE
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        interruptMode = REINTERRUPT;
    }
    if (node.nextWaiter != null) {
        // remove every node whose status is not CONDITION from the condition queue
        this.unlinkCancelledWaiters();
    }
    // if the thread was interrupted during the wait, respond to the interrupt
    if (interruptMode != 0) {
        this.reportInterruptAfterWait(interruptMode);
    }
}
Because ConditionObject#await responds to interrupts, it starts by checking whether the current thread has been interrupted and, if so, throws InterruptedException; otherwise it goes on to add the current thread to the condition queue and wait. The overall process can be summarized as follows:
Append the current thread to the tail of the condition queue with wait status CONDITION.
Release the resources held by the current thread, to avoid starvation or deadlock.
Wait in the condition queue, in a loop, until the node is transferred to the synchronization queue by another thread or the thread is interrupted during the wait.
Re-acquire the resource through the synchronization queue.
If the thread was interrupted during the wait, respond to the interrupt.
ConditionObject defines two interrupt response modes, REINTERRUPT and THROW_IE. With REINTERRUPT the thread calls Thread#interrupt to interrupt itself; with THROW_IE it throws InterruptedException directly.
Let's continue with the helper methods that support ConditionObject#await: addConditionWaiter, fullyRelease, isOnSyncQueue, and unlinkCancelledWaiters.
ConditionObject#addConditionWaiter wraps the current thread in a Node and appends it to the tail of the condition queue. Along the way it removes cancelled nodes (nodes whose wait status is not CONDITION) from the condition queue. The implementation is as follows:
private Node addConditionWaiter() {
    // get the last node of the condition queue
    Node t = lastWaiter;
    // if its status is not CONDITION, its thread has cancelled the wait and a cleanup is needed
    if (t != null && t.waitStatus != Node.CONDITION) {
        // remove every node whose status is not CONDITION from the condition queue
        this.unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // build a node for the current thread with wait status CONDITION and append it to the condition queue
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null) {
        firstWaiter = node;
    } else {
        t.nextWaiter = node;
    }
    lastWaiter = node;
    return node;
}
Adding the current thread to the condition queue is essentially a plain linked-list insertion. Before inserting, the method cleans up the condition queue by removing the nodes whose status is not CONDITION. The cleanup is implemented in ConditionObject#unlinkCancelledWaiters:
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null; // the last node seen that is still in CONDITION status
    while (t != null) {
        Node next = t.nextWaiter;
        // if the wait status of the node is not CONDITION, unlink the node
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null) {
                firstWaiter = next;
            } else {
                trail.nextWaiter = next;
            }
            if (next == null) {
                lastWaiter = trail;
            }
        } else {
            trail = t;
        }
        t = next;
    }
}
AbstractQueuedSynchronizer#fullyRelease releases the resources held by the current thread. The reason is easy to see: the current thread is about to start waiting, and if it kept its resources the program could end up starving other threads or deadlocking. The implementation is as follows:
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // get the current synchronization state, which can be read as the number of resources held by the current thread
        int savedState = this.getState();
        // try to release the resources held by the current thread
        if (this.release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // releasing the resources failed
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed) {
            // if the release failed, mark the node of the current thread as cancelled
            node.waitStatus = Node.CANCELLED;
        }
    }
}
If the release fails, the method sets the wait status of the current thread's node to CANCELLED so that it drops out of the wait.
AbstractQueuedSynchronizer#isOnSyncQueue checks whether a node is in the synchronization queue. The implementation is as follows:
final boolean isOnSyncQueue(Node node) {
    // return false if the node is still in the condition queue
    if (node.waitStatus == Node.CONDITION || node.prev == null) {
        return false;
    }
    // If has successor, it must be on queue
    if (node.next != null) {
        return true;
    }
    /*
     * node.prev can be non-null, but not yet on queue because the CAS to place it on queue can fail.
     * So we have to traverse from tail to make sure it actually made it. It will always be near the tail in calls to this method,
     * and unless the CAS failed (which is unlikely), it will be there, so we hardly ever traverse much.
     */
    // search the synchronization queue from tail to head for the target node
    return this.findNodeFromTail(node);
}
When the condition a thread is waiting for is satisfied, the thread that triggers the condition transfers one or all of the nodes waiting for it from the condition queue to the synchronization queue. The waiting threads then compete for the resource and, once they have acquired it, return from ConditionObject#await.
ConditionObject#awaitNanos, ConditionObject#awaitUntil, and ConditionObject#await(long, TimeUnit) add a timeout to the ConditionObject#await method described above. When a thread has waited in the condition queue for longer than the configured time, its node is transferred from the condition queue to the synchronization queue to compete for the resource. The rest of the execution is the same as in ConditionObject#await, so these methods are not covered further.
Next is ConditionObject#awaitUninterruptibly. As the name says, it differs from ConditionObject#await in that it does not respond to interrupts while waiting. The implementation is as follows:
public final void awaitUninterruptibly() {
    // append the current thread to the tail of the condition queue with wait status CONDITION
    Node node = this.addConditionWaiter();
    // release the resources held by the current thread
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    // loop while the node is still in the condition queue
    while (!isOnSyncQueue(node)) {
        // block the current thread
        LockSupport.park(this);
        if (Thread.interrupted()) {
            // record that the thread was interrupted while waiting, but do not respond yet
            interrupted = true;
        }
    }
    // acquire the resource by spinning; a true return value means the thread was interrupted in the meantime
    if (acquireQueued(node, savedState) || interrupted) {
        // respond to the interrupt
        selfInterrupt();
    }
}
If the thread is interrupted during the wait, the method records that fact in a local variable and handles it once at the end, rather than leaving the wait because of the interrupt.
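The deferred handling can be observed from user code: after awaitUninterruptibly returns, the interrupt status of the thread is set again. The following sketch assumes a ReentrantLock condition; the class name UninterruptibleAwaitDemo and the flag used as the wait predicate are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: an interrupt does not end awaitUninterruptibly, but is kept.
class UninterruptibleAwaitDemo {
    /** Returns true if the waiter still had its interrupt status set after waiting. */
    static boolean interruptPreserved() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        AtomicBoolean flag = new AtomicBoolean(false);
        AtomicBoolean stillInterrupted = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            started.countDown();
            lock.lock();
            try {
                while (!flag.get()) {
                    ready.awaitUninterruptibly(); // the interrupt alone does not end the wait
                }
                stillInterrupted.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            started.await();
            waiter.interrupt(); // may arrive before or during the wait
            lock.lock();
            try {
                flag.set(true);
                ready.signalAll();
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stillInterrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupt preserved: " + interruptPreserved());
    }
}
```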
Notification: signal
Calling await adds the calling thread to the condition queue to wait, while the signal methods transfer one or all of the waiting threads from the condition queue to the synchronization queue to compete for the resource. ConditionObject defines two notification methods, signal and signalAll. The former transfers the first node of the condition queue (the node that has waited longest) to the synchronization queue; the latter transfers every waiting node in the condition queue. The two implementations are analyzed in turn below.
ConditionObject#signal is implemented as follows:
public final void signal() {
    // first check that the current thread holds the lock; otherwise it may not continue
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    // get the first node of the condition queue, that is, the node that has waited longest
    Node first = firstWaiter;
    if (first != null) {
        // transfer the first node from the condition queue to the synchronization queue to compete for the resource
        this.doSignal(first);
    }
}
A thread calling ConditionObject#signal must be inside the critical section, that is, it must already hold the exclusive lock, so the method verifies this first. AbstractQueuedSynchronizer#isHeldExclusively is a template method implemented by subclasses. If the check passes, the method calls ConditionObject#doSignal to transfer the first node of the condition queue to the synchronization queue.
private void doSignal(Node first) {
    // walk from front to back until one node has been transferred from the condition queue to the synchronization queue
    do {
        if ((firstWaiter = first.nextWaiter) == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

// AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {
    // update the wait status of the node: CONDITION -> 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // the update failed, so the thread on this node has cancelled its wait
        return false;
    }
    /*
     * Splice onto queue and try to set waitStatus of predecessor to indicate that thread is (probably) waiting.
     * If cancelled or attempt to set waitStatus fails, wake up to resync (in which case the waitStatus can be
     * transiently and harmlessly wrong).
     */
    // append the node to the tail of the synchronization queue; enq returns the predecessor of the node
    Node p = this.enq(node);
    int ws = p.waitStatus;
    // if the predecessor is cancelled, or setting its status to SIGNAL fails, wake the thread on this node
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
        LockSupport.unpark(node.thread);
    }
    return true;
}
ConditionObject#doSignal walks the condition queue from front to back, looking for the first node that has not been cancelled, and uses AbstractQueuedSynchronizer#transferForSignal to try to transfer it from the condition queue to the synchronization queue.
Before the node enters the synchronization queue, AbstractQueuedSynchronizer#transferForSignal clears its CONDITION status with a CAS. If the CAS fails, the thread on the node has already cancelled, and ConditionObject#doSignal moves on to the next node that can be woken. If the status is cleared successfully, the node is appended to the tail of the synchronization queue. The thread on the node is woken immediately only if its predecessor is cancelled or the predecessor's status cannot be set to SIGNAL; otherwise it is woken later, when the predecessor releases the resource.
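The ownership check at the start of signal is easy to observe with a ReentrantLock condition. The sketch below is a small illustration; SignalWithoutLockDemo and its two methods are hypothetical names.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: signal requires the caller to hold the lock.
class SignalWithoutLockDemo {
    /** Returns true if signalling without holding the lock is rejected. */
    static boolean signalWithoutLockIsRejected() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // the lock is not held by this thread
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    /** Returns true if signalling while holding the lock completes normally. */
    static boolean signalWithLockIsAccepted() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal(); // no waiters: nothing to transfer, but the call is legal
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected without lock: " + signalWithoutLockIsRejected());
        System.out.println("accepted with lock: " + signalWithLockIsAccepted());
    }
}
```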
Now for ConditionObject#signalAll. Compared with ConditionObject#signal, it wakes up every waiting node in the condition queue. The implementation is as follows:
public final void signalAll() {
    // first check that the current thread holds the lock; otherwise it may not continue
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    // get the first node of the condition queue
    Node first = firstWaiter;
    if (first != null) {
        // transfer all nodes from the condition queue to the synchronization queue to compete for the resource
        this.doSignalAll(first);
    }
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
Once the mechanism of ConditionObject#doSignal is understood, ConditionObject#doSignalAll follows naturally: it applies the same transfer to every node in turn.
Acquisition and release of resources
The previous sections analyzed the LockSupport utility class and the design and implementation of the AQS synchronization queue and condition queue, which are the basic components that AQS runs on. This section turns to the implementation of AQS itself.
The AbstractQueuedSynchronizer class has four main fields (listed below). exclusiveOwnerThread is inherited from the parent class AbstractOwnableSynchronizer and records the thread that currently holds the exclusive lock, while head and tail point to the head node and tail node of the synchronization queue:
private transient Thread exclusiveOwnerThread;
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
The state field describes the synchronization state, and each implementation class uses it differently:
For ReentrantLock, it is the number of times the owning thread has acquired the lock.
For ReentrantReadWriteLock, the high 16 bits hold the number of times the read lock has been acquired, and the low 16 bits hold the number of times the write lock has been acquired.
For Semaphore, it is the number of permits currently available.
For CountDownLatch, it is the current value of the counter.
The details are explained in later articles that analyze the implementation of each of these components.
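Although state itself is not public, each of these classes offers an accessor whose value moves in step with the usage listed above. The sketch below only calls public JDK methods; the class StateViewDemo is a hypothetical name, and reading these accessors as views of state reflects the description in this article rather than a documented contract.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class: public accessors that mirror the synchronization state.
class StateViewDemo {
    /** Acquires a ReentrantLock twice and returns the hold count seen by the owner. */
    static int reentrantHoldCount() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }

    /** Takes one permit out of three and returns the permits left. */
    static int permitsAfterOneAcquire() {
        Semaphore semaphore = new Semaphore(3);
        semaphore.tryAcquire(); // non-blocking; succeeds because permits are available
        return semaphore.availablePermits();
    }

    /** Counts a latch of two down once and returns the remaining count. */
    static long latchCountAfterOneCountDown() {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown();
        return latch.getCount();
    }

    /** Acquires the read lock twice and returns the number of read holds. */
    static int readHoldsAfterTwoAcquires() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        rw.readLock().lock();
        try {
            return rw.getReadLockCount();
        } finally {
            rw.readLock().unlock();
            rw.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("ReentrantLock hold count: " + reentrantHoldCount());
        System.out.println("Semaphore permits left: " + permitsAfterOneAcquire());
        System.out.println("CountDownLatch count: " + latchCountAfterOneCountDown());
        System.out.println("Read lock holds: " + readHoldsAfterTwoAcquires());
    }
}
```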
AbstractQueuedSynchronizer is an abstract class designed around the template method pattern. The following list shows the template methods that a subclass implements according to its own behaviour. They are not declared abstract: the default implementations throw UnsupportedOperationException, so a subclass overrides only the ones it needs:
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
protected boolean isHeldExclusively()
Here is a brief description of each method. The concrete implementations are left to the articles that analyze the individual AQS-based components:
tryAcquire: tries to acquire the resource in exclusive mode; returns true on success and false otherwise.
tryRelease: tries to release the resource in exclusive mode; returns true on success and false otherwise.
tryAcquireShared: tries to acquire the resource in shared mode. A positive return value means the acquisition succeeded and resources remain available; 0 means it succeeded but no resources remain; a negative value means it failed.
tryReleaseShared: tries to release the resource in shared mode; returns true on success and false otherwise.
isHeldExclusively: reports whether the current thread holds the resource exclusively; returns true if so and false otherwise.
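As an illustration of how a subclass fills in these template methods, here is a small non-reentrant mutex in the spirit of the example in the AbstractQueuedSynchronizer class documentation. It is a sketch, not a JDK class: SimpleMutex, Sync, and countWithMutex are hypothetical names, and state 0 or 1 is assumed to mean unlocked or locked.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical sketch: a non-reentrant mutex built on the AQS template methods.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 = unlocked, 1 = locked
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }                 // queues and blocks if tryAcquire fails
    boolean tryLock() { return sync.tryAcquire(1); } // single attempt, never blocks
    void unlock() { sync.release(1); }               // wakes the next queued thread, if any

    /** Increments a plain counter from several threads under the mutex. */
    static int countWithMutex(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("counter = " + countWithMutex(4, 10_000));
    }
}
```

Only the state transitions live in the subclass; queueing, parking, and waking are inherited from AbstractQueuedSynchronizer through acquire and release.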
By function, the methods of AbstractQueuedSynchronizer fall into two groups, acquiring resources (acquire) and releasing resources (release), and each group comes in an exclusive-mode and a shared-mode variant. The following sections analyze how resources are acquired and released, treating exclusive mode and shared mode separately.
Exclusive access to resources
For acquiring resources in exclusive mode, AbstractQueuedSynchronizer defines several versions of the acquire method: acquire, acquireInterruptibly, and tryAcquireNanos. acquireInterruptibly is the interruptible version of acquire and responds to interrupt requests while waiting for the resource. tryAcquireNanos responds to interrupts as well and additionally supports a timeout.
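These two variants are what sit behind ReentrantLock#lockInterruptibly and ReentrantLock#tryLock(long, TimeUnit), so their behaviour can be tried out through that class. The sketch below is an illustration; TimedAcquireDemo and its helper methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: timed and interruptible acquisition through ReentrantLock.
class TimedAcquireDemo {
    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true if a timed tryLock gives up while another thread holds the lock. */
    static boolean timedTryLockTimesOut() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // held by the calling thread for the whole attempt
        boolean[] acquired = {true};
        Thread contender = new Thread(() -> {
            try {
                acquired[0] = lock.tryLock(50, TimeUnit.MILLISECONDS);
                if (acquired[0]) {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        contender.start();
        joinQuietly(contender);
        lock.unlock();
        return !acquired[0];
    }

    /** Returns true if lockInterruptibly throws when the waiting thread is interrupted. */
    static boolean interruptibleLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        boolean[] threw = {false};
        CountDownLatch started = new CountDownLatch(1);
        Thread contender = new Thread(() -> {
            started.countDown();
            try {
                lock.lockInterruptibly();
                lock.unlock();
            } catch (InterruptedException e) {
                threw[0] = true;
            }
        });
        contender.start();
        try {
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        contender.interrupt(); // arrives before or during the wait; both cases throw
        joinQuietly(contender);
        lock.unlock();
        return threw[0];
    }

    public static void main(String[] args) {
        System.out.println("timed tryLock timed out: " + timedTryLockTimesOut());
        System.out.println("lockInterruptibly threw: " + interruptibleLockThrows());
    }
}
```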
The following mainly analyzes the implementation of AbstractQueuedSynchronizer#acquire, understands the implementation mechanism of this method, and naturally understands the implementation mechanism of the other two versions. The implementation of method AbstractQueuedSynchronizer#acquire is as follows:
Public final void acquire (int arg) {if (! this.tryAcquire (arg) / / attempt to acquire a resource / / if the acquisition of a resource fails, the current thread is added to the end of the synchronization queue (exclusive mode) and waits for the resource to be acquired based on the spin mechanism & & this.acquireQueued (this.addWaiter (Node.EXCLUSIVE), arg) {/ / has been interrupted during the period of waiting for the resource to be acquired Respond to the interrupt selfInterrupt () after the resource is successfully obtained }}
As introduced earlier, the method AbstractQueuedSynchronizer#tryAcquire attempts to acquire the resource. If that attempt fails, the current thread is added to the synchronization queue and waits for the resource based on the spin mechanism.
The method AbstractQueuedSynchronizer#addWaiter wraps the current thread in a node, appends the node to the tail of the synchronization queue, and returns the node:
    private Node addWaiter(Node mode) {
        // Create a node object for the current thread
        Node node = new Node(Thread.currentThread(), mode);
        // Try to quickly append the node to the tail of the synchronization queue based on CAS
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (this.compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // The quick append failed. Keep trying to append the node to the tail of the
        // synchronization queue, initializing the queue first if it is not yet initialized
        this.enq(node);
        // Return the node object corresponding to the current thread
        return node;
    }
When adding a node, the method above first checks whether the synchronization queue already exists. If it does, the method tries a fast path that appends the current node to the tail of the queue with a single CAS operation. If that CAS fails, or the queue does not exist yet, the method falls back to AbstractQueuedSynchronizer#enq, which initializes the synchronization queue when necessary and then retries the CAS-based append in a loop until it succeeds. The method is implemented as follows:
    private Node enq(final Node node) {
        for (;;) {
            // Get the tail node of the synchronization queue
            Node t = tail;
            // If the tail does not exist, initialize the queue
            if (t == null) { // Must initialize
                if (this.compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                // Append to the tail
                node.prev = t;
                if (this.compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
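The same retry-until-CAS-succeeds pattern can be tried outside of AQS. The following is a simplified sketch, not the JDK implementation: it assumes a hypothetical CasQueueSketch class that stores head and tail in AtomicReference fields, lazily installs a dummy head node, and appends nodes from several threads at once.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for the AQS synchronization queue; all names are illustrative.
final class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node to the tail and returns its predecessor
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized: install a dummy head, then retry the append
                if (head.compareAndSet(null, new Node("dummy"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                     // link backward first
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                 // forward link is set only after the CAS wins
                    return t;
                }
                // CAS lost to another thread: loop and retry against the new tail
            }
        }
    }

    // Counts nodes by walking prev links from the tail
    int sizeFromTail() {
        int n = 0;
        for (Node t = tail.get(); t != null; t = t.prev) {
            n++;
        }
        return n;
    }

    // Enqueues from several threads and returns the final node count, dummy head included
    static int concurrentDemo(int threads, int perThread) {
        CasQueueSketch q = new CasQueueSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    q.enq(new Node("n"));
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return q.sizeFromTail();
    }
}
```

Setting prev before the CAS and next after it is why a backward walk from the tail always sees every enqueued node, while a forward walk from the head may briefly miss the newest one.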
Once the node has been added to the synchronization queue, the AbstractQueuedSynchronizer#acquireQueued method is called to wait for the resource based on the spin mechanism. The method does not respond to interrupts while waiting; it records the interrupt flag and defers the response until the resource has been acquired. The method is implemented as follows:
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false; // whether the thread was interrupted while spinning
            // Wait for the resource based on the spin mechanism
            for (;;) {
                // Get the predecessor node
                final Node p = node.predecessor();
                // If the predecessor is the head node, the current node is at the front of the
                // synchronization queue and may try to acquire the resource
                if (p == head && this.tryAcquire(arg)) {
                    // The resource was acquired; update the head node
                    this.setHead(node); // the head node generally records the thread node holding the resource
                    p.next = null; // help GC
                    failed = false;
                    return interrupted; // whether the thread was interrupted while spinning
                }
                // It is not the current node's turn yet, or the acquisition failed
                if (shouldParkAfterFailedAcquire(p, node) // decide whether the current thread needs to block
                        && this.parkAndCheckInterrupt()) { // block if necessary, and check the interrupt status on wake-up
                    // The thread was interrupted while waiting
                    interrupted = true;
                }
            }
        } finally {
            // The resource was not acquired, which means an exception occurred;
            // cancel the current node's attempt to acquire the resource
            if (failed) {
                this.cancelAcquire(node);
            }
        }
    }
The method above repeatedly checks whether the current node is at the front of the synchronization queue and, if so, calls AbstractQueuedSynchronizer#tryAcquire to attempt to acquire the resource; the acquisition logic itself is implemented by the subclass. If it is not yet the current node's turn, or the attempt fails, the AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire method is called to decide whether the current thread needs to block. The decision depends on the wait state of the predecessor node, as shown below:
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Get the wait state of the predecessor node
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) {
            // The predecessor's state is SIGNAL, so the current node needs to block
            return true;
        }
        if (ws > 0) {
            // The predecessor is cancelled: keep searching backward for a node that is
            // still waiting, and queue the current node behind it
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * The predecessor's state is 0 or PROPAGATE, but the current node needs a wake-up signal,
             * so the predecessor's wait state is set to SIGNAL based on CAS.
             * Before blocking, the caller must retry to confirm that the resource cannot be acquired.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
The method above first reads the wait state of the predecessor node and then decides according to its value:
If the wait state of the predecessor node is SIGNAL, the current node needs to block, so the method returns true directly.
Otherwise, if the wait state of the predecessor node is greater than 0 (that is, the node has been cancelled), the method walks backward until it finds a node that has not been cancelled and queues the current node behind it. In this case it returns false, and the caller tries to acquire the resource again.
Otherwise, the wait state of the predecessor node is 0 or PROPAGATE (it cannot be CONDITION, because the node is in the synchronization queue). Because the current node needs a wake-up signal, the state of the predecessor node is changed to SIGNAL. In this case the method also returns false, so that the caller retries once more and confirms that the resource really cannot be acquired before blocking.
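The three cases mean that a thread usually calls the check more than once before it actually parks. The sketch below is a simplified, single-threaded model of that decision: the ParkDecisionSketch class and its Node type are hypothetical, and plain field writes stand in for the CAS used by the real method.

```java
// Simplified, single-threaded model of the park decision; all names are illustrative,
// and plain writes replace the CAS operations of the real implementation.
final class ParkDecisionSketch {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class Node {
        int waitStatus; // 0 by default
        Node prev;
        Node next;
    }

    static boolean shouldPark(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == SIGNAL) {
            return true;                      // predecessor has promised to wake this node
        }
        if (ws > 0) {
            do {                              // skip over cancelled predecessors
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.waitStatus = SIGNAL;         // request a wake-up, then let the caller retry
        }
        return false;
    }

    // Builds head <- cancelled <- node and counts the calls needed until the answer is "park"
    static int callsUntilPark() {
        Node head = new Node();
        Node cancelled = new Node();
        cancelled.waitStatus = CANCELLED;
        Node node = new Node();
        head.next = cancelled;
        cancelled.prev = head;
        cancelled.next = node;
        node.prev = cancelled;

        int calls = 1;
        while (!shouldPark(node.prev, node)) {
            calls++;
        }
        return calls;
    }
}
```

In this setup the first call unlinks the cancelled predecessor, the second sets SIGNAL on the head, and only the third returns true, so the caller gets two more chances to acquire the resource before blocking.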
If the check above returns true, the AbstractQueuedSynchronizer#parkAndCheckInterrupt method is called next. It blocks the current thread using the LockSupport utility class and checks the interrupt status when the thread wakes up. If the thread was interrupted in the meantime, the interrupt flag is recorded but not acted on until the resource has been acquired or an exception ends the spin. AbstractQueuedSynchronizer#acquireQueued eventually returns this interrupt flag, and the caller responds to it.
If an exception occurs during the spin, the method executes AbstractQueuedSynchronizer#cancelAcquire to cancel the current node's wait for the resource, which includes setting the node's wait state to CANCELLED, waking the successor node, and so on.
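The deferred handling of interrupts can be made visible with two threads. This is a sketch under stated assumptions: the DeferredInterruptDemo class, its nested Sync synchronizer, and the run helper are hypothetical names introduced for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class DeferredInterruptDemo {
    // Hypothetical non-reentrant exclusive synchronizer: state 0 = free, 1 = held
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int arg) { return compareAndSetState(0, 1); }
        @Override protected boolean tryRelease(int arg) { setState(0); return true; }
    }

    // Returns the waiter's interrupt flag as observed right after acquire() returned
    static boolean run() {
        Sync sync = new Sync();
        AtomicBoolean flagAfterAcquire = new AtomicBoolean();
        sync.acquire(1);                       // the calling thread holds the resource

        Thread waiter = new Thread(() -> {
            sync.acquire(1);                   // blocks; does not throw when interrupted
            flagAfterAcquire.set(Thread.currentThread().isInterrupted());
            sync.release(1);
        });
        waiter.start();
        while (!sync.hasQueuedThreads()) {
            Thread.yield();                    // wait until the waiter is in the queue
        }
        waiter.interrupt();                    // interrupt it while it is waiting
        sync.release(1);                       // only now can the waiter acquire
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagAfterAcquire.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The waiter keeps waiting after the interrupt and still obtains the resource; the interrupt shows up afterwards as a set flag rather than as an exception, which is the behavior that distinguishes acquire from acquireInterruptibly.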
Exclusive release of resources
For releasing resources in exclusive mode, AbstractQueuedSynchronizer defines a single implementation, the AbstractQueuedSynchronizer#release method. It is essentially a scheduling routine: the actual release is performed by the tryRelease method, which is implemented by subclasses. The method AbstractQueuedSynchronizer#release is implemented as follows:
    public final boolean release(int arg) {
        // Attempt to release the resource
        if (this.tryRelease(arg)) {
            Node h = head;
            // If the release succeeds, attempt to wake up the successor node
            if (h != null && h.waitStatus != 0) {
                this.unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }
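To see acquire and release cooperating, the sketch below guards a plain counter with a hypothetical exclusive synchronizer. The ReleaseWakeupDemo class, its Sync subclass, and the run helper are assumptions made for this example; every blocked thread depends on a release call from another thread to be woken.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class ReleaseWakeupDemo {
    // Hypothetical non-reentrant exclusive synchronizer: state 0 = free, 1 = held
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int arg) { return compareAndSetState(0, 1); }
        @Override protected boolean tryRelease(int arg) { setState(0); return true; }
    }

    // Increments a shared, non-atomic counter from several threads and returns the total
    static int run(int threads, int incrementsPerThread) {
        Sync sync = new Sync();
        int[] counter = new int[1];            // plain int, guarded only by sync
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    sync.acquire(1);           // may enqueue and park
                    try {
                        counter[0]++;
                    } finally {
                        sync.release(1);       // wakes the next queued thread, if any
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return counter[0];
    }
}
```

If release failed to wake a parked successor, the workers would hang; if acquire admitted two threads at once, increments would be lost. A complete and correct total indicates that neither happened.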
If tryRelease releases the resource successfully, the method above attempts to wake the thread of the head node's successor. If that successor does not exist or has been cancelled, the synchronization queue is searched from back to front for the non-cancelled node closest to the head node. The method AbstractQueuedSynchronizer#unparkSuccessor is implemented as follows:
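    private void unparkSuccessor(Node node) {
        // Get the wait state of the current node
        int ws = node.waitStatus;
        if (ws < 0) {
            // If the current node has not been cancelled, update its wait state to 0 based on CAS
            compareAndSetWaitStatus(node, ws, 0);
        }
        /*
         * Thread to unpark is held in successor, which is normally just the next node.
         * But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor.
         */
        Node s = node.next; // get the successor node
        // If the successor node is null or has been cancelled
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Search from back to front for the non-cancelled thread node closest to the current node
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        // Wake up the thread of the successor node
        if (s != null) {
            LockSupport.unpark(s.thread);
        }
    }
Shared access to resources
In shared mode, once a waiting node succeeds in acquiring the resource, the AbstractQueuedSynchronizer#setHeadAndPropagate method is called to update the head node and, where appropriate, to propagate the wake-up to the successor node:
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // record the old head node for the checks below
        this.setHead(node);
        if (propagate > 0 // there are remaining available resources
                || h == null || h.waitStatus < 0 // at this point h is the previous head node
                || (h = head) == null || h.waitStatus < 0) { // at this point h has been updated to the current head node
            Node s = node.next;
            // If the successor node is waiting in shared mode, or the successor is unknown, try to wake it up
            if (s == null || s.isShared()) {
                this.doReleaseShared();
            }
        }
    }
Because the current node has acquired the resource, it must be recorded as the head node. In addition, if either of the following two conditions is met, the successor node must also be woken up:
The parameter propagate > 0, that is, there are remaining resources available.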
The previous or the current head node does not exist, or its wait state indicates that the successor node needs to be woken up.
If one of the conditions above is met, and the successor node is either unknown or waiting in shared mode, the AbstractQueuedSynchronizer#doReleaseShared method is called to wake up the successor node. The implementation of that method is analyzed in the next section.
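The practical effect of this propagation is that a single release in shared mode can let every shared waiter through. The sketch below assumes a hypothetical one-shot gate (the Gate class, with state 0 for closed and 1 for open) and a run helper; neither comes from the article.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class SharedPropagationDemo {
    // Hypothetical one-shot gate: state 0 = closed, 1 = open
    static final class Gate extends AbstractQueuedSynchronizer {
        @Override protected int tryAcquireShared(int arg) {
            // Positive result: success, and later waiters may succeed as well
            return getState() == 1 ? 1 : -1;
        }
        @Override protected boolean tryReleaseShared(int arg) {
            setState(1);
            return true;
        }
    }

    // Blocks `waiters` threads on the gate, opens it once, and returns how many passed
    static int run(int waiters) {
        Gate gate = new Gate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] ts = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            ts[i] = new Thread(() -> {
                gate.acquireShared(1);         // parks while the gate is closed
                passed.incrementAndGet();
            });
            ts[i].start();
        }
        while (gate.getQueueLength() < waiters) {
            Thread.yield();                    // wait until every thread is queued
        }
        gate.releaseShared(1);                 // one release only
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return passed.get();
    }
}
```

The releasing thread wakes only the first waiter directly; each woken waiter that acquires with a positive result then wakes its own successor, which is how all of them get through after one releaseShared call.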
Shared release of resources
For releasing resources in shared mode, AbstractQueuedSynchronizer likewise defines a single implementation, the AbstractQueuedSynchronizer#releaseShared method. It is essentially a scheduling routine: the actual release is performed by the tryReleaseShared method, which is implemented by subclasses. The method AbstractQueuedSynchronizer#releaseShared is implemented as follows:
    public final boolean releaseShared(int arg) {
        // Attempt to release the resource
        if (this.tryReleaseShared(arg)) {
            // The resource was released; wake up the successor node
            this.doReleaseShared();
            return true;
        }
        return false;
    }

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other in-progress acquires/releases.
         * This proceeds in the usual way of trying to unparkSuccessor of head if it needs signal.
         * But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added while we are doing this.
         * Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // If the head node's state is SIGNAL, try to clear it before waking up the successor node
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                        // loop to recheck cases
                        continue;
                    }
                    // Wake up the successor node
                    this.unparkSuccessor(h);
                }
                /*
                 * If the successor node does not need to be woken up for now, try to change the head node's
                 * waitStatus from 0 to PROPAGATE based on CAS, so that a wake-up notification arriving
                 * later can still be passed on
                 */
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    // loop on failed CAS
                    continue;
                }
            }
            // If the head node has not changed, the thread holding the lock has not changed in the meantime.
            // Reaching this step means the operation above completed successfully
            if (h == head) {
                break;
            }
            // If the head node has changed, the thread holding the lock changed in the meantime,
            // so retry to make sure the wake-up is carried out
        }
    }
If the resource is released successfully, the handling depends on the current wait state of the head node:
If the wait state of the head node is SIGNAL, the successor node needs to be woken up, and the wait state must be cleared before the wake-up is performed.
If the wait state of the head node is 0, the successor node does not need to be woken up for now, and the state is changed to PROPAGATE so that a later wake-up notification can still be passed on.
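To tie the shared-mode template methods to acquireShared and releaseShared, here is a minimal sketch of a permit pool in the style of a counting semaphore. The PermitPoolSketch class and its method names are hypothetical, and storing the number of free permits in the AQS state is an assumption of the example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical permit pool; the AQS state holds the number of free permits.
final class PermitPoolSketch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }

        @Override protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: fail without changing the state.
                // Otherwise commit with CAS and report how many permits are left
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true; // always let releaseShared run its wake-up logic
                }
            }
        }

        int available() { return getState(); }
    }

    private final Sync sync;

    PermitPoolSketch(int permits) { sync = new Sync(permits); }

    void acquire() { sync.acquireShared(1); }
    void release() { sync.releaseShared(1); }
    int available() { return sync.available(); }

    // Timed attempt; returns false if no permit became free within timeoutMillis
    boolean tryAcquire(long timeoutMillis) {
        try {
            return sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(timeoutMillis));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The return value of tryAcquireShared follows the convention described for the template methods: negative on failure, 0 when the last permit was taken, positive when permits remain.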