Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Analysis of CountDownLatch Source Code in Java concurrent programming

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/01 Report--

This article mainly introduces the Java concurrent programming CountDownLatch source code analysis related knowledge, the content is detailed and easy to understand, the operation is simple and fast, has a certain reference value, I believe that everyone after reading this Java concurrent programming CountDownLatch source code analysis article will have a harvest, let's take a look.

I. Preface

CountDownLatch maintains a counter (again the state field), calling the countDown method subtracts the counter by 1, and calling the await method blocks the thread until the counter changes to 0. It can be used to realize the logic that a thread waits for the completion of all child thread tasks before continuing to execute, or it can also achieve the function similar to simple CyclicBarrier, so that multiple threads can wait to start executing a certain section of logic at the same time.

2. Use

One thread waits for other threads to finish execution before continuing execution

. CountDownLatch cdl = new CountDownLatch (10); ExecutorService es = Executors.newFixedThreadPool (10); for (int I = 0; I

< 10; i++) { es.execute(() ->

{/ / do something cdl.countDown ();});} cdl.await ();.

To achieve a function similar to CyclicBarrier, first await, then countDown

. CountDownLatch cdl = new CountDownLatch (1); ExecutorService es = Executors.newFixedThreadPool (10); for (int I = 0; I

< 10; i++) { es.execute(() ->

{cdl.await (); / / do something});} Thread.sleep (10000L); cdl.countDown ();. III. Source code analysis

The structure of CountDownLatch is similar to that of ReentrantLock and Semaphore. It also uses the inner class Sync to inherit AQS, and overrides the tryAcquireShared and tryReleaseShared methods.

Let's first look at the constructor:

Public CountDownLatch (int count) {if (count)

< 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } 需要传入一个大于0的count,代表CountDownLatch计数器的初始值,通过Sync的构造函数最终赋值给父类AQS的state字段。可一个看到这个state字段用法多多,在ReentrantLock中使用0和1来标识锁的状态,Semaphore中用来标识信号量,此处又用来表示计数器。 CountDownLatch要通过await方法完成阻塞,先来看看这个方法是如何实现的: public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 调用的是sync的acquireSharedInterruptibly方法,该方法定义在AQS中,Semaphore也调用的这个方法: public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } 这个方法的逻辑前面在解析SemaPhore的时候细说过了,这里不再赘述,主要就是两个方法的调用,先通过tryAcquireShared方法尝试获取"许可",返回值代表此次获取后的剩余量,如果大于等于0表示获取成功,否则表示失败。如果失败,那么就会进入doAcquireSharedInterruptibly方法执行入队阻塞的逻辑。这里我们主要到CountDownLatch中看看tryAcquireShared方法的实现: protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } 和Semaphore的实现中每次将state减去requires不同,这里直接判断state是否为0,如果为0那么返回1,表示获取"许可"成功;如果不为0,表示失败,则需要入队阻塞。从这个tryAcquireShared方法就能看出CountDownLatch的逻辑了:等到state变为了0,那么所有线程都能获取运行许可。 那么我们接下来来到countDown方法: public void countDown() { sync.releaseShared(1); } 调用的是sync的releaseShared方法,该方法定义在父类AQS中,Semaphore使用的也是这个方法: public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //当state从非 doReleaseShared(); return true; } return false; } 前面提到了CountDownLatch也重写了tryReleaseShared方法: protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) //如果state等于0了直接返回false //保证在并发情况下,最多只会有一个线程返回true //也包括调用countDown的次数超过state的初始值 return false; int nextc = c-1; if (compareAndSetState(c, nextc)) //如果返回true,表示state从非0变为了0 //那么后续需要唤醒阻塞线程 return nextc == 0; } } Semaphore在释放信号量的时候,是将获取的许可归还到state中,但是CountDownLatch没有获取许可的逻辑(获取许可的时候是判断state是否等于0),所以在countDown的时候没有释放的逻辑,就是将state减1,然后根据state减1之后的值是否为0判断release是否成功,如果state本来大于0,经过减1之后变为了0,那么返回true。tryReleaseShared方法的返回值决定了后续需不需要调用doReleaseShared方法唤醒阻塞线程。 这里有个逻辑:如果state已经为0,那么返回false。这个主要应对两种情况: 调用countDown的次数超过了state的初始值多 线程并发调用的时候保证只有一个线程去完成阻塞线程的唤醒操作 可以看到CountDownLatch没有锁的概念,countDown方法可以被一个线程重复调用,只需要对state做reduce操作,而不用关心是谁做的reduce。如果tryReleaseShared返回true,那么表示需要在后面进入doReleaseShared方法,该方法和Semaphore中调用的方法是同一个,主要是唤醒阻塞线程或者设置PROPAGAGE状态,这里也不再赘述~ 阻塞线程被唤醒之后,会在doAcquireSharedInterruptibly方法中继续循环,虽然和Semaphore调用的是同样的方法,但是这里有不一样的地方,所以还是提一句。我们首先回到doAcquireSharedInterruptibly方法: private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { //如果head.next被unpark唤醒,说明此时state==0 //那么tryAcquireShared会返回1 int r = tryAcquireShared(arg); //r==1 if (r >

= 0) {/ / after the node node is awakened, the wake-up node.next / / will continue to be passed in turn, because the r here must be 1 setHeadAndPropagate (node, r); p.next = null; / / help GC failed = false Return;} if (shouldParkAfterFailedAcquire (p, node) & & parkAndCheckInterrupt ()) throw new InterruptedException ();} finally {if (failed) cancelAcquire (node);}}

When the head.next thread is awakened by unpark, it will enter the tryAcquireShared method to judge, because the state is already 0 (unpark wakes up the thread only when state becomes 0). As mentioned earlier, in the tryAcquireShared rewritten by CountDownLatch, if state==0, 1 will be returned, so the setHeadAndPropagate method will be entered:

Private void setHeadAndPropagate (Node node, int propagate) {Node h = head; / / Record old head for check below setHead (node); if (propagate > 0 | h = = null | | h.waitStatus < 0 | (h = head) = = null | | h.waitStatus < 0) {Node s = node.next; if (s = = null | s.isShared () doReleaseShared ();}}

This method is described in detail in Semaphore, so let's take a look at it from the perspective of CountDownLatch. It's actually very simple, notice that the propagate parameter value of this method is 1, then you will enter the following if logic and continue to wake up the next node. When the thread corresponding to the next node is awakened, it will also enter the setHeadAndPropagate method, and the propagage is also 1, so continue to wake up the next node, which in turn wakes up the nodes of the entire CLH queue.

This is the end of the article on "CountDownLatch source code analysis in Java concurrent programming". Thank you for reading! I believe that everyone has a certain understanding of the knowledge of "CountDownLatch source code analysis in Java concurrent programming". If you want to learn more knowledge, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report