Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to realize the blocking queue of Java multithreading

2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly explains "how to realize the blocking queue of Java multithread". The content of the article is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn how to realize the blocking queue of Java multithread.

What is the blocking queue?

First of all, understand the queue, which is a data structure of data first-in, first-out. Blocking queue, the keyword is blocking, first understand the meaning of blocking, in the blocking queue, thread blocking has two situations:

1. When the blocking queue is empty, the thread that gets the queue element will wait until the queue is not empty; 2. When the blocking queue becomes full, the thread using the blocking queue waits until the blocking queue becomes non-full.

Why use blocking queues?

In common cases, the producer-consumer model needs to use the queue, the producer thread produces the data, puts it into the queue, and then consumes to get the data from the queue, which is fine in the case of a single thread. But in the case of multithreading, at a certain time (peak high concurrency), the consumer speed is much faster than the producer speed, and the consumer must block and wait for the producer to ensure that the producer can produce new data; the same is true when the producer speed is much faster than the consumer speed. These situations require programmers to control blocking while at the same time requiring thread safety and running efficiency.

The emergence of blocking queues eliminates the need for programmers to pay attention to these details, such as when to block threads and when to wake up threads, which are done by blocking queues.

The main methods of blocking queues

The methods of blocking queues can be divided into three categories according to the way they are handled when they cannot be satisfied immediately but may be satisfied at some point in the future:

Throw an exception: throw an exception

Special value: returns a special value (null or false, as appropriate)

Plug: blocks the thread until the operation is successful

Timeout: block only for the maximum time before giving up

If you want to do a good job, you must first sharpen its tools, learn to use the blocking queue, and you must know what methods it has, how to use it, and what matters needing attention, so that when you really use it, you can step on less mines.

Let's start by introducing the insert operation:

1.public abstract boolean add (E paramE)

Inserts the specified element into this queue (if it is immediately feasible and does not violate the capacity limit), returns true on success, and throws an IllegalStateException if no space is currently available.

If the element is NULL, a NullPointerException exception is thrown.

2.public abstract boolean offer (E paramE)

Inserts the specified element into this queue (if it is immediately feasible and does not violate the capacity limit), returns true on success, and returns false if no space is currently available.

3.public abstract void put (E paramE) throws InterruptedException

Inserts the specified element into this queue and waits for available space (if necessary)

4.offer (E o, long timeout, TimeUnit unit)

You can set the wait time, and if the BlockingQueue cannot be added to the queue within the specified time, a failure is returned.

Get data operation:

1.poll (time): take the object that ranks first in the BlockingQueue. If it cannot be removed immediately, you can wait for the time specified in the time parameter, and return null if it is not available.

2.poll (long timeout, TimeUnit unit): the object that fetches a queue leader from BlockingQueue, and returns the data in the queue as soon as data is available in the queue within a specified period of time. Otherwise, you know the time.

Timeout does not have data desirable, the return failed.

3.take (): take the first object in the BlockingQueue. If BlockingQueue is empty, block entering the waiting state until new data is added to the BlockingQueue.

4.drainTo (): get all available data objects from BlockingQueue at once (you can also specify the number of data to be obtained), which can improve the efficiency of obtaining data; there is no need to lock or release locks in batches multiple times.

Key methods focus on introduction

First, let's take a look at the put method.

Public void put (E paramE) throws InterruptedException {checkNotNull (paramE); ReentrantLock localReentrantLock = this.lock; localReentrantLock.lockInterruptibly (); try {while (this.count = = this.items.length) this.notFull.await (); enqueue (paramE); localReentrantLock.unlock ();} finally {localReentrantLock.unlock ();}}

If you look at the code one by one, do a null check first. CheckNotNull (paramE)

Private static void checkNotNull (Object paramObject) {if (paramObject! = null) return; throw new NullPointerException ();}

