歡迎光臨
每天分享高質量文章

【死磕Java併發】—–J.U.C之阻塞佇列:SynchronousQueue

原文出處http://cmsblogs.com/ 『chenssy

【註】:SynchronousQueue實現演算法看的暈乎乎的,寫了好久才寫完,如果當中有什麼錯誤之處,忘各位指正

作為BlockingQueue中的一員,SynchronousQueue與其他BlockingQueue有著不同特性:

  1. SynchronousQueue沒有容量。與其他BlockingQueue不同,SynchronousQueue是一個不儲存元素的BlockingQueue。每一個put操作必須要等待一個take操作,否則不能繼續添加元素,反之亦然。

  2. 因為沒有容量,所以對應 peek, contains, clear, isEmpty … 等方法其實是無效的。例如clear是不執行任何操作的,contains始終傳回false,peek始終傳回null。

  3. SynchronousQueue分為公平和非公平,預設情況下採用非公平性訪問策略,當然也可以通過建構式來設置為公平性訪問策略(為true即可)。

  4. 若使用 TransferQueue, 則佇列中永遠會存在一個 dummy node(這點後面詳細闡述)。

SynchronousQueue非常適合做交換工作,生產者的執行緒和消費者的執行緒同步以傳遞某些信息、事件或者任務。

SynchronousQueue

與其他BlockingQueue一樣,SynchronousQueue同樣繼承AbstractQueue和實現BlockingQueue接口:

  1. public class SynchronousQueue<E> extends AbstractQueue<E>

  2.    implements BlockingQueue<E>, java.io.Serializable

SynchronousQueue提供了兩個建構式:

  1.    public SynchronousQueue() {

  2.        this(false);

  3.    }

  4.    public SynchronousQueue(boolean fair) {

  5.        // 通過 fair 值來決定公平性和非公平性

  6.        // 公平性使用TransferQueue,非公平性採用TransferStack

  7.        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();

  8.    }

TransferQueue、TransferStack繼承Transferer,Transferer為SynchronousQueue的內部類,它提供了一個方法transfer(),該方法定義了轉移資料的規範,如下:

  1.    abstract static class Transferer<E> {

  2.        abstract E transfer(E e, boolean timed, long nanos);

  3.    }

transfer()方法主要用來完成轉移資料的,如果e != null,相當於將一個資料交給消費者,如果e == null,則相當於從一個生產者接收一個消費者交出的資料。

SynchronousQueue採用佇列TransferQueue來實現公平性策略,採用堆棧TransferStack來實現非公平性策略,他們兩種都是通過鏈表實現的,其節點分別為QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演著非常重要的作用,SynchronousQueue的put、take操作都是委托這兩個類來實現的。

TransferQueue

TransferQueue是實現公平性策略的核心類,其節點為QNode,其定義如下:

  1.    static final class TransferQueue<E> extends Transferer<E> {

  2.        /** 頭節點 */

  3.        transient volatile QNode head;

  4.        /** 尾節點 */

  5.        transient volatile QNode tail;

  6.        // 指向一個取消的結點

  7.        //當一個節點中最後一個插入時,它被取消了但是可能還沒有離開佇列

  8.        transient volatile QNode cleanMe;

  9.        /**

  10.         * 省略很多代碼O(∩_∩)O

  11.         */

  12.    }

在TransferQueue中除了頭、尾節點外還存在一個cleanMe節點。該節點主要用於標記,當刪除的節點是尾節點時則需要使用該節點。

同時,對於TransferQueue需要註意的是,其佇列永遠都存在一個dummy node,在構造時創建:

  1.        TransferQueue() {

  2.            QNode h = new QNode(null, false); // initialize to dummy node.

  3.            head = h;

  4.            tail = h;

  5.        }

