歡迎光臨
每天分享高質量文章

Map 大家族的那點事兒 ( 7 ) :ConcurrentHashMap ( 下 )

(點擊上方公眾號,可快速關註)


來源:SylvanasSun’s Blog ,

sylvanassun.github.io/2018/03/16/2018-03-16-map_family/

接上文《 Map 大家族的那點事兒 ( 7 ) :ConcurrentHashMap ( 上 ) 》

計數

在Java 7中ConcurrentHashMap對每個Segment單獨計數,想要得到總數就需要獲得所有Segment的鎖,然後進行統計。由於Java 8拋棄了Segment,顯然是不能再這樣做了,而且這種方法雖然簡單準確但也捨棄了性能。

Java 8宣告了一個volatile變數baseCount用於記錄元素的個數,對這個變數的修改操作是基於CAS的,每當插入元素或刪除元素時都會呼叫addCount()函式進行計數。

private transient volatile long baseCount;

private final void addCount(long x, int check) {

    CounterCell[] as; long b, s;

    // 嘗試使用CAS更新baseCount失敗

    // 轉用CounterCells進行更新

    if ((as = counterCells) != null ||

        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {

        CounterCell a; long v; int m;

        boolean uncontended = true;

        // 在CounterCells未初始化

        // 或嘗試通過CAS更新當前執行緒的CounterCell失敗時

        // 呼叫fullAddCount(),該函式負責初始化CounterCells和更新計數

        if (as == null || (m = as.length – 1) < 0 ||

            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||

            !(uncontended =

              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {

            fullAddCount(x, uncontended);

            return;

        }

        if (check <= 1)

            return;

        // 統計總數

        s = sumCount();

    }

    if (check >= 0) {

        // 判斷是否需要擴容,在上文中已經講過了

    }

}

counterCells是一個元素為CounterCell的陣列,該陣列的大小與當前機器的CPU數量有關,並且它不會被主動初始化,只有在呼叫fullAddCount()函式時才會進行初始化。

CounterCell是一個簡單的內部靜態類,每個CounterCell都是一個用於記錄數量的單元:

/**

 * Table of counter cells. When non-null, size is a power of 2.

 */

private transient volatile CounterCell[] counterCells;

/**

 * A padded cell for distributing counts.  Adapted from LongAdder

 * and Striped64.  See their internal docs for explanation.

 */

@sun.misc.Contended static final class CounterCell {

    volatile long value;

    CounterCell(long x) { value = x; }

}

註解@sun.misc.Contended用於解決偽共享問題。所謂偽共享,即是在同一快取行(CPU快取的基本單位)中儲存了多個變數,當其中一個變數被修改時,就會影響到同一快取行內的其他變數,導致它們也要跟著被標記為失效,其他變數的快取命中率將會受到影響。解決偽共享問題的方法一般是對該變數填充一些無意義的占位資料,從而使它獨享一個快取行。

ConcurrentHashMap的計數設計與LongAdder類似。在一個低併發的情況下,就只是簡單地使用CAS操作來對baseCount進行更新,但只要這個CAS操作失敗一次,就代表有多個執行緒正在競爭,那麼就轉而使用CounterCell陣列進行計數,陣列內的每個ConuterCell都是一個獨立的計數單元。

每個執行緒都會通過ThreadLocalRandom.getProbe() & m尋址找到屬於它的CounterCell,然後進行計數。ThreadLocalRandom是一個執行緒私有的偽隨機數生成器,每個執行緒的probe都是不同的(這點基於ThreadLocalRandom的內部實現,它在內部維護了一個probeGenerator,這是一個型別為AtomicInteger的靜態常量,每當初始化一個ThreadLocalRandom時probeGenerator都會先自增一個常量然後傳回的整數即為當前執行緒的probe,probe變數被維護在Thread物件中),可以認為每個執行緒的probe就是它在CounterCell陣列中的hash code。

這種方法將競爭資料按照執行緒的粒度進行分離,相比所有競爭執行緒對一個共享變數使用CAS不斷嘗試在性能上要效率多了,這也是為什麼在高併發環境下LongAdder要優於AtomicInteger的原因。

fullAddCount()函式根據當前執行緒的probe尋找對應的CounterCell進行計數,如果CounterCell陣列未被初始化,則初始化CounterCell陣列和CounterCell。該函式的實現與Striped64類(LongAdder的父類)的longAccumulate()函式是一樣的,把CounterCell陣列當成一個散串列,每個執行緒的probe就是hash code,散列函式也僅僅是簡單的(n – 1) & probe。

CounterCell陣列的大小永遠是一個2的n次方,初始容量為2,每次擴容的新容量都是之前容量乘以二,處於性能考慮,它的最大容量上限是機器的CPU數量。

所以說CounterCell陣列的碰撞衝突是很嚴重的,因為它的bucket基數太小了。而發生碰撞就代表著一個CounterCell會被多個執行緒競爭,為瞭解決這個問題,Doug Lea使用無限迴圈加上CAS來模擬出一個自旋鎖來保證執行緒安全,自旋鎖的實現基於一個被volatile修飾的整數變數,該變數只會有兩種狀態:0和1,當它被設置為0時表示沒有加鎖,當它被設置為1時表示已被其他執行緒加鎖。這個自旋鎖用於保護初始化CounterCell、初始化CounterCell陣列以及對CounterCell陣列進行擴容時的安全。

CounterCell更新計數是依賴於CAS的,每次迴圈都會嘗試通過CAS進行更新,如果成功就退出無限迴圈,否則就呼叫ThreadLocalRandom.advanceProbe()函式為當前執行緒更新probe,然後重新開始迴圈,以期望下一次尋址到的CounterCell沒有被其他執行緒競爭。

如果連著兩次CAS更新都沒有成功,那麼會對CounterCell陣列進行一次擴容,這個擴容操作只會在當前迴圈中觸發一次,而且只能在容量小於上限時觸發。

fullAddCount()函式的主要流程如下:

  • 首先檢查當前執行緒有沒有初始化過ThreadLocalRandom,如果沒有則進行初始化。ThreadLocalRandom負責更新執行緒的probe,而probe又是在陣列中進行尋址的關鍵。

  • 檢查CounterCell陣列是否已經初始化,如果已初始化,那麼就根據probe找到對應的CounterCell。

  • 如果這個CounterCell等於null,需要先初始化CounterCell,通過把計數增量傳入建構式,所以初始化只要成功就說明更新計數已經完成了。初始化的過程需要獲取自旋鎖。

  • 如果不為null,就按上文所說的邏輯對CounterCell實施更新計數。

  • CounterCell陣列未被初始化,嘗試獲取自旋鎖,進行初始化。陣列初始化的過程會附帶初始化一個CounterCell來記錄計數增量,所以只要初始化成功就表示更新計數完成。

  • 如果自旋鎖被其他執行緒占用,無法進行陣列的初始化,只好通過CAS更新baseCount。

private final void fullAddCount(long x, boolean wasUncontended) {

    int h;

    // 當前執行緒的probe等於0,證明該執行緒的ThreadLocalRandom還未被初始化

    // 以及當前執行緒是第一次進入該函式

    if ((h = ThreadLocalRandom.getProbe()) == 0) {

        // 初始化ThreadLocalRandom,當前執行緒會被設置一個probe

        ThreadLocalRandom.localInit();      // force initialization

        // probe用於在CounterCell陣列中尋址

        h = ThreadLocalRandom.getProbe();

        // 未競爭標誌

        wasUncontended = true;

    }

    // 衝突標誌

    boolean collide = false;                // True if last slot nonempty

    for (;;) {

        CounterCell[] as; CounterCell a; int n; long v;

        // CounterCell陣列已初始化

        if ((as = counterCells) != null && (n = as.length) > 0) {

            // 如果尋址到的Cell為空,那麼創建一個新的Cell

            if ((a = as[(n – 1) & h]) == null) {

                // cellsBusy是一個只有0和1兩個狀態的volatile整數

                // 它被當做一個自旋鎖,0代表無鎖,1代表加鎖

                if (cellsBusy == 0) {            // Try to attach new Cell

                    // 將傳入的x作為初始值創建一個新的CounterCell

                    CounterCell r = new CounterCell(x); // Optimistic create

                    // 通過CAS嘗試對自旋鎖加鎖

                    if (cellsBusy == 0 &&

                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

                        // 加鎖成功,宣告Cell是否創建成功的標誌

                        boolean created = false;

                        try {               // Recheck under lock

                            CounterCell[] rs; int m, j;

                            // 再次檢查CounterCell陣列是否不為空

                            // 並且尋址到的Cell為空

                            if ((rs = counterCells) != null &&

                                (m = rs.length) > 0 &&

                                rs[j = (m – 1) & h] == null) {

                                // 將之前創建的新Cell放入陣列

                                rs[j] = r;

                                created = true;

                            }

                        } finally {

                            // 釋放鎖

                            cellsBusy = 0;

                        }

                        // 如果已經創建成功,中斷迴圈

                        // 因為新Cell的初始值就是傳入的增量,所以計數已經完畢了

                        if (created)

                            break;

                        // 如果未成功

                        // 代表as[(n – 1) & h]這個位置的Cell已經被其他執行緒設置

                        // 那麼就從迴圈頭重新開始

                        continue;           // Slot is now non-empty

                    }

                }

                collide = false;

            }

            // as[(n – 1) & h]非空

            // 在addCount()函式中通過CAS更新當前執行緒的Cell進行計數失敗

            // 會傳入wasUncontended = false,代表已經有其他執行緒進行競爭

            else if (!wasUncontended)       // CAS already known to fail

                // 設置未競爭標誌,之後會重新計算probe,然後重新執行迴圈

                wasUncontended = true;      // Continue after rehash

            // 嘗試進行計數,如果成功,那麼就退出迴圈

            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))

                break;

            // 嘗試更新失敗,檢查counterCell陣列是否已經擴容

            // 或者容量達到最大值(CPU的數量)

            else if (counterCells != as || n >= NCPU)

                // 設置衝突標誌,防止跳入下麵的擴容分支

                // 之後會重新計算probe

                collide = false;            // At max size or stale

            // 設置衝突標誌,重新執行迴圈

            // 如果下次迴圈執行到該分支,並且衝突標誌仍然為true

            // 那麼會跳過該分支,到下一個分支進行擴容

            else if (!collide)

                collide = true;

            // 嘗試加鎖,然後對counterCells陣列進行擴容

            else if (cellsBusy == 0 &&

                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

                try {

                    // 檢查是否已被擴容

                    if (counterCells == as) {// Expand table unless stale

                        // 新陣列容量為之前的1倍

                        CounterCell[] rs = new CounterCell[n << 1];

                        // 遷移資料到新陣列

                        for (int i = 0; i < n; ++i)

                            rs[i] = as[i];

                        counterCells = rs;

                    }

                } finally {

                    // 釋放鎖

                    cellsBusy = 0;

                }

                collide = false;

                // 重新執行迴圈

                continue;                   // Retry with expanded table

            }

            // 為當前執行緒重新計算probe

            h = ThreadLocalRandom.advanceProbe(h);

        }

        // CounterCell陣列未初始化,嘗試獲取自旋鎖,然後進行初始化

        else if (cellsBusy == 0 && counterCells == as &&

                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

            boolean init = false;

            try {                           // Initialize table

                if (counterCells == as) {

                    // 初始化CounterCell陣列,初始容量為2

                    CounterCell[] rs = new CounterCell[2];

                    // 初始化CounterCell

                    rs[h & 1] = new CounterCell(x);

                    counterCells = rs;

                    init = true;

                }

            } finally {

                cellsBusy = 0;

            }

            // 初始化CounterCell陣列成功,退出迴圈

            if (init)

                break;

        }

        // 如果自旋鎖被占用,則只好嘗試更新baseCount

        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))

            break;                          // Fall back on using base

    }

}

