歡迎光臨
每天分享高質量文章

熔斷器 Hystrix 原始碼解析 —— 命令合併執行

摘要: 原創出處 http://www.iocoder.cn/Hystrix/command-collapser-execute/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 Hystrix 1.5.X 版本

  • 1. 概述

  • 2. HystrixCollapser

    • 2.1 構造方法

    • 2.2 執行命令方式

    • 2.3 核心方法

  • 3. RequestCollapserFactory

  • 4. RequestCollapser

    • 4.1 構造方法

    • 4.2 RequestBatch

    • 4.3 #submitRequest(arg)

    • 4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)

  • 5. CollapserTimer

    • 5.1 RealCollapserTimer

    • 5.2 CollapsedTask

  • 666. 彩蛋


  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。


1. 概述

本文主要分享 Hystrix 命令合併執行

在 《【翻譯】Hystrix檔案-實現原理》「請求合併」 中,對 Hystrix 命令合併執行的概念原理使用場景優缺點已經做了非常詳細透徹的分享,所以胖友可以先認真閱讀學習下。

命令合併執行整體流程如下圖 :

FROM 《【翻譯】Hystrix檔案-實現原理》「請求合併」

  • 第一步,提交單個命令請求到請求佇列( RequestQueue )

  • 第二部,定時任務( TimerTask ) 固定週期從請求佇列獲取多個命令執行,合併執行。

在官方提供的示例中,我們透過 CommandCollapserGetValueForKey 熟悉命令合併執行的使用。


推薦 Spring Cloud 書籍

  • 請支援正版。下載盜版,等於主動編寫低階 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

2. HystrixCollapser

com.netflix.hystrix.HystrixCollapser命令合併器抽象父類

NOTE : com.netflix.hystrix.HystrixObservableCollapser另一種命令合併器抽象父類,本文暫不解析。

2.1 構造方法

HystrixCollapser 構造方法,程式碼如下 :

  1. public abstract class HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType>

  2.        implements HystrixExecutable<ResponseType>, HystrixObservable<ResponseType> {

  3.    private final RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> collapserFactory;

  4.    private final HystrixRequestCache requestCache;

  5.    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> collapserInstanceWrapper;

  6.    private final HystrixCollapserMetrics metrics;

  7.    /* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) {

  8.        if (collapserKey == null || collapserKey.name().trim().equals("")) {

  9.            String defaultKeyName = getDefaultNameFromClass(getClass());

  10.            collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName);

  11.        }

  12.        HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder);

  13.        this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, properties);

  14.        this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());

  15.        if (metrics == null) {

  16.            this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties);

  17.        } else {

  18.            this.metrics = metrics;

  19.        }

  20.        final HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;

  21.         /* strategy: HystrixMetricsPublisherCollapser */

  22.        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties);

  23.        /**

  24.         * Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.

  25.         */

  26.        collapserInstanceWrapper = new HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType>() {

  27.            @Override

  28.            public Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {

  29.                Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = self.shardRequests(requests);

  30.                self.metrics.markShards(shards.size());

  31.                return shards;

  32.            }

  33.            @Override

  34.            public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {

  35.                final HystrixCommand<BatchReturnType> command = self.createCommand(requests);

  36.                command.markAsCollapsedCommand(this.getCollapserKey(), requests.size());

  37.                self.metrics.markBatch(requests.size());

  38.                return command.toObservable();

  39.            }

  40.            @Override

  41.            public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {

  42.                return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {

  43.                    @Override

  44.                    public void call(BatchReturnType batchReturnType) {

  45.                        // this is a blocking call in HystrixCollapser

  46.                        self.mapResponseToRequests(batchReturnType, requests);

  47.                    }

  48.                }).ignoreElements().cast(Void.class);

  49.            }

  50.            @Override

  51.            public HystrixCollapserKey getCollapserKey() {

  52.                return self.getCollapserKey();

  53.            }

  54.        };

  55.    }

  56. }

  • BatchReturnType 泛型多個命令合併執行傳回結果型別。

  • ResponseType 泛型單個命令執行傳回結果型別。

  • RequestArgumentType 泛型單個命令引數型別。

  • collapserFactory 屬性,RequestCollapser 工廠,在 「3. RequestCollapserFactory」 詳細解析。

  • requestCache 屬性,TODO 【2012】【請求背景關係】

  • collapserInstanceWrapper 屬性,命令合併器包裝器。

    • com.netflix.hystrix.collapser.HystrixCollapserBridge 介面,點選 連結 檢視程式碼。

    • HystrixCollapserBridge ,為 RequestBatch 透明呼叫 HystrixCollapser 或 HystrixObservableCollapser 的方法不同的實現。參見 《橋接樣式》 。

  • metrics 屬性,TODO 【2002】【metrics】

2.2 執行命令方式

在 《Hystrix 原始碼解析 —— 執行命令方式》 中,我們已經看了 HystrixCommand 提供的四種執行命令方式。