在TransferQueue中定義了QNode類來表示佇列中的節點,QNode節點定義如下:

  1.    static final class QNode {

  2.        // next 域

  3.        volatile QNode next;

  4.        // item資料項

  5.        volatile Object item;

  6.        //  等待執行緒,用於park/unpark

  7.        volatile Thread waiter;       // to control park/unpark

  8.        //樣式,表示當前是資料還是請求,只有當匹配的樣式相匹配時才會交換

  9.        final boolean isData;

  10.        QNode(Object item, boolean isData) {

  11.            this.item = item;

  12.            this.isData = isData;

  13.        }

  14.        /**

  15.         * CAS next域,在TransferQueue中用於向next推進

  16.         */

  17.        boolean casNext(QNode cmp, QNode val) {

  18.            return next == cmp &&

  19.                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);

  20.        }

  21.        /**

  22.         * CAS itme資料項

  23.         */

  24.        boolean casItem(Object cmp, Object val) {

  25.            return item == cmp &&

  26.                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);

  27.        }

  28.        /**

  29.         *  取消本結點,將item域設置為自身

  30.         */

  31.        void tryCancel(Object cmp) {

  32.            UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);

  33.        }

  34.        /**

  35.         * 是否被取消

  36.         * 與tryCancel相照應只需要判斷item釋放等於自身即可

  37.         */

  38.        boolean isCancelled() {

  39.            return item == this;

  40.        }

  41.        boolean isOffList() {

  42.            return next == this;

  43.        }

  44.        private static final sun.misc.Unsafe UNSAFE;

  45.        private static final long itemOffset;

  46.        private static final long nextOffset;

  47.        static {

  48.            try {

  49.                UNSAFE = sun.misc.Unsafe.getUnsafe();

  50.                Class> k = QNode.class;

  51.                itemOffset = UNSAFE.objectFieldOffset

  52.                        (k.getDeclaredField("item"));

  53.                nextOffset = UNSAFE.objectFieldOffset

  54.                        (k.getDeclaredField("next"));

  55.            } catch (Exception e) {

  56.                throw new Error(e);

  57.            }

  58.        }

  59.    }

上面代碼沒啥好看的,需要註意的一點就是isData,該屬性在進行資料交換起到關鍵性作用,兩個執行緒進行資料交換的時候,必須要兩者的樣式保持一致。

TransferStack

TransferStack用於實現非公平性,定義如下:

  1.    static final class TransferStack<E> extends Transferer<E> {

  2.        static final int REQUEST    = 0;

  3.        static final int DATA       = 1;

  4.        static final int FULFILLING = 2;

  5.        volatile SNode head;

  6.        /**

  7.         * 省略一堆代碼  O(∩_∩)O~

  8.         */

  9.    }

TransferStack中定義了三個狀態:REQUEST表示消費資料的消費者,DATA表示生產資料的生產者,FULFILLING,表示匹配另一個生產者或消費者。任何執行緒對TransferStack的操作都屬於上述3種狀態中的一種(對應著SNode節點的mode)。同時還包含一個head域,表示頭結點。

內部節點SNode定義如下:

  1.    static final class SNode {

  2.        // next 域

  3.        volatile SNode next;

  4.        // 相匹配的節點

  5.        volatile SNode match;

  6.        // 等待的執行緒

  7.        volatile Thread waiter;

  8.        // item 域

  9.        Object item;                // data; or null for REQUESTs

  10.        // 模型

  11.        int mode;

  12.        /**

  13.         * item域和mode域不需要使用volatile修飾,因為它們在volatile/atomic操作之前寫,之後讀

  14.         */

  15.        SNode(Object item) {

  16.            this.item = item;

  17.        }

  18.        boolean casNext(SNode cmp, SNode val) {

  19.            return cmp == next &&

  20.                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);

  21.        }

  22.        /**

  23.         * 將s結點與本結點進行匹配,匹配成功,則unpark等待執行緒

  24.         */

  25.        boolean tryMatch(SNode s) {

  26.            if (match == null &&

  27.                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {

  28.                Thread w = waiter;

  29.                if (w != null) {    // waiters need at most one unpark

  30.                    waiter = null;

  31.                    LockSupport.unpark(w);

  32.                }

  33.                return true;

  34.            }

  35.            return match == s;

  36.        }

  37.        void tryCancel() {

  38.            UNSAFE.compareAndSwapObject(this, matchOffset, null, this);

  39.        }

  40.        boolean isCancelled() {

  41.            return match == this;

  42.        }

  43.        // Unsafe mechanics

  44.        private static final sun.misc.Unsafe UNSAFE;

  45.        private static final long matchOffset;

  46.        private static final long nextOffset;

  47.        static {

  48.            try {

  49.                UNSAFE = sun.misc.Unsafe.getUnsafe();

  50.                Class> k = SNode.class;

  51.                matchOffset = UNSAFE.objectFieldOffset

  52.                        (k.getDeclaredField("match"));

  53.                nextOffset = UNSAFE.objectFieldOffset

  54.                        (k.getDeclaredField("next"));

  55.            } catch (Exception e) {

  56.                throw new Error(e);

  57.            }

  58.        }

  59.    }

