歡迎光臨
每天分享高質量文章

AbstractQueuedSynchronizer 2 :共享樣式與基於 Condition 的等待 / 通知機制實現

(點選上方公眾號,可快速關註)


來源:五月的倉頡 ,

www.cnblogs.com/xrq730/p/7067904.html

共享樣式acquire實現流程

上文我們講解了AbstractQueuedSynchronizer獨佔樣式的acquire實現流程,本文趁熱打鐵繼續看一下AbstractQueuedSynchronizer共享樣式acquire的實現流程。連續兩篇文章的學習,也可以對比獨佔樣式acquire和共享樣式acquire的區別,加深對於AbstractQueuedSynchronizer的理解。

先看一下共享樣式acquire的實現,方法為acquireShared和acquireSharedInterruptibly,兩者差別不大,區別就在於後者有中斷處理,以acquireShared為例:

public final void acquireShared(int arg) {

      if (tryAcquireShared(arg) < 0)

            doAcquireShared(arg);

 }

這裡就能看出第一個差別來了:獨佔樣式acquire的時候子類重寫的方法tryAcquire傳回的是boolean,即是否tryAcquire成功;共享樣式acquire的時候,傳回的是一個int型變數,判斷是否<0。doAcquireShared方法的實現為:

private void doAcquireShared(int arg) {

    final Node node = addWaiter(Node.SHARED);

    boolean failed = true;

    try {

        boolean interrupted = false;

        for (;;) {

            final Node p = node.predecessor();

            if (p == head) {

                int r = tryAcquireShared(arg);

                if (r >= 0) {

                    setHeadAndPropagate(node, r);

                    p.next = null; // help GC

                    if (interrupted)

                        selfInterrupt();

                    failed = false;

                    return;

                }

            }

            if (shouldParkAfterFailedAcquire(p, node) &&

                parkAndCheckInterrupt())

                interrupted = true;

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

我們來分析一下這段程式碼做了什麼:

  1. addWaiter,把所有tryAcquireShared<0的執行緒實體化出一個Node,構建為一個FIFO佇列,這和獨佔鎖是一樣的

  2. 拿當前節點的前驅節點,只有前驅節點是head的節點才能tryAcquireShared,這和獨佔鎖也是一樣的

  3. 前驅節點不是head的,執行”shouldParkAfterFailedAcquire() && parkAndCheckInterrupt()”,for(;;)迴圈,”shouldParkAfterFailedAcquire()”方法執行2次,當前執行緒阻塞,這和獨佔鎖也是一樣的

確實,共享樣式下的acquire和獨佔樣式下的acquire大部分邏輯差不多,最大的差別在於tryAcquireShared成功之後,獨佔樣式的acquire是直接將當前節點設定為head節點即可,共享樣式會執行setHeadAndPropagate方法,顧名思義,即在設定head之後多執行了一步propagate操作。setHeadAndPropagate方法原始碼為:

private void setHeadAndPropagate(Node node, int propagate) {

    Node h = head; // Record old head for check below

    setHead(node);

    /*

     * Try to signal next queued node if:

     *   Propagation was indicated by caller,

     *     or was recorded (as h.waitStatus) by a previous operation

     *     (note: this uses sign-check of waitStatus because

     *      PROPAGATE status may transition to SIGNAL.)

     * and

     *   The next node is waiting in shared mode,

     *     or we don’t know, because it appears null

     *

     * The conservatism in both of these checks may cause

     * unnecessary wake-ups, but only when there are multiple

     * racing acquires/releases, so most need signals now or soon

     * anyway.

     */

    if (propagate > 0 || h == null || h.waitStatus < 0) {

        Node s = node.next;

        if (s == null || s.isShared())

            doReleaseShared();

    }

}

第3行的程式碼設定重設head,第2行的程式碼由於第3行的程式碼要重設head,因此先定義一個Node型變數h獲得原head的地址,這兩行程式碼很簡單。

第19行~第23行的程式碼是獨佔鎖和共享鎖最不一樣的一個地方,我們再看獨佔鎖acquireQueued的程式碼:

final boolean acquireQueued(final Node node, int arg) {

    boolean failed = true;

    try {

        boolean interrupted = false;

        for (;;) {

            final Node p = node.predecessor();

            if (p == head && tryAcquire(arg)) {

                setHead(node);

                p.next = null; // help GC

                failed = false;

                return interrupted;

            }

            if (shouldParkAfterFailedAcquire(p, node) &&

                parkAndCheckInterrupt())

                interrupted = true;

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

這意味著獨佔鎖某個節點被喚醒之後,它只需要將這個節點設定成head就完事了,而共享鎖不一樣,某個節點被設定為head之後,如果它的後繼節點是SHARED狀態的,那麼將繼續透過doReleaseShared方法嘗試往後喚醒節點,實現了共享狀態的向後傳播。

共享樣式release實現流程

上面講了共享樣式下acquire是如何實現的,下麵再看一下release的實現流程,方法為releaseShared:

public final boolean releaseShared(int arg) {

    if (tryReleaseShared(arg)) {

        doReleaseShared();

        return true;

    }

    return false;

}

tryReleaseShared方法是子類實現的,如果tryReleaseShared成功,那麼執行doReleaseShared()方法:

private void doReleaseShared() {

    /*

     * Ensure that a release propagates, even if there are other

     * in-progress acquires/releases.  This proceeds in the usual

     * way of trying to unparkSuccessor of head if it needs

     * signal. But if it does not, status is set to PROPAGATE to

     * ensure that upon release, propagation continues.

     * Additionally, we must loop in case a new node is added

     * while we are doing this. Also, unlike other uses of

     * unparkSuccessor, we need to know if CAS to reset status

     * fails, if so rechecking.

     */

    for (;;) {

        Node h = head;

        if (h != null && h != tail) {

            int ws = h.waitStatus;

            if (ws == Node.SIGNAL) {

                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

                    continue;            // loop to recheck cases

                unparkSuccessor(h);

            }

            else if (ws == 0 &&

                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

                continue;                // loop on failed CAS

        }

        if (h == head)                   // loop if head changed

            break;

    }

}

主要是兩層邏輯:

  1. 頭結點本身的waitStatus是SIGNAL且能透過CAS演演算法將頭結點的waitStatus從SIGNAL設定為0,喚醒頭結點的後繼節點

  2. 頭結點本身的waitStatus是0的話,嘗試將其設定為PROPAGATE狀態的,意味著共享狀態可以向後傳播

Condition的await()方法實現原理—-構建等待佇列

我們知道,Condition是用於實現通知/等待機制的,和Object的wait()/notify()一樣,由於本文之前描述AbstractQueuedSynchronizer的共享樣式的篇幅不是很長,加之Condition也是AbstractQueuedSynchronizer的一部分,因此將Condition也放在這裡寫了。

Condition分為await()和signal()兩部分,前者用於等待、後者用於喚醒,首先看一下await()是如何實現的。Condition本身是一個介面,其在AbstractQueuedSynchronizer中的實現為ConditionObject:

public class ConditionObject implements Condition, java.io.Serializable {

        private static final long serialVersionUID = 1173984872572414699L;

        /** First node of condition queue. */

        private transient Node firstWaiter;

        /** Last node of condition queue. */

        private transient Node lastWaiter;

         

        …

}

這裡貼了一些欄位定義,後面都是方法就不貼了,會對重點方法進行分析的。從欄位定義我們可以看到,ConditionObject全域性性地記錄了第一個等待的節點與最後一個等待的節點。

像ReentrantLock每次要使用ConditionObject,直接new一個ConditionObject出來即可。我們關註一下await()方法的實現:

public final void await() throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    Node node = addConditionWaiter();

    int savedState = fullyRelease(node);

    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {

        LockSupport.park(this);

        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

            break;

    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

        interruptMode = REINTERRUPT;

    if (node.nextWaiter != null) // clean up if cancelled

        unlinkCancelledWaiters();

    if (interruptMode != 0)

        reportInterruptAfterWait(interruptMode);

}

第2行~第3行的程式碼用於處理中斷,第4行程式碼比較關鍵,新增Condition的等待者,看一下實現:

private Node addConditionWaiter() {

    Node t = lastWaiter;

    // If lastWaiter is cancelled, clean out.

    if (t != null && t.waitStatus != Node.CONDITION) {

        unlinkCancelledWaiters();

        t = lastWaiter;

    }

    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    if (t == null)

        firstWaiter = node;

    else

        t.nextWaiter = node;

    lastWaiter = node;

    return node;

}

首先拿到佇列(註意資料結構,Condition構建出來的也是一個佇列)中最後一個等待者,緊接著第4行的的判斷,判斷最後一個等待者的waitStatus不是CONDITION的話,執行第5行的程式碼,解綁取消的等待者,因為透過第8行的程式碼,我們看到,new出來的Node的狀態都是CONDITION的。

那麼unlinkCancelledWaiters做了什麼?裡面的流程就不看了,就是一些指標遍歷並判斷狀態的操作,總結一下就是:從頭到尾遍歷每一個Node,遇到Node的waitStatus不是CONDITION的就從佇列中踢掉,該節點的前後節點相連。

接著第8行的程式碼前面說過了,new出來了一個Node,儲存了當前執行緒,waitStatus是CONDITION,接著第9行~第13行的操作很好理解:

  1. 如果lastWaiter是null,說明FIFO佇列中沒有任何Node,firstWaiter=Node

  2. 如果lastWaiter不是null,說明FIFO佇列中有Node,原lastWaiter的next指向Node

  3. 無論如何,新加入的Node程式設計lastWaiter,即新加入的Node一定是在最後面

用一張圖表示一下構建的資料結構就是:

對比學習,我們總結一下Condition構建出來的佇列和AbstractQueuedSynchronizer構建出來的佇列的差別,主要體現在2點上:

  1. AbstractQueuedSynchronizer構建出來的佇列,頭節點是一個沒有Thread的空節點,其標識作用,而Condition構建出來的佇列,頭節點就是真正等待的節點

  2. AbstractQueuedSynchronizer構建出來的佇列,節點之間有next與pred相互標識該節點的前一個節點與後一個節點的地址,而Condition構建出來的佇列,只使用了nextWaiter標識下一個等待節點的地址

整個過程中,我們看到沒有使用任何CAS操作,firstWaiter和lastWaiter也沒有用volatile修飾,其實原因很簡單:要await()必然要先lock(),既然lock()了就表示沒有競爭,沒有競爭自然也沒必要使用volatile+CAS的機制去保證什麼。

Condition的await()方法實現原理—-執行緒等待

前面我們看了Condition構建等待佇列的過程,接下來我們看一下等待的過程,await()方法的程式碼比較短,再貼一下:

public final void await() throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    Node node = addConditionWaiter();

    int savedState = fullyRelease(node);

    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {

        LockSupport.park(this);

        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

            break;

    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

        interruptMode = REINTERRUPT;

    if (node.nextWaiter != null) // clean up if cancelled

        unlinkCancelledWaiters();

    if (interruptMode != 0)

        reportInterruptAfterWait(interruptMode);

}

構建完畢佇列之後,執行第5行的fullyRelease方法,顧名思義:fullyRelease方法的作用是完全釋放Node的狀態。方法實現為:

final int fullyRelease(Node node) {

    boolean failed = true;

    try {

        int savedState = getState();

        if (release(savedState)) {

            failed = false;

            return savedState;

        } else {

            throw new IllegalMonitorStateException();

        }

    } finally {

        if (failed)

            node.waitStatus = Node.CANCELLED;

    }

}

這裡第4行獲取state,第5行release的時候將整個state傳過去,理由是某執行緒可能多次呼叫了lock()方法,比如呼叫了10次lock,那麼此執行緒就將state加到了10,所以這裡要將10傳過去,將狀態全部釋放,這樣後面的執行緒才能重新從state=0開始競爭鎖,這也是方法被命名為fullyRelease的原因,因為要完全釋放鎖,釋放鎖之後,如果有競爭鎖的執行緒,那麼就喚醒第一個,這都是release方法的邏輯了,前面的文章詳細講解過。

接著看await()方法的第7行判斷”while(!isOnSyncQueue(node))”:

final boolean isOnSyncQueue(Node node) {

    if (node.waitStatus == Node.CONDITION || node.prev == null)

        return false;

    if (node.next != null) // If has successor, it must be on queue

        return true;

    /*

     * node.prev can be non-null, but not yet on queue because

     * the CAS to place it on queue can fail. So we have to

     * traverse from tail to make sure it actually made it.  It

     * will always be near the tail in calls to this method, and

     * unless the CAS failed (which is unlikely), it will be

     * there, so we hardly ever traverse much.

     */

    return findNodeFromTail(node);

}

註意這裡的判斷是Node是否在AbstractQueuedSynchronizer構建的佇列中而不是Node是否在Condition構建的佇列中,如果Node不在AbstractQueuedSynchronizer構建的佇列中,那麼呼叫LockSupport的park方法阻塞。

至此呼叫await()方法的執行緒構建Condition等待佇列–釋放鎖–等待的過程已經全部分析完畢。 

Condition的signal()實現原理

上面的程式碼分析了構建Condition等待佇列–釋放鎖–等待的過程,接著看一下signal()方法通知是如何實現的:

public final void signal() {

    if (!isHeldExclusively())

        throw new IllegalMonitorStateException();

    Node first = firstWaiter;

    if (first != null)

        doSignal(first);

}

首先從第2行的程式碼我們看到,要能signal(),當前執行緒必須持有獨佔鎖,否則丟擲異常IllegalMonitorStateException。

那麼真正操作的時候,獲取第一個waiter,如果有waiter,呼叫doSignal方法:

private void doSignal(Node first) {

    do {

        if ( (firstWaiter = first.nextWaiter) == null)

            lastWaiter = null;

        first.nextWaiter = null;

    } while (!transferForSignal(first) &&

             (first = firstWaiter) != null);

}

第3行~第5行的程式碼很好理解:

  1. 重新設定firstWaiter,指向第一個waiter的nextWaiter

  2. 如果第一個waiter的nextWaiter為null,說明當前佇列中只有一個waiter,lastWaiter置空

  3. 因為firstWaiter是要被signal的,因此它沒什麼用了,nextWaiter置空

接著執行第6行和第7行的程式碼,這裡重點就是第6行的transferForSignal方法:

final boolean transferForSignal(Node node) {

    /*

     * If cannot change waitStatus, the node has been cancelled.

     */

    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

        return false;

 

    /*

     * Splice onto queue and try to set waitStatus of predecessor to

     * indicate that thread is (probably) waiting. If cancelled or

     * attempt to set waitStatus fails, wake up to resync (in which

     * case the waitStatus can be transiently and harmlessly wrong).

     */

    Node p = enq(node);

    int ws = p.waitStatus;

    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

        LockSupport.unpark(node.thread);

    return true;

}

方法本意是將一個節點從Condition佇列轉換為AbstractQueuedSynchronizer佇列,總結一下方法的實現:

  1. 嘗試將Node的waitStatus從CONDITION置為0,這一步失敗直接傳回false

  2. 當前節點進入呼叫enq方法進入AbstractQueuedSynchronizer佇列

  3. 當前節點透過CAS機制將waitStatus置為SIGNAL

最後上面的步驟全部成功,傳回true,傳回true喚醒等待節點成功。從喚醒的程式碼我們可以得出一個重要結論:某個await()的節點被喚醒之後並不意味著它後面的程式碼會立即執行,它會被加入到AbstractQueuedSynchronizer佇列的尾部,只有前面等待的節點獲取鎖全部完畢才能輪到它。

程式碼分析到這裡,我想類似的signalAll方法也沒有必要再分析了,顯然signalAll方法的作用就是將所有Condition佇列中等待的節點逐一佇列中從移除,由CONDITION狀態變為SIGNAL狀態並加入AbstractQueuedSynchronizer佇列的尾部。

程式碼示例

可能大家看了我分析半天程式碼會有點迷糊,這裡最後我貼一段我用於驗證上面Condition結論的示例程式碼,首先建立一個Thread,我將之命名為ConditionThread:

/**

 * @author 五月的倉頡http://www.cnblogs.com/xrq730/p/7067904.html

 */

public class ConditionThread implements Runnable {

 

    private Lock lock;

     

    private Condition condition;

     

    public ConditionThread(Lock lock, Condition condition) {

        this.lock = lock;

        this.condition = condition;

    }

     

    @Override

    public void run() {

         

        if (“執行緒0”.equals(JdkUtil.getThreadName())) {

            thread0Process();

        } else if (“執行緒1”.equals(JdkUtil.getThreadName())) {

            thread1Process();

        } else if (“執行緒2”.equals(JdkUtil.getThreadName())) {

            thread2Process();

        }

         

    }

     

    private void thread0Process() {

        try {

            lock.lock();

            System.out.println(“執行緒0休息5秒”);

            JdkUtil.sleep(5000);

            condition.signal();

            System.out.println(“執行緒0喚醒等待執行緒”);

        } finally {

            lock.unlock();

        }

    }

     

    private void thread1Process() {

        try {

            lock.lock();

            System.out.println(“執行緒1阻塞”);

            condition.await();

            System.out.println(“執行緒1被喚醒”);

        } catch (InterruptedException e) {

             

        } finally {

            lock.unlock();

        }

    }

     

    private void thread2Process() {

        try {

            System.out.println(“執行緒2想要獲取鎖”);

            lock.lock();

            System.out.println(“執行緒2獲取鎖成功”);

        } finally {

            lock.unlock();

        }

    } 

}

這個類裡面的方法就不解釋了,反正就三個方法片段,根據執行緒名判斷,每個線層執行的是其中的一個程式碼片段。寫一段測試程式碼:

/**

 * @author 五月的倉頡http://www.cnblogs.com/xrq730/p/7067904.html

 */

@Test

public void testCondition() throws Exception {

    Lock lock = new ReentrantLock();

    Condition condition = lock.newCondition();

         

    // 執行緒0的作用是signal

    Runnable runnable0 = new ConditionThread(lock, condition);

    Thread thread0 = new Thread(runnable0);

    thread0.setName(“執行緒0”);

    // 執行緒1的作用是await

    Runnable runnable1 = new ConditionThread(lock, condition);

    Thread thread1 = new Thread(runnable1);

    thread1.setName(“執行緒1”);

    // 執行緒2的作用是lock

    Runnable runnable2 = new ConditionThread(lock, condition);

    Thread thread2 = new Thread(runnable2);

    thread2.setName(“執行緒2”);

         

    thread1.start();

    Thread.sleep(1000);

    thread0.start();

    Thread.sleep(1000);

    thread2.start();

         

    thread1.join();

}

測試程式碼的意思是:

  1. 執行緒1先啟動,獲取鎖,呼叫await()方法等待

  2. 執行緒0後啟動,獲取鎖,休眠5秒準備signal()

  3. 執行緒2最後啟動,獲取鎖,由於執行緒0未使用完畢鎖,因此執行緒2排隊,可以此時由於執行緒0還未signal(),因此執行緒1在執行緒0執行signal()後,在AbstractQueuedSynchronizer佇列中的順序是在執行緒2後面的

程式碼執行結果為:

1 執行緒1阻塞

2 執行緒0休息5秒

3 執行緒2想要獲取鎖

4 執行緒0喚醒等待執行緒

5 執行緒2獲取鎖成功

6 執行緒1被喚醒

符合我們的結論:signal()並不意味著被喚醒的執行緒立即執行。由於執行緒2先於執行緒0排隊,因此看到第5行列印的內容,執行緒2先獲取鎖。

看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