歡迎光臨
每天分享高質量文章

註冊中心 Eureka 原始碼解析 —— Eureka-Server 叢集同步

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Eureka/server-cluster/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 Eureka 1.8.X 版本

  • 1. 概述

  • 2. 叢集節點初始化與更新

    • 2.1 叢集節點啟動

    • 2.2 更新叢集節點資訊

    • 2.3 叢集節點

  • 3. 獲取初始註冊資訊

  • 4. 同步註冊資訊

    • 4.1 同步操作型別

    • 4.2 發起 Eureka-Server 同步操作

    • 4.3 接收 Eureka-Server 同步操作

    • 4.4 處理 Eureka-Server 同步結果


1. 概述

本文主要分享 Eureka-Server 叢集同步註冊資訊

Eureka-Server 叢集如下圖:

  • Eureka-Server 叢集不區分主從節點或者 Primary & Secondary 節點,所有節點相同角色( 也就是沒有角色 ),完全對等

  • Eureka-Client 可以向任意 Eureka-Client 發起任意讀寫操作,Eureka-Server 將操作複製到另外的 Eureka-Server 以達到最終一致性。註意,Eureka-Server 是選擇了 AP 的元件。

Eureka-Server 可以使用直接配置所有節點的服務地址,或者基於 DNS 配置。推薦閱讀:《Spring Cloud構建微服務架構(六)高可用服務註冊中心》 。

本文主要類在 com.netflix.eureka.cluster 包下。

OK,讓我們開始愉快的遨遊在程式碼的海洋。

推薦 Spring Cloud 書籍

  • 請支援正版。下載盜版,等於主動編寫低階 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

推薦 Spring Cloud 影片

  • Java 微服務實踐 – Spring Boot

  • Java 微服務實踐 – Spring Cloud

  • Java 微服務實踐 – Spring Boot / Spring Cloud

ps :註意,本文提到的同步,準確來說是複製( Replication )

2. 叢集節點初始化與更新

com.netflix.eureka.cluster.PeerEurekaNodes ,Eureka-Server 叢集節點集合 。構造方法如下 :

public class PeerEurekaNodes {

    private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);

    /**
     * 應用實體登錄檔
     */

    protected final PeerAwareInstanceRegistry registry;
    /**
     * Eureka-Server 配置
     */

    protected final EurekaServerConfig serverConfig;
    /**
     * Eureka-Client 配置
     */

    protected final EurekaClientConfig clientConfig;
    /**
     * Eureka-Server 編解碼
     */

    protected final ServerCodecs serverCodecs;
    /**
     * 應用實體資訊管理器
     */

    private final ApplicationInfoManager applicationInfoManager;

    /**
     * Eureka-Server 叢集節點陣列
     */

    private volatile List peerEurekaNodes = Collections.emptyList();
    /**
     * Eureka-Server 服務地址陣列
     */

    private volatile Set peerEurekaNodeUrls = Collections.emptySet();

    /**
     * 定時任務服務
     */

    private ScheduledExecutorService taskExecutor;

    @Inject
    public PeerEurekaNodes(
            PeerAwareInstanceRegistry registry,
            EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig,
            ServerCodecs serverCodecs,
            ApplicationInfoManager applicationInfoManager)
 
{
        this.registry = registry;
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.applicationInfoManager = applicationInfoManager;
    }
}
  • peerEurekaNodespeerEurekaNodeUrlstaskExecutor 屬性,在構造方法中未設定和初始化,而是在 PeerEurekaNodes#start() 方法,設定和初始化,下文我們會解析這個方法。

  • Eureka-Server 在初始化時,呼叫 EurekaBootStrap#getPeerEurekaNodes(…) 方法,建立 PeerEurekaNodes ,點選 連結 檢視該方法的實現。

2.1 叢集節點啟動

呼叫 PeerEurekaNodes#start() 方法,叢集節點啟動,主要完成兩個邏輯:

  • 初始化叢集節點資訊

  • 初始化固定週期( 預設:10 分鐘,可配置 )更新叢集節點資訊的任務