HystrixCollapser 類似於 HystrixCommand ,也提供四種相同的執行命令方式,其中如下三種方式程式碼基本類似,我們就給下傳送門,就不重覆囉嗦了 :

  • #observe() 方法 :傳送門 。

  • #queue() 方法 :傳送門 。

  • #execute() 方法 :傳送門 。

下麵一起來看看 #toObservable() 方法的實現,程式碼如下 :

  1.  1: public Observable<ResponseType> toObservable() {

  2.  2:     // when we callback with the data we want to do the work

  3.  3:     // on a separate thread than the one giving us the callback

  4.  4:     return toObservable(Schedulers.computation());

  5.  5: }

  6.  6:

  7.  7: public Observable<ResponseType> toObservable(Scheduler observeOn) {

  8.  8:     return Observable.defer(new Func0<Observable<ResponseType>>() {

  9.  9:         @Override

  10. 10:         public Observable<ResponseType> call() {

  11. 11:             // // 快取開關、快取KEY

  12. 12:             final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get();

  13. 13:             final String cacheKey = getCacheKey();

  14. 14:

  15. 15:             // 優先從快取中獲取

  16. 16:             /* try from cache first */

  17. 17:             if (isRequestCacheEnabled) {

  18. 18:                 HystrixCachedObservable<ResponseType> fromCache = requestCache.get(cacheKey);

  19. 19:                 if (fromCache != null) {

  20. 20:                     metrics.markResponseFromCache();

  21. 21:                     return fromCache.toObservable();

  22. 22:                 }

  23. 23:             }

  24. 24:

  25. 25:             // 獲得 RequestCollapser

  26. 26:             RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);

  27. 27:

  28. 28:             // 提交 命令請求

  29. 29:             Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());

  30. 30:

  31. 31:             // 獲得 快取Observable

  32. 32:             if (isRequestCacheEnabled && cacheKey != null) {

  33. 33:                 HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response);

  34. 34:                 HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(cacheKey, toCache);

  35. 35:                 if (fromCache == null) {

  36. 36:                     return toCache.toObservable();

  37. 37:                 } else {

  38. 38:                     toCache.unsubscribe(); // 取消訂閱

  39. 39:                     return fromCache.toObservable();

  40. 40:                 }

  41. 41:             }

  42. 42:

  43. 43:             // 獲得 非快取Observable

  44. 44:             return response;

  45. 45:         }

  46. 46:     });

  47. 47: }

  • observeOn 方法引數,實際方法暫未用到,跳過無視。

  • 第 11 至 13 行 :快取存開關、KEY 。

  • 反向】第 32 至 41 行 :獲得【快取 Observable】。這塊程式碼和 AbstractCommand#toObservavle(...) 類似,在《Hystrix 原始碼解析 —— 執行結果快取》「4. AbstractCommand#toObservavle(...)」 有詳細解析。

  • 反向】第 44 行 :獲得【非快取 Observable】。

  • 註意 :傳回的 Observable ,很可能命令實際並未執行,或者說並未執行完成,此時在 #queue() / #execute() 方法,透過 BlockingObservable 阻塞等待執行完成。BlockingObservable 在 《RxJava 原始碼解析 —— BlockingObservable》 有詳細解析。

  • 第 26 行 :呼叫 RequestCollapserFactory#getRequestCollapser() ,獲得 RequestCollapser 。在 「3. RequestCollapserFactory」 詳細解析。

  • 第 29 行 :提交單個命令請求到請求佇列( RequestQueue ),即命令合併執行整體流程第一步。在 「4. RequestCollapser」 詳細解析。

2.3 核心方法

  • #getRequestArgument(...) 抽象方法,獲得單個命令引數。程式碼如下 :

  1. public abstract RequestArgumentType getRequestArgument();


  • #createCommand(...) 抽象方法,將多個命令請求合併,建立一個 HystrixCommand 。程式碼如下 :

  1. protected abstract HystrixCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);


  • #mapResponseToRequests(...) 抽象方法,將一個 HystrixCommand 的執行結果,對映回對應的命令請求們。

  1. protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);


  • #shardRequests(...) 方法,將多個命令請求分片N 個【多個命令請求】。預設實現下,不進行分片。程式碼如下 :

  1. protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {

  2.    return Collections.singletonList(requests);

  3. }


  • 未重寫 #shardRequests(...) 的情況下,整體方法流程如下 :

  • 重寫 #shardRequests(...) 的情況下,整體方法流程如下 :

    • 本圖中命令請求分片僅僅是例子,實際根據重寫的邏輯不同而不同。

3. RequestCollapserFactory

