歡迎光臨
每天分享高質量文章

disruptor 原始碼解讀

(點選上方公眾號,可快速關註)


來源:笨狐狸 ,

blog.csdn.net/liweisnake/article/details/78842176

disruptor經過幾年的發展,似乎已經成為效能最佳化的大殺器,幾乎每個想最佳化效能的專案宣稱自己用上了disruptor,效能都會呈現質的躍進。畢竟,最好的例子就是LMAX自己的架構設計,支撐了600w/s的吞吐。

本文試圖從程式碼層面將關鍵問題做些解答。

基本概念

Disruptor: 實際上就是整個基於ringBuffer實現的生產者消費者樣式的容器。

RingBuffer: 著名的環形佇列,可以類比為BlockingQueue之類的佇列,ringBuffer的使用,使得記憶體被迴圈使用,減少了某些場景的記憶體分配回收擴容等耗時操作。

 

EventProcessor: 事件處理器,實際上可以理解為消費者模型的框架,實現了執行緒Runnable的run方法,將迴圈判斷等操作封在了裡面。

EventHandler: 事件處置器,與前面處理器的不同是,事件處置器不負責框架內的行為,僅僅是EventProcessor作為消費者框架對外預留的擴充套件點罷了。

Sequencer: 作為RingBuffer生產者的父介面,其直接實現有SingleProducerSequencer和MultiProducerSequencer。

EventTranslator: 事件轉換器。實際上就是新事件向舊事件改寫的介面定義。

SequenceBarrier: 消費者路障。規定了消費者如何向下走。都說disruptor無鎖,事實上,該路障算是變向的鎖。

WaitStrategy: 當生產者生產得太快而消費者消費得太慢時的等待策略。

把上面幾個關鍵概念畫個圖,大概長這樣:

所以接下來主要也就從生產者,消費者以及ringBuffer3個維度去看disruptor是如何玩的。

生產者

生產者釋出訊息的過程從disruptor的publish方法為入口,實際呼叫了ringBuffer的publish方法。publish方法主要做了幾件事,一是先確保能拿到後面的n個sequence;二是使用translator來填充新資料到相應的位置;三是真正的宣告這些位置已經釋出完成。

public void publishEvent(EventTranslator translator)  

{  

  final long sequence = sequencer.next();  

  translateAndPublish(translator, sequence);  

}  

 public void publishEvents(EventTranslator[] translators, int batchStartsAt, int batchSize)  

 {  

     checkBounds(translators, batchStartsAt, batchSize);  

     final long finalSequence = sequencer.next(batchSize);  

     translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);  

 }

獲取生產者下一個sequence的方法,細節已經註釋,實際上最終目的就是確保生產者和消費者互相不越界。

public long next(int n)  

{  

    if (n < 1)  

    {  

        throw new IllegalArgumentException(“n must be > 0”);  

    }  

    //該生產者釋出的最大序列號  

    long nextValue = this.nextValue;  

    //該生產者欲釋出的序列號  

    long nextSequence = nextValue + n;  

    //改寫點,即該生產者如果釋出了這次的序列號,那它最終會落在哪個位置,實際上是nextSequence做了算術處理以後的值,最終目的是統一計算,否則就要去判絕對值以及取模等麻煩操作  

    long wrapPoint = nextSequence – bufferSize;  

    //所有消費者中消費得最慢那個的前一個序列號  

    long cachedGatingSequence = this.cachedValue;  

   

    //這裡兩個判斷條件:一是看生產者生產是不是超過了消費者,所以判斷的是改寫點是否超過了最慢消費者;二是看消費者是否超過了當前生產者的最大序號,判斷的是消費者是不是比生產者還快這種異常情況  

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)  

    {  

        cursor.setVolatile(nextValue);  // StoreLoad fence  

   

        long minSequence;  

        //改寫點是不是已經超過了最慢消費者和當前生產者序列號的最小者(這兩個有點難理解,實際上就是改寫點不能超過最慢那個生產者,也不能超過當前自身,比如一次釋出超過bufferSize),gatingSequences的處理也是類似算術處理,也可以看成是相對於原點是正還是負  

        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))  

        {  

            //喚醒阻塞的消費者  

            waitStrategy.signalAllWhenBlocking();  

            //等上1納秒  

            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?  

        }  

        //把這個最慢消費者快取下來,以便下一次使用  

        this.cachedValue = minSequence;  

    }  

    //把當前序列號更新為欲釋出序列號  

    this.nextValue = nextSequence;  

   

    return nextSequence;  

}

