歡迎光臨
每天分享高質量文章

註冊中心 Eureka 原始碼解析 —— 網絡通信

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Eureka/transport/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 Eureka 1.8.X 版本

  • 1. 概述

  • 2. EurekaHttpClient

    • 2.1 EurekaJerseyClientImpl

    • 2.2 EurekaJerseyClientBuilder

  • 3. EurekaHttpClient

    • 3.1 EurekaHttpResponse

    • 3.2 TransportClientFactory

  • 4. AbstractJerseyEurekaHttpClient

    • 4.1 JerseyApplicationClient

    • 4.2 JerseyReplicationClient

  • 5. EurekaHttpClientDecorator

    • 5.1 MetricsCollectingEurekaHttpClient

    • 5.2 RedirectingEurekaHttpClient

    • 5.3 RetryableEurekaHttpClient

    • 5.4 SessionedEurekaHttpClient

  • 6. 創建網絡通訊客戶端

  • 666. 彩蛋


1. 概述

本文主要分享 Eureka 的網絡通信部分。在不考慮 Eureka 2.x 的兼容的情況下,Eureka 1.x 主要兩部分的網絡通信:

  • Eureka-Client 請求 Eureka-Server 的網絡通信

  • Eureka-Server 集群內,Eureka-Server 請求 其它的Eureka-Server 的網絡通信

本文涉及類在 com.netflix.discovery.shared.transport 包下,涉及到主體類的類圖如下( 打開大圖 ):

  • 粉色部分 —— EurekaJerseyClient ,對基於 Jersey Server 的 Eureka-Server 的 Jersey 客戶端封裝。

  • 綠色部分 —— EurekaHttpClient ,Eureka-Server HTTP 訪問客戶端,定義了具體的 Eureka-Server API 呼叫方法。如果把 DiscoveryClient 類比成 Service ,那麼 EurekaHttpClient 可以類比城 Dao 。

  • 綜色部分 —— EurekaHttpClient 實現類,真正實現了具體的 Eureka-Server API 呼叫方法。

  • 紅色部分 —— EurekaHttpClient 委托類,提供了會話、重試、重定向、監控指標收集等特性。

  • 黃色部分 —— EurekaHttpClientFactory,用於創建 EurekaHttpClient 。

類圖看起來很複雜,整體呼叫關係如下( 打開大圖 ):

OK ,我們逐層解析,嗨起來。

推薦 Spring Cloud 書籍

  • 請支持正版。下載盜版,等於主動編寫低級 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

推薦 Spring Cloud 視頻

  • Java 微服務實踐 – Spring Boot

  • Java 微服務實踐 – Spring Cloud

  • Java 微服務實踐 – Spring Boot / Spring Cloud

2. EurekaHttpClient

com.netflix.discovery.shared.transport.jersey.EurekaJerseyClient ,EurekaHttpClient 接口。接口代碼如下:

public interface EurekaJerseyClient {

    ApacheHttpClient4 getClient();

    void destroyResources();
}
  • com.sun.jersey.client.apache4.ApacheHttpClient4 ,基於 Apache HttpClient4 實現的 Jersey Client 。

2.1 EurekaJerseyClientImpl

com.netflix.discovery.shared.transport.jersey.EurekaJerseyClientImpl ,EurekaHttpClient 實現類。實現代碼如下:

// 超過微信字數上限
  • com.netflix.discovery.shared.transport.jersey.ApacheHttpClientConnectionCleaner ,Apache HttpClient 空閑連接清理器,負責周期性關閉處於 half-close 狀態的空閑連接。點擊 鏈接 查看帶中文註釋的 ApacheHttpClientConnectionCleaner。推薦閱讀:《HttpClient容易忽視的細節——連接關閉》 。

2.2 EurekaJerseyClientBuilder

EurekaJerseyClientBuilder ,EurekaJerseyClientImpl 內部類,用於創建 EurekaJerseyClientImpl 。