對於統計總數,只要能夠理解CounterCell的思想,就很簡單了。仔細想一想,每次計數的更新都會被分攤在baseCount和CounterCell陣列中的某一CounterCell,想要獲得總數,把它們統計相加就是了。

public int size() {

    long n = sumCount();

    return ((n < 0L) ? 0 :

            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :

            (int)n);

}

 final long sumCount() {

    CounterCell[] as = counterCells; CounterCell a;

    long sum = baseCount;

    if (as != null) {

        for (int i = 0; i < as.length; ++i) {

            if ((a = as[i]) != null)

                sum += a.value;

        }

    }

    return sum;

}

其實size()函式傳回的總數可能並不是百分百精確的,試想如果前一個遍歷過的CounterCell又進行了更新會怎麼樣?儘管只是一個估算值,但在大多數場景下都還能接受,而且性能上是要比Java 7好上太多了。

添加元素

添加元素的主要邏輯與HashMap沒什麼區別,有所區別的複雜操作如擴容和計數我們上文都已經深入解析過了,所以整體來說putVal()函式還是比較簡單的,可能唯一需要註意的就是在對節點進行操作的時候需要通過互斥鎖保證執行緒安全,這個互斥鎖的粒度很小,只對需要操作的這個bucket加鎖。

public V put(K key, V value) {

    return putVal(key, value, false);

}