程式碼如下:

  1public void start() {
  2:     // 建立 定時任務服務
  3:     taskExecutor = Executors.newSingleThreadScheduledExecutor(
  4:             new ThreadFactory() {
  5:                 @Override
  6:                 public Thread newThread(Runnable r) {
  7:                     Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
  8:                     thread.setDaemon(true);
  9:                     return thread;
 10:                 }
 11:             }
 12:     );
 13:     try {
 14:         // 初始化 叢集節點資訊
 15:         updatePeerEurekaNodes(resolvePeerUrls());
 16:         // 初始化 初始化固定週期更新叢集節點資訊的任務
 17:         Runnable peersUpdateTask = new Runnable() {
 18:             @Override
 19:             public void run() {
 20:                 try {
 21:                     updatePeerEurekaNodes(resolvePeerUrls());
 22:                 } catch (Throwable e) {
 23:                     logger.error("Cannot update the replica Nodes", e);
 24:                 }
 25
 26:             }
 27:         };
 28:         taskExecutor.scheduleWithFixedDelay(
 29:                 peersUpdateTask,
 30:                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
 31:                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
 32:                 TimeUnit.MILLISECONDS
 33:         );
 34:     } catch (Exception e) {
 35:         throw new IllegalStateException(e);
 36:     }
 37:     // 列印 叢集節點資訊
 38:     for (PeerEurekaNode node : peerEurekaNodes) {
 39:         logger.info("Replica node URL:  " + node.getServiceUrl());
 40:     }
 41: }
  • 第 15 行 && 第 21 行 :呼叫 #updatePeerEurekaNodes() 方法,更新叢集節點資訊。

2.2 更新叢集節點資訊

呼叫 #resolvePeerUrls() 方法,獲得 Eureka-Server 叢集服務地址陣列,程式碼如下:

  1protected List resolvePeerUrls() {
  2:     // 獲得 Eureka-Server 叢集服務地址陣列
  3:     InstanceInfo myInfo = applicationInfoManager.getInfo();
  4:     String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
  5:     List replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
  6
  7:     // 移除自己(避免向自己同步)
  8:     int idx = 0;
  9:     while (idx  10:         if (isThisMyUrl(replicaUrls.get(idx))) {
 11:             replicaUrls.remove(idx);
 12:         } else {
 13:             idx++;
 14:         }
 15:     }
 16:     return replicaUrls;
 17: }
  • 第 2 至 5 行 :獲得 Eureka-Server 叢集服務地址陣列。EndpointUtils#getDiscoveryServiceUrls(…) 方法,邏輯與 《Eureka 原始碼解析 —— EndPoint 與 解析器》「3.4 ConfigClusterResolver」 基本類似。EndpointUtils 正在逐步,猜測未來這裡會替換。

  • 第 7 至 15 行 :移除自身節點,避免向自己同步。


呼叫 #updatePeerEurekaNodes() 方法,更新叢集節點資訊,主要完成兩部分邏輯:

  • 新增新增的叢集節點

  • 關閉刪除的叢集節點

