歡迎光臨
每天分享高質量文章

淺析幾種執行緒安全模型

多執行緒程式設計一直是老生常談的問題,在Java中,隨著JDK的逐漸發展,JDK提供給我們的併發模型也越來越多,本文摘取三例使用不同原理的模型,分析其大致原理。

COW之CopyOnWriteArrayList

cow是copy-on-write的簡寫,這種模型來源於linux系統fork命令,Java中一種使用cow模型來實現的併發類是CopyOnWriteArrayList。相比於Vector,它的讀操作是無需加鎖的:

public E get(int index) {

     return (E) elements[index];

}

之所以有如此神奇功效,其採取的是空間換取時間的方法,檢視其add方法:

public synchronized boolean add(E e) {

     Object[] newElements = new Object[elements.length + 1];

     System.arraycopy(elements, 0, newElements, 0, elements.length);

     newElements[elements.length] = e;

     elements = newElements;

     return true;

}

我們註意到,CopyOnWriteArrayList的add方法是需要加鎖的,但其內部並沒有直接對elements陣列做操作,而是先copy一份當前的資料到一個新的陣列,然後對新的陣列進行賦值操作。這樣做就讓get操作從同步中解脫出來。因為更改的資料並沒有發生在get所需的陣列中。而是放生在新生成的副本中,所以不需要同步。但應該註意的是,儘管如此,get操作還是可能會讀取到臟資料的。

CopyOnWriteArrayList的另一特點是允許多執行緒遍歷,且其它執行緒更改資料並不會導致遍歷執行緒丟擲ConcurrentModificationException 異常,來看下iterator(),

public Iterator iterator() {

     Object[] snapshot = elements;

     return new CowIterator(snapshot, 0, snapshot.length);

}

這個CowIterator 是 ListIterator的子類,這個Iterator的特點是它並不支援對資料的更改操作:

public void add(E object) {

     throw new UnsupportedOperationException();

}

public void remove() {

    throw new UnsupportedOperationException();

}

public void set(E object) {

    throw new UnsupportedOperationException();

}

這樣做的原因也很容易理解,我們可以簡單地的認為CowIterator中的snapshot是不可變陣列,因為list中有資料更新都會生成新陣列,而不會改變snapshot, 所以此時Iterator沒辦法再將更改的資料寫回list了。同理,list資料有更新也不會反映在CowIterator中。CowIterator只是保證其迭代過程不會發生異常。

CAS之ConcurrentHashMap(JDK1.8)

CAS是Compare and Swap的簡寫,即比較與替換,CAS造作將比較和替換封裝為一組原子操作,不會被外部打斷。這種原子操作的保證往往由處理器層面提供支援。

在Java中有一個非常神奇的Unsafe類來對CAS提供語言層面的介面。但類如其名,此等神器如果使用不當,會造成武功盡失的,所以Unsafe不對外開放,想使用的話需要透過反射等技巧。這裡不對其做展開。介紹它的原因是因為它是JDK1.8中ConcurrentHashMap的實現基礎。

ConcurrentHashMap與HashMap對資料的儲存有著相似的地方,都採用陣列+連結串列+紅黑樹的方式。基本邏輯是內部使用Node來儲存map中的一項key, value結構,對於hash不衝突的key,使用陣列來儲存Node資料,而每一項Node都是一個連結串列,用來儲存hash衝突的Node,當連結串列的大小達到一定程度會轉為紅黑樹,這樣會使在衝突資料較多時也會有比較好的查詢效率。

瞭解了ConcurrentHashMap的儲存結構後,我們來看下在這種結構下,ConcurrentHashMap是如何實現高效的併發操作,這得益於ConcurrentHashMap中的如下三個函式。

static final Node tabAt(Node[] tab, int i) {

    return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);

}

static final boolean casTabAt(Node[] tab, int i,

                                    Node c, Node v) {

    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);

}

static final void setTabAt(Node[] tab, int i, Node v) {

    U.putOrderedObject(tab, ((long)i << ASHIFT) + ABASE, v);

}

其中的U就是我們前文提到的Unsafe的一個實體,這三個函式都透過Unsafe的幾個方法保證了是原子性:

  • tabAt作用是傳回tab陣列第i項
  • casTabAt函式是對比tab第i項是否與c相等,相等的話將其設定為v。
  • setTabAt將tab的第i項設定為v

有了這三個函式就可以保證ConcurrentHashMap的執行緒安全嗎?並不是的,ConcurrentHashMap內部也使用比較多的synchronized,不過與HashTable這種對所有操作都使用synchronized不同,ConcurrentHashMap只在特定的情況下使用synchronized,來較少鎖的定的區域。來看下putVal方法(精簡版):

final V putVal(K key, V value, boolean onlyIfAbsent) {

    if (key == null || value == null) throw new NullPointerException();

    int hash = spread(key.hashCode());

    int binCount = 0;

    for (Node[] tab = table;;) {

        Node f; int n, i, fh;

        if (tab == null || (n = tab.length) == 0)

            tab = initTable();

        else if ((f = tabAt(tab, i = (n – 1) & hash)) == null) {

            if (casTabAt(tab, i, null,

                         new Node(hash, key, value, null)))

                    break;                   // no lock when adding to embin

        }

        else if ((fh = f.hash) == MOVED)

            tab = helpTransfer(tab, f);

        else {

            V oldVal = null;

            synchronized (f) {

                    ….

            }

        }

    }

    addCount(1L, binCount);

    return null;

}