/** Implementation for put and putIfAbsent */

final V putVal(K key, V value, boolean onlyIfAbsent) {

    if (key == null || value == null) throw new NullPointerException();

    int hash = spread(key.hashCode());

    int binCount = 0; // 節點計數器,用於判斷是否需要樹化

    // 無限迴圈+CAS,無鎖的標準套路

    for (Node[] tab = table;;) {

        Node f; int n, i, fh;

        // 初始化table

        if (tab == null || (n = tab.length) == 0)

            tab = initTable();

        // bucket為null,通過CAS創建頭節點,如果成功就結束迴圈

        else if ((f = tabAt(tab, i = (n – 1) & hash)) == null) {

            if (casTabAt(tab, i, null,

                         new Node(hash, key, value, null)))

                break;                   // no lock when adding to empty bin

        }

        // bucket為ForwardingNode

        // 當前執行緒前去協助進行擴容

        else if ((fh = f.hash) == MOVED)

            tab = helpTransfer(tab, f);

        else {

            V oldVal = null;

            synchronized (f) {

                if (tabAt(tab, i) == f) {

                    // 節點是鏈表

                    if (fh >= 0) {

                        binCount = 1;

                        for (Node e = f;; ++binCount) {

                            K ek;

                            // 找到標的,設置value

                            if (e.hash == hash &&

                                ((ek = e.key) == key ||

                                 (ek != null && key.equals(ek)))) {

                                oldVal = e.val;

                                if (!onlyIfAbsent)

                                    e.val = value;

                                break;

                            }

                            Node pred = e;

                            // 未找到節點,插入新節點到鏈表尾部

                            if ((e = e.next) == null) {

                                pred.next = new Node(hash, key,

                                                          value, null);

                                break;

                            }

                        }

                    }

                    // 節點是紅黑樹

                    else if (f instanceof TreeBin) {

                        Node p;

                        binCount = 2;

                        if ((p = ((TreeBin)f).putTreeVal(hash, key,

                                                       value)) != null) {

                            oldVal = p.val;

                            if (!onlyIfAbsent)

                                p.val = value;

                        }

                    }

                }

            }

            // 根據bucket中的節點數決定是否樹化

            if (binCount != 0) {

                if (binCount >= TREEIFY_THRESHOLD)

                    treeifyBin(tab, i);

                // oldVal不等於null,說明沒有新節點

                // 所以直接傳回,不進行計數

                if (oldVal != null)

                    return oldVal;

                break;

            }

        }

    }

    // 計數

    addCount(1L, binCount);

    return null;

}

