In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article is about how to implement blocking queue in Java. The editor thinks it is very practical, so I hope you can get something after reading this article. Let's take a look at it with the editor.
1. What is a blocking queue?
The blocking queue (BlockingQueue) is a queue that supports two additional operations. The two additional operations are: when the queue is empty, the thread that gets the element waits for the queue to become non-empty. When the queue is full, the thread of the storage element waits for the queue to be available. Blocking queues are often used in the scenarios of producers and consumers, where producers are threads that add elements to the queue, and consumers are threads that take elements from the queue. A blocking queue is a container in which producers store elements, while consumers only take elements from the container.
Blocking queues provide four processing methods:
Method\ handling method throws an exception returns special value always blocks timeout exits insertion method add (e) offer (e) put (e) offer (eje timestamp unit) removal method remove () poll () take () poll (time,unit) check method element () peek () is not available
Throw an exception: when the blocking queue is full, inserting elements into the queue will throw an IllegalStateException ("Queue full") exception. When the queue column is empty, a NoSuchElementException exception is thrown when getting an element from the queue.
Returns a special value: the insert method returns whether it was successful, and if it succeeds, it returns true. Remove method, take an element out of the queue, and return null if not
Blocking: when the blocking queue is full, if the producer thread put the queue, the queue blocks the producer thread until it gets the data or exits in response to an interrupt. When the queue is empty, the consumer thread attempts to take elements from the queue, and the queue blocks the consumer thread until the queue is available.
Timeout exit: when the blocking queue is full, the queue will block the producer thread for a period of time, and if it exceeds a certain period of time, the producer thread will exit.
2. Blocking queue in Java
JDK7 provides seven blocking queues. Are
ArrayBlockingQueue: a bounded blocking queue consisting of array structures.
LinkedBlockingQueue: a bounded blocking queue consisting of linked list structures.
PriorityBlockingQueue: an unbounded blocking queue that supports prioritization.
DelayQueue: an unbounded blocking queue implemented using priority queues.
SynchronousQueue: a blocking queue that does not store elements.
LinkedTransferQueue: an unbounded blocking queue consisting of linked list structures.
LinkedBlockingDeque: a bidirectional blocking queue consisting of linked list structures.
ArrayBlockingQueue
ArrayBlockingQueue is a bounded blocking queue implemented in an array. This queue sorts elements on a first-in, first-out (FIFO) basis. By default, visitors are not guaranteed fair access to the queue. The so-called fair access queue refers to all blocked producer or consumer threads. When the queue is available, you can access the queue in the order of blocking, that is, the producer thread that blocks first can insert elements into the queue first, and the consumer thread that blocks first can get the elements from the queue first. Usually, the throughput will be reduced in order to ensure fairness. We can create a fair blocking queue using the following code:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue (1000 million true)
The fairness of visitors is achieved using reentrant locks, as follows:
Public ArrayBlockingQueue (int capacity, boolean fair) {if (capacity 0) return 1; else if (sequenceNumber
< x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } 如何实现Delayed接口 我们可以参考ScheduledThreadPoolExecutor里ScheduledFutureTask类。这个类实现了Delayed接口。首先:在对象创建的时候,使用time记录前对象什么时候可以使用,代码如下: ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement();} 然后使用getDelay可以查询当前元素还需要延时多久,代码如下: public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); } 通过构造函数可以看出延迟时间参数ns的单位是纳秒,自己设计的时候最好使用纳秒,因为getDelay时可以指定任意单位,一旦以纳秒作为单位,而延时的时间又精确不到纳秒就麻烦了。使用时请注意当time小于当前时间时,getDelay会返回负数。 如何实现延时队列 延时队列的实现很简单,当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。 long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay = 0, "invariant") ; if (v == 0) { // Do this the hard way by blocking ... int status = pthread_mutex_lock(_mutex); assert_status(status == 0, status, "mutex_lock"); guarantee (_nParked == 0, "invariant") ; ++ _nParked ; while (_Event < 0) { status = pthread_cond_wait(_cond, _mutex); // for some reason, under 2.7 lwp_cond_wait() may return ETIME ... // Treat this the same as if the wait was interrupted if (status == ETIME) { status = EINTR; } assert_status(status == 0 || status == EINTR, status, "cond_wait"); } -- _nParked ; // In theory we could move the ST of 0 into _Event past the unlock(), // but then we'd need a MEMBAR after the ST. _Event = 0 ; status = pthread_mutex_unlock(_mutex); assert_status(status == 0, status, "mutex_unlock"); } guarantee (_Event >= 0, "invariant");}}
Pthread_cond_wait is a multi-threaded conditional variable function, and cond is an abbreviation for condition, which literally means that a thread is waiting for a condition to occur, which is a global variable. This method takes two parameters, a shared variable _ cond and a mutex _ mutex. The unpark method is implemented using pthread_cond_signal under linux. Park is implemented using WaitForSingleObject under windows.
When the queue is full, the producer inserts an element into the blocking queue and the producer thread enters the WAITING (parking) state. We can see this using the producer thread blocked by jstack dump:
Main prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park (Native Method)-parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park (LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await (AbstractQueuedSynchronizer.java:2043) at java.util.concurrent. ArrayBlockingQueue.put (ArrayBlockingQueue.java:324) at blockingqueue.ArrayBlockingQueueTest.main (ArrayBlockingQueueTest.java:11) above is how to implement blocking queues in Java The editor believes that there are some knowledge points that we may see or use in our daily work. I hope you can learn more from this article. For more details, please follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.