ConcurrentHashMap
我们对JDK1.8版本的ConcurrentHashMap进行说明,1.8版本的ConcurrentHashMap相比之前的版本主要做了两处改进:
- 使用CAS代替分段锁。
- 红黑树,这一点和HashMap是一致的。
put
最核心的便是put方法:
1public V put(K key, V value) {
2 return putVal(key, value, false);
3}
最后一个参数为onlyIfAbsent,表示只有在key对应的value不存在时才将value加入,所以putVal是put和putIfAbsent两个方法的真正实现。
1final V putVal(K key, V value, boolean onlyIfAbsent) {
2 if (key == null || value == null) throw new NullPointerException();
3 int hash = spread(key.hashCode());
4 int binCount = 0;
5 //volatile读
6 for (Node<K,V>[] tab = table;;) {
7 Node<K,V> f; int n, i, fh;
8 if (tab == null || (n = tab.length) == 0)
9 //初始化
10 tab = initTable();
11 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
12 if (casTabAt(tab, i, null,
13 new Node<K,V>(hash, key, value, null)))
14 break; // no lock when adding to empty bin
15 }
16 else if ((fh = f.hash) == MOVED)
17 tab = helpTransfer(tab, f);
18 else {
19 //节点添加
20 V oldVal = null;
21 synchronized (f) {
22 if (tabAt(tab, i) == f) {
23 if (fh >= 0) {
24 binCount = 1;
25 for (Node<K,V> e = f;; ++binCount) {
26 K ek;
27 if (e.hash == hash &&
28 ((ek = e.key) == key ||
29 (ek != null && key.equals(ek)))) {
30 oldVal = e.val;
31 if (!onlyIfAbsent)
32 e.val = value;
33 break;
34 }
35 Node<K,V> pred = e;
36 if ((e = e.next) == null) {
37 pred.next = new Node<K,V>(hash, key, value, null);
38 break;
39 }
40 }
41 }
42 else if (f instanceof TreeBin) {
43 Node<K,V> p;
44 binCount = 2;
45 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
46 oldVal = p.val;
47 if (!onlyIfAbsent)
48 p.val = value;
49 }
50 }
51 }
52 }
53 if (binCount != 0) {
54 if (binCount >= TREEIFY_THRESHOLD)
55 treeifyBin(tab, i);
56 if (oldVal != null)
57 return oldVal;
58 break;
59 }
60 }
61 }
62 addCount(1L, binCount);
63 return null;
64}
table便是其数据的存放载体:
1transient volatile Node<K,V>[] table;
它是volatile的。
初始化
如果table为空或大小为0,那么将对其进行初始化操作,initTable:
1private final Node<K,V>[] initTable() {
2 Node<K,V>[] tab; int sc;
3 //volatile读
4 while ((tab = table) == null || tab.length == 0) {
5 //volatile读
6 if ((sc = sizeCtl) < 0)
7 Thread.yield(); // lost initialization race; just spin
8 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
9 try {
10 if ((tab = table) == null || tab.length == 0) {
11 int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
12 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
13 table = tab = nt;
14 //sizeCtl设为当前大小的3 / 4
15 sc = n - (n >>> 2);
16 }
17 } finally {
18 sizeCtl = sc;
19 }
20 break;
21 }
22 }
23 return tab;
24}
sizeCtl是ConcurrentHashMap的初始化,扩容操作中一个至关重要的控制变量,其声明:
1private transient volatile int sizeCtl;
其取值可能为:
-
0: 初始值。
-
-1: 正在进行初始化。
-
负值(小于-1): 表示正在进行扩容,因为ConcurrentHashMap支持多线程并行扩容。
-
正数: 表示下一次触发扩容的临界值大小,即当前值 * 0.75(负载因子)。
从源码中可以看出,ConcurrentHashMap只允许一个线程进行初始化操作,当其它线程竞争失败(sizeCtl < 0)时便会进行自旋,直到竞争成功(初始化)线程完成初始化,那么此时table便不再为null,也就退出了while循环。
Thread.yield方法用于提示CPU可以放弃当前线程的执行,当然这只是一个提示(hint),这里对此方法的调用是一个优化手段。
对SIZECTL字段CAS更新的成功便标志者线程赢得了竞争,可以进行初始化工作了,剩下的就是一个数组的构造过程,一目了然。
头结点设置
如果key对应的bin为空,那么我们只需要将给定的节点 设为头结点即可,这里对应putVal源码中的下面的部分:
1else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
2 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
3 break;
4}
这里tabAt是一次volatile读,casTabAt为CAS操作。
节点添加
如果key对应的bin不为 null,那么就说明需要进行节点添加,从源码可以看出,这里对bin的头结点进行了加锁操作。我的理解为,这里需要遍历整个链表或搜索红黑树以判断给定的节点(值)是否已存在,同时需要记录链表节点的个数,以决定是否需要将其转化为红黑树。
转为红黑树
指putVal源码中的:
1if (binCount != 0) {
2 if (binCount >= TREEIFY_THRESHOLD)
3 treeifyBin(tab, i);
4 if (oldVal != null)
5 return oldVal;
6 break;
7}
注意,这段代码是在上述(节点添加部分)同步代码块之外执行的。
TREEIFY_THRESHOLD表示将链表转为红黑树的链表长度的临界值,默认为8.
1private final void treeifyBin(Node<K,V>[] tab, int index) {
2 Node<K,V> b; int n, sc;
3 if (tab != null) {
4 if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
5 //扩容
6 tryPresize(n << 1);
7 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
8 synchronized (b) {
9 if (tabAt(tab, index) == b) {
10 TreeNode<K,V> hd = null, tl = null;
11 for (Node<K,V> e = b; e != null; e = e.next) {
12 TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
13 if ((p.prev = tl) == null)
14 hd = p;
15 else
16 tl.next = p;
17 tl = p;
18 }
19 setTabAt(tab, index, new TreeBin<K,V>(hd));
20 }
21 }
22 }
23 }
24}
扩容
如果当前bin的个数未达到MIN_TREEIFY_CAPACITY,那么不再转为红黑树,转而进行扩容。MIN_TREEIFY_CAPACITY默认为64.tryPresize:
1private final void tryPresize(int size) {
2 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
3 tableSizeFor(size + (size >>> 1) + 1);
4 int sc;
5 //volatile读,没有正在进行初始化或扩容的操作
6 while ((sc = sizeCtl) >= 0) {
7 Node<K,V>[] tab = table; int n;
8 //这里实际上进行了初始化工作
9 if (tab == null || (n = tab.length) == 0) {
10 n = (sc > c) ? sc : c;
11 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
12 try {
13 if (table == tab) {
14 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
15 table = nt;
16 sc = n - (n >>> 2);
17 }
18 } finally {
19 sizeCtl = sc;
20 }
21 }
22 }
23 //已达到最大值,无法再进行扩容
24 else if (c <= sc || n >= MAXIMUM_CAPACITY)
25 break;
26 else if (tab == table) {
27 int rs = resizeStamp(n);
28 if (sc < 0) {
29 //竞争失败
30 Node<K,V>[] nt;
31 //判断是否已经完成
32 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
33 sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
34 break;
35 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
36 transfer(tab, nt);
37 }
38 //竞争成功
39 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
40 transfer(tab, null);
41 }
42 }
43}
前面提到过了,ConcurrentHashMap支持多线程并行扩容,具体来说,是支持多线程将节点从老的数组拷贝到新的数组,而新数组创建仍是一个线程完成(不然多个线程创建多个对象,最后只使用一个,这不是浪费是什么?)
竞争成功的线程为transfer方法的nextTab参数传入null,这将导致新数组的创建。竞争失败的线程将会判断当前节点转移工作是否已经完成,如果已经完成,那么意味着扩容的完成,退出即可,如果没有完成,那么此线程将会进行辅助转移。
判断是否已经完成的条件只能理解(nt = nextTable) == null || transferIndex <= 0两个。
转移
1private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
2 int n = tab.length, stride;
3 //1. 分片
4 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
5 stride = MIN_TRANSFER_STRIDE; // subdivide range
6 //nextTab初始化,CAS保证了只会有一个线程执行这里的代码
7 if (nextTab == null) {
8 try {
9 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
10 nextTab = nt;
11 } catch (Throwable ex) { // try to cope with OOME
12 sizeCtl = Integer.MAX_VALUE;
13 return;
14 }
15 nextTable = nextTab;
16 transferIndex = n;
17 }
18 int nextn = nextTab.length;
19 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
20 boolean advance = true;
21 boolean finishing = false; // to ensure sweep before committing nextTab
22 for (int i = 0, bound = 0;;) {
23 Node<K,V> f; int fh;
24 while (advance) {
25 int nextIndex, nextBound;
26 //分片的最大下标i实际上就是在这里完成减一的,因为从下面可以看出,每处理完一个桶位便将advance设为true //从而便又进入了内层循环,但是注意,当最后一次(即bound)处理完成时,i会被再次减一,从而导致进入下面的 //分支再次读取transferIndex,这就说明了转移线程会在转移完一个分片后继续尝试剩余的分片(桶位)
27 if (--i >= bound || finishing)
28 advance = false;
29 else if ((nextIndex = transferIndex) <= 0) {
30 //所有bin均转移完毕
31 i = -1;
32 advance = false;
33 }
34 //申请分片
35 else if (U.compareAndSwapInt
36 (this, TRANSFERINDEX, nextIndex,
37 nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
38 //bound表示此分片的截止(最小)下标
39 bound = nextBound;
40 //i表示此分片的最大下标
41 i = nextIndex - 1;
42 //advance意为前进,跳出内层循环
43 advance = false;
44 }
45 }
46 if (i < 0 || i >= n || i + n >= nextn) {
47 //进入到这里就意味着所有的桶位都已被处理完毕或是被包含在某个转移线程的申请分片中(即待转移)
48 int sc;
49 if (finishing) {
50 //进行收尾工作,此工作一定是由最后一个分片申请线程进行的,这里用volatile写将nextTable置为null
51 //,table指向新数组
52 nextTable = null;
53 table = nextTab;
54 //sizeCtl设为新数组大小的3 / 4
55 sizeCtl = (n << 1) - (n >>> 1);
56 return;
57 }
58 //转移线程开始转移之前会将sizeCtl自增,转移完成之后自减,所以判断转移是否已经完成的方式便是sizeCtl是 //否等于初始值
59 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
60 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
61 //还有其它线程尚未转移完成,直接退出,将收尾工作交给最后完成的那个线程
62 return;
63 //进行到这里就说明当前线程为最后一个完成的线程,有意思的是这里又将advance置为true且i置为n(原)
64 //数组的大小,作用就是最后再全部扫描一遍所有的桶位,看是否还有漏网之鱼
65 finishing = advance = true;
66 i = n;
67 }
68 }
69 else if ((f = tabAt(tab, i)) == null)
70 //2.
71 advance = casTabAt(tab, i, null, fwd);
72 else if ((fh = f.hash) == MOVED)
73 advance = true; // already processed
74 else {
75 synchronized (f) {
76 //3. 转移算法
77 //双重检查
78 if (tabAt(tab, i) == f) {
79 Node<K,V> ln, hn;
80 if (fh >= 0) {
81 //runBit代表了当前桶位是否需要移动
82 int runBit = fh & n;
83 Node<K,V> lastRun = f;
84 //这里是找出最后一个和头结点的移动属性相同的
85 for (Node<K,V> p = f.next; p != null; p = p.next) {
86 int b = p.hash & n;
87 if (b != runBit) {
88 runBit = b;
89 lastRun = p;
90 }
91 }
92 if (runBit == 0) {
93 ln = lastRun;
94 hn = null;
95 }
96 else {
97 hn = lastRun;
98 ln = null;
99 }
100 //构造无需移动和需要移动的链表
101 for (Node<K,V> p = f; p != lastRun; p = p.next) {
102 int ph = p.hash; K pk = p.key; V pv = p.val;
103 if ((ph & n) == 0)
104 ln = new Node<K,V>(ph, pk, pv, ln);
105 else
106 hn = new Node<K,V>(ph, pk, pv, hn);
107 }
108 //设置到新数组
109 setTabAt(nextTab, i, ln);
110 setTabAt(nextTab, i + n, hn);
111 //将原数组的当前桶位设为MOVED,即已处理完(转移)
112 setTabAt(tab, i, fwd);
113 advance = true;
114 }
115 else if (f instanceof TreeBin) {
116 TreeBin<K,V> t = (TreeBin<K,V>)f;
117 TreeNode<K,V> lo = null, loTail = null;
118 TreeNode<K,V> hi = null, hiTail = null;
119 int lc = 0, hc = 0;
120 for (Node<K,V> e = t.first; e != null; e = e.next) {
121 int h = e.hash;
122 TreeNode<K,V> p = new TreeNode<K,V>
123 (h, e.key, e.val, null, null);
124 if ((h & n) == 0) {
125 if ((p.prev = loTail) == null)
126 lo = p;
127 else
128 loTail.next = p;
129 loTail = p;
130 ++lc;
131 }
132 else {
133 if ((p.prev = hiTail) == null)
134 hi = p;
135 else
136 hiTail.next = p;
137 hiTail = p;
138 ++hc;
139 }
140 }
141 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
142 (hc != 0) ? new TreeBin<K,V>(lo) : t;
143 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
144 (lc != 0) ? new TreeBin<K,V>(hi) : t;
145 setTabAt(nextTab, i, ln);
146 setTabAt(nextTab, i + n, hn);
147 setTabAt(tab, i, fwd);
148 advance = true;
149 }
150 }
151 }
152 }
153 }
154}
分片
每个线程针对一个分片来进行转移操作,所谓的一个分片其实就是bin数组的一段。默认的最小分片大小为16,如果所在机器 只有一个CPU核心,那么就取16,否则取(数组大小 / 8 / CPU核心数)与16的较大者。
transferIndex
全局变量transferIndex表示低于此值的bin尚未被转移,分片的申请便是通过对此变量的CAS操作来完成,初始值为原数组大小,减为0表示 所有桶位均已转移完毕。
ForwardingNode
从transfer方法的源码可以看出,当一个桶位(原数组)处理完时,会将其头结点设置一个ForwardingNode。简略版源码:
1static final class ForwardingNode<K,V> extends Node<K,V> {
2 final Node<K,V>[] nextTable;
3 ForwardingNode(Node<K,V>[] tab) {
4 super(MOVED, null, null, null);
5 this.nextTable = tab;
6 }
7}
其哈希值为MOVED。到这里我们便可以理解putVal方法这部分源码的作用了:
1else if ((fh = f.hash) == MOVED)
2 tab = helpTransfer(tab, f);
helpTransfer方法的实现和tryPresize方法的相关代码很像,在此不再赘述。
转移算法
我们还是以链表为例,对于2的整次幂扩容来说,节点的转移其实只有两种情况:
- 无需转移,即扩容前后节点的桶位不变。
- 扩容后的桶位号为扩容前 + 原数组的大小,假设原数组大小为8,扩容后为16,有节点哈希值为11,原先在桶位3,那么扩容后位3 + 8 = 11.
所以关键便在于如何判断是否需要转移。还是以大小8和16为例,8的取余mask为:
0111
而16的mask为:
1111
所以我们只要用哈希值 & 8,判断结果是否为零即可。
红黑树
再来回顾一下treeifyBin方法的相关源码:
1else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
2 synchronized (b) {
3 //双重检查
4 if (tabAt(tab, index) == b) {
5 TreeNode<K,V> hd = null, tl = null;
6 for (Node<K,V> e = b; e != null; e = e.next) {
7 TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
8 if ((p.prev = tl) == null)
9 hd = p;
10 else
11 tl.next = p;
12 tl = p;
13 }
14 setTabAt(tab, index, new TreeBin<K,V>(hd));
15 }
16 }
17}
可见,向红黑树的转换是在锁的保护下进行的,通过一个for循环将所有的节点以TreeNode包装起来,注意,在循环里只是通过next属性进行连接,此时实际上还是一个链表形态,而真正的转化是在TreeBin的构造器中完成的。
和ForwardingNode一样,TreeBin同样具有特殊的哈希值:
1static final int TREEBIN = -2;
get
1public V get(Object key) {
2 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
3 int h = spread(key.hashCode());
4 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
5 if ((eh = e.hash) == h) {
6 if ((ek = e.key) == key || (ek != null && key.equals(ek)))
7 //命中头结点
8 return e.val;
9 }
10 else if (eh < 0)
11 return (p = e.find(h, key)) != null ? p.val : null;
12 while ((e = e.next) != null) {
13 //遍历当前桶位的节点链表
14 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
15 return e.val;
16 }
17 }
18 return null;
19}
有意思的在于第二个分支,即哈希值小于零。从上面put方法部分可以得知,共有两种情况节点的哈希值小于0:
- ForwardingNode,已被转移。
- TreeBin,红黑树节点。
ForwardingNode
find方法源码:
1Node<K,V> find(int h, Object k) {
2 outer: for (Node<K,V>[] tab = nextTable;;) {
3 Node<K,V> e; int n;
4 if (k == null || tab == null || (n = tab.length) == 0 ||
5 (e = tabAt(tab, (n - 1) & h)) == null)
6 return null;
7 for (;;) {
8 int eh; K ek;
9 if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek))))
10 return e;
11 if (eh < 0) {
12 if (e instanceof ForwardingNode) {
13 //跳转到nextTable搜索
14 tab = ((ForwardingNode<K,V>)e).nextTable;
15 continue outer;
16 }
17 else
18 //红黑树
19 return e.find(h, k);
20 }
21 if ((e = e.next) == null)
22 return null;
23 }
24 }
25}
红黑树
TreeBin.find:
1final Node<K,V> find(int h, Object k) {
2 if (k != null) {
3 for (Node<K,V> e = first; e != null; ) {
4 int s; K ek;
5 if (((s = lockState) & (WAITER|WRITER)) != 0) {
6 if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek))))
7 return e;
8 e = e.next;
9 }
10 else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) {
11 TreeNode<K,V> r, p;
12 try {
13 p = ((r = root) == null ? null : r.findTreeNode(h, k, null));
14 } finally {
15 Thread w;
16 if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
17 (READER|WAITER) && (w = waiter) != null)
18 LockSupport.unpark(w);
19 }
20 return p;
21 }
22 }
23 }
24 return null;
25}
这里使用了读写锁的方式,而加锁的方式和AQS一个套路。当可以获得读锁时,采用搜索红黑树的方法进行节点搜索,这样时间复杂度是O(LogN),而如果获得读锁失败(即表示当前有其它线程正在改变树的结构,比如进行红黑树的再平衡),那么将采用线性的搜索策略。
为什么可以进行线性搜索呢?因为红黑树的节点TreeNode继承自Node,所以仍然保留有next指针(即线性遍历的能力)。这一点可以从put-转为红黑树-红黑树一节得到反映,线性搜索的线程安全性通过next属性来保证:
1volatile Node<K,V> next;
TreeBin的构造器同样对树的结构进行了改变,ConcurrentHashMap使用volatile读写来保证线程安全的发布。
从读写锁的引入可以看出,ConcurrentHashMap为保证最大程度的并行执行作出的努力。putTreeVal方法只有在更新树的结构时才会动用锁:
1lockRoot();
2try {
3 root = balanceInsertion(root, x);
4} finally {
5 unlockRoot();
6}
除此之外,由于读没有加锁,所以线程可以看到正在进行迁移的桶,但这其实并不会影响正确性,因为迁移是构造了新的链表,并不会影响原有的桶。
计数
在putVal方法的结尾通过调用addCount方法(略去大小检查,扩容部分,这里我们只关心计数)进行计数:
1private final void addCount(long x, int check) {
2 CounterCell[] as; long b, s;
3 if ((as = counterCells) != null ||
4 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
5 CounterCell a; long v; int m;
6 boolean uncontended = true;
7 if (as == null || (m = as.length - 1) < 0 ||
8 (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
9 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
10 fullAddCount(x, uncontended);
11 return;
12 }
13 }
14}
计数的关键便是counterCells属性:
1private transient volatile CounterCell[] counterCells;
CounterCell是ConcurrentHashMapd的内部类:
1@sun.misc.Contended static final class CounterCell {
2 volatile long value;
3 CounterCell(long x) { value = x; }
4}
Contended注解的作用是将类的字段以64字节的填充行包围以解决伪共享问题。其实这里的计数方式就是改编自LongAdder,以最大程度地降低CAS失败空转的几率。
条件判断:
1if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
2 //...
3}
非常有意思 ,如果counterCells为null,那么尝试用baseCount进行计数,如果事实上只有一个线程或多个线程单竞争的频率较低,对baseCount的CAS操作并不会失败,所以可以得到结论 : 如果竞争程度较低(没有CAS失败),那么其实用的是volatile变量baseCount来计数,只有当线程竞争严重(出现CAS失败)时才会改用LongAdder的方式。
baseCount声明如下:
1private transient volatile long baseCount;
再来看一下什么条件下会触发fullAddCount方法:
1if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
2 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
3 //...
4}
ThreadLocalRandom.getProbe()的返回值决定了线程和哪一个CounterCell相关联,查看源码可以发现,此方法返回的其实是Thread的下列字段的值:
1@sun.misc.Contended("tlr")
2int threadLocalRandomProbe;
我们暂且不管这个值是怎么算出来,将其当做一个线程唯一的值即可。所以fullAddCount执行的条件是(或):
- CounterCell数组为null。
- CounterCell数组大小为0.
- CounterCell数组线程对应的下标值为null。
- CAS更新线程特定的CounterCell失败。
fullAddCount方法的实现其实和LongAdder的父类Striped64的longAccumulate大体一致:
1private final void fullAddCount(long x, boolean wasUncontended) {
2 int h;
3 if ((h = ThreadLocalRandom.getProbe()) == 0) {
4 ThreadLocalRandom.localInit(); // force initialization
5 h = ThreadLocalRandom.getProbe();
6 wasUncontended = true;
7 }
8 boolean collide = false; // True if last slot nonempty
9 for (;;) {
10 CounterCell[] as; CounterCell a; int n; long v;
11 //1.
12 if ((as = counterCells) != null && (n = as.length) > 0) {
13 if ((a = as[(n - 1) & h]) == null) {
14 if (cellsBusy == 0) { // Try to attach new Cell
15 CounterCell r = new CounterCell(x); // Optimistic create
16 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
17 boolean created = false;
18 try { // Recheck under lock
19 CounterCell[] rs; int m, j;
20 if ((rs = counterCells) != null &&
21 (m = rs.length) > 0 &&
22 rs[j = (m - 1) & h] == null) {
23 rs[j] = r;
24 created = true;
25 }
26 } finally {
27 cellsBusy = 0;
28 }
29 if (created)
30 //新Cell创建成功,退出方法
31 break;
32 continue; // Slot is now non-empty
33 }
34 }
35 collide = false;
36 }
37 else if (!wasUncontended) // CAS already known to fail
38 wasUncontended = true; // Continue after rehash
39 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
40 break;
41 else if (counterCells != as || n >= NCPU)
42 collide = false; // At max size or stale
43 else if (!collide)
44 collide = true;
45 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
46 try {
47 //扩容
48 if (counterCells == as) {// Expand table unless stale
49 CounterCell[] rs = new CounterCell[n << 1];
50 for (int i = 0; i < n; ++i)
51 rs[i] = as[i];
52 counterCells = rs;
53 }
54 } finally {
55 cellsBusy = 0;
56 }
57 collide = false;
58 continue; // Retry with expanded table
59 }
60 //rehash
61 h = ThreadLocalRandom.advanceProbe(h);
62 }
63 //2.
64 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
65 boolean init = false;
66 try {
67 //获得锁之后再次检测是否已被初始化
68 if (counterCells == as) {
69 CounterCell[] rs = new CounterCell[2];
70 rs[h & 1] = new CounterCell(x);
71 counterCells = rs;
72 init = true;
73 }
74 } finally {
75 //锁释放
76 cellsBusy = 0;
77 }
78 if (init)
79 //计数成功,退出方法
80 break;
81 }
82 //3.
83 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
84 break; // Fall back on using base
85 }
86}
从源码中可以看出,在初始情况下probe其实是0的,也就是说在一开始的时候都是更新到第一个cell中的,直到出现CAS失败。
整个方法的逻辑较为复杂,我们按照上面列出的fullAddCount执行条件进行对应说明。
cell数组为null或empty
容易看出,这里对应的是fullAddCount方法的源码2处。cellBusy的定义如下:
1private transient volatile int cellsBusy;
这里其实将其当做锁来使用,即只允许在某一时刻只有一个线程正在进行CounterCell数组的初始化或扩容,其值为1说明有线程正在进行上述操作。
默认创建了大小为2的CounterCell数组。
下标为null或CAS失败
这里便对应源码的1处,各种条件分支不再展开详细描述,注意一下几点:
rehash
当Cell数组不为null和empty时,每次循环便会导致重新哈希值,这样做的目的是用再次生成哈希值的方式降低线程竞争。
最大CounterCell数
取NCPU:
1static final int NCPU = Runtime.getRuntime().availableProcessors();
不过从上面扩容部分源码可以看出,最大值并不一定是NCPU,因为采用的是2倍扩容,准确来说是最小的大于等于NCPU的2的整次幂(初始大小为2)。
注意下面这个分支:
1else if (counterCells != as || n >= NCPU)
2 collide = false;
此分支会将collide置为false,从而致使下次循环else if (!collide)必定得到满足,这也就保证了扩容分支不会被执行。
baseCount分支
还会尝试对此变量进行更新,有意思。
size
1public int size() {
2 long n = sumCount();
3 return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
4}
核心在于sumCount方法:
1final long sumCount() {
2 CounterCell[] as = counterCells; CounterCell a;
3 long sum = baseCount;
4 if (as != null) {
5 for (int i = 0; i < as.length; ++i) {
6 if ((a = as[i]) != null)
7 sum += a.value;
8 }
9 }
10 return sum;
11}
求和的时候带上了baseCount,剩下的就 一目了然了。