歡迎光臨
每天分享高質量文章

註冊中心 Eureka 原始碼解析 —— 網路通訊

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Eureka/transport/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 Eureka 1.8.X 版本

  • 1. 概述

  • 2. EurekaHttpClient

    • 2.1 EurekaJerseyClientImpl

    • 2.2 EurekaJerseyClientBuilder

  • 3. EurekaHttpClient

    • 3.1 EurekaHttpResponse

    • 3.2 TransportClientFactory

  • 4. AbstractJerseyEurekaHttpClient

    • 4.1 JerseyApplicationClient

    • 4.2 JerseyReplicationClient

  • 5. EurekaHttpClientDecorator

    • 5.1 MetricsCollectingEurekaHttpClient

    • 5.2 RedirectingEurekaHttpClient

    • 5.3 RetryableEurekaHttpClient

    • 5.4 SessionedEurekaHttpClient

  • 6. 建立網路通訊客戶端

  • 666. 彩蛋


1. 概述

本文主要分享 Eureka 的網路通訊部分。在不考慮 Eureka 2.x 的相容的情況下,Eureka 1.x 主要兩部分的網路通訊:

  • Eureka-Client 請求 Eureka-Server 的網路通訊

  • Eureka-Server 叢集內,Eureka-Server 請求 其它的Eureka-Server 的網路通訊

本文涉及類在 com.netflix.discovery.shared.transport 包下,涉及到主體類的類圖如下( 開啟大圖 ):

  • 粉色部分 —— EurekaJerseyClient ,對基於 Jersey Server 的 Eureka-Server 的 Jersey 客戶端封裝。

  • 綠色部分 —— EurekaHttpClient ,Eureka-Server HTTP 訪問客戶端,定義了具體的 Eureka-Server API 呼叫方法。如果把 DiscoveryClient 類比成 Service ,那麼 EurekaHttpClient 可以類比城 Dao 。

  • 綜色部分 —— EurekaHttpClient 實現類,真正實現了具體的 Eureka-Server API 呼叫方法。

  • 紅色部分 —— EurekaHttpClient 委託類,提供了會話、重試、重定向、監控指標收集等特性。

  • 黃色部分 —— EurekaHttpClientFactory,用於建立 EurekaHttpClient 。

類圖看起來很複雜,整體呼叫關係如下( 開啟大圖 ):

OK ,我們逐層解析,嗨起來。

推薦 Spring Cloud 書籍

  • 請支援正版。下載盜版,等於主動編寫低階 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

推薦 Spring Cloud 影片

  • Java 微服務實踐 – Spring Boot

  • Java 微服務實踐 – Spring Cloud

  • Java 微服務實踐 – Spring Boot / Spring Cloud

2. EurekaHttpClient

com.netflix.discovery.shared.transport.jersey.EurekaJerseyClient ,EurekaHttpClient 介面。介面程式碼如下:

public interface EurekaJerseyClient {

    ApacheHttpClient4 getClient();

    void destroyResources();
}
  • com.sun.jersey.client.apache4.ApacheHttpClient4 ,基於 Apache HttpClient4 實現的 Jersey Client 。

2.1 EurekaJerseyClientImpl

com.netflix.discovery.shared.transport.jersey.EurekaJerseyClientImpl ,EurekaHttpClient 實現類。實現程式碼如下:

// 超過微信字數上限
  • com.netflix.discovery.shared.transport.jersey.ApacheHttpClientConnectionCleaner ,Apache HttpClient 空閑連線清理器,負責週期性關閉處於 half-close 狀態的空閑連線。點選 連結 檢視帶中文註釋的 ApacheHttpClientConnectionCleaner。推薦閱讀:《HttpClient容易忽視的細節——連線關閉》 。

2.2 EurekaJerseyClientBuilder

EurekaJerseyClientBuilder ,EurekaJerseyClientImpl 內部類,用於建立 EurekaJerseyClientImpl 。

