
How to implement Java blocking queue BlockingQueue

2025-04-05 Update From: SLTechnology News&Howtos


This article explains how Java's blocking queue, BlockingQueue, is implemented. The material is straightforward, so follow along step by step.

BlockingQueue

First of all, a BlockingQueue is basically a first-in, first-out queue (Queue). Why "Blocking"? Because it supports two kinds of waiting: a thread that tries to take an element from an empty queue blocks until an element becomes available, and a thread that tries to add an element to a full queue blocks until there is room for the new element.

BlockingQueue is an interface that extends Queue, so any of its implementations can also be used as a Queue; Queue in turn extends the Collection interface.

BlockingQueue provides four forms of each insert, remove, and examine operation, for different scenarios: 1. throw an exception; 2. return a special value (null or true/false, depending on the operation); 3. block until the operation succeeds; 4. block until the operation succeeds or a specified timeout elapses. They are summarized below:

           Throws exception   Special value   Blocks           Times out
Insert     add(e)             offer(e)        put(e)           offer(e, time, unit)
Remove     remove()           poll()          take()           poll(time, unit)
Examine    element()          peek()          not applicable   not applicable

All BlockingQueue implementations follow these rules. There is no need to memorize the table: understand what each form does, then choose the right method for your needs by reading the method's documentation.

With BlockingQueue, the methods to focus on are put(e) and take(), because these are the ones that block.
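The four forms can be seen side by side in a small sketch. It assumes a one-element ArrayBlockingQueue so that "full" and "empty" are easy to reach; the class name FourFormsDemo and the trace format are made up for illustration.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records what each form does on a full, then an empty, queue.
class FourFormsDemo {
    static String run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder trace = new StringBuilder();
        try {
            queue.put("a"); // blocking form; returns at once because there is room

            // The queue is now full.
            try {
                queue.add("b");
            } catch (IllegalStateException e) {
                trace.append("add:exception ");
            }
            trace.append("offer:").append(queue.offer("b")).append(' ');
            trace.append("timedOffer:")
                 .append(queue.offer("b", 50, TimeUnit.MILLISECONDS)).append(' ');

            trace.append("take:").append(queue.take()).append(' ');

            // The queue is now empty.
            try {
                queue.remove();
            } catch (NoSuchElementException e) {
                trace.append("remove:exception ");
            }
            trace.append("poll:").append(queue.poll()).append(' ');
            trace.append("timedPoll:")
                 .append(queue.poll(50, TimeUnit.MILLISECONDS)).append(' ');
            trace.append("peek:").append(queue.peek());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            trace.append("interrupted");
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The sketch deliberately avoids calling put on a full queue or take on an empty one, since those calls would wait forever in a single thread.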

BlockingQueue does not accept null elements; inserting null throws a NullPointerException. The reason is that null is used as a special return value (the "Special value" column in the table) to indicate that a poll failed. If null could be inserted, a null result would be ambiguous: it could mean either that the operation failed or that the element retrieved was itself null.
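A minimal sketch of this rule, assuming a LinkedBlockingQueue as the implementation under test (the helper name rejectsNull is hypothetical):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for the null-element rule.
class NullElementDemo {
    // Returns true if the queue rejected a null element with NullPointerException.
    static boolean rejectsNull(BlockingQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println("null rejected: " + rejectsNull(queue));
        // Since null can never be stored, a null from poll() can only mean "empty".
        System.out.println("poll on empty queue: " + queue.poll());
    }
}
```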

A BlockingQueue may be bounded; if the queue is full when an element is inserted, the put operation blocks. The "unbounded" queues discussed here are usually not truly unbounded: their capacity is Integer.MAX_VALUE (a little over 2.1 billion).
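As a rough illustration, the sketch below checks the default capacity of a LinkedBlockingQueue and shows a bounded queue refusing an element. It uses a timed offer instead of put so that the example terminates; the class and method names are invented for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for bounded versus "unbounded" queues.
class CapacityDemo {
    // A LinkedBlockingQueue created without a capacity reports Integer.MAX_VALUE.
    static boolean defaultCapacityIsMaxInt() {
        return new LinkedBlockingQueue<String>().remainingCapacity() == Integer.MAX_VALUE;
    }

    // Returns false: the bounded queue is full, so the timed offer gives up.
    // A plain put("b") would block here until another thread removed "a".
    static boolean offerToFullQueue() {
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1);
        bounded.add("a");
        try {
            return bounded.offer("b", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("default capacity is Integer.MAX_VALUE: " + defaultCapacityIsMaxInt());
        System.out.println("offer to full queue succeeded: " + offerToFullQueue());
    }
}
```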

BlockingQueue is designed primarily for producer-consumer queues, but because it implements the java.util.Collection interface, as mentioned earlier, it can also be used as an ordinary Collection. For example, remove(x) deletes an arbitrary element. Such operations are usually inefficient, so use them only occasionally, for example when a queued message needs to be cancelled.
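The cancellation case might look like the following sketch. The message strings and the class name are placeholders, not part of any real API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: cancel a message that is still waiting in the queue.
class CancelQueuedMessageDemo {
    static String run() {
        BlockingQueue<String> messages = new LinkedBlockingQueue<>();
        messages.add("msg-1");
        messages.add("msg-2");
        messages.add("msg-3");

        // Collection-style removal: scans the queue for an equal element.
        boolean cancelled = messages.remove("msg-2");

        return cancelled + " " + messages;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```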

BlockingQueue implementations are thread-safe, but the bulk collection operations addAll, containsAll, retainAll, and removeAll are not necessarily atomic. For example, addAll(c) may throw an exception after adding only some of the elements in c, leaving those elements in the BlockingQueue. This is allowed; the exact behavior depends on the implementation.
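One way to observe a partial addAll is with a bounded ArrayBlockingQueue that is too small for the collection being added. This is only a sketch of one implementation's behavior; the class name is made up.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: addAll fails part-way and leaves earlier elements behind.
class PartialAddAllDemo {
    // Returns how many elements ended up in the queue after addAll failed.
    static int run() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.addAll(Arrays.asList(1, 2, 3)); // the third element does not fit
        } catch (IllegalStateException e) {
            // Thrown part-way through, after 1 and 2 were already added.
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("elements left in queue: " + run());
    }
}
```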

BlockingQueue does not support any close or shutdown operation for indicating that no more elements will be added. Whether and how such a feature is needed depends on the implementation, so the interface imposes no requirement.

Finally, in the producer-consumer scenario, BlockingQueue supports multiple producers and multiple consumers; this is really just a consequence of its thread safety.
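Since there is no built-in shutdown, a common workaround is for the producer to enqueue a special end-of-stream ("poison") object. The sketch below shows this with one producer and one consumer; the POISON marker and the class name are assumptions made for the example, not something the interface provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: producer-consumer with an end-of-stream marker.
class PoisonPillDemo {
    // Illustrative marker; compared by identity, never processed as data.
    private static final String POISON = new String("POISON");

    static List<String> run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put("item-" + i); // blocks while the queue is full
                }
                queue.put(POISON); // signals that nothing more will arrive
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take(); // blocks while the queue is empty
                    if (item == POISON) {
                        break;
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With several consumers, each one needs its own marker, so the producer side would enqueue one per consumer.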

Everything above should be clear enough: BlockingQueue is a relatively simple thread-safe container. Next I will analyze its concrete implementations in the JDK, which is where Doug Lea's work takes the stage.

BlockingQueue implementation: ArrayBlockingQueue

ArrayBlockingQueue is the bounded implementation of the BlockingQueue interface, backed by an array.

Its concurrency is controlled by a single reentrant lock: every operation, whether an insert or a read, must acquire the lock first.

If you have read my earlier article on Condition, "Line-by-Line Source Code Analysis of AbstractQueuedSynchronizer (2)", the ArrayBlockingQueue source code will be easy to follow: it is implemented with one ReentrantLock and its two associated Condition objects.

ArrayBlockingQueue has the following properties:

    // Array for storing elements
    final Object[] items;
    // Position of the next read operation
    int takeIndex;
    // Position of the next write operation
    int putIndex;
    // Number of elements in the queue
    int count;

    // The following are the synchronizers used to control concurrency
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

We use a schematic diagram to describe its synchronization mechanism:

ArrayBlockingQueue achieves synchronization as follows: both read and write operations must acquire the AQS exclusive lock before proceeding. If the queue is empty, a reader thread enters the reader wait queue (the notEmpty condition) and waits until a writer thread adds a new element and wakes the first thread waiting in that queue. If the queue is full, a writer thread enters the writer wait queue (the notFull condition) and waits until a reader thread removes an element to make room and wakes the first thread waiting in that queue.
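The mechanism just described can be sketched as a tiny bounded buffer built from one ReentrantLock and two Conditions. This is a simplified illustration that reuses the field names listed earlier; it is not the JDK source, and the class name TinyArrayQueue and the demo method are invented.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified illustration only, not the real ArrayBlockingQueue.
class TinyArrayQueue<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    TinyArrayQueue(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();          // queue full: wait in the writer queue
            }
            items[putIndex] = e;
            if (++putIndex == items.length) putIndex = 0; // wrap around
            count++;
            notEmpty.signal();            // wake one waiting reader
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();         // queue empty: wait in the reader queue
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) takeIndex = 0; // wrap around
            count--;
            notFull.signal();             // wake one waiting writer
            return e;
        } finally {
            lock.unlock();
        }
    }

    // Single-threaded smoke test; it never blocks because capacity is never exceeded.
    static String demo() {
        TinyArrayQueue<String> q = new TinyArrayQueue<>(2);
        try {
            q.put("a");
            q.put("b");
            String first = q.take();
            q.put("c"); // reuses slot 0 after the wrap-around
            return first + q.take() + q.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The await calls sit inside while loops so that a woken thread rechecks the condition before proceeding.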

For ArrayBlockingQueue, we can specify the following three parameters at construction time:

Queue capacity, which limits the maximum number of elements allowed in the queue

Whether the exclusive lock is fair or unfair. An unfair lock gives higher throughput, while a fair lock guarantees that the thread that has waited longest acquires the lock next.

An optional initial collection, whose elements are added to the queue in the constructor.
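The three parameters map onto the three public constructors, as in this short sketch (the capacities and element values are arbitrary, and the class name is made up):

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class showing the three ways to construct an ArrayBlockingQueue.
class ArrayBlockingQueueConstructionDemo {
    static String run() {
        // Capacity only: the lock is unfair by default.
        ArrayBlockingQueue<String> plain = new ArrayBlockingQueue<>(10);
        // Capacity plus a fair lock.
        ArrayBlockingQueue<String> fair = new ArrayBlockingQueue<>(10, true);
        // Capacity, fairness, and an initial collection of elements.
        ArrayBlockingQueue<String> seeded =
                new ArrayBlockingQueue<>(3, false, Arrays.asList("a", "b"));

        return plain.remainingCapacity() + " "
                + fair.remainingCapacity() + " "
                + seeded.remainingCapacity() + " "
                + seeded;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the initial collection must fit within the given capacity; otherwise the constructor throws IllegalArgumentException.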

I will not go through the source code in more detail, because it is simply an application of Condition from AbstractQueuedSynchronizer. Interested readers can see my "Line-by-Line Source Code Analysis of AbstractQueuedSynchronizer (2)": once you understand that article, a separate analysis of ArrayBlockingQueue is unnecessary, and if you do not understand Condition, you will not be able to follow the ArrayBlockingQueue source code anyway.

BlockingQueue implementation: LinkedBlockingQueue

LinkedBlockingQueue is a blocking queue backed by a singly linked list. It can be used as an unbounded queue or as a bounded queue. Look at the constructors:

    // The legendary unbounded queue
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    // The legendary bounded queue
    public LinkedBlockingQueue(int capacity) {
        if (capacity ...

    ... spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

Doug Lea's ingenuity lies in packing all of this logic together so that the code is very concise. Of course, that also makes it harder to read, so when going through the code we have to think carefully about every possible case.

Next, let's talk about how fair mode and unfair mode differ.

You should already have the fair-mode workflow in mind, so I will only outline the TransferStack algorithm and will not analyze its source code.

When the method is called, if the stack is empty, or the nodes in the stack have the same operation type as the current thread (for example, the current operation is a put and the stack holds only writer threads), the current thread is pushed onto the stack to wait for a match. It then returns the matched element, or null if it was cancelled.

If the stack holds waiting nodes that complement the current operation (for example, the stack holds only reader threads and the current thread is a writer, or vice versa), the current node is pushed onto the top of the stack, matched with a waiting node, and then both nodes are popped. The matching and popping may turn out to be unnecessary, because a thread in the next case may already have done the same work.

If the node at the top of the stack is itself one that was pushed to perform a match, help it complete its match and pop, and then continue.

The TransferStack source code is somewhat more complex than that of TransferQueue. Interested readers can read the source themselves.
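TransferStack and TransferQueue are internal details, but the pairing behavior they implement can be observed from outside. The sketch below assumes that the class under discussion here is java.util.concurrent.SynchronousQueue (the text naming it is missing from this copy), whose constructor takes a fairness flag; the class name HandoffDemo is invented.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; assumes the queue being discussed is SynchronousQueue.
class HandoffDemo {
    static String run(boolean fair) {
        SynchronousQueue<String> queue = new SynchronousQueue<>(fair);

        // No consumer is waiting yet, so a non-blocking offer cannot be paired.
        boolean offeredWithoutTaker = queue.offer("early");

        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // waits for a producer to pair with
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            queue.put("hello"); // blocks until the consumer takes the element
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return offeredWithoutTaker + " " + received[0];
    }

    public static void main(String[] args) {
        System.out.println("unfair: " + run(false));
        System.out.println("fair:   " + run(true));
    }
}
```

With a single producer and a single consumer the two modes behave the same; the fairness flag only affects the order in which several waiting threads are paired.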

BlockingQueue implementation: PriorityBlockingQueue

PriorityBlockingQueue is a BlockingQueue implementation with ordering. Its concurrency control uses a ReentrantLock, and the queue is unbounded. (ArrayBlockingQueue is a bounded queue, and LinkedBlockingQueue can also be given a maximum capacity by passing capacity to the constructor, but PriorityBlockingQueue can only be given an initial size. When elements are inserted later and there is not enough space, the capacity is expanded automatically.)

Simply put, it is the thread-safe version of PriorityQueue. Null values cannot be inserted, and the objects inserted into the queue must be comparable with one another (for example by implementing Comparable); otherwise a ClassCastException is thrown. Its insert operation, put, does not block, because the queue is unbounded (the take method still blocks when the queue is empty).
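A small sketch of these properties follows: elements come out in priority order rather than insertion order, a comparator can reverse that order, and non-comparable objects are rejected. The class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class for PriorityBlockingQueue ordering.
class PriorityOrderDemo {
    // Removes every element with poll() and returns them in removal order.
    static List<Integer> drain(PriorityBlockingQueue<Integer> queue) {
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    static List<Integer> naturalOrder() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.put(5); // put never blocks: the queue grows as needed
        queue.put(1);
        queue.put(3);
        return drain(queue);
    }

    static List<Integer> reverseOrder() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        queue.put(5);
        queue.put(1);
        queue.put(3);
        return drain(queue);
    }

    // Returns true if inserting plain Objects fails with ClassCastException.
    static boolean rejectsNonComparable() {
        PriorityBlockingQueue<Object> queue = new PriorityBlockingQueue<>();
        try {
            queue.put(new Object());
            queue.put(new Object());
            return false;
        } catch (ClassCastException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder());
        System.out.println(reverseOrder());
        System.out.println("non-comparable rejected: " + rejectsNonComparable());
    }
}
```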

Its source code is relatively simple, and this section covers the core of it.

Let's look at its fields first:

    // If no size is specified in the constructor, the default initial size is 11
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    // The maximum capacity of the array
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    // This is the array that stores the data
    private transient Object[] queue;

    // The current size of the queue
    private transient int size;

    // The comparator used for ordering; if elements are sorted in natural order, this can be null
    private transient Comparator ...
