2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article looks at two bugs hidden in the Java 8 ConcurrentHashMap source code. Many people run into situations like these in practice, so let us work through how the code handles them. I hope you read it carefully and come away with something useful.
I recommend that everyone read the Java 7 ConcurrentHashMap source code; that version is a textbook of Java multithreaded programming. In the Java 7 source, the author is very cautious about pessimistic locks: most of them are replaced with spin locks plus volatile to get the same semantics, and where a lock cannot be avoided, the author uses various techniques to shrink its critical section. As mentioned in the previous article, a spin lock is the better choice when the critical section is small, because it avoids the context switch caused by blocking a thread. It is still a lock in essence, though: while threads spin, only one of them can enter the critical section, and the others just burn CPU time slices. The Java 8 implementation of ConcurrentHashMap avoids this limitation of spin locks and achieves higher concurrency through some ingenious design and techniques. If the Java 7 source teaches us how to convert pessimistic locks into spin locks, the Java 8 source shows how to go from spin locks to no locks at all.
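To make the contrast between a spin lock and a lock-free update concrete, here is a minimal sketch. It is not taken from the JDK source: the class and method names are hypothetical, and it uses the public java.util.concurrent.atomic classes instead of the internal Unsafe API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class; none of these names come from the JDK source. */
final class SpinVsCasDemo {
    // Spin-lock version: only the thread that flips the flag enters the critical section.
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private int guardedCounter = 0;

    void incrementWithSpinLock() {
        while (!locked.compareAndSet(false, true)) {
            Thread.yield(); // every other thread burns CPU here until the flag is cleared
        }
        try {
            guardedCounter++; // critical section
        } finally {
            locked.set(false); // release the spin lock
        }
    }

    // Lock-free version: no thread ever owns anything; a failed CAS just retries.
    private final AtomicInteger casCounter = new AtomicInteger();

    void incrementLockFree() {
        int current;
        do {
            current = casCounter.get();
        } while (!casCounter.compareAndSet(current, current + 1));
    }

    /** Runs both variants from several threads and returns {spinLockTotal, lockFreeTotal}. */
    static int[] run(int threads, int perThread) {
        SpinVsCasDemo demo = new SpinVsCasDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) {
                    demo.incrementWithSpinLock();
                    demo.incrementLockFree();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { demo.guardedCounter, demo.casCounter.get() };
    }

    public static void main(String[] args) {
        int[] totals = run(4, 1000);
        System.out.println("spin lock: " + totals[0] + ", lock-free: " + totals[1]);
    }
}
```

Both counters end up with the same total. The difference is that a thread delayed inside the spin-lock critical section stalls every other thread, while a thread delayed in the lock-free loop only delays itself.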
Reading the book thin
Image source: https://www.zhenchao.org/2019/01/31/java/cas-based-concurrent-hashmap/
Before we start, keep this picture in mind. Readers familiar with HashMap will recognize it: in Java 8, ConcurrentHashMap and HashMap share essentially the same overall data structure.
ConcurrentHashMap in Java 7 uses many programming techniques to improve performance, but the Segment design still leaves a lot of room for improvement. The Java 7 design can be improved on the following points:
1. While a Segment is being resized, threads that are not performing the resize must suspend and wait when they write to that Segment.
2. A read requires two rounds of hash addressing, which costs extra performance in read-heavy, write-light workloads.
3. The size() method first attempts lock-free reads, but if another thread writes in the meantime, the thread calling size() locks the entire ConcurrentHashMap. This is the only global lock in ConcurrentHashMap, and it remains a performance risk for low-level components.
4. In extreme cases (for example, when the client supplies a poorly performing hash function), the complexity of get() degrades to O(n).
For points 1 and 2, Java 8 drops Segment and reduces the granularity of the pessimistic lock to a single bucket, so get no longer needs to hash twice. The design of size() is the biggest highlight of the Java 8 version and is explained in more detail in a later article. As for the red-black tree, which addresses point 4, this article does not go into detail. The next part digs into the details and reads the book thick. The modules covered are initialization, the put method, the resize method transfer, and the size() method; other modules, such as the hash function, have changed little and are not discussed further.
Prerequisite knowledge
ForwardingNode
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        // MOVED = -1: the hash of a ForwardingNode is -1
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}
In addition to the ordinary Node and TreeNode, ConcurrentHashMap introduces a new node type, ForwardingNode. Only its constructor is shown here. ForwardingNode serves two purposes:
It marks a bucket that has already been copied to the new bucket array during a dynamic resize.
If get is called during a dynamic resize, ForwardingNode forwards the request to the new bucket array so that the get call is not blocked. For this purpose, ForwardingNode stores the resized bucket array nextTable at construction time.
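The forwarding behaviour can be sketched as follows. This is a simplified, single-threaded illustration under stated assumptions, not the JDK implementation: the Node and ForwardingNode classes here are stripped-down stand-ins with String keys, the get method is hypothetical, and plain array reads replace the volatile reads the real code needs.

```java
/** Hypothetical, single-threaded sketch of how a read follows a ForwardingNode. */
final class ForwardingDemo {
    static final int MOVED = -1; // same marker value as in the source above

    static class Node {
        final int hash;
        final String key;
        final String val;
        final Node next;

        Node(int hash, String key, String val, Node next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
    }

    static final class ForwardingNode extends Node {
        final Node[] nextTable;

        ForwardingNode(Node[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
    }

    /** Looks up key, hopping to the new table whenever the bucket head is a ForwardingNode. */
    static String get(Node[] table, String key) {
        int h = key.hashCode() & 0x7fffffff; // non-negative, so it never equals MOVED
        Node[] tab = table;
        while (true) {
            Node e = tab[(tab.length - 1) & h];
            if (e != null && e.hash == MOVED) {
                tab = ((ForwardingNode) e).nextTable; // bucket already migrated: retry there
                continue;
            }
            for (; e != null; e = e.next) {
                if (e.hash == h && key.equals(e.key)) {
                    return e.val;
                }
            }
            return null;
        }
    }

    public static void main(String[] args) {
        int h = "a".hashCode() & 0x7fffffff;
        Node[] newTab = new Node[4];
        newTab[3 & h] = new Node(h, "a", "1", null);
        Node[] oldTab = new Node[2];
        oldTab[1 & h] = new ForwardingNode(newTab); // bucket was moved during the resize
        System.out.println(get(oldTab, "a"));
    }
}
```

The reader never waits for the resize to finish: it either finds the data in the old bucket or is redirected to the table that already holds it.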
UNSAFE.compareAndSwap***
This is the tool that the Java 8 version of ConcurrentHashMap uses to implement CAS. Taking the int type as an example, the method is defined as follows:
/**
 * Atomically update Java variable to x if it is currently
 * holding expected.
 * @return true if successful
 */
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
The corresponding semantics are:
If the value located offset bytes from the start address of object o equals expected, set it to x and return true to indicate that the update succeeded; otherwise return false to indicate that the CAS failed.
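Application code normally cannot call Unsafe directly, so the sketch below shows the same compare-and-swap semantics with the public AtomicIntegerFieldUpdater class. The class CasSemanticsDemo and its state field are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/** Hypothetical demo: CAS on an int field through the public field-updater API. */
final class CasSemanticsDemo {
    // The updater requires a volatile int field.
    volatile int state;

    private static final AtomicIntegerFieldUpdater<CasSemanticsDemo> STATE =
            AtomicIntegerFieldUpdater.newUpdater(CasSemanticsDemo.class, "state");

    /** Sets state to x only if it currently equals expected; returns whether it did. */
    boolean cas(int expected, int x) {
        return STATE.compareAndSet(this, expected, x);
    }

    public static void main(String[] args) {
        CasSemanticsDemo d = new CasSemanticsDemo();
        System.out.println(d.cas(0, 5) + " -> state " + d.state); // expected value matches
        System.out.println(d.cas(0, 7) + " -> state " + d.state); // expected value is stale
    }
}
```

The first call succeeds because the field still holds the expected value; the second fails and leaves the field untouched, which is what lets a caller detect interference from another thread and retry.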
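Initialization
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)
        initialCapacity = concurrencyLevel;
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    // tableSizeFor finds the smallest power of two that is not less than size; it was covered for the JDK 1.8 HashMap
    this.sizeCtl = cap;
}
Even the most complex constructor is fairly simple. Only two points need attention here:
concurrencyLevel was the length of the Segment array in Java 7. Since Java 8 dropped Segment, concurrencyLevel is only a retained parameter: apart from acting as a lower bound on initialCapacity in the code above, it has no real meaning.
sizeCtl appears here for the first time. If it equals -1, the map is being initialized; any other negative value means a resize is in progress. During a resize, the low 16 bits of sizeCtl equal the number of resizing threads plus one, and the high 16 bits (apart from the sign bit) carry information about the size of the bucket array.
The put method
public V put(K key, V value) {
    return putVal(key, value, false);
}
The put method forwards the call to putVal:
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // [A] Lazy initialization
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // [B] The current bucket is empty: update it directly
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break; // no lock when adding to empty bin
        }
        // [C] If the first element of the bucket is a ForwardingNode, this thread tries to join the resize
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // [D] Otherwise traverse the linked list or tree in the bucket and insert
        else {
            // Folded for now; examined in detail later
        }
    }
    // [F] Reaching this point means the put succeeded; the map's record count is incremented by one
    addCount(1L, binCount);
    return null;
}
Viewed as a whole, the flow is fairly clear. I have marked several very important steps with a bracketed letter; the put method still touches on a lot of topics.
Initializing the bucket array
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // Another thread is already initializing, so this thread starts to spin
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // CAS guarantees that only one thread can reach this branch
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // sc = n - n/4 = 0.75n
                    sc = n - (n >>> 2);
                }
            } finally {
                // Restoring sizeCtl to a positive value is equivalent to releasing the lock
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}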
How does the system avoid concurrency problems while initializing the bucket array? The key is the spin lock. When multiple threads execute initTable, CAS ensures that only one thread enters the real initialization branch while the others spin and wait. Three points in this code deserve attention:
As mentioned earlier, when a thread starts initializing the bucket array, it sets sizeCtl to -1 through CAS; other threads take this value as the signal to start spinning.
When initialization finishes, sizeCtl is restored to a positive number equal to 0.75 times the length of the bucket array. This value has the same meaning as the threshold in HashMap discussed earlier: it is the point at which the system triggers a resize.
The write to sizeCtl in the finally block does not use CAS, because the earlier CAS guarantees that only one thread can reach this point.
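The three points above can be sketched with public atomics. This is an illustration under stated assumptions rather than the JDK code: LazyTableDemo, its allocations counter, and the race helper are hypothetical, and an AtomicInteger stands in for the Unsafe-based CAS on the sizeCtl field.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of CAS-guarded lazy initialization in the style of initTable. */
final class LazyTableDemo {
    static final int DEFAULT_CAPACITY = 16;

    private final AtomicInteger sizeCtl = new AtomicInteger(0); // -1 means "initializing"
    private volatile Object[] table;
    final AtomicInteger allocations = new AtomicInteger();      // counts real allocations

    Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0) {
                Thread.yield();                       // lost the race: spin until the table appears
            } else if (sizeCtl.compareAndSet(sc, -1)) {
                try {
                    if ((tab = table) == null) {      // re-check after winning the CAS
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        table = tab = new Object[n];
                        allocations.incrementAndGet();
                        sc = n - (n >>> 2);           // 0.75 * n, the next resize threshold
                    }
                } finally {
                    sizeCtl.set(sc);                  // plain write: only the winner gets here
                }
                break;
            }
        }
        return tab;
    }

    int threshold() {
        return sizeCtl.get();
    }

    /** Lets several threads race on initTable and returns the shared instance. */
    static LazyTableDemo race(int threads) {
        LazyTableDemo demo = new LazyTableDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(demo::initTable);
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return demo;
    }

    public static void main(String[] args) {
        LazyTableDemo demo = race(8);
        System.out.println("allocations=" + demo.allocations.get()
                + ", threshold=" + demo.threshold());
    }
}
```

However many threads race, the array is allocated once, and sizeCtl ends at 12 for the default capacity of 16.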
Adding the first element to a bucket
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
[...]
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)
[...]
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)
[...]
(sc >>> RESIZE_STAMP_SHIFT) != rs ensures that all threads resize based on the same old bucket array
transferIndex <= 0 [...]
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) {
    // Initialize the new bucket array
    try {
        @SuppressWarnings("unchecked")
        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
        [...]
    if (--i >= bound || finishing)
        advance = false;
    else if ((nextIndex = transferIndex) <= 0) {
        i = -1;
        advance = false;
    }
    else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                                 nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
        bound = nextBound;
        i = nextIndex - 1;
        advance = false;
    }
}
if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    if (finishing) {
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1);
        return;
    }
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        // Determine whether this is the last resizing thread
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
        [...]
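The stride calculation and the CAS on transferIndex in the transfer code above are what let several threads split the old bucket array among themselves without a lock. The sketch below isolates just that part. It is an assumption-laden illustration: StrideClaimDemo and its methods are hypothetical, an AtomicInteger replaces the Unsafe CAS, and the actual copying of buckets is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of how resizing threads claim bucket ranges via CAS. */
final class StrideClaimDemo {
    static final int MIN_TRANSFER_STRIDE = 16; // same value as the JDK constant

    /** Number of buckets one thread claims at a time, as in the stride expression above. */
    static int stride(int n, int ncpu) {
        int s = (ncpu > 1) ? (n >>> 3) / ncpu : n;
        return Math.max(s, MIN_TRANSFER_STRIDE);
    }

    /**
     * Claims the next range from the top of the old table, working downwards.
     * Returns {bound, i}, meaning buckets i down to bound inclusive, or null when none are left.
     */
    static int[] claim(AtomicInteger transferIndex, int stride) {
        for (;;) {
            int nextIndex = transferIndex.get();
            if (nextIndex <= 0) {
                return null; // every range has already been handed out
            }
            int nextBound = (nextIndex > stride) ? nextIndex - stride : 0;
            if (transferIndex.compareAndSet(nextIndex, nextBound)) {
                return new int[] { nextBound, nextIndex - 1 };
            }
            // CAS failed: another thread claimed this range first, so retry with the new index
        }
    }

    public static void main(String[] args) {
        int n = 64;
        int stride = stride(n, 4);
        AtomicInteger transferIndex = new AtomicInteger(n);
        for (int[] r = claim(transferIndex, stride); r != null; r = claim(transferIndex, stride)) {
            System.out.println("claimed buckets " + r[1] + " down to " + r[0]);
        }
    }
}
```

Because each successful CAS moves transferIndex down by one stride, no two threads can receive overlapping ranges, and a thread that loses a CAS simply tries again for the next range.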