Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to master CountDownLatch usage and source code

2025-04-02 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/03 Report--

This article mainly introduces "how to master the usage and source code of CountDownLatch". In daily operation, I believe many people have doubts about how to master the usage and source code of CountDownLatch. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts of "how to master the usage and source code of CountDownLatch". Next, please follow the editor to study!

Get to know CountDownLatch

CountDownLatch enables a thread to continue execution after waiting for other threads to finish their work. It is equivalent to a counter, the initial value of this counter is the number of threads, each time a task is completed, the value of the counter will be reduced by one, when the value of the counter is 0, it means that all threads have finished the task, and then the thread waiting on the CountDownLatch can resume execution of the next task.

The use of CountDownLatch

CountDownLatch provides a constructor, you must specify its initial value, but also specify the countDown method, this method is mainly used to reduce the value of the counter, when the counter becomes 0, the thread of await on the CountDownLatch will be awakened to continue to perform other tasks. Of course, you can also delay the wake-up call, which can be achieved by adding a delay to CountDownLatch.

The main methods are as follows

CountDownLatch mainly has the following application scenarios

CountDownLatch application scenario

A typical application scenario is that when a service starts, many components and services are loaded at the same time, and the main thread waits for the components and services to load. When all the components and services are loaded, the main thread and other threads complete a task together.

CountDownLatch can also implement the program for students to race and run together. CountDownLatch is initialized to the number of threads of students. After firing a shot, each student is a thread to complete their own tasks. When the first student finishes running, the CountDownLatch will be reduced by one, until all the students finish, the CountDownLatch will become zero, and then announce the running results together.

Along this scene, you can extend and expand a lot of other task scenarios.

CountDownLatch usage

Let's demonstrate the use of CountDownLatch through a simple counter.

