歡迎光臨
每天分享高質量文章

註冊中心 Eureka 原始碼解析 —— 應用實體註冊發現(六)之全量獲取

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

摘要: 原創出處 http://www.iocoder.cn/Eureka/instance-registry-fetch-all/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 Eureka 1.8.X 版本

  • 1. 概述

  • 2. Eureka-Client 發起全量獲取

    • 2.1 初始化全量獲取

    • 2.2 定時獲取

    • 2.3 掃清註冊資訊快取

    • 2.4 發起獲取註冊資訊

  • 3. Eureka-Server 接收全量獲取

    • 3.1 接收全量獲取請求

    • 3.2 響應快取 ResponseCache

    • 3.3 快取讀取

    • 3.4 主動過期讀寫快取

    • 3.5 被動過期讀寫快取

    • 3.6 定時掃清只讀快取

  • 666. 彩蛋


1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 獲取全量註冊資訊的過程

FROM 《深度剖析服務發現元件Netflix Eureka》 

Eureka-Client 獲取註冊資訊,分成全量獲取增量獲取。預設配置下,Eureka-Client 啟動時,首先執行一次全量獲取進行本地快取註冊資訊,而後每 30 秒增量獲取掃清本地快取( 非“正常”情況下會是全量獲取 )。

本文重點在於全量獲取

推薦 Spring Cloud 書籍

  • 請支援正版。下載盜版,等於主動編寫低階 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

推薦 Spring Cloud 影片

  • Java 微服務實踐 – Spring Boot

  • Java 微服務實踐 – Spring Cloud

  • Java 微服務實踐 – Spring Boot / Spring Cloud

2. Eureka-Client 發起全量獲取

本小節呼叫關係如下:

2.1 初始化全量獲取

Eureka-Client 啟動時,首先執行一次全量獲取進行本地快取註冊資訊,首先程式碼如下:

// DiscoveryClient.java
/**
* Applications 在本地的快取
*/

private final AtomicReference localRegionApps = new AtomicReference();

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                   Provider backupRegistryProvider) {

    // ... 省略無關程式碼

   // 【3.2.5】初始化應用集合在本地的快取
   localRegionApps.set(new Applications());

   // ... 省略無關程式碼    

   // 【3.2.12】從 Eureka-Server 拉取註冊資訊
   if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
       fetchRegistryFromBackup();
   }

    // ... 省略無關程式碼      
}
  • com.netflix.discovery.shared.Applications,註冊的應用集合。較為容易理解,點選 連結 連結檢視帶中文註釋的類,這裡就不囉嗦了。Applications 與 InstanceInfo 類關係如下:

  • 配置 eureka.shouldFetchRegistry = true,開啟從 Eureka-Server 獲取註冊資訊。預設值:true 。

  • 呼叫 #fetchRegistry(false) 方法,從 Eureka-Server 全量獲取註冊資訊,在 「2.4 發起獲取註冊資訊」 詳細解析。

2.2 定時獲取

Eureka-Client 在初始化過程中,建立獲取註冊資訊執行緒,固定間隔向 Eureka-Server 發起獲取註冊資訊( fetch ),掃清本地註冊資訊快取。實現程式碼如下:

// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
              Provider backupRegistryProvider) {
   // ... 省略無關程式碼

   // 【3.2.9】初始化執行緒池
   // default size of 2 - 1 each for heartbeat and cacheRefresh
   scheduler = Executors.newScheduledThreadPool(2,
        new ThreadFactoryBuilder()
                .setNameFormat("DiscoveryClient-%d")
                .setDaemon(true)
                .build());

   cacheRefreshExecutor = new ThreadPoolExecutor(
        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
        new SynchronousQueue(),
        new ThreadFactoryBuilder()
                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                .setDaemon(true)
                .build()
    );  // use direct handoff

   // ... 省略無關程式碼

   // 【3.2.14】初始化定時任務
   initScheduledTasks();

   // ... 省略無關程式碼
}