至於刪除元素的操作位於函式replaceNode(Object key, V value, Object cv),當table[key].val等於期望值cv時(或cv等於null),更新節點的值為value,如果value等於null,那麼刪除該節點。

remove()函式通過呼叫replaceNode(key, null, null)來達成刪除標的節點的目的,replaceNode()的具體實現與putVal()沒什麼差別,只不過對鏈表的操作有所不同而已,所以就不多敘述了。

並行計算

Java 8除了對ConcurrentHashMap重新設計以外,還引入了基於Lambda運算式的Stream API。它是對集合物件功能上的增強(所以不止ConcurrentHashMap,其他集合也都實現了該API),以一種優雅的方式來批量操作、聚合或遍歷集合中的資料。

最重要的是,它還提供了並行樣式,充分利用了多核CPU的優勢實現並行計算。讓我們看看如下的示例代碼:

public static void main(String[] args) {

    ConcurrentHashMap map = new ConcurrentHashMap<>();

    String keys = “ABCDEFG”;

    for (int i = 1; i <= keys.length(); i++) {

        map.put(String.valueOf(keys.charAt(i – 1)), i);

    }

    map.forEach(2,

            (k, v) -> System.out.println(“key-” + k + “:value-” + v + “. by thread->” + Thread.currentThread().getName()));

}

這段代碼通過兩個執行緒(包括主執行緒)並行地遍歷map中的元素,然後輸出到控制台,輸出如下:

key-A:value-1. by thread->main

key-D:value-4. by thread->ForkJoinPool.commonPool-worker-2

key-B:value-2. by thread->main

key-E:value-5. by thread->ForkJoinPool.commonPool-worker-2

key-C:value-3. by thread->main

key-F:value-6. by thread->ForkJoinPool.commonPool-worker-2

key-G:value-7. by thread->ForkJoinPool.commonPool-worker-2