上面簡單介紹了TransferQueue、TransferStack,由於SynchronousQueue的put、take操作都是呼叫Transfer的transfer()方法,只不過是傳遞的引數不同而已,put傳遞的是e引數,所以樣式為資料(公平isData = true,非公平mode= DATA),而take操作傳遞的是null,所以樣式為請求(公平isData = false,非公平mode = REQUEST),如下:

  1.    // put操作

  2.    public void put(E e) throws InterruptedException {

  3.        if (e == null) throw new NullPointerException();

  4.        if (transferer.transfer(e, false, 0) == null) {

  5.            Thread.interrupted();

  6.            throw new InterruptedException();

  7.        }

  8.    }

  9.    // take操作

  10.    public E take() throws InterruptedException {

  11.        E e = transferer.transfer(null, false, 0);

  12.        if (e != null)

  13.            return e;

  14.        Thread.interrupted();

  15.        throw new InterruptedException();

  16.    }

公平樣式

公平性呼叫TransferQueue的transfer方法:

  1.    E transfer(E e, boolean timed, long nanos) {

  2.        QNode s = null;

  3.        // 當前節點樣式

  4.        boolean isData = (e != null);

  5.        for (;;) {

  6.            QNode t = tail;

  7.            QNode h = head;

  8.            // 頭、尾節點 為null,沒有初始化

  9.            if (t == null || h == null)

  10.                continue;

  11.            // 頭尾節點相等(佇列為null) 或者當前節點和佇列節點樣式一樣

  12.            if (h == t || t.isData == isData) {

  13.                // tn = t.next

  14.                QNode tn = t.next;

  15.                // t != tail表示已有其他執行緒操作了,修改了tail,重新再來

  16.                if (t != tail)

  17.                    continue;

  18.                // tn != null,表示已經有其他執行緒添加了節點,tn 推進,重新處理

  19.                if (tn != null) {

  20.                    // 當前執行緒幫忙推進尾節點,就是嘗試將tn設置為尾節點

  21.                    advanceTail(t, tn);

  22.                    continue;

  23.                }

  24.                //  呼叫的方法的 wait 型別的, 並且 超時了, 直接傳回 null

  25.                // timed 在take操作闡述

  26.                if (timed && nanos <= 0)

  27.                    return null;

  28.                // s == null,構建一個新節點Node

  29.                if (s == null)

  30.                    s = new QNode(e, isData);

  31.                // 將新建的節點加入到佇列中,如果不成功,繼續處理

  32.                if (!t.casNext(null, s))

  33.                    continue;

  34.                // 替換尾節點

  35.                advanceTail(t, s);

  36.                // 呼叫awaitFulfill, 若節點是 head.next, 則進行自旋

  37.                // 若不是的話, 直接 block, 直到有其他執行緒 與之匹配, 或它自己進行執行緒的中斷

  38.                Object x = awaitFulfill(s, e, timed, nanos);

  39.                // 若傳回的x == s表示,當前執行緒已經超時或者中斷,不然的話s == null或者是匹配的節點

  40.                if (x == s) {

  41.                    // 清理節點S

  42.                    clean(t, s);

  43.                    return null;

  44.                }

  45.                // isOffList:用於判斷節點是否已經從佇列中離開了

  46.                if (!s.isOffList()) {

  47.                    // 嘗試將S節點設置為head,移出t

  48.                    advanceHead(t, s);

  49.                    if (x != null)

  50.                        s.item = s;

  51.                    // 釋放執行緒 ref

  52.                    s.waiter = null;

  53.                }

  54.                // 傳回

  55.                return (x != null) ? (E)x : e;

  56.            }

  57.            // 這裡是從head.next開始,因為TransferQueue總是會存在一個dummy node節點

  58.            else {

  59.                // 節點

  60.                QNode m = h.next;

  61.                // 不一致讀,重新開始

  62.                // 有其他執行緒更改了執行緒結構

  63.                if (t != tail || m == null || h != head)

  64.                    continue;

  65.                /**

  66.                 * 生產者producer和消費者consumer匹配操作

  67.                 */

  68.                Object x = m.item;

  69.                // isData == (x != null):判斷isData與x的樣式是否相同,相同表示已經匹配了

  70.                // x == m :m節點被取消了

  71.                // !m.casItem(x, e):如果嘗試將資料e設置到m上失敗

  72.                if (isData == (x != null) ||  x  == m || !m.casItem(x, e)) {

  73.                    // 將m設置為頭結點,h出列,然後重試

  74.                    advanceHead(h, m);

  75.                    continue;

  76.                }

  77.                // 成功匹配了,m設置為頭結點h出列,向前推進

  78.                advanceHead(h, m);

  79.                // 喚醒m上的等待執行緒

  80.                LockSupport.unpark(m.waiter);

  81.                return (x != null) ? (E)x : e;

  82.            }

  83.        }

  84.    }