程式碼如下:

  1protected void updatePeerEurekaNodes(List newPeerUrls) {
  2:     if (newPeerUrls.isEmpty()) {
  3:         logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
  4:         return;
  5:     }
  6
  7:     // 計算 新增的叢集節點地址
  8:     Set toShutdown = new HashSet<>(peerEurekaNodeUrls);
  9:     toShutdown.removeAll(newPeerUrls);
 10
 11:     // 計算 刪除的叢集節點地址
 12:     Set toAdd = new HashSet<>(newPeerUrls);
 13:     toAdd.removeAll(peerEurekaNodeUrls);
 14
 15:     if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
 16:         return;
 17:     }
 18
 19:     // 關閉刪除的叢集節點
 20:     // Remove peers no long available
 21:     List newNodeList = new ArrayList<>(peerEurekaNodes);
 22:     if (!toShutdown.isEmpty()) {
 23:         logger.info("Removing no longer available peer nodes {}", toShutdown);
 24:         int i = 0;
 25:         while (i  26:             PeerEurekaNode eurekaNode = newNodeList.get(i);
 27:             if (toShutdown.contains(eurekaNode.getServiceUrl())) {
 28:                 newNodeList.remove(i);
 29:                 eurekaNode.shutDown(); // 關閉
 30:             } else {
 31:                 i++;
 32:             }
 33:         }
 34:     }
 35
 36:     // 新增新增的叢集節點
 37:     // Add new peers
 38:     if (!toAdd.isEmpty()) {
 39:         logger.info("Adding new peer nodes {}", toAdd);
 40:         for (String peerUrl : toAdd) {
 41:             newNodeList.add(createPeerEurekaNode(peerUrl));
 42:         }
 43:     }
 44
 45:     // 賦值
 46:     this.peerEurekaNodes = newNodeList;
 47:     this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
 48: }
  • 第 7 至 9 行 :計算新增的叢集節點地址。

  • 第 11 至 13 行 :計算刪除的叢集節點地址。

  • 第 19 至 34 行 :關閉刪除的叢集節點。

  • 第 36 至 43 行 :新增新增的叢集節點。呼叫 #createPeerEurekaNode(peerUrl) 方法,建立叢集節點,程式碼如下:

      1protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
      2:     HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
      3:     String targetHost = hostFromUrl(peerEurekaNodeUrl);
      4:     if (targetHost == null) {
      5:         targetHost = "host";
      6:     }
      7:     return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
      8: }
    • 第 2 行 :建立 Eureka-Server 叢集通訊客戶端,在 《Eureka 原始碼解析 —— 網路通訊》「4.2 JerseyReplicationClient」 有詳細解析。

    • 第 7 行 :建立 PeerEurekaNode ,在 「2.3 PeerEurekaNode」 有詳細解析。

2.3 叢集節點

com.netflix.eureka.cluster.PeerEurekaNode ,單個叢集節點。

點選 連結 檢視構造方法

  • 第 129 行 :建立 ReplicationTaskProcessor 。在 「4.1.2 同步操作任務處理器」 詳細解析

  • 第 131 至 140 行 :建立批次任務分發器,在 《Eureka 原始碼解析 —— 任務批處理》 有詳細解析。

  • 第 142 至 151 行 :建立單任務分發器,用於 Eureka-Server 向亞馬遜 AWS 的 ASG ( Autoscaling Group ) 同步狀態。暫時跳過。

3. 獲取初始註冊資訊

Eureka-Server 啟動時,呼叫 PeerAwareInstanceRegistryImpl#syncUp() 方法,從叢集的一個 Eureka-Server 節點獲取初始註冊資訊,程式碼如下:

  1@Override
  2public int syncUp() {
  3:     // Copy entire entry from neighboring DS node
  4:     int count = 0;
  5
  6:     for (int i = 0; ((i 0)); i++) {
  7:         // 未讀取到註冊資訊,sleep 等待
  8:         if (i > 0) {
  9:             try {
 10:                 Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
 11:             } catch (InterruptedException e) {
 12:                 logger.warn("Interrupted during registry transfer..");
 13:                 break;
 14:             }
 15:         }
 16
 17:         // 獲取註冊資訊
 18:         Applications apps = eurekaClient.getApplications();
 19:         for (Application app : apps.getRegisteredApplications()) {
 20:             for (InstanceInfo instance : app.getInstances()) {
 21:                 try {
 22:                     if (isRegisterable(instance)) { // 判斷是否能夠註冊
 23:                         register(instance, instance.getLeaseInfo().getDurationInSecs(), true); // 註冊
 24:                         count++;
 25:                     }
 26:                 } catch (Throwable t) {
 27:                     logger.error("During DS init copy", t);
 28:                 }
 29:             }
 30:         }
 31:     }
 32:     return count;
 33: }
  • 第 7 至 15 行 :未獲取到註冊資訊,sleep 等待再次重試。

  • 第 17 至 30 行 :獲取註冊資訊,若獲取到,註冊到自身節點。

    • 第 22 行 :判斷應用實體是否能夠註冊到自身節點。主要用於亞馬遜 AWS 環境下的判斷,若非部署在亞馬遜裡,都傳回 `true` 。點選 連結 檢視實現。

    • 第 23 行 :呼叫 `#register()` 方法,註冊應用實體到自身節點。在 《Eureka 原始碼解析 —— 應用實體註冊發現(一)之註冊》 有詳細解析。