com.netflix.hystrix.collapser.RequestCollapserFactory ,RequestCollapser 工廠

  1. public class RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> {

  2.    private final CollapserTimer timer;

  3.    private final HystrixCollapserKey collapserKey;

  4.    private final HystrixCollapserProperties properties;

  5.    private final HystrixConcurrencyStrategy concurrencyStrategy;

  6.    private final Scope scope;

  7.    public RequestCollapserFactory(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties properties) {

  8.         /* strategy: ConcurrencyStrategy */

  9.        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();

  10.        this.timer = timer;

  11.        this.scope = scope;

  12.        this.collapserKey = collapserKey;

  13.        this.properties = properties;

  14.    }

  • timer 屬性,命令合併器的定時器,在 「5. CollapserTimer」 詳細解析。

  • collapserKey 屬性,命令合併器標識,實現類似 HystrixThreadPoolKey 。

    • HystrixCollapserKey ,點選 連結 檢視程式碼。

    • HystrixThreadPoolKey ,在 《Hystrix 原始碼解析 —— 命令執行(二)之執行隔離策略》「3. HystrixThreadPoolKey」 有詳細解析。

  • properties 屬性,命令合併器屬性配置。

  • concurrencyStrategy 屬性,併發策略,在 《Hystrix 原始碼解析 —— 命令執行(二)之執行隔離策略》「4. HystrixConcurrencyStrategy」 有詳細解析。

  • scope 屬性,命令請求作用域。目前有兩種作用域 :

    • REQUEST :請求背景關係( HystrixRequestContext )。

      Typically this means that requests within a single user-request (ie. HTTP request) are collapsed.
      No interaction with other user requests.
      1 queue per user request.

    • GLOBAL :全域性。

      Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed.
      1 queue for entire app.


呼叫 #getRequestCollapser() 方法,獲得 RequestCollapser 。程式碼如下 :

  1. public RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> getRequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser) {

  2.   if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) {

  3.       return getCollapserForUserRequest(commandCollapser);

  4.   } else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) {

  5.       return getCollapserForGlobalScope(commandCollapser);

  6.   } else {

  7.       logger.warn("Invalid Scope: {}  Defaulting to REQUEST scope.", getScope());

  8.       return getCollapserForUserRequest(commandCollapser);

  9.   }

  10. }

  • 根據 scope 不同,呼叫兩個不同方法,獲得 RequestCollapser 。這兩個方法大體邏輯相同,優先從快取中查詢滿足條件的 RequestCollapser 傳回;若不存在,則建立滿足條件的 RequestCollapser 新增到快取並傳回。

    • REQUEST :呼叫 #getCollapserForUserRequest() 方法,TODO 【2012】【請求背景關係】。

    • GLOBAL :呼叫 #getCollapserForGlobalScope() 方法,點選 連結 檢視中文註釋的程式碼。

4. RequestCollapser

com.netflix.hystrix.collapser.RequestCollapser命令請求合併器。主要用於 :

  • 提交單個命令請求到請求佇列( RequestQueue )。

  • 接收來自定時任務提交的多個命令,合併執行。

4.1 構造方法

RequestCollapser 構造方法,程式碼如下 :

  1. public class RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> {

  2.    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;

  3.    // batch can be null once shutdown

  4.    private final AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>> batch = new AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>>();

  5.    private final AtomicReference<Reference<TimerListener>> timerListenerReference = new AtomicReference<Reference<TimerListener>>();

  6.    private final AtomicBoolean timerListenerRegistered = new AtomicBoolean();

  7.    private final CollapserTimer timer;

  8.    private final HystrixCollapserProperties properties;

  9.    private final HystrixConcurrencyStrategy concurrencyStrategy;

  10.    RequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, HystrixCollapserProperties properties, CollapserTimer timer, HystrixConcurrencyStrategy concurrencyStrategy) {

  11.        this.commandCollapser = commandCollapser; // the command with implementation of abstract methods we need

  12.        this.concurrencyStrategy = concurrencyStrategy;

  13.        this.properties = properties;

  14.        this.timer = timer;

  15.        batch.set(new RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get()));

  16.    }

  17. }

  • commandCollapser 屬性,命令合併器包裝器。

  • batch 屬性,RequestBatch,即是本文一直說的請求佇列。在 「4.2 RequestBatch」 也會詳細解析。

  • timerListenerReference 屬性,註冊在命令合併器的定時器的監聽器。每個 RequestCollapser 獨有一個監聽器。該監聽器( 實際上會使用該監聽器建立定時任務 )固定週期從請求佇列獲取多個命令執行,提交 RequestCollapser 合併執行。在 「5. CollapserTimer」 也會詳細解析。

  • timerListenerRegistered 屬性, timerListenerReference 是否已經註冊。

  • timer 屬性,命令合併器的定時器。

  • properties 屬性,命令合併器屬性配置。

  • concurrencyStrategy 屬性,併發策略。

4.2 RequestBatch

