In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-21 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains "Java how to achieve data organization through AQS". The content of the article is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "how Java achieves data organization through AQS".
AQS
Through the previous introduction, you must have seen that the various types of locks and some thread control interfaces (CountDownLatch, etc.) mentioned above are finally implemented through AQS, and the only difference lies in how abstract functions such as tryAcquire are implemented. From this point of view, the base class AQS (AbstractQueuedSynchronizer) is really well designed to accommodate a variety of synchronization control schemes and provides the necessary lower-level dependencies such as blocking, queuing, etc. Next, let's unveil its mystery.
Internal data
AQS, as its name implies, organizes requests to modify mutually exclusive resources through queues. When the resource is idle, the modification request can be made directly, and when the resource is locked, it needs to wait, and AQS maintains all waiting requests in a queue similar to CLH. CLH:Craig and Landin and Hagersten queues are one-way linked lists, while queues in AQS are virtual two-way queues (FIFO) of CLH variants. AQS implements lock allocation by encapsulating each thread requesting to share resources into a node. The main schematic diagram is as follows:
The state in the figure is an int variable modified with volatile, and its use is carried out through CAS, while the FIFO queue completes the queuing of requests, and the queue operation is also carried out through CAS, so the operation of the queue can achieve the ideal performance requirements.
It's easy to modify state through CAS, and everyone should understand, but what if you want to maintain a two-way queue through CAS? Here we take a look at the implementation of the CLH queue in AQS. There are two pointers in AQS, one to the head of the queue and one to the end of the queue. They are all lazily initialized, which means that they are all null at first.
/ * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. / private transient volatile Node head;/** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. * / private transient volatile Node tail
Each node in the queue is a Node instance. The first key field of this instance is waitState, which describes the status of the current node and is modified through CAS:
SIGNAL: indicates that the current node assumes the responsibility of waking up the successor node
CANCELLED: indicates that the current node has timed out or has been interrupted
CONDITION: indicates that the current node is waiting on the Condition (the Condition object can be created through the lock)
PROPAGATE: will only be set on the head node to indicate that when the shared lock is released, this behavior needs to be propagated to other nodes, which we will discuss in more detail later.
Static final class Node {/ * * Marker to indicate a node is waiting in shared mode * / static final Node SHARED = new Node (); / * * Marker to indicate a node is waiting in exclusive mode * / static final Node EXCLUSIVE = null; / * * waitStatus value to indicate thread has cancelled * / static final int CANCELLED = 1; / * * waitStatus value to indicate successor's thread needs unparking * / static final int SIGNAL =-1 / * * waitStatus value to indicate thread is waiting on condition * / static final int CONDITION =-2; / * waitStatus value to indicate the next acquireShared should * unconditionally propagate * / static final int PROPAGATE =-3 / * * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). * / volatile int waitStatus; / * * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. * / volatile Node prev; / * * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. * / volatile Node next; / * * The thread that enqueued this node. Initialized on * construction and nulled out after use. * / volatile Thread thread; / * * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. * / Node nextWaiter; / * * Returns true if node is waiting in shared mode. * / final boolean isShared () {return nextWaiter = = SHARED;} / /...}
Because it is a two-way queue, there must be prev and next pointers in the Node instance, and the corresponding threads will be saved in the Node. Finally, there is nextWaiter. When a node corresponds to a sharing request, nextWaiter points to Node. SHARED and when a node is an exclusive request, nextWaiter points to Node by default. EXCLUSIVE is also called null. We know that AQS also provides the Condition function, which is to maintain threads waiting on the Condition through nextWaiter. In other words, the nextWaiter here acts as the flag bit of the shared lock and exclusive lock in the implementation of the lock, and acts as the next pointer to the linked list in the conditional wait queue.
Synchronization queue
Next, starting from the most common queue operation, we introduce the implementation and use of the AQS framework. You can see from the following code that the queuing operation supports two modes, one is exclusive mode, and the other is shared mode, which corresponds to exclusive lock scenario and shared lock scenario respectively.
When any kind of request is to join the queue, a Node instance is built first, and then the tail node of the current AQS queue is obtained. If the tail node is empty, the queue has not been initialized. The initialization process is implemented in the later enq function.
Here we first look at the situation after initialization, that is, tail! = null, first update the forward pointer prev of the current Node, then modify the tail node to the current Node through CAS, and then update the backward pointer next of the previous node when the modification is successful. Because only the process of modifying the tail pointer is atomic, there will be a situation where the next pointer of the previous tail node previousTail is null when a new node is inserted In other words, there will be a brief out-of-sync of forward and reverse pointers, but in a later introduction, you will find that AQS completely avoids the risk of such asynchrony (by traversing back to front)
If the above operation is successful, the current thread has entered the synchronization queue, otherwise, there may be competition among multiple threads. If the other thread sets the tail node successfully, but the current thread fails, it will enter the enq function just like the tail node is not initialized.
/ * Creates and enqueues node for current thread and given mode. * * @ param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @ return the new node * / private Node addWaiter (Node mode) {Node node = new Node (Thread.currentThread (), mode); / / Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred! = null) {/ / has been initialized node.prev = pred / / CAS modify tail node if (compareAndSetTail (pred, node)) {/ / modify backward pointer pred.next = node; return node;}} / / Loop CAS procedure and initialization process enq (node); return node;}
Normally, the modification of data through CAS is done in a loop, but the addWaiter here is only done in an if. Why? In fact, you can see that this part of the CAS process of addWaiter is a fast execution line, which can omit a lot of judgment process when there is no competition. When competition occurs, it goes into the enq function, which is where the loop CAS is.
The whole enq works in one cycle.
It will first check whether it has not been initialized, and if so, set a virtual node Node as head and tail, that is, the first node of the synchronization queue does not save the actual data, just a place to store pointers
After initialization, modify the tail node through CAS until the modification is successful, and finally fix the backward pointer.
/ * Inserts node into queue, initializing if necessary. See picture above. * @ param node the node to insert * @ return node's predecessor * / private Node enq (final Node node) {for (;;) {/ / CAS operation Node t = tail; if (t = = null) {/ / Must initialize if (compareAndSetHead (new Node () tail = head;} else {node.prev = t / / CAS modify tail node if (compareAndSetTail (t, node)) {/ / modify the backward pointer t.next = node; return t after success }} Thank you for your reading. The above is the content of "how Java implements data organization through AQS". After the study of this article, I believe you have a deeper understanding of how Java implements data organization through AQS, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 235
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.