Java 併發包中原子操作類原理剖析

原子變量操作類

遞增和遞減操作代碼

 1public final long getAndIncrement() {
 2    return unsafe.getAndAddLong(this, valueOffset, 1L);
 3}
 4
 5public final long getAndDecrement() {
 6    return unsafe.getAndAddLong(this, valueOffset, -1L);
 7}
 8
 9public final long incrementAndGet() {
10    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
11}
12
13public final long decrementAndGet() {
14    return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
15}
16
17
 1public final long getAndAddLong(Object var1, long var2, long var4) {
 2    long var6;
 3    //CAS操作設置var1對象偏移爲var2處的值增加var4
 4    do {
 5        var6 = this.getLongVolatile(var1, var2);
 6    } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
 7
 8    return var6;
 9}
10
11

compareAndSet 方法

1public final boolean compareAndSet(long expect, long update) {
2    return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
3}
4
5

可見,內部還是調用了 Unsafe 類中的 CAS 方法。

AtomicLong 使用示例

 1public class AtomicLongDemo {
 2    private static AtomicLong al = new AtomicLong(0);
 3
 4    public static long addNext() {
 5        return al.getAndIncrement();
 6    }
 7
 8    public static void main(String[] args) {
 9        for (int i = 0; i < 100; i++) {
10            new Thread() {
11                @Override
12                public void run() {
13                    AtomicLongDemo.addNext();
14                }
15            }.start();
16        }
17
18        // 等待線程運行完
19        try {
20            TimeUnit.SECONDS.sleep(1);
21        } catch (InterruptedException e) {
22            e.printStackTrace();
23        }
24
25        System.out.println("final result is " + AtomicLongDemo.addNext());
26    }
27}
28
29

AtomicLong 使用 CAS 非阻塞算法,性能比使用 synchronized 等的阻塞算法實現同步好很多。但在高併發下,大量線程會同時去競爭更新同一個原子變量,由於同時只有一個線程的 CAS 會成功,會造成大量的自旋嘗試,十分浪費 CPU 資源。因此,JDK8 中新增了原子操作類 LongAdder。

JDK8 中新增的原子操作類 LongAdder

由上可知,AtomicLong 的性能瓶頸是多個線程同時去競爭一個變量的更新權導致的。而 LongAdder 通過將一個變量分解成多個變量,讓同樣多的線程去競爭多個資源解決了此問題。

原理

如圖,LongAdder 內部維護了多個 Cell,每個 Cell 內部有一個初始值爲 0 的 long 類型變量,這樣,在同等併發下,對單個變量的爭奪會變少。此外,多個線程爭奪同一個變量失敗時,會到另一個 Cell 上去嘗試,增加了重試成功的可能性。當 LongAdder 要獲取當前值時,將所有 Cell 的值於 base 相加返回即可。

LongAdder 維護了一個初始值爲 null 的 Cell 數組和一個基值變量 base。當一開始 Cell 數組爲空且併發線程較少時,僅使用 base 進行累加。當併發增大時,會動態地增加 Cell 數組的容量。

Cell 類中使用了 @sun.misc.Contented 註解進行了字節填充,解決了由於連續分佈於數組中且被多個線程操作可能造成的僞共享問題 (關於僞共享,可查看《僞共享(false sharing),併發編程無聲的性能殺手》這篇文章)。

源碼分析

先看 LongAdder 的定義

1public class LongAdder extends Striped64 implements Serializable
2
3

Striped64 類中有如下三個變量:

1transient volatile Cell[] cells;
2
3transient volatile long base;
4
5transient volatile int cellsBusy;
6
7

cellsBusy 用於實現自旋鎖,狀態值只有 0 和 1,當創建 Cell 元素、擴容 Cell 數組或初始化 Cell 數組時,使用 CAS 操作該變量來保證同時只有一個變量可以進行其中之一的操作。

