歡迎光臨
每天分享高質量文章

熔斷器 Hystrix 原始碼解析 —— 斷路器 HystrixCircuitBreaker

摘要: 原創出處 http://www.iocoder.cn/Hystrix/circuit-breaker/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

排版又崩了,請【閱讀原文】。

本文主要基於 Hystrix 1.5.X 版本

  • 1. 概述

  • 2. HystrixCircuitBreaker

  • 3. HystrixCircuitBreaker.Factory

  • 4. HystrixCircuitBreakerImpl

    • 4.1 構造方法

    • 4.2 #subscribeToStream()

    • 4.3 #attemptExecution()

    • 4.4 #markSuccess()

    • 4.5 #markNonSuccess()

    • 4.6 #allowRequest()

    • 4.7 #isOpen()

  • 666. 彩蛋

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。


1. 概述

本文主要分享 斷路器 HystrixCircuitBreaker

HystrixCircuitBreaker 有三種狀態 :

  • CLOSED :關閉

  • OPEN :開啟

  • HALF_OPEN :半開

其中,斷路器處於 OPEN 狀態時,鏈路處於非健康狀態,命令執行時,直接呼叫回退邏輯,跳過正常邏輯。

HystrixCircuitBreaker 狀態變遷如下圖 :


  • 紅線 :初始時,斷路器處於 CLOSED 狀態,鏈路處於健康狀態。當滿足如下條件,斷路器從 CLOSED 變成 OPEN 狀態:

    • 週期( 可配, HystrixCommandProperties.default_metricsRollingStatisticalWindow=10000ms )內,總請求數超過一定( 可配, HystrixCommandProperties.circuitBreakerRequestVolumeThreshold=20 ) 。

    • 錯誤請求佔總請求數超過一定比例( 可配, HystrixCommandProperties.circuitBreakerErrorThresholdPercentage=50% ) 。

  • 綠線 :斷路器處於 OPEN 狀態,命令執行時,若當前時間超過斷路器開啟時間一定時間( HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds=5000ms ),斷路器變成 HALF_OPEN 狀態,嘗試呼叫正常邏輯,根據執行是否成功,開啟或關閉熔斷器【藍線】。


推薦 Spring Cloud 書籍

  • 請支援正版。下載盜版,等於主動編寫低階 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

2. HystrixCircuitBreaker

com.netflix.hystrix.HystrixCircuitBreaker ,Hystrix 斷路器介面。定義介面如下程式碼 :

  1. public interface HystrixCircuitBreaker {

  2.    /**

  3.     * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.  It is idempotent and does

  4.     * not modify any internal state, and takes into account the half-open logic which allows some requests through

  5.     * after the circuit has been opened

  6.     *

  7.     * @return boolean whether a request should be permitted

  8.     */

  9.    boolean allowRequest();

  10.    /**

  11.     * Whether the circuit is currently open (tripped).

  12.     *

  13.     * @return boolean state of circuit breaker

  14.     */

  15.    boolean isOpen();

  16.    /**

  17.     * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.

  18.     */

  19.    void markSuccess();

  20.    /**

  21.     * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.

  22.     */

  23.    void markNonSuccess();

  24.    /**

  25.     * Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal

  26.     * state.

  27.     */

  28.    boolean attemptExecution();

  29. }

  • #allowRequest() 和 #attemptExecution() 方法,方法目的基本類似,差別在於當斷路器滿足嘗試關閉條件時,前者不會將斷路器不會修改狀態( CLOSE=>HALF-OPEN ),而後者會。


HystrixCircuitBreaker 有兩個子類實現 :

  • NoOpCircuitBreaker :的斷路器實現,用於不開啟斷路器功能的情況。

  • HystrixCircuitBreakerImpl :完整的斷路器實現。

在 AbstractCommand 建立時,初始化 HystrixCircuitBreaker ,程式碼如下 :

  1. /* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

  2.    /**

  3.     * 斷路器

  4.     */

  5.    protected final HystrixCircuitBreaker circuitBreaker;

  6.    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,

  7.            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,

  8.            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,

  9.            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

  10.        // ... 省略無關程式碼

  11.        // 初始化 斷路器

  12.        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);

  13.        // ... 省略無關程式碼

  14.    }

  15.    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,

  16.                                                            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,

  17.                                                            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {

  18.        if (enabled) {

  19.            if (fromConstructor == null) {

  20.                // get the default implementation of HystrixCircuitBreaker

  21.                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);

  22.            } else {

  23.                return fromConstructor;

  24.            }

  25.        } else {

  26.            return new NoOpCircuitBreaker();

  27.        }

  28.    }

  29. }

  • 當 HystrixCommandProperties.circuitBreakerEnabled=true 時,即斷路器功能開啟,使用 Factory 獲得 HystrixCircuitBreakerImpl 物件。在 「3. HystrixCircuitBreaker.Factory」 詳細解析。

  • 當 HystrixCommandProperties.circuitBreakerEnabled=false 時,即斷路器功能關閉,建立 NoOpCircuitBreaker 物件。另外,NoOpCircuitBreaker 程式碼簡單到腦殘,點選 連結 檢視實現。

