2025-01-18 Update. From: SLTechnology News&Howtos, Shulou (Shulou.com)
This article explains how to use ConcurrentLinkedQueue from JUC (java.util.concurrent) and how its core methods are implemented.
ConcurrentLinkedQueue is a thread-safe, unbounded, non-blocking queue. The thread-safe queues in the JUC package fall into two categories: blocking queues and non-blocking queues. The former rely on locks for thread safety, while the latter rely on the CAS (compare-and-swap) mechanism. Blocking queues generally have the word "Blocking" in their class name.
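To make the distinction concrete, here is a minimal sketch that polls an empty queue of each kind. The class and method names (QueueKindsDemo, pollNonBlocking, pollBlocking) are made up for illustration; only the JDK queue classes come from the library. The non-blocking queue answers immediately, while the blocking queue can wait for an element up to a timeout.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class QueueKindsDemo {

    /** Polls an empty non-blocking queue: returns null at once, never waits. */
    static String pollNonBlocking() {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        return queue.poll();
    }

    /** Polls an empty blocking queue: waits up to the timeout, then returns null. */
    static String pollBlocking(long timeoutMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollNonBlocking());
        System.out.println(pollBlocking(50));
    }
}
```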
From the word "Linked" in the name we can infer that ConcurrentLinkedQueue is backed by a linked list. Internally it maintains a singly linked list to store the queue elements. The Node class is defined as follows:
    private static class Node<E> {
        /** Node element value */
        volatile E item;
        /** Pointer to the next node */
        volatile Node<E> next;

        // ... method definitions omitted
    }
The Node class is a typical linked list node definition. It also defines methods for modifying the node's element value and next pointer, all of which are implemented on top of Unsafe. The head and tail fields of ConcurrentLinkedQueue point to the head node and tail node of the queue, respectively:
    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
            implements Queue<E>, Serializable {

        private transient volatile Node<E> head;
        private transient volatile Node<E> tail;

        public ConcurrentLinkedQueue() {
            head = tail = new Node<E>(null);
        }

        // ... method implementations omitted
    }
When we construct an empty ConcurrentLinkedQueue, both head and tail point to a marker node whose element value is null. ConcurrentLinkedQueue does not allow null elements to be added, because nodes with a null element value play a special role inside the queue.
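The ban on null elements is easy to observe from the outside. The sketch below (NullRejectionDemo is a name made up for this example) offers null to a fresh queue and reports whether a NullPointerException was thrown.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the JDK.
class NullRejectionDemo {

    /** Returns true if offering null is rejected with a NullPointerException. */
    static boolean rejectsNull() {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        try {
            queue.offer(null);
            return false; // not expected: null was accepted
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("null rejected: " + rejectsNull());
    }
}
```

Because null can never be a stored element, a null result from poll or peek unambiguously means "the queue was empty at that moment".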
Queue interface
The Queue interface extends the Collection interface and adds queue-specific operations. It is defined as follows:
    public interface Queue<E> extends Collection<E> {
        boolean add(E e);
        boolean offer(E e);
        E poll();
        E peek();
        E element();
        E remove();
    }
The meaning of each method is explained as follows:
add: adds an element to the queue and returns true on success; for a bounded queue, throws IllegalStateException if the queue is full.
offer: adds an element to the queue and returns true on success; for a bounded queue, returns false if the queue is full instead of throwing an exception.
poll: removes the head node of the queue and returns its element value, or returns null if the queue is empty.
peek: returns the element value of the head node without removing the node, or returns null if the queue is empty.
element: returns the element value of the head node without removing the node; throws NoSuchElementException if the queue is empty.
remove: removes the head node of the queue and returns its element value; throws NoSuchElementException if the queue is empty.
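The difference between the null-returning and the exception-throwing methods shows up only on an empty queue. Here is a small sketch of that case; EmptyQueueSemantics and its helper throwsNoSuchElement are names invented for this example.

```java
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the JDK.
class EmptyQueueSemantics {

    /** Runs the action and reports whether it threw NoSuchElementException. */
    static boolean throwsNoSuchElement(Runnable action) {
        try {
            action.run();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();

        // peek and poll signal "empty" with null.
        System.out.println("peek: " + queue.peek());
        System.out.println("poll: " + queue.poll());

        // element and remove signal "empty" with an exception.
        System.out.println("element throws: " + throwsNoSuchElement(() -> queue.element()));
        System.out.println("remove throws: " + throwsNoSuchElement(() -> queue.remove()));
    }
}
```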
Core method implementation
ConcurrentLinkedQueue implements the Queue interface, so let's look at how it implements the core operations declared in Queue.
Add elements: add & offer
ConcurrentLinkedQueue implements the two methods that the Queue interface declares for adding elements, Queue#add and Queue#offer. Both append an element to the end of the queue, and because ConcurrentLinkedQueue has no capacity limit, the queue can never be full. For ConcurrentLinkedQueue there is therefore no difference in behavior between the two methods.
Let's take a look at ConcurrentLinkedQueue's implementation of Queue#offer, as follows:
    public boolean offer(E e) {
        // The element to be added cannot be null
        checkNotNull(e);
        // Create the node for the element to be added
        final Node<E> newNode = new Node<E>(e);

        // Append to the tail of the list
        for (Node<E> t = tail, p = t; ; ) {
            Node<E> q = p.next;
            if (q == null) { // 1
                // p is the last node; use CAS to set its next pointer
                if (p.casNext(null, newNode)) {
                    /*
                     * Successful CAS is the linearization point for e to become an element of this queue,
                     * and for newNode to become "live".
                     */
                    if (p != t) { // hop two nodes at a time
                        // Update the tail node
                        this.casTail(t, newNode); // Failure is OK.
                    }
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            } else if (p == q) { // 2
                /*
                 * We have fallen off list. If tail is unchanged, it will also be off-list,
                 * in which case we need to jump to head, from which all live nodes are always reachable.
                 * Else the new tail is a better bet.
                 */
                p = (t != (t = tail)) ? t : head;
            } else { // 3
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
            }
        }
    }
The method first checks whether the element to be added is null. As mentioned earlier, ConcurrentLinkedQueue does not allow null elements, mainly because a node whose element value is null is a special marker node in ConcurrentLinkedQueue. If the element is not null, the method wraps it in a Node (call it node N) and appends that node to the end of the queue.
The following illustrations cover several scenarios for adding an element. In this section, cyan marks the head node and orange marks the tail node. Suppose the queue currently looks like figure (1) below. Here q is null (note: every dotted node in this article means the node reference itself is null, not a node whose element value is null), so execution reaches position 1 in the code above. The thread uses a CAS operation to change the next pointer of p from null to node N and returns true on success. If p is not equal to t at that moment, the thread also attempts to update tail through casTail. The queue after the operation completes is shown in figure (2) below.
Now consider contention between threads. Suppose there are two threads, A and B, and A succeeds in Node#casNext at position 1, changing the next pointer of p from null to node1, as shown in figure (1) below. When thread B then executes Node#casNext to change the next pointer of p from null to node2, it fails, because the next pointer of p is no longer null. Thread B therefore enters the next iteration of the for loop. This time q is neither null nor equal to p, so execution reaches position 3, which assigns q to p, as shown in figure (2) below. Thread B proceeds to the next iteration and executes Node q = p.next, as shown in figure (3) below. Because q is now null, it continues at position 1 and changes the next pointer of p from null to node2, as shown in figure (4) below. At this point p is not equal to t, so ConcurrentLinkedQueue#casTail is executed to update the tail node, as shown in figure (5) below.
Finally, let's analyze when position 2 is executed. Suppose the queue looks like figure (1) below. This structure is generally produced by other threads executing the poll method, which is analyzed in the next section. The tail node now refers to itself, and when the for loop starts, both p and t point to the tail node. When the next pointer of p is assigned to q, q also points to the tail node, because the next node of p is the tail node itself. Now q is not null and p equals q, so position 2 is executed and p is set to the head node, as shown in figure (2) below. The purpose of this step is to escape from the self-reference. The remaining steps can be followed by the reader from figures (3), (4) and (5) below.
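The retry-on-failed-CAS pattern can be seen more easily in a stripped-down form. The sketch below is a teaching class written for this article (SimpleCasQueue, snapshot and fillConcurrently are hypothetical names), not the JDK implementation: it uses AtomicReference instead of Unsafe, tries to advance tail after every insertion rather than every second one, never moves head, and so never produces self-linked nodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Simplified teaching sketch; NOT how the JDK class is written.
class SimpleCasQueue<E> {

    private static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E item) { this.item = item; }
    }

    // Marker node with a null item; head never moves in this sketch.
    private final Node<E> sentinel = new Node<>(null);
    private final AtomicReference<Node<E>> tail = new AtomicReference<>(sentinel);

    boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> newNode = new Node<>(e);
        for (;;) {
            Node<E> t = tail.get();
            Node<E> next = t.next.get();
            if (next == null) {
                // t looks like the last node: try to link the new node behind it.
                if (t.next.compareAndSet(null, newNode)) {
                    // The element is now in the queue; moving tail is best effort.
                    tail.compareAndSet(t, newNode);
                    return true;
                }
                // Lost the CAS race to another thread: loop and re-read.
            } else {
                // tail is lagging: help move it forward, then retry.
                tail.compareAndSet(t, next);
            }
        }
    }

    /** Collects the elements from front to back (for inspection only). */
    List<E> snapshot() {
        List<E> items = new ArrayList<>();
        for (Node<E> n = sentinel.next.get(); n != null; n = n.next.get()) {
            items.add(n.item);
        }
        return items;
    }

    /** Offers threads * perThread elements concurrently and returns how many were linked. */
    static int fillConcurrently(int threads, int perThread) {
        SimpleCasQueue<Integer> queue = new SimpleCasQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.offer(j);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.snapshot().size();
    }
}
```

A thread that loses the CAS on next does not block; it simply re-reads tail and tries again, which is the same idea as falling through position 1 and looping in the real offer method.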
Besides ConcurrentLinkedQueue#offer, ConcurrentLinkedQueue also implements ConcurrentLinkedQueue#add to append an element to the end of the queue. Because ConcurrentLinkedQueue is unbounded, this method simply delegates to ConcurrentLinkedQueue#offer.
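As a quick check of both points (add behaves like offer, and no insertion is lost under contention), the following sketch starts several threads that alternate between the two methods. AddOfferContentionDemo and fill are names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the JDK.
class AddOfferContentionDemo {

    /** Fills one queue from several threads and returns the final element count. */
    static int fill(int threads, int perThread) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            final boolean useAdd = (i % 2 == 0); // even threads use add, odd threads use offer
            Thread worker = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    boolean accepted = useAdd ? queue.add(j) : queue.offer(j);
                    if (!accepted) {
                        throw new IllegalStateException("unbounded queue rejected an element");
                    }
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(fill(4, 1000));
    }
}
```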
Get element: poll & peek & element
The Queue interface declares three methods for getting elements: Queue#poll, Queue#peek, and Queue#element. The difference between Queue#poll and Queue#peek is well known. Queue#element is functionally similar to Queue#peek: both return the element value of the head node without removing the node. The only difference is that on an empty queue Queue#peek returns null, while Queue#element throws an exception. ConcurrentLinkedQueue delegates Queue#element to ConcurrentLinkedQueue#peek and only post-processes the return value, throwing NoSuchElementException if it is null.
Let's first look at how ConcurrentLinkedQueue implements Queue#poll:
    public E poll() {
        restartFromHead:
        for (; ; ) {
            // Start from the head node
            for (Node<E> h = head, p = h, q; ; ) {
                E item = p.item;

                // If the element of the current node is not null, try to set it to null with CAS
                if (item != null && p.casItem(item, null)) { // 1
                    // CAS succeeded.
                    // Successful CAS is the linearization point for item to be removed from this queue.
                    if (p != h) { // hop two nodes at a time
                        this.updateHead(h, ((q = p.next) != null) ? q : p);
                    }
                    return item;
                }
                // The queue is empty
                else if ((q = p.next) == null) { // 2
                    this.updateHead(h, p);
                    return null;
                } else if (p == q) { // 3
                    continue restartFromHead;
                } else { // 4
                    p = q;
                }
            }
        }
    }

    final void updateHead(Node<E> h, Node<E> p) {
        // Update the head node from h to p
        if (h != p && this.casHead(h, p)) {
            // Point h's next pointer at h itself
            h.lazySetNext(h);
        }
    }
The method above uses a labeled continue statement to control the flow of execution; the label is named restartFromHead. The break keyword also supports labels, and together with continue it provides functionality similar to goto.
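For readers who have not used labels before, here is a small standalone sketch of a labeled continue, unrelated to the queue itself. LabeledContinueDemo, countNonNegativeRows and the label nextRow are names chosen for this example.

```java
// Hypothetical demo class illustrating labeled continue.
class LabeledContinueDemo {

    /** Counts the rows that contain no negative value. */
    static int countNonNegativeRows(int[][] rows) {
        int count = 0;
        nextRow:
        for (int[] row : rows) {
            for (int value : row) {
                if (value < 0) {
                    // Abandon the inner loop and move on to the next outer iteration.
                    continue nextRow;
                }
            }
            count++; // reached only if the inner loop finished without a negative value
        }
        return count;
    }

    public static void main(String[] args) {
        int[][] rows = { {1, 2}, {3, -1}, {}, {0} };
        System.out.println(countNonNegativeRows(rows)); // prints 3
    }
}
```

In poll, continue restartFromHead plays the same role: it abandons the inner traversal and restarts the outer loop, which re-reads head.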
The following illustrations cover several scenarios for getting an element. In this section, cyan marks the head node and orange marks the tail node. Suppose the queue looks like figure (1) below. Both p and h point to the head node, while q has not been assigned yet and is therefore not drawn. The element value of the node that p points to is not null, so the thread calls Node#casItem to set it to null with CAS, that is, it executes position 1 in the code above. Assume the CAS of the current thread succeeds, as shown in figure (2) below. Because p equals h, the element value is returned directly and the dequeue is complete.
Now for some other cases. Suppose the element value of the head node is null. Execution then goes straight to position 2 and assigns the next node of p to q, as shown in figure (1) below. Because q is not null, execution continues. At this point p is not equal to q, so position 4 is executed and q is assigned to p, as shown in figure (2) below, and the loop moves on to the next iteration.
Now the element value of node p is not null, so execution enters position 1. Consider contention between threads. Suppose there are two threads, A and B, and A succeeds in Node#casItem at position 1, setting the element value of p to null, as shown in figure (3-1) below. Because p is not equal to h, A executes ConcurrentLinkedQueue#updateHead to move the head node from h to p and to point h's next pointer at h itself, as shown in figure (4-1) below. Finally the element value is returned and the dequeue is complete.
Because thread A has succeeded, thread B is bound to fail in Node#casItem, so it continues at position 2 and points q at the next node of p, as shown in figure (3-2) above. Because q is null, B executes ConcurrentLinkedQueue#updateHead to move the head node from h to p and to point h's next pointer at h itself, as shown in figure (4-2) above. Finally null is returned, which tells thread B that the queue is empty.
Finally, let's analyze when position 3 is executed. Suppose the queue looks like figure (1) below and two threads, A and B, are competing to dequeue. Thread A first executes position 1 and sets the element value of node p to null with CAS, as shown in figure (2) below. Before thread A goes on to call ConcurrentLinkedQueue#updateHead at position 1, thread B enters the for loop, as shown in figure (3) below, so the h and p pointers of thread B point to the head node. Before thread B proceeds any further, thread A executes ConcurrentLinkedQueue#updateHead, moves the head node from h to p, points h's next pointer at h itself, and returns the element value, as shown in figure (4) below.
At this point, as shown in figure (5) above, thread B resumes and finds that the element value of node p is null, so it jumps to position 2 and assigns the next node of p to q, as shown in figure (6) above. Because the next node of p is not null and p now refers to itself, p equals q, so position 3 is executed: the thread leaves the inner for loop and starts over from the head. When it enters the for loop again, thread B sees the queue structure shown in figure (7) above.
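The practical consequence of the CAS on item is that an element can be handed to only one caller, no matter how many threads poll at once. The sketch below checks this from the outside; ConcurrentPollDemo and eachElementPolledOnce are names invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical demo class, not part of the JDK.
class ConcurrentPollDemo {

    /** Returns true if every element 0..elements-1 was dequeued exactly once. */
    static boolean eachElementPolledOnce(int elements, int threads) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < elements; i++) {
            queue.offer(i);
        }
        // seen[i] counts how many times element i was returned by poll.
        AtomicIntegerArray seen = new AtomicIntegerArray(elements);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                Integer item;
                while ((item = queue.poll()) != null) {
                    seen.incrementAndGet(item);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        for (int i = 0; i < elements; i++) {
            if (seen.get(i) != 1) {
                return false; // lost or duplicated element
            }
        }
        return queue.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(eachElementPolledOnce(10_000, 4));
    }
}
```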
To close this section, let's look at the implementation of ConcurrentLinkedQueue#peek. It differs from ConcurrentLinkedQueue#poll in that it only reads the element at the head of the queue and does not remove the head node. The method is implemented as follows:
    public E peek() {
        restartFromHead:
        for (; ; ) {
            for (Node<E> h = head, p = h, q; ; ) {
                E item = p.item;
                if (item != null || (q = p.next) == null) { // 1
                    this.updateHead(h, p);
                    return item;
                } else if (p == q) { // 2
                    continue restartFromHead;
                } else { // 3
                    p = q;
                }
            }
        }
    }
This implementation may not look easy to understand at first, but it becomes clear when analyzed with diagrams like those above. The execution logic of each step is not demonstrated here; interested readers can draw the diagrams themselves.
Remove element: remove
For removal, Queue declares only the Queue#remove method, which removes the head node of the queue and returns its element value. Unlike Queue#poll, which returns null on an empty queue, it throws an exception when the queue is empty. ConcurrentLinkedQueue also implements the remove method that takes a parameter (inherited from the Collection interface). That method removes the node holding the target element value from the queue, and removes only the first match if there is more than one. The implementation of the parameterized remove method is as follows:
    public boolean remove(Object o) {
        if (o != null) {
            Node<E> next, pred = null;
            // Traverse the elements in the queue
            for (Node<E> p = this.first(); p != null; pred = p, p = next) {
                boolean removed = false;
                E item = p.item;
                if (item != null) {
                    // If the current element is not the one to delete, move on to the successor node
                    if (!o.equals(item)) {
                        next = this.succ(p);
                        continue;
                    }
                    // The current element is the one to delete; set the node's item to null with CAS
                    removed = p.casItem(item, null);
                }

                /*
                 * Two cases reach this point:
                 * 1. The element of the current node is already null.
                 * 2. The current node is the node to be deleted.
                 */

                // Get the successor of the current node
                next = this.succ(p);
                // Point the predecessor's next pointer at the successor of the current node
                if (pred != null && next != null) { // unlink
                    pred.casNext(p, next);
                }
                if (removed) {
                    return true;
                }
            }
        }
        return false;
    }
Removal traverses the queue starting from the head. When the element to be deleted is found, the element value of the corresponding node is set to null with CAS. Along the way, the traversal also tries to unlink every node it passes whose element value is null.
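From the caller's point of view the behavior looks like this. The sketch uses invented names (RemoveObjectDemo, afterRemovingFirst) and sample data chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the JDK.
class RemoveObjectDemo {

    /** Removes the first occurrence of target from [a, b, a, c] and returns what is left. */
    static List<String> afterRemovingFirst(String target) {
        ConcurrentLinkedQueue<String> queue =
                new ConcurrentLinkedQueue<>(Arrays.asList("a", "b", "a", "c"));
        queue.remove(target);
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        // Only the first "a" is removed; the second one stays in place.
        System.out.println(afterRemovingFirst("a")); // prints [b, a, c]

        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(Arrays.asList("x"));
        System.out.println(queue.remove("missing")); // prints false: no matching element
        System.out.println(queue.remove(null));      // prints false: null is never stored
    }
}
```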
Other operations: size & contains
ConcurrentLinkedQueue provides the ConcurrentLinkedQueue#size method to get the length of the queue. The method traverses the queue from the head, counts the nodes whose element value is not null, and caps the count at Integer.MAX_VALUE. Note that, unlike most collections, computing the size of a ConcurrentLinkedQueue takes O(n) time, and the result may be inaccurate: if other threads add or remove elements during the traversal, those changes may not be reflected in the value returned by ConcurrentLinkedQueue#size.
The ConcurrentLinkedQueue#contains method checks whether the queue contains the element value given as the argument. Its implementation is similar to ConcurrentLinkedQueue#size: it traverses the queue from the head and compares element values.
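A short single-threaded sketch of these methods follows; SizeContainsDemo and sizeAfterOffers are names made up for this example. Since size has to walk the whole list, a plain emptiness check is usually better written with isEmpty, which only needs to look for a first live element.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the JDK.
class SizeContainsDemo {

    /** Offers 0..n-1 and returns the size reported by the queue. */
    static int sizeAfterOffers(int n) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < n; i++) {
            queue.offer(i);
        }
        return queue.size(); // walks the whole list: O(n)
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        queue.offer(1);
        queue.offer(2);

        System.out.println(queue.size());      // prints 2 (no other thread is modifying the queue)
        System.out.println(queue.contains(2)); // prints true
        System.out.println(queue.contains(3)); // prints false

        // Prefer isEmpty() over size() == 0 when only emptiness matters.
        System.out.println(queue.isEmpty());   // prints false
    }
}
```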