private void initScheduledTasks() {
   // 向 Eureka-Server 心跳(續租)執行器
   if (clientConfig.shouldFetchRegistry()) {
      // registry cache refresh timer
      int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
      int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
      scheduler.schedule(
              new TimedSupervisorTask(
                      "cacheRefresh",
                      scheduler,
                      cacheRefreshExecutor,
                      registryFetchIntervalSeconds,
                      TimeUnit.SECONDS,
                      expBackOffBound,
                      new CacheRefreshThread()
              ),
              registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // ... 省略無關程式碼
}
  • 初始化定時任務程式碼,和續租的定時任務程式碼類似,在 《Eureka 原始碼解析 —— 應用實體註冊發現(二)之續租 》 有詳細解析,這裡不重覆分享。

  • com.netflix.discovery.DiscoveryClient.CacheRefreshThread,註冊資訊快取掃清任務,實現程式碼如下:

    class CacheRefreshThread implements Runnable {
       public void run() {
           refreshRegistry();
       }
    }
    • 呼叫 #refreshRegistry(false) 方法,掃清註冊資訊快取,在 「2.3 掃清註冊資訊快取」 詳細解析。

2.3 掃清註冊資訊快取

呼叫 #refreshRegistry(false) 方法,掃清註冊資訊快取,實現程式碼如下:

// DiscoveryClient.java
 1: void refreshRegistry() {
 2:     try {
 3:         // TODO 芋艿:TODO[0009]:RemoteRegionRegistry
 4:         boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
 5:
 6:         boolean remoteRegionsModified = false;
 7:         // This makes sure that a dynamic change to remote regions to fetch is honored.
 8:         String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
 9:         if (null != latestRemoteRegions) {
10:             String currentRemoteRegions = remoteRegionsToFetch.get();
11:             if (!latestRemoteRegions.equals(currentRemoteRegions)) {
12:                 // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
13:                 synchronized (instanceRegionChecker.getAzToRegionMapper()) {
14:                     if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
15:                         String[] remoteRegions = latestRemoteRegions.split(",");
16:                         remoteRegionsRef.set(remoteRegions);
17:                         instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
18:                         remoteRegionsModified = true;
19:                     } else {
20:                         logger.info("Remote regions to fetch modified concurrently," +
21:                                 " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
22:                     }
23:                 }
24:             } else {
25:                 // Just refresh mapping to reflect any DNS/Property change
26:                 instanceRegionChecker.getAzToRegionMapper().refreshMapping();
27:             }
28:         }
29:
30:         boolean success = fetchRegistry(remoteRegionsModified);
31:         if (success) {
32:             // 設定 註冊資訊的應用實體數
33:             registrySize = localRegionApps.get().size();
34:             // 設定 最後獲取註冊資訊時間
35:             lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
36:         }
37:
38:         // 列印日誌
39:         if (logger.isDebugEnabled()) {
40:             StringBuilder allAppsHashCodes = new StringBuilder();
41:             allAppsHashCodes.append("Local region apps hashcode: ");
42:             allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
43:             allAppsHashCodes.append(", is fetching remote regions? ");
44:             allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
45:             for (Map.Entry entry : remoteRegionVsApps.entrySet()) {
46:                 allAppsHashCodes.append(", Remote region: ");
47:                 allAppsHashCodes.append(entry.getKey());
48:                 allAppsHashCodes.append(" , apps hashcode: ");
49:                 allAppsHashCodes.append(entry.getValue().getAppsHashCode());
50:             }
51:             logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
52:                     allAppsHashCodes.toString());
53:         }
54:     } catch (Throwable e) {
55:         logger.error("Cannot fetch registry from server", e);
56:     }        
57: }
  • 第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry

  • 第 30 行 :呼叫 #fetchRegistry(false) 方法,從 Eureka-Server 獲取註冊資訊,在 「2.4 發起獲取註冊資訊」 詳細解析。

  • 第 31 至 36 行 :獲取註冊資訊成功,設定註冊資訊的應用實體數,最後獲取註冊資訊時間。變數程式碼如下:

    /**
    * 註冊資訊的應用實體數
    */

    private volatile int registrySize = 0;
    /**
    * 最後成功從 Eureka-Server 拉取註冊資訊時間戳
    */

    private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
  • 第 38 至 53 行 :列印除錯日誌。

  • 第 54 至 56 行 :列印異常日誌。

2.4 發起獲取註冊資訊

呼叫 #fetchRegistry(false) 方法,從 Eureka-Server 獲取註冊資訊( 根據條件判斷,可能是全量,也可能是增量 ),實現程式碼如下:

  1: private boolean fetchRegistry(boolean forceFullRegistryFetch) {
 2:     Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
 3:
 4:     try {
 5:         // 獲取 本地快取的註冊的應用實體集合
 6:         // If the delta is disabled or if it is the first time, get all
 7:         // applications
 8:         Applications applications = getApplications();
 9:
10:         // 全量獲取
11:         if (clientConfig.shouldDisableDelta() // 禁用增量獲取
12:                 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
13:                 || forceFullRegistryFetch
14:                 || (applications == null) // 空
15:                 || (applications.getRegisteredApplications().size() == 0) // 空
16:                 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
17:         {
18:             logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
19:             logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
20:             logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
21:             logger.info("Application is null : {}", (applications == null));
22:             logger.info("Registered Applications size is zero : {}",
23:                     (applications.getRegisteredApplications().size() == 0));
24:             logger.info("Application version is -1: {}", (applications.getVersion() == -1));
25:             // 執行 全量獲取
26:             getAndStoreFullRegistry();
27:         } else {
28:             // 執行 增量獲取
29:             getAndUpdateDelta(applications);
30:         }
31:         // 設定 應用集合 hashcode
32:         applications.setAppsHashCode(applications.getReconcileHashCode());
33:         // 列印 本地快取的註冊的應用實體數量
34:         logTotalInstances();
35:     } catch (Throwable e) {
36:         logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
37:         return false;
38:     } finally {
39:         if (tracer != null) {
40:             tracer.stop();
41:         }
42:     }
43:
44:     // Notify about cache refresh before updating the instance remote status
45:     onCacheRefreshed();
46:
47:     // Update remote status based on refreshed data held in the cache
48:     updateInstanceRemoteStatus();
49:
50:     // registry was fetched successfully, so return true
51:     return true;
52: }
  • 第 5 至 8 行 :獲取本地快取的註冊的應用實體集合,實現程式碼如下:

    public Applications getApplications() {
      return localRegionApps.get();
    }
  • 第 10 至 26 行 :全量獲取註冊資訊。

    • 第 11 行 :配置 eureka.disableDelta = true ,禁用增量獲取註冊資訊。預設值:false 。

    • 第 12 行 :只獲得一個 vipAddress 對應的應用實體們的註冊資訊。

    • 第 13 行 :方法引數 forceFullRegistryFetch 強制全量獲取註冊資訊。

    • 第 14 至 15 行 :本地快取為空。

    • 第 25 至 26 行 :呼叫 #getAndStoreFullRegistry() 方法,全量獲取註冊資訊,並設定到本地快取。下文詳細解析。

  • 第 27 至 30 行 :增量獲取註冊資訊,並掃清本地快取,在 《Eureka 原始碼解析 —— 應用實體註冊發現 (七)之增量獲取》 詳細解析。

  • 第 31 至 32 行 :計算應用集合 hashcode 。該變數用於校驗增量獲取的註冊資訊和 Eureka-Server 全量的註冊資訊是否一致( 完整 ),在 《Eureka 原始碼解析 —— 應用實體註冊發現 (七)之增量獲取》 詳細解析。

  • 第 33 至 34 行 :列印除錯日誌,輸出本地快取的註冊的應用實體數量。實現程式碼如下:

    private void logTotalInstances() {
      if (logger.isDebugEnabled()) {
          int totInstances = 0;
          for (Application application : getApplications().getRegisteredApplications()) {
              totInstances += application.getInstancesAsIsFromEureka().size();
          }
          logger.debug("The total number of all instances in the client now is {}", totInstances);
      }
    }
  • 第 44 至 45 行 :觸發 CacheRefreshedEvent 事件,事件監聽器執行。目前 Eureka 未提供預設的該事件監聽器。

    • x

    • #onCacheRefreshed() 方法,實現程式碼如下:

      /**
      * Eureka 事件監聽器
      */

      private final CopyOnWriteArraySet eventListeners = new CopyOnWriteArraySet<>();

      protected void onCacheRefreshed() {
         fireEvent(new CacheRefreshedEvent());
      }

      protected void fireEvent(final EurekaEvent event) {
         for (EurekaEventListener listener : eventListeners) {
             listener.onEvent(event);
         }
      }
    • 筆者的YY :你可以實現自定義的事件監聽器監聽 CacheRefreshedEvent 事件,以達到持久化最新的註冊資訊到儲存器( 例如,本地檔案 ),透過這樣的方式,配合實現 BackupRegistry 介面讀取儲存器。BackupRegistry 介面呼叫如下:

      // 【3.2.12】從 Eureka-Server 拉取註冊資訊
      if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
         fetchRegistryFromBackup();
      }
  • 第47 至 48 行 :更新本地快取的當前應用實體在 Eureka-Server 的狀態。

      1: private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN; 
     2:
     3: private synchronized void updateInstanceRemoteStatus() {
     4:     // Determine this instance's status for this app and set to UNKNOWN if not found
     5:     InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
     6:     if (instanceInfo.getAppName() != null) {
     7:         Application app = getApplication(instanceInfo.getAppName());
     8:         if (app != null) {
     9:             InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
    10:             if (remoteInstanceInfo != null) {
    11:                 currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
    12:             }
    13:         }
    14:     }
    15:     if (currentRemoteInstanceStatus == null) {
    16:         currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
    17:     }
    18:
    19:     // Notify if status changed
    20:     if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
    21:         onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
    22:         lastRemoteInstanceStatus = currentRemoteInstanceStatus;
    23:     }
    24: }
    • Eureka-Client 本地應用實體與 Eureka-Server 的該應用實體狀態不同的原因,因為應用實體的改寫狀態,在 《Eureka 原始碼解析 —— 應用實體註冊發現 (八)之改寫狀態》 有詳細解析。

    • 第 4 至 14 行 :從註冊資訊中獲取當前應用在 Eureka-Server 的狀態。

    • 第 19 至 23 行 :對比本地快取最新的的當前應用實體在 Eureka-Server 的狀態,若不同,更新本地快取註意,只更新該快取變數,不更新本地當前應用實體的狀態( instanceInfo.status ) ),觸發 StatusChangeEvent 事件,事件監聽器執行。目前 Eureka 未提供預設的該事件監聽器。#onRemoteStatusChanged(...) 實現程式碼如下:

      protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
        fireEvent(new StatusChangeEvent(oldStatus, newStatus));
      }

