2025-03-28 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article explains how to use Semaphore in Java concurrent programming. The approach is simple and practical, so let's walk through it step by step.
Shared lock, exclusive lock
Before learning Semaphore, we first need to understand what a shared lock is.
Shared lock: allows multiple threads to hold the lock at the same time and access the shared resource concurrently.
Exclusive lock (also called a mutual-exclusion lock): can be held by only one thread at a time. While one thread holds it, the other threads must wait for it to be released before they can compete for it, and only one of them can win.
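To make the two modes concrete, the JDK's ReentrantReadWriteLock offers both in one object: its read lock is shared and its write lock is exclusive. The following is only a minimal sketch; the class name LockModesDemo and the helper otherThreadCanLock are invented for this illustration.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockModesDemo {

    /** Tries the read or write lock from a second thread and reports whether it got it. */
    static boolean otherThreadCanLock(ReentrantReadWriteLock rw, boolean write) {
        boolean[] result = new boolean[1];
        Thread t = new Thread(() -> {
            Lock lock = write ? rw.writeLock() : rw.readLock();
            if (lock.tryLock()) {        // non-blocking attempt
                result[0] = true;
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

        rw.readLock().lock();            // main thread holds the shared lock
        System.out.println("second reader admitted: " + otherThreadCanLock(rw, false));
        System.out.println("writer admitted:        " + otherThreadCanLock(rw, true));
        rw.readLock().unlock();

        rw.writeLock().lock();           // main thread holds the exclusive lock
        System.out.println("reader admitted:        " + otherThreadCanLock(rw, false));
        rw.writeLock().unlock();
    }
}
```

While the read lock is held, a second reader gets in but a writer does not; while the write lock is held, nobody else gets in.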
What is Semaphore?
A Semaphore controls the number of threads that can access a specific resource at the same time. It coordinates the threads so that a common resource is used sensibly. The name is hard to understand literally, so it helps to compare it to a traffic light. Suppose traffic is restricted on XX Road and only one hundred vehicles may be on the road at once. The first 100 cars see a green light and drive in; the cars behind them see a red light and must wait at the intersection. If five of those hundred cars leave XX Road, five waiting cars are allowed in. In this analogy a car is a thread: driving onto the road means the thread is executing, leaving the road means the thread has finished, and seeing a red light means the thread is blocked.
With Semaphore, threads compete for permits, so it can be fair or non-fair, similar to ReentrantLock. Let's look at a practical example. A parking lot has only five spaces, but eight cars arrive. The three extra cars must either wait for other cars to leave or go and find a space somewhere else.
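The road analogy can be sketched in a few lines. This is an illustrative example only (RoadLimitDemo and peakOnRoad are made-up names): it sends a number of "cars" through a road guarded by a Semaphore and records the largest number that were on the road at once.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

class RoadLimitDemo {

    /** Runs `cars` threads through a road with `capacity` permits; returns the peak number on the road at once. */
    static int peakOnRoad(int capacity, int cars) {
        Semaphore road = new Semaphore(capacity);
        AtomicInteger onRoad = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] threads = new Thread[cars];
        for (int i = 0; i < cars; i++) {
            threads[i] = new Thread(() -> {
                road.acquireUninterruptibly();          // red light: block until a permit is free
                try {
                    int now = onRoad.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    LockSupport.parkNanos(5_000_000L);  // pretend to drive for about 5 ms
                } finally {
                    onRoad.decrementAndGet();           // leave the road before handing the permit back
                    road.release();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The printed peak can never exceed the capacity of 5.
        System.out.println("peak cars on road: " + peakOnRoad(5, 20));
    }
}
```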
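Fairness is chosen at construction time. A small sketch, with FairnessDemo and parkingLot as illustrative names, showing the two constructors and how to query the policy:

```java
import java.util.concurrent.Semaphore;

class FairnessDemo {

    /** Builds a parking lot; firstComeFirstServed = true asks for FIFO ordering of waiting threads. */
    static Semaphore parkingLot(int spaces, boolean firstComeFirstServed) {
        return new Semaphore(spaces, firstComeFirstServed);
    }

    public static void main(String[] args) {
        Semaphore defaultLot = new Semaphore(5);       // single-argument constructor: non-fair
        Semaphore fairLot = parkingLot(5, true);       // fair: waiting threads are served in arrival order

        System.out.println("default is fair: " + defaultLot.isFair());
        System.out.println("fairLot is fair: " + fairLot.isFair());
    }
}
```

Note that, according to the Semaphore Javadoc, the untimed tryAcquire() barges ahead of waiting threads even on a fair semaphore; tryAcquire(0, TimeUnit.SECONDS) is the variant that honors the fairness setting.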
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Semaphore;

    /**
     * @author: official account [Java Finance]
     */
    public class SemaphoreTest {

        public static void main(String[] args) throws InterruptedException {
            // Initialize five parking spaces
            Semaphore semaphore = new Semaphore(5);
            // One count per car, so main can wait for all of them
            final CountDownLatch latch = new CountDownLatch(8);
            for (int i = 0; i < 8; i++) {
                int finalI = i;
                if (i == 5) {
                    // Car 5 arrives one second later and refuses to wait
                    Thread.sleep(1000);
                    new Thread(() -> {
                        stopCarNotWait(semaphore, finalI);
                        latch.countDown();
                    }).start();
                    continue;
                }
                new Thread(() -> {
                    stopCarWait(semaphore, finalI);
                    latch.countDown();
                }).start();
            }
            latch.await();
            log("Total remaining: " + semaphore.availablePermits() + " parking spaces");
        }

        // Blocks until a space is free
        private static void stopCarWait(Semaphore semaphore, int finalI) {
            String format = String.format("license plate number %d", finalI);
            boolean acquired = false;
            try {
                semaphore.acquire(1);
                acquired = true;
                log(format + " found a parking space and parked");
                Thread.sleep(10000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Release only a permit that was actually acquired
                if (acquired) {
                    semaphore.release(1);
                    log(format + " drove away");
                }
            }
        }

        // Gives up immediately if no space is free
        private static void stopCarNotWait(Semaphore semaphore, int finalI) {
            String format = String.format("license plate number %d", finalI);
            try {
                if (semaphore.tryAcquire()) {
                    log(format + " found a parking space and parked");
                    Thread.sleep(10000);
                    log(format + " drove away");
                    semaphore.release();
                } else {
                    log(format + " found no parking space and went elsewhere instead of waiting");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static void log(String content) {
            // Timestamp format
            DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            // Current time
            LocalDateTime now = LocalDateTime.now();
            System.out.println(now.format(fmTime) + " " + content);
        }
    }

Output:

    2021-03-01 18:54:57 license plate number 0 found a parking space and parked
    2021-03-01 18:54:57 license plate number 3 found a parking space and parked
    2021-03-01 18:54:57 license plate number 2 found a parking space and parked
    2021-03-01 18:54:57 license plate number 1 found a parking space and parked
    2021-03-01 18:54:57 license plate number 4 found a parking space and parked
    2021-03-01 18:54:58 license plate number 5 found no parking space and went elsewhere instead of waiting
    2021-03-01 18:55:07 license plate number 7 found a parking space and parked
    2021-03-01 18:55:07 license plate number 6 found a parking space and parked
    2021-03-01 18:55:07 license plate number 2 drove away
    2021-03-01 18:55:07 license plate number 0 drove away
    2021-03-01 18:55:07 license plate number 3 drove away
    2021-03-01 18:55:07 license plate number 4 drove away
    2021-03-01 18:55:07 license plate number 1 drove away
    2021-03-01 18:55:17 license plate number 7 drove away
    2021-03-01 18:55:17 license plate number 6 drove away
    2021-03-01 18:55:17 Total remaining: 5 parking spaces
From the output we can see that car number 5 found no free space and, rather than waiting, went somewhere else. Cars number 6 and 7, by contrast, had to wait until two spaces in the lot were vacated. This shows that Semaphore's acquire method blocks when no permit is available, while the tryAcquire method returns immediately without blocking.
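Between "wait forever" (acquire) and "do not wait at all" (tryAcquire) there is a middle ground: the timed overload tryAcquire(long, TimeUnit), which waits for a bounded time. A minimal sketch, where TimedParkingDemo and parkWithPatience are hypothetical names:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TimedParkingDemo {

    /** Waits up to waitMillis for a space; returns true if the car parked (the permit is released afterwards). */
    static boolean parkWithPatience(Semaphore lot, long waitMillis) {
        try {
            if (!lot.tryAcquire(waitMillis, TimeUnit.MILLISECONDS)) {
                return false;                       // timed out: no permit was taken
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // preserve the interrupt for the caller
            return false;
        }
        try {
            return true;                            // the car is parked here
        } finally {
            lot.release();                          // always give the space back
        }
    }

    public static void main(String[] args) {
        Semaphore fullLot = new Semaphore(0);       // no free spaces
        Semaphore freeLot = new Semaphore(1);       // one free space
        System.out.println("full lot: " + parkWithPatience(fullLot, 100));
        System.out.println("free lot: " + parkWithPatience(freeLot, 100));
    }
}
```

Acquiring outside the try/finally that releases ensures a permit is only returned if it was actually obtained.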
Application of Semaphore in Dubbo
In Dubbo, you can configure the thread pool size of the Provider to control the maximum parallelism of the services it exposes. The default is 200.
For example, suppose my order system has three interfaces: order creation, order cancellation and order modification. Their total concurrency is 200, but order creation is the core interface, so I want it to get most of the threads: up to 150 for order creation, and at most 25 each for cancellation and modification. Dubbo provides the executes attribute for this purpose.
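The idea of a per-interface cap can be sketched with one Semaphore per method name. This is not Dubbo's API or code: MethodLimiter, limit and call are hypothetical names used only to illustrate the 150/25/25 split described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

class MethodLimiter {
    private final Map<String, Semaphore> limits = new ConcurrentHashMap<>();

    /** Registers a maximum number of concurrent executions for a method name. */
    void limit(String method, int maxConcurrent) {
        limits.put(method, new Semaphore(maxConcurrent));
    }

    /** Runs the body if the method is under its limit; otherwise fails fast instead of queueing. */
    <T> T call(String method, Supplier<T> body) {
        Semaphore permits = limits.get(method);
        if (permits == null) {
            return body.get();                      // no limit configured for this method
        }
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("too many concurrent calls to " + method);
        }
        try {
            return body.get();
        } finally {
            permits.release();
        }
    }

    public static void main(String[] args) {
        MethodLimiter limiter = new MethodLimiter();
        limiter.limit("createOrder", 150);
        limiter.limit("cancelOrder", 25);
        limiter.limit("modifyOrder", 25);
        System.out.println(limiter.call("createOrder", () -> "order created"));
    }
}
```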
Let's look at how executes is implemented inside Dubbo. The implementation is in the class ExecuteLimitFilter:
    public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        if (max > 0) {
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
            // If the number of threads currently in use is greater than or equal to the configured threshold, throw an exception
            // if (count.getActive() >= max) {
            //     throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service
            //     using threads greater than limited.");
            /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
            executesLimit = count.getSemaphore(max);
            if (executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        // counter + 1
        RpcStatus.beginCount(url, methodName);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
            // counter - 1
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if (acquireResult) {
                executesLimit.release();
            }
        }
    }
From the code above we can also see that the early implementation did not use Semaphore. It relied directly on the commented-out check if (count.getActive() >= max). Because that check and the subsequent increment of the counter are not a single atomic operation, concurrent calls could slip past the limit. The bug is tracked at https://github.com/apache/dubbo/pull/582, and the code above fixes the non-atomicity by using Semaphore. For a more detailed analysis, see the link in the code comment. In the latest version (2.7.9), I believe this is implemented with a spin loop plus CAS.
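The difference between the non-atomic check and an atomic one can be sketched as follows. This is an illustration of the general technique, not Dubbo's actual code; ActiveCounter, tryEnterRacy, tryEnter and admitted are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ActiveCounter {
    private final AtomicInteger active = new AtomicInteger();

    /** Check-then-act: two threads can both pass the check before either one increments. */
    boolean tryEnterRacy(int max) {
        if (active.get() >= max) {
            return false;
        }
        active.incrementAndGet();
        return true;
    }

    /** Spin + CAS: the check and the increment succeed or fail together. */
    boolean tryEnter(int max) {
        for (;;) {
            int current = active.get();
            if (current >= max) {
                return false;
            }
            if (active.compareAndSet(current, current + 1)) {
                return true;
            }
            // CAS lost a race with another thread: re-read and retry
        }
    }

    void exit() {
        active.decrementAndGet();
    }

    /** Starts `threads` threads that each call tryEnter once and never exit; returns how many were admitted. */
    static int admitted(int threads, int max) {
        ActiveCounter counter = new ActiveCounter();
        AtomicInteger ok = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (counter.tryEnter(max)) {
                    ok.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return ok.get();
    }

    public static void main(String[] args) {
        // With the CAS version, exactly `max` callers are admitted no matter how the threads interleave.
        System.out.println("admitted: " + admitted(50, 10));
    }
}
```

With tryEnterRacy in place of tryEnter, the admitted count could occasionally exceed the limit, which is the class of bug described above.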
Semaphore
The sections above covered simple use of Semaphore and an example from Dubbo. To be honest, Semaphore is rarely used in day-to-day work, but it may come up in interviews, so it is worth learning. In the earlier article "AQS of Java High concurrency programming Foundation" we studied AQS through ReentrantLock. Semaphore is also built on AQS, and we can compare its methods with those of the exclusive lock; nearly every method has a counterpart. Two points deserve attention:
In exclusive mode, the successor node is woken only when the node holding the lock releases it. An exclusive lock can be held by only one thread, so until it is released there is no point in waking the successor.
In shared mode, the successor node can be woken as soon as a node acquires the lock, without waiting for that node to release it, because a shared lock can be held by several threads at once. If one node could acquire it, the following nodes may be able to acquire it as well. Therefore, in shared mode the successor node is woken both when the lock is acquired and when it is released.
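The effect of shared-mode wake-up can be observed from the outside: one release call that returns several permits ends up waking several queued threads. A small sketch, with SharedWakeupDemo and wakeAll as illustrative names:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SharedWakeupDemo {

    /** Queues `waiters` threads on an empty semaphore, releases that many permits at once, and counts who got through. */
    static int wakeAll(int waiters) {
        Semaphore gate = new Semaphore(0);
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                gate.acquireUninterruptibly();      // no permits yet, so the thread queues up
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        while (gate.getQueueLength() < waiters) {
            Thread.yield();                         // wait until every waiter is enqueued
        }
        gate.release(waiters);                      // a single release call for all waiters
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("threads that got through: " + wakeAll(3));
    }
}
```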
Acquiring a permit
We again look at the non-fair mode, starting with the core method behind acquire.
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }

    // Focus on this method. Its return value is also what tryAcquireShared returns,
    // because tryAcquireShared delegates to nonfairTryAcquireShared
    final int nonfairTryAcquireShared(int acquires) {
        // spin
        for (;;) {
            // In Semaphore, the AQS state variable holds the number of available permits
            int available = getState();
            // Available permits minus the permits requested this time = remaining permits
            int remaining = available - acquires;
            // If the remaining count is negative, or the CAS successfully sets the
            // available count to the remaining count, return the remaining count
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }

When tryAcquireShared returns a value less than 0, acquisition failed and the thread enters doAcquireSharedInterruptibly to park. When tryAcquireShared returns a value greater than or equal to 0, acquisition succeeded and the call returns directly.

doAcquireSharedInterruptibly

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        // In exclusive mode the node is enqueued with addWaiter(Node.EXCLUSIVE);
        // in shared mode it is enqueued with addWaiter(Node.SHARED),
        // which marks the node as being in shared mode
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
This method is very similar to the acquireQueued method of AQS's exclusive lock that we covered in the previous article. The difference is that the exclusive lock calls setHead(node) directly, while the shared lock calls setHeadAndPropagate(node, r), which calls setHead and may also call doReleaseShared to wake the successor node.
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
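To see how little code is needed on top of AQS, here is a simplified counting semaphore built on AbstractQueuedSynchronizer. It is a sketch for study, not the JDK's implementation: TinySemaphore is a made-up name, only the non-fair path is shown, and checks such as overflow detection are omitted.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class TinySemaphore {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);                      // AQS state = number of available permits
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: fail without changing state. Otherwise: succeed only if the CAS wins.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true;                    // true tells AQS to wake queued waiters
                }
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    TinySemaphore(int permits) {
        sync = new Sync(permits);
    }

    void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);         // queueing and parking are handled by AQS
    }

    boolean tryAcquire() {
        return sync.tryAcquireShared(1) >= 0;
    }

    void release() {
        sync.releaseShared(1);
    }

    int availablePermits() {
        return sync.permits();
    }

    public static void main(String[] args) throws InterruptedException {
        TinySemaphore s = new TinySemaphore(2);
        s.acquire();
        System.out.println("second tryAcquire: " + s.tryAcquire());
        System.out.println("third tryAcquire:  " + s.tryAcquire());
        s.release();
        System.out.println("available after one release: " + s.availablePermits());
    }
}
```

All of the queue handling shown in doAcquireSharedInterruptibly and setHeadAndPropagate is inherited; the subclass only defines what the state means.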
The other methods are largely similar to those of the exclusive lock implemented by ReentrantLock. I suspect most readers have limited patience for source code analysis, so for more detail you will need to read the source yourself.
Summary
When a Semaphore is initialized with a single permit, it can also be used as a mutex. The permit count then acts as a state of 0 or 1: 1 means a thread can acquire it, and 0 means it is held exclusively and other threads must wait.
Semaphore is a simple utility class in the JUC package (java.util.concurrent) that limits the number of threads accessing a resource at the same time.
Semaphore has the concept of a "permit": before accessing the resource, a thread must obtain a permit. If no permits are currently available, the thread blocks until it gets one.
Semaphore is implemented internally with AQS, through the abstract inner class Sync, which extends AQS. Because Semaphore is by nature a sharing scenario, its internals are essentially a shared-lock implementation.
The calling framework of a shared lock is very similar to that of an exclusive lock. The biggest difference is the acquisition logic: a shared lock can be held by multiple threads at the same time, while an exclusive lock can be held by only one thread at a time.
Because a shared lock can be held by multiple threads at once, when the head node acquires the shared lock it can immediately wake its successor to compete for the lock, without waiting until the lock is released. A shared lock therefore wakes the successor node in two places: after the current node successfully acquires the shared lock, and after the current node releases it.
If Semaphore is used for rate limiting, a spike phenomenon can appear.
The spike phenomenon means that all resources are used up in a short burst and nothing is available for the rest of the time. For example, suppose the counter algorithm for rate limiting allows at most 100 requests per second. If 100 requests arrive in the first 100 ms, no requests can be processed in the remaining 900 ms. That is the spike phenomenon.
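The arithmetic in that example can be reproduced with a fixed-window counter. This is a minimal sketch of the counter algorithm mentioned above, with assumed names (FixedWindowCounter, simulateOneSecond); timestamps are passed in explicitly so the run is deterministic.

```java
class FixedWindowCounter {
    private final int limit;
    private final long windowMillis;
    private long windowStart = 0;
    private int count = 0;

    FixedWindowCounter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    /** Returns true if a request arriving at nowMillis is admitted. */
    synchronized boolean tryAcquire(long nowMillis) {
        if (nowMillis - windowStart >= windowMillis) {
            windowStart = nowMillis;                // a new window begins: reset the counter
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false;
    }

    /** One request per millisecond for one second; returns {admitted in first 100 ms, admitted in the other 900 ms}. */
    static int[] simulateOneSecond() {
        FixedWindowCounter limiter = new FixedWindowCounter(100, 1000);
        int early = 0;
        int late = 0;
        for (long t = 0; t < 1000; t++) {
            if (limiter.tryAcquire(t)) {
                if (t < 100) {
                    early++;
                } else {
                    late++;
                }
            }
        }
        return new int[] {early, late};
    }

    public static void main(String[] args) {
        int[] result = simulateOneSecond();
        System.out.println("admitted in first 100 ms:     " + result[0]); // 100
        System.out.println("admitted in remaining 900 ms: " + result[1]); // 0
    }
}
```

The whole budget is consumed by the first 100 requests, so every request in the remaining 900 ms is rejected until the next window opens.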