歡迎光臨
每天分享高質量文章

註冊中心 Eureka 原始碼解析 —— 基於令牌桶演算法的 RateLimiter

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Eureka/rate-limiter/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 Eureka 1.8.X 版本

  • 1. 概述

  • 2. RateLimiter

  • 2.1 refillToken

  • 2.2 consumeToken

  • 3. RateLimitingFilter

  • 4. InstanceInfoReplicator

  • 666. 彩蛋


1. 概述

本文主要分享 RateLimiter 的代碼實現和 RateLimiter 在 Eureka 中的應用

推薦 Spring Cloud 書籍

  • 請支持正版。下載盜版,等於主動編寫低級 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

推薦 Spring Cloud 視頻

  • Java 微服務實踐 – Spring Boot

  • Java 微服務實踐 – Spring Cloud

  • Java 微服務實踐 – Spring Boot / Spring Cloud

2. RateLimiter

com.netflix.discovery.util.RateLimiter ,基於Token Bucket Algorithm ( 令牌桶演算法 )的速率限制器。

FROM 《接口限流實踐》 
令牌桶演算法的原理是系統會以一個恆定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務。 

RateLimiter 目前支持分鐘級秒級兩種速率限制。構造方法如下:

public class RateLimiter {

    /**
     * 速率單位轉換成毫秒
     */

    private final long rateToMsConversion;

    public RateLimiter(TimeUnit averageRateUnit) {
        switch (averageRateUnit) {
            case SECONDS: // 秒級
                rateToMsConversion = 1000;
                break;
            case MINUTES: // 分鐘級
                rateToMsConversion = 60 * 1000;
                break;
            default:
                throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
        }
    }
}
  • averageRateUnit 引數,速率單位。構造方法里將 averageRateUnit 轉換成 rateToMsConversion 。

呼叫 #acquire(...) 方法,獲取令牌,並傳回是否獲取成功

// RateLimiter.java
/**
* 獲取令牌( Token )
*
@param burstSize 令牌桶上限
@param averageRate 令牌再裝平均速率
@return 是否獲取成功
*/

public boolean acquire(int burstSize, long averageRate) {
   return acquire(burstSize, averageRate, System.currentTimeMillis());
}

public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {
   if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
       return true;
   }

   // 填充 令牌
   refillToken(burstSize, averageRate, currentTimeMillis);
   // 消費 令牌
   return consumeToken(burstSize);
}
  • burstSize 引數 :令牌桶上限。

  • averageRate 引數 :令牌填充平均速率。

  • 我們舉個 ? 來理解這兩個引數 + 構造方法里的一個引數:

    • averageRateUnit = SECONDS

    • averageRate = 2000

    • burstSize = 10

    • 可獲取 2000 個令牌。例如,每秒允許請求 2000 次。

    • 毫秒可填充 2000 / 1000 = 2 個消耗的令牌。

    • 毫秒可獲取 10 個令牌。例如,每毫秒允許請求上限為 10 次,並且請求消耗掉的令牌,需要逐步填充。這裡要註意下,雖然每毫秒允許請求上限為 10 次,這是在沒有任何令牌被消耗的情況下,實際每秒允許請求依然是 2000 次。

    • 這就是基於令牌桶演算法的限流的特點:讓流量平穩,而不是瞬間流量。1000 QPS 相對平均的分攤在這一秒內,而不是第 1 ms 999 請求,後面 999 ms 0 請求

  • 從代碼上看,#acquire(...) 分成兩部分,我們分別解析,整體如下圖:

2.1 refillToken

呼叫 #refillToken(...) 方法,填充已消耗的令牌。可能很多同學開始和我想的一樣,一個後臺每毫秒執行填充。為什麼不適合這樣呢?一方面,實際專案里每個接口都會有相應的 RateLimiter ,導致太多執行頻率極高的後臺任務;另一方面,獲取令牌時才計算,多次令牌填充可以合併成一次,減少冗餘和無效的計算。