2.4.1 全量獲取註冊資訊,並設定到本地快取

呼叫 #getAndStoreFullRegistry() 方法,全量獲取註冊資訊,並設定到本地快取。下實現程式碼如下:

  1: private void getAndStoreFullRegistry() throws Throwable {
 2:     long currentUpdateGeneration = fetchRegistryGeneration.get();
 3:
 4:     logger.info("Getting all instance registry info from the eureka server");
 5:
 6:     // 全量獲取註冊資訊
 7:     Applications apps = null;
 8:     EurekaHttpResponse httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
 9:             ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
10:             : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
11:     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
12:         apps = httpResponse.getEntity();
13:     }
14:     logger.info("The response status is {}", httpResponse.getStatusCode());
15:
16:     // 設定到本地快取
17:     if (apps == null) {
18:         logger.error("The application is null for some reason. Not storing this information");
19:     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
20:         localRegionApps.set(this.filterAndShuffle(apps));
21:         logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
22:     } else {
23:         logger.warn("Not updating applications as another thread is updating it already");
24:     }
25: }
  • 第 6 至 14 行 :全量獲取註冊資訊,實現程式碼如下:

    // AbstractJerseyEurekaHttpClient.java
    @Override
    public EurekaHttpResponse getApplications(String... regions) {
      return getApplicationsInternal("apps/", regions);
    }

    private EurekaHttpResponse getApplicationsInternal(String urlPath, String[] regions) {
      ClientResponse response = null;
      String regionsParamValue = null;
      try {
          WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
          if (regions != null && regions.length > 0) {
              regionsParamValue = StringUtil.join(regions);
              webResource = webResource.queryParam("regions", regionsParamValue);
          }
          Builder requestBuilder = webResource.getRequestBuilder();
          addExtraHeaders(requestBuilder);
          response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
       Applications applications = null;
      if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
          applications = response.getEntity(Applications.class);
      }
      return anEurekaHttpResponse(response.getStatus(), Applications.class)
              .essay-headers(essay-headersOf(response))
              .entity(applications)
              .build();

      } finally {
          if (logger.isDebugEnabled()) {
              logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                      serviceUrl, urlPath,
                      regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                      response == null ? "N/A" : response.getStatus()
              );
          }
          if (response != null) {
              response.close();
          }
      }
    }

    • 呼叫 AbstractJerseyEurekaHttpClient#getApplications(...) 方法,GET 請求 Eureka-Server 的 apps/ 介面,引數為 regions ,傳回格式為 JSON ,實現全量獲取註冊資訊

  • 第 16 至 24 行 :設定到本地註冊資訊快取

    • 第 19 行 :TODO[0025] :併發更新的情況???

    • 第 20 行 :呼叫 #filterAndShuffle(...) 方法,根據配置 eureka.shouldFilterOnlyUpInstances = true ( 預設值 :true ) 過濾只保留狀態為開啟( UP )的應用實體,並隨機打亂應用實體順序。打亂後,實現呼叫應用服務的隨機性。程式碼比較易懂,點選連結檢視方法實現。

