In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article will explain in detail what the synchronization framework of AQS AbstractQueuedSynchronizer is, and the content of the article is of high quality, so the editor will share it for you as a reference. I hope you will have some understanding of the relevant knowledge after reading this article.
Queue Synchronizer AbstractQueuedSynchronizer (hereinafter referred to as Synchronizer) is the basic framework for building locks or other synchronization components.
Its main design idea is to use an int type member variable named state to represent the synchronization status, most of the methods in AQS are to operate on this edge again, and then build a FIFO queue to complete the queuing work of the resource acquisition thread.
The use of AQS mainly uses inheritance, and static inner classes are recommended, which has the advantage of isolating the areas of concern of users and implementers.
AQS's main interface accesses or modifies the synchronization status method name description getState () to get the current synchronization status. SetState (int newState) sets the current synchronization status. CompareAndSetState (int expect,int update) uses CAS to set the current state, which ensures the atomicity of the state setting. The rewritable method name describes that tryAcquire (int arg) exclusively acquires the synchronization state. To implement this method, it is necessary to query the current state and determine whether the synchronization state is in line with the expected state, and then set the synchronization state by CAS. TreRelease (int arg) exclusive release synchronization status, threads waiting for synchronization status will have the opportunity to obtain synchronization status tryAcquireShared (int arg) shared synchronization status, return a value greater than or equal to 0, indicating success, otherwise failure tryReleaseShared (int arg) shared release synchronization status isHeldExclusively () whether the current synchronizer is occupied by the thread in exclusive mode, generally this method indicates whether it is exclusive by the current thread
These overridden methods actually have a default implementation, which directly throws an exception, such as:
Protected boolean tryAcquire (int arg) {throw new UnsupportedOperationException ();}
The advantage of not defining these methods directly as abstract methods is that we can override only some of them as needed. If we want to implement an exclusive lock, we just need to override tryAcquire, treRelease, and isHeldExclusively.
Template method of AQS
The design pattern of AQS using template method is roughly divided into three types of interfaces: exclusive acquisition and release of synchronization status, shared acquisition and release of synchronization status, and query of waiting threads in synchronization queues.
Queue Synchronizer synchronization queue for AQS
The synchronizer relies on the internal synchronization queue (a FIFO two-way queue) to manage the synchronization state. When the current thread fails to obtain the synchronization status, the synchronizer will construct the current thread and waiting status into a node (Node) and add it to the synchronization queue. At the same time, it will block the current thread. When the synchronization state is released, it will wake up the thread in the first node to try to get the synchronization status again.
The node in the synchronization queue (Node), which is a static inner class, is used to hold thread references, wait states, and precursor and successor nodes that failed to get the synchronization state.
The static final class Node {/ * tag indicates that the node is waiting in shared mode * / static final Node SHARED = new Node (); the / * tag indicates that the node is waiting in exclusive mode * / static final Node EXCLUSIVE = null; / * cancellation status * / static final int CANCELLED = 1 / * the successor node is waiting * / static final int SIGNAL =-1; / * indicates that the thread is in the waiting queue * / static final int CONDITION =-2; / * shared, indicating that the status is to be propagated to subsequent nodes * / static final int PROPAGATE =-3 / * CANCELLED value is 1: the thread waiting in the synchronization queue times out or is interrupted, and the waiting needs to be cancelled from the synchronization queue, and the node will not change after entering this state. * SIGNAL value is-1: the thread of the successor node is in the waiting state. If the thread of the current node releases the synchronization state or is cancelled, it will notify the successor node. Enable the thread of the successor node to run * CONDITION value is-2: the node is in the waiting queue, and the node thread is waiting on the Condition. When other threads call the signal () method on the Condition, the node will be transferred from the waiting queue to the synchronous queue. Add to the acquisition of synchronization status * PROPAGATE value of-3: indicates that the next shared synchronization status will be propagated unconditionally * INITIAL value is 0: initial state of node * / volatile int waitStatus / * precursor node, when the node joins the synchronization queue, it is set (tail added) * / volatile Node prev; / * * the successor node * / volatile Node next; / * gets the synchronization status of the thread * / volatile Thread thread; / * waiting for the successor node in the queue. If the current node is shared, then this field will be a SHAEED constant, that is, the node type (exclusive and shared) and the successor nodes in the waiting queue share the same field * / Node nextWaiter; / * Returns true if node is waiting in shared mode. * / final boolean isShared () {return nextWaiter = = SHARED;} / * * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @ return the predecessor of this node * / final Node predecessor () throws NullPointerException {Node p = prev; if (p = = null) throw new NullPointerException (); else return p;} Node () {/ / Used to establish initial head or SHARED marker} Node (Thread thread, Node mode) {/ / Used by addWaiter this.nextWaiter = mode This.thread = thread;} Node (Thread thread, int waitStatus) {/ / Used by Condition this.waitStatus = waitStatus; this.thread = thread;}}
The node is the basis of the synchronization queue (waiting queue). The synchronizer has the head node (head) and the tail node (tail). The thread that does not successfully obtain the synchronization status will become the tail of the node to join the queue. The basic structure of the synchronization queue is shown in the figure:
To ensure thread safety, you need to use the compareAndSetTail (Node expect,Node update) method to set the tail node.
Because only the thread that gets the synchronization state can set the header node, there is no competition to use the setHead (Node update) method directly.
Exclusive synchronous state acquisition and release exclusive synchronous state acquisition
The synchronization status can be obtained by calling the acquire (int arg) method of the synchronizer.
Public final void acquire (int arg) {if (! tryAcquire (arg) & & acquireQueued (addWaiter (Node.EXCLUSIVE), arg) selfInterrupt ();}
The above code mainly completes the synchronization state acquisition, node construction, joining the synchronization queue and spin waiting in the synchronization queue. The main logic is as follows: first, call the tryAcquire (int arg) method implemented by the user-defined synchronizer, which ensures that the synchronization state is obtained safely by threads. If the synchronization state acquisition fails, the synchronization node is constructed (exclusive Node.EXCLUSIVE). Only one thread can successfully obtain the synchronization status at a time) and add the node to the end of the synchronization queue through the addWaiter (Node node) method, and finally call the acquireQueued (Node node,int arg) method to make the node obtain the synchronization state in an "endless loop" way. If not, the thread in the node is blocked, and the awakening of the blocked thread mainly depends on the dequeuing of the precursor node or the interruption of the blocking thread.
Let's analyze the relevant work. The first is the construction of the node and joining the synchronization queue:
Private Node addWaiter (Node mode) {Node node = new Node (Thread.currentThread (), mode); / / assuming there is no competition, quickly try to set the current node as the tail node. If it fails, call the enq method, spin Node pred = tail; if (pred! = null) {node.prev = pred If (compareAndSetTail (pred, node)) {pred.next = node; return node;}} enq (node); return node;} private Node enq (final Node node) {for (;;) {Node t = tail If (t = = null) {/ / Must initialize if (compareAndSetHead (new Node ()) tail = head;} else {node.prev = t If (compareAndSetTail (t, node)) {t.next = node; return t;}}
The above code ensures that nodes can be safely added by threads by using the compareAndSetTail (Node expect,Node update) method.
In the enq (final Node node) method, the synchronizer ensures the correct addition of the node through a "dead loop". In the "dead loop", the current thread can not return from this method until the node is set as a tail node through CAS, otherwise, the frontline keeps trying to set it. As you can see, the enq (final Node node) method serializes requests to add nodes concurrently through CAS.
After the node enters the synchronization queue, it enters a spin process, and each node (or every thread) is observing introspectively. When the condition is met and the synchronization state is obtained, it can exit from the spin process, otherwise it will still stay in the spin process (and block the node's thread). The code is as follows:
Final boolean acquireQueued (final Node node, int arg) {boolean failed = true; try {boolean interrupted = false; / / get the synchronization state for (;;) {final Node p = node.predecessor () in spin mode / / get synchronization status if (p = = head & & tryAcquire (arg)) {setHead (node); p.next = null; / / help GC failed = false Return interrupted;} / / check and set the synchronization status, and then suspend the current thread if (shouldParkAfterFailedAcquire (p, node) & & parkAndCheckInterrupt ()) interrupted = true }} finally {if (failed) cancelAcquire (node);}} private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) {int ws = pred.waitStatus; if (ws = = Node.SIGNAL) return true If (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0); pred.next = node;} else {compareAndSetWaitStatus (pred, ws, Node.SIGNAL);} return false } private final boolean parkAndCheckInterrupt () {LockSupport.park (this); return Thread.interrupted ();}
In the acquireQueued (final Node node,int arg) method, the current thread attempts to obtain the synchronization state in a "dead loop", while only the precursor node is the head node can attempt to obtain the synchronization state for two reasons:
The head node is a node that successfully obtains the same state, and after the thread of the head node releases the synchronization state, it will wake up its successor node. After being awakened, the thread of the successor node needs to check whether its precursor node is the head node.
Maintain FIFO principles for synchronization queues.
Node spin to obtain synchronous state structure diagram:
Exclusive synchronous state acquisition process, that is, acquire (int arg) method invocation process, as shown in the figure:
Exclusive synchronization status timeout acquisition
The start of exclusive timeout synchronization is that time judgment is added to the spin method body of exclusive synchronization state acquisition, and each cycle will judge whether there is too little. The code is as follows:
Private boolean doAcquireNanos (int arg, long nanosTimeout) throws InterruptedException {if (nanosTimeout 0) {s = null; for (Node t = tail; t! = null & & t! = node; t = t.prev) if (t.waitStatus {lock.lock () Try {user.setAge (user.getAge () + 1); System.out.println (user.getAge ());} finally {lock.unlock ();}}. Start ();}
Summary: when obtaining the synchronization status, the synchronizer maintains a synchronization queue, and threads that fail to obtain the synchronization status are added to the synchronization queue and spin in the queue; the condition for moving out of the queue (or stopping spinning) is that the precursor node is the header node and the synchronization status is successfully obtained. When the synchronization state is released, the synchronizer calls the tryRelease (int arg) method to release the synchronization state, and then wakes up the successor node of the head node.
Shared synchronous state acquisition and release shared synchronous state acquisition
The main difference between shared acquisition and exclusive acquisition is whether multiple threads can obtain the synchronous state at the same time.
In the left half, when sharing access to resources, other shared access is allowed, while exclusive access is blocked, and when the right half is exclusive access to resources, other access is blocked at the same time.
The synchronization status can be shared by calling the acquireShared (int arg) method of the synchronizer, which has the following code:
Public final void acquireShared (int arg) {if (tryAcquireShared (arg))
< 0) doAcquireShared(arg);}private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) {setHeadAndPropagate (node, r); p.next = null; / / help GC if (interrupted) selfInterrupt () Failed = false; return }} if (shouldParkAfterFailedAcquire (p, node) & & parkAndCheckInterrupt ()) interrupted = true } finally {if (failed) cancelAcquire (node);}}
In the acquireShared (int arg) method, the synchronizer calls the tryAcquireShared (int arg) method to try to get the synchronization status, and the tryAcquireShared (int arg) method returns a value of type int, indicating that the synchronization status can be obtained when the return value is greater than or equal to 0. Therefore, in the process of shared spin acquisition, the condition for successfully obtaining the synchronous state and exiting the spin is that the return value of the tryAcquireShared (int arg) method is greater than or equal to 0. As you can see, during the spin process of the doAcquireShared (int arg) method, if the precursor of the current node is the head node, try to obtain the synchronization state. If the return value is greater than or equal to 0, it means that the synchronization state was successfully obtained and exited from the spin process.
Shared synchronous state release
Like exclusive acquisition, shared acquisition also needs to release the synchronization state, which can be released by calling the releaseShared (int arg) method. The code is as follows:
Public final boolean releaseShared (int arg) {if (tryReleaseShared (arg)) {doReleaseShared (); return true;} return false;}
After releasing the synchronization state, this method will wake up the subsequent nodes in the waiting state. For concurrent components (such as Semaphore) that can support simultaneous access by multiple threads, the main difference between tryReleaseShared (int arg) and exclusive is that the synchronous state (or number of resources) thread is safely released, usually through loops and CAS, because the operation of releasing the synchronous state comes from multiple threads at the same time.
Shared Lock sample package com.xiaolyuh;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock / * shared locks, which allow multiple threads to acquire locks at the same time * * @ author yuhao.wang3 * @ since 10:00 * / public class SharedLock implements Lock {private static class Sync extends AbstractQueuedSynchronizer {public Sync (int count) {if (count 0 & & compareAndSetState (count, count-arg)) {setExclusiveOwnerThread (Thread.currentThread ()); return count } return-1;} @ Override protected boolean tryReleaseShared (int arg) {for (;;) {/ / secure release locks int count = getState () through loops and CAS; if (compareAndSetState (count, count + arg)) {setExclusiveOwnerThread (null) Return true;} @ Override protected boolean isHeldExclusively () {return getState () {lock.lock (); try {System.out.println (Thread.currentThread (). GetName ()); Thread.sleep (1000) } catch (Exception e) {} finally {lock.unlock ();}}) .start ();} / / wrapping for every second (int I = 0; I < 20; iTunes +) {try {Thread.sleep (1000) } catch (InterruptedException e) {} System.out.println ();} so much about what the synchronization framework of AQS AbstractQueuedSynchronizer is. I hope the above content can be helpful to you and learn more. If you think the article is good, you can share it for more people to see.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.