3. HystrixCircuitBreaker.Factory

com.netflix.hystrix.HystrixCircuitBreaker.Factory ,HystrixCircuitBreaker 工廠,主要用於:

  • 建立 HystrixCircuitBreaker 物件,目前只建立 HystrixCircuitBreakerImpl 。

  • HystrixCircuitBreaker 容器,基於 HystrixCommandKey 維護了 HystrixCircuitBreaker 單例物件 的對映。程式碼如下 :


  1. private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

整體程式碼灰常清晰,點選 連結 檢視程式碼。

4. HystrixCircuitBreakerImpl

com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl完整的斷路器實現。

我們來逐個方法看看 HystrixCircuitBreakerImpl 的具體實現。

4.1 構造方法

構造方法,程式碼如下 :

  1. /* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {

  2.    private final HystrixCommandProperties properties;

  3.    private final HystrixCommandMetrics metrics;

  4.    enum Status {

  5.        CLOSED, OPEN, HALF_OPEN

  6.    }

  7.    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);

  8.    private final AtomicLong circuitOpened = new AtomicLong(-1);

  9.    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

  10.    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {

  11.        this.properties = properties;

  12.        this.metrics = metrics;

  13.        //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur

  14.        Subscription s = subscribeToStream();

  15.        activeSubscription.set(s);

  16.    }

  17. }    

  • Status 列舉類,斷路器的三種狀態。

  • status 屬性,斷路器的狀態。

  • circuitOpened 屬性,斷路器開啟,即狀態變成 OPEN 的時間。

  • activeSubscription 屬性,基於 Hystrix Metrics 對請求量統計 Observable 的訂閱,在 「4.2 #subscribeToStream()」 詳細解析。

4.2 #subscribeToStream()

#subscribeToStream() 方法,向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。程式碼如下 :

  1. private Subscription subscribeToStream() {

  2.  1: private Subscription subscribeToStream() {

  3.  2:     /*

  4.  3:      * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream

  5.  4:      */

  6.  5:     return metrics.getHealthCountsStream()

  7.  6:             .observe()

  8.  7:             .subscribe(new Subscriber<HealthCounts>() {

  9.  8:                 @Override

  10.  9:                 public void onCompleted() {

  11. 10:

  12. 11:                 }

  13. 12:

  14. 13:                 @Override

  15. 14:                 public void onError(Throwable e) {

  16. 15:

  17. 16:                 }

  18. 17:

  19. 18:                 @Override

  20. 19:                 public void onNext(HealthCounts hc) {

  21. 20:                     System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿,用於除錯

  22. 21:                     // check if we are past the statisticalWindowVolumeThreshold

  23. 22:                     if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {

  24. 23:                         // we are not past the minimum volume threshold for the stat window,

  25. 24:                         // so no change to circuit status.

  26. 25:                         // if it was CLOSED, it stays CLOSED

  27. 26:                         // if it was half-open, we need to wait for a successful command execution

  28. 27:                         // if it was open, we need to wait for sleep window to elapse

  29. 28:                     } else {

  30. 29:                         if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

  31. 30:                             //we are not past the minimum error threshold for the stat window,

  32. 31:                             // so no change to circuit status.

  33. 32:                             // if it was CLOSED, it stays CLOSED

  34. 33:                             // if it was half-open, we need to wait for a successful command execution

  35. 34:                             // if it was open, we need to wait for sleep window to elapse

  36. 35:                         } else {

  37. 36:                             // our failure rate is too high, we need to set the state to OPEN

  38. 37:                             if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {

  39. 38:                                 circuitOpened.set(System.currentTimeMillis());

  40. 39:                             }

  41. 40:                         }

  42. 41:                     }

  43. 42:                 }

  44. 43:             });

  45. 44: }

  • 第 5 至 7 行 :向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。這裡的 Observable 基於 RxJava Window 運運算元。

    FROM 《ReactiveX檔案中文翻譯》「Window」
    定期將來自原始 Observable 的資料分解為一個 Observable 視窗,發射這些視窗,而不是每次發射一項資料

    • 簡單來說,固定間隔, #onNext() 方法將不斷被呼叫,每次計算斷路器的狀態。

  • 第 22 行 :判斷週期( 可配, HystrixCommandProperties.default_metricsRollingStatisticalWindow=10000ms )內,總請求數超過一定( 可配, HystrixCommandProperties.circuitBreakerRequestVolumeThreshold=20 ) 。

    • 這裡要註意下,請求次數統計的是週期內,超過週期的不計算在內。例如說, 00:00 內發起了 N 個請求, 00:11 不計算這 N 個請求。

  • 第 29 行 :錯誤請求佔總請求數超過一定比例( 可配, HystrixCommandProperties.circuitBreakerErrorThresholdPercentage=50% ) 。

  • 第 37 至 39 行 :滿足斷路器開啟條件,CAS 修改狀態( CLOSED=>OPEN ),並設定開啟時間( circuitOpened ) 。

  • 補充】第 5 至 7 行 :? 怕寫在上面,大家有壓力。Hystrix Metrics 對請求量統計 Observable 使用了兩種 RxJava Window 運運算元 :

    • Observable#window(timespan, unit) 方法,固定週期( 可配, HystrixCommandProperties.metricsHealthSnapshotIntervalInMilliseconds=500ms ),發射 Observable 視窗。點選 BucketedCounterStream 構造方法 檢視呼叫處的程式碼。

    • Observable#window(count, skip) 方法,每發射一次skip) Observable 忽略 count ( 可配, HystrixCommandProperties.circuitBreakerRequestVolumeThreshold=20 ) 個資料項。為什麼?答案在第 22 行的程式碼,週期內達到一定請求量是斷路器開啟的一個條件。點選 BucketedRollingCounterStream 構造方法 檢視呼叫處的程式碼。