很明顯,有兩個執行緒在進行工作,那麼這是怎麼實現的呢?我們先來看看forEach()函式:

public void forEach(long parallelismThreshold,

                    BiConsumer super K,? super V> action) {

    if (action == null) throw new NullPointerException();

    new ForEachMappingTask

        (null, batchFor(parallelismThreshold), 0, 0, table,

         action).invoke();

}

parallelismThreshold是需要並行執行該操作的執行緒數量,action則是回呼函式(我們想要執行的操作)。action的型別為BiConsumer,是一個用於支持Lambda運算式的FunctionalInterface,它接受兩個輸入引數並傳回0個結果。

@FunctionalInterface

public interface BiConsumer {

    /**

     * Performs this operation on the given arguments.

     *

     * @param t the first input argument

     * @param u the second input argument

     */

    void accept(T t, U u);

看來實現並行計算的關鍵在於ForEachMappingTask物件,通過它的繼承關係結構圖可以發現,ForEachMappingTask其實就是ForkJoinTask。

集合的並行計算是基於Fork/Join框架實現的,工作執行緒交由ForkJoinPool執行緒池維護。它推崇分而治之的思想,將一個大的任務分解成多個小的任務,通過fork()函式(有點像Linux的fork()系統呼叫來創建子行程)來開啟一個工作執行緒執行其中一個小任務,通過join()函式等待工作執行緒執行完畢(需要等所有工作執行緒執行完畢才能合併最終結果),只要所有的小任務都已經處理完成,就代表這個大的任務也完成了。

像上文中的示例代碼就是將遍歷這個大任務分解成了N個小任務,然後交由兩個工作執行緒進行處理。

static final class ForEachMappingTask

    extends BulkTask {

    final BiConsumer super K, ? super V> action;

    ForEachMappingTask

        (BulkTask p, int b, int i, int f, Node[] t,

         BiConsumer super K,? super V> action) {

        super(p, b, i, f, t);

        this.action = action;

    }

    public final void compute() {

        final BiConsumer super K, ? super V> action;

        if ((action = this.action) != null) {

            for (int i = baseIndex, f, h; batch > 0 &&

                     (h = ((f = baseLimit) + i) >>> 1) > i;) {

                // 記錄待完成任務的數量

                addToPendingCount(1);

                // 開啟一個工作執行緒執行任務

                // 其餘引數是任務的區間以及table和回呼函式

                new ForEachMappingTask

                    (this, batch >>>= 1, baseLimit = h, f, tab,

                     action).fork();

            }

            for (Node p; (p = advance()) != null; )

                // 呼叫回呼函式

                action.accept(p.key, p.val);

            // 與addToPendingCount()相反

            // 它會減少待完成任務的計數器

            // 如果該計數器為0,代表所有任務已經完成了

            propagateCompletion();

        }

    }

}

其他並行計算函式的實現也都差不多,只不過具體的Task實現不同,例如search():

public U search(long parallelismThreshold,

                    BiFunction super K, ? super V, ? extends U> searchFunction) {

    if (searchFunction == null) throw new NullPointerException();

    return new SearchMappingsTask

        (null, batchFor(parallelismThreshold), 0, 0, table,

         searchFunction, new AtomicReference()).invoke();

}

為了節省篇幅(說實話現在似乎很少有人能耐心看完一篇長文(:з」∠)),有關Stream API是如何使用Fork/Join框架進行工作以及實現細節就不多講了,以後有機會再說吧。

參考文獻

  • Associative array – Wikiwand

    http://www.wikiwand.com/en/Associative_array

  • Hash table – Wikiwand

    http://www.wikiwand.com/en/Hash_table

  • Hash function – Wikiwand

    http://www.wikiwand.com/en/Hash_function

  • ConcurrentHashMap (Java Platform SE 8 )

    https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html

  • LongAdder (Java Platform SE 8 )

    https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/LongAdder.html

  • Java Magic. Part 4: sun.misc.Unsafe

    http://mishadoff.com/blog/java-magic-part-4-sun-dot-misc-dot-unsafe/

  • ConcurrentHashMap in Java 8 – DZone Java

    https://dzone.com/articles/concurrenthashmap-in-java8

系列


【關於投稿】


如果大家有原創好文投稿,請直接給公號發送留言。


① 留言格式:
【投稿】+《 文章標題》+ 文章鏈接

② 示例:
【投稿】《不要自稱是程式員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/

③ 最後請附上您的個人簡介哈~



看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

赞(0)

分享創造快樂