歡迎光臨
每天分享高質量文章

註冊中心 Eureka 原始碼解析 —— Eureka-Client 初始化(三)之 EurekaClient

本文主要基於 Eureka 1.8.X 版本

  • 1. 概述

  • 2. EurekaClient

  • 2.1 LookupService

  • 3. DiscoveryClient

  • 3.1 構造方法引數

  • 3.2 構造方法

  • 666. 彩蛋


友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】】搞基嗨皮。


1. 概述

本文接《Eureka 原始碼解析 —— Eureka-Client 初始化(二)之 EurekaClientConfig》,主要分享 Eureka-Client 自身初始化的過程的第三部分 —— EurekaClient,不包含 Eureka-Client 向 Eureka-Server 的註冊過程( ?後面會另外文章分享 )。

Eureka-Client 自身初始化過程中,涉及到主要物件如下圖:

  1. 建立 EurekaInstanceConfig物件

  2. 使用 EurekaInstanceConfig物件 建立 InstanceInfo物件

  3. 使用 EurekaInstanceConfig物件 + InstanceInfo物件 建立 ApplicationInfoManager物件

  4. 建立 EurekaClientConfig物件

  5. 使用 ApplicationInfoManager物件 + EurekaClientConfig物件 建立 EurekaClient物件

考慮到整個初始化的過程中涉及的配置特別多,拆分成三篇文章:

  1. (一)EurekaInstanceConfig)

  2. (二)EurekaClientConfig

  3. 【本文】(三)EurekaClient

下麵我們來看看每個的實現。

推薦 Spring Cloud 書籍

  • 請支援正版。下載盜版,等於主動編寫低階 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

推薦 Spring Cloud 影片

  • Java 微服務實踐 – Spring Boot

  • Java 微服務實踐 – Spring Cloud

  • Java 微服務實踐 – Spring Boot / Spring Cloud

2. EurekaClient

com.netflix.discovery.EurekaClient,Eureka-Client 介面,宣告如下方法:

  • 提供多種方法獲取應用集合(com.netflix.discovery.shared.Applications) 和 應用實體資訊集合( com.netflix.appinfo.InstanceInfo )。

  • 提供方法獲取本地客戶端資訊,例如,應用管理器( com.netflix.appinfo.ApplicationInfoManager )和 Eureka-Client 配置( com.netflix.discovery.EurekaClientConfig )。

  • 提供方法註冊本地客戶端的健康檢查和 Eureka 事件監聽器。

另外,Eureka 2.X 版本正在開發,該介面為 Eureka 1.X 和 2.X 提供平滑過渡介面。

This interface does NOT try to clean up the current client interface for eureka 1.x. Rather it tries to provide an easier transition path from eureka 1.x to eureka 2.x.

2.1 LookupService

com.netflix.discovery.shared.LookupService,查詢服務介面,提供簡單單一的方式獲取應用集合(com.netflix.discovery.shared.Applications) 和 應用實體資訊集合( com.netflix.appinfo.InstanceInfo )。

  • 在 Eureka-Client 裡,EurekaClient 繼承該介面。

  • 在 Eureka-Server 裡,com.netflix.eureka.registry.InstanceRegistry 繼承該介面。

3. DiscoveryClient

com.netflix.discovery.DiscoveryClient,實現 EurekaClient 介面,用於與 Eureka-Server 互動。實現如下方法:

  • 向 Eureka-Server 註冊自身服務

  • 向 Eureka-Server 續約自身服務

  • 向 Eureka-Server 取消自身服務,當關閉時

  • 從 Eureka-Server 查詢應用集合和應用實體資訊

  • 簡單來理解,對 Eureka-Server 服務的增刪改查

3.1 構造方法引數

