2025-04-05 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article explains how to use AQS, one of the foundations of highly concurrent programming in Java. The approach is simple and practical, so let's walk through it step by step.
Introduction
There is a classic interview question: "What are the commonly used classes in Java's concurrent package?" Most people can name CountDownLatch, CyclicBarrier, and Semaphore. All three of these tools are built on the AbstractQueuedSynchronizer abstract class (hereinafter AQS), either directly or, in the case of CyclicBarrier, through ReentrantLock. It is therefore worth learning AQS before learning the three tools themselves.
AQS is a simple framework that provides atomic management of a synchronization state, blocking and waking of threads, and a queue model.
AQS structure
How do we ensure synchronization? Most people's first thought is to add a lock, and the first lock that comes to mind is synchronized. Almost everyone has used synchronized: acquiring and releasing the lock is handled by the JVM, and all we have to do is add the keyword. It is very easy to use. There is one situation it cannot handle, though: synchronized offers no way to set a timeout on acquiring the lock. For that we can use ReentrantLock, which is implemented on top of AQS. In this article we use ReentrantLock to learn AQS.
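The timeout point can be illustrated with ReentrantLock.tryLock(long, TimeUnit). The following is a minimal sketch rather than anything from the original article; the class name TimedLockDemo and its helper methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class TimedLockDemo {
    // Tries to take the lock for at most timeoutMillis, runs the task if it got it,
    // and reports whether the lock was acquired.
    static boolean runWithTimeout(ReentrantLock lock, long timeoutMillis, Runnable task) {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (acquired) task.run();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            if (acquired) lock.unlock();
        }
    }

    // Holds the lock on the calling thread while a second thread tries with a timeout.
    static boolean attemptWhileHeld(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean result = new AtomicBoolean(true);
        lock.lock();
        try {
            Thread other = new Thread(
                () -> result.set(runWithTimeout(lock, timeoutMillis, () -> {})));
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return result.get();
    }

    public static void main(String[] args) {
        // Uncontended lock: acquired immediately.
        System.out.println(runWithTimeout(new ReentrantLock(), 100, () -> {}));
        // Lock held by another thread: the attempt gives up after the timeout.
        System.out.println(attemptWhileHeld(100));
    }
}
```

With synchronized, the second thread would simply block until the monitor became free; here it gives up after the timeout and can do something else.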
CAS, fair locks, and unfair locks
AQS makes heavy use of CAS, so before studying AQS we need a basic understanding of CAS, fair locks, and unfair locks.
CAS
CAS stands for compare-and-swap, a mechanism for implementing synchronization in a multithreaded environment. A CAS operation takes three operands: a memory location, an expected value, and a new value. It compares the value at the memory location with the expected value and, if they are equal, replaces the value at that location with the new value. If they are not equal, it does nothing. The whole operation is atomic. AtomicInteger and similar classes in Java are implemented with CAS.
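The three-operand behavior described above can be seen directly through AtomicInteger.compareAndSet. This is a small sketch for illustration; the class CasDemo and its helper methods are hypothetical, and the retry loop shows the general spin-and-CAS pattern rather than the JDK's actual implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasDemo {
    // Increments with an explicit CAS retry loop: read, compute, try to swap, retry on failure.
    static int casIncrement(AtomicInteger value) {
        for (;;) {
            int expected = value.get();
            int updated = expected + 1;
            if (value.compareAndSet(expected, updated)) return updated;
            // Another thread changed the value in between; loop and try again.
        }
    }

    // Runs several threads that all increment the same counter through casIncrement.
    static int countWithThreads(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) casIncrement(counter);
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try { w.join(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        AtomicInteger v = new AtomicInteger(5);
        System.out.println(v.compareAndSet(5, 6)); // true: expected value matched, 6 written
        System.out.println(v.compareAndSet(5, 7)); // false: value is 6, nothing written
        System.out.println(v.get());               // 6
        System.out.println(countWithThreads(4, 10_000)); // 40000
    }
}
```

No lock is taken anywhere: a thread that loses the race simply retries with a fresh expected value.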
Fair lock and unfair lock
Fair lock: threads acquire the lock in the order in which they requested it. A thread goes straight into the queue, and only the first thread in the queue can get the lock. Advantage: a waiting thread will not starve, and every thread eventually acquires the lock. Disadvantage: overall throughput is lower, because every thread in the waiting queue except the first is blocked, and the cost of the CPU waking blocked threads is higher than with an unfair lock.
Unfair lock: a thread first tries to acquire the lock directly. If it succeeds, it holds the lock immediately; if it fails, it joins the waiting queue. Advantage: fewer threads have to be woken, so the wake-up overhead is lower and overall throughput is higher. Disadvantage: threads in the waiting queue may starve or wait a long time for the lock.
That is a bit of a mouthful, so here is a practical example. In the canteen, everyone queues for food and is served first come, first served. That is a fair lock. Now suppose that just as you are about to pick up a plate, a burly man cuts in front of you. You can't win that argument, so you let him jump the queue. After he has been served, a small man tries to cut in as well. This time you won't stand for it, you shout at him, and he has to go to the end of the queue and wait. That is an unfair lock.
Let's first take a look at the attributes of AQS.
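Before turning to those fields, it may help to see how the policy is chosen in practice. In ReentrantLock the choice is made at construction time: the no-argument constructor gives an unfair lock, and passing true gives a fair one. The class name FairnessDemo below is a hypothetical name for this sketch.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    public static void main(String[] args) {
        ReentrantLock unfair = new ReentrantLock();   // default policy: unfair
        ReentrantLock fair = new ReentrantLock(true); // fair: longest-waiting thread goes first

        System.out.println(unfair.isFair()); // false
        System.out.println(fair.isFair());   // true
    }
}
```

The AQS fields that back both policies are listed next.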
    // Head node of the wait queue
    private transient volatile Node head;

    // Tail node of the wait queue. Each new node is appended at the tail, forming a linked list
    private transient volatile Node tail;

    // The most important field: the state of the lock. 0 means the lock is not held,
    // and a value greater than 0 means some thread holds it.
    // The value can exceed 1 because the lock is reentrant: each re-entry adds 1
    private volatile int state;

    // The thread that currently holds the exclusive lock. Because the lock is reentrant,
    // reentrantLock.lock() can be called several times, and this field is used each time
    // to check whether the current thread already owns the lock:
    // if (currentThread == getExclusiveOwnerThread()) { state++ }
    private transient Thread exclusiveOwnerThread; // inherited from AbstractOwnableSynchronizer
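The role of state as a reentrancy counter can be observed from outside through ReentrantLock.getHoldCount(). The sketch below assumes nothing beyond the public ReentrantLock API; HoldCountDemo and holdCountAtDepth are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    // Locks `depth` times on the calling thread and reports the hold count at the deepest point.
    static int holdCountAtDepth(ReentrantLock lock, int depth) {
        for (int i = 0; i < depth; i++) lock.lock(); // each re-entry adds 1 to the state
        try {
            return lock.getHoldCount();
        } finally {
            // Every lock() needs a matching unlock() before the lock is really free.
            for (int i = 0; i < depth; i++) lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        System.out.println(holdCountAtDepth(lock, 3)); // 3
        System.out.println(lock.isLocked());           // false: count is back to 0
    }
}
```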
Let's use a demo to analyze how lock acquires and releases the lock.
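A demo along these lines might look as follows. It is a hypothetical sketch written for this walkthrough, not the article's original demo: several threads increment a shared counter, each guarding the update with lock() and unlock().

```java
import java.util.concurrent.locks.ReentrantLock;

class LockDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private int count; // guarded by lock

    void increment() {
        lock.lock(); // blocks (queues inside AQS) if another thread holds the lock
        try {
            count++;
        } finally {
            lock.unlock(); // always release, even if the body throws
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical helper: starts `threads` workers that each increment `perThread` times.
    static int run(int threads, int perThread) {
        LockDemo demo = new LockDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) demo.increment();
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try { w.join(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return demo.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // 40000
    }
}
```

The rest of the walkthrough follows what happens inside the lock() and unlock() calls.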
    final void lock() {
        // First try to set the state from 0 to 1 directly.
        // If no one holds the lock at this moment, this succeeds immediately
        if (compareAndSetState(0, 1))
            // then record the current thread as the owner
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
If the CAS attempt fails, another thread already holds the lock, so execution enters the acquire method.
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
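acquire is a template method: AQS supplies the queuing and blocking, while the subclass supplies tryAcquire. To make that division of labor concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. SimpleMutex is a hypothetical class for illustration and is much simpler than ReentrantLock's own Sync.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 = free, 1 = held; a single CAS decides the winner
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // AQS will queue and park the caller
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // plain write is enough: only the owner gets here
            return true; // tells AQS to wake the next queued thread
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }                   // tryAcquire, else enqueue and park
    boolean tryLock() { return sync.tryAcquire(1); }   // single attempt, never blocks
    void unlock() { sync.release(1); }                 // tryRelease, then unpark successor
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println(mutex.isLocked()); // true
        System.out.println(mutex.tryLock());  // false: this sketch is not reentrant
        mutex.unlock();
        System.out.println(mutex.isLocked()); // false
    }
}
```

Unlike ReentrantLock, this sketch does not count re-entries, which is why the second attempt by the same thread fails.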
As the name suggests, tryAcquire simply makes an attempt. For the unfair lock, tryAcquire actually calls the nonfairTryAcquire method of the parent class Sync.
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // Read the current state of the lock
        int c = getState();
        // Same logic as the fast path at the start of lock():
        // if the lock is free, try to take it with CAS
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // Reaching this branch means the lock is being re-entered, so the state must be incremented
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            // If the re-entry count exceeds the maximum int value, throw.
            // This should not happen in practice, but the JDK is strict about it
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Returning false means the attempt failed; the caller then moves on to acquireQueued
        return false;
    }

If tryAcquire fails to get the lock, the thread has to queue and wait for it. Where do the queued threads wait? It works much like a thread pool executing tasks: the pool wraps each task in a worker, and when the threads cannot keep up, tasks are placed in a queue. AQS is similar. It wraps each thread that is waiting for the lock in a Node and then puts that Node into a queue. Note that the head of the queue does not hold a waiting thread.
[Figure: the AQS wait queue]
Next, let's move on to the source code to see how a thread that failed to acquire the lock is queued. Before acquireQueued can run, the addWaiter method has to be executed first.
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // CAS the node onto the tail of the queue
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // Reached when the tail is null (queue not yet initialized) or the CAS on the tail failed
        enq(node);
        return node;
    }
enq
Let's take a look at the enq method.
    // Spin with CAS until the current node has been appended to the tail of the queue
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // A null tail means the queue is still empty and has not been initialized,
            // so initialize the head node. Note that the head node has no thread bound to it
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
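The spin-and-CAS pattern in enq can be tried out in isolation. The following is a simplified model, not the AQS source: it uses AtomicReference for head and tail, and a hypothetical label field stands in for the waiting thread. CasQueueSketch and its helpers are names invented for this sketch.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasQueueSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
        final String label; // hypothetical payload standing in for the waiting thread
        Node(String label) { this.label = label; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node at the tail, creating the dummy head on first use.
    // Returns the previous tail, as enq does.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized: only one thread wins the CAS on head
                if (head.compareAndSet(null, new Node(null)))
                    tail.set(head.get());
            } else {
                node.prev = t; // prev is set before the node becomes visible as tail
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
                // CAS lost to another thread: loop and retry against the new tail
            }
        }
    }

    // Counts queued nodes (excluding the dummy head) by walking prev links from the tail.
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) n++;
        return n;
    }

    // Hypothetical helper: several threads enqueue concurrently; returns the final size.
    static int enqueueConcurrently(int threads, int perThread) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) queue.enq(new Node("n"));
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try { w.join(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(enqueueConcurrently(4, 1_000)); // 4000: no node is lost
    }
}
```

Because prev is assigned before the CAS on tail, a backward walk from the tail always sees a complete chain, whereas next may briefly lag behind.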
Through the addWaiter method, the thread that failed to acquire the lock has been wrapped in a Node and added to the queue.
[Figure: execution flow of addWaiter and enq]
Next, execution continues with the acquireQueued method.
acquireQueued
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // Spin: if the predecessor is head, try to acquire the lock.
                // tryAcquire was analyzed earlier
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Reaching here means the predecessor is not head, or the attempt to acquire failed.
                // Decide whether the current thread needs to be parked
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // On an abnormal exit, cancel the acquire. In JDK 11 the source uses a
            // catch (Throwable ...) block that calls cancelAcquire(node) instead,
            // which is simpler and clearer
            if (failed)
                cancelAcquire(node);
        }
    }
setHead
Whenever a node acquires the lock, setHead makes that node the new head. In effect, a node "leaves" the queue by becoming the head once its thread holds the lock.
shouldParkAfterFailedAcquire
Before looking at this method, we need to know the possible waitStatus values of a Node. Besides the initial value 0, the source defines four states.
CANCELLED: the value is 1. A thread waiting in the synchronization queue has timed out or been interrupted, so its Node must be removed from the queue. CANCELLED is a terminal state: once a node enters it, its status never changes again.
SIGNAL: the value is -1. It marks a node whose successor is waiting to be woken. When the thread of this node releases the lock or is cancelled, it notifies the thread of its successor so that it can run.
CONDITION: the value is -2, and it is related to Condition. The node is in a condition waiting queue, and its thread is waiting on the Condition. When another thread calls the signal() method of the Condition, the node is transferred from the waiting queue to the synchronization queue, where it waits to acquire the lock.
PROPAGATE: the value is -3, and it is related to shared mode. It indicates that a shared release should be propagated to subsequent nodes.
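The CONDITION state is the one most easily observed from application code. In the sketch below, a thread waits on a Condition until another thread signals it; ReentrantLock.hasWaiters(Condition) reports whether any thread is currently in that condition's waiting queue. ConditionDemo and its method names are hypothetical, chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag; // guarded by lock

    void awaitFlag() throws InterruptedException {
        lock.lock();
        try {
            // await() releases the lock and places this thread in the condition queue
            while (!flag) ready.await();
        } finally {
            lock.unlock();
        }
    }

    void setFlag() {
        lock.lock();
        try {
            flag = true;
            ready.signal(); // moves one waiter from the condition queue to the sync queue
        } finally {
            lock.unlock();
        }
    }

    boolean hasWaiters() {
        lock.lock(); // hasWaiters requires the caller to hold the lock
        try {
            return lock.hasWaiters(ready);
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical helper: returns true if the waiter was released by the signal.
    static boolean run() {
        ConditionDemo demo = new ConditionDemo();
        AtomicBoolean released = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitFlag();
                released.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        while (!demo.hasWaiters()) Thread.yield(); // wait until the thread is in the condition queue
        demo.setFlag();
        try { waiter.join(); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return released.get() && !demo.hasWaiters();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true
    }
}
```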
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // If the predecessor's status is SIGNAL (-1), return true so that the current thread is parked
        if (ws == Node.SIGNAL)
            return true;
        // Greater than 0 means the predecessor is CANCELLED
        if (ws > 0) {
            do {
                // Skip over cancelled nodes (they become unreferenced and are
                // reclaimed by a later GC)
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // The status here can only be 0, -2, or -3. waitStatus defaults to 0 when a Node
            // is created, so this is the only place where it is changed to SIGNAL.
            // Set the predecessor's status to -1 with CAS and return false. The caller is a loop,
            // so this method will be called again
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
parkAndCheckInterrupt
This method parks (suspends) the current thread until it is unparked or interrupted, then reports whether it was interrupted.
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // suspends the current thread; it blocks here
        return Thread.interrupted();
    }
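The behavior of park, unpark, and Thread.interrupted() can be checked with a small experiment. This sketch is an illustration under simple assumptions: ParkDemo and its helpers are hypothetical, and the static parkAndCheckInterrupt below only mirrors the shape of the AQS method.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    // Same shape as the AQS method: block, then report and clear the interrupt flag.
    static boolean parkAndCheckInterrupt() {
        LockSupport.park();
        return Thread.interrupted();
    }

    // Spins until the thread is parked (state WAITING) or has already finished.
    private static void awaitParked(Thread t) {
        while (t.isAlive() && t.getState() != Thread.State.WAITING) Thread.yield();
    }

    private static void joinQuietly(Thread t) {
        try { t.join(); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    // A parked thread resumes after another thread calls unpark on it.
    static boolean wokenByUnpark() {
        AtomicBoolean resumed = new AtomicBoolean();
        Thread worker = new Thread(() -> {
            LockSupport.park();
            resumed.set(true);
        });
        worker.start();
        awaitParked(worker);
        LockSupport.unpark(worker);
        joinQuietly(worker);
        return resumed.get();
    }

    // A parked thread also resumes when interrupted; Thread.interrupted() then
    // returns true once and clears the flag.
    static boolean wokenByInterrupt() {
        AtomicBoolean flagCleared = new AtomicBoolean();
        Thread worker = new Thread(() -> {
            // park() may return without an interrupt, so loop until one is observed
            while (!parkAndCheckInterrupt()) { }
            flagCleared.set(!Thread.currentThread().isInterrupted());
        });
        worker.start();
        awaitParked(worker);
        worker.interrupt();
        joinQuietly(worker);
        return flagCleared.get();
    }

    public static void main(String[] args) {
        System.out.println(wokenByUnpark());    // true
        System.out.println(wokenByInterrupt()); // true
    }
}
```

Because Thread.interrupted() clears the flag, acquireQueued has to remember the interrupt in its own local variable, and acquire re-asserts it afterwards with selfInterrupt().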
Unlock
Once the lock has been acquired and the work is done, the lock must be released. The key part of releasing the lock is the unparkSuccessor method.
    private void unparkSuccessor(Node node) {
        // Status of the head node
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        // s == null: after head's successor acquired the lock and executed head.next = null,
        // the unlocking thread read head.next, so s is null.
        // s.waitStatus > 0: when head's successor is cancelled (cancelAcquire), it performs
        // successor.waitStatus = 1; successor.next = successor;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk backward from the tail to find a node that is not cancelled.
            // Note that there is no break, so the loop ends on the one closest to the head
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
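The hand-off from a releasing thread to its queued successor can be observed with ReentrantLock.hasQueuedThread. In the sketch below (HandoffDemo is a hypothetical class), the main thread holds the lock while three waiters queue up one after another, then releases it. Since no new thread tries to barge in, the waiters are woken in queue order even though the lock is unfair.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

class HandoffDemo {
    // Returns the order in which the queued threads acquired the lock.
    static List<String> acquisitionOrder() {
        ReentrantLock lock = new ReentrantLock(); // unfair by default
        List<String> order = new CopyOnWriteArrayList<>();
        Thread[] waiters = new Thread[3];

        lock.lock(); // hold the lock so that every waiter has to queue
        try {
            for (int i = 0; i < waiters.length; i++) {
                String name = "waiter-" + i;
                Thread t = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(name); // recorded while holding the lock
                    } finally {
                        lock.unlock(); // wakes the next queued thread
                    }
                }, name);
                waiters[i] = t;
                t.start();
                // Wait until this thread is in the AQS queue before starting the next one
                while (!lock.hasQueuedThread(t)) Thread.yield();
            }
        } finally {
            lock.unlock(); // release: the head's successor is unparked
        }

        for (Thread t : waiters) {
            try { t.join(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(acquisitionOrder()); // [waiter-0, waiter-1, waiter-2]
    }
}
```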