In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces the relevant knowledge of "BlockingQueue interface and the method of ArrayBlockingQueue implementation class". The editor shows you the operation process through the actual case. The operation method is simple, fast and practical. I hope this article "BlockingQueue interface and ArrayBlockingQueue implementation class method" can help you solve the problem.
Queue is a FIFO (first-in, first-out) data structure, and the BlockingQueue we will talk about in this article is also a kind of queue, and emphasizes the thread safety feature.
Full name of BlockingQueue: java.util.concurrent.BlockingQueue. It is a thread-safe queue interface, multiple threads can insert data from the queue in a concurrent way, and there will be no thread safety problems while taking out the data.
Examples of producers and consumers
BlockingQueue is typically used by consumer threads to store data in queues, and consumer threads extract data from queues, as follows
The producer thread keeps inserting data into the queue until the queue is full and the producer thread is blocked
The consumer thread keeps fetching data from the queue until the queue is empty and the consumer thread is blocked
(recommended tutorial: Java tutorial)
BlockingQueue method
BlockingQueue provides four different types of methods for inserting numbers, fetching data, and checking data, as follows
Operation failed, exception thrown
Regardless of success / failure, return to true/false immediately
If the queue is empty / full, block the current thread
If the queue is empty / full, block the current thread and have a timeout mechanism to insert add (o) offer (o) put (o) offer (o, timeout, timeunit) take out remove (o) poll () take () poll (timeout, timeunit) check element () peek ()
Concrete implementation class of BlockingQueue
BlockingQueue is just an interface, which is implemented by the following classes in actual development.
ArrayBlockingQueue
DelayQueue
LinkedBlockingQueue
PriorityBlockingQueue
SynchronousQueue
The use of ArrayBlockingQueue
Here is an example of the concrete implementation class ArrayBlockingQueue of the BlockingQueue interface. A multithreaded model of consumers and producers is implemented through ArrayBlockingQueue.
The core content is as follows:
Using ArrayBlockingQueue as a data container for producers and consumers
Start 3 threads, 2 producers, 1 consumer through ExecutorService
Specify total amount of data
Producer thread
ArrayBlockingQueueProducer
Import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.atomic.AtomicInteger;/** * the task of the producer thread to store the specified amount into the container * * / public class ArrayBlockingQueueProducer implements Runnable {private static final Logger logger = LoggerFactory.getLogger (ArrayBlockingQueueProducer.class); / / Container private ArrayBlockingQueue queue; / / produce the specified quantity private AtomicInteger numberOfElementsToProduce Public ArrayBlockingQueueProducer (ArrayBlockingQueue queue, AtomicInteger numberOfElementsToProduce) {this.queue = queue; this.numberOfElementsToProduce = numberOfElementsToProduce } @ Override public void run () {try {while (numberOfElementsToProduce.get () > 0) {try {/ / put the task String task = String.format ("task_%s", numberOfElementsToProduce.getAndUpdate (x-> x mae 1)) in the queue; queue.put (task) Logger.info ("thread {}, produce task {}", Thread.currentThread (). GetName (), task); / / task is 0, producer thread exits if (numberOfElementsToProduce.get () = = 0) {break }} catch (Exception e) {e.printStackTrace ();} catch (Exception e) {logger.error (this.getClass (). GetName (). Concat (". Has error "), e);} consumer thread
ArrayBlockingQueueConsumer
Import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.atomic.AtomicInteger;/** * Consumer thread consumes a specified amount of tasks from the container * * / public class ArrayBlockingQueueConsumer implements Runnable {private static final Logger logger = LoggerFactory.getLogger (ArrayBlockingQueueConsumer.class); private ArrayBlockingQueue queue; private AtomicInteger numberOfElementsToProduce; public ArrayBlockingQueueConsumer (ArrayBlockingQueue queue, AtomicInteger numberOfElementsToProduce) {this.queue = queue; this.numberOfElementsToProduce = numberOfElementsToProduce } @ Override public void run () {try {while (! queue.isEmpty () | | numberOfElementsToProduce.get () > = 0) {/ / get the task from the queue and execute the task String task = queue.take (); logger.info ("thread {} consume task {}", Thread.currentThread (). GetName (), task) / / the data in the queue is empty and the consumer thread exits if (queue.isEmpty ()) {break;} catch (Exception e) {logger.error (this.getClass (). GetName (). Concat (". Has error "), e);}
Test TestBlockingQueue
Import com.ckjava.synchronizeds.appCache.WaitUtils;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicInteger;/** * 1. Using ArrayBlockingQueue as a data container for producers and consumers
* 2. Start 3 threads, 2 producers, 1 consumer through ExecutorService
* 3. Specify total data * / public class TestBlockingQueue {public static void main (String [] args) {ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue (10); / * BlockingQueue delayQueue = new DelayQueue (); BlockingQueue linkedBlockingQueue = new LinkedBlockingQueue (10); BlockingQueue priorityBlockingQueue = new PriorityBlockingQueue (10); BlockingQueue synchronousQueue = new SynchronousQueue (); * / ExecutorService executorService = Executors.newFixedThreadPool (3) / / produce up to 5 data AtomicInteger numberOfElementsToProduce = new AtomicInteger (5); / / 2 producer threads executorService.submit (new ArrayBlockingQueueProducer (arrayBlockingQueue, numberOfElementsToProduce)); executorService.submit (new ArrayBlockingQueueProducer (arrayBlockingQueue, numberOfElementsToProduce)); / / 1 consumer thread executorService.submit (new ArrayBlockingQueueConsumer (arrayBlockingQueue, numberOfElementsToProduce)); executorService.shutdown () WaitUtils.waitUntil (()-> executorService.isTerminated (), 1000L);}}
The output is as follows:
13 thread pool-1-thread-3 consume task task_513:54:17.884 54 pool-1-thread-3 17.884 [pool-1-thread-3] thread pool-1-thread-3 consume task task_513:54:17.884 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer-thread pool-1-thread-1, produce task task_513:54:17.884 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer-thread pool-1-thread-2 Produce task task_413:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer-thread pool-1-thread-3 consume task task_413:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer-thread pool-1-thread-2, produce task task_213:54:17.887 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer-thread pool-1-thread-1 Produce task task_313:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer-thread pool-1-thread-3 consume task task_313:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer-thread pool-1-thread-2 Produce task task_113:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer-thread pool-1-thread-3 consume task task_213:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer-thread pool-1-thread-3 consume task task_1 on "BlockingQueue interface and methods of ArrayBlockingQueue implementation classes" ends here. Thank you for your reading. If you want to know more about the industry, you can follow the industry information channel. The editor will update different knowledge points for you every day.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.