2025-04-11 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article analyzes SynchronousQueue, one of Java's blocking queues, by working through examples and then reading its source code.
SynchronousQueue is a particularly interesting blocking queue. As I understand it, its defining feature is that it has no capacity.
Let's look directly at an example:
package dongguabai.test.juc.test;

import java.util.concurrent.SynchronousQueue;

/**
 * @author Dongguabai
 * @description
 * @date 2021-09-01 21:52
 */
public class TestSynchronousQueue {

    public static void main(String[] args) {
        SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
        boolean add = synchronousQueue.add("1");
        System.out.println(add);
    }
}
The code is simple: it adds one element to the SynchronousQueue, yet the program throws an exception:
Exception in thread "main" java.lang.IllegalStateException: Queue full
    at java.util.AbstractQueue.add(AbstractQueue.java:98)
    at dongguabai.test.juc.test.TestSynchronousQueue.main(TestSynchronousQueue.java:14)
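The exception message says the queue is full. AbstractQueue#add simply calls offer and throws IllegalStateException when offer returns false, which is what happens here because no thread is waiting to take the element. That example used SynchronousQueue#add, so now let's try SynchronousQueue#put: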
public static void main(String[] args) throws InterruptedException {
    SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
    synchronousQueue.put("1");
    System.out.println("- -");
}
Seeing InterruptedException in the signature, you can guess that this method may block the current thread. It does: with no other thread taking the element, put never returns and the separator line is never printed.
These two examples show that SynchronousQueue has no capacity: an element can be added only if some thread is there to take it out. Put that way it sounds awkward, so approach it from another angle and guess at the implementation. When the take method is called, it sets a "a thread is waiting to take" flag; when an element is added, that flag is checked first. If a thread is waiting to take, the addition succeeds; otherwise the call either throws an exception (add) or blocks (put).
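The zero-capacity behavior described above can be checked directly. The following is a minimal sketch rather than code from the original example: the class name ZeroCapacityDemo and its helper methods are made up for illustration, and only standard java.util.concurrent calls are used. It shows that add throws and offer returns false when no consumer is waiting, that size stays 0, and that put completes once another thread calls take.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; the name and helpers are illustrative only.
public class ZeroCapacityDemo {

    /** True if add() throws IllegalStateException when no consumer is waiting. */
    static boolean addThrowsWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        try {
            queue.add("1");
            return false;
        } catch (IllegalStateException queueFull) {
            return true;
        }
    }

    /** Result of the non-blocking offer() when no consumer is waiting. */
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("1");
    }

    /** Hands "1" to a consumer thread and returns what that thread received. */
    static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put("1");  // blocks until the consumer takes the element
            consumer.join(); // join() makes the consumer's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(addThrowsWithoutConsumer());            // true
        System.out.println(offerWithoutConsumer());                // false
        System.out.println(new SynchronousQueue<String>().size()); // 0
        System.out.println(handOff());                             // 1
    }
}
```

Using put rather than offer in handOff avoids a race: it does not matter whether the consumer reaches take before or after the producer reaches put, because each side waits for the other.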
Analysis.
Next, start with the SynchronousQueue#put method:
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
You can see that it calls Transferer#transfer, and this Transferer is initialized when the SynchronousQueue is constructed:
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue has two modes, fair and unfair. The default is unfair, and unfair mode uses TransferStack, which is built on a singly linked list of SNode objects:
static final class SNode {
    volatile SNode next;    // next node in stack
    volatile SNode match;   // the node matched to this
    volatile Thread waiter; // to control park/unpark
    Object item;            // data; or null for REQUESTs
    int mode;
    ...
}
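The difference between the two modes can be observed from the outside. The sketch below is an illustration only: FairnessDemo and drainOrder are made-up names, and the polling loop assumes that a producer blocked in put reports Thread.State.WAITING. The API documentation promises FIFO order only for the fair mode; the order in unfair mode is unspecified, so the sketch prints it without relying on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; names are illustrative only.
public class FairnessDemo {

    /** Blocks producers A, B, C in put() in that order, then returns the order take() sees. */
    static List<String> drainOrder(boolean fair) {
        SynchronousQueue<String> queue = new SynchronousQueue<>(fair);
        List<Thread> producers = new ArrayList<>();
        List<String> order = new ArrayList<>();
        try {
            for (String item : new String[] {"A", "B", "C"}) {
                Thread producer = new Thread(() -> {
                    try {
                        queue.put(item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                producer.start();
                // Assumption: a producer blocked in put() reports WAITING, so this
                // loop ensures it has queued up before the next producer starts.
                while (producer.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
                producers.add(producer);
            }
            for (int i = 0; i < 3; i++) {
                order.add(queue.take());
            }
            for (Thread producer : producers) {
                producer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println("fair:   " + drainOrder(true));  // [A, B, C]
        System.out.println("unfair: " + drainOrder(false)); // order not specified by the API
    }
}
```

Because unfair mode is backed by a stack in the source shown above, the most recently blocked producer tends to be served first there, but that is an implementation detail rather than a documented guarantee.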
So the key is the SynchronousQueue.TransferStack#transfer method. Its name suggests that it exchanges data, but it runs to dozens of lines, with node pointers being shuffled in every direction. I don't think it is necessary to agonize over every detail. As usual, focus on the big picture and let the small things go; besides, queues are convenient to step through in a debugger.
Let's sort out what we know so far:
We are studying a blocking queue. Since the focus is on blocking, we should concentrate on the take and put methods.
Transferer is an abstract class with a single method, transfer, which take and put share, so it must tell the two operations apart by its arguments.
In the default unfair mode, both take and put ultimately call SynchronousQueue.TransferStack#transfer.
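The points above can be made concrete with a small sketch of how one transfer method can serve both directions. This is not the JDK code: TransferArgsSketch and DescribingTransferer are hypothetical names, and the stand-in returns a description string instead of an element. The argument patterns follow the put source shown earlier (element, false, 0); the ones for take, offer, and poll are what I recall from the JDK 8 source and should be treated as an assumption.

```java
// Hypothetical sketch; it only reports how transfer would be called.
public class TransferArgsSketch {

    /** Same shape as the abstraction described above: one method, both directions. */
    abstract static class Transferer<E> {
        // Returns a description here; the real method returns the transferred element.
        abstract String transfer(E e, boolean timed, long nanos);
    }

    /** Stand-in implementation that decodes its arguments instead of transferring. */
    static final class DescribingTransferer<E> extends Transferer<E> {
        @Override
        String transfer(E e, boolean timed, long nanos) {
            // A null element means the caller wants to receive; non-null means it offers data.
            String role = (e == null) ? "REQUEST (consumer)" : "DATA (producer)";
            String waiting = timed ? "wait at most " + nanos + " ns" : "wait indefinitely";
            return role + ", " + waiting;
        }
    }

    static final Transferer<String> TRANSFERER = new DescribingTransferer<>();

    static String put(String e)   { return TRANSFERER.transfer(e, false, 0); }
    static String take()          { return TRANSFERER.transfer(null, false, 0); }
    static String offer(String e) { return TRANSFERER.transfer(e, true, 0); }
    static String poll()          { return TRANSFERER.transfer(null, true, 0); }

    public static void main(String[] args) {
        System.out.println("put:   " + put("1"));
        System.out.println("take:  " + take());
        System.out.println("offer: " + offer("1"));
        System.out.println("poll:  " + poll());
    }
}
```

Read this way, add failing immediately and put blocking are the same operation with different patience: a zero-nanosecond timed transfer gives up at once, while an untimed one waits for a partner.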
Modify the SynchronousQueue#put example above by adding a thread that calls take:
package dongguabai.test.juc.test;

import java.util.Date;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguabai
 * @description
 * @date 2021-09-01 21:52
 */
public class TestSynchronousQueue {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
        // Producer: logs, then waits in put() until a consumer takes the element.
        new Thread(() -> {
            System.out.println(new Date().toLocaleString() + ":" + Thread.currentThread().getName() + "- put data:" + "1");
            try {
                synchronousQueue.put("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        System.out.println("- -");
        // Consumer: waits in take() until the producer hands over an element.
        new Thread(() -> {
            Object take = null;
            try {
                take = synchronousQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new Date().toLocaleString() + ":" + Thread.currentThread().getName() + "- take to the data:" + take);
        }).start();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("end...");
    }
}
The program runs to completion and prints:
- -
2021-9-20: 58:55:Thread-0- put data:1
2021-9-20: 58:55:Thread-1- take to the data:1
end...
That is, when one thread calls put and another thread calls take, the put thread completes normally instead of blocking forever.
Combining this example with the conjecture above, the core question is how put discovers that a thread is already waiting in take, or how take discovers that a thread is already waiting in put. This flag is not necessarily a variable. By analogy with how AQS works, it is more likely to be determined from the nodes in the linked list.
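The conjecture that the "flag" is really a node in a linked structure can be illustrated with a toy model. The sketch below is deliberately not the JDK algorithm, which is lock-free: it swaps in a plain monitor (synchronized, wait, notifyAll) to keep the idea visible. ToyHandoff, Node, and demo are hypothetical names. A caller first looks at the top node: if it belongs to the opposite side, the two are matched at once; otherwise the caller pushes its own node and waits to be matched.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: a monitor-based hand-off, not the lock-free JDK implementation.
public class ToyHandoff<E> {
    private static final int REQUEST = 0; // node pushed by a waiting consumer
    private static final int DATA = 1;    // node pushed by a waiting producer

    private static final class Node<T> {
        final int mode;
        T item;          // payload; filled in by the producer side
        boolean matched; // set once a counterpart completes the hand-off
        Node(int mode, T item) { this.mode = mode; this.item = item; }
    }

    // Holds waiting nodes of a single mode at any time; opposite modes match immediately.
    private final Deque<Node<E>> stack = new ArrayDeque<>();

    public synchronized void put(E e) throws InterruptedException {
        Node<E> head = stack.peek();
        if (head != null && head.mode == REQUEST) { // a consumer is already waiting
            stack.pop();
            head.item = e;
            head.matched = true;
            notifyAll();
            return;
        }
        Node<E> self = new Node<>(DATA, e);         // nobody waiting: queue up and wait
        stack.push(self);
        awaitMatch(self);
    }

    public synchronized E take() throws InterruptedException {
        Node<E> head = stack.peek();
        if (head != null && head.mode == DATA) {    // a producer is already waiting
            stack.pop();
            head.matched = true;
            notifyAll();
            return head.item;
        }
        Node<E> self = new Node<>(REQUEST, null);
        stack.push(self);
        awaitMatch(self);
        return self.item;
    }

    // Called with the monitor held; waits until a counterpart marks the node as matched.
    private void awaitMatch(Node<E> self) throws InterruptedException {
        while (!self.matched) {
            try {
                wait();
            } catch (InterruptedException ex) {
                if (self.matched) {                 // hand-off already done: keep the result
                    Thread.currentThread().interrupt();
                    return;
                }
                stack.remove(self);                 // give up and withdraw our node
                throw ex;
            }
        }
    }

    /** Runs one hand-off between two threads and returns what the consumer received. */
    static String demo() {
        ToyHandoff<String> handoff = new ToyHandoff<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = handoff.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            handoff.put("1");
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1
    }
}
```

The toy keeps the two properties the examples demonstrated: nothing is ever stored without a partner, and whichever side arrives first is the one that waits.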
Next, look at the SynchronousQueue#put method again:
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
Under the hood it calls SynchronousQueue.TransferStack#transfer, passing the element being put, false, and 0. Now look back at the SynchronousQueue.TransferStack#transfer method:
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    // The parameter e here is the element being put, so it is not null.
    // That means DATA mode, which according to the source comments
    // indicates that the current thread is a producer.
    int mode = (e == null) ? REQUEST : DATA;
    for (;;) {
        SNode h = head;
        if (h == null || h.mode == mode) { // empty or same-mode
            if (timed && nanos