歡迎光臨
每天分享高質量文章

Java 併發之 Condition 的實現分析

(點擊上方公眾號,可快速關註)


來源:zy_lebron ,

youngforzy.top/2017/12/01/Java併發之Condition的實現分析/

一、Condition的概念

介紹

回憶 synchronized 關鍵字,它配合 Object 的 wait()、notify() 系列方法可以實現等待/通知樣式。

對於 Lock,通過 Condition 也可以實現等待/通知樣式。

Condition 是一個接口。

Condition 接口的實現類是 Lock(AQS)中的 ConditionObject。

Lock 接口中有個 newCondition() 方法,通過這個方法可以獲得 Condition 物件(其實就是 ConditionObject)。

因此,通過 Lock 物件可以獲得 Condition 物件。

Lock lock  = new ReentrantLock();

Condition c1 = lock.newCondition();

Condition c2 = lock.newCondition();

二、Condition的實現分析

實現

ConditionObject 類是 AQS 的內部類,實現了 Condition 接口。

public class ConditionObject implements Condition, java.io.Serializable {

        private transient Node firstWaiter;

        private transient Node lastWaiter;

        …

可以看到,等待佇列和同步佇列一樣,使用的都是同步器 AQS 中的節點類 Node。

同樣擁有首節點和尾節點,

每個 Condition 物件都包含著一個 FIFO 佇列。

結構圖:

等待

呼叫 Condition 的 await() 方法會使執行緒進入等待佇列,並釋放鎖,執行緒狀態變為等待狀態。

public final void await() throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    Node node = addConditionWaiter();

    //釋放同步狀態(鎖)

    int savedState = fullyRelease(node);

    int interruptMode = 0;

    //判斷節點是否放入同步對列

    while (!isOnSyncQueue(node)) {

        //阻塞

        LockSupport.park(this);

        //如果已經中斷了,則退出

        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

            break;

    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

        interruptMode = REINTERRUPT;

            if (node.nextWaiter != null) // clean up if cancelled

            unlinkCancelledWaiters();

            if (interruptMode != 0)

                reportInterruptAfterWait(interruptMode);

}

分析上述方法的大概過程:

1. 將當前執行緒創建為節點,加入等待佇列;

2. 釋放鎖,喚醒同步佇列中的後繼節點;

3. while迴圈判斷節點是否放入同步佇列:

  • 沒有放入,則阻塞,繼續 while 迴圈(如果已經中斷了,則退出)

  • 放入,則退出 while 迴圈,執行後面的判斷

4. 退出 while 說明節點已經在同步佇列中,呼叫 acquireQueued() 方法加入同步狀態競爭。

5. 競爭到鎖後從 await() 方法傳回,即退出該方法。

addConditionWaiter() 方法:

private Node addConditionWaiter() {

    Node t = lastWaiter;

    if (t != null && t.waitStatus != Node.CONDITION) {

        //清除條件佇列中所有狀態不為Condition的節點

        unlinkCancelledWaiters();

        t = lastWaiter;

    }

    //將該執行緒創建節點,放入等待佇列

    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    if (t == null)

        firstWaiter = node;

    else

        t.nextWaiter = node;

    lastWaiter = node;

    return node;

}

過程分析:同步佇列的首節點移動到等待佇列。加入尾節點之前會清除所有狀態不為 Condition 的節點。

通知

呼叫 Condition 的 signal() 方法,可以喚醒等待佇列的首節點(等待時間最長),喚醒之前會將該節點移動到同步佇列中。

public final void signal() {

    //判斷是否獲取了鎖

    if (!isHeldExclusively())

        throw new IllegalMonitorStateException();

    Node first = firstWaiter;

    if (first != null)

        doSignal(first);

}

過程:

  1. 先判斷當前執行緒是否獲取了鎖;

  2. 然後對首節點呼叫 doSignal() 方法。

private void doSignal(Node first) {

    do {

        if ( (firstWaiter = first.nextWaiter) == null)

            lastWaiter = null;

        first.nextWaiter = null;

    } while (!transferForSignal(first) &&

       (first = firstWaiter) != null);

}

過程:

  1. 修改首節點;

  2. 呼叫 transferForSignal() 方法將節點移動到同步佇列。

final boolean transferForSignal(Node node) {

    //將節點狀態變為0   

    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

        return false;

    //將該節點加入同步佇列

    Node p = enq(node);

    int ws = p.waitStatus;

    //如果結點p的狀態為cancel 或者修改waitStatus失敗,則直接喚醒

    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

        LockSupport.unpark(node.thread);

    return true;

}

呼叫同步器的 enq 方法,將節點移動到同步佇列,

滿足條件後使用 LockSupport 喚醒該執行緒。

當 Condition 呼叫 signalAll() 方法:

public final void signalAll() {

    if (!isHeldExclusively())

        throw new IllegalMonitorStateException();

    Node first = firstWaiter;

    if (first != null)

        doSignalAll(first);

}

private void doSignalAll(Node first) {

    lastWaiter = firstWaiter = null;

    do {

        Node next = first.nextWaiter;

        first.nextWaiter = null;

        transferForSignal(first);

        first = next;

    } while (first != null);

}

可以看到 doSignalAll() 方法使用了 do-while 迴圈來喚醒每一個等待佇列中的節點,直到 first 為 null 時,停止迴圈。

一句話總結 signalAll() 的作用:將等待佇列中的全部節點移動到同步佇列中,並喚醒每個節點的執行緒。

總結

整個過程可以分為三步:

第一步:一個執行緒獲取鎖後,通過呼叫 Condition 的 await() 方法,會將當前執行緒先加入到等待佇列中,並釋放鎖。然後就在 await() 中的一個 while 迴圈中判斷節點是否已經在同步佇列,是則嘗試獲取鎖,否則一直阻塞。

第二步:當執行緒呼叫 signal() 方法後,程式首先檢查當前執行緒是否獲取了鎖,然後通過 doSignal(Node first) 方法將節點移動到同步佇列,並喚醒節點中的執行緒。

第三步:被喚醒的執行緒,將從 await() 中的 while 迴圈中退出來,然後呼叫 acquireQueued() 方法競爭同步狀態。競爭成功則退出 await() 方法,繼續執行。

【關於投稿】


如果大家有原創好文投稿,請直接給公號發送留言。


① 留言格式:
【投稿】+《 文章標題》+ 文章鏈接

② 示例:
【投稿】《不要自稱是程式員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/

③ 最後請附上您的個人簡介哈~



看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

赞(0)

分享創造快樂