下面看 Cell 的定義:

 1@sun.misc.Contended static final class Cell {
 2    volatile long value;
 3    Cell(long x) { value = x; }
 4    final boolean cas(long cmp, long val) {
 5        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
 6    }
 7
 8    // Unsafe mechanics
 9    private static final sun.misc.Unsafe UNSAFE;
10    private static final long valueOffset;
11    static {
12        try {
13            UNSAFE = sun.misc.Unsafe.getUnsafe();
14            Class<?> ak = Cell.class;
15            valueOffset = UNSAFE.objectFieldOffset
16                (ak.getDeclaredField("value"));
17        } catch (Exception e) {
18            throw new Error(e);
19        }
20    }
21}
22
23

將 value 聲明僞 volatile 確保了內存可見性,CAS 操作保證了 value 值的原子性,@sun.misc.Contented 註解的使用解決了僞共享問題。

下面來看 LongAdder 中的幾個方法:

 1public long sum() {
 2    Cell[] as = cells; Cell a;
 3    long sum = base;
 4    if (as != null) {
 5        for (int i = 0; i < as.length; ++i) {
 6            if ((a = as[i]) != null)
 7                sum += a.value;
 8        }
 9    }
10    return sum;
11}
12
13

sum 的結果並非一個精確值,因爲計算總和時並沒有對 Cell 數組加鎖,累加過程中 Cell 的值可能被更改。

 1public void reset() {
 2    Cell[] as = cells; Cell a;
 3    base = 0L;
 4    if (as != null) {
 5        for (int i = 0; i < as.length; ++i) {
 6            if ((a = as[i]) != null)
 7                a.value = 0L;
 8        }
 9    }
10}
11
12

reset 非常簡單,將 base 和 Cell 數組中非空元素的值置爲 0.

 1public long sumThenReset() {
 2    Cell[] as = cells; Cell a;
 3    long sum = base;
 4    base = 0L;
 5    if (as != null) {
 6        for (int i = 0; i < as.length; ++i) {
 7            if ((a = as[i]) != null) {
 8                sum += a.value;
 9                a.value = 0L;
10            }
11        }
12    }
13    return sum;
14}
15
16

sumThenReset 同樣非常簡單,將某個 Cell 的值加到 sum 中後隨即重置。

 1public void add(long x) {
 2    Cell[] as; long b, v; int m; Cell a;
 3    // 判斷cells是否爲空,如果不爲空則直接進入內層判斷,
 4    // 否則嘗試通過CAS在base上進行add操作,若CAS成功則結束,否則進入內層
 5    if ((as = cells) != null || !casBase(b = base, b + x)) {
 6        // 記錄cell上的CAS操作是否失敗
 7        boolean uncontended = true;
 8        if (as == null || (m = as.length - 1) < 0 ||
 9            // 計算當前線程應該訪問cells數組的哪個元素
10            (a = as[getProbe() & m]) == null ||
11            // 嘗試通過CAS操作在對應cell上add
12            !(uncontended = a.cas(v = a.value, v + x)))
13            longAccumulate(x, null, uncontended);
14    }
15}
16
17

add 方法會判斷 cells 數組是否爲空,非空則進入內層,否則嘗試直接通過 CAS 操作在 base 上進行 add。內層代碼中,聲明瞭一個 uncontented 變量來記錄調用 longAccumulate 方法前在相應 cell 上是否進行了失敗的 CAS 操作。

下面重點來看 longAccumelate 方法:

