Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to understand Abstract synchronization queue AQS

2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article introduces the knowledge of "how to understand Abstract synchronization queue AQS". Many people will encounter this dilemma in the operation of actual cases, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!

Underlying support for AQS-- locks

When it comes to concurrency, we have to talk about ReentrantLock; and ReentrantLock, and we have to talk about AbstractQueuedSynchronizer (AQS)!

Class as its name, abstract queue synchronizer, AQS defines a synchronizer framework for multithreaded access to shared resources, and many synchronized class implementations rely on it, such as the commonly used ReentrantLock/Semaphore/CountDownLatch...

The underlying layer of the concurrent package is implemented using AQS. The following is the class diagram structure of AQS

It maintains a volatile int state (representing shared resources) and a FIFO thread waiting queue (which will be entered by multiple threads of competing resources if they are blocked). Here volatile guarantees thread visibility.

There are three ways to access state:

GetState ()

SetState ()

CompareAndSetState ()

All three are atomic operations, where the implementation of compareAndSetState depends on Unsafe's compareAndSwapInt () method. The code is as follows:

Protected final boolean compareAndSetState (int expect, int update) {/ / See below for intrinsics setup to support this return unsafe.compareAndSwapInt (this, stateOffset, expect, update);} Custom Resource sharing method

AQS defines two ways to share resources: Exclusive (exclusive, only one thread can execute, such as ReentantLock) and Share (shared, multiple threads can execute simultaneously, such as Semaphore/CountDownLatch).

Different custom synchronizers compete for shared resources in different ways. Custom synchronizers only need to implement the acquisition and release of shared resources state. As for the maintenance of specific threads waiting for queues (such as getting resources failed to join the queue / wake up the queue, etc.), AQS has been implemented at the top level. The following methods are mainly implemented when implementing a custom synchronizer.

IsHeldExclusively (): whether the thread is monopolizing resources. You need to implement it only if you use condition.

TryAcquire (int): exclusive mode. If you try to get the resource, you will return true if you succeed and false if you fail.

TryRelease (int): exclusive mode. If you try to release the resource, return true if you succeed, or false if you fail.

TryAcquireShared (int): sharing method. When you try to get a resource, a negative number indicates failure; 0 indicates success, but no remaining available resources are used; a positive number indicates success and there are remaining resources.

TryReleaseShared (int): sharing method. Try to release the resource, and return true if you are allowed to wake up the subsequent waiting node, otherwise return false.

Take ReentrantLock as an example, state is initialized to 0, indicating an unlocked state. When thread A lock (), tryAcquire () is called to monopolize the lock and state+1. After that, other threads fail when they tryAcquire (), and the other threads don't have a chance to acquire the lock until the A thread unlock () to state=0 (that is, release the lock). Of course, thread A can acquire the lock repeatedly (state accumulates) before releasing the lock, which is the concept of reentrant. Note, however, that you release as many times as you get, so that state can return to zero.

Take CountDownLatch as an example, the task is divided into N sub-threads to execute, and the state is initialized to N (note that N should be the same as the number of threads). These N child threads are executed in parallel, and after each child thread completes execution, countDown () once, state will CAS minus 1. When all the child threads have finished executing (that is, state=0), the main calling thread unpark (), and then the main calling thread returns from the await () function to continue the rest of the action.

In general, custom synchronizers are either exclusive or shared, and they only need to implement one of tryAcquire-tryRelease or tryAcquireShared-tryReleaseShared. However, AQS also supports both monopolization and sharing of custom synchronizers, such as ReentrantReadWriteLock.

Source code implementation

Next, let's start with the source implementation of AQS. Follow the order of acquire-release and acquireShared-releaseShared.

1. Acquire (int)

Acquire is an exclusive way to obtain resources, if the resource is obtained, the thread returns directly, otherwise it enters the waiting queue until the resource is obtained, and the whole process ignores the impact of interruption. This method is the top-level entry for threads to obtain shared resources in exclusive mode.

Once the resource is obtained, the thread can execute its critical section code. The following is the source code for acquire ()

