How to use the concurrent counting class LongAdder

2025-04-06 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 05/31

This article explains how the concurrent counting class LongAdder works. It starts with AtomicLong and the cost of its CAS retry loop under contention, then walks through the LongAdder source code.

AtomicLong performs atomic increments and decrements using CAS (Compare And Swap), so concurrent updates never produce a thread-unsafe result. The value field in AtomicLong is declared volatile, so every thread sees its latest value. Let's start with the incrementAndGet() method.

public final long incrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}

It delegates to the getAndAddLong method of Unsafe:

public final long getAndAddLong(Object var1, long var2, long var4) {
    long var6;
    do {
        var6 = this.getLongVolatile(var1, var2);
    } while (!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
    return var6;
}

In this method, this.compareAndSwapLong() takes four parameters: var1 is the object whose field is being modified, var2 is the offset of that field within the object (which locates it in memory), var6 is the value the field is expected to hold before the update, and var6 + var4 is the new value. compareAndSwapLong sets the field to var6 + var4 only if the field's actual value still equals var6. Otherwise it returns false and the loop reads the value again and retries.
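The retry loop described above can be reproduced with the public AtomicLong API, without touching Unsafe. The following is a minimal sketch, not the JDK's code: the class name CasLoopDemo and the helper methods getAndAdd and runDemo are hypothetical names chosen for illustration, and compareAndSet stands in for compareAndSwapLong.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: class and method names are hypothetical.
final class CasLoopDemo {

    // Adds delta with an explicit compare-and-set retry loop and returns
    // the value that was present before the update, mirroring the shape
    // of Unsafe.getAndAddLong.
    static long getAndAdd(AtomicLong counter, long delta) {
        long expected;
        do {
            expected = counter.get(); // volatile read of the current value
            // The CAS fails if another thread changed the value in between.
        } while (!counter.compareAndSet(expected, expected + delta));
        return expected;
    }

    // Runs several threads that each increment a shared counter.
    static long runDemo(int threads, int incrementsPerThread) {
        AtomicLong counter = new AtomicLong();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    getAndAdd(counter, 1L);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 100_000)); // prints 400000
    }
}
```

No increment is lost, because an update only takes effect when the value is still the one that was read.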

Going one level deeper:

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

Unsafe.compareAndSwapLong is a native method. Its implementation lives in the JVM's native code, reached through JNI (Java Native Interface), and ultimately executes an atomic compare-and-swap CPU instruction (cmpxchg on x86, for example).

The implementation uses a do-while loop: if the CAS fails, it retries until it succeeds. Under very high concurrency the loop may run many times, but thread safety is still guaranteed. The cost is that when contention is heavy and the spinning CAS keeps failing, it burns a lot of CPU, ties up execution resources, and can slow down the application's main business logic.
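One way to see this overhead is to count how many CAS attempts fail. The sketch below is an illustration under stated assumptions rather than a benchmark: CasRetryDemo, Result and run are hypothetical names, and the number of failures varies from run to run and from machine to machine, so no specific figure is claimed.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: class, field and method names are hypothetical.
final class CasRetryDemo {

    // Final counter value plus the number of CAS attempts that had to be retried.
    static final class Result {
        final long total;
        final long failedAttempts;

        Result(long total, long failedAttempts) {
            this.total = total;
            this.failedAttempts = failedAttempts;
        }
    }

    static Result run(int threads, int incrementsPerThread) {
        AtomicLong counter = new AtomicLong();
        long[] failures = new long[threads]; // each slot is written by one thread only
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                long failed = 0;
                for (int j = 0; j < incrementsPerThread; j++) {
                    for (;;) {
                        long current = counter.get();
                        if (counter.compareAndSet(current, current + 1)) {
                            break;
                        }
                        failed++; // another thread won the race; spin and retry
                    }
                }
                failures[id] = failed;
            });
            workers[i].start();
        }
        long failedTotal = 0;
        for (int i = 0; i < threads; i++) {
            try {
                workers[i].join(); // join makes the worker's writes visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            failedTotal += failures[i];
        }
        return new Result(counter.get(), failedTotal);
    }

    public static void main(String[] args) {
        Result r = run(8, 1_000_000);
        System.out.println("total = " + r.total + ", failed CAS attempts = " + r.failedAttempts);
    }
}
```

The total is always exact. The failed-attempt count is wasted work, and it tends to grow as more threads hit the same variable.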

How LongAdder improves on AtomicLong

In JDK 1.5, Doug Lea addressed the thread-safety and concurrency-performance problems of HashMap by introducing ConcurrentHashMap, which was implemented with segmented locks. ConcurrentHashMap is so well known that almost every Java interview asks about it. Similarly, in ForkJoinPool, Doug Lea's work-stealing algorithm uses fine-grained locks on each WorkQueue to reduce contention. For more details, see my earlier article "ForkJoin use and principle analysis". If you already understand ConcurrentHashMap well, the implementation of LongAdder will be relatively easy to follow.
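Before reading the implementation, it may help to see how LongAdder is used from calling code. This is a minimal usage sketch: the class name LongAdderUsage and the method countWithLongAdder are hypothetical, while LongAdder, increment() and sum() are the standard java.util.concurrent.atomic API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Illustrative only: class and method names are hypothetical.
final class LongAdderUsage {

    // Increments one shared LongAdder from several pool threads and
    // returns the sum once all tasks have finished.
    static long countWithLongAdder(int threads, int incrementsPerThread) {
        LongAdder adder = new LongAdder();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    adder.increment(); // same as adder.add(1L)
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // sum() adds base and all cells; it is exact here because no
        // thread is updating the adder any more.
        return adder.sum();
    }

    public static void main(String[] args) {
        System.out.println(countWithLongAdder(4, 100_000)); // prints 400000
    }
}
```

Note that sum() is not an atomic snapshot while updates are still in flight. That makes LongAdder a good fit for statistics and counters that are written often and read occasionally, and a poor fit for values that need a compare-and-set style read-modify-write.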

Let's look at the implementation of LongAdder's add() method:

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // The first if checks two things: (1) if cells is not null, go straight
    // to the second if statement; (2) otherwise, first try to add x to base
    // with a CAS and return if it succeeds. A failed CAS means there is
    // contention, so the cell-based path is needed.
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

This code uses Cell objects, which are the key to LongAdder's performance under high concurrency. When contention on casBase is heavy, Cell objects are created and added to cells. This is analyzed in detail below.

@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    // CAS method for updating value on this Cell
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

The expression a = as[getProbe() & m] uses getProbe() to read the probe hash stored in the current Thread's threadLocalRandomProbe field. This value is effectively random and is generated for the current thread by ThreadLocalRandom.current(). Random is not used because this path is already highly concurrent: threads sharing a Random contend on its single seed, and threads are far more likely to end up with the same value. Using threadLocalRandomProbe spreads threads more evenly under high concurrency and reduces collisions. For more on ThreadLocalRandom, see the article "A brief look at the principles of the ThreadLocalRandom class in the concurrency package". Once the current thread's Cell in the as array has been obtained, the CAS update is performed on it. Let's analyze this in the source code. longAccumulate() is defined in the parent class, Striped64.java.

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        // If the current thread's probe is 0, initialize it
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // If the cells array has already been initialized
        if ((as = cells) != null && (n = as.length) >

                0) {
            // If the slot in cells selected by this thread's probe is empty
            if ((a = as[(n - 1) & h]) == null) {
                // Operations that modify LongAdder's Cell array must first
                // acquire the global cellsBusy lock. If another thread holds
                // it, give up this step and retry in the for loop.
                if (cellsBusy == 0) {       // Try to attach new Cell
                    // The new Cell starts at x, so the add is already done
                    // once the Cell is installed.
                    Cell r = new Cell(x);   // Optimistically create
                    // casCellsBusy acquires the lock with a CAS: the lock is
                    // held once cellsBusy has been set to 1 successfully.
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            // Release the lock in finally
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // If a is not null, try to CAS x into a. Exit the loop on success.
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // If the length n of cells has already reached the number of
            // CPUs, or cells has been replaced by another thread, expanding
            // is pointless, so mark this pass as non-colliding.
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            // Reaching this step means a is not null but several threads are
            // competing to CAS it, so the cells array must be expanded to
            // twice its current length.
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n
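
The striping idea behind cells can be illustrated with a much simpler counter. The sketch below is a deliberate simplification and not the Striped64 algorithm. StripedCounterSketch, PROBE and runDemo are hypothetical names. It assumes a fixed, power-of-two number of stripes held in an AtomicLongArray, with no base field, no lazy creation, no resizing, no rehashing on collision and no @Contended padding.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongArray;

// Illustrative only: a fixed-size striped counter, NOT the JDK implementation.
final class StripedCounterSketch {

    // Per-thread random probe chosen once per thread; stands in for getProbe().
    private static final ThreadLocal<Integer> PROBE =
            ThreadLocal.withInitial(() -> ThreadLocalRandom.current().nextInt());

    private final AtomicLongArray cells; // stand-in for the Cell[] array
    private final int mask;              // stripes - 1, like m = as.length - 1

    StripedCounterSketch(int stripes) {
        if (stripes <= 0 || Integer.bitCount(stripes) != 1) {
            throw new IllegalArgumentException("stripes must be a power of two");
        }
        this.cells = new AtomicLongArray(stripes);
        this.mask = stripes - 1;
    }

    // Each thread updates the stripe selected by its probe, so threads with
    // different probes do not compete for the same memory word.
    void add(long x) {
        cells.addAndGet(PROBE.get() & mask, x);
    }

    // Like LongAdder.sum(): adds up all stripes; not an atomic snapshot.
    long sum() {
        long total = 0;
        for (int i = 0; i < cells.length(); i++) {
            total += cells.get(i);
        }
        return total;
    }

    static long runDemo(int threads, int addsPerThread) {
        StripedCounterSketch counter = new StripedCounterSketch(8);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < addsPerThread; j++) {
                    counter.add(1L);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 100_000)); // prints 400000
    }
}
```

Compared with this sketch, LongAdder starts with a single base field, creates cells only after a CAS on base fails, and grows the array while collisions persist and its length is below the number of CPUs. Adjacent elements of an AtomicLongArray can also share a cache line, which is the false sharing that the @sun.misc.Contended annotation on Cell is there to avoid.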