呼叫 #build() 方法,建立 EurekaJerseyClientImpl ,實現程式碼如下:

// EurekaJerseyClientBuilder.java
public EurekaJerseyClient build() {
    MyDefaultApacheHttpClient4Config config = new MyDefaultApacheHttpClient4Config();
    try {
        return new EurekaJerseyClientImpl(connectionTimeout, readTimeout, connectionIdleTimeout, config);
    } catch (Throwable e) {
        throw new RuntimeException("Cannot create Jersey client ", e);
    }
}
  • MyDefaultApacheHttpClient4Config ,繼承自 com.sun.jersey.client.apache4.config.DefaultApacheHttpClient4Config ,實現自定義配置。點選 連結 檢視帶中文註釋的 MyDefaultApacheHttpClient4Config。例如 :

    • 自定義的請求、響應的編解碼器 `com.netflix.discovery.provider.DiscoveryJerseyProvider` 。

    • 禁用重定向,使用 RedirectingEurekaHttpClient 實現該特性。

    • 自定義 UserAgent 。

    • 自定義 Http Proxy 。

    • SSL 功能的增強。ApacheHttpClient4 使用的是 Apache HttpClient 4.1.1 版本,`com.netflix.discovery.shared.transport.jersey.SSLSocketFactoryAdapter` 將 Apache HttpClient 4.3.4 對 SSL 功能的增強適配到老版本 API 。點選 連結 檢視帶中文註釋的 SSLSocketFactoryAdapter。

3. EurekaHttpClient

com.netflix.discovery.shared.transport.EurekaHttpClient ,Eureka-Server HTTP 訪問客戶端,定義了具體的 Eureka-Server API 呼叫方法 。點選 連結 檢視帶中文註釋的 EurekaHttpClient。

3.1 EurekaHttpResponse

com.netflix.discovery.shared.transport.EurekaHttpResponse ,請求響應物件,實現程式碼如下:

public class EurekaHttpResponse<T{

    /**
     * 傳回狀態碼
     */

    private final int statusCode;
    /**
     * 傳回物件( Entity )
     */

    private final T entity;
    /**
     * 傳回 essay-header
     */

    private final Map essay-headers;
    /**
     * 重定向地址
     */

    private final URI location;

    // ... 省略 setting / getting 和 Builder
}

3.2 TransportClientFactory

com.netflix.discovery.shared.transport.TransportClientFactory ,建立 EurekaHttpClient 的工廠介面。介面程式碼如下:

public interface TransportClientFactory {

    /**
     * 建立 EurekaHttpClient
     *
     * @param serviceUrl Eureka-Server 地址
     * @return EurekaHttpClient
     */

    EurekaHttpClient newClient(EurekaEndpoint serviceUrl);

    /**
     * 關閉工廠
     */

    void shutdown();

}

大多數 EurekaHttpClient 實現類都有其對應的工廠實現類

4. AbstractJerseyEurekaHttpClient

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient ,實現 EurekaHttpClient 的抽象類真正實現了具體的 Eureka-Server API 呼叫方法。實現程式碼如下:

// 超過微信字數上限
  • jerseyClient 屬性,Jersey Client ,使用上文的 EurekaHttpClient#getClient(…) 方法,獲取 ApacheHttpClient4 。

  • serviceUrl 屬性,請求的 Eureka-Server 地址。

  • #register() 方法,實現向 Eureka-Server 註冊應用實體。其他方法程式碼類似

    • x

    • 第 22 至 26 行 :設定請求地址。

    • 第 28 行 :呼叫 #addExtraHeaders(...) 方法,設定請求頭( essay-header )。該方法是抽象方法,提供子類實現自定義的請求頭。程式碼如下:

      protected abstract void addExtraHeaders(Builder webResource);
  • 第 29 至 34 行 :請求 Eureka-Server 。

  • 第 35 至 36 行 :解析響應結果,建立 EurekaHttpResponse 。