com.netflix.hystrix.collapser.RequestBatch ,命令請求佇列。提供如下功能 :

  • 命令請求的新增

  • 命令請求的移除

  • 命令請求的批次執行。筆者把 RequestBatch 解釋成 "命令請求佇列",主要方便大家理解。

    • 那可能有胖友有疑問,為啥該功能不在 RequestCollapser 直接實現,這樣 RequestBatch 成為純粹的佇列呢?在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 詳細解析。

RequestBatch 構造方法,程式碼如下 :

  1. public class RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> {

  2.    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;

  3.    private final int maxBatchSize;

  4.    private final AtomicBoolean batchStarted = new AtomicBoolean();

  5.    private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap =

  6.            new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>();

  7.    private final HystrixCollapserProperties properties;

  8.    private ReentrantReadWriteLock batchLock = new ReentrantReadWriteLock();

  9.    public RequestBatch(HystrixCollapserProperties properties, HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, int maxBatchSize) {

  10.        this.properties = properties;

  11.        this.commandCollapser = commandCollapser;

  12.        this.maxBatchSize = maxBatchSize;

  13.    }

  14. }

  • commandCollapser 屬性,命令合併器包裝器。

  • maxBatchSize 屬性,佇列最大長度。

  • batchStarted 屬性,執行是否開始。

  • argumentMap 屬性,命令請求引數對映( 佇列 )。

  • properties 屬性,命令合併器屬性配置。

  • batchLock 屬性, argumentMap 操作的讀寫鎖

RequestBatch 實現佇列具體的操作方法,在 「4.3 #submitRequest(arg)」/「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 一起解析。

4.3 #submitRequest(arg)

#toObservable() 方法裡,呼叫 #submitRequest(arg) 方法,提交單個命令請求到 RequestBatch 。程式碼如下 :

  1.  1: public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {

  2.  2:     /*

  3.  3:      * We only want the timer ticking if there are actually things to do so we register it the first time something is added.

  4.  4:      */

  5.  5:     if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false, true)) {

  6.  6:         /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */

  7.  7:         timerListenerReference.set(timer.addListener(new CollapsedTask()));

  8.  8:     }

  9.  9:

  10. 10:     // loop until succeed (compare-and-set spin-loop)

  11. 11:     while (true) {

  12. 12:         // 獲得 RequestBatch

  13. 13:         final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get();

  14. 14:         if (b == null) {

  15. 15:             return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown"));

  16. 16:         }

  17. 17:

  18. 18:         // 新增到 RequestBatch

  19. 19:         final Observable<ResponseType> response;

  20. 20:         if (arg != null) {

  21. 21:             response = b.offer(arg);

  22. 22:         } else {

  23. 23:             response = b.offer( (RequestArgumentType) NULL_SENTINEL);

  24. 24:         }

  25. 25:

  26. 26:         // 新增成功,傳回 Observable

  27. 27:         // it will always get an Observable unless we hit the max batch size

  28. 28:         if (response != null) {

  29. 29:             return response;

  30. 30:         } else {

  31. 31:             // 新增失敗,執行 RequestBatch ,並建立新的 RequestBatch

  32. 32:             // this batch can't accept requests so create a new one and set it if another thread doesn't beat us

  33. 33:             createNewBatchAndExecutePreviousIfNeeded(b);

  34. 34:         }

  35. 35:     }

  36. 36: }

  • 第 5 至 8 行 :當 RequestCollapser 的監聽任務( CollapsedTask )還未建立,進行初始化。

  • 第 11 至 35 行 :死迴圈,直到提交單個命令請求到 RequestBatch 成功

    • 第 13 至 16 行 :獲得 RequestBatch 。從目前程式碼看下來,除非 RequestCollapser 被 #shutdown() 後才會出現為 null 的情況。

    • 第 19 至 24 行 :調動 RequestBatch#offer(...) 方法,提交單個命令請求到 RequestBatch ,並獲得 Observable 。這裡對 arg==null 做了特殊處理,因為 RequestBatch.argumentMap 是 ConcurrentHashMap ,不允許值為 null 。另外, RequestBatch#offer(...) 方法的實現程式碼,在結束了當前方法,詳細解析。

    • 第 28 至 29 行 :新增成功,傳回 Observable 。

    • 第 30 至 34 行 :新增失敗,執行當前 RequestBatch 的多個命令合併執行,並建立新的 RequestBatch 。在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 詳細解析。


