歡迎光臨
每天分享高質量文章

註冊中心 Eureka 原始碼解析 —— EndPoint 與 解析器

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

本文主要基於 Eureka 1.8.X 版本

  • 1. 概述

  • 2. EndPoint

  • 2.1 EurekaEndpoint

  • 2.2 DefaultEndpoint

  • 2.3 AwsEndpoint

  • 3. 解析器

  • 3.1 ClusterResolver

  • 3.2 ClosableResolver

  • 3.3 DnsTxtRecordClusterResolver

  • 3.4 ConfigClusterResolver

  • 3.5 ZoneAffinityClusterResolver

  • 3.6 AsyncResolver

    • 3.6.1 定時任務

    • 3.6.2 解析 EndPoint 集群

  • 4. 初始化解析器

  • 666. 彩蛋


1. 概述

本文主要分享 EndPoint 與 解析器

  • EndPoint ,服務端點。例如,Eureka-Server 的訪問地址。

  • EndPoint 解析器,將配置的 Eureka-Server 的訪問地址解析成 EndPoint 。

目前有多種 Eureka-Server 訪問地址的配置方式,本文只分享 Eureka 1.x 的配置,不包含 Eureka 1.x 對 Eureka 2.x 的兼容配置:

  • 第一種,直接配置實際訪問地址。例如,eureka.serviceUrl.defaultZone=http://127.0.0.1:8080/v2 。

  • 第二種,基於 DNS 解析出訪問地址。例如,eureka.shouldUseDns=true 並且eureka.eurekaServer.domainName=eureka.iocoder.cn 。

本文涉及類在 com.netflix.discovery.shared.resolver 包下,涉及到主體類的類圖如下( 打開大圖 ):

  • 紅色部分 —— EndPoint

  • 黃色部分 —— EndPoint 解析器

推薦 Spring Cloud 書籍

  • 請支持正版。下載盜版,等於主動編寫低級 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

推薦 Spring Cloud 視頻

  • Java 微服務實踐 – Spring Boot

  • Java 微服務實踐 – Spring Cloud

  • Java 微服務實踐 – Spring Boot / Spring Cloud

2. EndPoint

2.1 EurekaEndpoint

com.netflix.discovery.shared.resolver.EurekaEndpoint ,Eureka 服務端點接口,實現代碼如下:

public interface EurekaEndpoint extends Comparable<Object{

    /**
     * @return 完整的服務 URL
     */

    String getServiceUrl();

    /**
     * @deprecated use {@link #getNetworkAddress()}
     */

    @Deprecated
    String getHostName();

    /**
     * @return 網絡地址
     */

    String getNetworkAddress();

    /**
     * @return 端口
     */

    int getPort();

    /**
     * @return 是否安全( https )
     */

    boolean isSecure();

    /**
     * @return 相對路徑
     */

    String getRelativeUri();

}

2.2 DefaultEndpoint

com.netflix.discovery.shared.resolver.DefaultEndpoint ,預設 Eureka 服務端點實現類。實現代碼如下:

public class DefaultEndpoint implements EurekaEndpoint {

    /**
     * 網絡地址
     */

    protected final String networkAddress;
    /**
     * 端口
     */

    protected final int port;
    /**
     * 是否安全( https )
     */

    protected final boolean isSecure;
    /**
     * 相對地址
     */

    protected final String relativeUri;
    /**
     * 完整的服務 URL
     */

    protected final String serviceUrl;

    public DefaultEndpoint(String serviceUrl) {
        this.serviceUrl = serviceUrl;

        // 將 serviceUrl 分解成 幾個屬性
        try {
            URL url = new URL(serviceUrl);
            this.networkAddress = url.getHost();
            this.port = url.getPort();
            this.isSecure = "https".equals(url.getProtocol());
            this.relativeUri = url.getPath();
        } catch (Exception e) {
            throw new IllegalArgumentException("Malformed serviceUrl: " + serviceUrl);
        }
    }

