How to use LinkedTransferQueue of JUC

2025-01-18 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

This article explains how to use JUC's LinkedTransferQueue. The approach introduced here is simple and practical, so let's walk through it together.

LinkedTransferQueue, introduced in JDK 1.7, is an unbounded thread-safe queue based on the Dual Queue data structure. Its author, Doug Lea, describes LinkedTransferQueue as functionally a superset of ConcurrentLinkedQueue, SynchronousQueue (in fair mode), and LinkedBlockingQueue, while being more practical and efficient.

We will analyze the design and implementation of LinkedTransferQueue in the following sections, but first we need to explain two terms: matching and slack.

We explained the concept of matching when we introduced SynchronousQueue in the previous article, so we will only briefly recap it here. LinkedTransferQueue implements thread interaction internally on top of a queue. Taking "producer-consumer" as an example: when a producer inserts an element into a LinkedTransferQueue, the producer thread usually does not return immediately after a successful insertion, but waits for a consumer to come and consume it. When a consumer arrives and finds a producer waiting in the queue, it executes the consumption logic, which starts the matching process: the current consumer and the waiting producer are matched with each other and leave the queue one after the other.
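The hand-off behavior described above can be seen with a short sketch (class and variable names here are illustrative, not from the source):

```java
import java.util.concurrent.LinkedTransferQueue;

public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(100);               // arrive after the producer is already waiting
                String item = queue.take();      // matching: wakes the blocked producer
                System.out.println("consumed: " + item);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.transfer("task-1");                // blocks until a consumer takes the element
        System.out.println("transfer returned"); // only reached after the match
        consumer.join();
    }
}
```

Note that `transfer` returns only after the consumer has taken the element, which is exactly the matching process described above.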

Matching describes the operating mechanism of the Dual Queue, while slack (relaxation) is an optimization strategy. To avoid moving the queue's head and tail pointers too frequently, the author introduces slack as a measure of the distance between the head node (or tail node) and the nearest unmatched node. When a node is matched (or cancelled, or inserted), LinkedTransferQueue does not immediately update the corresponding head or tail pointer; instead, it updates the pointer only when the slack exceeds a threshold. This threshold is generally set between 1 and 3: if it is too large, it reduces the hit rate of valid nodes and lengthens traversals; if it is too small, it increases CAS contention and overhead.

TransferQueue interface

The TransferQueue interface was introduced in JDK 1.7 to describe a new kind of blocking queue. LinkedTransferQueue implements TransferQueue and is currently (as of JDK 1.8) the only implementation class of this interface. TransferQueue extends the BlockingQueue interface. With a BlockingQueue, dequeuing or enqueuing threads block only when the queue is empty or full, respectively; TransferQueue goes further. For example, when a thread successfully adds an element to a TransferQueue, the thread usually blocks until some dequeuing thread removes that element from the queue.

TransferQueue adds the following methods to the BlockingQueue interface:

```java
public interface TransferQueue<E> extends BlockingQueue<E> {
    void transfer(E e) throws InterruptedException;
    boolean tryTransfer(E e);
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    boolean hasWaitingConsumer();
    int getWaitingConsumerCount();
}
```

The meaning of each method is explained as follows:

transfer: the producer hands the element directly to a waiting consumer without enqueuing it; if no consumer is waiting, it waits indefinitely, and interruption is supported during the wait.

tryTransfer: the producer hands the element directly to a waiting consumer without enqueuing it; if no consumer is waiting, it returns false. A timed version is also provided.

hasWaitingConsumer: checks whether there are waiting consumers.

getWaitingConsumerCount: returns the (approximate) number of currently waiting consumers.

From the interface methods defined above, we can see that a TransferQueue supports handing data directly from one thread to another without first storing it in the queue. If the data really does need to land in the queue, the thread can wait on the queue together with the data.
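The difference between tryTransfer and an ordinary offer can be shown in a few lines (the class name is illustrative): when no consumer is waiting, tryTransfer fails and leaves nothing in the queue.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TryTransferDemo {
    public static void main(String[] args) {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();

        // no consumer is waiting, so the hand-off fails and the element
        // is NOT left in the queue (unlike offer/put)
        boolean handedOff = queue.tryTransfer(42);

        System.out.println(handedOff);                  // false
        System.out.println(queue.isEmpty());            // true
        System.out.println(queue.hasWaitingConsumer()); // false
    }
}
```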

Core method implementation

LinkedTransferQueue delegates the methods declared in the BlockingQueue and TransferQueue interfaces to the LinkedTransferQueue#xfer method, which is the method we will analyze in this section.

Before we begin to analyze the implementation of the LinkedTransferQueue#xfer method, let's introduce the basic field definition of LinkedTransferQueue. LinkedTransferQueue is based on Dual Queue as the underlying storage structure, and defines the Node class to describe the nodes on the Dual Queue. The fields LinkedTransferQueue#head and LinkedTransferQueue#tail point to the head node and tail node of the underlying queue, respectively.

The fields of the Node class are defined as follows:

```java
static final class Node {
    /** identifies whether this node is a data node or a request node */
    final boolean isData;   // false if this is a request node
    /**
     * Holds the data and encodes the matching status:
     * - a request node's item is initially null and points to the node itself after matching
     * - a data node's item is initially the data and becomes null after matching
     */
    volatile Object item;   // initially non-null if isData; CASed to match
    /** successor pointer */
    volatile Node next;
    /** the thread waiting on this node */
    volatile Thread waiter; // null until waiting
    // ... method definitions omitted
}
```

The nodes in LinkedTransferQueue are divided into data nodes and request nodes. Data nodes can be simply understood as producer nodes and request nodes as consumer nodes. The Node class marks whether a node is a data node or a request node through the Node#isData field, and carries the data and identifies the matching status of the corresponding node through the Node#item field. The following table shows the changes in the field Node#item of the data node and the request node before and after matching:

| Node type       | Data node                    | Request node                 |
|-----------------|------------------------------|------------------------------|
| Before matching | isData = true; item != null  | isData = false; item = null  |
| After matching  | isData = true; item = null   | isData = false; item = this  |

Note: when a node is cancelled, the Node#item field of the node also points to the node itself.

From the above table, we can design a method to determine whether the nodes have matched, as follows:

```java
// Node#isMatched
final boolean isMatched() {
    Object x = item;
    return (x == this) || ((x == null) == isData);
}
```

If a node's item field points to the node itself (that is, x == this), the node has been cancelled or, in the case of a request node, has been matched. Otherwise we continue by evaluating (x == null) == isData, as follows:

If the current node is a data node (isData = true) and it has been matched, its item should be null, so (x == null) == isData evaluates to true.

If the current node is a request node (isData = false) and it has been matched, its item should not be null, so (x == null) == isData also evaluates to true.
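The state table and the boolean logic above can be checked with a small stand-alone sketch. The `Node` class here is a hypothetical re-creation for illustration, not the JDK's private nested class:

```java
public class IsMatchedSketch {
    static final class Node {
        final boolean isData;
        volatile Object item;
        Node(boolean isData, Object item) { this.isData = isData; this.item = item; }
        // same expression as Node#isMatched in the source analysis above
        boolean isMatched() {
            Object x = item;
            return (x == this) || ((x == null) == isData);
        }
    }

    public static void main(String[] args) {
        Node unmatchedData = new Node(true, "payload"); // data node before matching
        Node matchedData   = new Node(true, null);      // data node: item nulled after match
        Node unmatchedReq  = new Node(false, null);     // request node before matching
        Node matchedReq    = new Node(false, null);
        matchedReq.item = matchedReq;                   // request node: item -> this after match

        System.out.println(unmatchedData.isMatched());  // false
        System.out.println(matchedData.isMatched());    // true
        System.out.println(unmatchedReq.isMatched());   // false
        System.out.println(matchedReq.isMatched());     // true
    }
}
```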

Next, let's analyze the implementation of the LinkedTransferQueue#xfer method. First, let's take a look at the parameter definition of the method, as follows:

```java
private E xfer(E e, boolean haveData, int how, long nanos) {
    // ... implementation omitted
}
```

Parameter e is the element value to add (null for a dequeue operation); haveData specifies whether the operation is an enqueue (true) or a dequeue (false); how selects the operation mode for the current call, one of NOW, ASYNC, SYNC, or TIMED; and in TIMED mode, nanos specifies the wait time in nanoseconds.

Let's look at the how parameter more closely. As noted, LinkedTransferQueue's enqueue and dequeue methods basically all delegate directly to LinkedTransferQueue#xfer, and the how parameter controls the method's logic in the different call scenarios. LinkedTransferQueue defines four int constants representing the four operation modes:

```java
private static final int NOW   = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC  = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
```

The meaning of each pattern is described as follows:

NOW: return immediately, without waiting, when there is no matchable node in the queue; for example, when a producer performs an enqueue operation and no consumer is waiting in the queue, it returns immediately.

ASYNC: when there is no matchable node in the queue, the element is enqueued, but the current thread returns immediately instead of waiting; used mainly for enqueue operations.

SYNC: when there is no matchable node in the queue, the element is enqueued, and the current thread attaches itself to the corresponding node and waits indefinitely.

TIMED: when there is no matchable node in the queue, the element is enqueued, and the current thread attaches itself to the corresponding node and waits with a timeout.
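Three of the four modes can be observed through the public API without blocking the demo thread (the class name is illustrative): offer maps to ASYNC, untimed poll to NOW, and timed poll to TIMED.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class XferModesDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        queue.offer("a");                 // ASYNC: enqueue, never wait
        System.out.println(queue.poll()); // NOW: matched immediately, prints "a"

        // NOW with nothing to match: returns null right away
        System.out.println(queue.poll()); // null

        // TIMED: waits up to 50 ms for a producer, then gives up
        System.out.println(queue.poll(50, TimeUnit.MILLISECONDS)); // null

        // SYNC (take/transfer) would block indefinitely on an empty queue,
        // so it is omitted from this demo
    }
}
```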

The following table shows the argument values that LinkedTransferQueue's main enqueue and dequeue methods pass when delegating to LinkedTransferQueue#xfer:

| Method | e | haveData | how | nanos |
|---|---|---|---|---|
| LinkedTransferQueue#put | e | true | ASYNC | 0 |
| LinkedTransferQueue#add | e | true | ASYNC | 0 |
| LinkedTransferQueue#offer(E) | e | true | ASYNC | 0 |
| LinkedTransferQueue#offer(E, long, TimeUnit) | e | true | ASYNC | 0 |
| LinkedTransferQueue#take | null | false | SYNC | 0 |
| LinkedTransferQueue#poll() | null | false | NOW | 0 |
| LinkedTransferQueue#poll(long, TimeUnit) | null | false | TIMED | timeout |
| LinkedTransferQueue#transfer | e | true | SYNC | 0 |
| LinkedTransferQueue#tryTransfer(E) | e | true | NOW | 0 |
| LinkedTransferQueue#tryTransfer(E, long, TimeUnit) | e | true | TIMED | timeout |

Let's start by analyzing the implementation of the method LinkedTransferQueue#xfer, as follows:

```java
private E xfer(E e, boolean haveData, int how, long nanos) {
    // for an enqueue operation, the element value to add must not be null
    if (haveData && (e == null)) {
        throw new NullPointerException();
    }
    // the node to append, if needed
    Node s = null;

    retry:
    for (;;) { // restart on append race

        /* 1. Try to match an existing node */

        // traverse the queue from the head and try to match
        for (Node h = head, p = h; p != null;) { // find & match first node
            boolean isData = p.isData;
            Object item = p.item;
            // find the first node that is neither matched nor cancelled
            if (item != p && (item != null) == isData) { // unmatched
                // the node's mode is the same as the current operation's,
                // so no match is possible; exit the loop and go to phase 2
                if (isData == haveData) { // can't match
                    break;
                }
                // the modes are complementary; perform the match by CASing
                // the item of node p to e (if item was null, e is the data;
                // if item was the data, e is null)
                if (p.casItem(item, e)) { // match
                    // if the matched node is not the head, advance the head
                    // pointer to keep the slack below 2
                    for (Node q = p; q != h;) {
                        Node n = q.next; // update by 2 unless singleton
                        // set head to the node after the matched node p,
                        // or to p itself if its successor is null
                        if (head == h && this.casHead(h, n == null ? q : n)) {
                            // self-link the old head and leave it for GC
                            h.forgetNext();
                            break;
                        }
                        // if the slack is below 2, exit the loop;
                        // otherwise keep advancing the head pointer
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched()) {
                            break; // unless slack < 2
                        }
                    }
                    // wake the thread waiting on the node just matched
                    LockSupport.unpark(p.waiter);
                    return cast(item);
                }
            }
            // the node was matched by another thread; keep traversing
            Node n = p.next;
            p = (p != n) ? n : (h = head); // if p is off-list, restart from head
        }

        // no matchable node was found; append the current node to the tail
        if (how != NOW) { // the caller does not expect an immediate return
            if (s == null) {
                s = new Node(e, haveData);
            }

            /* 2. Try to append a new node */

            // append node s to the tail; on success, return its predecessor
            Node pred = this.tryAppend(s, haveData);
            // null means appending s failed, so retry
            if (pred == null) {
                continue retry; // lost race vs opposite mode
            }
            // block (or spin) waiting for a match
            if (how != ASYNC) {
                /* 3. Await match or cancellation */
                return this.awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
        }
        return e; // not waiting
    }
}
```

As the implementation above shows, LinkedTransferQueue#xfer executes in three phases (marked in the code). The author summarizes each phase in the documentation, quoted here directly:

1. Try to match an existing node;
2. Try to append a new node;
3. Await match or cancellation.

In other words, when a thread enters LinkedTransferQueue#xfer, phase 1 tries to find a matchable node in the queue and, if one exists, performs the match. Otherwise, if the caller does not expect an immediate return (i.e. the mode is not NOW), phase 2 appends the current element to the queue; then, if the caller allows the current thread to wait (i.e. the mode is not ASYNC), the thread enters the waiting state, which is phase 3.

Let's analyze these three phases one by one, starting with phase 1. The author's detailed overview of this step is as follows:

Try to match an existing node

Starting at head, skip already-matched nodes until finding an unmatched node of opposite mode, if one exists, in which case matching it and returning, also if necessary updating head to one past the matched node (or the node itself if the list has no other unmatched nodes). If the CAS misses, then a loop retries advancing head by two steps until either success or the slack is at most two. By requiring that each attempt advances head by two (if applicable), we ensure that the slack does not grow without bound. Traversals also check if the initial head is now off-list, in which case they start at the new head. If no candidates are found and the call was untimed poll/offer, (argument "how" is NOW) return.

The core logic of this step is to find a matchable node in the queue and perform the match. The flow can be summarized as follows:

- Traverse the queue from the head, looking for the first node p that is neither cancelled nor matched; if one exists, enter the matching process.
- Check whether node p's mode is complementary to the current operation's mode; if they are the same, no match is possible, so move on to phase 2 and append the current node to the tail of the queue.
- Otherwise, CAS the item value of node p (for a request node, set item to the element value e; for a data node, set item to null), i.e. perform the match.
- If the CAS fails, another thread completed the match first; keep traversing for the next matchable node.
- If the match succeeds, try to advance the head pointer so that the head node's slack stays below 2, wake the thread blocked on the matched node, and finally return the matched node's item value.

The figures below illustrate this flow, where yellow denotes consumer nodes, cyan denotes producer nodes (M = matched, U = unmatched), and red denotes the node currently being matched. Suppose the current operation is a consumer thread; it traverses from the head of the queue looking for the first node that is neither cancelled nor matched, with the pointers positioned as in figure 1 below. After a few loop iterations, the thread finds the first matchable node p, as in figure 2. It then performs the match, using CAS to set the item value of node p to null, as in figure 3.

Next, the thread enters the innermost for loop and tries to advance the head pointer so that the head node's slack stays below 2. If another thread happens to update the head pointer during this window, the pointers are positioned as in figure 4 above. The head pointer and the h pointer now differ, so execution continues to the second if check of the innermost for loop, after which the pointers are positioned as in figure 5. Because the node pointed to by q has already been matched, the loop continues into its next iteration, where the first if check succeeds: the head pointer is updated via CAS, and the old head node's next pointer is made to point to itself (a self-link) so that it can be garbage collected, as in figure 6. Finally, the thread waiting on the matched node is woken, and the method returns.

If the above steps do not find a node that can match, try to construct a new node for the current element and insert it into the queue, that is, perform step 2. The author's detailed overview of this step is as follows:

Try to append a new node

Starting at current tail pointer, find the actual last node and try to append a new node (or if head was null, establish the first node). Nodes can be appended only if their predecessors are either already matched or are of the same mode. If we detect otherwise, then a new node with opposite mode must have been appended during traversal, so we must restart at phase 1. The traversal and update steps are otherwise similar to phase 1: Retrying upon CAS misses and checking for staleness. In particular, if a self-link is encountered, then we can safely jump to a node on the list by continuing the traversal at current head.

On successful append, if the call was ASYNC, return.

If the current operation mode is NOW, the caller requires an immediate return when there is no matchable node in the queue, so this step is skipped. Otherwise, the LinkedTransferQueue#tryAppend method is executed to try to append the current node s to the queue. This method returns null if the append fails; otherwise it returns the predecessor node of the newly appended node s, or node s itself if it has no predecessor.

The implementation of method LinkedTransferQueue#tryAppend is as follows:

```java
private Node tryAppend(Node s, boolean haveData) {
    // try to append node s to the queue
    for (Node t = tail, p = t;;) { // move p to last node and append
        Node n, u; // temps for reads of next & tail
        // the queue is empty
        if (p == null && (p = head) == null) { // 1
            // set node s as the head directly and return s itself
            if (this.casHead(null, s)) {
                return s; // initialize
            }
        }
        // node s cannot succeed node p, because the modes of p and s
        // are complementary and p is unmatched
        else if (p.cannotPrecede(haveData)) { // 2
            return null; // lost race vs opposite mode
        }
        // p is no longer the latest tail node; keep traversing
        else if ((n = p.next) != null) { // 3: not last; keep traversing
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null; // restart if off list
        }
        // appending node s failed, meaning p no longer points to the
        // latest tail node
        else if (!p.casNext(null, s)) { // 4
            p = p.next; // re-read on CAS failure
        }
        // node s was appended successfully; advance the tail pointer
        // to keep the slack below 2
        else { // 5
            if (p != t) { // update if slack now >= 2
                while ((tail != t || !this.casTail(t, s)) && // move the tail pointer
                       (t = tail) != null &&
                       (s = t.next) != null && // advance and retry
                       (s = s.next) != null && s != t);
            }
            return p;
        }
    }
}
```

The core logic of this step is to append node s to the queue and to advance the tail pointer when the tail node's slack grows too large. The flow can be summarized as follows:

- If the queue is empty, node s is installed directly as the head, and the node s object is returned.
- Otherwise, check whether node s can be appended. If the candidate predecessor's mode is complementary to s's mode and the predecessor is unmatched, s cannot be appended; return null and restart from phase 1.
- If node s can be appended, find the queue's actual current tail node and append s as its successor.
- If the append fails, the predecessor was not the latest tail node; proceed to the next loop iteration and retry.
- If the append succeeds, check whether the tail node's slack is too large, and if so, advance the tail pointer to reduce it.

The following figures demonstrate this flow. Suppose the current operation is a producer thread that wants to insert a node with element value 5 into a queue that contains only unmatched producer nodes, as in figure 1 below. The queue is not empty and node s can be appended, with the pointers positioned as in figure 2. Because the next node of node p is not null, p does not point to the latest tail node, so the p, t, and n pointers must be advanced until p points to the tail node, as in figures 3, 4, and 5.

Next, code branch 4 executes and tries to CAS the next pointer of node p from null to s, i.e. to append node s, as in figure 6 above. If the append succeeds, code branch 5 executes, advancing the tail pointer to keep the tail node's slack below 2, and finally the predecessor of node s is returned, as in figures 7 and 8.

Finally, let's take a look at step 3. The author's detailed overview of this step is as follows:

Await match or cancellation

Wait for another thread to match node; instead cancelling if the current thread was interrupted or the wait timed out. On multiprocessors, we use front-of-queue spinning: If a node appears to be the first unmatched node in the queue, it spins a bit before blocking. In either case, before blocking it tries to unsplice any nodes between the current "head" and the first unmatched node.

Front-of-queue spinning vastly improves performance of heavily contended queues. And so long as it is relatively brief and "quiet", spinning does not much impact performance of less-contended queues. During spins threads check their interrupt status and generate a thread-local random number to decide to occasionally perform a Thread.yield. While yield has underdefined specs, we assume that it might help, and will not hurt, in limiting impact of spinning on busy systems. We also use smaller (1 + 2) spins for nodes that are not known to be front but whose predecessors have not blocked-- these "chained" spins avoid artifacts of front-of-queue rules which otherwise lead to alternating nodes spinning vs blocking. Further, front threads that represent phase changes (from data to request node or vice versa) compared to their predecessors receive additional chained spins, reflecting longer paths typically required to unblock threads during phase changes.

If the current operation mode is ASYNC, the caller requires the thread to return immediately after the append without blocking. For the other modes (i.e. neither NOW nor ASYNC), the LinkedTransferQueue#awaitMatch method is executed so that the current thread attaches itself to the node just appended and waits. In TIMED mode it performs a timed wait; otherwise it waits indefinitely. Interruption is supported in both cases.

The method LinkedTransferQueue#awaitMatch is implemented as follows:

```java
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    // if a timeout was set, compute the deadline timestamp
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = -1; // initialized after first item and cancel checks
    ThreadLocalRandom randomYields = null; // bound if needed

    for (;;) {
        Object item = s.item;
        // the current node has been matched
        if (item != e) { // matched
            s.forgetContents(); // avoid garbage
            return cast(item);
        }
        // if the thread was interrupted or the wait timed out, cancel
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) { // cancel
            this.unsplice(pred, s);
            return e;
        }
        // establish the spin count at or near the front of the queue
        if (spins < 0) {
            if ((spins = spinsFor(pred, s.isData)) > 0) {
                randomYields = ThreadLocalRandom.current();
            }
        }
        // spin a few times before blocking
        else if (spins > 0) { // spin
            --spins;
            if (randomYields.nextInt(CHAINED_SPINS) == 0) {
                Thread.yield(); // occasionally yield
            }
        }
        // bind the current thread object to node s
        else if (s.waiter == null) {
            s.waiter = w; // request unpark then recheck
        }
        // if a timeout was set, perform a timed wait
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos > 0L) {
                LockSupport.parkNanos(this, nanos);
            }
        }
        // otherwise wait indefinitely
        else {
            LockSupport.park(this);
        }
    }
}
```

As you can see, a thread spins a few times before blocking. This mainly improves LinkedTransferQueue's performance on multi-core CPUs, avoiding unnecessary thread blocking and wake-up in scenarios with frequent enqueues and dequeues. The implementation is essentially the same as that of the TransferStack#awaitFulfill method introduced in the previous article on SynchronousQueue.
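The wait-for-match path can be observed from the public API with a timed tryTransfer: the producer spins and then parks (as in awaitMatch above) until a consumer arrives or the timeout elapses. The class name below is illustrative:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class TimedTransferDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(100);                  // arrive while the producer is waiting
                System.out.println("got: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // TIMED mode: the producer waits (spinning, then parking) up to 1 s for a match
        boolean ok = queue.tryTransfer("job", 1, TimeUnit.SECONDS);
        System.out.println(ok); // true: the consumer arrived within the timeout
        consumer.join();
    }
}
```

If no consumer arrived within the timeout, tryTransfer would cancel the waiting node and return false, which corresponds to the cancellation branch of awaitMatch.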

At this point, I believe you have a deeper understanding of how to use JUC's LinkedTransferQueue. You might as well try it out in practice, and follow us to continue learning!