若呼叫 #syncUp() 方法,未獲取到應用實體,則 Eureka-Server 會有一段時間( 預設:5 分鐘,可配 )不允許被 Eureka-Client 獲取註冊資訊,避免影響 Eureka-Client 。

  • 標記 Eureka-Server 啟動時,未獲取到應用實體,程式碼如下:

    // PeerAwareInstanceRegistryImpl.java

    private boolean peerInstancesTransferEmptyOnStartup = true;

    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // ... 省略其他程式碼
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        // ... 省略其他程式碼
    }
  • 判斷 Eureka-Server 是否允許被 Eureka-Client 獲取註冊資訊,程式碼如下:

    // PeerAwareInstanceRegistryImpl.java
    public boolean shouldAllowAccess(boolean remoteRegionRequired) {
       if (this.peerInstancesTransferEmptyOnStartup) {
           // 設定啟動時間
           this.startupTime = System.currentTimeMillis();
           if (!(System.currentTimeMillis() > this.startupTime + serverConfig.getWaitTimeInMsWhenSyncEmpty())) {
               return false;
           }
       }
       // ... 省略其他程式碼
       return true;
    }

4. 同步註冊資訊

Eureka-Server 叢集同步註冊資訊如下圖:

  • Eureka-Server 接收到 Eureka-Client 的 Register、Heartbeat、Cancel、StatusUpdate、DeleteStatusOverride 操作,固定間隔( 預設值 :500 毫秒,可配 )向 Eureka-Server 叢集內其他節點同步( 準實時,非實時 )。

4.1 同步操作型別

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.Action ,同步操作型別,程式碼如下:

public enum Action {
   Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;

   // ... 省略監控相關屬性
}
  • Register ,在 《Eureka 原始碼解析 —— 應用實體註冊發現(一)之註冊》 有詳細解析

  • Heartbeat ,在 《Eureka 原始碼解析 —— 應用實體註冊發現(二)之續租》 有詳細解析

  • Cancel ,在 《Eureka 原始碼解析 —— 應用實體註冊發現(三)之下線》 有詳細解析

  • StatusUpdate ,在 《Eureka 原始碼解析 —— 應用實體註冊發現(八)之改寫狀態》 有詳細解析

  • DeleteStatusOverride ,在 《Eureka 原始碼解析 —— 應用實體註冊發現(八)之改寫狀態》 有詳細解析

4.2 發起 Eureka-Server 同步操作

Eureka-Server 在完成 Eureka-Client 發起的上述操作在自身節點的執行後,向叢集內其他 Eureka-Server 發起同步操作。以 Register 操作舉例子,程式碼如下:

// PeerAwareInstanceRegistryImpl.java
public void register(final InstanceInfo info, final boolean isReplication) {
   // 租約過期時間
   int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
   if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
       leaseDuration = info.getLeaseInfo().getDurationInSecs();
   }
   // 註冊應用實體資訊
   super.register(info, leaseDuration, isReplication);
   // Eureka-Server 複製
   replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
  • 最後一行,呼叫 #replicateToPeers(…) 方法,傳遞對應的同步操作型別,發起同步操作。


#replicateToPeers(...) 方法,程式碼如下:

  1private void replicateToPeers(Action action, String appName, String id,
  2:                               InstanceInfo info /* optional */,
  3:                               InstanceStatus newStatus /* optional */boolean isReplication)
 