整個transfer的演算法如下:

  1. 如果佇列為null或者尾節點樣式與當前節點樣式一致,則嘗試將節點加入到等待佇列中(採用自旋的方式),直到被匹配或、超時或者取消。匹配成功的話要麼傳回null(producer傳回的)要麼傳回真正傳遞的值(consumer傳回的),如果傳回的是node節點本身則表示當前執行緒超時或者取消了。

  2. 如果佇列不為null,且佇列的節點是當前節點匹配的節點,則進行資料的傳遞匹配並傳回匹配節點的資料

  3. 在整個過程中都會檢測並幫助其他執行緒推進

當佇列為空時,節點入列然後通過呼叫awaitFulfill()方法自旋,該方法主要用於自旋/阻塞節點,直到節點被匹配傳回或者取消、中斷。

  1.    Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {

  2.        // 超時控制

  3.        final long deadline = timed ? System.nanoTime() + nanos : 0L;

  4.        Thread w = Thread.currentThread();

  5.        // 自旋次數

  6.        // 如果節點Node恰好是head節點,則自旋一段時間,這裡主要是為了效率問題,如果裡面阻塞,會存在喚醒、執行緒背景關係切換的問題

  7.        // 如果生產者、消費者者裡面到來的話,就避免了這個阻塞的過程

  8.        int spins = ((head.next == s) ?

  9.                (timed ? maxTimedSpins : maxUntimedSpins) : 0);

  10.        // 自旋

  11.        for (;;) {

  12.            // 執行緒中斷了,剔除當前節點

  13.            if (w.isInterrupted())

  14.                s.tryCancel(e);

  15.            // 如果執行緒進行了阻塞 -> 喚醒或者中斷了,那麼x != e 肯定成立,直接傳回當前節點即可

  16.            Object x = s.item;

  17.            if (x != e)

  18.                return x;

  19.            // 超時判斷

  20.            if (timed) {

  21.                nanos = deadline - System.nanoTime();

  22.                // 如果超時了,取消節點,continue,在if(x != e)肯定會成立,直接傳回x

  23.                if (nanos <= 0L) {

  24.                    s.tryCancel(e);

  25.                    continue;

  26.                }

  27.            }

  28.            // 自旋- 1

  29.            if (spins > 0)

  30.                --spins;

  31.            // 等待執行緒

  32.            else if (s.waiter == null)

  33.                s.waiter = w;

  34.            // 進行沒有超時的 park

  35.            else if (!timed)

  36.                LockSupport.park(this);

  37.            // 自旋次數過了, 直接 + timeout 方式 park

  38.            else if (nanos > spinForTimeoutThreshold)

  39.                LockSupport.parkNanos(this, nanos);

  40.        }

  41.    }

在自旋/阻塞過程中做了一點優化,就是判斷當前節點是否為對頭元素,如果是的則先自旋,如果自旋次數過了,則才阻塞,這樣做的主要目的就在如果生產者、消費者立馬來匹配了則不需要阻塞,因為阻塞、喚醒會消耗資源。在整個自旋的過程中會不斷判斷是否超時或者中斷了,如果中斷或者超時了則呼叫tryCancel()取消該節點。

tryCancel

  1.            void tryCancel(Object cmp) {

  2.                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);

  3.            }