DiscoveryClient 完整構造方法需要傳入四個引數,實現程式碼如下:

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                   Provider backupRegistryProvider) {
    // .... 省略程式碼
}
  • ApplicationInfoManager,在《Eureka 原始碼解析 —— Eureka-Client 初始化(一)之 EurekaInstanceConfig》有詳細解析。

  • EurekaClientConfig,在《Eureka 原始碼解析 —— Eureka-Client 初始化(二)之 EurekaClientConfig》有詳細解析。

  • com.netflix.discovery.BackupRegistry,備份註冊中心介面。當 Eureka-Client 啟動時,無法從 Eureka-Server 讀取註冊資訊(可能掛了),從備份註冊中心讀取註冊資訊。實現程式碼如下:

    // BackupRegistry.java
    public interface BackupRegistry {
    Applications fetchRegistry();

    Applications fetchRegistry(String[] includeRemoteRegions);

    }

    // NotImplementedRegistryImpl.java
    public class NotImplementedRegistryImpl implements BackupRegistry {

    @Override
    public Applications fetchRegistry() {
       return null;
    }

    @Override
    public Applications fetchRegistry(String[] includeRemoteRegions) {
       return null;
    }

    }

    • 從 com.netflix.discovery.NotImplementedRegistryImpl 可以看出,目前 Eureka-Client 未提供合適的預設實現。

  • com.netflix.discovery.AbstractDiscoveryClientOptionalArgs,DiscoveryClient 可選引數抽象基類。不同於上面三個必填引數,該引數是選填引數,實際生產下使用較少。實現程式碼如下:

    public abstract class AbstractDiscoveryClientOptionalArgs<T> {
    /**
    * 生成健康檢查回呼的工廠
    */

    Provider healthCheckCallbackProvider;
    /**
    * 生成健康檢查處理器的工廠
    */

    Provider healthCheckHandlerProvider;
    /**
    * 向 Eureka-Server 註冊之前的處理器
    */

    PreRegistrationHandler preRegistrationHandler;
    /**
    * Jersey 過濾器集合
    */

    Collection additionalFilters;
    /**
    * Jersey 客戶端
    */

    EurekaJerseyClient eurekaJerseyClient;
    /**
    * 生成 Jersey 客戶端的工廠的工廠
    */

    TransportClientFactories transportClientFactories;
    /**
    * Eureka 事件監聽器集合
    */

    private Set eventListeners;

    }

    • x

    • 在 Spring-Cloud-Eureka-Client,提供了預設實現 org.springframework.cloud.netflix.eureka.EurekaHealthCheckHandler,需要結合 spirng-boot-actuate 使用,感興趣的同學可以看看。本文暫不拓展開,後面另開文章分享。(TODO[0004]:健康檢查)

    • com.netflix.appinfo.HealthCheckCallback,健康檢查回呼介面,目前已經廢棄,使用 HealthCheckHandler 替代,你可以不關註該引數

    • com.netflix.appinfo.HealthCheckHandler,健康檢查處理器介面,目前暫未提供合適的預設實現,唯一提供的 com.netflix.appinfo.HealthCheckCallbackToHandlerBridge,用於將 HealthCheckCallback 橋接成 HealthCheckHandler,實現程式碼如下:

      // HealthCheckHandler.java
      public interface HealthCheckHandler {
      InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus);

      }

      // HealthCheckCallbackToHandlerBridge.java
      public class HealthCheckCallbackToHandlerBridge implements HealthCheckHandler {

      private final HealthCheckCallback callback;

      public HealthCheckCallbackToHandlerBridge() {
         callback = null;
      }

      public HealthCheckCallbackToHandlerBridge(HealthCheckCallback callback) {
         this.callback = callback;
      }

      @Override
      public InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus) {
         if (null == callback || InstanceInfo.InstanceStatus.STARTING == currentStatus
                 || InstanceInfo.InstanceStatus.OUT_OF_SERVICE == currentStatus) { // Do not go to healthcheck handler if the status is starting or OOS.
             return currentStatus;
         }
      return callback.isHealthy() ? InstanceInfo.InstanceStatus.UP : InstanceInfo.InstanceStatus.DOWN;

      }
      }

    • com.netflix.discovery.PreRegistrationHandler,向 Eureka-Server 註冊之前的處理器介面,目前暫未提供預設實現。透過實現該介面,可以在註冊前做一些自定義的處理。實現程式碼如下:

      public interface PreRegistrationHandler {
      void beforeRegistration();

      }

  • additionalFilters,Jersey 過濾器集合。這裡宣告泛型  的原因,Jersey 1.X 和 Jersey 2.X 的過濾器介面不同,透過泛型來支援。實現程式碼如下:

    “`Java
    // Jersey1DiscoveryClientOptionalArgs.java
    public class Jersey1DiscoveryClientOptionalArgs extends AbstractDiscoveryClientOptionalArgs {
    }

    // Jersey2DiscoveryClientOptionalArgs.java
    public class Jersey2DiscoveryClientOptionalArgs extends AbstractDiscoveryClientOptionalArgs {
    }

    // DiscoveryClientOptionalArgs.java
    public static class DiscoveryClientOptionalArgs extends Jersey1DiscoveryClientOptionalArgs {

    }
    “`

    • Jersey 1.X 使用 ClientFilter 。ClientFilter 目前有兩個過濾器實現:EurekaIdentityHeaderFilter 、DynamicGZIPContentEncodingFilter 。

    • Jersey 2.X 使用 ClientRequestFilter 。

    • DiscoveryClient 使用 DiscoveryClientOptionalArgs,即 Jersey 1.X 。

  • eurekaJerseyClient,Jersey 客戶端。該引數目前廢棄,使用下麵 TransportClientFactories 引數來進行生成。

  • com.netflix.discovery.shared.transport.jersey.TransportClientFactories,生成 Jersey 客戶端工廠的工廠介面。目前有 Jersey1TransportClientFactories 、Jersey2TransportClientFactories 兩個實現。TransportClientFactories 實現程式碼如下:

    “`Java
    // TransportClientFactories.java
    public interface TransportClientFactories {

        @Deprecated

      TransportClientFactory newTransportClientFactory(final Collection additionalFilters
    ,
                                                       final EurekaJerseyClient providedJerseyClien
    t);

    TransportClientFactory newTransportClientFactory(final EurekaClientConfig clientConfi
    g,
                                                    final Collection additionalFilte
    rs,
                                                   final InstanceInfo myInstanceI
    n
    fo);




    }

    // TransportClientFactory.java
    public interface TransportClientFactory {

        EurekaHttpClient newClient(EurekaEndpoint serviceUrl);
    void shutdown();



    }
    “`

    • 第一個方法已經廢棄,這就是為什麼說上面的 eurekaJerseyClient 引數( 不是 EurekaJerseyClient 類)已經廢棄,被第二個方法取代。相比來說,第二個方法對 EurekaJerseyClient 建立封裝會更好。

  • com.netflix.discovery.EurekaEventListener,Eureka 事件監聽器。實現程式碼如下:

    “`Java
    // EurekaEventListener.java
    public interface EurekaEventListener {
    }

    // EurekaEvent.java
    public interface EurekaEvent {
    }

    // DiscoveryEvent.java
    public abstract class DiscoveryEvent implements EurekaEvent {

        private final long timestamp;

    }
    “`

    • com.netflix.discovery.StatusChangeEvent,應用實體狀態變更事件,在《Eureka 原始碼解析 —— 應用實體註冊發現 (一)之註冊》「2.1 應用實體資訊複製器」 有詳細解析。

    • com.netflix.discovery.CacheRefreshedEvent,在《Eureka 原始碼解析 —— 應用實體註冊發現 (六)之全量獲取》「2.4 發起獲取註冊資訊」 有詳細解析。