translator由使用者在呼叫時自己實現,其實就是預留的一個擴充套件點,將改寫事件預留出來。大部分實現都是將ByteBuffer複製到Event中,參考disruptor github官方例子。

最後宣告新序列號釋出完成,實際上就是設定了cursor,並且通知可能阻塞的消費者,這裡已經釋出完新的Event了,快來消費吧。

public void publish(long sequence)  

{  

    cursor.set(sequence);  

    waitStrategy.signalAllWhenBlocking();  

}

以上就是單生產者的分析,MultiProducerSequencer可以類似分析。

等待策略

等待策略實際上就是用來同步生產者和消費者的方法。SequenceBarrier只有一個實現ProcessingSequenceBarrier,中間就用到了WaitStrategy

BlockingWaitStrategy就是真正的加鎖阻塞策略,採用的就是ReentrantLock以及Condition來控制阻塞與喚醒。

TimeoutBlockingWaitStrategy是BlockingWaitStrategy中條件帶超時的版本。

LiteBlockingWaitStrategy是BlockingWaitStrategy的改進版,走了ReentrantLock和CAS輕量級鎖結合的方式,不過註釋說這算是實驗性質的微效能改進。

BusySpinWaitStrategy算是一個自旋鎖,其實現很有趣,即不停的呼叫Thread類的onSpinWait方法。

YieldingWaitStrategy是自旋鎖的一種改進,自旋鎖對於cpu來說太重,於是YieldingWaitStrategy先自旋100次,如果期間沒有達成退出等待的條件,則主動讓出cpu給其他執行緒作為懲罰。

SleepingWaitStrategy又是YieldingWaitStrategy的一種改進,SleepingWaitStrategy頭100次先自旋,如果期間沒有達成退出條件,則接下來100次主動讓出cpu作為懲罰,如果還沒有達成條件,則不再計數,每次睡1納秒。

PhasedBackoffWaitStrategy相對複雜點,基本上是10000次自旋以後要麼出讓cpu,然後繼續自旋,要麼就採取新的等待策略。

消費者

EventProcessor是整個消費者事件處理框架,其主體就是執行緒的run方法,來看BatchEventProcessor,總體比較簡單。

public void run()  

{  

    if (!running.compareAndSet(false, true))  

    {  

        throw new IllegalStateException(“Thread is already running”);  

    }  

    sequenceBarrier.clearAlert();  

   

    notifyStart();  

   

    T event = null;  

    long nextSequence = sequence.get() + 1L;  

    try 

    {  

        while (true)  

        {  

            try 

            {  

                //等待至少一個可用的sequence出來  

               final long availableSequence = sequenceBarrier.waitFor(nextSequence);  

                if (batchStartAware != null)  

                {  

                    batchStartAware.onBatchStart(availableSequence – nextSequence + 1);  

                }  

                //一個一個消費事件  

                while (nextSequence <= availableSequence)  

                {  

                    //從ringBuffer裡獲取下一個事件  

                    event = dataProvider.get(nextSequence);  

                    //消費這個事件  

                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);  

                    nextSequence++;  

                }  

                //當前的sequence推進到availableSequence  

                sequence.set(availableSequence);  

            }  

            catch (final TimeoutException e)  

            {  

                notifyTimeout(sequence.get());  

            }  

            catch (final AlertException ex)  

            {  

                if (!running.get())  

                {  

                    break;  

                }  

            }  

            catch (final Throwable ex)  

            {  

                exceptionHandler.handleEventException(ex, nextSequence, event);  

                sequence.set(nextSequence);  

                nextSequence++;  

            }  

        }  

    }  

    finally 

    {  

        notifyShutdown();  

        running.set(false);  

    }  

}

RingBuffer

RingBuffer這邊程式碼比較簡單,主要就是封裝了一下釋出的api

abstract class RingBufferFields extends RingBufferPad  

{  

    private static final int BUFFER_PAD;  

    private static final long REF_ARRAY_BASE;  

    private static final int REF_ELEMENT_SHIFT;  

    private static final Unsafe UNSAFE = Util.getUnsafe();  

   

    static 