4.1 JerseyApplicationClient

com.netflix.discovery.shared.transport.jersey.JerseyApplicationClient ,實現 Eureka-Client 請求 Eureka-Server 的網路通訊。點選 連結 檢視帶中文註釋的 JerseyApplicationClient。

4.1.1 JerseyEurekaHttpClientFactory

com.netflix.discovery.shared.transport.jersey.JerseyEurekaHttpClientFactory ,建立 JerseyApplicationClient 的工廠類。實現程式碼如下:

// 超過微信字數上限

4.1.2 JerseyEurekaHttpClientFactoryBuilder

JerseyEurekaHttpClientFactoryBuilder ,JerseyEurekaHttpClientFactory 內部類,用於建立 JerseyEurekaHttpClientFactory 。點選 連結 檢視帶中文註釋的 JerseyEurekaHttpClientFactory。

呼叫 JerseyEurekaHttpClientFactory#create(...) 方法,建立 JerseyEurekaHttpClientFactory ,實現程式碼如下:

// 超過微信字數上限

4.2 JerseyReplicationClient

com.netflix.eureka.transport.JerseyReplicationClient ,Eureka-Server 叢集內,Eureka-Server 請求 其它的Eureka-Server 的網路通訊。

  • 實現 AbstractJerseyEurekaHttpClient#addExtraHeaders() 方法,新增自定義頭 x-netflix-discovery-replication=true ,程式碼如下:

    @Override
    protected void addExtraHeaders(Builder webResource) {
       webResource.essay-header(PeerEurekaNode.HEADER_REPLICATION, "true");
    }
  • 重寫了 #sendHeartBeat(…) 方法,在 《Eureka 原始碼解析 —— Eureka-Server 叢集同步》 有詳細解析。

  • 實現 com.netflix.eureka.cluster.HttpReplicationClient 介面,實現了 #submitBatchUpdates(…) 方法,在 《Eureka 原始碼解析 —— Eureka-Server 叢集同步》 有詳細解析。

4.2.1 沒有工廠

JerseyReplicationClient 沒有專屬的工廠

呼叫 JerseyReplicationClient#createReplicationClient(...) 靜態方法,建立 JerseyReplicationClient 。點選 連結 檢視帶中文註釋的方法程式碼。

5. EurekaHttpClientDecorator

com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator,EurekaHttpClient 委託者抽象類。實現程式碼如下:

// 超過微信字數上限
  • #execute(…) 抽象方法,子類實現該方法,實現自己的特性。

  • #register() 方法,實現向 Eureka-Server 註冊應用實體。其他方法程式碼類似

    • 呼叫 #execute(…) 方法,並將原有的註冊實現透過 RequestExecutor 傳遞進去。

    • 子類在實現的 #execute(…) 方法,可以呼叫 RequestExecutor#execute(…) 方法,繼續執行原有邏輯。

    • 參考設計樣式:《設計樣式 ( 十九 ) 模板方法樣式Template method(類行為型)》 。

  • RequestType ,請求型別列舉類。程式碼如下:

    // EurekaHttpClientDecorator.java
    public enum RequestType {
       Register,
       Cancel,
       SendHeartBeat,
       StatusUpdate,
       DeleteStatusOverride,
       GetApplications,
       GetDelta,
       GetVip,
       GetSecureVip,
       GetApplication,
       GetInstance,
       GetApplicationInstance
    }
  • RequestExecutor ,請求執行器介面。介面程式碼如下:

    // EurekaHttpClientDecorator.java
    // 超過微信字數上限

EurekaHttpClientDecorator 的每個實現類實現一個特性,程式碼非常非常非常清晰。

FROM 《委託樣式》 
委託樣式是軟體設計樣式中的一項基本技巧。在委託樣式中,有兩個物件參與處理同一個請求,接受請求的物件將請求委託給另一個物件來處理。委託樣式是一項基本技巧,許多其他的樣式,如狀態樣式、策略樣式、訪問者樣式本質上是在更特殊的場合採用了委託樣式。委託樣式使得我們可以用聚合來替代繼承,它還使我們可以模擬mixin。