This is a private method, and it is important to note that if the argument to put is null, a null pointer exception is thrown. (this is worth learning, doing null check first, it's easy to locate errors during maintenance.) then ReentrantLock localReentrantLock = this.lock; instantiates locks, which I introduced in my previous blog, which can be discussed together.

The next line localReentrantLock.lockInterruptibly (); here is a special emphasis:

LockInterruptibly () allows the waiting thread to be interrupted and returned directly by the Thread.interrupt () method of another thread while waiting. Instead of acquiring a lock, an InterruptException is thrown. The ReentrantLock.lock () method does not allow Thread.interrupt () to interrupt and continues to attempt to acquire the lock even if Thread.interruptted is detected, and continues to hibernate if it fails. Only after the lock is finally acquired successfully, the current thread is set to interrupted state.

Notice that this is locked, and there is only one thread each time you do this, go back to the code and proceed

While (this.count = = this.items.length)

This.notFull.await ()

Here is a message that when the queue is full, it will wait. Private final Condition notFull; is used here as an instantiated Condition, which is used to control the wait when the queue is full.

Then execute the enqueue (paramE) method, and enter this method to continue to see

Private void enqueue (E paramE) {Object [] arrayOfObject = this.items; arrayOfObject [this.putIndex] = paramE; if (+ + this.putIndex = = arrayOfObject.length) this.putIndex = 0; this.count + = 1; this.notEmpty.signal ();}

Looking at the first line, Object [] arrayOfObject = this.items; the items is instantiated at the constructor, final Object [] items = new Object [paramInt]; assign the item to the arrayObject

Continue arrayOfObject [this.putIndex] = paramE; to assign the parameters passed in by the put method to arrayOfObject, where items has actually changed, because java is a value reference.

If (+ + this.putIndex = = arrayOfObject.length)

This.putIndex = 0

If the offset value + 1 is equal to the length of the array, the offset value becomes 0. This.count + = 1; count value plus 1; this count represents the total number of arrays. This.notEmpty.signal (); wake up the method blocked by Condition notEmpty, and finally localReentrantLock.unlock (); unlock (this operation can not be forgotten)

Here, I can't help but ask, what method is blocked? Take a look at the take method with this question.

Public E take () throws InterruptedException {ReentrantLock localReentrantLock = this.lock; localReentrantLock.lockInterruptibly (); try {while (this.count = = 0) this.notEmpty.await (); Object localObject1 = dequeue (); return localObject1;} finally {localReentrantLock.unlock ();}}

First, take a look at the first two lines and lock it first, like the put method, so that there is only one thread at a time when you hold this piece of code.

While (this.count = = 0)

This.notEmpty.await ()

When the number of arrays is empty, that is, when no data supply area comes out, the Condition of notEmpty will block until it is woken up by notEmpty, remember the above mentioned. It is awakened in the put method, and it can be found here that as long as a successful put operation is performed, it will be awakened once.

Continue to look at the code, then execute Object localObject1 = dequeue (); get the element, and follow up on the dequeue () method to continue:

Private E dequeue () {Object [] arrayOfObject = this.items; Object localObject = arrayOfObject [this.takeIndex]; arrayOfObject [this.takeIndex] = null; if (+ + this.takeIndex = = arrayOfObject.length) this.takeIndex = 0; this.count-= 1; if (this.itrs! = null) this.itrs.elementDequeued (); this.notFull.signal (); return localObject }

Object [] arrayOfObject = this.items; carries out the value passing operation, and takeIndex is the offset value when the element is fetched, thus it can be seen that the offset of put and take operations is controlled by putIndex and takeIndex, respectively.

Object localObject = arrayOfObject [this.takeIndex]; take out the data in the array, and then arrayOfObject [this.takeIndex] = null; changes the data b of the original location into null.

If (+ + this.takeIndex = = arrayOfObject.length)

This.takeIndex = 0

If the current + + takeIndex is equal to the length of the array, the takeIndex assignment is 0, and the two operations, in combination with the put method, form a queue operation with an array. Then wake up the thread that holds the Condition of notFull.

This is the end of the method. In fact, there are many similarities between put and take. Let's move on to the next chapter.

Common blocking queues

First of all, let's take a look at this picture, which is the inheritance diagram of the blocking queue (double-ended queue, not listed, not much difference)

There are mainly five implementation classes, ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue,DelayQueue.

Of the five blocking queues, ArrayBlockingQueue,LinkedBlockingQueue is more commonly used, and this article will also focus on these two classes.

ArrayBlockingQueue

In the above source code analysis is the analysis of the ArrayBlockingQueue source code. The parameter that the array blocking queue must pass in is the array size, and you can also specify whether it is fair or not. Fairness means that when the queue is available, the thread accesses the queue in the order in which it queued, while the unfair lock does not, but the unfair queue executes faster than the fair queue.

Continue to see that ArrayBlockingQueue is actually an array bounded queue, which maintains the order of elements in the array on a first-in-first-out basis. According to the source code, two integer variables (putIndex and takeIndex mentioned above) point to the position of the head and tail, respectively.

LinkedBlockingQueue

LinkedBlockingQueue is a blocking queue based on linked lists, and the internally maintained data buffer queues are made up of linked lists and are also based on the first-in-first-out principle.

If you construct a LinkedBlockingQueue object without specifying its capacity size, LinkedBlockingQueue defaults to a capacity similar to Integer.Max_VALUE, so that once the speed of the producer is faster than that of the consumer, the system memory may have been exhausted before the queue full blocking occurs.

The reason why LinkedBlockingQueue can deal with concurrent data efficiently is that take () method and put (E param) method use different reentrant locks, private final ReentrantLock putLock and private final ReentrantLock takeLock, respectively, which also means that producers and consumers can manipulate the data in the queue in parallel in high concurrency cases, so as to improve the concurrency performance of the whole queue.

Comparison between the two

1.ArrayBlockingQueue uses the same lock for put,take operations, and the two operations cannot be performed at the same time, while LinkedBlockingQueue uses different locks, and put operations and take operations can be performed at the same time.

Another obvious difference between 2.ArrayBlockingQueue and LinkedBlockingQueue is that the former does not generate or destroy any additional object instances when inserting or deleting elements, while the latter generates an additional Node object, which still has a different impact on GC in systems that need to process large amounts of data efficiently and concurrently for a long time.

There are other priority blocking queues: PriorityBlockingQueue, delay queues: DelayQueue,SynchronousQueue, etc. Because of the low frequency of use, we will not focus on it here, and interested readers can study it in depth.

Implementing producers and consumers with blocking queues

To simulate the experience of washing dishes, the dishwasher washed a plate and put it on the workbench, and then the cook saw that there were free dishes on the workbench and used them. Written into the code, the dishwasher is a producer thread, the cook is the consumer thread, and the workbench is the blocking queue.

Public class TestBlockingQueue {/ * production and consumption business operations * * @ author tang * * / protected class WorkDesk {BlockingQueue desk = new LinkedBlockingQueue (10); public void washDish () throws InterruptedException {desk.put ("wash a dish");} public String useDish () throws InterruptedException {return desk.take () }} / * * producer class * * @ author tang * * / class Producer implements Runnable {private String producerName; private WorkDesk workDesk; public Producer (String producerName, WorkDesk workDesk) {this.producerName = producerName; this.workDesk = workDesk } @ Override public void run () {try {for (;;) {System.out.println (producerName + "wash a dish"); workDesk.washDish (); Thread.sleep (1000) }} catch (Exception e) {e.printStackTrace ();} / * * Consumer class * * @ author tang * * / class Consumer implements Runnable {private String consumerName; private WorkDesk workDesk Public Consumer (String consumerName, WorkDesk workDesk) {this.consumerName = consumerName; this.workDesk = workDesk;} @ Override public void run () {try {for (;;) {System.out.println (consumerName + "use a plate"); workDesk.useDish () Thread.sleep (1000);}} catch (Exception e) {e.printStackTrace ();} public static void main (String args []) throws InterruptedException {TestBlockingQueue testQueue = new TestBlockingQueue (); WorkDesk workDesk = testQueue.new WorkDesk (); ExecutorService service = Executors.newCachedThreadPool () / four producer threads Producer producer1 = testQueue.new Producer ("producer-1 -", workDesk); Producer producer2 = testQueue.new Producer ("producer-2 -", workDesk); Producer producer3 = testQueue.new Producer ("producer-3 -", workDesk); Producer producer4 = testQueue.new Producer ("producer-4 -", workDesk) / two consumer threads Consumer consumer1 = testQueue.new Consumer ("consumer-1 -", workDesk); Consumer consumer2 = testQueue.new Consumer ("consumer-2 -", workDesk); service.submit (producer1); service.submit (producer2); service.submit (producer3); service.submit (producer4); service.submit (consumer1); service.submit (consumer2) }}

View the print results:

Thank you for your reading, the above is the content of "how to realize the blocking queue of Java multithreading". After the study of this article, I believe you have a deeper understanding of how to realize the blocking queue of Java multithreading, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report