    {  

        final int scale = UNSAFE.arrayIndexScale(Object[].class);  

        if (4 == scale)  

        {  

            REF_ELEMENT_SHIFT = 2;  

        }  

        else if (8 == scale)  

        {  

            REF_ELEMENT_SHIFT = 3;  

        }  

        else 

        {  

            throw new IllegalStateException(“Unknown pointer size”);  

        }  

        // 如果scale是4, BUFFER_PAD則為32  

       BUFFER_PAD = 128 / scale;  

        // Including the buffer pad in the array base offset BUFFER_PAD<

        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);  

    }  

   

    private final long indexMask;  

    private final Object[] entries;  

    protected final int bufferSize;  

    protected final Sequencer sequencer;  

   

    RingBufferFields(  

        EventFactory eventFactory,  

        Sequencer sequencer)  

    {  

        this.sequencer = sequencer;  

        this.bufferSize = sequencer.getBufferSize();  

   

        if (bufferSize < 1)  

        {  

            throw new IllegalArgumentException(“bufferSize must not be less than 1”);  

        }  

        if (Integer.bitCount(bufferSize) != 1)  

        {  

            throw new IllegalArgumentException(“bufferSize must be a power of 2”);  

        }  

   

        this.indexMask = bufferSize – 1;  

        //bufferSize再加兩倍的BUFFER_PAD大小,BUFFER_PAD分別在頭尾  

       this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];  

        fill(eventFactory);  

    }  

   

    private void fill(EventFactory eventFactory)  

    {  

        for (int i = 0; i < bufferSize; i++)  

        {  

            //初始化整個buffer  

            entries[BUFFER_PAD + i] = eventFactory.newInstance();  

        }  

    }  

   

    @SuppressWarnings(“unchecked”)  

    protected final E elementAt(long sequence)  

    {  

        //sequence & indexMask即對sequence取模, 最終算出來的就是基地址+偏移地址  

        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));  

    }  

}

主體程式碼基本如上。其他程式碼可以自行參考。

下麵介紹下一些常見問題。

1. disruptor應該如何用才能發揮最大功效?

disruptor原本就是事件驅動的設計,其整個架構跟普通的多執行緒很不一樣。比如一種用法,將disruptor作為業務處理,中間帶I/O處理,這種玩法比多執行緒還慢;相反,如果將disruptor做業務處理,需要I/O時採用nio非同步呼叫,不阻塞disruptor消費者執行緒,等到I/O非同步呼叫回來後在回呼方法中將後續處理重新塞到disruptor佇列中,可以看出來,這是典型的事件處理架構,確實能在時間上佔據優勢,加上ringBuffer固有的幾項效能最佳化,能讓disruptor發揮最大功效。

2. disruptor為啥這麼快?

這個問題參考之前的一篇文章 disruptor框架為什麼這麼強大

http://blog.csdn.net/liweisnake/article/details/9113119

3. 多生產者如何寫入訊息?

多生產者的訊息寫入實際上是透過availableBuffer與消費者來同步最後一個生產者寫入的位置,這樣,消費者永遠不能超越最慢的那個生產者。見如下程式碼段

private void setAvailableBufferValue(int index, int flag)  

{  

    long bufferAddress = (index * SCALE) + BASE;  

    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);  

}  

   

@Override 

public boolean isAvailable(long sequence)  

{  

    int index = calculateIndex(sequence);  

    int flag = calculateAvailabilityFlag(sequence);  

    long bufferAddress = (index * SCALE) + BASE;  

    return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;  

}  

   

@Override 

public long getHighestPublishedSequence(long lowerBound, long availableSequence)  

{  

    for (long sequence = lowerBound; sequence <= availableSequence; sequence++)  

    {  

        if (!isAvailable(sequence))  

        {  

            return sequence – 1;  

        }  

    }  

   

    return availableSequence;  

}

可以參考這篇文章 RingBuffer多生產者寫入

http://alicharles.com/article/disruptor-ringbuffer-muti-write/

4. 除了多個消費者重覆處理生產者傳送的訊息,是否可以多消費者不重覆處理生產者傳送的訊息,即各處理各的?

若要多消費者重覆處理生產者的訊息,則使用disruptor.handleEventsWith方法將消費者傳入;而若要消費者不重覆的處理生產者的訊息,則使用disruptor.handleEventsWithWorkerPool方法將消費者傳入。

看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