3.2 構造方法

DiscoveryClient 的構造方法實現程式碼相對較多,已經將程式碼切塊 + 中文註冊,點選 DiscoveryClient 連結,對照下麵每個小結閱讀理解。

3.2.1 賦值 AbstractDiscoveryClientOptionalArgs

// DiscoveryClient.java 構造方法
if (args != null) {
 this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
 this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
 this.eventListeners.addAll(args.getEventListeners());
 this.preRegistrationHandler = args.preRegistrationHandler;
} else {
 this.healthCheckCallbackProvider = null;
 this.healthCheckHandlerProvider = null;
 this.preRegistrationHandler = null;
}

3.2.2 賦值 ApplicationInfoManager、EurekaClientConfig

// DiscoveryClient.java 構造方法
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();

clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
 appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); // 無實際業務用途,用於打 logger
} else {
 logger.warn("Setting instanceInfo to a passed in null value");
}

3.2.3 賦值 BackupRegistry

this.backupRegistryProvider = backupRegistryProvider;

3.2.4 初始化 InstanceInfoBasedUrlRandomizer

TODO[0016]:InstanceInfoBasedUrlRandomizer

this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);

3.2.5 初始化 Applications 在本地的快取

// DiscoveryClient.java 變數
/**
* Applications 在本地的快取
*/