3. Eureka-Server 接收全量獲取

3.1 接收全量獲取請求

com.netflix.eureka.resources.ApplicationsResource,處理所有應用的請求操作的 Resource ( Controller )。

接收全量獲取請求,對映 ApplicationsResource#getContainers() 方法,實現程式碼如下:

  1: @GET
 2: public Response getContainers(@PathParam("version") String version,
 3:                               @HeaderParam(HEADER_ACCEPT) String acceptHeader,
 4:                               @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
 5:                               @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
 6:                               @Context UriInfo uriInfo,
 7:                               @Nullable @QueryParam("regions") String regionsStr)
{
 8:     // TODO[0009]:RemoteRegionRegistry
 9:     boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
10:     String[] regions = null;
11:     if (!isRemoteRegionRequested) {
12:         EurekaMonitors.GET_ALL.increment();
13:     } else {
14:         regions = regionsStr.toLowerCase().split(",");
15:         Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
16:         EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
17:     }
18:
19:     // 判斷是否可以訪問
20:     // Check if the server allows the access to the registry. The server can
21:     // restrict access if it is not
22:     // ready to serve traffic depending on various reasons.
23:     if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
24:         return Response.status(Status.FORBIDDEN).build();
25:     }
26:
27:     // API 版本
28:     CurrentRequestVersion.set(Version.toEnum(version));
29:
30:     // 傳回資料格式
31:     KeyType keyType = Key.KeyType.JSON;
32:     String returnMediaType = MediaType.APPLICATION_JSON;
33:     if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
34:         keyType = Key.KeyType.XML;
35:         returnMediaType = MediaType.APPLICATION_XML;
36:     }
37:
38:     // 響應快取鍵( KEY )
39:     Key cacheKey = new Key(Key.EntityType.Application,
40:             ResponseCacheImpl.ALL_APPS,
41:             keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
42:     );
43:
44:     //
45:     Response response;
46:     if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
47:         response = Response.ok(responseCache.getGZIP(cacheKey))
48:                 .essay-header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
49:                 .essay-header(HEADER_CONTENT_TYPE, returnMediaType)
50:                 .build();
51:     } else {
52:         response = Response.ok(responseCache.get(cacheKey))
53:                 .build();
54:     }
55:     return response;
56: }
  • 第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry

  • 第 19 至 25 行 :Eureka-Server 啟動完成,但是未處於就緒( Ready )狀態,不接受請求全量應用註冊資訊的請求,例如,Eureka-Server 啟動時,未能從其他 Eureka-Server 叢集的節點獲取到應用註冊資訊。

  • 第 27 至 28 行 :設定 API 版本號。預設最新 API 版本為 V2。實現程式碼如下:

    public enum Version {
       V1, V2;
    public static Version toEnum(String v) {
       for (Version version : Version.values()) {
           if (version.name().equalsIgnoreCase(v)) {
               return version;
           }
       }
       //Defaults to v2
       return V2;
    }

    }

  • 第 30 至 36 行 :設定傳回資料格式,預設 JSON 。

  • 第 38 至 42 行 :建立響應快取( ResponseCache ) 的鍵( KEY ),在 「3.2.1 快取鍵」詳細解析。

  • 第 44 至 55 行 :從響應快取讀取全量註冊資訊,在 「3.3 快取讀取」詳細解析。