取消過程就是將節點的item設置為自身(itemOffset是item的偏移量)。所以在呼叫awaitFulfill()方法時,如果當前執行緒被取消、中斷、超時了那麼傳回的值肯定時S,否則傳回的則是匹配的節點。如果傳回值是節點S,那麼if(x == s)必定成立,如下:

  1.                    Object x = awaitFulfill(s, e, timed, nanos);

  2.                    if (x == s) {                   // wait was cancelled

  3.                        clean(t, s);

  4.                        return null;

  5.                    }

如果傳回的x == s成立,則呼叫clean()方法清理節點S:

  1.    void clean(QNode pred, QNode s) {

  2.        //

  3.        s.waiter = null;

  4.        while (pred.next == s) {

  5.            QNode h = head;

  6.            QNode hn = h.next;

  7.            // hn節點被取消了,向前推進

  8.            if (hn != null && hn.isCancelled()) {

  9.                advanceHead(h, hn);

  10.                continue;

  11.            }

  12.            // 佇列為空,直接return null

  13.            QNode t = tail;

  14.            if (t == h)

  15.                return;

  16.            QNode tn = t.next;

  17.            // 不一致,說明有其他執行緒改變了tail節點,重新開始

  18.            if (t != tail)

  19.                continue;

  20.            // tn != null 推進tail節點,重新開始

  21.            if (tn != null) {

  22.                advanceTail(t, tn);

  23.                continue;

  24.            }

  25.            // s 不是尾節點 移出

  26.            if (s != t) {

  27.                QNode sn = s.next;

  28.                // 如果s已經被移除退出迴圈,否則嘗試斷開s

  29.                if (sn == s || pred.casNext(s, sn))

  30.                    return;

  31.            }

  32.            // s是尾節點,則有可能會有其他執行緒在添加新節點,則cleanMe出場

  33.            QNode dp = cleanMe;

  34.            // 如果dp不為null,說明是前一個被取消節點,將其移除

  35.            if (dp != null) {

  36.                QNode d = dp.next;

  37.                QNode dn;

  38.                if (d == null ||               // 節點d已經刪除

  39.                        d == dp ||                 // 原來的節點 cleanMe 已經通過 advanceHead 進行刪除

  40.                        !d.isCancelled() ||        // 原來的節點 s已經刪除

  41.                        (d != t &&                 // d 不是tail節點

  42.                                (dn = d.next) != null &&  //

  43.                                dn != d &&                //   that is on list

  44.                                dp.casNext(d, dn)))       // d unspliced

  45.                    // 清除 cleanMe 節點, 這裡的 dp == pred 若成立, 說明清除節點s,成功, 直接return, 不然的話要再次迴圈

  46.                    casCleanMe(dp, null);

  47.                if (dp == pred)

  48.                    return;

  49.            } else if (casCleanMe(null, pred))  // 原來的 cleanMe 是 null, 則將 pred 標記為 cleamMe 為下次 清除 s 節點做標識

  50.                return;

  51.        }

  52.    }

這個clean()方法感覺有點兒難度,我也看得不是很懂。這裡是取用http://www.jianshu.com/p/95cb570c8187

  1. 刪除的節點不是queue尾節點, 這時 直接 pred.casNext(s, s.next) 方式來進行刪除(和ConcurrentLikedQueue中差不多)

  2. 刪除的節點是隊尾節點

  • 此時 cleanMe == null, 則 前繼節點pred標記為 cleanMe, 為下次刪除做準備

  • 此時 cleanMe != null, 先刪除上次需要刪除的節點, 然後將 cleanMe至null, 讓後再將 pred 賦值給 cleanMe

非公平樣式

