In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-04 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article introduces the relevant knowledge of "how to understand the LinkedBlockingQueue source code of blocking queue". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
LinkedBlockingQueue, an optional bounded queue supported by linked nodes, is an unbounded queue based on a linked list (theoretically bounded), sorted in first-in-first-out order. Unlike ArrayBlockingQueue, LinkedBlockingQueue defaults to Integer.MAX_VALUE, or unbounded queue, if it does not specify a capacity. Therefore, in order to avoid excessive queue load or full memory, we recommend manually passing the size of a queue when using it.
Queue creation
BlockingQueue blockingQueue = new LinkedBlockingQueue ()
In the above code, the capacity of blockingQueue will be set to Integer.MAX_VALUE.
Application scenario
Mostly used in the task queue, single-threaded release task, stop waiting for blocking when the task is full, and start to be responsible for publishing the task when the task is completed and consumes less.
Let's look at an example:
Package com.niuh.queue.linked; import org.apache.commons.lang.RandomStringUtils; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class TestLinkedBlockingQueue {private static LinkedBlockingQueue queue = new LinkedBlockingQueue (); / / Thread control switch private final CountDownLatch latch = new CountDownLatch (1) / / Thread Pool private final ExecutorService pool; / / AtomicLong count production quantity private final AtomicLong output = new AtomicLong (0); / / AtomicLong count sales quantity private final AtomicLong sales = new AtomicLong (0); / / whether to stop thread private final boolean clear; public TestLinkedBlockingQueue (boolean clear) {this.pool = Executors.newCachedThreadPool (); this.clear = clear } public void service () throws InterruptedException {Consumer a = new Consumer (queue, sales, latch, clear); pool.submit (a); Producer w = new Producer (queue, output, latch); pool.submit (w); latch.countDown ();} public static void main (String [] args) {TestLinkedBlockingQueue t = new TestLinkedBlockingQueue (false) Try {t.service ();} catch (InterruptedException e) {e.printStackTrace ();} / * consumers (selling products) * / class Consumer implements Runnable {private final LinkedBlockingQueue queue; private final AtomicLong sales; private final CountDownLatch latch; private final boolean clear Public Consumer (LinkedBlockingQueue queue, AtomicLong sales, CountDownLatch latch, boolean clear) {this.queue = queue; this.sales = sales; this.latch = latch; this.clear = clear;} public void run () {try {latch.await (); / / wait for for (; ) {sale (); Thread.sleep (500);}} catch (InterruptedException e) {if (clear) {/ / in response to the interrupt request, if required, terminate the thread cleanWarehouse () after selling the queued product () } else {System.out.println ("Seller Thread will be interrupted...");} public void sale () {System.out.println ("= = take take="); try {String item = queue.poll (50, TimeUnit.MILLISECONDS); System.out.println (item) If (item! = null) {sales.incrementAndGet (); / / you can declare the long parameter to get the return value as the log parameter}} catch (InterruptedException e) {e.printStackTrace () }} / * sold the remaining products in the queue * / private void cleanWarehouse () {try {while (queue.size () > 0) {sale ();}} catch (Exception ex) {System.out.println ("Seller Thread will be interrupted...") } / * producer (production) * * / class Producer implements Runnable {private LinkedBlockingQueue queue; private CountDownLatch latch; private AtomicLong output; public Producer () {} public Producer (LinkedBlockingQueue queue, AtomicLong output, CountDownLatch latch) {this.queue = queue; this.latch = latch; this.output = output } public void run () {try {latch.await (); / / Thread waits for for (;;) {work (); Thread.sleep (100);}} catch (InterruptedException e) {System.out.println ("Producer thread will be interrupted...") }} / * work * / public void work () {try {String product = RandomStringUtils.randomAscii (3); boolean success = queue.offer (product, 100, TimeUnit.MILLISECONDS); if (success) {output.incrementAndGet () / / you can declare the long parameter to get the return value as the log parameter}} catch (InterruptedException e) {e.printStackTrace ();}
working principle
LinkedBlockingQueue is internally implemented by a single linked list, and you can only take elements from head and add elements from tail. Both adding and fetching elements have independent locks, that is, LinkedBlockingQueue is read-write separate, and read-write operations can be performed in parallel. LinkedBlockingQueue uses reentrant locks (ReentrantLock) to ensure thread safety in concurrency situations.
All operations that add elements to an infinite queue will never block, [note that there is no locking to ensure thread safety], so it can grow to a very large capacity.
The most important thing when designing a producer-consumer model using infinite BlockingQueue is that consumers should be able to consume messages as quickly as producers add messages to queues. Otherwise, the memory may fill up and you will get an OutOfMemory exception.
Source code analysis
Define
The class inheritance relationship of LinkedBlockingQueue is as follows:
The methods it contains are defined as follows:
Member attribute
/ * Node class, used to store data * / static class Node {E item; Node next; Node (E x) {item = x;} / * * the size of the blocking queue. Default is Integer.MAX_VALUE * / private final int capacity; / * * the number of elements in the current blocking queue * / private final AtomicInteger count = new AtomicInteger () / * * the head node of the blocking queue * / transient Node head; / * blocks the tail node of the queue * / private transient Node last; / * * the lock used when the element is acquired and removed, such as take,poll,etc * / private final ReentrantLock takeLock = new ReentrantLock (); / * notEmpty conditional object, used to suspend the thread performing the deletion when the queue has no data * / private final Condition notEmpty = takeLock.newCondition () / * * the lock used when adding an element, such as put,offer,etc * / private final ReentrantLock putLock = new ReentrantLock (); / * * notFull condition object, used to suspend the thread performing the addition whenever the queue data is full * / private final Condition notFull = putLock.newCondition ()
We know from the above attributes that every data added to the LinkedBlockingQueue queue will be encapsulated as a Node node in the added linked list queue, where head and last point to the head and tail nodes of the queue, respectively. Unlike ArrayBlockingQueue, LinkedBlockingQueue uses takeLock and putLock to control concurrency respectively, that is, adding and deleting operations are not mutually exclusive operations, but can be carried out at the same time, which can greatly improve throughput.
If the capacity of the queue is not specified here, that is, the default Integer.MAX_VALUE is used, if the add speed is greater than the delete speed, memory overflow may occur. This point should be carefully considered before use.
In addition, LinkedBlockingQueue provides a Condition for each lock lock to suspend and wake up other threads.
Constructor function
Both the default constructor and the last constructor create a queue size of Integer.MAX_VALUE, and only the user of the second constructor can specify the queue size. The second constructor finally initializes the last and head nodes so that they both point to a node whose element is null.
The last constructor uses putLock for locking, but it's not locking for multithreaded competition, just so that the placed elements are immediately visible to other threads.
Public LinkedBlockingQueue () {/ / default size is Integer.MAX_VALUE this (Integer.MAX_VALUE);} public LinkedBlockingQueue (int capacity) {if (capacity)
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.