整個put流程大致如下:

  • 判斷key與value是否為空,為空拋異常
  • 計算kek的hash值,然後進入死迴圈,一般來講,caw演演算法與死迴圈是搭檔。
  • 判斷table是否初始化,未初始化進行初始化操作
  • Node在table中的標的位置是否為空,為空的話使用caw操作進行賦值,當然,這種賦值是有可能失敗的,所以前面的死迴圈發揮了重試的作用。
  • 如果當前正在擴容,則嘗試協助其擴容,死迴圈再次發揮了重試的作用,有趣的是ConcurrentHashMap是可以多執行緒同時擴容的。這裡說協助的原因在於,對於陣列擴容,一般分為兩步:1.新建一個更大的陣列;2.將原陣列資料copy到新陣列中。對於第一步,ConcurrentHashMap透過CAW來控制一個int變數保證新建陣列這一步只會執行一次。對於第二步,ConcurrentHashMap採用CAW + synchronized + 移動後標記 的方式來達到多執行緒擴容的目的。感興趣可以檢視transfer函式。
  • 最後的一個else分支,黑科技的流程已嘗試無效,標的Node已經存在值,只能鎖住當前Node來進行put操作,當然,這裡省略了很多程式碼,包括連結串列轉紅黑樹的操作等等。

相比於put,get的程式碼更好理解一下:

public V get(Object key) {

    Node[] tab; Node e, p; int n, eh; K ek;

    int h = spread(key.hashCode());

    if ((tab = table) != null && (n = tab.length) > 0 &&

        (e = tabAt(tab, (n – 1) & h)) != null) {

        if ((eh = e.hash) == h) {

            if ((ek = e.key) == key || (ek != null && key.equals(ek)))

                return e.val;

        }

        else if (eh < 0)

            return (p = e.find(h, key)) != null ? p.val : null;

        while ((e = e.next) != null) {

            if (e.hash == h &&

                ((ek = e.key) == key || (ek != null && key.equals(ek))))

                return e.val;

        }

    }

    return null;

}

  • 檢查表是否為空
  • 獲取key的hash h,獲取key在table中對應的Node e
  • 判斷Node e的第一項是否與預期的Node相等,相等話, 則傳回e.val
  • 如果e.hash < 0, 說明e為紅黑樹,呼叫e的find介面來進行查詢。
  • 走到這一步,e為連結串列無疑,且第一項不是需要查詢的資料,一直呼叫next來進行查詢即可。

讀寫分離之LinkedBlockingQueue

還有一種實現執行緒安全的方式是透過將讀寫進行分離,這種方式的一種實現是LinkedBlockingQueue。LinkedBlockingQueue整體設計的也十分精巧,它的全域性變數分為三類:

  • final 型
  • Atomic 型
  • 普通變數

final型變數由於宣告後就不會被修改,所以自然執行緒安全,Atomic型內部採用了cas模型來保證執行緒安全。對於普通型變數,LinkedBlockingQueue中只包含head與last兩個表示佇列的頭與尾。並且私有,外部無法更改,所以,LinkedBlockingQueue只需要保證head與last的安全即可保證真個佇列的執行緒安全。並且LinkedBlockingQueue屬於FIFO型佇列,一般情況下,讀寫會在不同元素上工作,所以, LinkedBlockingQueue定義了兩個可重入鎖,巧妙的透過對head與last分別加鎖,實現讀寫分離,來實現良好的安全併發特性:

/** Lock held by take, poll, etc */

private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */

private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */

private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */

private final Condition notFull = putLock.newCondition();

首先看下它的offer 方法:

public boolean offer(E e) {

    if (e == null) throw new NullPointerException();

    final AtomicInteger count = this.count;

    if (count.get() == capacity)

        return false;

    int c = -1;

    Node node = new Node(e);

    final ReentrantLock putLock = this.putLock;

    putLock.lock();

    try {

        if (count.get() < capacity) {

            enqueue(node);

            c = count.getAndIncrement();

            if (c + 1 < capacity)

                notFull.signal();

        }

    } finally {

        putLock.unlock();

    }

    if (c == 0)

        signalNotEmpty();

    return c >= 0;

}

可見,在對佇列進行新增元素時,只需要對putLock進行加鎖即可,保證同一時刻只有一個執行緒可以對last進行插入。同樣的,在從佇列進行提取元素時,也只需要獲取takeLock鎖來對head操作即可:

public E poll() {

    final AtomicInteger count = this.count;

    if (count.get() == 0)

        return null;

    E x = null;

    int c = -1;

    final ReentrantLock takeLock = this.takeLock;

    takeLock.lock();

    try {

        if (count.get() > 0) {

            x = dequeue();

            c = count.getAndDecrement();

            if (c > 1)

                notEmpty.signal();

        }

    } finally {

        takeLock.unlock();

    }

    if (c == capacity)

        signalNotFull();

    return x;

}

LinkedBlockingQueue整體還是比較好理解的,但有幾個點需要特殊註意:

  • LinkedBlockingQueue是一個阻塞佇列,當佇列無元素為空時,所有取元素的執行緒會透過notEmpty 的await()方法進行等待,直到再次有資料enqueue時,notEmpty發出signal訊號。對於佇列達到上限時也是同理。
  • 對於remove,contains,toArray, toString, clear之類方法,會呼叫fullyLock方法,來同時獲取讀寫鎖。但對於size方法,由於佇列內部維護了AtomicInteger型別的count變數,是不需要加鎖進行獲取的。
贊(0)

分享創造快樂