呼叫 #build() 方法,創建 EurekaJerseyClientImpl ,實現代碼如下:

// EurekaJerseyClientBuilder.java
public EurekaJerseyClient build() {
    MyDefaultApacheHttpClient4Config config = new MyDefaultApacheHttpClient4Config();
    try {
        return new EurekaJerseyClientImpl(connectionTimeout, readTimeout, connectionIdleTimeout, config);
    } catch (Throwable e) {
        throw new RuntimeException("Cannot create Jersey client ", e);
    }
}
  • MyDefaultApacheHttpClient4Config ,繼承自 com.sun.jersey.client.apache4.config.DefaultApacheHttpClient4Config ,實現自定義配置。點擊 鏈接 查看帶中文註釋的 MyDefaultApacheHttpClient4Config。例如 :

    • 自定義的請求、響應的編解碼器 `com.netflix.discovery.provider.DiscoveryJerseyProvider` 。

    • 禁用重定向,使用 RedirectingEurekaHttpClient 實現該特性。

    • 自定義 UserAgent 。

    • 自定義 Http Proxy 。

    • SSL 功能的增強。ApacheHttpClient4 使用的是 Apache HttpClient 4.1.1 版本,`com.netflix.discovery.shared.transport.jersey.SSLSocketFactoryAdapter` 將 Apache HttpClient 4.3.4 對 SSL 功能的增強適配到老版本 API 。點擊 鏈接 查看帶中文註釋的 SSLSocketFactoryAdapter。

3. EurekaHttpClient

com.netflix.discovery.shared.transport.EurekaHttpClient ,Eureka-Server HTTP 訪問客戶端,定義了具體的 Eureka-Server API 呼叫方法 。點擊 鏈接 查看帶中文註釋的 EurekaHttpClient。

3.1 EurekaHttpResponse

com.netflix.discovery.shared.transport.EurekaHttpResponse ,請求響應物件,實現代碼如下:

public class EurekaHttpResponse<T{

    /**
     * 傳回狀態碼
     */

    private final int statusCode;
    /**
     * 傳回物件( Entity )
     */

    private final T entity;
    /**
     * 傳回 essay-header
     */

    private final Map essay-headers;
    /**
     * 重定向地址
     */

    private final URI location;

    // ... 省略 setting / getting 和 Builder
}

3.2 TransportClientFactory

com.netflix.discovery.shared.transport.TransportClientFactory ,創建 EurekaHttpClient 的工廠接口。接口代碼如下:

public interface TransportClientFactory {

    /**
     * 創建 EurekaHttpClient
     *
     * @param serviceUrl Eureka-Server 地址
     * @return EurekaHttpClient
     */

    EurekaHttpClient newClient(EurekaEndpoint serviceUrl);

    /**
     * 關閉工廠
     */

    void shutdown();

}

大多數 EurekaHttpClient 實現類都有其對應的工廠實現類

4. AbstractJerseyEurekaHttpClient

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient ,實現 EurekaHttpClient 的抽象類真正實現了具體的 Eureka-Server API 呼叫方法。實現代碼如下:

// 超過微信字數上限
  • jerseyClient 屬性,Jersey Client ,使用上文的 EurekaHttpClient#getClient(…) 方法,獲取 ApacheHttpClient4 。

  • serviceUrl 屬性,請求的 Eureka-Server 地址。

  • #register() 方法,實現向 Eureka-Server 註冊應用實體。其他方法代碼類似

    • x

    • 第 22 至 26 行 :設置請求地址。

    • 第 28 行 :呼叫 #addExtraHeaders(...) 方法,設置請求頭( essay-header )。該方法是抽象方法,提供子類實現自定義的請求頭。代碼如下:

      protected abstract void addExtraHeaders(Builder webResource);
  • 第 29 至 34 行 :請求 Eureka-Server 。

  • 第 35 至 36 行 :解析響應結果,創建 EurekaHttpResponse 。