非公平樣式transfer方法如下:

  1.    E transfer(E e, boolean timed, long nanos) {

  2.        SNode s = null; // constructed/reused as needed

  3.        int mode = (e == null) ? REQUEST : DATA;

  4.        for (;;) {

  5.            SNode h = head;

  6.            // 棧為空或者當前節點樣式與頭節點樣式一樣,將節點壓入棧內,等待匹配

  7.            if (h == null || h.mode == mode) {

  8.                // 超時

  9.                if (timed && nanos <= 0) {

  10.                    // 節點被取消了,向前推進

  11.                    if (h != null && h.isCancelled())

  12.                        //  重新設置頭結點(彈出之前的頭結點)

  13.                        casHead(h, h.next);

  14.                    else

  15.                        return null;

  16.                }

  17.                // 不超時

  18.                // 生成一個SNode節點,並嘗試替換掉頭節點head (head -> s)

  19.                else if (casHead(h, s = snode(s, e, h, mode))) {

  20.                    // 自旋,等待執行緒匹配

  21.                    SNode m = awaitFulfill(s, timed, nanos);

  22.                    // 傳回的m == s 表示該節點被取消了或者超時、中斷了

  23.                    if (m == s) {

  24.                        // 清理節點S,return null

  25.                        clean(s);

  26.                        return null;

  27.                    }

  28.                    // 因為通過前面一步將S替換成了head,如果h.next == s ,則表示有其他節點插入到S前面了,變成了head

  29.                    // 且該節點就是與節點S匹配的節點

  30.                    if ((h = head) != null && h.next == s)

  31.                        // 將s.next節點設置為head,相當於取消節點h、s

  32.                        casHead(h, s.next);

  33.                    // 如果是請求則傳回匹配的域,否則傳回節點S的域

  34.                    return (E) ((mode == REQUEST) ? m.item : s.item);

  35.                }

  36.            }

  37.            // 如果棧不為null,且兩者樣式不匹配(h != null && h.mode != mode)

  38.            // 說明他們是一隊對等匹配的節點,嘗試用當前節點s來滿足h節點

  39.            else if (!isFulfilling(h.mode)) {

  40.                // head 節點已經取消了,向前推進

  41.                if (h.isCancelled())

  42.                    casHead(h, h.next);

  43.                // 嘗試將當前節點打上"正在匹配"的標記,並設置為head

  44.                else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {

  45.                    // 迴圈loop

  46.                    for (;;) {

  47.                        // s為當前節點,m是s的next節點,

  48.                        // m節點是s節點的匹配節點

  49.                        SNode m = s.next;

  50.                        // m == null,其他節點把m節點匹配走了

  51.                        if (m == null) {

  52.                            // 將s彈出

  53.                            casHead(s, null);

  54.                            // 將s置空,下輪迴圈的時候還會新建

  55.                            s = null;

  56.                            // 退出該迴圈,繼續主迴圈

  57.                            break;

  58.                        }

  59.                        // 獲取m的next節點

  60.                        SNode mn = m.next;

  61.                        // 嘗試匹配

  62.                        if (m.tryMatch(s)) {

  63.                            // 匹配成功,將s 、 m彈出

  64.                            casHead(s, mn);     // pop both s and m

  65.                            return (E) ((mode == REQUEST) ? m.item : s.item);

  66.                        } else

  67.                            // 如果沒有匹配成功,說明有其他執行緒已經匹配了,把m移出

  68.                            s.casNext(m, mn);

  69.                    }

  70.                }

  71.            }

  72.            // 到這最後一步說明節點正在匹配階段

  73.            else {

  74.                // head 的next的節點,是正在匹配的節點,m 和 h配對

  75.                SNode m = h.next;

  76.                // m == null 其他執行緒把m節點搶走了,彈出h節點

  77.                if (m == null)

  78.                    casHead(h, null);

  79.                else {

  80.                    SNode mn = m.next;

  81.                    if (m.tryMatch(h))

  82.                        casHead(h, mn);

  83.                    else

  84.                        h.casNext(m, mn);

  85.                }

  86.            }

  87.        }

  88.    }

整個處理過程分為三種情況,具體如下:

  1. 如果當前棧為空獲取節點樣式與棧頂樣式一樣,則嘗試將節點加入棧內,同時通過自旋方式等待節點匹配,最後傳回匹配的節點或者null(被取消)

  2. 如果棧不為空且節點的樣式與首節點樣式匹配,則嘗試將該節點打上FULFILLING標記,然後加入棧中,與相應的節點匹配,成功後將這兩個節點彈出棧並傳回匹配節點的資料

  3. 如果有節點在匹配,那麼幫助這個節點完成匹配和出棧操作,然後在主迴圈中繼續執行