目前該方法有兩處呼叫 :

  • 「4.1 構造方法」,在建立 HystrixCircuitBreakerImpl 時,向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。固定間隔,計算斷路器是否要關閉( CLOSE )。

  • 「4.4 #markSuccess()」,清空 Hystrix Metrics 對請求量統計 Observable 的統計資訊,取消原有訂閱,併發起新的訂閱。

4.3 #attemptExecution()

如下是 AbstractCommand#applyHystrixSemantics(_cmd) 方法,對 HystrixCircuitBreakerImpl#attemptExecution 方法的呼叫的程式碼 :

  1. private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

  2.    // ...  省略無關程式碼

  3.   /* determine if we're allowed to execute */

  4.   if (circuitBreaker.attemptExecution()) {

  5.        // 執行【正常邏輯】

  6.   } else {

  7.        // 執行【回退邏輯】

  8.   }

  9. }

  • 使用 HystrixCircuitBreakerImpl#attemptExecution 方法,判斷是否可以執行正常邏輯


#attemptExecution 方法,程式碼如下 :

  1.  1: @Override

  2.  2: public boolean attemptExecution() {

  3.  3:     // 強制 開啟

  4.  4:     if (properties.circuitBreakerForceOpen().get()) {

  5.  5:         return false;

  6.  6:     }

  7.  7:     // 強制 關閉

  8.  8:     if (properties.circuitBreakerForceClosed().get()) {

  9.  9:         return true;

  10. 10:     }

  11. 11:     // 開啟時間為空

  12. 12:     if (circuitOpened.get() == -1) {

  13. 13:         return true;

  14. 14:     } else {

  15. 15:         // 滿足間隔嘗試斷路器時間

  16. 16:         if (isAfterSleepWindow()) {

  17. 17:             //only the first request after sleep window should execute

  18. 18:             //if the executing command succeeds, the status will transition to CLOSED

  19. 19:             //if the executing command fails, the status will transition to OPEN

  20. 20:             //if the executing command gets unsubscribed, the status will transition to OPEN

  21. 21:             if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {

  22. 22:                 return true;

  23. 23:             } else {

  24. 24:                 return false;

  25. 25:             }

  26. 26:         } else {

  27. 27:             return false;

  28. 28:         }

  29. 29:     }

  30. 30: }

  • 第 4 至 6 行 :當 HystrixCommandProperties.circuitBreakerForceOpen=true ( 預設值 : false) 時,即斷路器強制開啟,傳回 false 。當該配置接入配置中心後,可以動態實現開啟熔斷。為什麼會有該配置?當 HystrixCircuitBreaker 建立完成後,無法動態切換 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,透過該配置以實現類似效果。

  • 第 8 至 10 行 :當 HystrixCommandProperties.circuitBreakerForceClose=true ( 預設值 : false) 時,即斷路器強制關閉,傳回 true 。當該配置接入配置中心後,可以動態實現關閉熔斷。為什麼會有該配置?當 HystrixCircuitBreaker 建立完成後,無法動態切換 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,透過該配置以實現類似效果。

  • 第 12 至 13 行 :斷路器開啟時間( circuitOpened ) 為"空",傳回 true 。

  • 第 16 至 28 行 :呼叫 #isAfterSleepWindow() 方法,判斷是否滿足嘗試呼叫正常邏輯的間隔時間。當滿足,使用CAS 方式修改斷路器狀態( OPEN=>HALF_OPEN ),從而保證有且僅有一個執行緒能夠嘗試呼叫正常邏輯。