    public DefaultEndpoint(String networkAddress, int port, boolean isSecure, String relativeUri) {
        this.networkAddress = networkAddress;
        this.port = port;
        this.isSecure = isSecure;
        this.relativeUri = relativeUri;

        // 幾個屬性 拼接成 serviceUrl
        StringBuilder sb = new StringBuilder().append(isSecure ? "https" : "http").append("://").append(networkAddress);
        if (port >= 0) {
            sb.append(':').append(port);
        }
        if (relativeUri != null) {
            if (!relativeUri.startsWith("/")) {
                sb.append('/');
            }
            sb.append(relativeUri);
        }
        this.serviceUrl = sb.toString();
    }
}
  • 重寫了 #equals(…) 和 #hashCode(…) 方法,標準實現方式,這裡就不貼代碼了。

  • 重寫了 #compareTo(…) 方法,基於 serviceUrl 屬性做比較。

2.3 AwsEndpoint

com.netflix.discovery.shared.resolver.aws.AwsEndpoint ,基於 regionzone 的 Eureka 服務端點實現類 ( 請不要在意 AWS 開頭 )。實現代碼如下:

public class AwsEndpoint extends DefaultEndpoint {

    /**
     * 區域
     */

    protected final String region;
    /**
     * 可用區
     */

    protected final String zone;
}
  • 重寫了 #equals(…) 和 #hashCode(…) 方法,標準實現方式,這裡就不貼代碼了。

3. 解析器

EndPoint 解析器使用委托設計樣式實現。所以,上文圖片中我們看到好多個解析器,實際代碼非常非常非常清晰

FROM 《委托樣式》 
委托樣式是軟體設計樣式中的一項基本技巧。在委托樣式中,有兩個物件參與處理同一個請求,接受請求的物件將請求委托給另一個物件來處理。委托樣式是一項基本技巧,許多其他的樣式,如狀態樣式、策略樣式、訪問者樣式本質上是在更特殊的場合採用了委托樣式。委托樣式使得我們可以用聚合來替代繼承,它還使我們可以模擬mixin。

我們在上圖的基礎上,增加委托的關係,如下圖:

3.1 ClusterResolver

com.netflix.discovery.shared.resolver.ClusterResolver ,集群解析器接口。接口代碼如下:

public interface ClusterResolver<T extends EurekaEndpoint{

    /**
     * @return 地區
     */

    String getRegion();

    /**
     * @return EndPoint 集群( 陣列 )
     */

    List getClusterEndpoints();

}

3.2 ClosableResolver

com.netflix.discovery.shared.resolver.ClosableResolver ,可關閉的解析器接口,繼承自 ClusterResolver 接口。接口代碼如下:

public interface ClosableResolver<T extends EurekaEndpointextends ClusterResolver<T{

    /**
     * 關閉
     */

    void shutdown();
}

3.3 DnsTxtRecordClusterResolver

com.netflix.discovery.shared.resolver.aws.DnsTxtRecordClusterResolver ,基於 DNS TXT 記錄型別的集群解析器。類屬性代碼如下:

public class DnsTxtRecordClusterResolver implements ClusterResolver<AwsEndpoint{

    /**
     * 地區
     */

    private final String region;
    /**
     * 集群根地址,例如 txt.default.eureka.iocoder.cn
     */

    private final String rootClusterDNS;
    /**
     * 是否解析可用區( zone )
     */

    private final boolean extractZoneFromDNS;
    /**
     * 端口
     */

    private final int port;
    /**
     * 是否安全
     */

    private final boolean isSecure;
    /**
     * 相對地址
     */

    private final String relativeUri;
}
  • DnsTxtRecordClusterResolver 通過集群根地址( rootClusterDNS ) 解析出 EndPoint 集群。需要在 DNS 配置兩層解析記錄:

    • 主機記錄 :格式為 TXT.${ZONE}.${自定義二級域名} 或者 ${ZONE}.${自定義二級域名}

    • 記錄型別 :TXT 記錄型別

    • 記錄值 :EndPoint 的網絡地址。如有多個 EndPoint,使用空格分隔。

    • 主機記錄 :格式為 TXT.${REGION}.${自定義二級域名} 。

    • 記錄型別 :TXT 記錄型別

    • 記錄值 :第二層的主機記錄。如有多個第二層級,使用空格分隔。

    • 第一層 :

    • 第二層:

  • 舉個例子:

  • rootClusterDNS ,集群根地址。例如:txt.default.eureka.iocoder.cn,其· txt.default.eureka 為 DNS 解析記錄的第一層的主機記錄

  • region :地區。需要和 rootClusterDNS 的 ${REGION} 一致。

  • extractZoneFromDNS :是否解析 DNS 解析記錄的第二層級的主機記錄的 ${ZONE} 可用區。