當節點加入棧內後,通過呼叫awaitFulfill()方法自旋等待節點匹配:

  1.    SNode awaitFulfill(SNode s, boolean timed, long nanos) {

  2.        // 超時

  3.        final long deadline = timed ? System.nanoTime() + nanos : 0L;

  4.        // 當前執行緒

  5.        Thread w = Thread.currentThread();

  6.        // 自旋次數

  7.        // shouldSpin 用於檢測當前節點是否需要自旋

  8.        // 如果棧為空、該節點是首節點或者該節點是匹配節點,則先採用自旋,否則阻塞

  9.        int spins = (shouldSpin(s) ?

  10.                (timed ? maxTimedSpins : maxUntimedSpins) : 0);

  11.        for (;;) {

  12.            // 執行緒中斷了,取消該節點

  13.            if (w.isInterrupted())

  14.                s.tryCancel();

  15.            // 匹配節點

  16.            SNode m = s.match;

  17.            // 如果匹配節點m不為空,則表示匹配成功,直接傳回

  18.            if (m != null)

  19.                return m;

  20.            // 超時

  21.            if (timed) {

  22.                nanos = deadline - System.nanoTime();

  23.                // 節點超時,取消

  24.                if (nanos <= 0L) {

  25.                    s.tryCancel();

  26.                    continue;

  27.                }

  28.            }

  29.            // 自旋;每次自旋的時候都需要檢查自身是否滿足自旋條件,滿足就 - 1,否則為0

  30.            if (spins > 0)

  31.                spins = shouldSpin(s) ? (spins-1) : 0;

  32.            // 第一次阻塞時,會將當前執行緒設置到s上

  33.            else if (s.waiter == null)

  34.                s.waiter = w;

  35.            // 阻塞 當前執行緒

  36.            else if (!timed)

  37.                LockSupport.park(this);

  38.            // 超時

  39.            else if (nanos > spinForTimeoutThreshold)

  40.                LockSupport.parkNanos(this, nanos);

  41.        }

  42.    }

awaitFulfill()方法會一直自旋/阻塞直到匹配節點。在S節點阻塞之前會先呼叫shouldSpin()方法判斷是否採用自旋方式,為的就是如果有生產者或者消費者馬上到來,就不需要阻塞了,在多核條件下這種優化是有必要的。同時在呼叫park()阻塞之前會將當前執行緒設置到S節點的waiter上。匹配成功,傳回匹配節點m。

shouldSpin()方法如下:

  1.        boolean shouldSpin(SNode s) {

  2.            SNode h = head;

  3.            return (h == s || h == null || isFulfilling(h.mode));

  4.        }

同時在阻塞過程中會一直檢測當前執行緒是否中斷了,如果中斷了,則呼叫tryCancel()方法取消該節點,取消過程就是將當前節點的math設置為當前節點。所以如果執行緒中斷了,那麼在傳回m時一定是S節點自身。

  1.            void tryCancel() {

  2.                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);

  3.            }

awaitFullfill()方法如果傳回的m == s,則表示當前節點已經中斷取消了,則需要呼叫clean()方法,清理節點S:

  1.    void clean(SNode s) {

  2.        // 清理item域

  3.        s.item = null;

  4.        // 清理waiter域

  5.        s.waiter = null;

  6.        // past節點

  7.        SNode past = s.next;

  8.        if (past != null && past.isCancelled())

  9.            past = past.next;

  10.        // 從棧頂head節點,取消從棧頂head到past節點之間所有已經取消的節點

  11.        // 註意:這裡如果遇到一個節點沒有取消,則會退出while

  12.        SNode p;

  13.        while ((p = head) != null && p != past && p.isCancelled())

  14.            casHead(p, p.next);     // 如果p節點已經取消了,則剔除該節點

  15.        // 如果經歷上面while p節點還沒有取消,則再次迴圈取消掉所有p 到past之間的取消節點

  16.        while (p != null && p != past) {

  17.            SNode n = p.next;

  18.            if (n != null && n.isCancelled())

  19.                p.casNext(n, n.next);

  20.            else

  21.                p = n;

  22.        }

  23.    }

clean()方法就是將head節點到S節點之間所有已經取消的節點全部移出。【不清楚為何要用兩個while,一個不行麽】

至此,SynchronousQueue的原始碼分析完成了,說下我個人感覺吧:個人感覺SynchronousQueue實現好複雜(可能是自己智商不夠吧~~~~(>_

赞(0)

分享創造快樂

© 2021 知識星球   网站地图