In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article mainly introduces the Java concurrent Semaphore source code example analysis, has a certain reference value, interested friends can refer to, I hope you can learn a lot after reading this article, the following let Xiaobian take you to understand.
Semaphore (semaphore) is a class commonly used in JUC packages. It is an application of AQS sharing mode, which allows multiple threads to operate on shared resources at the same time, and can effectively control the number of concurrency, so it can be used to realize flow control. Semaphore provides the concept of a license, which can be regarded as a bus ticket. Only those who have successfully obtained the ticket can get on the bus, and there are a certain number of tickets, which cannot be issued without limit, which will lead to bus overloading. So when the tickets are issued (the bus is fully loaded), others will have to wait for the next one. If someone gets off midway, his seat will be free, so if other people want to get on the bus, they can get a ticket again. Various kinds of pooling can be implemented with Semaphore, and we will write a simple database connection pool at the end of this article. First, let's take a look at Semaphore's constructor.
/ / Constructor 1public Semaphore (int permits) {sync = new NonfairSync (permits);} / / Constructor 2public Semaphore (int permits, boolean fair) {sync = fair? New FairSync (permits): new NonfairSync (permits);}
Semaphore provides two constructors with parameters, but no constructors without parameters. Both constructors must pass in an initial number of licenses, and the semaphores constructed using constructor 1 will be obtained in an unfair manner when obtaining licenses, and constructor 2 can specify how the license is obtained through parameters (fair or is unfair). Semaphore mainly provides two types of API, obtaining and releasing licenses. The default is to obtain and release a license, or you can pass in parameters to obtain and release multiple licenses at the same time. In this article we will only talk about obtaining and releasing one license at a time.
1. Obtain a license
/ / obtain a license (respond to interruption) public void acquire () throws InterruptedException {sync.acquireSharedInterruptibly (1);} / obtain a license (not responding to interruption) public void acquireUninterruptibly () {sync.acquireShared (1);} / attempt to obtain a license (unfair acquisition) public boolean tryAcquire () {return sync.nonfairTryAcquireShared (1) > = 0 } / / attempt to obtain license (timed acquisition) public boolean tryAcquire (long timeout, TimeUnit unit) throws InterruptedException {return sync.tryAcquireSharedNanos (1, unit.toNanos (timeout));}
The API above is the default license acquisition operation provided by Semaphore. Only one license is obtained at a time, which is a more common situation in real life. In addition to direct acquisition, it also provides an attempt to get, which may block the thread after a failure, while an attempt to get will not. It is also important to note that the tryAcquire method is attempted to obtain in an unfair way. In peacetime, what we often use is the acquire method to obtain the license. Let's take a look at how it is obtained. You can see that the acquire method is directly called sync.acquireSharedInterruptibly (1). This method is the method in AQS. We talked about it when we talked about the AQS source series. Now let's review it again.
/ / acquire locks in interruptible mode (shared mode) public final void acquireSharedInterruptibly (int arg) throws InterruptedException {/ / first determine whether the thread is interrupted, and if so, throw an exception if (Thread.interrupted ()) {throw new InterruptedException ();} / 1. Try to acquire the lock if (tryAcquireShared (arg))
< 0) { //2. 如果获取失败则进人该方法 doAcquireSharedInterruptibly(arg); }} acquireSharedInterruptibly方法首先就是去调用tryAcquireShared方法去尝试获取,tryAcquireShared在AQS里面是抽象方法,FairSync和NonfairSync这两个派生类实现了该方法的逻辑。FairSync实现的是公平获取的逻辑,而NonfairSync实现的非公平获取的逻辑。 abstract static class Sync extends AbstractQueuedSynchronizer { //非公平方式尝试获取 final int nonfairTryAcquireShared(int acquires) { for (;;) { //获取可用许可证 int available = getState(); //获取剩余许可证 int remaining = available - acquires; //1.如果remaining小于0则直接返回remaining //2.如果remaining大于0则先更新同步状态再返回remaining if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } }}//非公平同步器static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } //尝试获取许可证 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }}//公平同步器static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } //尝试获取许可证 protected int tryAcquireShared(int acquires) { for (;;) { //判断同步队列前面有没有人排队 if (hasQueuedPredecessors()) { //如果有的话就直接返回-1,表示尝试获取失败 return -1; } //获取可用许可证 int available = getState(); //获取剩余许可证 int remaining = available - acquires; //1.如果remaining小于0则直接返回remaining //2.如果remaining大于0则先更新同步状态再返回remaining if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } }} 这里需要注意的是NonfairSync的tryAcquireShared方法直接调用的是nonfairTryAcquireShared方法,这个方法是在父类Sync里面的。非公平获取锁的逻辑是先取出当前同步状态(同步状态表示许可证个数),将当前同步状态减去参入的参数,如果结果不小于0的话证明还有可用的许可证,那么就直接使用CAS操作更新同步状态的值,最后不管结果是否小于0都会返回该结果值。这里我们要了解tryAcquireShared方法返回值的含义,返回负数表示获取失败,零表示当前线程获取成功但后续线程不能再获取,正数表示当前线程获取成功并且后续线程也能够获取。我们再来看acquireSharedInterruptibly方法的代码。 //以可中断模式获取锁(共享模式)public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //首先判断线程是否中断, 如果是则抛出异常 if (Thread.interrupted()) { throw new InterruptedException(); } //1.尝试去获取锁 //负数:表示获取失败 //零值:表示当前线程获取成功, 但是后继线程不能再获取了 //正数:表示当前线程获取成功, 并且后继线程同样可以获取成功 if (tryAcquireShared(arg) < 0) { //2. 如果获取失败则进人该方法 doAcquireSharedInterruptibly(arg); }} 如果返回的remaining小于0的话就代表获取失败,因此tryAcquireShared(arg) < 0就为true,所以接下来就会调用doAcquireSharedInterruptibly方法,这个方法我们在讲AQS的时候讲过,它会将当前线程包装成结点放入同步队列尾部,并且有可能挂起线程。这也是当remaining小于0时线程会排队阻塞的原因。而如果返回的remaining>= 0 means that the current thread is successful, so tryAcquireShared (arg)
< 0就为flase,所以就不会再去调用doAcquireSharedInterruptibly方法阻塞当前线程了。以上是非公平获取的整个逻辑,而公平获取时仅仅是在此之前先去调用hasQueuedPredecessors方法判断同步队列是否有人在排队,如果有的话就直接return -1表示获取失败,否则才继续执行下面和非公平获取一样的步骤。 2.释放许可证 //释放一个许可证public void release() { sync.releaseShared(1);} 调用release方法是释放一个许可证,它的操作很简单,就调用了AQS的releaseShared方法,我们来看看这个方法。 //释放锁的操作(共享模式)public final boolean releaseShared(int arg) { //1.尝试去释放锁 if (tryReleaseShared(arg)) { //2.如果释放成功就唤醒其他线程 doReleaseShared(); return true; } return false;} AQS的releaseShared方法首先调用tryReleaseShared方法尝试释放锁,这个方法的实现逻辑在子类Sync里面。 abstract static class Sync extends AbstractQueuedSynchronizer { ... //尝试释放操作 protected final boolean tryReleaseShared(int releases) { for (;;) { //获取当前同步状态 int current = getState(); //将当前同步状态加上传入的参数 int next = current + releases; //如果相加结果小于当前同步状态的话就报错 if (next < current) { throw new Error("Maximum permit count exceeded"); } //以CAS方式更新同步状态的值, 更新成功则返回true, 否则继续循环 if (compareAndSetState(current, next)) { return true; } } } ...} 可以看到tryReleaseShared方法里面采用for循环进行自旋,首先获取同步状态,将同步状态加上传入的参数,然后以CAS方式更新同步状态,更新成功就返回true并跳出方法,否则就继续循环直到成功为止,这就是Semaphore释放许可证的流程。 3.动手写个连接池 Semaphore代码并没有很复杂,常用的操作就是获取和释放一个许可证,这些操作的实现逻辑也都比较简单,但这并不妨碍Semaphore的广泛应用。下面我们就来利用Semaphore实现一个简单的数据库连接池,通过这个例子希望读者们能更加深入的掌握Semaphore的运用。 public class ConnectPool { //连接池大小 private int size; //数据库连接集合 private Connect[] connects; //连接状态标志 private boolean[] connectFlag; //剩余可用连接数 private volatile int available; //信号量 private Semaphore semaphore; //构造器 public ConnectPool(int size) { this.size = size; this.available = size; semaphore = new Semaphore(size, true); connects = new Connect[size]; connectFlag = new boolean[size]; initConnects(); } //初始化连接 private void initConnects() { //生成指定数量的数据库连接 for(int i = 0; i < this.size; i++) { connects[i] = new Connect(); } } //获取数据库连接 private synchronized Connect getConnect(){ for(int i = 0; i < connectFlag.length; i++) { //遍历集合找到未使用的连接 if(!connectFlag[i]) { //将连接设置为使用中 connectFlag[i] = true; //可用连接数减1 available--; System.out.println("【"+Thread.currentThread().getName()+"】以获取连接 剩余连接数:" + available); //返回连接引用 return connects[i]; } } return null; } //获取一个连接 public Connect openConnect() throws InterruptedException { //获取许可证 semaphore.acquire(); //获取数据库连接 return getConnect(); } //释放一个连接 public synchronized void release(Connect connect) { for(int i = 0; i < this.size; i++) { if(connect == connects[i]){ //将连接设置为未使用 connectFlag[i] = false; //可用连接数加1 available++; System.out.println("【"+Thread.currentThread().getName()+"】以释放连接 剩余连接数:" + available); //释放许可证 semaphore.release(); } } } //剩余可用连接数 public int available() { return available; } } 测试代码: public class TestThread extends Thread { private static ConnectPool pool = new ConnectPool(3); @Override public void run() { try { Connect connect = pool.openConnect(); Thread.sleep(100); //休息一下 pool.release(connect); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { for(int i = 0; i < 10; i++) { new TestThread().start(); } }} 测试结果:We use an array to hold references to database connections, and when initializing the connection pool, we call the initConnects method to create a specified number of database connections and store their references in an array, as well as an array of the same size to record whether the connection is available. Whenever an external thread requests a connection, it first calls the semaphore.acquire () method to obtain a license, then sets the connection status to in use, and finally returns a reference to the connection. The number of licenses is determined by the parameters passed during construction. The number of licenses for each call to the semaphore.acquire () method is reduced by 1. When the number is reduced to 0, there is no connection available, and if other threads come to get it, it will be blocked. Every time a thread releases a connection, semaphore.release () will be called to release the license, and the total number of licenses will increase, which means that the number of available connections will increase, so that the previously blocked thread will wake up and continue to obtain the connection, and then get the connection again. In the test example, a connection pool of 3 connections is initialized. We can see from the test results that every time a thread gets a connection, the remaining number of connections will be reduced by 1, and when it is reduced to 0, other threads will no longer be able to obtain it. At this point, you must wait for a thread to release the connection before continuing to get it. You can see that the number of remaining connections always changes between 0 and 3, indicating that our test is successful.
Thank you for reading this article carefully. I hope the article "sample Analysis of Semaphore Source Code of Java concurrency" shared by the editor will be helpful to everyone. At the same time, I also hope that you will support and pay attention to the industry information channel. More related knowledge is waiting for you to learn!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.