4.1 JerseyApplicationClient

com.netflix.discovery.shared.transport.jersey.JerseyApplicationClient ,實現 Eureka-Client 請求 Eureka-Server 的網絡通信。點擊 鏈接 查看帶中文註釋的 JerseyApplicationClient。

4.1.1 JerseyEurekaHttpClientFactory

com.netflix.discovery.shared.transport.jersey.JerseyEurekaHttpClientFactory ,創建 JerseyApplicationClient 的工廠類。實現代碼如下:

// 超過微信字數上限

4.1.2 JerseyEurekaHttpClientFactoryBuilder

JerseyEurekaHttpClientFactoryBuilder ,JerseyEurekaHttpClientFactory 內部類,用於創建 JerseyEurekaHttpClientFactory 。點擊 鏈接 查看帶中文註釋的 JerseyEurekaHttpClientFactory。

呼叫 JerseyEurekaHttpClientFactory#create(...) 方法,創建 JerseyEurekaHttpClientFactory ,實現代碼如下:

// 超過微信字數上限

4.2 JerseyReplicationClient

com.netflix.eureka.transport.JerseyReplicationClient ,Eureka-Server 集群內,Eureka-Server 請求 其它的Eureka-Server 的網絡通信。

  • 實現 AbstractJerseyEurekaHttpClient#addExtraHeaders() 方法,添加自定義頭 x-netflix-discovery-replication=true ,代碼如下:

    @Override
    protected void addExtraHeaders(Builder webResource) {
       webResource.essay-header(PeerEurekaNode.HEADER_REPLICATION, "true");
    }
  • 重寫了 #sendHeartBeat(…) 方法,在 《Eureka 原始碼解析 —— Eureka-Server 集群同步》 有詳細解析。

  • 實現 com.netflix.eureka.cluster.HttpReplicationClient 接口,實現了 #submitBatchUpdates(…) 方法,在 《Eureka 原始碼解析 —— Eureka-Server 集群同步》 有詳細解析。

4.2.1 沒有工廠

JerseyReplicationClient 沒有專屬的工廠

呼叫 JerseyReplicationClient#createReplicationClient(...) 靜態方法,創建 JerseyReplicationClient 。點擊 鏈接 查看帶中文註釋的方法代碼。

5. EurekaHttpClientDecorator

com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator,EurekaHttpClient 委托者抽象類。實現代碼如下:

// 超過微信字數上限
  • #execute(…) 抽象方法,子類實現該方法,實現自己的特性。

  • #register() 方法,實現向 Eureka-Server 註冊應用實體。其他方法代碼類似

    • 呼叫 #execute(…) 方法,並將原有的註冊實現通過 RequestExecutor 傳遞進去。

    • 子類在實現的 #execute(…) 方法,可以呼叫 RequestExecutor#execute(…) 方法,繼續執行原有邏輯。

    • 參考設計樣式:《設計樣式 ( 十九 ) 模板方法樣式Template method(類行為型)》 。

  • RequestType ,請求型別列舉類。代碼如下:

    // EurekaHttpClientDecorator.java
    public enum RequestType {
       Register,
       Cancel,
       SendHeartBeat,
       StatusUpdate,
       DeleteStatusOverride,
       GetApplications,
       GetDelta,
       GetVip,
       GetSecureVip,
       GetApplication,
       GetInstance,
       GetApplicationInstance
    }
  • RequestExecutor ,請求執行器接口。接口代碼如下:

    // EurekaHttpClientDecorator.java
    // 超過微信字數上限

EurekaHttpClientDecorator 的每個實現類實現一個特性,代碼非常非常非常清晰。