3.2 響應快取 ResponseCache

com.netflix.eureka.registry.ResponseCache,響應快取介面,介面程式碼如下:

public interface ResponseCache {

   String get(Key key);

   byte[] getGZIP(Key key);

   void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);

   AtomicLong getVersionDelta();

   AtomicLong getVersionDeltaWithRegions();

}
  • 其中,#getVersionDelta() 和 #getVersionDeltaWithRegions() 已經廢棄。這裡保留的原因主要是考慮相容性。判斷依據來自如下程式碼:

    // Applications.java
    @Deprecated
    public void setVersion(Long version) {
      this.versionDelta = version;
    }

    // AbstractInstanceRegistry.java
    public Applications getApplicationDeltas() {
       // ... 省略其它無關程式碼
       apps.setVersion(responseCache.getVersionDelta().get()); // 唯一呼叫到 ResponseCache#getVersionDelta() 方法的地方
       // ... 省略其它無關程式碼
    }
  • #get() :獲得快取。

  • #getGZIP() :獲得快取,並 GZIP 。

  • #invalidate() :過期快取。

3.2.1 快取鍵

com.netflix.eureka.registry.Key,快取鍵。實現程式碼如下:

public class Key {

   public enum KeyType {
       JSON, XML
   }

   /**
    * An enum to define the entity that is stored in this cache for this key.
    */