Public final void acquire (int arg) {if (! tryAcquire (arg) & & acquireQueued (addWaiter (Node.EXCLUSIVE), arg) selfInterrupt ();}

We know from annotations that the acquire method is a mutex and ignores interrupts. This method executes the tryAcquire (int) method at least once, and if the tryAcquire (int) method returns true, the acquire returns directly, otherwise the current thread needs to be queued. The flow of the function is as follows

1. TryAcquire (): try to get the resource directly, and return directly if it succeeds

2. AddWaiter (): add the thread to the end of the waiting queue and mark it in exclusive mode

3. AcquireQueued (): causes the thread to get the resource in the waiting queue and does not return until the resource is obtained. True is returned if it has been interrupted throughout the wait, otherwise false is returned.

4. If a thread is interrupted while waiting, it does not respond. Self-interrupt selfInterrupt () is performed only after the resource is obtained, and the interrupt is made up.

Now start the source code analysis of these four methods 1.1 tryAcquire (int) protected boolean tryAcquire (int arg) {throw new UnsupportedOperationException ();}

TryAcquire attempts to get the resource in an exclusive way, and if it succeeds, it returns true directly, otherwise it returns false directly. This method can be used to implement the tryLock () method in Lock. The default implementation of this method is to throw a UnsupportedOperationException exception

What? Direct throw exception? What about the function we talked about? Well, remember that AQS in the overview is just a framework, and the acquisition / release of specific resources is implemented by a custom synchronizer? This is it! AQS defines only one interface here, and the acquisition of specific resources is handed over to

Custom synchronizer to achieve (through state's get/set/CAS)! As for whether you can re-enter and plug, it depends on how to design the specific custom synchronizer! Of course, the custom synchronizer takes into account the impact of thread safety when accessing resources.

The reason why it is not defined as abstract is that only tryAcquire-tryRelease is implemented in exclusive mode and tryAcquireShared-tryReleaseShared is only used in shared mode. If all are defined as abstract, then each pattern also implements an interface in another mode.

In the final analysis, Doug Lea is still from our developer's point of view to minimize unnecessary workload.

1.2 addWaiter (Node) / * Creates and enqueues node for current thread and given mode. * * @ param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @ return the new node * / private Node addWaiter (Node mode) {Node node = new Node (Thread.currentThread (), mode); / /-- construct nodes in the given mode. There are two kinds of mode: EXCLUSIVE (exclusive) and SHARED (shared) / / Try the fast path of enq; backup to full enq on failure Node pred = tail; / /-- try to quickly put it directly at the end of the queue if (pred! = null) {node.prev = pred; if (compareAndSetTail (pred, node)) {pred.next = node; return node;}} enq (node) / /-- if you fail in the previous step, join the team through enq return node;}

Needless to say, just look at the notes. Here we talk about Node. The Node node is the encapsulation of each thread accessing the synchronization code, which contains the thread itself that needs to be synchronized and the state of the thread, such as whether it is blocked, whether it is waiting to wake up, whether it has been cancelled, and so on. The variable waitStatus represents the wait state that is currently encapsulated as a Node node. There are four values: CANCELLED, SIGNAL, CONDITION and PROPAGATE.

CANCELLED: the value is 1. If the thread waiting in the synchronization queue times out or is interrupted, you need to cancel the Node node from the synchronization queue. The waitStatus of the node is CANCELLED, that is, the end state. After entering this state, the node will not change.

SIGNAL: the value is-1, which is identified as the successor node of the waiting wake-up state. When the thread of its predecessor node releases the synchronization lock or is cancelled, it will notify the thread of the successor node to execute. To put it bluntly, it is in the wake-up state, and as soon as the predecessor node releases the lock, the thread of the successor node identified as SIGNAL will be notified to execute.

CONDITION: the value is-2, which is related to Condition. The identified node is in the waiting queue, and the thread of the node is waiting on the Condition. When other threads call the signal () method of Condition, the node in CONDITION state will be transferred from the waiting queue to the synchronization queue, waiting for the synchronization lock to be acquired.

PROPAGATE: the value is-3, which is related to the shared mode, where the state indicates that the thread of the node is runnable.

0 status: the value is 0, which represents the initialization state.

AQS cancels the status by using waitStatus > 0 to indicate the canceled status, while waitStatus 0) {/ /-- if it is empty, or has cancelled s = null; for (Node t = tail; t! = null & & t! = node; t = t.prev) if (t.waitStatus 0 | | h = = null | | h.waitStatus < 0 | | (h = head) = null | | h.waitStatus < 0) {Node s = node.next If (s = = null | | s.isShared () doReleaseShared ();}}

This method is one more step on the basis of setHead (), that is, when you wake up, if the conditions are met (for example, there are remaining resources), you will also wake up the successor node, which is a shared mode after all!

3.2 Summary

OK, at this point, acquireShared () is coming to an end. Let's comb through its process again:

TryAcquireShared () attempts to get the resource, and if it succeeds, it returns directly.

If it fails, it enters the waiting queue park () through doAcquireShared () and returns until it is successfully fetched by unpark () / interrupt (). Interrupts are also ignored throughout the waiting process.

In fact, the process is more or less the same as that of acquire (), except that after you get the resources, you will awaken the operation of your subsequent teammates (this is sharing).

4. ReleaseShared (int)

Having finished with acquireShared () in the previous section, let's talk about its anti-operation releaseShared () in this section. This method is the top-level entry for threads to release shared resources in shared mode. It releases a specified amount of resources, and if it is successfully released and allows to wake up waiting threads, it wakes up other threads in the waiting queue to obtain resources. Here is the source code for releaseShared ():

Public final boolean releaseShared (int arg) {if (tryReleaseShared (arg)) {/ / attempt to release the resource doReleaseShared (); / / Wake up the successor return true;} return false;}

The process of this method is also relatively simple, in a word: after releasing the resources, wake up the successor. Similar to release () in exclusive mode, but with one slight note: tryRelease () in exclusive mode returns to true to wake up other threads after completely releasing the state=0, mainly based on the consideration of reentrant in exclusive mode. However, releaseShared () in shared mode does not have this requirement. The essence of shared mode is to control a certain number of threads to execute concurrently, so threads with resources can wake up the successor waiting node when they release part of the resources. For example, the total amount of resources is 13, respectively, A (5) and B (7) get resources to run concurrently, and when C (4) comes, there is only one resource left to wait. A releases 2 resources during the operation, and then tryReleaseShared (2) returns true to wake up C Magazine C to see that only 3 are still not enough to wait; then B releases 2 more, tryReleaseShared (2) returns true to wake C Ling C to see that 5 are enough for its own use, and then C can run with An and B. The tryReleaseShared () of the ReentrantReadWriteLock read lock returns true only if the resource (state=0) is completely freed, so the custom synchronizer can determine the return value of tryReleaseShared () as needed.

4.1 doReleaseShared ()

This method is mainly used to wake up successors. Here is its source code:

Private void doReleaseShared () {for (;;) {Node h = head; if (h! = null & & h! = tail) {int ws = h.waitstatus; if (ws = = Node.SIGNAL) {if (! compareAndSetWaitStatus (h, Node.SIGNAL, 0)) continue;// loop to recheck cases unparkSuccessor (h) / / Wake up the successor} else if (ws = = 0 & &! compareAndSetWaitStatus (h, 0, Node.PROPAGATE)) continue; / / loop on failed CAS} if (h = = head) / / head changes break;}} 5 Summary

In this section, we explain in detail the source code of access-release resources (acquire-release, acquireShared-releaseShared) in both exclusive and shared modes. I believe we all have a certain understanding. It is worth noting that under both acquire () and acquireShared () methods, threads ignore interrupts in the waiting queue. AQS also supports interrupt response, acquireInterruptibly () / acquireSharedInterruptibly (), that is, the corresponding source code here is similar to acquire () and acquireShared (), so I won't go into details here.

4. Simple application of 4.1 Mutex (mutex)

Mutex is a non-reentrant mutex implementation. The lock resource (state in AQS) has only two states: 0 means unlocked and 1 indicates locked. The following is the core source code of Mutex:

Class Mutex implements Lock, java.io.Serializable {/ / Custom Synchronizer private static class Sync extends AbstractQueuedSynchronizer {/ / determines whether to lock the state protected boolean isHeldExclusively () {return getState () = = 1;} / / attempts to get the resource and returns immediately. True is returned if successful, otherwise false is returned. Public boolean tryAcquire (int acquires) {assert acquires = = 1; / / it is limited to 1 quantity if (compareAndSetState (0,1)) {/ / state is 0 before it is set to 1, and cannot be reentered! SetExclusiveOwnerThread (Thread.currentThread ()); / / set to the current thread exclusive resource return true;} return false;} / / attempt to release the resource and return immediately. Success is true, otherwise false. Protected boolean tryRelease (int releases) {assert releases = = 1; / / is limited to 1 quantity if (getState () = = 0) / / since it is released, it must be in the occupied state. Just for insurance, multi-layer judgment! Throw new IllegalMonitorStateException (); setExclusiveOwnerThread (null); setState (0); / / release resources, abandon possession return true;}} / / the implementation of the real synchronization class all depends on the custom synchronizer inherited from AQS! Private final Sync sync = new Sync (); / / lockacquire. The semantics of the two are the same: get the resource, even if you wait, and don't return until you succeed. Public void lock () {sync.acquire (1);} / tryLocktryAcquire. The semantics of the two are the same: try to get the resource and require an immediate return. Success is true and failure is false. Public boolean tryLock () {return sync.tryAcquire (1);} / unlockrelease. Both languages are the same: release resources. Public void unlock () {sync.release (1);} / / whether the lock occupies status public boolean isLocked () {return sync.isHeldExclusively ();}}

When a synchronization class is implemented, it generally defines a custom synchronizer (sync) as an internal class for its own use, while the synchronization class itself (Mutex) implements an interface for external services. Of course, the implementation of the interface directly depends on sync, and they also have some semantic correspondence! However, sync only uses tryAcquire-tryRelelase to obtain and release the resource state. As for the queuing, waiting and waking of threads, the upper AQS has been implemented, so we don't have to worry about it.

With the exception of Mutex,ReentrantLock/CountDownLatch/Semphore, these synchronization classes are all implemented in the same way, but the difference is in the way of obtaining and releasing resources, tryAcquire-tryRelelase. If you master this, the core of AQS will be breached!

That's all for "how to understand Abstract synchronization queue AQS". Thank you for reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report