private final AtomicReference localRegionApps = new AtomicReference();
/**
* 拉取註冊資訊次數
* monotonically increasing generation counter to ensure stale threads do not reset registry to an older version
*/

private final AtomicLong fetchRegistryGeneration;

// DiscoveryClient.java 構造方法
localRegionApps.set(new Applications());

fetchRegistryGeneration = new AtomicLong(0);
  • 在建立 DiscoveryClient 時,localRegionApps 為空。

  • 定時任務間隔從 Eureka-Server 拉取註冊應用資訊到本地快取,在 《Eureka 原始碼解析 —— 應用實體註冊發現 (六)之全量獲取》 有詳細解析。

3.2.6 獲取哪些 Region 集合的註冊資訊

// DiscoveryClient.java 變數
/**
* 獲取哪些區域( Region )集合的註冊資訊
*/

private final AtomicReference remoteRegionsToFetch;
/**
* 獲取哪些區域( Region )集合的註冊資訊
*/

private final AtomicReference remoteRegionsRef;

// DiscoveryClient.java 構造方法
remoteRegionsToFetch = new AtomicReference<>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

3.2.7 初始化拉取、心跳的監控

// DiscoveryClient.java 變數
/**
* 最後成功從 Eureka-Server 拉取註冊資訊時間戳
*/

private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
/**
* 最後成功向 Eureka-Server 心跳時間戳
*/

private volatile long lastSuccessfulHeartbeatTimestamp = -1;
/**
* 心跳監控
*/

private final ThresholdLevelsMetric heartbeatStalenessMonitor;
/**
* 拉取監控
*/

private final ThresholdLevelsMetric registryStalenessMonitor;

// DiscoveryClient.java 構造方法
if (config.shouldFetchRegistry()) {
 this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
 this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}

if (config.shouldRegisterWithEureka()) {
 this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
 this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
  • 每次成功向 Eureka-Serve 心跳或者從從 Eureka-Server 拉取註冊資訊後,都會更新相應時間戳。

  • 配合 Netflix Servo 實現監控資訊採集。

  • 對 com.netflix.discovery.util.ThresholdLevelsMetric 感興趣的同學可以點選連結檢視。本文暫不拓展開,後面另開文章分享。(TODO[0012]:監控相關)

3.2.8 結束初始化,當無需和 Eureka-Server 互動

// DiscoveryClient.java 構造方法
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
 logger.info("Client configured to neither register nor query for data.");
 scheduler = null;
 heartbeatExecutor = null;
 cacheRefreshExecutor = null;
 eurekaTransport = null;
 instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

 // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
 // to work with DI'd DiscoveryClient
 DiscoveryManager.getInstance().setDiscoveryClient(this);
 DiscoveryManager.getInstance().setEurekaClientConfig(config);

 initTimestampMs = System.currentTimeMillis();
 logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
         initTimestampMs, this.getApplications().size());

 return;  // no need to setup up an network tasks and we are done
}

3.2.9 初始化執行緒池

// DiscoveryClient.java 變數
/**
* 執行緒池
*
* A scheduler to be used for the following 3 tasks: 【目前只有兩個】
* - updating service urls
* - scheduling a TimedSuperVisorTask
*/