RequestBatch#offer(...) 方法,程式碼如下 :

  1.  1: public Observable<ResponseType>  offer(RequestArgumentType arg) {

  2.  2:     // 執行已經開始,新增失敗

  3.  3:     /* short-cut - if the batch is started we reject the offer */

  4.  4:     if (batchStarted.get()) {

  5.  5:         return null;

  6.  6:     }

  7.  7:

  8.  8:     /*

  9.  9:      * The 'read' just means non-exclusive even though we are writing.

  10. 10:      */

  11. 11:     if (batchLock.readLock().tryLock()) {

  12. 12:         try {

  13. 13:             // 執行已經開始,新增失敗

  14. 14:             /* double-check now that we have the lock - if the batch is started we reject the offer */

  15. 15:             if (batchStarted.get()) {

  16. 16:                 return null;

  17. 17:             }

  18. 18:

  19. 19:             // 超過佇列最大長度,新增失敗

  20. 20:             if (argumentMap.size() >= maxBatchSize) {

  21. 21:                 return null;

  22. 22:             } else {

  23. 23:                 // 建立 CollapsedRequestSubject ,並新增到佇列

  24. 24:                 CollapsedRequestSubject<ResponseType, RequestArgumentType> collapsedRequest = new CollapsedRequestSubject<ResponseType, RequestArgumentType>(arg, this);

  25. 25:                 final CollapsedRequestSubject<ResponseType, RequestArgumentType> existing = (CollapsedRequestSubject<ResponseType, RequestArgumentType>) argumentMap.putIfAbsent(arg, collapsedRequest);

  26. 26:                 /**

  27. 27:                  * If the argument already exists in the batch, then there are 2 options:

  28. 28:                  * A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses

  29. 29:                  * be hooked up to that argument

  30. 30:                  * B) If request caching is OFF: return an error to all duplicate argument requests

  31. 31:                  *

  32. 32:                  * This maintains the invariant that each batch has no duplicate arguments.  This prevents the impossible

  33. 33:                  * logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser)

  34. 34:                  * of trying to figure out which argument of a set of duplicates should get attached to a response.

  35. 35:                  *

  36. 36:                  * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion.

  37. 37:                  */

  38. 38:                 if (existing != null) {

  39. 39:                     boolean requestCachingEnabled = properties.requestCacheEnabled().get();

  40. 40:                     if (requestCachingEnabled) {

  41. 41:                         return existing.toObservable();

  42. 42:                     } else {

  43. 43:                         return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "]  This is not supported.  Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!"));

  44. 44:                     }

  45. 45:                 } else {

  46. 46:                     return collapsedRequest.toObservable();

  47. 47:                 }

  48. 48:

  49. 49:             }

  50. 50:         } finally {

  51. 51:             batchLock.readLock().unlock();

  52. 52:         }

  53. 53:     } else {

  54. 54:         return null;

  55. 55:     }

  56. 56: }

  • 第 4 至 6 行 :執行已經開始,新增失敗。在 RequestBatch#executeBatchIfNotAlreadyStarted(...) 方法的開頭,優先 CAS 使 batchStarted=true

  • 第 11 行 :獲得讀鎖The'read'just means non-exclusive even though we are writing. ,即使該方法實際在做"寫操作",不排他,執行緒安全,所以可以使用讀鎖。

  • 第 15 至 17 行 : double-check,執行已經開始,新增失敗。在 RequestBatch#executeBatchIfNotAlreadyStarted(...) 方法,優先 CAS 使 batchStarted=true,再獲取寫鎖,所以會出現該情況。

  • 第 20 至 21 行 :超過佇列最大長度,新增失敗。

  • 第 24 至 25 行 :建立 com.netflix.hystrix.collapser.CollapsedRequestSubject ,並將新增到佇列( argumentMap ) 。

    • argument 屬性,單個命令請求引數。

    • valueSet 屬性,結果( Response ) 是否設定,透過 #setResponse()#emitResponse() 方法設定。

    • subject 屬性,可回放執行結果的 Subject 。此處使用 ReplaySubject 的主要目的,當 HystrixCollapser 開啟快取功能時,透過回放執行結果,在 《Hystrix 原始碼解析 —— 執行結果快取》「5. HystrixCachedObservable」 也有相同的實現。另外,這裡有一點要註意下,ReplaySubject 並沒有向任何 Observable 訂閱結果,而是透過 #setResponse()#emitResponse() 方法設定結果

    • outstandingSubscriptions 屬性,訂閱數量。

    • subjectWithAccounting 屬性,帶訂閱數量的 ReplaySubject 。當取消訂閱時,呼叫 RequestBatch#remove(arg) 方法,移除單個命令請求。

    • CollapsedRequestSubject 實現 com.netflix.hystrix.HystrixCollapser.CollapsedRequest 介面,定義了批次命令執行的請求,不僅限於獲得請求引數( #getArgument() 方法 ),也包括對批次命令執行結束後,每個請求的結果設定( #setResponse(...)/ #emitResponse(...)/ #setException(...)/ #setComplete() 方法 ),點選 連結 檢視該介面的程式碼。

    • CollapsedRequestSubject 構造方法,程式碼如下:

  1. /* package */class CollapsedRequestSubject<T, R> implements CollapsedRequest<T, R> {

    /**

  2.  * 引數

  3.  */

  4. private final R argument;

  5. /**

  6.  * 結果( response ) 是否設定

  7.  */

  8. private AtomicBoolean valueSet = new AtomicBoolean(false);

  9. /**

  10.  * 可回放的 ReplaySubject

  11.  */

  12. private final ReplaySubject&lt;T&gt; subject = ReplaySubject.create();

  13. /**

  14.  * 帶訂閱數量的 ReplaySubject

  15.  */

  16. private final Observable&lt;T&gt; subjectWithAccounting;

  17. /**

  18.  * 訂閱數量

  19.  */

  20. private volatile int outstandingSubscriptions = 0;

  21. public CollapsedRequestSubject(final R arg, final RequestBatch&lt;?, T, R&gt; containingBatch) {

  22.     // 設定 argument

  23.     if (arg == RequestCollapser.NULL_SENTINEL) {

  24.         this.argument = null;

  25.     } else {

  26.         this.argument = arg;

  27.     }

  28.     // 設定 帶訂閱數量的 ReplaySubject

  29.     this.subjectWithAccounting = subject

  30.             .doOnSubscribe(new Action0() {

  31.                 @Override

  32.                 public void call() {

  33.                     outstandingSubscriptions++;

  34.                 }

  35.             })

  36.             .doOnUnsubscribe(new Action0() {

  37.                 @Override

  38.                 public void call() {

  39.                     outstandingSubscriptions--;

  40.                     if (outstandingSubscriptions == 0) {

  41.                         containingBatch.remove(arg);

  42.                     }

  43.                 }

  44.             });

  45. }

  46. }

  • 第 38 至 47 行 :傳回 Observable 。

    • 當 argumentMap 已經存在 arg 對應的 Observable 時,必須開啟快取 ( HystrixCollapserProperties.requestCachingEnabled=true ) 功能。原因是,如果在相同的 arg ,並且未開啟快取,同時第 43 行實現的是 collapsedRequest.toObservable() ,那麼相同的 arg 將有多個 Observable 執行命令,此時 HystrixCollapserBridge#mapResponseToRequests(...) 方法無法將執行( Response )賦值到 arg 對應的命令請求( CollapsedRequestSubject ) 。更多討論,見 https://github.com/Netflix/Hystrix/pull/1176 。

    • 回過頭看 HystrixCollapser#toObservable() 方法的第 32 至 41 行的程式碼,這裡也有對快取功能,是不是重覆了呢? argumentMap 針對的是 RequestBatch 級的快取,HystrixCollapser : RequestCollapser : RequestBatch 是 1:1:N 的關係,透過 HystrixCollapser#toObservable() 對快取的處理邏輯,保證 RequestBatch 切換後,依然有快取


    RequestBatch#remove() 方法,程式碼如下 :

    1. /* package-private */ void remove(RequestArgumentType arg) {

    2.    if (batchStarted.get()) {

    3.        //nothing we can do

    4.        return;

    5.    }

    6.    if (batchLock.readLock().tryLock()) {

    7.        try {

    8.            /* double-check now that we have the lock - if the batch is started, deleting is useless */

    9.            if (batchStarted.get()) {

    10.                return;

    11.            }

    12.            argumentMap.remove(arg);

    13.        } finally {

    14.            batchLock.readLock().unlock();

    15.        }

    16.    }

    17. }

    • 當 RequestBatch 開始執行,不允許移除單個命令請求。

    4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)

    本小節建議在 「5. CollapserTimer」 後,再回過頭看。

    #createNewBatchAndExecutePreviousIfNeeded(previousBatch) 方法,程式碼如下 :

    1.  1: private void createNewBatchAndExecutePreviousIfNeeded(RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> previousBatch) {

    2.  2:     if (previousBatch == null) {

    3.  3:         throw new IllegalStateException("Trying to start null batch which means it was shutdown already.");

    4.  4:     }

    5.  5:     if (batch.compareAndSet(previousBatch, new RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get()))) {

    6.  6:         // this thread won so trigger the previous batch

    7.  7:         previousBatch.executeBatchIfNotAlreadyStarted();

    8.  8:     }

    9.  9: }

    • 第 5 行 :透過 CAS 修改 batch ,保證併發情況下的執行緒安全。同時註意,此處也進行了新的 RequestBatch ,切換掉老的 RequestBatch 。

    • 第 6 行 :使用老的 RequestBatch ,呼叫 RequestBatch#executeBatchIfNotAlreadyStarted() 方法,命令合併執行。


    RequestBatch#executeBatchIfNotAlreadyStarted() 方法,程式碼如下 :

    1.  1: public void executeBatchIfNotAlreadyStarted() {

    2.  2:     /*

    3.  3:      * - check that we only execute once since there's multiple paths to do so (timer, waiting thread or max batch size hit)

    4.  4:      * - close the gate so 'offer' can no longer be invoked and we turn those threads away so they create a new batch

    5.  5:      */

    6.  6:     // 設定 執行已經開始

    7.  7:     if (batchStarted.compareAndSet(false, true)) {

    8.  8:         // 獲得 寫鎖

    9.  9:         /* wait for 'offer'/'remove' threads to finish before executing the batch so 'requests' is complete */

    10. 10:         batchLock.writeLock().lock();

    11. 11:

    12. 12:         try {

    13. 13:             // 將多個命令請求分片成 N 個【多個命令請求】。

    14. 14:             // shard batches

    15. 15:             Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());

    16. 16:             // for each shard execute its requests

    17. 17:             for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {

    18. 18:                 try {

    19. 19:                     // 將多個命令請求合併,建立一個 HystrixCommand

    20. 20:                     // create a new command to handle this batch of requests

    21. 21:                     Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);

    22. 22:

    23. 23:                     // 將一個 HystrixCommand 的執行結果,映射回對應的命令請求們

    24. 24:                     commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {

    25. 25:

    26. 26:                         /**

    27. 27:                          * This handles failed completions

    28. 28:                          */

    29. 29:                         @Override

    30. 30:                         public void call(Throwable e) {

    31. 31:                             // handle Throwable in case anything is thrown so we don't block Observers waiting for onError/onCompleted

    32. 32:                             Exception ee;

    33. 33:                             if (e instanceof Exception) {

    34. 34:                                 ee = (Exception) e;

    35. 35:                             } else {

    36. 36:                                 ee = new RuntimeException("Throwable caught while executing batch and mapping responses.", e);

    37. 37:                             }

    38. 38:                             logger.debug("Exception mapping responses to requests.", e);

    39. 39:                             // if a failure occurs we want to pass that exception to all of the Futures that we've returned

    40. 40:                             for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {

    41. 41:                                 try {

    42. 42:                                     ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ee);

    43. 43:                                 } catch (IllegalStateException e2) {

    44. 44:                                     // if we have partial responses set in mapResponseToRequests

    45. 45:                                     // then we may get IllegalStateException as we loop over them

    46. 46:                                     // so we'll log but continue to the rest

    47. 47:                                     logger.error("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2);

    48. 48:                                 }

    49. 49:                             }

    50. 50:                         }

    51. 51:

    52. 52:                     }).doOnCompleted(new Action0() {

    53. 53:

    54. 54:                         /**

    55. 55:                          * This handles successful completions

    56. 56:                          */

    57. 57:                         @Override

    58. 58:                         public void call() {

    59. 59:                             // check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly

    60. 60:                             Exception e = null;

    61. 61:                             for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) {

    62. 62:                                 try {

    63. 63:                                    e = ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(e,"No response set by " + commandCollapser.getCollapserKey().name() + " 'mapResponseToRequests' implementation.");

    64. 64:                                 } catch (IllegalStateException e2) {

    65. 65:                                     logger.debug("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception. Continuing ... ", e2);

    66. 66:                                 }

    67. 67:                             }

    68. 68:                         }

    69. 69:

    70. 70:                     }).subscribe();

    71. 71:                    

    72. 72:                 } catch (Exception e) {

    73. 73:                     // 異常

    74. 74:                     logger.error("Exception while creating and queueing command with batch.", e);

    75. 75:                     // if a failure occurs we want to pass that exception to all of the Futures that we've returned

    76. 76:                     for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) {

    77. 77:                         try {

    78. 78:                             request.setException(e);

    79. 79:                         } catch (IllegalStateException e2) {

    80. 80:                             logger.debug("Failed trying to setException on CollapsedRequest", e2);

    81. 81:                         }

    82. 82:                     }

    83. 83:                 }

    84. 84:             }

    85. 85:

    86. 86:         } catch (Exception e) {

    87. 87:             // 異常

    88. 88:             logger.error("Exception while sharding requests.", e);

    89. 89:             // same error handling as we do around the shards, but this is a wider net in case the shardRequest method fails

    90. 90:             for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {

    91. 91:                 try {

    92. 92:                     request.setException(e);

    93. 93:                 } catch (IllegalStateException e2) {

    94. 94:                     logger.debug("Failed trying to setException on CollapsedRequest", e2);

    95. 95:                 }

    96. 96:             }

    97. 97:         } finally {

    98. 98:             batchLock.writeLock().unlock();

    99. 99:         }

    100. 100:     }

    101. 101: }

    • 程式碼看起來是有點長哈,請對照著官方示例 CommandCollapserGetValueForKey 一起看,臨門一腳了,胖友!

    • 第 7 行 :透過 CAS 修改 batchStarted ,保證併發情況下的執行緒安全。

    • 第 10 行 :獲得寫鎖。等待呼叫 #offer(...)#remove(...) 方法的執行緒執行完成,以保證命令合併執行時,不再有新的請求新增或移除。

    • 第 15 行 :呼叫 HystrixCollapserBridge#shardRequests(...) 方法,將多個命令請求分片成 N 個【多個命令請求】。預設實現下,不進行分片。點選 連結 檢視程式碼。

    • 第 17 行 :迴圈 N 個【多個命令請求】。

    • 第 21 行 :呼叫 HystrixCollapserBridge#createObservableCommand(...) 方法,將多個命令請求合併,建立一個 HystrixCommand 。點選 連結 檢視程式碼。

    • 第 24 行 :呼叫 HystrixCollapserBridge#mapResponseToRequests(...) 方法,將一個 HystrixCommand 的執行結果,對映回對應的命令請求們。點選 連結 檢視程式碼。

      • Observable#single() 方法,如果 Observable 終止時只發射了一個值,傳回那個值,否則丟擲異常。在 《ReactiveX檔案中文翻譯》「single」 有相關分享。

      • Observable#ignoreElements() 方法,抑制原始 Observable 發射的所有資料,只允許它的終止通知( #onError() 或 #onCompleted())透過。在 《ReactiveX檔案中文翻譯》「IgnoreElements」 有相關分享。也推薦點選 rx.internal.operators.OperatorIgnoreElements 看下原始碼,可能更加易懂。

      • Observable#cast() 方法,將原始 Observable 發射的每一項資料都強制轉換為一個指定的型別,然後再發射資料,它是 map 的一個特殊版本。在 《ReactiveX檔案中文翻譯》「cast」 有相關分享。也推薦點選 rx.internal.operators.OperatorCast 看下原始碼,可能更加易懂。

      • 使用 Observable#ignoreElements()Observable#cast() 方法,用於將 Observable 變成不再繼續向下發射資料項,只給現有方法裡 Observable#doNext() 處理資料項,呼叫 HystrixCollapser#mapResponseToRequests(...) 方法。

      • 點選 連結 ,檢視 CollapsedRequestSubject#setResponse(response) 方法的程式碼。

    • 第 24 至 50 行 :呼叫 Observable#doError(Action1) 方法,當命令合併執行發生異常時,設定每個CollapsedRequestSubject 的執行結果為異常。

      • 點選 連結,檢視 CollapsedRequestSubject#setResponse(response) 方法的程式碼。

    • 第 52 至 68 行 :呼叫 Observable#doOnCompleted(Action0) 方法,當命令合併執行完成時,檢查每個CollapsedRequestSubject 是否都有傳回結果。設定沒有傳回結果的 CollapsedRequestSubject 的執行結果為異常。一般情況下,是使用者實現 HystrixCollapser#mapResponseToRequests(...) 方法存在 BUG 。另外,如果不設定,將導致無結果的單個命令請求無限阻塞

    • 第 70 行 :呼叫 Observable#subscribe() 方法,觸發 HystrixCommand 執行。

    • 第 72 至 96 行 :發生異常,設定每個 CollapsedRequestSubject 的執行結果為異常。

      • 點選 連結,檢視 CollapsedRequestSubject#setException(response) 方法的程式碼。

    • 第 97 至 99 行 :釋放寫鎖

    5. CollapserTimer

    com.netflix.hystrix.collapser.CollapserTimer ,命令合併器的定時器介面,定義了提交定時監聽器,生成定時任務的介面方法,程式碼如下 :

    1. public interface CollapserTimer {

    2.    Reference<TimerListener> addListener(TimerListener collapseTask);

    3. }

    5.1 RealCollapserTimer

    com.netflix.hystrix.collapser.RealCollapserTimer ,命令合併器的定時器實現類,程式碼如下 :

    1. public class RealCollapserTimer implements CollapserTimer {

    2.    /* single global timer that all collapsers will schedule their tasks on */

    3.    private final static HystrixTimer timer = HystrixTimer.getInstance();

    4.    @Override

    5.    public Reference<TimerListener> addListener(TimerListener collapseTask) {

    6.        return timer.addTimerListener(collapseTask);

    7.    }

    8. }

    • 實際上,使用的是 HystrixTimer 提供的單例。在 《Hystrix 原始碼解析 —— 執行結果快取》「3. HystrixTimer 」 有詳細解析。

    5.2 CollapsedTask

    com.netflix.hystrix.collapser.RequestCollapser.CollapsedTask ,定時任務,固定週期( 可配,預設 HystrixCollapserProperties.timerDelayInMilliseconds=10ms ) 輪詢其對應的一個 RequestCollapser 當前RequestBatch 。若有命令需要執行,則提交 RequestCollapser 合併執行。

    程式碼比較簡單,點選 連結 直接看程式碼。

    666. 彩蛋

    T T 一開始把命令合併執行,理解成類似執行緒池批次執行任務,怎麼看官方示例,怎麼奇怪。有一樣的同學,一起淚目 + 握爪下。

    本文有點點長,實在不想拆分成多篇。

    恩,另外部分地方寫的不夠清晰,歡迎一起討論和最佳化。

    胖友,分享一波朋友圈可好!

    贊(0)

    分享創造快樂

    © 2024 知識星球   網站地圖