

How to Build High-Performance Queues in Java

2025-01-28 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

This article explains how to build a high-performance queue in Java. Many people run into this problem in real projects, so let's walk through how to handle it. I hope you read it carefully and get something out of it!

Queue

A queue is a first-in, first-out (FIFO) data structure, like a line of people in real life: first come, first served.
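As a quick illustration of FIFO order, here is a small sketch using the JDK's java.util.ArrayDeque, a single-threaded queue chosen only for illustration. The class FifoDemo and the method drainInOrder are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class FifoDemo {
    /** Enqueues the items in order, then dequeues until empty; returns the dequeue order. */
    static List<String> drainInOrder(String... items) {
        Queue<String> queue = new ArrayDeque<>();
        for (String item : items) {
            queue.offer(item); // join at the tail
        }
        List<String> out = new ArrayList<>();
        String head;
        while ((head = queue.poll()) != null) { // leave from the head
            out.add(head);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder("first", "second", "third")); // [first, second, third]
    }
}
```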

Implementing simple queues with arrays and linked lists was covered in an earlier article, so we won't repeat it here. If you are interested, see:

Review the four basic data structures: array, linked list, queue and stack

A high-performance queue is one that works well under high concurrency. "Works well" here means two things: it is concurrency-safe, and it performs well.

Concurrency-safe queue

Java already ships with several concurrency-safe queues:

Queue                    Boundedness         Locking     Data structure
ArrayBlockingQueue       bounded             locked      array
LinkedBlockingQueue      optionally bounded  locked      linked list
ConcurrentLinkedQueue    unbounded           lock-free   linked list
SynchronousQueue         unbounded           lock-free   queue or stack
LinkedTransferQueue      unbounded           lock-free   linked list
PriorityBlockingQueue    unbounded           locked      heap
DelayQueue               unbounded           locked      heap

For source-code walkthroughs of these queues, see: the end of the Java concurrent collection

To sum up, concurrency-safe queues are built on three data structures: arrays, linked lists and heaps. Heaps are mainly used for priority queues, which are not general-purpose, so we will leave them aside for now.

In terms of boundedness, only ArrayBlockingQueue and LinkedBlockingQueue can be bounded; the others are unbounded.

In terms of locking, ArrayBlockingQueue and LinkedBlockingQueue use locks, while the remaining non-heap queues are lock-free and rely on CAS.

From a safety point of view, we generally choose bounded queues, so that producers that run faster than consumers cannot exhaust memory.
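The effect of a bound shows up clearly with ArrayBlockingQueue: once it is full, offer() returns false instead of letting the queue grow, which pushes the decision (drop, retry, or block with put()) back to the producer. A minimal sketch; the class and method names are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueDemo {
    /** Offers 0..attempts-1 to a bounded queue; returns how many offers were accepted. */
    static int acceptedOffers(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() never blocks: once the queue is full it returns false,
            // so a producer that outpaces the consumer cannot grow memory without limit
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedOffers(3, 10)); // 3
    }
}
```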

From a performance point of view, we generally prefer lock-free designs, which avoid threads blocking on locks and the context switches that follow.

From the JVM's point of view, we generally prefer arrays, because linked lists constantly allocate and discard nodes, which causes frequent garbage collection, another source of performance loss.

Therefore, the best choice is an array-based, bounded, lock-free queue.

The JDK does not provide such a queue, so many open-source frameworks implement their own high-performance queues, such as the Disruptor and JCTools (the latter is used in Netty).

High-performance queue

Rather than discussing a specific framework, we will introduce general techniques for implementing high-performance queues and then implement one ourselves.

Circular array

From the discussion above, the data structure for a high-performance queue should be an array, and a queue built on an array has to be a circular array (ring buffer).

A circular array is usually implemented with two indices, putIndex and takeIndex (or writeIndex and readIndex): one for writing and one for reading.

When the write index reaches the end of the array, it wraps around to the beginning, but it must never overtake the read index. Likewise, when the read index reaches the end of the array, it wraps around to the beginning, but it must never read data that has not been written yet.
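The wrap-around can be computed rather than special-cased if the indices simply keep growing: the slot is index % capacity, and when the capacity is a power of two the same result comes from the cheaper index & (capacity - 1), a common reason for rounding a ring buffer's capacity up to a power of two. A minimal sketch; RingIndexDemo and its methods are hypothetical names.

```java
class RingIndexDemo {
    /** Maps an ever-growing index to a slot; works for any positive capacity. */
    static int slotByModulo(long index, int capacity) {
        return (int) (index % capacity);
    }

    /** Same mapping using a bit mask; only correct when capacity is a power of two. */
    static int slotByMask(long index, int capacity) {
        return (int) (index & (capacity - 1));
    }

    public static void main(String[] args) {
        int capacity = 4;
        for (long index = 0; index < 6; index++) {
            // indices 4 and 5 wrap around to slots 0 and 1
            System.out.println(index + " -> " + slotByMask(index, capacity));
        }
    }
}
```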

When the write index and the read index point to the same slot, you cannot tell whether the queue is full or empty, so a size field is usually added.

So a queue built on a circular array usually has this data structure:

public class ArrayQueue<T> {
    private T[] array;
    private long writeIndex;
    private long readIndex;
    private long size;
}
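Filling in that skeleton for the single-threaded case gives something like the sketch below; the class and method names are hypothetical. It keeps the ever-increasing long indices suggested by the fields above. With such indices, writeIndex - readIndex always equals size, so here the size field mainly keeps the full and empty checks obvious.

```java
class SingleThreadedArrayQueue<T> {
    private final Object[] array;
    private long writeIndex; // total number of writes so far
    private long readIndex;  // total number of reads so far
    private long size;       // elements currently stored

    SingleThreadedArrayQueue(int capacity) {
        this.array = new Object[capacity];
    }

    /** Returns false when full, i.e. when writing would overwrite unread data. */
    boolean offer(T t) {
        if (size == array.length) {
            return false;
        }
        array[(int) (writeIndex % array.length)] = t;
        writeIndex++;
        size++;
        return true;
    }

    /** Returns null when empty, i.e. when there is no written data to read. */
    @SuppressWarnings("unchecked")
    T poll() {
        if (size == 0) {
            return null;
        }
        int slot = (int) (readIndex % array.length);
        T t = (T) array[slot];
        array[slot] = null; // drop the reference so the element can be garbage-collected
        readIndex++;
        size--;
        return t;
    }
}
```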

This works fine in a single thread, but in a multithreaded environment it suffers badly from false sharing.

What is false sharing?

A computer has several levels of storage. The one we deal with most is memory, also called main memory. In addition, the CPU has three levels of cache: L1, L2 and L3. L1 is closest to the CPU core and also the smallest; L2 is somewhat larger; L3 is the largest and is shared by multiple cores. When the CPU needs data, it first looks in L1, then L2, then L3, and only if all three miss does it read from main memory. The farther a level is from the core, the longer the access takes, so data used in very frequent operations should ideally stay in L1, which can greatly improve performance.

Cache line

Caches do not store data one value at a time but in fixed-size blocks called cache lines, which are usually 64 bytes.

Whenever the CPU fetches data from memory, it loads the whole 64-byte cache line that contains it. Take a long array as an example: a long is 8 bytes, so one line holds 8 longs, and fetching one element also brings up to 7 neighbouring elements into the cache.

This speeds up processing: after handling the element at index 0, the program is likely to handle index 1 next, and reading it from the cache is much faster than going to main memory.

However, this brings a new problem: false sharing.

False sharing

Imagine two threads running on two different CPU cores and working on this array at the same time, with both cores caching the same line. One core increments array[0] while the other increments array[1]. The threads never touch the same element, but they do share a cache line, and caches keep data consistent per line, not per element. The cache coherence protocol (for example MESI) lets only one core modify a line at a time: when one core writes to the line, the copy in the other core's cache is invalidated, and that core must fetch the line again before it can make its own write. The line keeps bouncing between the cores, which costs a lot of performance. This phenomenon is called false sharing. We will not go into the details of the coherence protocol here.
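To observe false sharing, a rough experiment is to let two threads each update their own slot of an AtomicLongArray, once with adjacent slots (8 bytes apart, usually the same cache line) and once with slots 16 apart (128 bytes, never the same 64-byte line). This is only a sketch: timings vary with hardware and JIT warm-up, and a real measurement should use a benchmark harness such as JMH. The class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLongArray;

class FalseSharingDemo {
    /** Two threads each increment only their own slot; returns elapsed nanoseconds. */
    static long run(AtomicLongArray counters, int slotA, int slotB, int iterations) {
        Thread a = new Thread(() -> bump(counters, slotA, iterations));
        Thread b = new Thread(() -> bump(counters, slotB, iterations));
        long start = System.nanoTime();
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return System.nanoTime() - start;
    }

    private static void bump(AtomicLongArray counters, int slot, int iterations) {
        for (int i = 0; i < iterations; i++) {
            // only this thread writes this slot, so get-then-set is not a race,
            // but each volatile write still needs exclusive ownership of the cache line
            counters.set(slot, counters.get(slot) + 1);
        }
    }

    public static void main(String[] args) {
        int iterations = 50_000_000;
        // slots 0 and 1 are 8 bytes apart: usually in the same 64-byte line
        long sameLine = run(new AtomicLongArray(32), 0, 1, iterations);
        // slots 0 and 16 are 128 bytes apart: never in the same 64-byte line
        long separateLines = run(new AtomicLongArray(32), 0, 16, iterations);
        System.out.printf("same line: %d ms, separate lines: %d ms%n",
                sameLine / 1_000_000, separateLines / 1_000_000);
    }
}
```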

So how can the problems caused by false sharing be solved?

Take the circular-array queue above as an example: writeIndex, readIndex and size are declared next to each other, so they very likely fall in the same cache line.

So we only need to insert seven long fields (7 × 8 = 56 bytes) between writeIndex and readIndex; the two fields then start 64 bytes apart and can never share a 64-byte cache line. The same is done between readIndex and size.
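One caveat about padding fields: the JVM specification does not fix the order of a class's fields, and HotSpot groups them by size, so padding declared in the same class as the hot field (as in the MpscArrayQueue listing in this article) relies on implementation behaviour. Libraries such as the LMAX Disruptor and JCTools instead put the padding in superclasses, relying on HotSpot laying out superclass fields before subclass fields. A sketch of that pattern for writeIndex; all class names are hypothetical.

```java
// Padding in a superclass: HotSpot lays out superclass fields before
// subclass fields, so these 7 longs (56 bytes) precede writeIndex.
abstract class WriteIndexLhsPad {
    long p01, p02, p03, p04, p05, p06, p07;
}

abstract class WriteIndexField extends WriteIndexLhsPad {
    protected volatile long writeIndex;
}

// Padding after writeIndex: keeps fields declared further down the
// hierarchy out of writeIndex's cache line.
abstract class WriteIndexRhsPad extends WriteIndexField {
    long p11, p12, p13, p14, p15, p16, p17;
}

class PaddedWriteIndex extends WriteIndexRhsPad {
    long get() {
        return writeIndex;
    }

    void set(long value) {
        writeIndex = value;
    }
}
```

JDK 8 also has the @sun.misc.Contended annotation (jdk.internal.vm.annotation.Contended from JDK 9), but for classes outside the JDK it only takes effect when the JVM runs with -XX:-RestrictContended.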

This eliminates false sharing between writeIndex and readIndex. Because writeIndex is updated by the producer threads and readIndex by the consumer thread, removing false sharing between them can noticeably improve performance.

With multiple producers, writeIndex is contended. How can it be updated safely? When one producer thread modifies writeIndex, the change must be immediately visible to the other producer threads.

The first thing that comes to mind is volatile. That is right, but volatile alone is not enough: it guarantees visibility and ordering, not atomicity, so we also need the atomic CAS (compare-and-swap) instruction. AtomicInteger and AtomicLong both provide CAS, so should we use them directly? No. Look closely and you will find that both are ultimately implemented by calling Unsafe, so we can call Unsafe on the queue's own fields directly and avoid an extra wrapper object per field.
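The volatile-read-plus-CAS pattern described above can be sketched with AtomicLong before moving to Unsafe: each producer reads the current writeIndex, then tries to swap in writeIndex + 1, and retries if another producer got there first. The class and method names are hypothetical; AtomicLong.getAndIncrement() would do the same in one call, but the explicit loop shows the retry.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class CasClaimDemo {
    private final AtomicLong writeIndex = new AtomicLong();

    /** Claims the next index with a CAS retry loop; each index is handed out exactly once. */
    long claim() {
        long current;
        do {
            current = writeIndex.get(); // volatile read: sees other producers' updates
        } while (!writeIndex.compareAndSet(current, current + 1)); // fails if another producer won
        return current;
    }

    /** Runs several producer threads that each claim perThread indices; returns all claimed indices. */
    static Set<Long> claimConcurrently(int producers, int perThread) {
        CasClaimDemo demo = new CasClaimDemo();
        Set<Long> claimed = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    claimed.add(demo.claim());
                }
            });
            threads[p].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return claimed;
    }
}
```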

Next comes the most powerful low-level tool: Unsafe.

Unsafe

Unsafe provides not only CAS instructions but also many other low-level operations, such as accessing off-heap memory directly, modifying private fields, instantiating a class without calling its constructor, parking and unparking threads, and methods with memory-barrier semantics.

For more about Unsafe, see: Unsafe Analysis of java Magic Class

To build a high-performance queue, we mainly use Unsafe's CAS instructions and its methods with memory-barrier semantics:

// atomic instruction (CAS)
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
// read the value with volatile semantics, as if the field were declared volatile
public native long getLongVolatile(Object var1, long var2);
// ordered (lazy) write: earlier writes cannot be reordered after it,
// but other threads may see the new value only after a short delay
public native void putOrderedLong(Object var1, long var2, long var4);
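Since Java 9, java.lang.invoke.VarHandle offers the same three operations through a supported API, and JDK 23 deprecated the memory-access methods of sun.misc.Unsafe for removal (JEP 471). The sketch below swaps VarHandle in for Unsafe purely to show the correspondence: compareAndSet for compareAndSwapLong, getVolatile for getLongVolatile, and setRelease for putOrderedLong (the same semantics as AtomicLong.lazySet). The class name VarHandleLongField is hypothetical.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

class VarHandleLongField {
    private long value; // accessed through the VALUE handle below

    private static final VarHandle VALUE;

    static {
        try {
            VALUE = MethodHandles.lookup()
                    .findVarHandle(VarHandleLongField.class, "value", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Counterpart of Unsafe.compareAndSwapLong: atomic compare-and-set. */
    boolean compareAndSet(long expected, long newValue) {
        return VALUE.compareAndSet(this, expected, newValue);
    }

    /** Counterpart of Unsafe.getLongVolatile: read with volatile semantics. */
    long getVolatile() {
        return (long) VALUE.getVolatile(this);
    }

    /** Counterpart of Unsafe.putOrderedLong: release store (same as AtomicLong.lazySet). */
    void setRelease(long newValue) {
        VALUE.setRelease(this, newValue);
    }
}
```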

That's enough background; now it's time to show the real technique: writing a high-performance queue by hand.

Writing a high-performance queue by hand

Suppose there are multiple producers but only one consumer (MPSC: Multiple Producer, Single Consumer). This is a classic scenario in Netty. How can such a queue be implemented?

Here is the code:

import java.lang.reflect.Field;
import sun.misc.Unsafe;

/**
 * Multi-producer single-consumer queue
 *
 * @param <T> element type
 */
public class MpscArrayQueue<T> {
    long p01, p02, p03, p04, p05, p06, p07;
    // element array
    private T[] array;
    long p1, p2, p3, p4, p5, p6, p7;
    // write index: there are multiple producers, so it is declared volatile
    private volatile long writeIndex;
    long p11, p12, p13, p14, p15, p16, p17;
    // read index: there is only one consumer, so it is not declared volatile
    private long readIndex;
    long p21, p22, p23, p24, p25, p26, p27;
    // number of elements: changed by both producers and the consumer, so declared volatile
    private volatile long size;
    long p31, p32, p33, p34, p35, p36, p37;
    // the Unsafe instance
    private static final Unsafe UNSAFE;
    // base offset of the array
    private static final long ARRAY_BASE_OFFSET;
    // shift used to compute an element's offset in the array
    private static final long ARRAY_ELEMENT_SHIFT;
    // offset of writeIndex
    private static final long WRITE_INDEX_OFFSET;
    // offset of readIndex
    private static final long READ_INDEX_OFFSET;
    // offset of size
    private static final long SIZE_OFFSET;

    static {
        Field f = null;
        try {
            // get the Unsafe instance
            f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (Unsafe) f.get(null);
            // compute the base offset of the array
            ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(Object[].class);
            // compute the size of one array element: on a 64-bit JVM a reference
            // takes 4 bytes with compressed pointers and 8 bytes without
            int scale = UNSAFE.arrayIndexScale(Object[].class);
            if (4 == scale) {
                ARRAY_ELEMENT_SHIFT = 2;
            } else if (8 == scale) {
                ARRAY_ELEMENT_SHIFT = 3;
            } else {
                throw new IllegalStateException("unknown pointer size");
            }
            // compute the offset of writeIndex
            WRITE_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpscArrayQueue.class.getDeclaredField("writeIndex"));
            // compute the offset of readIndex
            READ_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpscArrayQueue.class.getDeclaredField("readIndex"));
            // compute the offset of size
            SIZE_OFFSET = UNSAFE.objectFieldOffset(MpscArrayQueue.class.getDeclaredField("size"));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // constructor
    @SuppressWarnings("unchecked")
    public MpscArrayQueue(int capacity) {
        // round capacity up to a power of 2 (overflow is not considered)
        capacity = 1 << (32 - Integer.numberOfLeadingZeros(capacity - 1));
        this.array = (T[]) new Object[capacity];
    }

    // add an element
    public boolean put(T t) {
        long size;
        long writeIndex;
        do {
            // re-read size on every iteration
            size = this.size;
            // the queue is full, so return false
            if (size >= this.array.length) {
                return false;
            }
            // re-read writeIndex on every iteration
            writeIndex = this.writeIndex;
            // atomically update writeIndex in the while condition;
            // if the CAS fails, repeat the steps above
        } while (!UNSAFE.compareAndSwapLong(this, WRITE_INDEX_OFFSET, writeIndex, writeIndex + 1));
        // the CAS succeeded: store the element at the claimed writeIndex and update size
        long eleOffset = calcElementOffset(writeIndex, this.array.length - 1);
        // ordered (lazy) write of the element
        UNSAFE.putOrderedObject(this.array, eleOffset, t);
        // retry the CAS until size has been incremented
        do {
            size = this.size;
        } while (!UNSAFE.compareAndSwapLong(this, SIZE_OFFSET, size, size + 1));
        return true;
    }

    // remove an element
    public T take() {
        long size = this.size;
        // if size is 0, the queue is empty, so return directly
        if (size == 0) {
            return null;
        }
        // ...
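For a version that can be compiled and run end to end, here is a minimal sketch of the same multi-producer, single-consumer design under a few assumptions: it uses VarHandle instead of Unsafe, omits cache-line padding for brevity, and the names MpscQueueSketch, offer, poll and MpscQueueSketchDemo are hypothetical. It also changes one design point. Instead of a size field, fullness is checked against readIndex inside the CAS loop. With a size-based check, two producers can both read size == capacity - 1, both pass the check and both claim a slot, overwriting an element the consumer has not read yet; here the check uses the same w that the CAS then claims, so a successful CAS guarantees the consumer has already freed that slot. Because a producer claims an index before storing its element, the consumer may find a claimed but still empty slot, and it spins until the element appears.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/** Bounded multi-producer, single-consumer queue (sketch; no cache-line padding). */
class MpscQueueSketch<T> {
    private static final VarHandle WRITE_INDEX;
    private static final VarHandle READ_INDEX;
    private static final VarHandle SLOT = MethodHandles.arrayElementVarHandle(Object[].class);

    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            WRITE_INDEX = lookup.findVarHandle(MpscQueueSketch.class, "writeIndex", long.class);
            READ_INDEX = lookup.findVarHandle(MpscQueueSketch.class, "readIndex", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private final Object[] array;
    private final int mask;
    private volatile long writeIndex; // next index to claim (contended by producers)
    private volatile long readIndex;  // next index to consume (written by the consumer only)

    MpscQueueSketch(int capacity) {
        // round up to a power of two so that index & mask == index % length
        int length = capacity <= 1 ? 1 : Integer.highestOneBit(capacity - 1) << 1;
        this.array = new Object[length];
        this.mask = length - 1;
    }

    /** May be called from any thread; returns false if the queue is full. */
    boolean offer(T t) {
        if (t == null) {
            throw new NullPointerException("null marks an empty slot");
        }
        long w;
        do {
            w = writeIndex;
            if (w - readIndex >= array.length) {
                return false; // slot w still holds the unconsumed element w - length
            }
        } while (!WRITE_INDEX.compareAndSet(this, w, w + 1)); // claim index w
        // release store (like Unsafe.putOrderedObject), paired with the consumer's getAcquire
        SLOT.setRelease(array, (int) (w & mask), t);
        return true;
    }

    /** Must be called from a single consumer thread; returns null if the queue is empty. */
    @SuppressWarnings("unchecked")
    T poll() {
        long r = readIndex;
        int slot = (int) (r & mask);
        Object e = SLOT.getAcquire(array, slot);
        if (e == null) {
            if (r == writeIndex) {
                return null; // nothing claimed: really empty
            }
            do { // index r is claimed but its element is not stored yet
                Thread.onSpinWait();
                e = SLOT.getAcquire(array, slot);
            } while (e == null);
        }
        array[slot] = null;                 // free the slot first,
        READ_INDEX.setRelease(this, r + 1); // then publish progress, like putOrderedLong
        return (T) e;
    }
}

class MpscQueueSketchDemo {
    /** Each producer offers 1..perProducer; the calling thread consumes everything and sums it. */
    static long sumWithProducers(int producers, int perProducer, int capacity) {
        MpscQueueSketch<Long> queue = new MpscQueueSketch<>(capacity);
        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (long v = 1; v <= perProducer; v++) {
                    while (!queue.offer(v)) {
                        Thread.onSpinWait(); // full: wait for the consumer
                    }
                }
            }).start();
        }
        long sum = 0;
        long remaining = (long) producers * perProducer;
        while (remaining > 0) {
            Long v = queue.poll();
            if (v != null) {
                sum += v;
                remaining--;
            }
        }
        return sum;
    }
}
```

For example, MpscQueueSketchDemo.sumWithProducers(4, 10_000, 64) runs four producers against a 64-slot queue and should return 4 × (10,000 × 10,001 / 2) = 200,020,000.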


© 2024 shulou.com SLNews company. All rights reserved.