#getClusterEndpoints(...) 方法,實現代碼如下:

  1@Override
  2public List getClusterEndpoints() {
  3:     List eurekaEndpoints = resolve(region, rootClusterDNS, extractZoneFromDNS, port, isSecure, relativeUri);
  4:     if (logger.isDebugEnabled()) {
  5:         logger.debug("Resolved {} to {}", rootClusterDNS, eurekaEndpoints);
  6:     }
  7:     return eurekaEndpoints;
  8: }
  9
 10private static List resolve(String region, String rootClusterDNS, boolean extractZone, int port, boolean isSecure, String relativeUri) {
 11:     try {
 12:         // 解析 第一層 DNS 記錄
 13:         Set zoneDomainNames = resolve(rootClusterDNS);
 14:         if (zoneDomainNames.isEmpty()) {
 15:             throw new ClusterResolverException("Cannot resolve Eureka cluster addresses; there are no data in TXT record for DN " + rootClusterDNS);
 16:         }
 17:         // 記錄 第二層 DNS 記錄
 18:         List endpoints = new ArrayList<>();
 19:         for (String zoneDomain : zoneDomainNames) {
 20:             String zone = extractZone ? ResolverUtils.extractZoneFromHostName(zoneDomain) : null// 
 21:             Set zoneAddresses = resolve(zoneDomain);
 22:             for (String address : zoneAddresses) {
 23:                 endpoints.add(new AwsEndpoint(address, port, isSecure, relativeUri, region, zone));
 24:             }
 25:         }
 26:         return endpoints;
 27:     } catch (NamingException e) {
 28:         throw new ClusterResolverException("Cannot resolve Eureka cluster addresses for root: " + rootClusterDNS, e);
 29:     }
 30: }
  • 第 12 至 16 行 :呼叫 #resolve(rootClusterDNS) 解析第一層 DNS 記錄。實現代碼如下:

      1private static Set resolve(String rootClusterDNS) throws NamingException {
      2:     Set result;
      3:     try {
      4:         result = DnsResolver.getCNamesFromTxtRecord(rootClusterDNS);
      5:         // TODO 芋艿:這塊是bug,不需要這一段
      6:         if (!rootClusterDNS.startsWith("txt.")) {
      7:             result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS);
      8:         }
      9:     } catch (NamingException e) {
     10:         if (!rootClusterDNS.startsWith("txt.")) {
     11:             result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS);
     12:         } else {
     13:             throw e;
     14:         }
     15:     }
     16:     return result;
     17: }
    • 第 4 行 : 呼叫 DnsResolver#getCNamesFromTxtRecord(…) 方法,解析 TXT 主機記錄。點擊鏈接查看帶中文註釋的 DnsResolver 的代碼,比較解析,筆者就不啰嗦了。

    • 第 5 至 8 行 :當傳遞引數 rootClusterDNS 不以 txt. 開頭時,即使第 4 行解析成功,也會報錯,此時是個 Eureka 的 BUG 。因此,配置 DNS 解析記錄時,主機記錄暫時必須以 txt. 開頭。

  • 第 17 至 25 行 :迴圈第一層 DNS 記錄的解析結果,進一步解析第二層 DNS 記錄。

    • 第 20 行 :解析可用區( zone )。

    • 第 21 行 :呼叫 #resolve(rootClusterDNS) 解析第二層 DNS 記錄。

3.4 ConfigClusterResolver

com.netflix.discovery.shared.resolver.aws.ConfigClusterResolver ,基於配置檔案的集群解析器。類屬性代碼如下:

public class ConfigClusterResolver implements ClusterResolver<AwsEndpoint{

    private final EurekaClientConfig clientConfig;
    private final InstanceInfo myInstanceInfo;

    public ConfigClusterResolver(EurekaClientConfig clientConfig, InstanceInfo myInstanceInfo) {
        this.clientConfig = clientConfig;
        this.myInstanceInfo = myInstanceInfo;
    }
}

#getClusterEndpoints(...) 方法,實現代碼如下:

