Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is AQS Abstract queue Synchronizer

2025-01-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/03 Report--

This article mainly introduces "what is AQS Abstract queue Synchronizer". In daily operation, I believe that many people have doubts about what is AQS Abstract queue Synchronizer. Xiaobian consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the questions of "what is AQS Abstract queue Synchronizer". Next, please follow the editor to study!

Abstract queue Synchronizer (AQS-AbstractQueuedSynchronizer)

In terms of name:

Abstract: it is an abstract class, and the concrete is implemented by subclasses

Queues: the data structure is queues, using queues to store data

Synchronization: synchronization function can be realized based on it.

Let's interpret it from these aspects, but first, we need to know the following characteristics in order to understand it.

Characteristics of AbstractQueuedSynchronizer

1.AQS can implement exclusive lock and shared lock.

two。 Exclusive lock exclusive is a pessimistic lock. Ensure that only one thread passes through a blocking point, and only one thread can acquire the lock.

3. The shared lock shared is an optimistic lock. Multiple thread blocking points can be allowed, and multiple threads can acquire locks at the same time. It allows a resource to be accessed by multiple read operations or by a write operation, but two operations cannot be accessed simultaneously.

4.AQS uses a member variable of type int, state, to represent the synchronization status. When state > 0, the lock has been acquired, and when state = 0, there is no lock. It provides three methods (getState (), setState (int newState), compareAndSetState (int expect,int update)) to operate on the synchronous state state, which ensures that the operation on the state is safe.

5.AQS is implemented through a CLH queue (CLH lock, namely Craig, Landin, and Hagersten (CLH) locks,CLH lock is a spin lock that ensures no hunger and provides fairness in first-come-first-served service. The CLH lock is also a scalable, high-performance, fair spin lock based on a linked list. The application thread spins only on local variables, constantly polling the status of the precursor, and ending the spin if the precursor is found to have released the lock.)

Abstract

Let's pick the source code and we can see that it inherits from AbstractOwnableSynchronizer and it is an abstract class.

Public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable

Inside AQS, a volatile variable state is used as the resource identity. At the same time, several protected methods to obtain and change state are defined, and subclasses can override these methods to implement their own logic.

You can see that several protected-level methods are provided to us in the class, which are:

/ / create a queue synchronizer instance with an initial state of 0 protected AbstractQueuedSynchronizer () {} / / returns the current value of the synchronization status. Protected final int getState () {return state;} / / sets the synchronization status value protected final void setState (int newState) {state = newState;} / / exclusive mode. If you try to get the resource, you will return true if you succeed and false if you fail. Protected boolean tryAcquire (int arg) {throw new UnsupportedOperationException ();} / exclusive mode. If you try to release the resource, return true if you succeed, or false if you fail. Protected boolean tryRelease (int arg) {throw new UnsupportedOperationException ();} / / sharing method. Try to get the resource. A negative number indicates failure; 0 indicates success, but there are no remaining available resources; a positive number indicates success, and the remaining resources protected int tryAcquireShared (int arg) {throw new UnsupportedOperationException ();} / / are shared. Try to release the resource, if you are allowed to wake up the subsequent waiting node to return true, otherwise return false. Protected boolean tryReleaseShared (int arg) {throw new UnsupportedOperationException ();}

Although these methods are protected methods, but they are not implemented in AQS, but directly throw exceptions, AQS implements a series of main logic, AQS is an abstract framework for building locks and synchronizers, using AQS can be simple and efficient to build a widely used synchronizer, such as the ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask we mentioned are based on AQS.

We can easily construct a custom synchronizer using AQS, as long as the subclass implements a few of its protected methods.

Queue

The AQS class itself implements the maintenance of specific thread waiting queues (such as getting resources failed to join the queue / wake up from the queue, etc.). It internally uses a FIFO double-ended queue (CLH) and uses two pointers head and tail to identify the head and tail of the queue. Its data structure is shown in the figure:

Instead of storing threads directly, queues store Node nodes that own threads.

Let's look at the structure of Node:

Static final class Node {/ / Mark a node (corresponding thread) waiting for static final Node SHARED = new Node () in shared mode; / / mark a node (corresponding thread) waiting for the value of static final Node EXCLUSIVE = null; / / waitStatus in exclusive mode, indicating that the node (corresponding thread) has been cancelled static final int CANCELLED = 1 The value of / / waitStatus, indicating that the successor node (corresponding thread) needs to be awakened with static final int SIGNAL =-1; / / waitStatus, indicating that the node (corresponding thread) is waiting for a condition static final int CONDITION =-2 The value of / / waitStatus indicates that resources are available, and the new head node needs to continue to wake up successor nodes / / (in shared mode, multithreading releases resources concurrently, while head wakes up subsequent nodes, / / the extra resources need to be left to subsequent nodes. When a new head node is set, it will continue to wake up the successor node) static final int PROPAGATE =-3; / / wait status, value range,-3 volatile int waitStatus; volatile Node prev; / / precursor node volatile Node next; / / thread Node nextWaiter corresponding to successor node volatile Thread thread; / / node / / waiting for the node of the next waiting condition in the queue / / the method to determine the shared mode final boolean isShared () {return nextWaiter = = SHARED;} Node (Thread thread, Node mode) {/ / Used by addWaiter this.nextWaiter = mode; this.thread = thread } / / other methods are ignored. You can refer to the addWaiter private method private Node addWaiter (Node mode) in the specific source code} / / AQS {/ / use this constructor of Node Node node = new Node (Thread.currentThread (), mode); / / other code omitted}

After Node, we can implement two queues, one is to implement CLH queues (thread synchronization queues, two-way queues) through prev and next, and the other is to implement Condition conditional waiting thread queues (one-way queues) by nextWaiter. This Condition is mainly used in ReentrantLock classes.

