歡迎光臨
每天分享高質量文章

【死磕Java併發】—– J.U.C之AQS:CLH同步佇列

此篇博客所有原始碼均來自JDK 1.8

作者:大明哥
原文地址:http://cmsblogs.com/?p=2205

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【大明哥】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【大明哥】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【大明哥】搞基嗨皮。

在執行緒獲取同步狀態時如果獲取失敗,則加入CLH同步佇列,通過通過自旋的方式不斷獲取同步狀態,但是在自旋的過程中則需要判斷當前執行緒是否需要阻塞,其主要方法在acquireQueued():

  1. if (shouldParkAfterFailedAcquire(p, node) &&

  2.                    parkAndCheckInterrupt())

  3.                    interrupted = true;

通過這段代碼我們可以看到,在獲取同步狀態失敗後,執行緒並不是立馬進行阻塞,需要檢查該執行緒的狀態,檢查狀態的方法為 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,該方法主要靠前驅節點判斷當前執行緒是否應該被阻塞,代碼如下:

  1.    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

  2.        //前驅節點

  3.        int ws = pred.waitStatus;

  4.        //狀態為signal,表示當前執行緒處於等待狀態,直接放回true

  5.        if (ws == Node.SIGNAL)

  6.            return true;

  7.        //前驅節點狀態 > 0 ,則為Cancelled,表明該節點已經超時或者被中斷了,需要從同步佇列中取消

  8.        if (ws > 0) {

  9.            do {

  10.                node.prev = pred = pred.prev;

  11.            } while (pred.waitStatus > 0);

  12.            pred.next = node;

  13.        }

  14.        //前驅節點狀態為Condition、propagate

  15.        else {

  16.            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

  17.        }

  18.        return false;

  19.    }

這段代碼主要檢查當前執行緒是否需要被阻塞,具體規則如下:

  1. 如果當前執行緒的前驅節點狀態為SINNAL,則表明當前執行緒需要被阻塞,呼叫unpark()方法喚醒,直接傳回true,當前執行緒阻塞

  2. 如果當前執行緒的前驅節點狀態為CANCELLED(ws > 0),則表明該執行緒的前驅節點已經等待超時或者被中斷了,則需要從CLH佇列中將該前驅節點刪除掉,直到回溯到前驅節點狀態 <= 0 ,傳回false

  3. 如果前驅節點非SINNAL,非CANCELLED,則通過CAS的方式將其前驅節點設置為SINNAL,傳回false

如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法傳回true,則呼叫parkAndCheckInterrupt()方法阻塞當前執行緒:

  1.    private final boolean parkAndCheckInterrupt() {

  2.        LockSupport.park(this);

  3.        return Thread.interrupted();

  4.    }

parkAndCheckInterrupt() 方法主要是把當前執行緒掛起,從而阻塞住執行緒的呼叫棧,同時傳回當前執行緒的中斷狀態。其內部則是呼叫LockSupport工具類的park()方法來阻塞該方法。

當執行緒釋放同步狀態後,則需要喚醒該執行緒的後繼節點:

  1.    public final boolean release(int arg) {

  2.        if (tryRelease(arg)) {

  3.            Node h = head;

  4.            if (h != null && h.waitStatus != 0)

  5.                //喚醒後繼節點

  6.                unparkSuccessor(h);

  7.            return true;

  8.        }

  9.        return false;

  10.    }

呼叫unparkSuccessor(Node node)喚醒後繼節點:

  1.    private void unparkSuccessor(Node node) {

  2.        //當前節點狀態

  3.        int ws = node.waitStatus;

  4.        //當前狀態 < 0 則設置為 0

  5.        if (ws < 0)

  6.            compareAndSetWaitStatus(node, ws, 0);

  7.        //當前節點的後繼節點

  8.        Node s = node.next;

  9.        //後繼節點為null或者其狀態 > 0 (超時或者被中斷了)

  10.        if (s == null || s.waitStatus > 0) {

  11.            s = null;

  12.            //從tail節點來找可用節點

  13.            for (Node t = tail; t != null && t != node; t = t.prev)

  14.                if (t.waitStatus <= 0)

  15.                    s = t;

  16.        }

  17.        //喚醒後繼節點

  18.        if (s != null)

  19.            LockSupport.unpark(s.thread);

  20.    }

可能會存在當前執行緒的後繼節點為null,超時、被中斷的情況,如果遇到這種情況了,則需要跳過該節點,但是為何是從tail尾節點開始,而不是從node.next開始呢?原因在於node.next仍然可能會存在null或者取消了,所以採用tail回溯辦法找第一個可用的執行緒。最後呼叫LockSupport的unpark(Thread thread)方法喚醒該執行緒。

LockSupport

從上面我可以看到,當需要阻塞或者喚醒一個執行緒的時候,AQS都是使用LockSupport這個工具類來完成的。

LockSupport是用來創建鎖和其他同步類的基本執行緒阻塞原語

每個使用LockSupport的執行緒都會與一個許可關聯,如果該許可可用,並且可在行程中使用,則呼叫park()將會立即傳回,否則可能阻塞。如果許可尚不可用,則可以呼叫 unpark 使其可用。但是註意許可不可重入,也就是說只能呼叫一次park()方法,否則會一直阻塞。

LockSupport定義了一系列以park開頭的方法來阻塞當前執行緒,unpark(Thread thread)方法來喚醒一個被阻塞的執行緒。如下:

park(Object blocker)方法的blocker引數,主要是用來標識當前執行緒在等待的物件,該物件主要用於問題排查和系統監控。

park方法和unpark(Thread thread)都是成對出現的,同時unpark必須要在park執行之後執行,當然並不是說沒有不呼叫unpark執行緒就會一直阻塞,park有一個方法,它帶了時間戳(parkNanos(long nanos):為了執行緒調度禁用當前執行緒,最多等待指定的等待時間,除非許可可用)。

park()方法的原始碼如下:

  1.    public static void park() {

  2.        UNSAFE.park(false, 0L);

  3.    }

unpark(Thread thread)方法原始碼如下:

  1.    public static void unpark(Thread thread) {

  2.        if (thread != null)

  3.            UNSAFE.unpark(thread);

  4.    }

從上面可以看出,其內部的實現都是通過UNSAFE(sun.misc.Unsafe UNSAFE)來實現的,其定義如下:

  1. public native void park(boolean var1, long var2);

  2. public native void unpark(Object var1);

兩個都是native本地方法。Unsafe 是一個比較危險的類,主要是用於執行低級別、不安全的方法集合。儘管這個類和所有的方法都是公開的(public),但是這個類的使用仍然受限,你無法在自己的java程式中直接使用該類,因為只有授信的代碼才能獲得該類的實體。

參考資料

  1. 方騰飛:《Java併發編程的藝術》

  2. LockSupport的park和unpark的基本使用,以及對執行緒中斷的響應性

赞(0)

分享創造快樂