longAccumulate 時 Striped64 類中定義的,其中包含了初始化 cells 數組,改變 cells 數組長度,新建 cell 等邏輯。

 1final void longAccumulate(long x, LongBinaryOperator fn,
 2                              boolean wasUncontended) {
 3    int h;
 4    if ((h = getProbe()) == 0) {
 5        ThreadLocalRandom.current(); // 初始化當前線程的probe,以便於找到線程對應的cell
 6        h = getProbe();
 7        wasUncontended = true; // 標記執行longAccumulate前對相應cell的CAS操作是否失敗,失敗爲false
 8    }
 9    boolean collide = false; // 是否衝突,如果當前線程嘗試訪問的cell元素與其他線程衝突,則爲true
10    for (;;) {
11        Cell[] as; Cell a; int n; long v;
12        // 當前cells不爲空且元素個數大於0則進入內層,否則嘗試初始化
13        if ((as = cells) != null && (n = as.length) > 0) {
14            if ((a = as[(n - 1) & h]) == null) {
15                if (cellsBusy == 0) {       // 嘗試添加新的cell
16                    Cell r = new Cell(x);
17                    if (cellsBusy == 0 && casCellsBusy()) {
18                        boolean created = false;
19                        try {               // Recheck under lock
20                            Cell[] rs; int m, j;
21                            if ((rs = cells) != null &&
22                                (m = rs.length) > 0 &&
23                                rs[j = (m - 1) & h] == null) {
24                                rs[j] = r;
25                                created = true;
26                            }
27                        } finally {
28                            cellsBusy = 0;
29                        }
30                        if (created)
31                            break;
32                        continue;
33                    }
34                }
35                collide = false;
36            }
37            else if (!wasUncontended)  // 如果已經進行了失敗的CAS操作
38                wasUncontended = true; // 則不調用下面的a.cas()函數(反正肯定是失敗的),而是重新計算probe值來嘗試
39            else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
40                break;
41            else if (n >= NCPU || cells != as)
42                collide = false; // 如果當前cells長度大於CPU個數則不進行擴容,因爲每個cell都使用一個CPU處理時性能纔是最高的
43                                 // 如果當前cells已經過時(其他線程對cells執行了擴容操作,改變了cells指向),也不會擴容
44            else if (!collide)
45                collide = true;  // 執行到此處說明a.cas()執行失敗,即有衝突,將collide置爲true,
46                                 // 跳過擴容階段,重新獲取probe,到cells不同位置嘗試cas,再次失敗則擴容
47            // 擴容
48            else if (cellsBusy == 0 && casCellsBusy()) {
49                try {
50                    if (cells == as) {      
51                        Cell[] rs = new Cell[n << 1];
52                        for (int i = 0; i < n; ++i)
53                            rs[i] = as[i];
54                        cells = rs;
55                    }
56                } finally {
57                    cellsBusy = 0;
58                }
59                collide = false;
60                continue; // 擴容後再次嘗試(擴容後cells長度改變,
61                          // 根據(n - 1) & h計算當前線程在cells中對應元素下標會變化,減少再次衝突的可能性)
62            }
63            h = advanceProbe(h); // 重新計算線程probe,減小下次訪問cells元素時的衝突機會
64        }
65        // 初始化cells數組
66        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
67            boolean init = false;
68            try {                           
69                if (cells == as) {
70                    Cell[] rs = new Cell[2];
71                    rs[h & 1] = new Cell(x);
72                    cells = rs;
73                    init = true;
74                }
75            } finally {
76                cellsBusy = 0;
77            }
78            if (init)
79                break;
80        }
81        // 嘗試通過base的CAS操作進行add,成功則結束當前函數,否則再次循環
82        else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
83            break; 
84    }
85}
86
87

代碼比較複雜,細節的解釋都寫在註釋中了。大體邏輯就是判斷 cells 是否爲空或者長度爲 0:如果空或者長度爲 0 則嘗試進行 cells 數組初始化,初始化失敗的話則嘗試通過 CAS 操作在 base 上進行 add,仍然失敗則重走一次流程;如果 cells 不爲空且長度大於 0,則獲取當前線程對應於 cells 中的元素,如果該元素爲 null 則嘗試創建,否則嘗試通過 CAS 操作在上面進行 add,仍失敗則擴容。

LongAccumulator

LongAdder 是 LongAccumulator 的特例,兩者都繼承自 Striped64。

看如下代碼:

 1public LongAccumulator(LongBinaryOperator accumulatorFunction,
 2                        long identity) {
 3    this.function = accumulatorFunction;
 4    base = this.identity = identity;
 5}
 6
 7public interface LongBinaryOperator {
 8    long applyAsLong(long left, long right);
 9}
10
11

LongAccumulator 構造器允許傳入一個雙目運算符接口用於自定義加法規則,還允許傳入一個初始值。

自定義的加法函數是如何被應用的呢?以上提到的 longAccumulate() 方法中有如下代碼:

1a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x)))
2
3

LongAdder 的 add() 方法中調用 longAccumulate() 方法時傳入的是 null,而 LongAccumulator 的 accumulate() 方法傳入的是 this.function,即自定義的加法函數。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ocGITPHjXNERTmju2KjFVQ