What is the cornerstone of Java's concurrent data structures?

Updated 2025-03-30. From: SLTechnology News&Howtos (Shulou), Development

This article explains what Java's locks and concurrent data structures are built on: the thread parking primitives, the queue manager AbstractQueuedSynchronizer, condition variables, and the locking and unlocking paths of ReentrantLock and the read-write lock.

Thread blocking primitives

In Java, threads are blocked and woken up through the park and unpark methods of the Unsafe class.

    public class Unsafe {
        ...
        public native void park(boolean isAbsolute, long time);
        public native void unpark(Thread t);
        ...
    }

Both methods are native: they are implemented in the JVM's native code (C++ in HotSpot) rather than in Java. park suspends the currently running thread, Thread.currentThread(), and unpark wakes up the specified thread. Underneath, both rely on the blocking primitives provided by the operating system; the details live in the JVM's native source and are not analyzed here. The two parameters of park control how long the thread sleeps. The first parameter, isAbsolute, says how to interpret the second: if it is true, time is an absolute deadline in milliseconds since the epoch; if it is false, time is a relative timeout in nanoseconds, and 0 means wait indefinitely.

Once started, a thread keeps running. Leaving aside the operating system's scheduling, a lock can suspend it only because the lock calls park underneath; that is the whole secret of how a lock pauses a thread.
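To get a feel for the primitive without touching Unsafe, here is a minimal sketch that uses the public wrapper java.util.concurrent.locks.LockSupport instead. The class name ParkPermitDemo and its two helper methods are made up for this illustration. It shows two things: an unpark issued before park leaves a permit behind, so the next park returns immediately, and parkNanos takes a relative timeout.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class ParkPermitDemo {
    // unpark() before park() leaves a permit behind, so park() returns at once.
    static long parkWithPermitNanos() {
        LockSupport.unpark(Thread.currentThread());
        long start = System.nanoTime();
        LockSupport.park();               // consumes the permit instead of blocking
        return System.nanoTime() - start;
    }

    // Relative timed park: returns after roughly the timeout, or earlier
    // if the thread is unparked, interrupted, or woken spuriously.
    static long timedParkNanos(long millis) {
        long start = System.nanoTime();
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(millis));
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        System.out.println("park with permit took ns: " + parkWithPermitNanos());
        System.out.println("timed park took ns: " + timedParkNanos(50));
    }
}
```

Because park may return early, neither helper should be used as a precise timer; real code always re-checks its condition after park returns.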

parkBlocker

The Thread object has an important field, parkBlocker, which records what the current thread is parked on. Picture a parking lot full of cars whose owners have come to an auction and will drive away once they have won the item they want. parkBlocker roughly corresponds to that "auction": it is the coordinator of a group of contending threads and decides which thread should sleep and which should wake up.

    class Thread {
        ...
        volatile Object parkBlocker;
        ...
    }

The field is reset to null once the thread wakes up from park. Unsafe.park and unpark do not set parkBlocker for us; the utility class that manages it is LockSupport, a thin wrapper around the two Unsafe methods.

    class LockSupport {
        ...
        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            U.park(false, 0L);
            setBlocker(t, null); // clear the blocker after waking up
        }

        public static void unpark(Thread thread) {
            if (thread != null)
                U.unpark(thread);
        }
        ...
    }

Java's lock data structures put threads to sleep and wake them up by calling LockSupport. The value of a thread's parkBlocker field is the "queue manager" discussed next.
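The blocker can be observed from outside through the public LockSupport.getBlocker method. The following is a small sketch, not JDK code: the class BlockerDemo and its method observeBlocker are invented for the example. A helper thread parks on an arbitrary object, the caller waits until it sees that object as the helper's blocker, and after the wakeup the blocker is null again.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class BlockerDemo {
    // Returns true if the blocker was visible while the helper was parked
    // and was cleared again after the helper woke up.
    static boolean observeBlocker(Object blocker) {
        AtomicBoolean released = new AtomicBoolean(false);
        Thread sleeper = new Thread(() -> {
            // Loop because park() may return without a matching unpark().
            while (!released.get()) {
                LockSupport.park(blocker);
            }
        });
        sleeper.start();

        // Wait until the sleeper has registered our object as its blocker.
        while (LockSupport.getBlocker(sleeper) != blocker) {
            Thread.yield();
        }

        released.set(true);
        LockSupport.unpark(sleeper);
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        // park(blocker) cleared the field when it returned.
        return LockSupport.getBlocker(sleeper) == null;
    }

    public static void main(String[] args) {
        System.out.println("blocker observed and cleared: " + observeBlocker(new Object()));
    }
}
```

Thread dumps and profilers use the same information to report which synchronizer a parked thread is waiting on.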

Queue manager

When several threads compete for the same lock, there has to be a queuing mechanism that strings together the threads that failed to get it. When the lock is released, the queue manager picks a suitable thread to take over the lock. Every lock contains a queue manager that maintains a queue of waiting threads. In ReentrantLock the queue manager is AbstractQueuedSynchronizer, and its internal wait queue is a doubly linked list. The queue and its nodes are structured as follows.

    class AbstractQueuedSynchronizer {
        volatile Node head;  // the thread at the head of the queue gets the lock first
        volatile Node tail;  // threads that fail to grab the lock are appended at the tail
        volatile int state;  // lock count
    }

    class Node {
        Node prev;
        Node next;
        Thread thread;     // one thread per node
        // the following two fields can be ignored for now
        Node nextWaiter;   // whether a shared or an exclusive lock is requested
        int waitStatus;    // fine-grained status word
    }

When locking fails, the current thread appends itself to the tail of the wait queue and then calls LockSupport.park to put itself to sleep. When another thread unlocks, it takes a node from the head of the list and calls LockSupport.unpark to wake it up.

AbstractQueuedSynchronizer is an abstract class and the parent of all lock queue managers: the internal queue managers of the various locks in the JDK all extend it. It is the cornerstone of Java's concurrency world. For example, the queue managers inside ReentrantLock, ReentrantReadWriteLock, CountDownLatch, Semaphore, and ThreadPoolExecutor are all subclasses of it. The class leaves a few hook methods, such as tryAcquire and tryRelease, for subclasses to override, so each lock customizes its own manager. Many of the JDK's built-in concurrent data structures are built under the protection of these locks, which makes this class the foundation of the JDK's multithreading high-rise.
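To make "each lock customizes its own manager" concrete, here is a minimal sketch of a non-reentrant, unfair mutex built on AbstractQueuedSynchronizer. It is an illustration, not how ReentrantLock is written; SimpleMutex, Sync, and the helpers tryOnce and isLocked are names chosen for this example. Only the two hook methods are supplied; queuing, parking, and waking are inherited.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    // state == 0 means unlocked, state == 1 means locked.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // the inherited acquire() will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;  // the inherited release() will unpark the next waiter
        }

        boolean tryOnce()  { return tryAcquire(1); }
        boolean isLocked() { return getState() != 0; }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }
    public boolean tryLock()  { return sync.tryOnce(); }
    public void unlock()      { sync.release(1); }
    public boolean isLocked() { return sync.isLocked(); }
}
```

Because tryAcquire does not check the owner, a second lock() by the same thread would block forever; the reentry handling described later is exactly what this sketch leaves out.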

The queue manager maintains an ordinary doubly linked list. The data structure is simple, but maintaining it correctly is quite involved, because concurrent access by multiple threads has to be considered at every step, and every line of code is written with great care.

The JDK's queue manager was implemented by Doug Lea, who wrote almost the entire Java concurrency package single-handedly. In the world of algorithms, the more intricate something is, the better suited it is to being done by one person.

Douglas S. Lea is a professor of computer science at the State University of New York at Oswego and currently head of the computer science department, specializing in concurrent programming and the design of concurrent data structures. He served on the executive committee of the Java Community Process and chaired JSR 166, which added concurrency utilities to the Java programming language.

From here on, AbstractQueuedSynchronizer is abbreviated as AQS. A word of warning: AQS is complex, and running into difficulties while trying to understand it is normal. There is currently no book on the market that makes AQS easy to understand. Very few people truly understand it, and I do not count myself among them.

Fair locks and unfair locks

A fair lock guarantees that the lock is acquired in the order in which it was requested. If the lock happens to be free and a thread tries to take it, a fair lock must first check whether other threads are already queued. An unfair lock can jump the queue. Think of the queue for a hamburger at KFC.

You might ask: if a lock is free, how can it have queued threads? Suppose the thread holding the lock has just released it and woken up the thread in the first node of the wait queue. The awakened thread has just returned from park and is about to try to lock. Between its return from park and its lock attempt the lock is free. This window is very short, but other threads may also try to lock during it.

Also note that a thread that has put itself to sleep with LockSupport.park does not necessarily sleep until another thread calls unpark; it may wake up at any time for no apparent reason. According to the source code comments, park can return for four reasons.

Another thread calls unpark on the current thread

The timeout expires (when park is given a time parameter)

Another thread interrupts the current thread

A "spurious wakeup" with no identifiable cause

The documentation does not say what causes a spurious wakeup, but it does make clear that a return from park does not mean the lock is free: the awakened thread retries the lock and, if that fails, parks itself again. The locking process therefore has to be written as a loop, and it may take several attempts before the lock is acquired.

In the computer world unfair locks give better throughput than fair locks, so Java's locks are unfair by default. In the real world unfair locking seems less efficient: if people could keep jumping the queue at KFC, the place would be a mess. The difference is probably that a thread jumping the queue does not cause the other threads to complain.

    public ReentrantLock() {
        this.sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        this.sync = fair ? new FairSync() : new NonfairSync();
    }
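The default can be checked from the outside with the public isFair() method. A tiny sketch (the class name FairnessDemo and its two helpers are invented for the example):

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    // The no-argument constructor picks the unfair policy.
    static boolean defaultIsFair()  { return new ReentrantLock().isFair(); }

    // Passing true asks for the fair policy.
    static boolean explicitIsFair() { return new ReentrantLock(true).isFair(); }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());  // prints false
        System.out.println(explicitIsFair()); // prints true
    }
}
```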

Shared locks and exclusive locks

ReentrantLock is an exclusive lock: it is held by one thread at a time and other threads must wait. The read lock in ReadWriteLock is not exclusive; several threads may hold it simultaneously, so it is a shared lock. Shared and exclusive requests are distinguished by the nextWaiter field of the Node class.

    class AQS {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;

        boolean isShared() {
            return this.nextWaiter == SHARED;
        }
    }

Why isn't this field simply named mode, type, or shared? Because nextWaiter serves a different purpose in another scenario, much like a field of a C union type, except that Java has no union types.
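Shared mode is easiest to see in a synchronizer that lets every waiter through at once. The sketch below is a one-shot gate in the spirit of CountDownLatch with a count of one; it is an illustration under my own naming (OneShotGate, Sync, passAll), not JDK code. It overrides the shared hooks tryAcquireShared and tryReleaseShared instead of the exclusive ones.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class OneShotGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isOpen() { return getState() != 0; }

        // A non-negative result means "acquired"; any number of threads may succeed.
        @Override
        protected int tryAcquireShared(int ignored) {
            return isOpen() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // tells AQS to wake the queued shared waiters
        }
    }

    private final Sync sync = new Sync();

    public void await()     { sync.acquireShared(1); }  // blocks until the gate is open
    public void open()      { sync.releaseShared(1); }
    public boolean isOpen() { return sync.isOpen(); }

    // Hypothetical helper: parks n threads on a gate, opens it, and
    // returns how many of them got through.
    static int passAll(int n) {
        OneShotGate gate = new OneShotGate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] ts = new Thread[n];
        for (int i = 0; i < n; i++) {
            ts[i] = new Thread(() -> { gate.await(); passed.incrementAndGet(); });
            ts[i].start();
        }
        gate.open();
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return passed.get();
    }
}
```

With an exclusive lock only one of the waiting threads could proceed at a time; here a single open() releases all of them.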

Condition variables

The first question about condition variables is why they are needed at all: aren't locks enough? Consider the following pseudocode, which does something once a condition is met.

    void doSomething() {
        locker.lock();
        while (!condition_is_true()) { // first check whether we can go ahead
            locker.unlock();           // not yet, so release the lock and rest for a while
            sleep(1);
            locker.lock();             // the check itself must be done under the lock
        }
        justdoit();                    // do the work
        locker.unlock();
    }

When the condition is not met, the code loops and retries (other threads change the condition while holding the lock), with a sleep in between so that the CPU does not spin at full load. The problem is that the sleep duration is hard to get right. If the interval is too long, it slows everything down and may even miss the opportunity (the condition becomes true for a moment and is immediately reset); if it is too short, the CPU spins idly. Condition variables solve this problem.

    void doSomethingWithCondition() {
        cond = locker.newCondition();
        locker.lock();
        while (!condition_is_true()) {
            cond.await();
        }
        justdoit();
        locker.unlock();
    }

await() blocks on the condition variable cond until another thread calls cond.signal() or cond.signalAll(); while it is blocked, the lock held by the current thread is released automatically. When await() is woken up, it tries to reacquire the lock (it may have to queue again) and does not return until it holds the lock.

Several threads can block on a condition variable; they are linked into a condition wait queue. signalAll() wakes up all of them, and they all start competing for the lock again. signal() wakes up only the thread at the head of the queue, which avoids the "thundering herd" problem.

await() must release the lock right away; otherwise other threads could not modify the state of the critical section and the result of condition_is_true() would never change. That is why a condition variable has to be created by a lock object: it needs a reference to the lock so that it can release it and lock again after being woken up by signal. The lock that creates a condition variable must be an exclusive lock. If await() released a shared lock, there would be no guarantee that other threads could modify the critical section, because other readers might still hold it. This is why the newCondition method of ReentrantReadWriteLock.ReadLock is defined as follows.

    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

With condition variables, the hard-to-tune sleep is gone. As soon as the condition is met, signal() or signalAll() is called and the blocked thread is woken up with almost no delay.
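As a runnable counterpart to the pseudocode, here is a minimal one-slot mailbox built from ReentrantLock and Condition. The class Mailbox and its methods are invented for the illustration, and awaitUninterruptibly() is used only to keep the example free of checked exceptions; production code would more likely use await() and handle InterruptedException.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class Mailbox {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private String message; // guarded by lock; null means "empty"

    public void put(String m) {
        lock.lock();
        try {
            message = m;
            notEmpty.signal(); // wake one waiter; it re-checks the condition itself
        } finally {
            lock.unlock();
        }
    }

    public String take() {
        lock.lock();
        try {
            while (message == null) {            // always wait in a loop
                notEmpty.awaitUninterruptibly(); // releases the lock while waiting
            }
            String m = message;
            message = null;
            return m;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Mailbox box = new Mailbox();
        new Thread(() -> box.put("hello")).start();
        System.out.println(box.take()); // prints hello
    }
}
```

There is no sleep interval to tune: take() wakes up as soon as put() signals, and it cannot miss the message because the check and the wait both happen under the lock.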

Condition wait queue

When several threads call await() on the same condition variable, they form a condition wait queue. Several condition variables can be created for one lock, so there can be several condition wait queues. The queue resembles the AQS wait queue, except that it is singly linked rather than doubly linked. Its nodes are of the same class as the nodes of the AQS wait queue, but they are linked through nextWaiter instead of prev and next.

    class AQS {
        ...
        class ConditionObject {
            Node firstWaiter; // points to the first node
            Node lastWaiter;  // points to the last node
        }

        class Node {
            static final int CONDITION = -2;
            static final int SIGNAL = -1;
            Thread thread;    // the waiting thread
            Node nextWaiter;  // points to the next node in the condition queue
            Node prev;
            Node next;
            int waitStatus;   // waitStatus == CONDITION
        }
        ...
    }

ConditionObject is an inner class of AQS, so it carries a hidden pointer, this$0, to the enclosing AQS object and can directly access all of that object's fields and methods (including locking and unlocking). The waitStatus of every node in the condition wait queue is CONDITION, meaning that the node is waiting on a condition variable.

Queue transfer

When signal() is called on a condition variable, the head node of the condition wait queue is removed from that queue and transferred to the AQS wait queue, where it lines up to reacquire the lock. During the transfer the node's status is reset from CONDITION to 0, and its predecessor in the AQS queue is marked SIGNAL, meaning that the predecessor must wake up this node when it releases the lock. Only if the predecessor cannot be marked is the node's thread woken up right away.

    class AQS {
        ...
        boolean transferForSignal(Node node) {
            // reset the node status from CONDITION to 0
            if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
                return false;
            Node p = enq(node); // enter the AQS wait queue; p is the predecessor
            int ws = p.waitStatus;
            // mark the predecessor SIGNAL; if that is not possible, wake the thread now
            if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
        ...
    }

The meaning of the transferred node's nextWaiter field changes as well: in the condition queue it points to the next node, while in the AQS wait queue it marks whether a shared or an exclusive lock is requested.
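The move from one queue to the other can be watched through ReentrantLock's public monitoring methods hasWaiters and hasQueuedThread. The sketch below is an illustration with invented names (QueueTransferDemo, run); it assumes nothing beyond the public API. While the signalling thread still holds the lock, the waiter has already left the condition queue and is queued for the lock itself.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class QueueTransferDemo {
    // Returns {on condition queue before signal,
    //          on condition queue after signal,
    //          queued for the lock after signal}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!signalled[0])
                    cond.awaitUninterruptibly();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean before, after, queued;
        lock.lock();
        try {
            // Poll until the waiter sits in the condition queue.
            while (!lock.hasWaiters(cond)) {
                lock.unlock();
                Thread.yield();
                lock.lock();
            }
            before = lock.hasWaiters(cond);
            signalled[0] = true;
            cond.signal();
            after = lock.hasWaiters(cond);         // left the condition queue
            queued = lock.hasQueuedThread(waiter); // now waiting for the lock itself
        } finally {
            lock.unlock();
        }
        try { waiter.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return new boolean[] { before, after, queued };
    }
}
```

The waiter can return from await only after the signalling thread unlocks, which matches the description above: signal() moves the node, and releasing the lock is what lets the thread run.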

ReentrantLock locking process

Next we walk through the locking process in detail to understand the control logic of the lock. It has to be said that Doug Lea's code is so terse that it is hard to follow.

    class ReentrantLock {
        ...
        public void lock() {
            sync.acquire(1);
        }
        ...
    }

    class Sync extends AQS {
        ...
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        ...
    }

The if condition in acquire has three parts. tryAcquire is the current thread's attempt to take the lock. If it fails, the thread has to queue, so addWaiter is called to enqueue the current thread. acquireQueued then starts the loop of parking, waking up, retrying the lock, and parking again on failure. acquire does not return until the lock has been acquired.

If the thread is interrupted by another thread while it is in this retry loop, acquireQueued returns true. The thread then calls selfInterrupt() to set the interrupt flag on itself.

    // interrupting the current thread just sets a flag
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

How does a thread know that it has been interrupted? It can call Thread.interrupted() after waking up from park, but this works only once, because the call clears the flag. That is why acquire calls selfInterrupt() to set the interrupt flag again, so that the calling code can still find out through Thread.interrupted() that an interrupt occurred.

acquireQueued and addWaiter are provided by AQS, while tryAcquire has to be implemented by the subclass; different locks implement it differently. Consider the tryAcquire method of ReentrantLock's fair lock.

The method has an if / else if structure. The else if branch handles reentry: the thread trying to lock is the thread that already holds the lock, that is, the same thread locks again. In that case only the count needs to be increased. The lock's state records the lock count, which is incremented by 1 on every reentry. The AQS object has an exclusiveOwnerThread field that records the thread currently holding the exclusive lock.

if (c == 0) means that the lock is free and the count is zero. Now the threads have to compete for the lock, because several of them may be calling tryAcquire at the same time. They compete with the CAS operation compareAndSetState: the thread that changes the lock count from 0 to 1 wins the lock and is recorded in exclusiveOwnerThread.

The code also contains a hasQueuedPredecessors() check, which is very important. It checks whether other threads are already waiting in the AQS queue. A fair lock must perform this check before locking and may not jump the queue if there is one; an unfair lock skips the check. This check is what makes the lock fair.
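The three points just described (the c == 0 branch with its CAS, the reentry branch, and the hasQueuedPredecessors() check) can be put together in a short sketch. It is written against the public and protected API of AbstractQueuedSynchronizer and follows the description above; it is not a copy of the JDK source, it omits the overflow check, and the class name FairSyncSketch and the helper holdCount are mine.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class FairSyncSketch extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Lock is free: compete only if nobody is queued ahead of us.
            // Dropping the hasQueuedPredecessors() check gives the unfair variant.
            if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // Reentry by the owner: just bump the count, no CAS needed.
            setState(c + acquires);
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        int c = getState() - releases;
        boolean free = (c == 0);
        if (free)
            setExclusiveOwnerThread(null);
        setState(c);
        return free;
    }

    int holdCount() { return getState(); }
}
```

Calling acquire(1) twice from one thread leaves the count at 2, and the lock is free again only after two calls to release(1).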

Next comes addWaiter. Its mode parameter corresponds to the Node.nextWaiter field and says whether the lock is requested in shared or exclusive mode.

addWaiter appends a new node to the tail of the AQS wait queue. If tail is null, the queue has not been initialized yet and has to be initialized first. An initialized AQS queue starts with a redundant (dummy) head node whose thread field is null.

Appending a node to the tail must also cope with concurrent threads, so the code again uses a CAS operation, compareAndSetTail, to compete for the tail pointer. A thread that loses the race goes into the next round of the for (;;) loop and keeps retrying the CAS until its node has been appended.
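The append step can be sketched outside of AQS with AtomicReference. This is a simplified model of the idea described above (lazy initialization with a dummy head, then a CAS loop on the tail); the class CasQueueSketch, its Node, and the helpers size and fill are invented for the example and leave out everything else AQS does.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasQueueSketch {
    static final class Node {
        final Thread thread;   // null for the dummy head
        volatile Node prev;
        volatile Node next;
        Node(Thread t) { thread = t; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends a node for the given thread and returns it.
    Node enqueue(Thread t) {
        Node node = new Node(t);
        for (;;) {
            Node last = tail.get();
            if (last == null) {
                // Queue not initialized yet: install the dummy head first.
                Node dummy = new Node(null);
                if (head.compareAndSet(null, dummy))
                    tail.set(dummy);
            } else {
                node.prev = last;
                if (tail.compareAndSet(last, node)) { // compete for the tail pointer
                    last.next = node;
                    return node;
                }
                // Lost the race: loop and retry against the new tail.
            }
        }
    }

    // Counts real (non-dummy) nodes by walking back from the tail.
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != null && p.thread != null; p = p.prev) n++;
        return n;
    }

    // Hypothetical helper: several threads enqueue concurrently; returns the final size.
    static int fill(int threads, int perThread) {
        CasQueueSketch q = new CasQueueSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) q.enqueue(Thread.currentThread());
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return q.size();
    }
}
```

Setting prev before the CAS and next after it means that walking backwards from the tail always sees a complete chain, which is why the sketch counts from the tail.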

Now for acquireQueued, which repeats the loop of parking, retrying the lock, and parking again on failure.

Before attempting to lock, acquireQueued checks whether its node is the first one in the AQS wait queue; if not, it keeps parking. So whether the lock is fair or unfair, threads that are already in the queue are served fairly, strictly in turn. In other words, "once in the queue, always in the queue."

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

Right after park returns, the thread checks whether it was interrupted by another thread. Even if it was, it keeps trying to acquire the lock and goes back to sleep if it cannot, until the lock is acquired. This means that interrupting a thread does not get it out of a deadlock (a state in which it can never get the lock).
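The same loop shape (check whether it is my turn, try to lock, park, remember an interrupt and restore it at the end) can be shown in a small standalone lock. The sketch follows the first-in-first-out mutex example from the LockSupport documentation rather than the AQS code itself; unlike AQS, every caller goes through the queue, and the names FifoMutexSketch and countWith are mine.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class FifoMutexSketch {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);
        // Keep parking until we are first in line AND the CAS on the flag succeeds.
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted())   // clears the flag, so remember it
                wasInterrupted = true;
        }
        waiters.remove();
        if (wasInterrupted)             // same idea as selfInterrupt()
            current.interrupt();
    }

    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek()); // unpark(null) has no effect
    }

    // Hypothetical helper: increments a plain int under the mutex from several threads.
    static int countWith(int threads, int perThread) {
        FifoMutexSketch mutex = new FifoMutexSketch();
        int[] counter = {0};
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try { counter[0]++; } finally { mutex.unlock(); }
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return counter[0];
    }
}
```

An interrupt never makes lock() give up; it is only recorded and re-asserted after the lock has been acquired.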

We can also see that locking can be cancelled through cancelAcquire(); more precisely, what is cancelled is the wait for the lock while the thread sits in the AQS wait queue. Under what circumstances is an exception thrown so that the wait is cancelled? The only candidate is tryAcquire, which is implemented by the subclass and whose behavior AQS does not control. When the subclass's tryAcquire throws, the best AQS can do is cancel the acquisition. cancelAcquire removes the current node from the wait queue.

ReentrantLock unlocking process

Unlocking is simpler. Once the lock count has dropped to zero, the first valid node in the wait queue is woken up.

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        // only the thread that locked may unlock
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

Because the lock is reentrant, the count has to reach zero before the lock is considered fully released; only then may the successor waiting node be woken up. unparkSuccessor skips invalid (cancelled) nodes, finds the first valid node, and calls unpark() to wake up its thread.
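Both rules are visible through ReentrantLock's public API: the hold count has to return to zero before the lock is free, and a thread that does not own the lock cannot unlock it. A short sketch (ReentryDemo and its methods are names made up for the example):

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentryDemo {
    // Locks twice, unlocks once, and reports
    // {hold count after one unlock, locked after one unlock, locked after two unlocks}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();
        int holds = lock.getHoldCount();
        int lockedAfterOne = lock.isLocked() ? 1 : 0;
        lock.unlock();
        int lockedAfterTwo = lock.isLocked() ? 1 : 0;
        return new int[] { holds, lockedAfterOne, lockedAfterTwo };
    }

    // Unlocking a lock the current thread does not hold is rejected.
    static boolean unlockWithoutOwning() {
        try {
            new ReentrantLock().unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }
}
```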

Read-write lock

A read-write lock consists of two lock objects, ReadLock and WriteLock, which share one AQS. The AQS lock count state is split into two parts: the high 16 bits hold the count of the shared lock ReadLock and the low 16 bits hold the count of the exclusive lock WriteLock. The exclusive part records how many times the current write lock has been reentered, while the shared part records the total number of read-lock holds, including reentries, across all threads that currently hold the read lock.
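The split of one int into two 16-bit counters is plain bit arithmetic. In the sketch below the names sharedCount, exclusiveCount, SHARED_UNIT, and MAX_COUNT match identifiers that appear in the code quoted in this article; SHARED_SHIFT and EXCLUSIVE_MASK, as well as the class name RwStateSketch, are my own choices for the illustration.

```java
class RwStateSketch {
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = 1 << SHARED_SHIFT;        // 65536: adds one read hold
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  // 65535: limit of each half
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;  // low 16 bits

    // Read holds live in the high 16 bits.
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    // Write holds (reentries) live in the low 16 bits.
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    public static void main(String[] args) {
        int state = 3 * SHARED_UNIT + 2; // three read holds, write lock reentered twice
        System.out.println(sharedCount(state));    // prints 3
        System.out.println(exclusiveCount(state)); // prints 2
    }
}
```

Adding SHARED_UNIT to the state increments the read count without touching the write count, which is why a single CAS on one int is enough to update either half.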

Read-write locks also come in fair and unfair variants. The fair policy for both the shared and the exclusive lock is the same as in ReentrantLock: check whether other threads are waiting in the queue and, if so, line up obediently at the tail. The unfair policy is different and favors the write lock. Regardless of whether read or write requests are waiting in the AQS queue, a write lock request may compete directly; but if the request at the head of the queue is a write lock request, a read lock request has to yield to it and line up at the tail.

After all, read-write locks are meant for workloads with many reads and few writes, so the occasional write lock request should get higher priority.

Write lock locking process

Overall, acquiring the write lock of a read-write lock follows the same logic as ReentrantLock; only the tryAcquire() method differs.

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

The write lock has to support reentry too. If the thread holding the AQS exclusive lock is the very thread that is trying to lock, the write lock is being reentered and only the count needs to be incremented. When c != 0, that is, when the lock count is not zero, the AQS is currently held by a read lock or a write lock; the check w == 0 determines whether the count comes from read locks.

If the count is zero, the thread starts competing for the lock. Before competing it calls writerShouldBlock(), whose result depends on whether the lock is fair, to see whether it has to queue. If not, it competes with a CAS operation; the thread that sets the count from 0 to 1 holds the write lock exclusively.

Read lock locking process

Acquiring the read lock is considerably more complex than acquiring the write lock. The overall flow is the same, but the details differ a lot. In particular, the read lock count has to be recorded per thread, which takes up a lot of code.

    public final void acquireShared(int arg) {
        // if the attempt fails, queue up, sleep, and retry in a loop
        if (tryAcquireShared(arg) < 0)
            // queue and retry in a loop
            doAcquireShared(arg);
    }

If the current thread already holds the write lock, it can still acquire the read lock; this is required to support lock downgrading. Downgrading means acquiring the read lock while holding the write lock and then releasing the write lock. Compared with releasing the write lock first and acquiring the read lock afterwards, this saves another round of locking and queuing. Because of downgrading, the read and write parts of the lock count can both be non-zero at the same time.

    wlock.lock();
    if (whatever) {
        // downgrade
        rlock.lock();
        wlock.unlock();
        doRead();
        rlock.unlock();
    } else {
        // no downgrade
        doWrite();
        wlock.unlock();
    }
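The downgrade path can be run directly against ReentrantReadWriteLock, whose getReadHoldCount() and getWriteHoldCount() methods report the current thread's holds. The class DowngradeDemo and its run method are names chosen for this sketch.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class DowngradeDemo {
    // Returns {read holds, write holds} while both locks are held,
    // followed by the same pair after the write lock has been released.
    static int[] run() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();
        rw.readLock().lock();      // allowed: the writer is the current thread
        int r1 = rw.getReadHoldCount();
        int w1 = rw.getWriteHoldCount();
        rw.writeLock().unlock();   // downgrade complete, only the read lock remains
        int r2 = rw.getReadHoldCount();
        int w2 = rw.getWriteHoldCount();
        rw.readLock().unlock();
        return new int[] { r1, w1, r2, w2 };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // prints [1, 1, 1, 0]
    }
}
```

The opposite direction is not supported: a thread that holds only the read lock and then asks for the write lock will wait for itself.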

To keep a lock count for every reading thread, the lock uses a ThreadLocal variable.

    private transient ThreadLocalHoldCounter readHolds;

    static final class HoldCounter {
        int count;
        final long tid = LockSupport.getThreadId(Thread.currentThread());
    }

    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

But accessing a ThreadLocal is not fast enough, so a cache is added on top: cachedHoldCounter stores the hold counter of the last thread that acquired the read lock. When thread contention is not particularly heavy, reading the cache directly is more efficient.

    private transient HoldCounter cachedHoldCounter;

Doug Lea felt that cachedHoldCounter alone was still not efficient enough, so he added another layer of caching, firstReader, which records the first thread that changed the read lock count from 0 to 1, together with its hold count. When there is no contention at all, reading these two fields directly is even more efficient.

    private transient Thread firstReader;
    private transient int firstReaderHoldCount;

    final int getReadHoldCount() {
        // first look at the read part of the global lock count
        if (getReadLockCount() == 0)
            return 0;
        // then check firstReader
        Thread current = Thread.currentThread();
        if (firstReader == current)
            return firstReaderHoldCount;
        // then check the counter of the most recent reader
        HoldCounter rh = cachedHoldCounter;
        if (rh != null && rh.tid == LockSupport.getThreadId(current))
            return rh.count;
        // no choice but to read the ThreadLocal
        int count = readHolds.get().count;
        if (count == 0) readHolds.remove();
        return count;
    }

The author clearly goes to great lengths to record this read lock count. What is it for? With this count a thread can tell whether it holds the read lock.
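The difference between the per-thread count and the global count shows up as soon as two threads read at the same time. In this sketch (ReadHoldDemo and run are invented names, and the two latches exist only to make the timing deterministic) the calling thread takes the read lock twice while a second thread takes it once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadHoldDemo {
    // Returns {read holds of the calling thread, read holds across all threads}.
    static int[] run() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        CountDownLatch otherHolds = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);

        Thread other = new Thread(() -> {
            rw.readLock().lock();
            otherHolds.countDown();
            try { done.await(); } catch (InterruptedException ignored) { }
            rw.readLock().unlock();
        });
        other.start();

        rw.readLock().lock();
        rw.readLock().lock();   // reentry by the same reader
        int mine = -1, total = -1;
        try {
            otherHolds.await(); // make sure the other reader holds the lock too
            mine = rw.getReadHoldCount();   // per-thread count
            total = rw.getReadLockCount();  // high 16 bits of the shared state
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            done.countDown();
            rw.readLock().unlock();
            rw.readLock().unlock();
        }
        try { other.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return new int[] { mine, total };
    }
}
```

The global count alone could not tell the calling thread that two of the three holds are its own, which is what the per-thread bookkeeping is for.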

Acquiring the read lock also involves spinning. Spinning means that when the first attempt fails, the thread retries in a loop right away without going to sleep, which looks a bit like an endless retry loop.

    static final int SHARED_UNIT = 65536;
    // the read count occupies the high 16 bits

    final int fullTryAcquireShared(Thread current) {
        for (;;) {
            int c = getState();
            // if another thread holds the write lock, go back to sleep
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
                ...
            }
            // the count has reached its upper limit
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                // got the read lock
                ...
                return 1;
            }
            ...
            // loop and retry
        }
    }

The read lock has to use a CAS operation to update the total read count of the underlying lock, and whoever succeeds gets the read lock. A failed CAS only means that read locks are competing with each other on the CAS; it does not mean that the lock is held by someone else and cannot be acquired. After a few more attempts the lock will be acquired, which is why spinning is used. Releasing the read lock uses the same kind of CAS retry loop.

    protected final boolean tryReleaseShared(int unused) {
        ...
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc)) {
                return nextc == 0;
            }
        }
        ...
    }
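The CAS retry loop works the same way outside of AQS. The following sketch models only the read-count half of the state with an AtomicInteger; it is a simplified stand-in for the code above, with no write lock, no queue, and no overflow check, and the names SpinCounterSketch, acquireRead, releaseRead, readers, and hammer are mine.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SpinCounterSketch {
    static final int SHARED_UNIT = 1 << 16;
    private final AtomicInteger state = new AtomicInteger();

    // Adds one read hold; retries at once if another thread changed the state first.
    void acquireRead() {
        for (;;) {
            int c = state.get();
            if (state.compareAndSet(c, c + SHARED_UNIT))
                return;
        }
    }

    // Removes one read hold; returns true when no holds remain.
    boolean releaseRead() {
        for (;;) {
            int c = state.get();
            int nextc = c - SHARED_UNIT;
            if (state.compareAndSet(c, nextc))
                return nextc == 0;
        }
    }

    int readers() { return state.get() >>> 16; }

    // Hypothetical helper: several threads acquire concurrently; returns the reader count.
    static int hammer(int threads, int perThread) {
        SpinCounterSketch s = new SpinCounterSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) s.acquireRead();
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return s.readers();
    }
}
```

A failed compareAndSet here only means that another reader got in first, so retrying immediately is cheaper than parking the thread.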