FROM 《委托樣式》 
委托樣式是軟體設計樣式中的一項基本技巧。在委托樣式中,有兩個物件參與處理同一個請求,接受請求的物件將請求委托給另一個物件來處理。委托樣式是一項基本技巧,許多其他的樣式,如狀態樣式、策略樣式、訪問者樣式本質上是在更特殊的場合採用了委托樣式。委托樣式使得我們可以用聚合來替代繼承,它還使我們可以模擬mixin。

我們在上圖的基礎上,增加委托的關係,如下圖( 打開大圖 ):

  • 請註意,每個委托著實現類,上面可能有型別為 EurekaHttpClientFactory 的屬性,用於創建其委托的 EurekaHttpClient 。為什麼會有 Factory ?例如,RetryableEurekaHttpClient 重試請求多個 Eureka-Server 地址時,每個 Eureka-Server 地址會創建一個 EurekaHttpClient 。所以,下文涉及到 EurekaHttpClientFactory 和委托的 EurekaHttpClient 的地方,你都需要仔細理解。

5.1 MetricsCollectingEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient ,監控指標收集 EurekaHttpClient ,配合 Netflix Servo 實現監控信息採集。

#execute() 方法,代碼如下:

// 超過微信字數上限
  • 第 10 行 :呼叫 RequestExecutor#execute(…) 方法,繼續執行請求。

    • `delegate` 屬性,對應 JerseyApplicationClient 。

5.2 RedirectingEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient ,尋找非 302 重定向的 Eureka-Server 的 EurekaHttpClient 。

#execute() 方法,代碼如下:

// 超過微信字數上限
  • 註意:和我們理解的常規的 302 狀態傳回處理不同!!!

  • 整個分成兩部分:【第 4 至 15 行】、【第 16 至 24 行】。

    • 當傳回非 302 狀態碼時,找到非傳回 302 狀態碼的 Eureka-Server 。

    • 當傳回 302 狀態碼時,向新的重定向的 Eureka-Server 執行請求直到成功找到或超過最大次數。

    • 前者,意味著未找到非傳回 302 狀態碼的 Eureka-Server ,此時通過在原始傳遞進來的 `serviceUrls` 執行請求,尋找非 302 狀態碼傳回的 Eureka-Server。

  • 後者,意味著當前已經找到非傳回 302 狀態碼的 Eureka-Server ,直接執行請求。註意 :此時 Eureka-Server 再傳回 302 狀態碼,不再處理。

  • 目前 Eureka 1.x 的 Eureka-Server 不存在傳回 302 狀態碼,猜測和 Eureka 2.X TODO[0028]:寫入集群和讀取集群 有關。

  • 【前者】第 5 行 :使用初始的 serviceEndpoint ( 相當於 serviceUrls ) 創建委托 EurekaHttpClient 。

  • 【前者】第 7 行 :呼叫 #executeOnNewServer(…) 方法,通過執行請求的方式,尋找非 302 狀態碼傳回的 Eureka-Server。實現代碼,點擊 鏈接 查看帶中文註釋的代碼實現。

  • 【前者】【前者】第 9 行 :關閉原有的 delegateRef ( 因為此處可能存在併發,多個執行緒都找到非 302 狀態碼傳回的 Eureka-Server ),並設置當前成功非 302 請求的 EurekaHttpClient 到 delegateRef

  • 【前者】第 13 行 :關閉 currentEurekaClientRef ,當請求發生異常或者超過最大重定向次數。

  • 【後者】第 18 行 :意味著當前已經找到非傳回 302 狀態碼的 Eureka-Server ,直接執行請求。

  • 【後者】第 21 至 22 行 :執行請求發生異常,關閉 currentEurekaClient ,後面要重新非傳回 302 狀態碼的 Eureka-Server 。

5.2.1 工廠

RedirectingEurekaHttpClient 提供 #createFactory(...) 靜態方法獲得創建其的工廠,點擊 鏈接 查看。

5.3 RetryableEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient ,支持向多個 Eureka-Server 請求重試的 EurekaHttpClient 。

#execute() 方法,代碼如下:

// 超過微信字數上限
  • 第 10 行 :當前 currentHttpClient 不存在,意味著原有 delegate 不存在向 Eureka-Server 成功請求的 EurekaHttpClient 。

    • 此時需要從配置中的 Eureka-Server 陣列重試請求,獲得可以請求的 Eureka-Server 。

    • 如果已經存在請求成功的 delegate ,直接使用它進行執行請求。

  • 第 11 至 17 行 :呼叫 #getHostCandidates() 方法,獲得候選的 Eureka-Server serviceUrls 陣列。實現代碼如下:

    // 超過微信字數上限
    • 第 10 行 :最小可用的閥值,配置 eureka.retryableClientQuarantineRefreshPercentage 來設置百分比,預設值:0.66 。

    • 最 13 至 15 行 :quarantineSet 數量超過閥值,清空 quarantineSet ,全部 candidateHosts重試。

    • 第 17 至 24 行 :quarantineSet 數量未超過閥值,移除 candidateHosts 中在 quarantineSet的元素。

    • 第 3 行 :呼叫 ClusterResolver#getClusterEndpoints() 方法,獲得候選的 Eureka-Server 地址陣列( candidateHosts )。註意:該方法傳回的 Eureka-Server 地址陣列,使用以本機 IP 為隨機種子,達到不同 IP 的應用實體獲得的陣列順序不同,而相同 IP 的應用實體獲得的陣列順序一致,效果類似基於 IP HASH 的負載均衡演算法。實現該功能的代碼,在 《Eureka 原始碼解析 —— EndPoint 與 解析器》搜索關鍵字【ResolverUtils#randomize(…)】 詳細解析。

    • 第 6 行 :呼叫 Set#retainAll() 方法,移除隔離的故障 Eureka-Server 地址陣列( quarantineSet ) 中不在 candidateHosts 的元素。

    • 第 8 至 24 行 :在保證最小可用的 candidateHosts,移除在 quarantineSet 的元素。

  • 第 19 至 22 行 :超過 candidateHosts 上限,全部 Eureka-Server 請求失敗,丟擲異常。

  • 第 24 至 26 行 :創建委托的 EurekaHttpClient ,用於下麵請求執行。

  • 第 31 行 :執行請求。

  • 第 33 行 :呼叫 ServerStatusEvaluator#accept() 方法,判斷響應狀態碼和請求型別是否能夠接受。實現代碼如下:

    // ServerStatusEvaluators.java
    // 超過微信字數上限
  • 第 34 行 :請求成功,設置 delegate 。下次請求,優先使用 delegate ,失敗才進行候選的 Eureka-Server 地址陣列重試。

  • 第 47 行 :請求失敗,delegate 若等於 currentHttpClient ,進行清除。

  • 第 50 至 52 行 :請求失敗,將請求的 Eureka-Server 地址添加到 quarantineSet 。

  • 總結來說:

    • 【第一步】若當前有請求成功的 EurekaHttpClient ,繼續使用。若請求失敗,執行【第二步】。

    • 【第二步】若當前無請求成功的 EurekaHttpClient ,獲取候選的 Eureka-Server 地址陣列順序創建新的 EurekaHttpClient,直到成功,或者超過最大重試次數。當請求成功,儲存該 EurekaHttpClient ,下次繼續使用,直到請求失敗。

5.3.1 工廠

RetryableEurekaHttpClient 提供 #createFactory(...) 靜態方法獲得創建其的工廠,點擊 鏈接 查看。

5.4 SessionedEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient ,支持會話的 EurekaHttpClient 。執行定期的重建會話,防止一個 Eureka-Client 永遠只連接一個特定的 Eureka-Server 。反過來,這也保證了 Eureka-Server 集群變更時,Eureka-Client 對 Eureka-Server 連接的負載均衡。

#execute(...) ,代碼如下:

// 超過微信字數上限
  • 第 7 至 12 行 :超過當前會話時間,關閉當前委托的 EurekaHttpClient 。

    • 增加會話過期的隨機性,實現所有 Eureka-Client 的會話過期重連的發生時間更加離散,避免集中時間過期。目前猜測這麼做的目的和 TODO[0028]:寫入集群和讀取集群 有關,即傳回 302 。關聯 1.x new transport enhancements 。

    • 第 10 行 :呼叫 #randomizeSessionDuration(...) 方法,計算計算下一次會話超時時長,公式為 sessionDurationMs * (0.5, 1.5) ,代碼如下:

      protected long randomizeSessionDuration(long sessionDurationMs) {
         long delta = (long) (sessionDurationMs * (random.nextDouble() - 0.5));
         return sessionDurationMs + delta;
      }
  • 第 15 至 18 行 :獲得委托的 EurekaHttpClient 。若不存在,創建新的委托的 EurekaHttpClient 。TransportUtils#getOrSetAnotherClient(...) 方法代碼如下:

    // 超過微信字數上限
    • 該方法實現,獲得 eurekaHttpClientRef 里的 EurekaHttpClient 。若獲取不到,將 another 設置到 eurekaHttpClientRef 。當有多個執行緒設置時,有且只有一個執行緒設置成功,另外的設置失敗的執行緒們,意味著當前 eurekaHttpClientRef 有 EurekaHttpClient ,傳回 eurekaHttpClientRef 。

    • 目前該方法存在 BUG ,失敗的執行緒直接傳回 existing 的是 null ,需要修改成 return eurekaHttpClientRef.get() 。模擬重現該 BUG 代碼如下 :

  • 第 19 行 :執行請求。

5.4.1 沒有工廠

在 SessionedEurekaHttpClient 類里,沒有實現創建其的工廠。在 「6. 創建網絡通訊客戶端」搜索 canonicalClientFactory ,可以看到 EurekaHttpClients#canonicalClientFactory(...) 方法,內部有 SessionedEurekaHttpClient 的創建工廠。

6. 創建網絡通訊客戶端

對於 Eureka-Server 來說,呼叫 JerseyReplicationClient#createReplicationClient(...) 靜態方法即可創建用於 Eureka-Server 集群內,Eureka-Server 請求 其它的Eureka-Server 的網絡通信客戶端。

對於 Eureka-Client 來說,分成用於註冊應用實體( registrationClient )查詢註冊信息( newQueryClient )兩個不同網絡通信客戶端。在 DiscoveryClient 初始化時進行創建,代碼如下:

// DiscoveryClient.class
// 超過微信字數上限
  • 第 18 至 27 行 :呼叫 Jersey1TransportClientFactories#newTransportClientFactory(...) 方法,創建 registrationClient 和 queryClient 公用的委托的 EurekaHttpClientFactory ,代碼如下:

    // Jersey1TransportClientFactories.java
    // 超過微信字數上限
    • 在 TransportClientFactory 里委托 JerseyEurekaHttpClientFactory 。

  • 第 34 至 49 行 :呼叫 EurekaHttpClients#registrationClientFactory(...) 方法,創建 registrationClient 的 EurekaHttpClientFactory ,代碼如下 :

    // EurekaHttpClients.java
    // 超過微信字數上限
  • 第 51 至 71 行 :呼叫 EurekaHttpClients#queryClientFactory(...) 方法,創建 queryClient 的 EurekaHttpClientFactory ,代碼如下 :

    // EurekaHttpClients.java
    // 超過微信字數上限

666. 彩蛋

這次真的是彩蛋,我們將整體呼叫關係調整如下如下( 打開大圖 ):

胖友,你學會了麽?

胖友,分享我的公眾號( 芋道原始碼 ) 給你的胖友可好?




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 服務器

16. P2P 服務器

17. HTTP 服務器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

原始碼不易↓↓↓

點贊支持老艿艿↓↓

赞(0)

分享創造快樂