{
  4:     Stopwatch tracer = action.getTimer().start();
  5:     try {
  6:         if (isReplication) {
  7:             numberOfReplicationsLastMin.increment();
  8:         }
  9
 10:         // Eureka-Server 發起的請求 或者 叢集為空
 11:         // If it is a replication already, do not replicate again as this will create a poison replication
 12:         if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
 13:             return;
 14:         }
 15
 16:         for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
 17:             // If the url represents this host, do not replicate to yourself.
 18:             if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
 19:                 continue;
 20:             }
 21:             replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
 22:         }
 23:     } finally {
 24:         tracer.stop();
 25:     }
 26: }
  • 第 10 至 14 行 :Eureka-Server 在處理上述操作( Action ),無論來自 Eureka-Client 發起請求,還是 Eureka-Server 發起同步,呼叫的內部方法相同,透過 isReplication=true 引數,避免死迴圈同步。

  • 第 16 至 22 行 :迴圈叢集內每個節點,呼叫 #replicateInstanceActionsToPeers(…) 方法,發起同步操作。


#replicateInstanceActionsToPeers(...) 方法,程式碼如下:

 // ... 省略程式碼,太長了。
  • Cancel :呼叫 PeerEurekaNode#cancel(…) 方法,點選 連結 檢視實現。

  • Heartbeat :呼叫 PeerEurekaNode#heartbeat(…) 方法,點選 連結 檢視實現。

  • Register :呼叫 PeerEurekaNode#register(…) 方法,點選 連結 檢視實現。

  • StatusUpdate :呼叫 PeerEurekaNode#statusUpdate(…) 方法,點選 連結 檢視實現。

  • DeleteStatusOverride :呼叫 PeerEurekaNode#deleteStatusOverride(…) 方法,點選 連結 檢視實現。

  • 上面的每個方法實現,我們會看到類似這麼一段程式碼 :

    batchingDispatcher.process(
        taskId("${action}", appName, id), // id
        new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
        @Override
        public EurekaHttpResponse execute() {
            return replicationClient.doString(...);
        }

        @Override
        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
            // do Something...
        }

    }, // ReplicationTask 子類
    expiryTime

    )

    • 相同應用實體的相同同步操作使用相同任務編號。在 《Eureka 原始碼解析 —— 任務批處理》「2. 整體流程」 中,我們看到” 接收執行緒( Runner )合併任務,將相同任務編號的任務合併,只執行一次。 “,因此,相同應用實體的相同同步操作就能被合併,減少操作量。例如,Eureka-Server 同步某個應用實體的 Heartbeat 操作,接收同步的 Eureak-Server 掛了,一方面這個應用的這次操作會重試,另一方面,這個應用實體會發起新的 Heartbeat 操作,透過任務編號合併,接收同步的 Eureka-Server 恢復後,減少收到重覆積壓的任務。

    • #task(...) 方法,生成同步操作任務編號。程式碼如下:

      private static String taskId(String requestType, String appName, String id) {
         return requestType + '#' + appName + '/' + id;
      }
  • InstanceReplicationTask ,同步操作任務,在 「4.1.1 同步操作任務」 詳細解析。

  • expiryTime ,任務過期時間。

4.1.1 同步操作任務

  • com.netflix.eureka.cluster.ReplicationTask ,同步任務抽象類

    • 點選 連結 檢視 ReplicationTask 程式碼。

    • 定義了 `#getTaskName()` 抽象方法。

    • 定義了 `#execute()` 抽象方法,執行同步任務。

    • 實現了 `#handleSuccess()` 方法,處理成功執行同步結果。

    • 實現了 `#handleFailure(…)` 方法,處理失敗執行同步結果。

  • com.netflix.eureka.cluster.InstanceReplicationTask ,同步應用實體任務抽象類

    • 點選 連結 檢視 InstanceReplicationTask 程式碼。

    • 實現了父類 `#getTaskName()` 抽象方法。

  • com.netflix.eureka.cluster.AsgReplicationTask ,亞馬遜 AWS 使用,暫時跳過。

從上面 PeerEurekaNode#同步操作(...) 方法,全部實現了 InstanceReplicationTask 類的 #execute() 方法,部分重寫了 #handleFailure(...) 方法。

4.1.2 同步操作任務處理器

