In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains "how to use JUC's SynchronousQueue". Interested friends may wish to have a look at it. The method introduced in this paper is simple, fast and practical. Now let the editor take you to learn how to use JUC's SynchronousQueue.
SynchronousQueue is not really a queue, but it is more appropriate to understand it as a component for communication between threads. SynchronousQueue has no concept of capacity. After performing a queuing operation, a thread must wait for another thread to match and finish queuing before continuing to queue again, and vice versa. In addition, unlike the common understanding that the nodes in the queue only carry elements, the nodes in SynchronousQueue also need to attach corresponding operation threads, which are waiting to be matched (fulfill) on the corresponding nodes.
SynchronousQueue is implemented from the BlockingQueue interface, and the underlying LockSupport tool class implements thread blocking and wake-up operations, and relies on CAS to ensure thread safety. When constructing a SynchronousQueue object, it is allowed to specify whether fair mode is enabled by parameters. SynchronousQueue implements unfair thread communication based on Dual Stack data structure and fair thread communication based on Dual Queue data structure. Because the fair mode of SynchronousQueue reduces the conflicts between threads, it has higher performance in the scenario of frequent competition, while the unfair mode can better maintain thread locality (thread locality) and reduce the overhead of thread context switching.
SynchronousQueue example
In this section, we demonstrate the basic use of SynchronousQueue with a "producer-consumer" example, in which we set up one producer and two consumers to demonstrate the fairness characteristics of SynchronousQueue. The example implementation is as follows (exception handling is omitted):
Private static BlockingQueue queue = new SynchronousQueue (true); private static class Producer implements Runnable {@ Override public void run () {int count = 0; while (count)
< 10) { int val = count++; System.out.println("Producer produce: " + val); queue.put(val); TimeUnit.SECONDS.sleep(1); } }}private static class Consumer implements Runnable { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { System.out.println("Consumer " + Thread.currentThread().getName() + " consume: " + queue.take()); } }}public static void main(String[] args) { Thread producer = new Thread(new Producer()); Thread consumer1 = new Thread(new Consumer()); Thread consumer2 = new Thread(new Consumer()); consumer1.setName("A"); consumer2.setName("B"); producer.start(); consumer1.start(); consumer2.start();} 运行输出如下: Producer produce: 0Consumer A consume: 0Producer produce: 1Consumer A consume: 1Producer produce: 2Consumer B consume: 2Producer produce: 3Consumer A consume: 3Producer produce: 4Consumer B consume: 4Producer produce: 5Consumer A consume: 5Producer produce: 6Consumer B consume: 6Producer produce: 7Consumer A consume: 7Producer produce: 8Consumer B consume: 8Producer produce: 9Consumer A consume: 9 可以看到,当生产者往 SynchronousQueue 中插入一个元素之后,生产者线程会等待消费者完成消费,而消费者线程在完成消费之后会等待生产者生产。SynchronousQueue 的公平性特性尽可能保证了消费者 A 和 B 能够交替执行消费操作。 在上述示例中,如果我们将 Producer 入队列的方法由 put 改为 offer,那么在 Consumer 入队列成功之前,Producer 始终不能入队列成功,这对于一般的队列而言显得有些奇怪。实际上,这里说的不能成功入队列不够准确,要知道 offer 是一类带有超时机制的方法,也就是说当 Producer 在将某个元素执行入队列之后,它希望有一个 Consumer 能够在自己期望的时间内与该元素进行匹配,否则就只能返回 false,从表象上来看就是没有入队列成功。实际应用中我们需要考虑此类表象是否符合自己的业务场景,如果不满足则可以考虑使用 put 方法执行入队列操作。 核心方法实现 SynchronousQueue 实现自 BlockingQueue 接口,但并未对接口中声明的方法全部支持,例如 SynchronousQueue 的 SynchronousQueue#peek 方法就始终返回 null,在使用时推荐先阅读 API 文档,避免影响程序的正确性。本文主要分析 SynchronousQueue 的实现机制,所以下面重点来看一下 SynchronousQueue 已实现的出队列和入队列操作。 前面我们提及到 SynchronousQueue 内部基于 Dual Stack 和 Dual Queue 数据结构实现,在 SynchronousQueue 中定义了一个 Transferer 抽象类,该类抽象了 Dual Stack 和 Dual Queue 数据结构的实现,定义如下: abstract static class Transferer { abstract E transfer(E e, boolean timed, long nanos);} SynchronousQueue 的出队列和入队列操作均委托给 Transferer#transfer 方法执行(如下),该方法接收 3 个参数,其中参数 e 表示待添加到队列中的元素值,对于出队列操作来说,e 始终等于 null;参数 timed 用于设置当前操作是否具备超时策略,如果是则需要使用参数 nanos 参数指定超时时间。 SynchronousQueue#put(E e) ->Transferer.transfer (e, false, 0)
SynchronousQueue#offer (E)-> transferer.transfer (e, true, 0)
SynchronousQueue#offer (E, long, TimeUnit)-> transferer.transfer (e, true, unit.toNanos (timeout))
SynchronousQueue#take-> transferer.transfer (null, false, 0)
SynchronousQueue#poll ()-> transferer.transfer (null, true, 0)
SynchronousQueue#poll (long, TimeUnit)-> transferer.transfer (null, true, unit.toNanos (timeout))
For Dual Stack and Dual Queue data structures, SynchronousQueue defines TransferStack and TransferQueue implementation classes, respectively. The following sections analyze the implementation mechanisms of these two classes.
Before we begin, let's explain what the word match means in SynchronousQueue, and the concept of match will be mentioned many times in the following chapters. We have generally learned that SynchronousQueue internally implements the interaction between threads based on stack or queue. Take "producer-consumer" as an example, if you are using a stack structure (as is the case with queues), when the producer inserts an element into the SynchronousQueue, the producer thread will not return immediately after the insertion is successful, but will wait for the consumer to come to consume. When consumers execute consumption, they find that there are producers waiting on the stack, so they execute the consumption logic, also known as start matching (fulfill) process, to match the current consumers and producers out of the stack one after another.
Dual Stack
SynchronousQueue implements the TransferStack class for Dual Stack data structures. TransferStack inherits from the Transferer abstract class and defines nodes on the SNode class description stack. According to the operation mode of the node, TransferStack defines three constant fields of int type to describe them, as follows:
REQUEST: identifies the consumer node that does not match.
DATA: identifies the unmatched producer node.
FULFILLING: identifies that the node is performing a matching operation.
During the run time, the stack is either empty or stores one or more unmatched consumer nodes or producer nodes, and the corresponding consumer or producer threads are attached to the specific nodes and wait. It is impossible to co-exist both unmatched consumer nodes and unmatched producer nodes on a stack, that is, the running mode (that is, SNode#mode field values) of all nodes on the same time stack should be the same, except that the top node of the stack may attach FULFILLING state because the matching process is in progress.
The fields of the SNode class are defined as follows:
Static final class SNode {/ * * successor pointer * / volatile SNode next; / / next node in stack / * * records the matching node. If the current node is canceled, it points to its own * / volatile SNode match; / / the node matched to this / * * thread object waiting on the current node * / volatile Thread waiter / / to control park/unpark / * * node element value, or null * / Object item; / / data if it is a consumer node Or null for REQUESTs / * Node Operation Mode: *-0: represents consumer node *-1: represents producer node *-(2 | 0) or (2 | 1): indicates that the node is being or has been matched * / int mode; / /. Implementation of ellipsis method}
The meaning of each field, such as code comments, we will analyze the method defined in SNode when we analyze the TransferStack#transfer method implementation below, and further introduce the meaning of each field in combination with the specific scenario.
When we introduced the Transferer abstract class earlier, we knew that the abstract class declared only one method, the Transferer#transfer method, which is also the core implementation of the entire SynchronousQueue. Before we begin to analyze the effect of TransferStack on the implementation of this method, let's take a look at the running process of TransferStack as a whole.
Taking "producer-consumer" as an example, assuming that three producers execute inserting elements into SynchronousQueue in turn, and the order of execution is 1-> 2-> 3, the stack structure obtained after entering the stack is as follows:
3-> 2-> 1-> null ↓ head
The three producer threads after entering the stack will wait on the corresponding node of the stack. If a consumer performs the queue operation, the consumer will match the producer on the head node. After the match, the stack structure is as follows:
2-> 1-> null ↓ head
At this point, the remaining producer threads will continue to wait, during which new consumers can be allowed to dequeue or new producers can be allowed to enter the queue.
After having a general sense of the above process, which is the core execution logic of the TransferStack#transfer method, let's take an in-depth look at the specific implementation of the TransferStack#transfer method. In fact, at the beginning of the TransferStack#transfer method, the author has given a visual summary of the running process of the whole method, and the excerpt is as follows:
If apparently empty or already containing nodes of same mode, try to push node on stack and wait for a match, returning it, or null if cancelled.
If apparently containing node of complementary mode, try to push a fulfilling node on to stack, match with corresponding waiting node, pop both from stack, and return matched item. The matching or unlinking might not actually be necessary because of other threads performing action 3:
If top of stack already holds another fulfilling node, help it out by doing its match and/or pop operations, and then continue. The code for helping is essentially the same as for fulfilling, except that it doesn't return the item.
The method TransferStack#transfer is implemented as follows:
E transfer (E e, boolean timed, long nanos) {SNode s = null; / / constructed/reused as needed / / mode of operation. If it is null, it means that it is currently an out-queue operation, otherwise it is an out-queue operation int mode = (e = = null)? REQUEST: DATA; for (;;) {SNode h = head; / / 1. If the stack is empty or contains nodes of the same pattern, the node will be stacked to wait for matching if (h = = null | | h.mode = = mode) {/ / empty or same-mode / / if timeout is set and if (timed & & nanos spinForTimeoutThreshold) {LockSupport.parkNanos (this, nanos) {LockSupport.parkNanos (this, nanos)}
The above method first calculates the remaining expiration time and the number of spins based on whether the timeout is set, and then executes:
Determine whether the waiting period is interrupted. If so, cancel the current node and point the match pointer of the node to yourself.
Determine whether the match pointer of the node points to null. If it is not null, it means that the current node has been successfully matched or canceled (in this case, the match pointer points to the node itself), and returns the node pointed to by the match pointer.
Otherwise, the node is not matched or canceled. If the timeout is set and has expired, the current node is canceled and returned in the next cycle.
Try to spin several times before entering the blockage
If the match is not completed after several spins, the wait is blocked, whether to wait indefinitely or not is determined according to whether the timeout is set, and whether there is a binding thread on the current node before waiting. If it is not bound, bind the current thread to the node.
As you can see from the above implementation, the waiting thread does not block immediately, but attempts to spin several times first, mainly considering the frequent interaction between producers and consumers. In such scenarios, consumers will come out of the queue immediately after the producer performs the queuing operation, and the producer thread does not need to be blocked, but only needs to spin several times to be matched successfully, thus avoiding the performance overhead caused by thread blocking and waking up. If the producer and consumer do not interact frequently, because the number of spins is not high, it will not cause too much CPU overhead and can almost be ignored.
If the node is cancelled during the waiting period, the above method will point the match pointer of the node to itself. The subsequent process will identify the cancelled node based on this feature, and call the TransferStack#clean method to perform the cleanup work. This method is implemented as follows:
Void clean (SNode s) {s.item = null; / / forget item s.waiter = null; / / forget thread / / find the nearest subsequent valid (not cancelled) node of s as the sentinel node SNode past = s.next; if (past! = null & & past.isCancelled ()) {past = past.next Traversal from scratch, pointing the head pointer to the first valid (uncanceled) node SNode p; while ((p = head)! = null & & p! = past & & p.isCancelled ()) {this.casHead (p, p.next) } / / traverse from the currently valid header node until the sentry node is encountered, and the invalid node while (p! = null & & p! = past) {SNode n = p.next.if (n! = null & & n.isCancelled ()) {p.casNext (n, n.next);} else {p = n) encountered during removal }}}
The clean-up process first establishes a sentinel node, which is the nearest valid (uncancelled) node after node s, and then traverses and clears those nodes that have been cancelled from the top of the stack. As for why it is necessary to set a Sentinel node, considering that node s may have been removed by other threads in concurrent scenarios, setting Sentinel node can avoid traversing the entire stack.
Let's take a look at scenario 2, where the running mode of the waiting thread in the stack is complementary to the current thread (it can be simply understood that the waiting thread in the stack is the producer, while the current thread is the consumer). And no thread is performing a matching operation at this time, so enter the matching process. This time, the thread on the head node matches the current thread, so you need to find the first valid (uncanceled) head node on the stack from top to bottom, and then execute:
Create a node with the element e, append the FULFILLING flag to s, and put the node on the stack
Get the node m to be matched with s this time. If m is null, there is no waiting node on the stack. You need to exit the matching process and continue to determine which scenario to enter next.
Otherwise, call the SNode#tryMatch method to perform the matching operation
Move the head pointer back if the match is successful and return (the element value of the matching node is returned if the current thread is a consumer thread, or the element value just added if the current thread is a producer thread)
If the match fails, it means that the node m has been cancelled and try to continue to match the subsequent node of m.
The following illustration is used to demonstrate the above execution process. As shown in figure 1 below, assume that the current operation thread is a consumer (yellow node in the figure), expects to perform a queue operation on the SynchronousQueue, and that the current stack already contains two producers in the waiting state (cyan node in the figure). Because the current thread complements the thread pattern waiting in the stack, create a new node with an element value of null (shown in figure 2 below) and append the FULFILLING flag (red node in the figure).
Then start the matching process, setting the m and mn pointers, as shown in figure 3 above. After the successful execution of the SNode#tryMatch method, the match pointer of node m is directed to node s, indicating that the node m and s match successfully, as shown in figure 4 above.
Continue to analyze the SNode#tryMatch method that executes the matching process, and the implementation is as follows:
Boolean tryMatch (SNode s) {/ / based on CAS, set the match field of the current node to the s node if (match = null & & UNSAFE.compareAndSwapObject (this, matchOffset, null, s)) {Thread w = waiter; if (w! = null) {/ / waiters need at most one unpark waiter = null; / / Wake up the thread LockSupport.unpark (w) blocked on the current node } return true;} return match = = s;}
The core of the matching process is to point the match pointer of the node to be matched to the node corresponding to the current operation thread.
There is so much about the running mechanism of Dual Stack. Constrained by the stack FILO feature, Dual Stack-based SynchronousQueue always performs in-queue and out-queue operations at the top of the stack, and the threads that enter the queue will be matched first, which explains why Dual Stack-based SynchronousQueue is unfair. One of the potential problems of Dual Stack-based SynchronousQueue is that it may lead to long-term unmatched and hungry threads, and the advantage is that it can better maintain thread locality (thread locality) and reduce the overhead of thread context switching.
Dual Queue
For the Dual Queue data structure, SynchronousQueue implements the TransferQueue class, TransferQueue also inherits from the Transferer abstract class, and defines the QNode class to describe the nodes on the queue. TransferQueue defines the TransferQueue#head and TransferQueue#tail pointer fields, which point to the head and tail nodes of the queue, respectively.
The fields of the QNode class are defined as follows:
Static final class QNode {/ * follow-up pointer * / volatile QNode next; / / next node in queue / * * node element value, if equal to the node itself, then * / volatile Object item; / / CAS'ed to or from null / * * Thread object waiting on the current node * / volatile Thread waiter / / to control park/unpark / * * identifies whether it is a consumer node or a producer node * / final boolean isData; / /. Omit method definition}
The meaning of each field is code comments, where the QNode#isData field is used to identify whether the corresponding node is a producer node or a consumer node. Unlike TransferStack's SNode, you need to use the SNode#mode field to describe whether the node is an unmatched producer, an unmatched consumer, or is matching a medium state. Since TransferQueue is executed on head and tail nodes respectively, there is no need to define a special field to describe the node's operation mode. When we analyze the implementation of the TransferQueue#transfer method below, we will also analyze the methods defined in QNode, and further introduce the meaning of each field combined with the specific scenario.
Before we start to analyze the implementation of TransferQueue to the Transferer#transfer method, let's take a look at the whole process of running TransferQueue. Also taking "producer-consumer" as an example, assuming that three producers execute inserting elements into the SynchronousQueue in turn, and the order of execution is 1-> 2-> 3, the queue structure obtained after joining the queue is as follows:
1-> 2-> 3-> null ↓ ↓ head tail
After queuing, the three producer threads will wait at the corresponding node of the queue. If a consumer performs the queue operation, the consumer will match the producer on the head node. The queue structure obtained after the match is as follows:
2-> 3-> null ↓ ↓ head tail
At this point, the remaining producer threads will continue to wait, during which new consumers can be allowed to dequeue or new producers can be allowed to enter the queue.
After having a general sense of the above process, which is the core execution logic of the TransferQueue#transfer method, let's take an in-depth look at the specific implementation of the TransferQueue#transfer method. In fact, at the beginning of the TransferQueue#transfer method, the author has also given an intuitive summary of the running process of the whole method, with an excerpt as follows:
If queue apparently empty or holding same-mode nodes, try to add node to queue of waiters, wait to be fulfilled (or cancelled) and return matching item.
If queue apparently contains waiting items, and this call is of complementary mode, try to fulfill by CAS'ing item field of waiting node and dequeuing it, and then returning matching item.
The method TransferQueue#transfer is implemented as follows:
E transfer (E e, boolean timed, long nanos) {QNode s = null; / / constructed/reused as needed / / identifies whether the current production mode or consumption mode is boolean isData = (e! = null); for (;) {QNode t = tail; QNode h = head If (t = = null | | h = = null) {/ / saw uninitialized value continue; / / spin} / / 1. If the queue is empty or contains nodes of the same pattern, the node will be queued for matching if (h = t | | t.isData = = isData) {/ / empty or same-mode QNode tn = t.next. during / / other threads will enter the queue and enter the next cycle to re-obtain tail if (t! = tail) {/ / inconsistent read continue } / / t is not the end node of the queue, try to move the tail pointer if (tn! = null) {/ / lagging tail this.advanceTail (t, tn); if continue;} / / sets the timeout and has expired, return null if (timed & & nanos)
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.