代碼如下:

  1/**
  2:  * 速率單位轉換成毫秒
  3:  */

  4private final long rateToMsConversion;
  5
  6/**
  7:  * 消耗令牌數
  8:  */

  9private final AtomicInteger consumedTokens = new AtomicInteger();
 10/**
 11:  * 最後填充令牌的時間
 12:  */

 13private final AtomicLong lastRefillTime = new AtomicLong(0);
 14
 15private void refillToken(int burstSize, long averageRate, long currentTimeMillis) {
 16:     // 獲得 最後填充令牌的時間
 17:     long refillTime = lastRefillTime.get();
 18:     // 獲得 過去多少毫秒
 19:     long timeDelta = currentTimeMillis - refillTime;
 20
 21:     // 計算 可填充最大令牌數量
 22:     long newTokens = timeDelta * averageRate / rateToMsConversion;
 23:     if (newTokens > 0) {
 24:         // 計算 新的填充令牌的時間
 25:         long newRefillTime = refillTime == 0
 26:                 ? currentTimeMillis
 27:                 : refillTime + newTokens * rateToMsConversion / averageRate;
 28:         // CAS 保證有且僅有一個執行緒進入填充
 29:         if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
 30:             while (true) { // 死迴圈,直到成功
 31:                 // 計算 填充令牌後的已消耗令牌數量
 32:                 int currentLevel = consumedTokens.get();
 33:                 int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased
 34:                 int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
 35:                 // CAS 避免和正在消費令牌的執行緒衝突
 36:                 if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
 37:                     return;
 38:                 }
 39:             }
 40:         }
 41:     }
 42: }
  • 第 17 行 :獲取最後填充令牌的時間( refillTime ) 。每次填充令牌,會設置 currentTimeMillis 到 refillTime 。

  • 第 19 行 :獲得距離最後填充令牌的時間差( timeDelta ),用於計算需要填充的令牌數。

  • 第 22 行 :計算可填充的最大令牌數量( newTokens )。newTokens 可能超過 burstSize ,所以下麵會有邏輯調整 newTokens 。

  • 第 25 至 27 行 :計算新的填充令牌的時間。為什麼不能用 `currentTimeMillis` 呢?例如,averageRate = 500 && averageRateUnit = SECONDS 時, 每 2 毫秒才填充一個令牌,如果設置 currentTimeMillis ,會導致不足以填充一個令牌的時長被吞了

  • 第 29 行 :通過 CAS 保證有且僅有一個執行緒進入填充邏輯。

  • 第 30 行 :死迴圈直到成功

  • 第 32 至 34 行 :計算新的填充令牌後的已消耗的令牌數量。

    • 第 33 行 :`burstSize` 可能調小,例如,系統接入分佈式配置中心,可以遠程調整該數值。如果此時 `burstSize` 更小,以它作為已消耗的令牌數量。

  • 第 36 行 :通過 CAS 保證避免改寫設置正在消費令牌的執行緒。

2.2 consumeToken

用 #refillToken(...) 方法,填充消耗( 獲取 )的令牌。

代碼如下 :

  1private boolean consumeToken(int burstSize) {
  2:     while (true) { // 死迴圈,直到沒有令牌,或者獲取令牌成功
  3:         // 沒有令牌
  4:         int currentLevel = consumedTokens.get();
  5:         if (currentLevel >= burstSize) {
  6:             return false;
  7:         }
  8:         // CAS 避免和正在消費令牌或者填充令牌的執行緒衝突
  9:         if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
 10:             return true;
 11:         }
 12:     }
 13: }
  • 第 2 行 :死迴圈直到沒有令牌或者競爭獲取令牌成功

  • 第 4 至 7 行 :沒有令牌。

  • 第 9 至 11 行 :通過 CAS 避免和正在消費令牌或者填充令牌的執行緒衝突。

3. RateLimitingFilter

com.netflix.eureka.RateLimitingFilter ,Eureka-Server 限流過濾器。使用 RateLimiting ,保證 Eureka-Server 穩定性。