com.netflix.eureka.cluster.InstanceReplicationTask ,實現 TaskProcessor 介面,同步操作任務處理器。

  • TaskProcessor ,在 《Eureka 原始碼解析 —— 任務批處理》「10. 任務執行器【執行任務】」 有詳細解析。

  • 點選 連結 檢視 InstanceReplicationTask 程式碼。

ReplicationTaskProcessor#process(task) ,處理單任務,用於 Eureka-Server 向亞馬遜 AWS 的 ASG ( Autoscaling Group ) 同步狀態,暫時跳過,感興趣的同學可以點選 連結 檢視方法程式碼。

ReplicationTaskProcessor#process(tasks) ,處理批次任務,用於 Eureka-Server 叢集註冊資訊的同步操作任務,透過呼叫被同步的 Eureka-Server 的 peerreplication/batch/ 介面,一次性將批次( 多個 )的同步操作任務發起請求,程式碼如下:

 // ... 省略程式碼,太長了。
  • 第 4 行 :建立批次提交同步操作任務的請求物件( ReplicationList ) 。比較易懂,咱就不囉嗦貼程式碼了。

    • ReplicationList ,點選 連結 檢視類。

    • ReplicationInstance ,點選 連結 檢視類。

    • `#createReplicationListOf(…)` ,點選 連結 檢視方法。

    • `#createReplicationInstanceOf(…)` ,點選 連結 檢視方法。

  • 第 7 行 :呼叫 JerseyReplicationClient#submitBatchUpdates(…) 方法,請求 peerreplication/batch/ 介面,一次性將批次( 多個 )的同步操作任務發起請求。

    • `JerseyReplicationClient#submitBatchUpdates(…)` 方法,點選 連結 檢視方法。

    • ReplicationListResponse ,點選 連結 檢視類。

    • ReplicationInstanceResponse ,點選 連結 檢視類。

  • 第 9 至 31 行 :處理批次提交同步操作任務的響應,在 「4.4 處理 Eureka-Server 同步結果」 詳細解析。

4.3 接收 Eureka-Server 同步操作

com.netflix.eureka.resources.PeerReplicationResource ,同步操作任務 Resource ( Controller )。

peerreplication/batch/ 介面,對映 PeerReplicationResource#batchReplication(...) 方法,程式碼如下:

 // ... 省略程式碼,太長了。
  • 第 7 至 15 行 :逐個處理單個同步操作任務,並將處理結果( ReplicationInstanceResponse ) 新增到 ReplicationListResponse 。

  • 第 23 至 50 行 :處理單個同步操作任務,傳回處理結果( ReplicationInstanceResponse )。

    • 第 24 至 25 行 :建立 ApplicationResource , InstanceResource 。我們看到,實際該方法是把單個同步操作任務提交到其他 Resource ( Controller ) 處理,Eureka-Server 收到 Eureka-Client 請求響應的 Resource ( Controller ) 是相同的邏輯

    • Register :點選 連結 檢視 `#handleRegister(…)` 方法。

    • Heartbeat :點選 連結 檢視 `#handleHeartbeat(…)` 方法。

    • Cancel :點選 連結 檢視 `#handleCancel(…)` 方法。

    • StatusUpdate :點選 連結 檢視 `#handleStatusUpdate(…)` 方法。

    • DeleteStatusOverride :點選 連結 檢視 `#handleDeleteStatusOverride(…)` 方法。

4.4 處理 Eureka-Server 同步結果

? 想想就有小激動,終於寫到這裡了。