// ... 省略代碼,超過微信文章的長度
  • 第 3 至 8 行 :基於 DNS 獲取 EndPoint 集群,呼叫 #getClusterEndpointsFromDns() 方法,實現代碼如下:

    // ... 省略代碼,超過微信文章的長度
    • 必須配置 eureka.shouldUseDns=true ,開啟基於 DNS 獲取 EndPoint 集群。

    • 必須配置 eureka.eurekaServer.domainName=${xxxxx} ,配置集群根地址。

    • 選填配 eureka.eurekaServer.port ,eureka.eurekaServer.context 。

    • 從代碼中我們可以看出,使用 DnsTxtRecordClusterResolver 解析出 EndPoint 集群。

  • 第 9 至 13 行 :直接配置檔案填寫實際 EndPoint 集群,呼叫 #getClusterEndpointsFromConfig() 方法,實現代碼如下:

// ... 省略代碼,超過微信文章的長度
  • 第 3 行 :獲得可用區陣列。通過 eureka.${REGION}.availabilityZones 配置。

  • 第 5 行 :呼叫 InstanceInfo#getZone(…) 方法,獲得應用實體自己所在的可用區zone )。非亞馬遜 AWS 環境下,可用區陣列的第一個元素就是應用實體自己所在的可用區

  • 第 7 行 :呼叫 EndpointUtils#getServiceUrlsMapFromConfig(...) 方法,獲得可用區與 serviceUrls 的映射。實現代碼如下:

    // ... 省略代碼,超過微信文章的長度
    • 當方法引數 preferSameZone=true ,即 eureka.preferSameZone=true( 預設值 :true ) 時,開始位置為可用區陣列( availZones )的第一個和應用實體所在的可用區( myZone )【相等】元素的位置。

    • 當方法引數 preferSameZone=false ,即 eureka.preferSameZone=false( 預設值 :true ) 時,開始位置為可用區陣列( availZones )的第一個和應用實體所在的可用區( myZone )【不相等】元素的位置。

    • 第 13 行 :獲得開始位置。實現代碼如下:

      // ... 省略代碼,超過微信文章的長度
    • 第 20 至 33 行 :從開始位置順序將剩餘的可用區的 serviceUrls 添加到結果。順序理解如下圖:

  • 第 9 至 18 行 :拼裝 EndPoint 集群結果。

3.5 ZoneAffinityClusterResolver

com.netflix.discovery.shared.resolver.aws.ZoneAffinityClusterResolver ,使用可用區親和的集群解析器。類屬性代碼如下:

// ... 省略代碼,超過微信文章的長度
  • 屬性 delegate ,委托的解析器。目前代碼里使用的是 ConfigClusterResolver 。

  • 屬性 zoneAffinity ,是否可用區親和。

    • `true` :EndPoint 可用區為本地的優先被放在前面。

    • `false` :EndPoint 可用區非本地的優先被放在前面。

#getClusterEndpoints(...) 方法,實現代碼如下:

// ... 省略代碼,超過微信文章的長度
  • 第 2 行 :呼叫 ClusterResolver#getClusterEndpoints() 方法,獲得 EndPoint 集群。再呼叫 ResolverUtils#splitByZone(…) 方法,拆分成本地非本地的可用區的 EndPoint 集群,點擊鏈接查看實現。

  • 第 8 行 :呼叫 #randomizeAndMerge(...) 方法,分別隨機打亂每個 EndPoint 集群,併進行合併陣列,實現代碼如下:

    // ... 省略代碼,超過微信文章的長度
    • 多個主機,實現對同一個 EndPoint 集群負載均衡的效果。

    • 單個主機,同一個 EndPoint 集群按照固定順序訪問。Eureka-Server 不是強一致性的註冊中心,Eureka-Client 對同一個 Eureka-Server 拉取註冊信息,保證兩者之間增量同步的一致性。

    • 註意,`ResolverUtils#randomize(…)` 使用以本機IP為隨機種子,有如下好處:

  • 第 10 至 12 行 :非可用區親和,將非本地的可用區的 EndPoint 集群放在前面。

3.6 AsyncResolver

com.netflix.discovery.shared.resolver.AsyncResolver ,異步執行解析的集群解析器。AsyncResolver 屬性較多,而且複雜的多,我們拆分到具體方法里分享。

3.6.1 定時任務

AsyncResolver 內置定時任務,定時掃清 EndPoint 集群解析結果。

