2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article walks through the AQS source code: the prerequisite concepts first, then how ReentrantLock acquires and releases a lock on top of AQS.
What is AQS (AbstractQueuedSynchronizer)?
A: It is the core framework for building locks and other synchronizer components, and the foundation of the whole JUC (java.util.concurrent) system. Threads waiting for the lock are queued in a built-in FIFO queue, and the lock state is held in a single int variable.
Prerequisite knowledge:
1. Reentrant lock (recursive lock):
synchronized (an implicit lock, managed by the JVM) and ReentrantLock (an explicit Lock that must be unlocked manually) are the typical reentrant locks: the same thread can acquire the same lock multiple times.
Reentrant lock concept: once a thread has acquired a lock in an outer method, inner methods called by that same thread acquire the lock again automatically (it must be the same lock object) and are not blocked. This keeps a thread from deadlocking on a lock it already holds.
Example: a synchronized method or block that calls itself recursively, provided it locks the same object. Likewise, if a thread calls method1 on an object, method1 calls method2, method2 calls method3, and all three methods are synchronized, that is also reentrancy.
Another example:
    static Object lock = new Object();

    public void mm() {
        synchronized (lock) {
            System.out.println("= mm method");
            // Same thread, same lock object: the inner block is entered without blocking
            synchronized (lock) {
                System.out.println("= method");
            }
        }
    }
There is only one lock object. One synchronized block is nested inside another and both lock that same object, so the thread that already holds the lock can re-enter. Decompiling the class shows the following bytecode:
    public void mm();
      Code:
         0: getstatic     #7    // Field lock:Ljava/lang/Object;
         3: dup
         4: astore_1
         5: monitorenter
         6: getstatic     #8    // Field java/lang/System.out:Ljava/io/PrintStream;
         9: ldc           #9    // String = mm method
        11: invokevirtual #10   // Method java/io/PrintStream.println:(Ljava/lang/String;)V
        14: aload_1
        15: monitorexit
        16: goto          24
        19: astore_2
        20: aload_1
        21: monitorexit
        22: aload_2
        23: athrow
        24: return
      Exception table:
         from    to  target type
             6    16    19   any
            19    22    19   any
A synchronized block is locked and unlocked with the monitorenter and monitorexit instructions (a synchronized method is instead marked with the ACC_SYNCHRONIZED flag). monitorenter acquires the lock and has to be paired with a monitorexit, yet the listing contains two monitorexit instructions for one monitorenter. The first monitorexit (offset 15) is the unlock on the normal path; after it, goto jumps to the return at offset 24.
The second monitorexit (offset 21) is the unlock that runs when an exception is thrown inside the block.
That covers the concept of a reentrant lock.
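The method1/method2/method3 case described above can be sketched as follows. The class name ReentrantSyncDemo and the depth counter are made up for illustration; the point is only that the thread passes through three nested synchronized blocks on the same object without blocking itself.

```java
// Hypothetical demo class: three methods that all lock the same object.
final class ReentrantSyncDemo {
    private final Object lock = new Object();
    private int depth = 0;

    int method1() {
        synchronized (lock) {      // first acquisition
            depth++;
            return method2();
        }
    }

    int method2() {
        synchronized (lock) {      // re-entered by the same thread
            depth++;
            return method3();
        }
    }

    int method3() {
        synchronized (lock) {      // re-entered again
            depth++;
            return depth;
        }
    }

    public static void main(String[] args) {
        // Reaches the innermost block, so all three increments happen.
        System.out.println(new ReentrantSyncDemo().method1()); // prints 3
    }
}
```

If synchronized were not reentrant, the call to method2 would block forever waiting for a lock the calling thread already holds.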
2. What is LockSupport?
The JDK 8 API documentation defines it as the basic thread blocking primitives for creating locks and other synchronization classes.
It is a thread-blocking utility class whose methods are all static. It can block a thread at any point, and it provides a matching method to wake the blocked thread.
First, recall the limits of Object's wait/notify and of Lock's Condition:
wait and notify can only be called inside a synchronized block on the same object; otherwise an IllegalMonitorStateException is thrown.
A Condition's await and signal can only be called between lock() and unlock(), that is, while the lock is held; otherwise an IllegalMonitorStateException is thrown as well.
A thread must start waiting before it is notified. If the order is reversed, the notification is lost and the waiting thread is not woken.
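A small sketch of the first two restrictions, using only standard JDK calls. The class and method names (MonitorRuleDemo and so on) are invented for the example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class showing when IllegalMonitorStateException is thrown.
final class MonitorRuleDemo {
    // notify() without owning the monitor is rejected.
    static boolean notifyOutsideSynchronizedFails() {
        Object monitor = new Object();
        try {
            monitor.notify();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // signal() without holding the ReentrantLock is rejected too.
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // The same call is fine between lock() and unlock().
    static boolean signalWhileHoldingLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(notifyOutsideSynchronizedFails());
        System.out.println(signalWithoutLockFails());
        System.out.println(signalWhileHoldingLockSucceeds());
    }
}
```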
LockSupport has two key methods, park and unpark, which block and wake threads through a permit. Each thread has one permit, whose value is either 0 or 1 and defaults to 0. It resembles a semaphore whose count is capped at 1.
Take a look at the park method:
    public static void park() {
        // Delegates to Unsafe; the permit is initially 0
        UNSAFE.park(false, 0L);
    }
park disables the current thread for thread scheduling purposes unless the permit is available (that is, 1).
If the permit is 1, park consumes it (sets it back to 0) and returns immediately, and the thread carries on with its business logic. Otherwise the current thread is disabled for thread scheduling purposes and lies dormant until one of three things happens:
Some other thread calls unpark with the current thread as the target; or
Some other thread interrupts the current thread; or
The call spuriously (that is, for no reason) returns.
This method does not report which of these caused it to return. Callers should re-check the condition that made the thread park in the first place. The caller can also check the thread's interrupt status on return.
Summary: the permit defaults to 0, so the first call to park blocks the current thread until another thread sets its permit to 1. park then wakes up, sets the permit back to 0, and returns, and the thread continues with its work. If the permit is already 1 when park is called, park sets it to 0 and returns at once. Both cases match the definition of park above.
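Because park may return for any of the three reasons, the usual pattern is to call it in a loop that re-checks the condition. The following is a minimal sketch of that pattern; ParkLoopDemo, the ready flag, and the 5-second join timeout are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: park inside a loop that re-checks the condition.
final class ParkLoopDemo {
    static boolean waitUntilReady() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() can return spuriously, so the flag decides, not the return.
            while (!ready.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();

        ready.set(true);            // publish the condition first
        LockSupport.unpark(waiter); // then hand the waiter its permit

        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(waitUntilReady());
    }
}
```

Setting the flag before calling unpark matters: whichever way the two threads interleave, the waiter either sees the flag already set or finds a permit waiting for it.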
Looking at the unpark method:
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
Calling unpark sets the given thread's permit to 1 and wakes the thread if it is parked: the blocked LockSupport.park call returns and the thread continues with its business logic. unpark may also be called before park. In that case the later park finds the permit available and returns immediately, as if it had no effect.
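The "unpark before park" case can be tried on a single thread. This is only an illustrative sketch; the class name UnparkBeforeParkDemo is hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: a permit granted in advance makes park() return at once.
final class UnparkBeforeParkDemo {
    static long parkAfterUnparkMillis() {
        // Grant the current thread its permit before it parks.
        LockSupport.unpark(Thread.currentThread());

        long start = System.nanoTime();
        LockSupport.park(); // consumes the permit instead of blocking
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("park returned after " + parkAfterUnparkMillis() + " ms");
    }
}
```

This is the ordering freedom that wait/notify and await/signal lack: the wake-up is not lost when it arrives first.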
3. AQS (AbstractQueuedSynchronizer) source code
Further prerequisites: fair locks, unfair locks, spin locks, linked lists, and the template method design pattern.
AQS uses a volatile int variable to represent the lock state and a built-in FIFO queue to order the threads waiting for the resource. Each thread competing for the resource is wrapped in a Node, and the state value is modified with CAS, retried in a spin loop where needed.
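To make the "int state changed by CAS" idea concrete, here is a deliberately simplified sketch built on AtomicInteger. It is not how AQS is implemented (AQS queues and parks waiting threads rather than spinning indefinitely), and the class CasStateDemo and its method names are invented for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for the AQS state field.
final class CasStateDemo {
    // 0 = free, 1 = held
    private final AtomicInteger state = new AtomicInteger(0);

    // One CAS attempt: succeeds only if the state is still 0.
    boolean tryAcquire() {
        return state.compareAndSet(0, 1);
    }

    // Spin until the CAS succeeds (AQS would enqueue and park instead).
    void acquireSpinning() {
        while (!tryAcquire()) {
            Thread.yield();
        }
    }

    void release() {
        state.set(0);
    }

    public static void main(String[] args) {
        CasStateDemo demo = new CasStateDemo();
        System.out.println(demo.tryAcquire()); // true: 0 -> 1
        System.out.println(demo.tryAcquire()); // false: state is already 1
        demo.release();
        System.out.println(demo.tryAcquire()); // true again
    }
}
```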
(1) Node source code
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        // shared node
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        // exclusive node
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        // the thread has been cancelled
        static final int CANCELLED = 1;
        /** waitStatus value to indicate successor's thread needs unparking */
        // the successor thread needs to be woken up
        static final int SIGNAL = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        // waiting on a condition to be woken
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        // the next shared-mode acquisition propagates unconditionally
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes. It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        // initially 0; takes one of the values above and records the node's status in the queue
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing. Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        // predecessor node
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued. The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check. The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        // successor node
        volatile Node next;

        /**
         * The thread that enqueued this node. Initialized on
         * construction and nulled out after use.
         */
        // the thread wrapped by this node
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED. Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null. The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        // (constructors omitted)
    }
Each Node represents one thread waiting to run. Its waitStatus field records the status of that waiting thread.
[Figure: basic structure of AQS, built from Node objects]
Explanation: state is the status field of the synchronizer (AQS), which also keeps two node references, head and tail. state = 1 means the synchronizer is occupied, that is, some thread currently holds the lock. Threads that arrive afterwards are appended to the queue, a doubly linked list that follows FIFO order.
(2) The ReentrantLock implementation. ReentrantLock implements Lock and internally holds a synchronizer, Sync, which extends AQS. The walkthrough uses customers at a bank counter as the analogy.
new ReentrantLock() or new ReentrantLock(false) creates an unfair lock. ReentrantLock contains two inner synchronizer classes, one unfair and one fair:
    static final class NonfairSync extends Sync

    // The fair lock first checks whether any thread is already waiting in the queue;
    // that check is what distinguishes its acquire method from the unfair one above
    static final class FairSync extends Sync
Fair lock: first come, first served. When a new thread tries to acquire the lock and other threads are already waiting in the synchronizer's queue, the new thread joins the queue behind them.
Unfair lock: whether or not other threads are waiting, a new thread takes the lock immediately if it is able to acquire it.
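The constructor behavior can be checked with ReentrantLock's public isFair method. A short sketch, with the class name FairnessDemo chosen for illustration:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: which constructor yields which fairness policy.
final class FairnessDemo {
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    static boolean explicitUnfairIsFair() {
        return new ReentrantLock(false).isFair();
    }

    static boolean explicitFairIsFair() {
        return new ReentrantLock(true).isFair();
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());        // false
        System.out.println(explicitUnfairIsFair()); // false
        System.out.println(explicitFairIsFair());   // true
    }
}
```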
The template method design pattern is also central here. AQS's own tryAcquire method simply throws an exception (UnsupportedOperationException), which forces subclasses to override it. A synchronizer that does not override it cannot be used.
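The sketch below shows both halves of that pattern: a bare AQS subclass that overrides nothing is rejected on acquire, and a subclass that supplies tryAcquire and tryRelease becomes a working (non-reentrant) mutex. SimpleMutex and its method names are assumptions for the example, loosely modeled on the kind of mutex the AQS class documentation describes; this is not ReentrantLock's code.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex built on the AQS template methods.
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Acquire only if the state is currently 0 (free).
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }          // AQS calls our tryAcquire
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }          // AQS calls our tryRelease
    boolean isLocked() { return sync.isLocked(); }

    // An AQS subclass that overrides nothing cannot be acquired.
    static boolean baseClassRejectsAcquire() {
        AbstractQueuedSynchronizer bare = new AbstractQueuedSynchronizer() { };
        try {
            bare.acquire(1);
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(baseClassRejectsAcquire());
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println(mutex.isLocked());
        mutex.unlock();
        System.out.println(mutex.isLocked());
    }
}
```

AQS owns the queuing, parking, and waking; the subclass only decides what "acquired" and "released" mean for the state value.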
1.1 Thread a goes to the counter first: it calls the lock method, which delegates to sync's lock.
    final void lock() {
        // CAS the state of the current object from 0 to 1 (done through the field's memory offset).
        // compareAndSetState returns true if the state was 0 and has been set to 1.
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
Thread a is the first to run, so the CAS succeeds and the owner thread is set to a: thread a now occupies the synchronizer and holds the lock.
1.2 When thread b calls lock, its CAS from 0 to 1 fails, so it runs the acquire(1) method.
    public final void acquire(int arg) {
        // The template method pattern is used here: subclasses must implement tryAcquire.
        // The unfair lock is the default, so see NonfairSync.
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock. Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    // The unfair lock's attempt to acquire
    final boolean nonfairTryAcquire(int acquires) {
        // the current thread, b
        final Thread current = Thread.currentThread();
        // the lock state is 1, because thread a has acquired the lock and set state to 1
        int c = getState();
        // a may still have been busy when b first tried, but may have finished by now,
        // in which case state is 0
        if (c == 0) {
            // optimistically CAS state from 0 to 1
            if (compareAndSetState(0, acquires)) {
                // set the owner thread to b
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // the requesting thread may be the one that already holds the lock,
        // coming back for another acquisition (reentrancy)
        else if (current == getExclusiveOwnerThread()) {
            // 2
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // set the new state value
            setState(nextc);
            return true;
        }
        // if thread b gets here, b has to go into the wait queue
        return false;
    }

Now look at the logic after the && operator:

    private Node addWaiter(Node mode) {
        // create a node wrapping thread b and the node mode
        Node node = new Node(Thread.currentThread(), mode);
        // the wait queue is empty at first, so tail is null;
        // if thread c arrives after b, tail is b's node
        Node pred = tail;
        // c and every later node take this branch
        if (pred != null) {
            // node's predecessor is the tail: c.prev = b
            node.prev = pred;
            // CAS: if tail is still b, set tail to c.
            // Only the tail field changes; pred still refers to b.
            if (compareAndSetTail(pred, node)) {
                // b.next = c
                pred.next = node;
                return node;
            }
        }
        // thread b enters the wait queue here
        enq(node);
        return node;
    }

    // enqueue method
    private Node enq(final Node node) {
        // equivalent to while (true)
        for (;;) {
            // read the tail node
            Node t = tail;
            // initialize the lock's wait queue
            if (t == null) { // Must initialize
                // Set head to a new node. The first node in the wait queue is therefore
                // not b but an empty node, a placeholder (sentinel) node.
                if (compareAndSetHead(new Node()))
                    // point both head and tail at it
                    tail = head;
            } else {
                // second iteration: t is the empty node, so b.prev = empty node
                node.prev = t;
                // set tail to b's node
                if (compareAndSetTail(t, node)) {
                    // the empty node's next is b's node
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

That completes thread b's entry into the wait queue. The thread is still running, though, so how does it get blocked?
Next, look at the acquireQueued method:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // spin
            for (;;) {
                // for thread b, p is the predecessor of b's node
                final Node p = node.predecessor();
                // The empty node is head, so tryAcquire is called once more to try for the lock.
                // If thread a has not finished, this returns false;
                // if thread a has finished, the lock is acquired here.
                if (p == head && tryAcquire(arg)) {
                    // set head to b's node
                    setHead(node);
                    // unlink the old empty node
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // First pass: shouldParkAfterFailedAcquire sees the empty node and returns false.
                // Second pass: it returns true, and parkAndCheckInterrupt parks the current
                // thread. On wake-up it returns b's interrupt status: false if b was not
                // interrupted (the loop then spins again), true if it was.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // head is the empty node, so ws = 0 on the first pass
        // and -1 on the second pass
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        // if the predecessor is cancelled (ws > 0), relink past the cancelled nodes
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // ws is 0 here: CAS the predecessor's (the empty node's) waitStatus from 0 to -1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
That completes the path in which thread b fails to acquire the lock and is suspended.
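The queued, parked state of thread b can be observed from outside through ReentrantLock's public query methods. In this sketch the calling thread plays the role of thread a; the class name QueueObservationDemo and the 5-second timeouts are arbitrary choices for the example.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: watch a second thread land in the lock's wait queue.
final class QueueObservationDemo {
    static boolean observeQueuedThread() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // the calling thread acts as "thread a"

        Thread b = new Thread(() -> {
            lock.lock();   // cannot acquire: gets enqueued and parked
            lock.unlock();
        }, "b");

        boolean queued = false;
        try {
            b.start();
            long deadline = System.nanoTime() + 5_000_000_000L;
            // Poll until b shows up in the wait queue (or the timeout expires).
            while (!(queued = lock.hasQueuedThread(b))
                    && System.nanoTime() < deadline) {
                Thread.sleep(1);
            }
            b.join(1); // b is still blocked here; this just yields briefly
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock(); // releasing wakes b
        }

        try {
            b.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queued && !b.isAlive() && !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println(observeQueuedThread());
    }
}
```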
1.3 When thread a calls the unlock method:
    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        // has thread a finished its business and fully released the lock (state = 0)?
        if (tryRelease(arg)) {
            // get the head node, that is, the empty node
            Node h = head;
            // the empty node's waitStatus was set to -1 when b was queued, so this is true
            if (h != null && h.waitStatus != 0)
                // wake up the waiting thread
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // template method in the AQS parent class; subclasses must implement it
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    // the implementation in ReentrantLock's Sync
    protected final boolean tryRelease(int releases) {
        // the new lock state: 1 - 1 = 0 here
        int c = getState() - releases;
        // check that the current thread is the lock's exclusive owner
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // the result flag
        boolean free = false;
        if (c == 0) {
            free = true;
            // state = 0 means a has finished its business,
            // so the lock's exclusive owner is reset to null
            setExclusiveOwnerThread(null);
        }
        // write back the lock state
        setState(c);
        return free;
    }
Note: release subtracts 1 from state. If the ReentrantLock has been re-entered several times, a single unlock does not bring state back to 0. That is why unlock has to be called as many times as lock was called.
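ReentrantLock exposes the current thread's hold count through getHoldCount, which makes the lock/unlock pairing easy to see. A small sketch, with the class name HoldCountDemo chosen for illustration:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: each lock() needs a matching unlock().
final class HoldCountDemo {
    // Returns the hold count after two locks, after one unlock, and after two unlocks.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                             // re-entered by the same thread
        int afterTwoLocks = lock.getHoldCount();

        lock.unlock();
        int afterOneUnlock = lock.getHoldCount(); // still held once

        lock.unlock();
        int afterTwoUnlocks = lock.getHoldCount(); // fully released

        return new int[] { afterTwoLocks, afterOneUnlock, afterTwoUnlocks };
    }

    public static void main(String[] args) {
        int[] counts = holdCounts();
        System.out.println(counts[0] + " " + counts[1] + " " + counts[2]); // prints 2 1 0
    }
}
```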
Thread a wakes up thread b:
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            // the empty node's status is -1; reset it to 0
            compareAndSetWaitStatus(node, ws, 0);
        // get b's node: non-null, with waitStatus = 0
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
© 2024 shulou.com SLNews company. All rights reserved.