   public enum EntityType {
       Application, VIP, SVIP
   }

   /**
    * 物體名
    */

   private final String entityName;
   /**
    * TODO[0009]:RemoteRegionRegistry
    */

   private final String[] regions;
   /**
    * 請求引數型別
    */

   private final KeyType requestType;
   /**
    * 請求 API 版本號
    */

   private final Version requestVersion;
   /**
    * hashKey
    */

   private final String hashKey;
   /**
    * 物體型別
    *
    * {@link EntityType}
    */

   private final EntityType entityType;
   /**
    * {@link EurekaAccept}
    */

   private final EurekaAccept eurekaAccept;

   public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
       this.regions = regions;
       this.entityType = entityType;
       this.entityName = entityName;
       this.requestType = type;
       this.requestVersion = v;
       this.eurekaAccept = eurekaAccept;
       hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
               + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
   }

   public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
       this.regions = regions;
       this.entityType = entityType;
       this.entityName = entityName;
       this.requestType = type;
       this.requestVersion = v;
       this.eurekaAccept = eurekaAccept;
       hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
               + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
   }

   @Override
   public int hashCode() {
       String hashKey = getHashKey();
       return hashKey.hashCode();
   }

   @Override
   public boolean equals(Object other) {
       if (other instanceof Key) {
           return getHashKey().equals(((Key) other).getHashKey());
       } else {
           return false;
       }
   }

}

3.2.2 響應快取實現類

com.netflix.eureka.registry.ResponseCacheImpl,響應快取實現類。

在 ResponseCacheImpl 裡,將快取拆分成兩層 :

  • 只讀快取readOnlyCacheMap )

  • 固定過期 + 固定大小讀寫快取readWriteCacheMap )

預設配置下,快取讀取策略如下:

快取過期策略如下:

  • 應用實體註冊、下線、過期時,只只只過期 readWriteCacheMap 。

  • readWriteCacheMap 寫入一段時間( 可配置 )後自動過期。

  • 定時任務對比 readWriteCacheMap 和 readOnlyCacheMap 的快取值,若不一致,以前者為主。透過這樣的方式,實現了 readOnlyCacheMap 的定時過期。

註意:應用實體註冊、下線、過期時,不會很快掃清到 readWriteCacheMap 快取裡。預設配置下,最大延遲在 30 秒。

為什麼可以使用快取?

在 CAP 的選擇上,Eureka 選擇了 AP ,不同於 Zookeeper 選擇了 CP 。

推薦閱讀:

  • 《為什麼不應該使用ZooKeeper做服務發現》

  • 《Spring Cloud Netflix Eureka原始碼導讀與原理分析》「4. 作為服務註冊中心,Eureka比Zookeeper好在哪裡」

3.3 快取讀取