為什麼要掃清?例如,Eureka-Server 的 serviceUrls 基於 DNS 配置。

定時任務代碼如下

// ... 省略代碼,超過微信文章的長度
  • backgroundTask ,後臺任務,定時解析 EndPoint 集群。

    • delegate ,委托的解析器,目前代碼為 ZoneAffinityClusterResolver。

    • TimedSupervisorTask ,在 《Eureka 原始碼解析 —— 應用實體註冊發現(二)之續租》「2.3 TimedSupervisorTask」 有詳細解析。

    • updateTask 實現代碼如下:

      // ... 省略代碼,超過微信文章的長度
    • 後臺任務的發起在 #getClusterEndpoints() 方法,在 「3.6.2 解析 EndPoint 集群」 詳細解析。

3.6.2 解析 EndPoint 集群

呼叫 #getClusterEndpoints() 方法,解析 EndPoint 集群,實現代碼如下:

// ... 省略代碼,超過微信文章的長度
  • 第 5 至 9 行 :若未預熱解析 EndPoint 集群結果,呼叫 #doWarmUp() 方法,進行預熱。若預熱失敗,取消定時任務的第一次延遲。#doWarmUp() 方法實現代碼如下:

    // ... 省略代碼,超過微信文章的長度
    • 呼叫 updateTask ,解析 EndPoint 集群。

  • 第 10 至 13 行 : 若未調度定時任務,進行調度,呼叫 #scheduleTask() 方法,實現代碼如下:

    // ... 省略代碼,超過微信文章的長度
    • x

  • 第 15 行 :傳回 EndPoint 集群。當第一次預熱失敗,會傳回空,直到定時任務獲得到結果

4. 初始化解析器

Eureka-Client 在初始化時,呼叫 DiscoveryClient#scheduleServerEndpointTask() 方法,初始化 AsyncResolver 解析器。實現代碼如下:

// ... 省略代碼,超過微信文章的長度                                            
  • 呼叫 EurekaHttpClients#newBootstrapResolver(...) 方法,創建 EndPoint 解析器,實現代碼如下:

     // ... 省略代碼,超過微信文章的長度
    • x

    • 第 10 至 23 行 :組合解析器,用於 Eureka 1.x 對 Eureka 2.x 的兼容配置,暫時不需要瞭解。TODO[0028]寫入集群和讀取集群

    • 第 26 行 :呼叫 #defaultBootstrapResolver() 方法,創建預設的解析器 AsyncResolver 。

    • 第 40 至 45 行 :創建 ZoneAffinityClusterResolver 。在 ZoneAffinityClusterResolver 構造方法的引數,我們看到創建 ConfigClusterResolver 作為 delegate 引數。

    • 第 48 行 :呼叫 ZoneAffinityClusterResolver#getClusterEndpoints() 方法,第一次 Eureka-Server EndPoint 集群解析

    • 第 51 至 55 行 :解析不到 Eureka-Server EndPoint 集群時,可以通過配置( eureka.experimental.clientTransportFailFastOnInit=true ),使 Eureka-Client 初始化失敗。#failFastOnInitCheck(...) 方法,實現代碼如下:

      // potential future feature, guarding with experimental flag for now
      // ... 省略代碼,超過微信文章的長度
  • 第 58 至 64 行 :創建 AsyncResolver 。從代碼上,我們可以看到,AsyncResolver.resultsRef 屬性一開始已經用 initialValue 傳遞給 AsyncResolver 構造方法。實現代碼如下:

    Java public AsyncResolver(String name, ClusterResolver delegate, List initialValues, int executorThreadPoolSize, int refreshIntervalMs) { this( name, delegate, initialValues, executorThreadPoolSize, refreshIntervalMs, 0 ); ¨K78K }

    • x

666. 彩蛋

T T 一開始看解析器,沒反應過來是委托設計樣式,一臉懵逼+一臉懵逼+一臉懵逼。後面理順了,發現超級奈斯( Nice ) 啊 !!!!

胖友,你學會了麽?

胖友,分享我的公眾號( 芋道原始碼 ) 給你的胖友可好?




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 服務器

16. P2P 服務器

17. HTTP 服務器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

原始碼不易↓↓↓

點贊支持老艿艿↓↓

赞(0)

分享創造快樂