#doFilter(...) 方法,代碼如下:

  1@Override
  2public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
  3:     // 獲得 Target
  4:     Target target = getTarget(request);
  5
  6:     // Other Target ,不做限流
  7:     if (target == Target.Other) {
  8:         chain.doFilter(request, response);
  9:         return;
 10:     }
 11
 12:     HttpServletRequest httpRequest = (HttpServletRequest) request;
 13:     // 判斷是否被限流
 14:     if (isRateLimited(httpRequest, target)) {
 15:         // TODO[0012]:監控相關,跳過
 16:         incrementStats(target);
 17:         // 如果開啟限流,傳回 503 狀態碼
 18:         if (serverConfig.isRateLimiterEnabled()) {
 19:             ((HttpServletResponse) response).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
 20:             return;
 21:         }
 22:     }
 23:     chain.doFilter(request, response);
 24: }
  • 第 4 行 :呼叫 #getTarget() 方法,獲取 Target。RateLimitingFilter 只對符合正在運算式 ^./apps(/[^/])?$ 的接口做限流,其中不包含 Eureka-Server 集群批量同步接口。

    • 點擊 鏈接 查看 Target 列舉類代碼。

    • 點擊 鏈接 查看 #getTarget(…) 方法代碼。

  • 第 14 行 :呼叫 #isRateLimited(...) 方法,判斷是否被限流。代碼如下:

      1private boolean isRateLimited(HttpServletRequest request, Target target) {
      2:     // 判斷是否特權應用
      3:     if (isPrivileged(request)) {
      4:         logger.debug("Privileged {} request", target);
      5:         return false;
      6:     }
      7:     // 判斷是否被超載( 限流 )
      8:     if (isOverloaded(target)) {
      9:         logger.debug("Overloaded {} request; discarding it", target);
     10:         return true;
     11:     }
     12:     logger.debug("{} request admitted", target);
     13:     return false;
     14: }
    • x

    • x

    • 第 3 至 6 行 :呼叫 #isPrivileged() 方法,判斷是否為特權應用,對特權應用不開啟限流邏輯。代碼如下:

      private boolean isPrivileged(HttpServletRequest request) {
          // 是否對標準客戶端開啟限流
          if (serverConfig.isRateLimiterThrottleStandardClients()) {
              return false;
          }
          // 以請求頭( "DiscoveryIdentity-Name" ) 判斷是否在標準客戶端名集合內
          Set privilegedClients = serverConfig.getRateLimiterPrivilegedClients();
          String clientName = request.getHeader(AbstractEurekaIdentity.AUTH_NAME_HEADER_KEY);
          return privilegedClients.contains(clientName) || DEFAULT_PRIVILEGED_CLIENTS.contains(clientName);
      }
    • 第 8 至 11 行 :呼叫 #isOverloaded(...) 方法,判斷是否超載( 限流 )。代碼如下:

      /**
      * Includes both full and delta fetches.
      */

      private static final RateLimiter registryFetchRateLimiter = new RateLimiter(TimeUnit.SECONDS);

      /**
      * Only full registry fetches.
      */

      private static final RateLimiter registryFullFetchRateLimiter = new RateLimiter(TimeUnit.SECONDS);

      private boolean isOverloaded(Target target) {
          int maxInWindow = serverConfig.getRateLimiterBurstSize(); // 10
          int fetchWindowSize = serverConfig.getRateLimiterRegistryFetchAverageRate(); // 500
          boolean overloaded = !registryFetchRateLimiter.acquire(maxInWindow, fetchWindowSize);
          if (target == Target.FullFetch) {
              int fullFetchWindowSize = serverConfig.getRateLimiterFullFetchAverageRate(); // 100
                  overloaded |= !registryFullFetchRateLimiter.acquire(maxInWindow, fullFetchWindowSize);
          }
          return overloaded;
      }
  • 第 18 至 21 行 :若 eureka.rateLimiter.enabled = true( 預設值 :false ,可配 ),傳回 503 狀態碼。

4. InstanceInfoReplicator

com.netflix.discovery.InstanceInfoReplicator ,Eureka-Client 應用實體複製器。在 《Eureka 原始碼解析 —— 應用實體註冊發現(一)之註冊》「2.1 應用實體信息複製器」 有詳細解析。

應用實體狀態發生變化時,呼叫 #onDemandUpdate() 方法,向 Eureka-Server 發起註冊,同步應用實體信息。InstanceInfoReplicator 使用 RateLimiter ,避免狀態頻繁發生變化,向 Eureka-Server 頻繁同步。代碼如下:

class InstanceInfoReplicator implements Runnable {

    /**
     * RateLimiter
     */

    private final RateLimiter rateLimiter;
    /**
     * 令牌桶上限,預設:2
     */

    private final int burstSize;
    /**
     * 令牌再裝平均速率,預設:60 * 2 / 30 = 4
     */

    private final int allowedRatePerMinute;

    InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
        // ... 省略其他代碼

        this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
        this.replicationIntervalSeconds = replicationIntervalSeconds;
        this.burstSize = burstSize;

        this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
        logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
    }

    public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");
                    // 取消任務
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }
                    // 再次呼叫
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }

}
  • 在 #onDemandUpdate() 方法,呼叫 RateLimiter#acquire(…) 方法,獲取令牌。

    • InstanceInfoReplicator 會固定周期檢查本地應用實體是否有沒向 Eureka-Server ,若未同步,則發起同步。在 《Eureka 原始碼解析 —— 應用實體註冊發現(一)之註冊》「2.1 應用實體信息複製器」 有詳細解析。

    • Eureka-Client 向 Eureka-Server 心跳時,Eureka-Server 會對比應用實體的 `lastDirtyTimestamp` ,若 Eureka-Client 的更大,則 Eureka-Server 傳回 404 狀態碼。Eureka-Client 接收到 404 狀態碼後,發起註冊同步。在 Eureka 原始碼解析 —— 應用實體註冊發現(二)之續租》「2.2 HeartbeatThread」 有詳細解析。

    • 若獲取成功,向 Eureka-Server 發起註冊,同步應用實體信息。

    • 若獲取失敗,向 Eureka-Server 發起註冊,同步應用實體信息。這樣會不會有問題?答案是不會

666. 彩蛋

後面找時間研究下 Google Guava RateLimiter 的原始碼實現,從功能上更加強大,感興趣的胖友可以瞅瞅呀。

胖友,分享我的公眾號( 芋道原始碼 ) 給你的胖友可好?




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 服務器

16. P2P 服務器

17. HTTP 服務器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

原始碼不易↓↓↓

點贊支持老艿艿↓↓

赞(0)

分享創造快樂