private final ScheduledExecutorService scheduler;
// additional executors for supervised subtasks
/**
* 心跳執行器
*/

private final ThreadPoolExecutor heartbeatExecutor;
/**
* {@link #localRegionApps} 掃清執行器
*/

private final ThreadPoolExecutor cacheRefreshExecutor;

// DiscoveryClient.java 構造方法
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
    new ThreadFactoryBuilder()
            .setNameFormat("DiscoveryClient-%d")
            .setDaemon(true)
            .build());

heartbeatExecutor = new ThreadPoolExecutor(
    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
    new SynchronousQueue(),
    new ThreadFactoryBuilder()
            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
            .setDaemon(true)
            .build()
);  // use direct handoff

cacheRefreshExecutor = new ThreadPoolExecutor(
    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
    new SynchronousQueue(),
    new ThreadFactoryBuilder()
            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
            .setDaemon(true)
            .build()
);  // use direct handoff
  • scheduler定時任務執行緒池,初始化大小為 2,一個給 heartbeatExecutor,一個給 cacheRefreshExecutor

  • heartbeatExecutorcacheRefreshExecutor 在提交給 scheduler 才宣告具體的任務

3.2.10 初始化 Eureka 網路通訊相關

// DiscoveryClient.java 構造方法
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
  • 本文暫不拓展開,在 《Eureka 原始碼解析 —— EndPoint 與 解析器》 和 《Eureka 原始碼解析 —— 網路通訊》 詳細解析。

3.2.11 初始化 InstanceRegionChecker

// DiscoveryClient.java 構造方法
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
   azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
   azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
   azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
  • com.netflix.discovery.AzToRegionMapper,主要用於亞馬遜 AWS,跳過。

  • com.netflix.discovery.InstanceRegionChecker,應用實體資訊區域( region )校驗,實現程式碼如下:

    public class InstanceRegionChecker {
    // ... 省略和亞馬遜 AWS 相關的屬性和方法

    /**
    * 本地區域( Region )
    */

    private final String localRegion;

    public boolean isLocalRegion(@Nullable String instanceRegion) {
       return null == instanceRegion || instanceRegion.equals(localRegion); // no region == local
    }

    public String getLocalRegion() {
       return localRegion;
    }

    }

3.2.12 從 Eureka-Server 拉取註冊資訊

// DiscoveryClient.java 構造方法
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
 fetchRegistryFromBackup();
}
  • 呼叫 #fetchRegistry(false) 方法,從 Eureka-Server 初始拉取註冊資訊。在(TO後文連結)詳細解析。

  • 呼叫 #fetchRegistryFromBackup() 方法,若初始拉取註冊資訊失敗,從備份註冊中心獲取。實現程式碼如下:

    // DiscoveryClient.java
    private void fetchRegistryFromBackup() {
       try {
           @SuppressWarnings("deprecation")
           BackupRegistry backupRegistryInstance = newBackupRegistryInstance();
           if (null == backupRegistryInstance) { // backward compatibility with the old protected method, in case it is being used.
               backupRegistryInstance = backupRegistryProvider.get();
           }
           if (null != backupRegistryInstance) {
               Applications apps = null;
               if (isFetchingRemoteRegionRegistries()) {
                   String remoteRegionsStr = remoteRegionsToFetch.get();
                   if (null != remoteRegionsStr) {
                       apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));
                   }
               } else {
                   apps = backupRegistryInstance.fetchRegistry();
               }
               if (apps != null) {
                   final Applications applications = this.filterAndShuffle(apps);
                   applications.setAppsHashCode(applications.getReconcileHashCode());
                   localRegionApps.set(applications);
                   logTotalInstances();
                   logger.info("Fetched registry successfully from the backup");
               }
           } else {
               logger.warn("No backup registry instance defined & unable to find any discovery servers.");
           }
       } catch (Throwable e) {
           logger.warn("Cannot fetch applications from apps although backup registry was specified", e);
       }
    }
    • BackupRegistry 目前暫未提供預設實現,需要自行相關邏輯。