接 ReplicationTaskProcessor#process(tasks) 方法,處理批次提交同步操作任務的響應,程式碼如下:

 // ... 省略程式碼,太長了。
  • 第 10 行 ,呼叫 #isSuccess(…) 方法,判斷請求是否成功,響應狀態碼是否在 [200, 300) 範圍內。

  • 第 11 至 13 行 :狀態碼 503 ,目前 Eureka-Server 傳回 503 的原因是被限流。在 《Eureka 原始碼解析 —— 基於令牌桶演演算法的 RateLimiter》 詳細解析。該情況為瞬時錯誤,會重試該同步操作任務,在 《Eureka 原始碼解析 —— 任務批處理》「3. 任務處理器」 有詳細解析。

  • 第 14 至 18 行 :非預期狀態碼,目前 Eureka-Server 在程式碼上看下來,不會傳回這樣的狀態碼。該情況為永久錯誤,會重試該同步操作任務,在 《Eureka 原始碼解析 —— 任務批處理》「3. 任務處理器」 有詳細解析。

  • 第 20 行 :請求成功,呼叫 #handleBatchResponse(…) 方法,逐個處理每個 ReplicationTask 和 ReplicationInstanceResponse 。這裡有一點要註意下,請求成功指的是整個請求成功,實際每個 ReplicationInstanceResponse 可能傳回的狀態碼不在 [200, 300) 範圍內。該方法下文詳細解析。

  • 第 23 至 25 行 :請求發生網路異常,例如網路超時,列印網路異常日誌。目前日誌的列印為部分取樣,條件為網路發生異常每間隔 10 秒列印一條,避免網路發生異常列印超級大量的日誌。該情況為永久錯誤,會重試該同步操作任務,在 《Eureka 原始碼解析 —— 任務批處理》「3. 任務處理器」 有詳細解析。

    • #isNetworkConnectException(…) ,點選 連結 檢視方法。

    • #logNetworkErrorSample(…) ,點選 連結 檢視方法。

  • 第 26 至 29 行 :非預期異常,目前 Eureka-Server 在程式碼上看下來,不會丟擲這樣的異常。該情況為永久錯誤,會重試該同步操作任務,在 《Eureka 原始碼解析 —— 任務批處理》「3. 任務處理器」 有詳細解析。


#handleBatchResponse(...) 方法,程式碼如下:

 // ... 省略程式碼,太長了。
  • ReplicationTask#handleSuccess() 方法,無任務同步操作任務重寫,是個空方法,程式碼如下:

    // ReplicationTask.java
    public void handleSuccess() {
    }
  • ReplicationTask#handleFailure() 方法,有兩個同步操作任務重寫:

    • x

    • Cancel :當 Eureka-Server 不存在下線的應用實體時,傳回 404 狀態碼,此時列印錯誤日誌,程式碼如下:

      // PeerEurekaNode#cancel(...)
      @Override
      public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
          super.handleFailure(statusCode, responseEntity);
          if (statusCode == 404) {
              logger.warn("{}: missing entry.", getTaskName());
          }
      }   
    • Heartbeat :情況較為複雜,我們換一行繼續說,避免排版有問題,影響閱讀。

噔噔噔恰,本文的重要頭戲來啦!Last But Very Importment !!!

Eureka-Server 是允許同一時刻允許在任意節點被 Eureka-Client 發起寫入相關的操作,網路是不可靠的資源,Eureka-Client 可能向一個 Eureka-Server 註冊成功,但是網路波動,導致 Eureka-Client 誤以為失敗,此時恰好 Eureka-Client 變更了應用實體的狀態,重試向另一個 Eureka-Server 註冊,那麼兩個 Eureka-Server 對該應用實體的狀態產生衝突。

再例如…… 我們不要繼續舉例子,網路波動真的很複雜。我們來看看 Eureka 是怎麼處理的。