呼叫 ResponseCacheImpl#get(...) 方法( #getGzip(...) 類似 ),讀取快取,實現程式碼如下:

  1: private final ConcurrentMap readOnlyCacheMap = new ConcurrentHashMap();
 2:
 3: private final LoadingCache readWriteCacheMap;
 4:
 5: public String get(final Key key) {
 6:     return get(key, shouldUseReadOnlyResponseCache);
 7: }
 8:
 9: String get(final Key key, boolean useReadOnlyCache) {
10:     Value payload = getValue(key, useReadOnlyCache);
11:     if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
12:         return null;
13:     } else {
14:         return payload.getPayload();
15:     }
16: }
17:
18: Value getValue(final Key key, boolean useReadOnlyCache) {
19:     Value payload = null;
20:     try {
21:         if (useReadOnlyCache) {
22:             final Value currentPayload = readOnlyCacheMap.get(key);
23:             if (currentPayload != null) {
24:                 payload = currentPayload;
25:             } else {
26:                 payload = readWriteCacheMap.get(key);
27:                 readOnlyCacheMap.put(key, payload);
28:             }
29:         } else {
30:             payload = readWriteCacheMap.get(key);
31:         }
32:     } catch (Throwable t) {
33:         logger.error("Cannot get value for key :" + key, t);
34:     }
35:     return payload;
36: }
  • 第 5 至 7 行 :呼叫 #get(key, useReadOnlyCache) 方法,讀取快取。其中 shouldUseReadOnlyResponseCache 透過配置 eureka.shouldUseReadOnlyResponseCache = true (預設值 :true ) 開啟只讀快取。如果你對資料的一致性有相對高的要求,可以關閉這個開關,當然因為少了 readOnlyCacheMap ,效能會有一定的下降。

  • 第 9 至 16 行 :呼叫 getValue(key, useReadOnlyCache) 方法,讀取快取。從 readOnlyCacheMap 和 readWriteCacheMap 變數可以看到快取值的類為 com.netflix.eureka.registry.ResponseCacheImpl.Value ,實現程式碼如下:

    public class Value {

      /**
       * 原始值
       */

      private final String payload;
      /**
       * GZIP 壓縮後的值
       */

      private byte[] gzipped;

      public Value(String payload) {
          this.payload = payload;
          if (!EMPTY_PAYLOAD.equals(payload)) {
              // ... 省略 GZIP 壓縮程式碼
              gzipped = bos.toByteArray();
          } else {
              gzipped = null;
          }
      }

      public String getPayload() {
          return payload;
      }

      public byte[] getGzipped() {
          return gzipped;
      }

    }
  • 第 21 至 31 行 :讀取快取。

    • readWriteCacheMap 最大快取數量為 1000 。

    • 呼叫 #generatePayload(key) 方法,生成快取值。

    • 第 21 至 28 行 :先讀取 readOnlyCacheMap 。讀取不到,讀取 readWriteCacheMap ,並設定到 readOnlyCacheMap 。

    • 第 29 至 31 行 :讀取 readWriteCacheMap 。

    • readWriteCacheMap 實現程式碼如下:

      this.readWriteCacheMap =
           CacheBuilder.newBuilder().initialCapacity(1000)
                   .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                   .removalListener(new RemovalListener() {
                       @Override
                       public void onRemoval(RemovalNotification notification) {
                           // TODO[0009]:RemoteRegionRegistry
                           Key removedKey = notification.getKey();
                           if (removedKey.hasRegions()) {
                               Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                               regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                           }
                       }
                   })
                   .build(new CacheLoader() {
                       @Override
                       public Value load(Key key) throws Exception {
                           // // TODO[0009]:RemoteRegionRegistry
                           if (key.hasRegions()) {
                               Key cloneWithNoRegions = key.cloneWithoutRegions();
                               regionSpecificKeys.put(cloneWithNoRegions, key);
                           }
                           Value value = generatePayload(key);
                           return value;
                       }
                   });
  • #generatePayload(key) 方法,實現程式碼如下:

      1: private Value generatePayload(Key key) {
     2:     Stopwatch tracer = null;
     3:     try {
     4:         String payload;
     5:         switch (key.getEntityType()) {
     6:             case Application:
     7:                 boolean isRemoteRegionRequested = key.hasRegions();
     8:
     9:                 if (ALL_APPS.equals(key.getName())) {
    10:                     if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry
    11:                         tracer = serializeAllAppsWithRemoteRegionTimer.start();
    12:                         payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
    13:                     } else {
    14:                         tracer = serializeAllAppsTimer.start();
    15:                         payload = getPayLoad(key, registry.getApplications());
    16:                     }
    17:                 } else if (ALL_APPS_DELTA.equals(key.getName())) {
    18:                     // ... 省略增量獲取相關的程式碼
    19:                  } else {
    20:                     tracer = serializeOneApptimer.start();
    21:                     payload = getPayLoad(key, registry.getApplication(key.getName()));
    22:                 }
    23:                 break;
    24:             // ... 省略部分程式碼
    25:         }
    26:         return new Value(payload);
    27:     } finally {
    28:         if (tracer != null) {
    29:             tracer.stop();
    30:         }
    31:     }
    32: }
    • 第 10 至 12 行 :TODO[0009]:RemoteRegionRegistry

    • 第 13 至 16 行 :呼叫 AbstractInstanceRegistry#getApplications() 方法,獲得註冊的應用集合。後呼叫 #getPayLoad() 方法,將註冊的應用集合轉換成快取值。? 這兩個方法程式碼較多,下麵詳細解析。

    • 第 17 至 18 行 :獲取增量註冊資訊的快取值,在 《Eureka 原始碼解析 —— 應用實體註冊發現 (七)之增量獲取》 詳細解析。