3.2.13 執行向 Eureka-Server 註冊之前的處理器

// DiscoveryClient.java 構造方法
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
 this.preRegistrationHandler.beforeRegistration();
}

3.2.14 初始化定時任務

// DiscoveryClient.java 構造方法
initScheduledTasks();

// DiscoveryClient.java
private void initScheduledTasks() {
  // 從 Eureka-Server 拉取註冊資訊執行器
  if (clientConfig.shouldFetchRegistry()) {
      // registry cache refresh timer
      int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
      int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
      scheduler.schedule(
              new TimedSupervisorTask(
                      "cacheRefresh",
                      scheduler,
                      cacheRefreshExecutor,
                      registryFetchIntervalSeconds,
                      TimeUnit.SECONDS,
                      expBackOffBound,
                      new CacheRefreshThread()
              ),
              registryFetchIntervalSeconds, TimeUnit.SECONDS);
  }

  // 向 Eureka-Server 心跳(續租)執行器
  if (clientConfig.shouldRegisterWithEureka()) {
      int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
      int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
      logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

      // Heartbeat timer
      scheduler.schedule(
              new TimedSupervisorTask(
                      "heartbeat",
                      scheduler,
                      heartbeatExecutor,
                      renewalIntervalInSecs,
                      TimeUnit.SECONDS,
                      expBackOffBound,
                      new HeartbeatThread()
              ),
              renewalIntervalInSecs, TimeUnit.SECONDS);

      // InstanceInfo replicator
      instanceInfoReplicator = new InstanceInfoReplicator(
              this,
              instanceInfo,
              clientConfig.getInstanceInfoReplicationIntervalSeconds(),
              2); // burstSize

      statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
          @Override
          public String getId() {
              return "statusChangeListener";
          }

          @Override
          public void notify(StatusChangeEvent statusChangeEvent) {
              if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                      InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                  // log at warn level if DOWN was involved
                  logger.warn("Saw local status change event {}", statusChangeEvent);
              } else {
                  logger.info("Saw local status change event {}", statusChangeEvent);
              }
              instanceInfoReplicator.onDemandUpdate();
          }
      };

      if (clientConfig.shouldOnDemandUpdateStatusChange()) {
          applicationInfoManager.registerStatusChangeListener(statusChangeListener);
      }

      instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
  } else {
      logger.info("Not registering with Eureka server per configuration");
  }
}
  • 初始化從 Eureka-Server 拉取註冊資訊執行器,在 《Eureka 原始碼解析 —— 應用實體註冊發現 (六)之全量獲取》 詳細解析。

  • 初始化向 Eureka-Server 心跳(續租)執行器,在 《Eureka 原始碼解析 —— 應用實體註冊發現(二)之續租》 詳細解析。

3.2.15 向 Servo 註冊監控

// DiscoveryClient.java 構造方法
try {
 Monitors.registerObject(this);
} catch (Throwable e) {
 logger.warn("Cannot register timers", e);
}
  • 配合 Netflix Servo 實現監控資訊採集。

3.2.16 初始化完成

// DiscoveryClient.java 變數
/**
* 初始化完成時間戳
*/

private final long initTimestampMs;

// DiscoveryClient.java 構造方法
// 【3.2.16】初始化完成
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);

initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
     initTimestampMs, this.getApplications().size());

666. 彩蛋

知識星球

由於筆者是邊理解原始碼邊輸出部落格內容,如果有錯誤或者不清晰的地方,歡迎微笑給我的微信公眾號( 芋道原始碼 ) 留言,我會仔細回覆。感謝 + 1024。

後面文章不斷更新,會慢慢完善本文中的。

推薦參考閱讀:

  • 程式猿DD —— 《Spring Cloud微服務實戰》 Spring Cloud Eureka —— 原始碼分析

  • 買盜版書,等於編寫一個初級 BUG

贊(0)

分享創造快樂