#isAfterSleepWindow() 方法,程式碼如下 :

  1. private boolean isAfterSleepWindow() {

  2.    final long circuitOpenTime = circuitOpened.get();

  3.    final long currentTime = System.currentTimeMillis();

  4.    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();

  5.    return currentTime > circuitOpenTime + sleepWindowTime;

  6. }

  • 當前時間超過斷路器開啟時間 HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds ( 預設值, 5000ms ),傳回 true 。

4.4 #markSuccess()

當嘗試呼叫正常邏輯成功時,呼叫 #markSuccess() 方法,關閉斷路器。程式碼如下 :

  1.  1: @Override

  2.  2: public void markSuccess() {

  3.  3:     if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {

  4.  4:         // 清空 Hystrix Metrics 對請求量統計 Observable 的**統計資訊**

  5.  5:         //This thread wins the race to close the circuit - it resets the stream to start it over from 0

  6.  6:         metrics.resetStream();

  7.  7:         // 取消原有訂閱

  8.  8:         Subscription previousSubscription = activeSubscription.get();

  9.  9:         if (previousSubscription != null) {

  10. 10:             previousSubscription.unsubscribe();

  11. 11:         }

  12. 12:         // 發起新的訂閱

  13. 13:         Subscription newSubscription = subscribeToStream();

  14. 14:         activeSubscription.set(newSubscription);

  15. 15:         // 設定斷路器開啟時間為空

  16. 16:         circuitOpened.set(-1L);

  17. 17:     }

  18. 18: }

  • 第 3 行 :使用 CAS 方式,修改斷路器狀態( HALF_OPEN=>CLOSED )。

  • 第 6 行 :清空 Hystrix Metrics 對請求量統計 Observable 的統計資訊

  • 第 8 至 14 行 :取消原有訂閱,發起新的訂閱。

  • 第 16 行 :設定斷路器開啟時間為"空" 。


如下兩處呼叫了 #markNonSuccess() 方法 :

  • markEmits

  • markOnCompleted

4.5 #markNonSuccess()

當嘗試呼叫正常邏輯失敗時,呼叫 #markNonSuccess() 方法,重新開啟斷路器。程式碼如下 :

  1.  1: @Override

  2.  2: public void markNonSuccess() {

  3.  3:     if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {

  4.  4:         //This thread wins the race to re-open the circuit - it resets the start time for the sleep window

  5.  5:         circuitOpened.set(System.currentTimeMillis());

  6.  6:     }

  7.  7: }

  • 第 3 行 :使用 CAS 方式,修改斷路器狀態( HALF_OPEN=>OPEN )。

  • 第 5 行 :設定設定斷路器開啟時間為當前時間。這樣, #attemptExecution() 過一段時間,可以再次嘗試執行正常邏輯。


如下兩處呼叫了 #markNonSuccess() 方法 :

  • handleFallback

  • unsubscribeCommandCleanup

4.6 #allowRequest()

#allowRequest()#attemptExecution() 方法,方法目的基本類似,差別在於當斷路器滿足嘗試關閉條件時,前者不會將斷路器不會修改狀態( CLOSE=>HALF-OPEN ),而後者會。點選 連結 檢視程式碼實現。

4.7 #isOpen()

#isOpen() 方法,比較簡單,點選 連結 檢視程式碼實現。

666. 彩蛋

呼呼,相對比較乾凈的一篇文章,滿足。

胖友,分享一波朋友圈可好!

    

贊(0)

分享創造快樂

© 2022 知識星球   網站地圖