應用實體( InstanceInfo ) 的 lastDirtyTimestamp 屬性,使用時間戳,表示應用實體的版本號,當請求方( 不僅僅是 Eureka-Client ,也可能是同步註冊操作的 Eureka-Server ) 向 Eureka-Server 發起註冊時,若 Eureka-Server 已存在擁有更大 lastDirtyTimestamp 該實體( 相同應用並且相同應用實體編號被認為是相同實體 ),則請求方註冊的應用實體( InstanceInfo ) 無法改寫註冊此 Eureka-Server 的該實體( 見 AbstractInstanceRegistry#register(...) 方法 )。例如我們上面舉的例子,第一個 Eureka-Server 向 第二個 Eureka-Server 同步註冊應用實體時,不會註冊改寫,反倒是第二個 Eureka-Server 同步註冊應用到第一個 Eureka-Server ,註冊改寫成功,因為 lastDirtyTimestamp ( 應用實體狀態變更時,可以設定 lastDirtyTimestamp 為當前時間,見 ApplicationInfoManager#setInstanceStatus(status) 方法 )。

但是光靠註冊請求判斷 lastDirtyTimestamp 顯然是不夠的,因為網路異常情況下時,同步操作任務多次執行失敗到達過期時間後,此時在 Eureka-Server 叢集同步起到最終一致性最最最關鍵性出現了:Heartbeat 。因為 Heartbeat 會週期性的執行,透過它一方面可以判斷 Eureka-Server 是否存在心跳對應的應用實體,另外一方面可以比較應用實體的 lastDirtyTimestamp 。當滿足下麵任意條件,Eureka-Server 傳回 404 狀態碼:

  • 1)Eureka-Server 應用實體不存在,點選 連結 檢視觸發條件程式碼位置。

  • 2)Eureka-Server 應用實體狀態為 UNKNOWN,點選 連結 檢視觸發條件程式碼位置。為什麼會是 UNKNOWN,在 《Eureka 原始碼解析 —— 應用實體註冊發現(八)之改寫狀態》「 4.3 續租場景」 有詳細解析。

  • 3)請求的 lastDirtyTimestamp 更大,點選 連結 檢視觸發條件程式碼位置。

請求方接收到 404 狀態碼傳回後,認為 Eureka-Server 應用實體實際是不存在的,重新發起應用實體的註冊。以本文的 Heartbeat 為例子,程式碼如下:

// PeerEurekaNode#heartbeat(...)
  1@Override
  2public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
          // ... 省略程式碼,太長了。
 17: }
  • 第 4 至 10 行 :接收到 404 狀態碼,呼叫 #register(...) 方法,向該被心跳同步操作失敗的 Eureka-Server 發起註冊本地的應用實體的請求。

    • 上述 3) ,會使用請求引數 overriddenStatus 儲存到 Eureka-Server 的應用實體改寫狀態集合( AbstractInstanceRegistry.overriddenInstanceStatusMap ),點選 連結 檢視觸發條件程式碼位置。

  • 第 11 至 16 行 :恰好是 3) 反過來的情況,本地的應用實體的 lastDirtyTimestamp 小於 Eureka-Server 該應用實體的,此時 Eureka-Server 傳回 409 狀態碼,點選 連結 檢視觸發條件程式碼位置。呼叫 #syncInstancesIfTimestampDiffers() 方法,改寫註冊本地應用實體,點選 連結 檢視方法。

OK,撒花!記住:Eureka 透過 Heartbeat 實現 Eureka-Server 叢集同步的最終一致性。

666. 彩蛋

寫的比較嗨皮,所以就送胖友一隻胖友

胖友,分享我的公眾號( 芋道原始碼 ) 給你的胖友可好?

以下是草稿,可以湊合看

eureka server 叢集假定是 s1 s2

1)client 向 s1 註冊,有一個 lastDirtyTime ,正常情況下成功, s1 會向 s2 同步 
2)client 向 s1 註冊(成功,但是網路波動),然後 client 發生狀態的變化,lastDirtyTime 變化,向 s2 註冊。 
這個時候,s1 s2 是衝突的,但是他們會互相同步,實際 s2 => s1 的註冊會真正成功,s1 => s2 的註冊不會傳回失敗,但是實際 s2 處理的時候,用的是自身的。

心跳只是最終去校驗。

理論來說,心跳不應該帶 lastDirtyTime 引數。帶的原因就是為了做固定週期的比較。

最優解是 註冊 就處理掉資料不一致 
次優解是 心跳 處理掉資料不一致

如果在類比,

註冊,相當於 insertOrUpdate 
心跳,附加了校驗是否要發起【註冊】




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 伺服器

16. P2P 伺服器

17. HTTP 伺服器

18. 序列化 Serialization

19. 叢集容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

原始碼不易↓↓↓

點贊支援老艿艿↓↓

贊(0)

分享創造快樂