3.3.1 獲得註冊的應用集合

呼叫 AbstractInstanceRegistry#getApplications() 方法,獲得註冊的應用集合,實現程式碼如下:

// ... 省略程式碼,超過微信文章長度
  • 第 6 至 8 行 :TODO[0009]:RemoteRegionRegistry

  • 第 9 至 16 行 :呼叫 #getApplicationsFromMultipleRegions(...) 方法,獲得註冊的應用集合,實現程式碼如下:

    // ... 省略程式碼,超過微信文章長度
    • 第 2 至 第 10 行 :TODO[0009]:RemoteRegionRegistry

    • 第 11 至 29 行 :獲得獲得註冊的應用集合。

    • 第 30 至 59 行 :TODO[0009]:RemoteRegionRegistry

    • 第 61 行 :計算應用集合 hashcode 。該變數用於校驗增量獲取的註冊資訊和 Eureka-Server 全量的註冊資訊是否一致( 完整 ),在 《Eureka 原始碼解析 —— 應用實體註冊發現 (七)之增量獲取》 詳細解析。

3.3.2 轉換成快取值

呼叫 #getPayLoad() 方法,將註冊的應用集合轉換成快取值,實現程式碼如下:

// ... 省略程式碼,超過微信文章長度

3.4 主動過期讀寫快取

應用實體註冊、下線、過期時,呼叫 ResponseCacheImpl#invalidate() 方法,主動過期讀寫快取( readWriteCacheMap ),實現程式碼如下:

// ... 省略程式碼,超過微信文章長度
  • 呼叫 #invalidate(keys) 方法,逐個過期每個快取鍵值,實現程式碼如下:

    // ... 省略程式碼,超過微信文章長度

3.5 被動過期讀寫快取

讀寫快取( readWriteCacheMap ) 寫入後,一段時間自動過期,實現程式碼如下:

expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds())
  • 配置 eureka.responseCacheAutoExpirationInSeconds ,設定寫入過期時長。預設值 :180 秒。

3.6 定時掃清只讀快取

定時任務對比 readWriteCacheMap 和 readOnlyCacheMap 的快取值,若不一致,以前者為主。透過這樣的方式,實現了 readOnlyCacheMap 的定時過期。實現程式碼如下:

// ... 省略程式碼,超過微信文章長度
  • 第 7 至 12 行 :初始化定時任務。配置 eureka.responseCacheUpdateIntervalMs,設定任務執行頻率,預設值 :30 * 1000 毫秒。

  • 第 17 至 39 行 :建立定時任務。

    • 第 22 行 :迴圈 readOnlyCacheMap 的快取鍵。為什麼不迴圈 readWriteCacheMap 呢? readOnlyCacheMap 的快取過期依賴 readWriteCacheMap,因此快取鍵會更多。

    • 第 28 行 至 33 行 :對比 readWriteCacheMap 和 readOnlyCacheMap 的快取值,若不一致,以前者為主。透過這樣的方式,實現了 readOnlyCacheMap 的定時過期。

666. 彩蛋

比預期,比想想,長老多老多的一篇文章。細思極恐。

估計下一篇增量獲取會簡潔很多。

胖友,分享我的公眾號( 芋道原始碼 ) 給你的胖友可好?


知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. API 配置(一)之應用
04. API 配置(二)之服務提供者
05. API 配置(三)之服務消費者
06. 屬性配置
07. XML 配置
08. 核心流程一覽

09. 拓展機制 SPI

10. 執行緒池


一共 60 篇++

贊(0)

分享創造快樂