我們在上圖的基礎上,增加委託的關係,如下圖( 開啟大圖 ):

  • 請註意,每個委託著實現類,上面可能有型別為 EurekaHttpClientFactory 的屬性,用於建立其委託的 EurekaHttpClient 。為什麼會有 Factory ?例如,RetryableEurekaHttpClient 重試請求多個 Eureka-Server 地址時,每個 Eureka-Server 地址會建立一個 EurekaHttpClient 。所以,下文涉及到 EurekaHttpClientFactory 和委託的 EurekaHttpClient 的地方,你都需要仔細理解。

5.1 MetricsCollectingEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient ,監控指標收集 EurekaHttpClient ,配合 Netflix Servo 實現監控資訊採集。

#execute() 方法,程式碼如下:

// 超過微信字數上限
  • 第 10 行 :呼叫 RequestExecutor#execute(…) 方法,繼續執行請求。

    • `delegate` 屬性,對應 JerseyApplicationClient 。

5.2 RedirectingEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient ,尋找非 302 重定向的 Eureka-Server 的 EurekaHttpClient 。

#execute() 方法,程式碼如下:

// 超過微信字數上限
  • 註意:和我們理解的常規的 302 狀態傳回處理不同!!!

  • 整個分成兩部分:【第 4 至 15 行】、【第 16 至 24 行】。

    • 當傳回非 302 狀態碼時,找到非傳回 302 狀態碼的 Eureka-Server 。

    • 當傳回 302 狀態碼時,向新的重定向的 Eureka-Server 執行請求直到成功找到或超過最大次數。

    • 前者,意味著未找到非傳回 302 狀態碼的 Eureka-Server ,此時透過在原始傳遞進來的 `serviceUrls` 執行請求,尋找非 302 狀態碼傳回的 Eureka-Server。

  • 後者,意味著當前已經找到非傳回 302 狀態碼的 Eureka-Server ,直接執行請求。註意 :此時 Eureka-Server 再傳回 302 狀態碼,不再處理。

  • 目前 Eureka 1.x 的 Eureka-Server 不存在傳回 302 狀態碼,猜測和 Eureka 2.X TODO[0028]:寫入叢集和讀取叢集 有關。

  • 【前者】第 5 行 :使用初始的 serviceEndpoint ( 相當於 serviceUrls ) 建立委託 EurekaHttpClient 。

  • 【前者】第 7 行 :呼叫 #executeOnNewServer(…) 方法,透過執行請求的方式,尋找非 302 狀態碼傳回的 Eureka-Server。實現程式碼,點選 連結 檢視帶中文註釋的程式碼實現。

  • 【前者】【前者】第 9 行 :關閉原有的 delegateRef ( 因為此處可能存在併發,多個執行緒都找到非 302 狀態碼傳回的 Eureka-Server ),並設定當前成功非 302 請求的 EurekaHttpClient 到 delegateRef

  • 【前者】第 13 行 :關閉 currentEurekaClientRef ,當請求發生異常或者超過最大重定向次數。

  • 【後者】第 18 行 :意味著當前已經找到非傳回 302 狀態碼的 Eureka-Server ,直接執行請求。

  • 【後者】第 21 至 22 行 :執行請求發生異常,關閉 currentEurekaClient ,後面要重新非傳回 302 狀態碼的 Eureka-Server 。

5.2.1 工廠

RedirectingEurekaHttpClient 提供 #createFactory(...) 靜態方法獲得建立其的工廠,點選 連結 檢視。

5.3 RetryableEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient ,支援向多個 Eureka-Server 請求重試的 EurekaHttpClient 。

#execute() 方法,程式碼如下:

// 超過微信字數上限
  • 第 10 行 :當前 currentHttpClient 不存在,意味著原有 delegate 不存在向 Eureka-Server 成功請求的 EurekaHttpClient 。

    • 此時需要從配置中的 Eureka-Server 陣列重試請求,獲得可以請求的 Eureka-Server 。

    • 如果已經存在請求成功的 delegate ,直接使用它進行執行請求。

  • 第 11 至 17 行 :呼叫 #getHostCandidates() 方法,獲得候選的 Eureka-Server serviceUrls 陣列。實現程式碼如下:

    // 超過微信字數上限
    • 第 10 行 :最小可用的閥值,配置 eureka.retryableClientQuarantineRefreshPercentage 來設定百分比,預設值:0.66 。

    • 最 13 至 15 行 :quarantineSet 數量超過閥值,清空 quarantineSet ,全部 candidateHosts重試。

    • 第 17 至 24 行 :quarantineSet 數量未超過閥值,移除 candidateHosts 中在 quarantineSet的元素。

    • 第 3 行 :呼叫 ClusterResolver#getClusterEndpoints() 方法,獲得候選的 Eureka-Server 地址陣列( candidateHosts )。註意:該方法傳回的 Eureka-Server 地址陣列,使用以本機 IP 為隨機種子,達到不同 IP 的應用實體獲得的陣列順序不同,而相同 IP 的應用實體獲得的陣列順序一致,效果類似基於 IP HASH 的負載均衡演演算法。實現該功能的程式碼,在 《Eureka 原始碼解析 —— EndPoint 與 解析器》搜尋關鍵字【ResolverUtils#randomize(…)】 詳細解析。

    • 第 6 行 :呼叫 Set#retainAll() 方法,移除隔離的故障 Eureka-Server 地址陣列( quarantineSet ) 中不在 candidateHosts 的元素。

    • 第 8 至 24 行 :在保證最小可用的 candidateHosts,移除在 quarantineSet 的元素。

  • 第 19 至 22 行 :超過 candidateHosts 上限,全部 Eureka-Server 請求失敗,丟擲異常。

  • 第 24 至 26 行 :建立委託的 EurekaHttpClient ,用於下麵請求執行。

  • 第 31 行 :執行請求。

  • 第 33 行 :呼叫 ServerStatusEvaluator#accept() 方法,判斷響應狀態碼和請求型別是否能夠接受。實現程式碼如下:

    // ServerStatusEvaluators.java
    // 超過微信字數上限
  • 第 34 行 :請求成功,設定 delegate 。下次請求,優先使用 delegate ,失敗才進行候選的 Eureka-Server 地址陣列重試。

  • 第 47 行 :請求失敗,delegate 若等於 currentHttpClient ,進行清除。

  • 第 50 至 52 行 :請求失敗,將請求的 Eureka-Server 地址新增到 quarantineSet 。

  • 總結來說:

    • 【第一步】若當前有請求成功的 EurekaHttpClient ,繼續使用。若請求失敗,執行【第二步】。

    • 【第二步】若當前無請求成功的 EurekaHttpClient ,獲取候選的 Eureka-Server 地址陣列順序建立新的 EurekaHttpClient,直到成功,或者超過最大重試次數。當請求成功,儲存該 EurekaHttpClient ,下次繼續使用,直到請求失敗。

5.3.1 工廠

RetryableEurekaHttpClient 提供 #createFactory(...) 靜態方法獲得建立其的工廠,點選 連結 檢視。

5.4 SessionedEurekaHttpClient

com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient ,支援會話的 EurekaHttpClient 。執行定期的重建會話,防止一個 Eureka-Client 永遠只連線一個特定的 Eureka-Server 。反過來,這也保證了 Eureka-Server 叢集變更時,Eureka-Client 對 Eureka-Server 連線的負載均衡。

#execute(...) ,程式碼如下:

// 超過微信字數上限
  • 第 7 至 12 行 :超過當前會話時間,關閉當前委託的 EurekaHttpClient 。

    • 增加會話過期的隨機性,實現所有 Eureka-Client 的會話過期重連的發生時間更加離散,避免集中時間過期。目前猜測這麼做的目的和 TODO[0028]:寫入叢集和讀取叢集 有關,即傳回 302 。關聯 1.x new transport enhancements 。

    • 第 10 行 :呼叫 #randomizeSessionDuration(...) 方法,計算計算下一次會話超時時長,公式為 sessionDurationMs * (0.5, 1.5) ,程式碼如下:

      protected long randomizeSessionDuration(long sessionDurationMs) {
         long delta = (long) (sessionDurationMs * (random.nextDouble() - 0.5));
         return sessionDurationMs + delta;
      }
  • 第 15 至 18 行 :獲得委託的 EurekaHttpClient 。若不存在,建立新的委託的 EurekaHttpClient 。TransportUtils#getOrSetAnotherClient(...) 方法程式碼如下:

    // 超過微信字數上限
    • 該方法實現,獲得 eurekaHttpClientRef 裡的 EurekaHttpClient 。若獲取不到,將 another 設定到 eurekaHttpClientRef 。當有多個執行緒設定時,有且只有一個執行緒設定成功,另外的設定失敗的執行緒們,意味著當前 eurekaHttpClientRef 有 EurekaHttpClient ,傳回 eurekaHttpClientRef 。

    • 目前該方法存在 BUG ,失敗的執行緒直接傳回 existing 的是 null ,需要修改成 return eurekaHttpClientRef.get() 。模擬重現該 BUG 程式碼如下 :

  • 第 19 行 :執行請求。

5.4.1 沒有工廠

在 SessionedEurekaHttpClient 類裡,沒有實現建立其的工廠。在 「6. 建立網路通訊客戶端」搜尋 canonicalClientFactory ,可以看到 EurekaHttpClients#canonicalClientFactory(...) 方法,內部有 SessionedEurekaHttpClient 的建立工廠。

6. 建立網路通訊客戶端

對於 Eureka-Server 來說,呼叫 JerseyReplicationClient#createReplicationClient(...) 靜態方法即可建立用於 Eureka-Server 叢集內,Eureka-Server 請求 其它的Eureka-Server 的網路通訊客戶端。

對於 Eureka-Client 來說,分成用於註冊應用實體( registrationClient )查詢註冊資訊( newQueryClient )兩個不同網路通訊客戶端。在 DiscoveryClient 初始化時進行建立,程式碼如下:

// DiscoveryClient.class
// 超過微信字數上限
  • 第 18 至 27 行 :呼叫 Jersey1TransportClientFactories#newTransportClientFactory(...) 方法,建立 registrationClient 和 queryClient 公用的委託的 EurekaHttpClientFactory ,程式碼如下:

    // Jersey1TransportClientFactories.java
    // 超過微信字數上限
    • 在 TransportClientFactory 裡委託 JerseyEurekaHttpClientFactory 。

  • 第 34 至 49 行 :呼叫 EurekaHttpClients#registrationClientFactory(...) 方法,建立 registrationClient 的 EurekaHttpClientFactory ,程式碼如下 :

    // EurekaHttpClients.java
    // 超過微信字數上限
  • 第 51 至 71 行 :呼叫 EurekaHttpClients#queryClientFactory(...) 方法,建立 queryClient 的 EurekaHttpClientFactory ,程式碼如下 :

    // EurekaHttpClients.java
    // 超過微信字數上限

666. 彩蛋

這次真的是彩蛋,我們將整體呼叫關係調整如下如下( 開啟大圖 ):

胖友,你學會了麼?

胖友,分享我的公眾號( 芋道原始碼 ) 給你的胖友可好?




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 伺服器

16. P2P 伺服器

17. HTTP 伺服器

18. 序列化 Serialization

19. 叢集容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

原始碼不易↓↓↓

點贊支援老艿艿↓↓

贊(0)

分享創造快樂