Synchronization

There are two synchronization methods:

Exclusive: resources are exclusive and can only be obtained by one thread at a time. Such as ReentrantLock.

Shared mode (Share): it can be obtained by multiple threads at the same time, and the specific number of resources can be specified by parameters. Such as Semaphore/CountDownLatch.

Simultaneously implement synchronization classes for both modes, such as ReadWriteLock

Get resources

The entry to get the resource is the acquire (int arg) method. Arg is the number of resources to get, which is always 1 in exclusive mode.

Public final void acquire (int arg) {if (! tryAcquire (arg) & & acquireQueued (addWaiter (Node.EXCLUSIVE), arg) selfInterrupt ();}

First call tryAcquire (arg) to try to get the resource. As mentioned earlier, this method is implemented in the subclass. If the resource acquisition fails, the thread is inserted into the waiting queue through the addWaiter (Node.EXCLUSIVE) method. The parameter passed in indicates that the Node to be inserted is exclusive. The specific implementation of this method:

Private Node addWaiter (Node mode) {/ / generate the Node node Node node = new Node (Thread.currentThread (), mode) for the thread; / / insert Node into the queue Node pred = tail; if (pred! = null) {node.prev = pred; / / try using CAS, and if successful, return if (compareAndSetTail (pred, node)) {pred.next = node Return node;}} / / if the waiting queue is empty or the above CAS fails, spin CAS into enq (node); return node } / / there will be multiple threads competing for resources at the same time in AQS, / / so it is certain that multiple threads will insert nodes at the same time. / / in this case, the thread safety of the operation is ensured by CAS spin. / / insert spin CAS into waiting queue private Node enq (final Node node) {for (;;) {Node t = tail; if (t = = null) {/ / Must initialize if (compareAndSetHead (new Node ()) tail = head;} else {node.prev = t If (compareAndSetTail (t, node)) {t.next = node; return t;}

If the setting is successful, it means that you have acquired the lock and return true. An action with a state of 0 and a setting of 1 has been done once on the outside, and doing it internally only increases the probability, and such an operation does not take up the overhead relative to the lock. If the state is not 0, determine whether the current thread is an exclusive lock Owner, and if it is Owner, try to increase the state by acquires (that is, by 1). If the state value is out of bounds, an exception will be thrown. If the state is not out of bounds, the state will be set in and return true (similar to biased function can be reentered, but no further requisition is required). If the state is not 0 and itself is not owner, false is returned.

Now, through the addWaiter method, you have put a Node at the end of the waiting queue. The nodes in the waiting queue get the resources one by one from the header node. For specific implementation, let's take a look at the acquireQueued method:

Final boolean acquireQueued (final Node node, int arg) {boolean failed = true; try {boolean interrupted = false; / / spin for (;;) {final Node p = node.predecessor () / / if the precursor node p of node is head, which means that node is the second node, you can try to obtain the resource. If (p = = head & & tryAcquire (arg)) {/ / after getting the resource, point the head to the node. / / so the node that head refers to is the node or null from which the resource is currently acquired. SetHead (node); p.next = null; / / help GC failed = false; return interrupted;} / / if you can rest, enter the waiting state until unpark () if (shouldParkAfterFailedAcquire (p, node) & & parkAndCheckInterrupt () interrupted = true } finally {if (failed) cancelAcquire (node);}}

Here LockSupport.park (this) is used internally in the parkAndCheckInterrupt method, and park is briefly introduced by the way. The LockSupport class is a class introduced by Java 6 that provides basic thread synchronization primitives. LockSupport actually calls functions in the Unsafe class, which boils down to Unsafe. There are only two functions: park (boolean isAbsolute, long time): block the current thread unpark (Thread jthread): cause a given thread to stop blocking

So after the node enters the waiting queue, it calls park to make it into the blocking state. Only the thread of the header node is active.

The process of obtaining resources by the acquire method:

Of course, there are three ways to get resources besides acquire:

AcquireInterruptibly: request interruptible resources (exclusive mode)

AcquireShared: apply for resources in shared mode

AcquireSharedInterruptibly: request interruptible resources (shared mode)

Interruptible means that an InterruptedException may be thrown when a thread is interrupted

Release resources

Releasing resources is much easier than getting them. There is only a short implementation in AQS.

Source code:

Public final boolean release (int arg) {if (tryRelease (arg)) {Node h = head; if (h! = null & & h.waitStatus! = 0) unparkSuccessor (h); return true;} return false;}

The action of the tryRelease method can be thought of as an operation to set the lock state, and subtract the state from the passed parameter value (parameter is 1). If the result state is 0, the Owner of the exclusive lock is set to null, so that other threads have a chance to execute.

In an exclusive lock, the state will be increased by 1 when the lock is added (of course, you can change this value yourself), and 1 will be subtracted when unlocked, and the same lock may be superimposed to these values of 2, 3, 4 after being reentrant. Only the number of times of unlock () corresponds to the number of times of lock () will set the Owner thread to null, and only in this case will true be returned. Note that if you use lock () or deliberately use lock () more than twice in the body of the loop, and you end up with only one unlock (), you may not be able to release the lock. Lead to deadlock.

Private void unparkSuccessor (Node node) {/ / if the status is negative, try setting it to 0 int ws = node.waitStatus; if (ws

< 0) compareAndSetWaitStatus(node, ws, 0); // 得到头结点的后继结点head.next Node s = node.next; // 如果这个后继结点为空或者状态大于0 // 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消 if (s == null || s.waitStatus >

0) {s = null; / / all nodes that are still useful in the waiting queue are moved forward for (Node t = tail; t! = null & & t! = node; t = t.prev) if (t.waitStatus)

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report