Public class TCountDownLatch {public static void main (String [] args) {CountDownLatch latch = new CountDownLatch (5); Increment increment = new Increment (latch); Decrement decrement = new Decrement (latch); new Thread (increment). Start (); new Thread (decrement). Start (); try {Thread.sleep (6000);} catch (InterruptedException e) {e.printStackTrace () } class Decrement implements Runnable {CountDownLatch countDownLatch; public Decrement (CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;} @ Override public void run () {try {for (long I = countDownLatch.getCount (); I > 0 Bing imuri -) {Thread.sleep (1000); System.out.println ("countdown") This.countDownLatch.countDown ();}} catch (InterruptedException e) {e.printStackTrace ();} class Increment implements Runnable {CountDownLatch countDownLatch; public Increment (CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch @ Override public void run () {try {System.out.println ("await"); countDownLatch.await ();} catch (InterruptedException e) {e.printStackTrace ();} System.out.println ("Waiter Released");}}

In the main method, we initialize a CountDownLatch with a counter of 5. In the Decrement method, we use countDown to perform the minus one operation, then sleep for a period of time, and wait in the Increment class until the thread in the Decrement completes the count minus one operation. Wake up the run method in the Increment class to continue execution.

Let's demonstrate the specific use of CountDownLatch through the example of student racing.

Public class StudentRunRace {CountDownLatch stopLatch = new CountDownLatch (1); CountDownLatch runLatch = new CountDownLatch (10); public void waitSignal () throws Exception {System.out.println ("contestant" + Thread.currentThread (). GetName () + "waiting for the referee to issue a password"); stopLatch.await (); System.out.println ("contestant" + Thread.currentThread (). GetName () + "password accepted by referee") Thread.sleep ((long) (Math.random () * 10000)); System.out.println ("contestant" + Thread.currentThread (). GetName () + "reach the finish line"); runLatch.countDown ();} public void waitStop () throws Exception {Thread.sleep ((long) (Math.random () * 10000) System.out.println ("referee" + Thread.currentThread (). GetName () + "password to be issued"); stopLatch.countDown (); System.out.println ("referee" + Thread.currentThread (). GetName () + "password sent, waiting for all runners to reach the finish line"); runLatch.await (); System.out.println ("all runners reach the finish line") System.out.println ("referee" + Thread.currentThread (). GetName () + "Summary ranking");} public static void main (String [] args) {ExecutorService service = Executors.newCachedThreadPool (); StudentRunRace studentRunRace = new StudentRunRace (); for (int I = 0; I

< 10; i++) { Runnable runnable = () ->

{try {studentRunRace.waitSignal ();} catch (Exception e) {e.printStackTrace ();}}; service.execute (runnable);} try {studentRunRace.waitStop () } catch (Exception e) {e.printStackTrace ();} service.shutdown ();}}

Next, let's analyze the source code of CountDownLatch.

CountDownLatch source code analysis

CountDownLatch is easy to use, but it is very useful. Now you can add CountDownLatch to your toolbox. Let's take a closer look at CountDownLatch.

The underlying layer of CountDownLatch is supported by AbstractQueuedSynchronizer, while the core of AQS's data structure is two queues, one is synchronous queue (sync queue), the other is conditional queue (condition queue).

Sync inner class

Inside CountDownLatch is a Sync that inherits the AQS abstract class.

Private static final class Sync extends AbstractQueuedSynchronizer {...}

CountDownLatch actually has only one sync attribute inside it, and it is final.

Private final Sync sync

CountDownLatch has only one constructor with parameters

Public CountDownLatch (int count) {if (count)

< 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } 也就是说,初始化的时候必须指定计数器的数量,如果数量为负会直接抛出异常。 然后把 count 初始化为 Sync 内部的 count,也就是 Sync(int count) { setState(count); } 注意这里有一个 setState(count),这是什么意思呢?见闻知意这只是一个设置状态的操作,但是实际上不单单是,还有一层意思是 state 的值代表着待达到条件的线程数。这个我们在聊 countDown 方法的时候再讨论。 getCount() 方法的返回值是 getState() 方法,它是 AbstractQueuedSynchronizer 中的方法,这个方法会返回当前线程计数,具有 volatile 读取的内存语义。 // ---- CountDownLatch ---- int getCount() { return getState(); } // ---- AbstractQueuedSynchronizer ---- protected final int getState() { return state; } tryAcquireShared() 方法用于获取·共享状态下对象的状态,判断对象是否为 0 ,如果为 0 返回 1 ,表示能够尝试获取,如果不为 0,那么返回 -1,表示无法获取。 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // ---- getState() 方法和上面的方法相同 ---- 这个 共享状态 属于 AQS 中的概念,在 AQS 中分为两种模式,一种是 独占模式,一种是 共享模式。 tryAcquire 独占模式,尝试获取资源,成功则返回 true,失败则返回 false。 tryAcquireShared 共享方式,尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 tryReleaseShared() 方法用于共享模式下的释放 protected boolean tryReleaseShared(int releases) { // 减小数量,变为 0 的时候进行通知。 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } 这个方法是一个无限循环,获取线程状态,如果线程状态是 0 则表示没有被线程占有,没有占有的话那么直接返回 false ,表示已经释放;然后下一个状态进行 - 1 ,使用 compareAndSetState CAS 方法进行和内存值的比较,如果内存值也是 1 的话,就会更新内存值为 0 ,判断 nextc 是否为 0 ,如果 CAS 比较不成功的话,会再次进行循环判断。 await 方法 await() 方法是 CountDownLatch 一个非常重要的方法,基本上可以说只有 countDown 和 await 方法才是 CountDownLatch 的精髓所在,这个方法将会使当前线程在 CountDownLatch 计数减至零之前一直等待,除非线程被中断。 CountDownLatch 中的 await 方法有两种,一种是不带任何参数的 await(),一种是可以等待一段时间的await(long timeout, TimeUnit unit)。下面我们先来看一下 await() 方法。 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } await 方法内部会调用 acquireSharedInterruptibly 方法,这个 acquireSharedInterruptibly 是 AQS 中的方法,以共享模式进行中断。 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } 可以看到,acquireSharedInterruptibly 方法的内部会首先判断线程是否中断,如果线程中断,则直接抛出线程中断异常。如果没有中断,那么会以共享的方式获取。如果能够在共享的方式下不能获取锁,那么就会以共享的方式断开链接。 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >

= 0) {setHeadAndPropagate (node, r); p.next = null; / / help GC failed = false; return;}} if (shouldParkAfterFailedAcquire (p, node) & & parkAndCheckInterrupt () throw new InterruptedException ();}} finally {if (failed) cancelAcquire (node);}}

This method is a little long, let's look at it separately.

First of all, a shared mode Node will be constructed to join the queue.

Then an infinite loop is used to determine the precursor node of the newly constructed node. If the precursor node of the node node is the header node, then the state of the thread will be judged. Here, a setHeadAndPropagate is called, and its source code is as follows

Private void setHeadAndPropagate (Node node, int propagate) {Node h = head; setHead (node); if (propagate > 0 | | h = = null | | h.waitStatus

< 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } 首先会设置头节点,然后进行一系列的判断,获取节点的获取节点的后继,以共享模式进行释放,就会调用 doReleaseShared 方法,我们再来看一下 doReleaseShared 方法 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 这个方法会以无限循环的方式首先判断头节点是否等于尾节点,如果头节点等于尾节点的话,就会直接退出。如果头节点不等于尾节点,会判断状态是否为 SIGNAL,不是的话就继续循环 compareAndSetWaitStatus,然后断开后继节点。如果状态不是 SIGNAL,也会调用 compareAndSetWaitStatus 设置状态为 PROPAGATE,状态为 0 并且不成功,就会继续循环。 也就是说 setHeadAndPropagate 就是设置头节点并且释放后继节点的一系列过程。 我们来看下面的 if 判断,也就是 shouldParkAfterFailedAcquire(p, node) 这里 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); 如果上面 Node p = node.predecessor() 获取前驱节点不是头节点,就会进行 park 断开操作,判断此时是否能够断开,判断的标准如下 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws >

0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0); pred.next = node;} else {compareAndSetWaitStatus (pred, ws, Node.SIGNAL);} return false;}

This method determines the node state (waitStatus) of the precursor node of Node p. There are five kinds of node states, which are

Hongmeng official Strategic Cooperation to build HarmonyOS Technology Community

CANCELLED (1): indicates that the current node has been unscheduled. When it times out or is interrupted (in response to an interrupt), it will trigger a change to this state, and the node will not change after entering this state.

SIGNAL (- 1): indicates that the successor node is waiting for the current node to wake up. When a successor node joins the queue, the status of the predecessor node is updated to SIGNAL.

CONDITION (- 2): indicates that the node is waiting on the Condition. When other threads call the signal () method of Condition, the node in the CONDITION state will be transferred from the waiting queue to the synchronization queue, waiting for the synchronization lock to be acquired.

PROPAGATE (- 3): in shared mode, the predecessor node wakes up not only the successor node, but also the successor node.

0: the default state of the new node when it joins the queue.

If the precursor node is SIGNAL, it returns true to indicate that it can be disconnected, and if the status of the precursor node is greater than 0 (why not ws = = Node.CANCELLED at this time)? Because the only condition where ws is greater than 0 is the CANCELLED state. Then there is a series of lookup traversal operations up to the waitStatus > 0 of the precursor node. If ws 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node = = tail & & compareAndSetTail (node, pred)) {compareAndSetNext (pred, predNext, null);} else {int ws; if (pred! = head & & (ws = pred.waitStatus) = = Node.SIGNAL | (ws)

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report