歡迎光臨
每天分享高質量文章

註冊中心 Eureka 原始碼解析 —— Eureka-Server 集群同步

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Eureka/server-cluster/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 Eureka 1.8.X 版本

  • 1. 概述

  • 2. 集群節點初始化與更新

    • 2.1 集群節點啟動

    • 2.2 更新集群節點信息

    • 2.3 集群節點

  • 3. 獲取初始註冊信息

  • 4. 同步註冊信息

    • 4.1 同步操作型別

    • 4.2 發起 Eureka-Server 同步操作

    • 4.3 接收 Eureka-Server 同步操作

    • 4.4 處理 Eureka-Server 同步結果


1. 概述

本文主要分享 Eureka-Server 集群同步註冊信息

Eureka-Server 集群如下圖:

  • Eureka-Server 集群不區分主從節點或者 Primary & Secondary 節點,所有節點相同角色( 也就是沒有角色 ),完全對等

  • Eureka-Client 可以向任意 Eureka-Client 發起任意讀寫操作,Eureka-Server 將操作複製到另外的 Eureka-Server 以達到最終一致性。註意,Eureka-Server 是選擇了 AP 的組件。

Eureka-Server 可以使用直接配置所有節點的服務地址,或者基於 DNS 配置。推薦閱讀:《Spring Cloud構建微服務架構(六)高可用服務註冊中心》 。

本文主要類在 com.netflix.eureka.cluster 包下。

OK,讓我們開始愉快的遨游在代碼的海洋。

推薦 Spring Cloud 書籍

  • 請支持正版。下載盜版,等於主動編寫低級 BUG 。

  • 程式猿DD —— 《Spring Cloud微服務實戰》

  • 周立 —— 《Spring Cloud與Docker微服務架構實戰》

  • 兩書齊買,京東包郵。

推薦 Spring Cloud 視頻

  • Java 微服務實踐 – Spring Boot

  • Java 微服務實踐 – Spring Cloud

  • Java 微服務實踐 – Spring Boot / Spring Cloud

ps :註意,本文提到的同步,準確來說是複製( Replication )

2. 集群節點初始化與更新

com.netflix.eureka.cluster.PeerEurekaNodes ,Eureka-Server 集群節點集合 。構造方法如下 :

public class PeerEurekaNodes {

    private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);

    /**
     * 應用實體註冊表
     */

    protected final PeerAwareInstanceRegistry registry;
    /**
     * Eureka-Server 配置
     */

    protected final EurekaServerConfig serverConfig;
    /**
     * Eureka-Client 配置
     */

    protected final EurekaClientConfig clientConfig;
    /**
     * Eureka-Server 編解碼
     */

    protected final ServerCodecs serverCodecs;
    /**
     * 應用實體信息管理器
     */

    private final ApplicationInfoManager applicationInfoManager;

    /**
     * Eureka-Server 集群節點陣列
     */

    private volatile List peerEurekaNodes = Collections.emptyList();
    /**
     * Eureka-Server 服務地址陣列
     */

    private volatile Set peerEurekaNodeUrls = Collections.emptySet();

    /**
     * 定時任務服務
     */

    private ScheduledExecutorService taskExecutor;

    @Inject
    public PeerEurekaNodes(
            PeerAwareInstanceRegistry registry,
            EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig,
            ServerCodecs serverCodecs,
            ApplicationInfoManager applicationInfoManager)
 
{
        this.registry = registry;
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.applicationInfoManager = applicationInfoManager;
    }
}
  • peerEurekaNodespeerEurekaNodeUrlstaskExecutor 屬性,在構造方法中未設置和初始化,而是在 PeerEurekaNodes#start() 方法,設置和初始化,下文我們會解析這個方法。

  • Eureka-Server 在初始化時,呼叫 EurekaBootStrap#getPeerEurekaNodes(…) 方法,創建 PeerEurekaNodes ,點擊 鏈接 查看該方法的實現。

2.1 集群節點啟動

呼叫 PeerEurekaNodes#start() 方法,集群節點啟動,主要完成兩個邏輯:

  • 初始化集群節點信息

  • 初始化固定周期( 預設:10 分鐘,可配置 )更新集群節點信息的任務

代碼如下:

  1public void start() {
  2:     // 創建 定時任務服務
  3:     taskExecutor = Executors.newSingleThreadScheduledExecutor(
  4:             new ThreadFactory() {
  5:                 @Override
  6:                 public Thread newThread(Runnable r) {
  7:                     Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
  8:                     thread.setDaemon(true);
  9:                     return thread;
 10:                 }
 11:             }
 12:     );
 13:     try {
 14:         // 初始化 集群節點信息
 15:         updatePeerEurekaNodes(resolvePeerUrls());
 16:         // 初始化 初始化固定周期更新集群節點信息的任務
 17:         Runnable peersUpdateTask = new Runnable() {
 18:             @Override
 19:             public void run() {
 20:                 try {
 21:                     updatePeerEurekaNodes(resolvePeerUrls());
 22:                 } catch (Throwable e) {
 23:                     logger.error("Cannot update the replica Nodes", e);
 24:                 }
 25
 26:             }
 27:         };
 28:         taskExecutor.scheduleWithFixedDelay(
 29:                 peersUpdateTask,
 30:                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
 31:                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
 32:                 TimeUnit.MILLISECONDS
 33:         );
 34:     } catch (Exception e) {
 35:         throw new IllegalStateException(e);
 36:     }
 37:     // 打印 集群節點信息
 38:     for (PeerEurekaNode node : peerEurekaNodes) {
 39:         logger.info("Replica node URL:  " + node.getServiceUrl());
 40:     }
 41: }
  • 第 15 行 && 第 21 行 :呼叫 #updatePeerEurekaNodes() 方法,更新集群節點信息。

2.2 更新集群節點信息

呼叫 #resolvePeerUrls() 方法,獲得 Eureka-Server 集群服務地址陣列,代碼如下:

  1protected List resolvePeerUrls() {
  2:     // 獲得 Eureka-Server 集群服務地址陣列
  3:     InstanceInfo myInfo = applicationInfoManager.getInfo();
  4:     String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
  5:     List replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
  6
  7:     // 移除自己(避免向自己同步)
  8:     int idx = 0;
  9:     while (idx  10:         if (isThisMyUrl(replicaUrls.get(idx))) {
 11:             replicaUrls.remove(idx);
 12:         } else {
 13:             idx++;
 14:         }
 15:     }
 16:     return replicaUrls;
 17: }
  • 第 2 至 5 行 :獲得 Eureka-Server 集群服務地址陣列。EndpointUtils#getDiscoveryServiceUrls(…) 方法,邏輯與 《Eureka 原始碼解析 —— EndPoint 與 解析器》「3.4 ConfigClusterResolver」 基本類似。EndpointUtils 正在逐步,猜測未來這裡會替換。

  • 第 7 至 15 行 :移除自身節點,避免向自己同步。


呼叫 #updatePeerEurekaNodes() 方法,更新集群節點信息,主要完成兩部分邏輯:

  • 添加新增的集群節點

  • 關閉刪除的集群節點

代碼如下:

  1protected void updatePeerEurekaNodes(List newPeerUrls) {
  2:     if (newPeerUrls.isEmpty()) {
  3:         logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
  4:         return;
  5:     }
  6
  7:     // 計算 新增的集群節點地址
  8:     Set toShutdown = new HashSet<>(peerEurekaNodeUrls);
  9:     toShutdown.removeAll(newPeerUrls);
 10
 11:     // 計算 刪除的集群節點地址
 12:     Set toAdd = new HashSet<>(newPeerUrls);
 13:     toAdd.removeAll(peerEurekaNodeUrls);
 14
 15:     if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
 16:         return;
 17:     }
 18
 19:     // 關閉刪除的集群節點
 20:     // Remove peers no long available
 21:     List newNodeList = new ArrayList<>(peerEurekaNodes);
 22:     if (!toShutdown.isEmpty()) {
 23:         logger.info("Removing no longer available peer nodes {}", toShutdown);
 24:         int i = 0;
 25:         while (i  26:             PeerEurekaNode eurekaNode = newNodeList.get(i);
 27:             if (toShutdown.contains(eurekaNode.getServiceUrl())) {
 28:                 newNodeList.remove(i);
 29:                 eurekaNode.shutDown(); // 關閉
 30:             } else {
 31:                 i++;
 32:             }
 33:         }
 34:     }
 35
 36:     // 添加新增的集群節點
 37:     // Add new peers
 38:     if (!toAdd.isEmpty()) {
 39:         logger.info("Adding new peer nodes {}", toAdd);
 40:         for (String peerUrl : toAdd) {
 41:             newNodeList.add(createPeerEurekaNode(peerUrl));
 42:         }
 43:     }
 44
 45:     // 賦值
 46:     this.peerEurekaNodes = newNodeList;
 47:     this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
 48: }
  • 第 7 至 9 行 :計算新增的集群節點地址。

  • 第 11 至 13 行 :計算刪除的集群節點地址。

  • 第 19 至 34 行 :關閉刪除的集群節點。

  • 第 36 至 43 行 :添加新增的集群節點。呼叫 #createPeerEurekaNode(peerUrl) 方法,創建集群節點,代碼如下:

      1protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
      2:     HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
      3:     String targetHost = hostFromUrl(peerEurekaNodeUrl);
      4:     if (targetHost == null) {
      5:         targetHost = "host";
      6:     }
      7:     return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
      8: }
    • 第 2 行 :創建 Eureka-Server 集群通信客戶端,在 《Eureka 原始碼解析 —— 網絡通信》「4.2 JerseyReplicationClient」 有詳細解析。

    • 第 7 行 :創建 PeerEurekaNode ,在 「2.3 PeerEurekaNode」 有詳細解析。

2.3 集群節點

com.netflix.eureka.cluster.PeerEurekaNode ,單個集群節點。

點擊 鏈接 查看構造方法

  • 第 129 行 :創建 ReplicationTaskProcessor 。在 「4.1.2 同步操作任務處理器」 詳細解析

  • 第 131 至 140 行 :創建批量任務分發器,在 《Eureka 原始碼解析 —— 任務批處理》 有詳細解析。

  • 第 142 至 151 行 :創建單任務分發器,用於 Eureka-Server 向亞馬遜 AWS 的 ASG ( Autoscaling Group ) 同步狀態。暫時跳過。

3. 獲取初始註冊信息

Eureka-Server 啟動時,呼叫 PeerAwareInstanceRegistryImpl#syncUp() 方法,從集群的一個 Eureka-Server 節點獲取初始註冊信息,代碼如下:

  1@Override
  2public int syncUp() {
  3:     // Copy entire entry from neighboring DS node
  4:     int count = 0;
  5
  6:     for (int i = 0; ((i 0)); i++) {
  7:         // 未讀取到註冊信息,sleep 等待
  8:         if (i > 0) {
  9:             try {
 10:                 Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
 11:             } catch (InterruptedException e) {
 12:                 logger.warn("Interrupted during registry transfer..");
 13:                 break;
 14:             }
 15:         }
 16
 17:         // 獲取註冊信息
 18:         Applications apps = eurekaClient.getApplications();
 19:         for (Application app : apps.getRegisteredApplications()) {
 20:             for (InstanceInfo instance : app.getInstances()) {
 21:                 try {
 22:                     if (isRegisterable(instance)) { // 判斷是否能夠註冊
 23:                         register(instance, instance.getLeaseInfo().getDurationInSecs(), true); // 註冊
 24:                         count++;
 25:                     }
 26:                 } catch (Throwable t) {
 27:                     logger.error("During DS init copy", t);
 28:                 }
 29:             }
 30:         }
 31:     }
 32:     return count;
 33: }
  • 第 7 至 15 行 :未獲取到註冊信息,sleep 等待再次重試。

  • 第 17 至 30 行 :獲取註冊信息,若獲取到,註冊到自身節點。

    • 第 22 行 :判斷應用實體是否能夠註冊到自身節點。主要用於亞馬遜 AWS 環境下的判斷,若非部署在亞馬遜里,都傳回 `true` 。點擊 鏈接 查看實現。

    • 第 23 行 :呼叫 `#register()` 方法,註冊應用實體到自身節點。在 《Eureka 原始碼解析 —— 應用實體註冊發現(一)之註冊》 有詳細解析。


若呼叫 #syncUp() 方法,未獲取到應用實體,則 Eureka-Server 會有一段時間( 預設:5 分鐘,可配 )不允許被 Eureka-Client 獲取註冊信息,避免影響 Eureka-Client 。

  • 標記 Eureka-Server 啟動時,未獲取到應用實體,代碼如下:

    // PeerAwareInstanceRegistryImpl.java

    private boolean peerInstancesTransferEmptyOnStartup = true;

    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // ... 省略其他代碼
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        // ... 省略其他代碼
    }
  • 判斷 Eureka-Server 是否允許被 Eureka-Client 獲取註冊信息,代碼如下:

    // PeerAwareInstanceRegistryImpl.java
    public boolean shouldAllowAccess(boolean remoteRegionRequired) {
       if (this.peerInstancesTransferEmptyOnStartup) {
           // 設置啟動時間
           this.startupTime = System.currentTimeMillis();
           if (!(System.currentTimeMillis() > this.startupTime + serverConfig.getWaitTimeInMsWhenSyncEmpty())) {
               return false;
           }
       }
       // ... 省略其他代碼
       return true;
    }

4. 同步註冊信息

Eureka-Server 集群同步註冊信息如下圖:

  • Eureka-Server 接收到 Eureka-Client 的 Register、Heartbeat、Cancel、StatusUpdate、DeleteStatusOverride 操作,固定間隔( 預設值 :500 毫秒,可配 )向 Eureka-Server 集群內其他節點同步( 準實時,非實時 )。

4.1 同步操作型別

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.Action ,同步操作型別,代碼如下:

public enum Action {
   Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;

   // ... 省略監控相關屬性
}
  • Register ,在 《Eureka 原始碼解析 —— 應用實體註冊發現(一)之註冊》 有詳細解析

  • Heartbeat ,在 《Eureka 原始碼解析 —— 應用實體註冊發現(二)之續租》 有詳細解析

  • Cancel ,在 《Eureka 原始碼解析 —— 應用實體註冊發現(三)之下線》 有詳細解析

  • StatusUpdate ,在 《Eureka 原始碼解析 —— 應用實體註冊發現(八)之改寫狀態》 有詳細解析

  • DeleteStatusOverride ,在 《Eureka 原始碼解析 —— 應用實體註冊發現(八)之改寫狀態》 有詳細解析

4.2 發起 Eureka-Server 同步操作

Eureka-Server 在完成 Eureka-Client 發起的上述操作在自身節點的執行後,向集群內其他 Eureka-Server 發起同步操作。以 Register 操作舉例子,代碼如下:

// PeerAwareInstanceRegistryImpl.java
public void register(final InstanceInfo info, final boolean isReplication) {
   // 租約過期時間
   int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
   if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
       leaseDuration = info.getLeaseInfo().getDurationInSecs();
   }
   // 註冊應用實體信息
   super.register(info, leaseDuration, isReplication);
   // Eureka-Server 複製
   replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
  • 最後一行,呼叫 #replicateToPeers(…) 方法,傳遞對應的同步操作型別,發起同步操作。


#replicateToPeers(...) 方法,代碼如下:

  1private void replicateToPeers(Action action, String appName, String id,
  2:                               InstanceInfo info /* optional */,
  3:                               InstanceStatus newStatus /* optional */boolean isReplication)
 
{
  4:     Stopwatch tracer = action.getTimer().start();
  5:     try {
  6:         if (isReplication) {
  7:             numberOfReplicationsLastMin.increment();
  8:         }
  9
 10:         // Eureka-Server 發起的請求 或者 集群為空
 11:         // If it is a replication already, do not replicate again as this will create a poison replication
 12:         if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
 13:             return;
 14:         }
 15
 16:         for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
 17:             // If the url represents this host, do not replicate to yourself.
 18:             if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
 19:                 continue;
 20:             }
 21:             replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
 22:         }
 23:     } finally {
 24:         tracer.stop();
 25:     }
 26: }
  • 第 10 至 14 行 :Eureka-Server 在處理上述操作( Action ),無論來自 Eureka-Client 發起請求,還是 Eureka-Server 發起同步,呼叫的內部方法相同,通過 isReplication=true 引數,避免死迴圈同步。

  • 第 16 至 22 行 :迴圈集群內每個節點,呼叫 #replicateInstanceActionsToPeers(…) 方法,發起同步操作。


#replicateInstanceActionsToPeers(...) 方法,代碼如下:

 // ... 省略代碼,太長了。
  • Cancel :呼叫 PeerEurekaNode#cancel(…) 方法,點擊 鏈接 查看實現。

  • Heartbeat :呼叫 PeerEurekaNode#heartbeat(…) 方法,點擊 鏈接 查看實現。

  • Register :呼叫 PeerEurekaNode#register(…) 方法,點擊 鏈接 查看實現。

  • StatusUpdate :呼叫 PeerEurekaNode#statusUpdate(…) 方法,點擊 鏈接 查看實現。

  • DeleteStatusOverride :呼叫 PeerEurekaNode#deleteStatusOverride(…) 方法,點擊 鏈接 查看實現。

  • 上面的每個方法實現,我們會看到類似這麼一段代碼 :

    batchingDispatcher.process(
        taskId("${action}", appName, id), // id
        new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
        @Override
        public EurekaHttpResponse execute() {
            return replicationClient.doString(...);
        }

        @Override
        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
            // do Something...
        }

    }, // ReplicationTask 子類
    expiryTime

    )

    • 相同應用實體的相同同步操作使用相同任務編號。在 《Eureka 原始碼解析 —— 任務批處理》「2. 整體流程」 中,我們看到” 接收執行緒( Runner )合併任務,將相同任務編號的任務合併,只執行一次。 “,因此,相同應用實體的相同同步操作就能被合併,減少操作量。例如,Eureka-Server 同步某個應用實體的 Heartbeat 操作,接收同步的 Eureak-Server 掛了,一方面這個應用的這次操作會重試,另一方面,這個應用實體會發起新的 Heartbeat 操作,通過任務編號合併,接收同步的 Eureka-Server 恢復後,減少收到重覆積壓的任務。

    • #task(...) 方法,生成同步操作任務編號。代碼如下:

      private static String taskId(String requestType, String appName, String id) {
         return requestType + '#' + appName + '/' + id;
      }
  • InstanceReplicationTask ,同步操作任務,在 「4.1.1 同步操作任務」 詳細解析。

  • expiryTime ,任務過期時間。

4.1.1 同步操作任務

  • com.netflix.eureka.cluster.ReplicationTask ,同步任務抽象類

    • 點擊 鏈接 查看 ReplicationTask 代碼。

    • 定義了 `#getTaskName()` 抽象方法。

    • 定義了 `#execute()` 抽象方法,執行同步任務。

    • 實現了 `#handleSuccess()` 方法,處理成功執行同步結果。

    • 實現了 `#handleFailure(…)` 方法,處理失敗執行同步結果。

  • com.netflix.eureka.cluster.InstanceReplicationTask ,同步應用實體任務抽象類

    • 點擊 鏈接 查看 InstanceReplicationTask 代碼。

    • 實現了父類 `#getTaskName()` 抽象方法。

  • com.netflix.eureka.cluster.AsgReplicationTask ,亞馬遜 AWS 使用,暫時跳過。

從上面 PeerEurekaNode#同步操作(...) 方法,全部實現了 InstanceReplicationTask 類的 #execute() 方法,部分重寫了 #handleFailure(...) 方法。

4.1.2 同步操作任務處理器

com.netflix.eureka.cluster.InstanceReplicationTask ,實現 TaskProcessor 接口,同步操作任務處理器。

  • TaskProcessor ,在 《Eureka 原始碼解析 —— 任務批處理》「10. 任務執行器【執行任務】」 有詳細解析。

  • 點擊 鏈接 查看 InstanceReplicationTask 代碼。

ReplicationTaskProcessor#process(task) ,處理單任務,用於 Eureka-Server 向亞馬遜 AWS 的 ASG ( Autoscaling Group ) 同步狀態,暫時跳過,感興趣的同學可以點擊 鏈接 查看方法代碼。

ReplicationTaskProcessor#process(tasks) ,處理批量任務,用於 Eureka-Server 集群註冊信息的同步操作任務,通過呼叫被同步的 Eureka-Server 的 peerreplication/batch/ 接口,一次性將批量( 多個 )的同步操作任務發起請求,代碼如下:

 // ... 省略代碼,太長了。
  • 第 4 行 :創建批量提交同步操作任務的請求物件( ReplicationList ) 。比較易懂,咱就不啰嗦貼代碼了。

    • ReplicationList ,點擊 鏈接 查看類。

    • ReplicationInstance ,點擊 鏈接 查看類。

    • `#createReplicationListOf(…)` ,點擊 鏈接 查看方法。

    • `#createReplicationInstanceOf(…)` ,點擊 鏈接 查看方法。

  • 第 7 行 :呼叫 JerseyReplicationClient#submitBatchUpdates(…) 方法,請求 peerreplication/batch/ 接口,一次性將批量( 多個 )的同步操作任務發起請求。

    • `JerseyReplicationClient#submitBatchUpdates(…)` 方法,點擊 鏈接 查看方法。

    • ReplicationListResponse ,點擊 鏈接 查看類。

    • ReplicationInstanceResponse ,點擊 鏈接 查看類。

  • 第 9 至 31 行 :處理批量提交同步操作任務的響應,在 「4.4 處理 Eureka-Server 同步結果」 詳細解析。

4.3 接收 Eureka-Server 同步操作

com.netflix.eureka.resources.PeerReplicationResource ,同步操作任務 Resource ( Controller )。

peerreplication/batch/ 接口,映射 PeerReplicationResource#batchReplication(...) 方法,代碼如下:

 // ... 省略代碼,太長了。
  • 第 7 至 15 行 :逐個處理單個同步操作任務,並將處理結果( ReplicationInstanceResponse ) 添加到 ReplicationListResponse 。

  • 第 23 至 50 行 :處理單個同步操作任務,傳回處理結果( ReplicationInstanceResponse )。

    • 第 24 至 25 行 :創建 ApplicationResource , InstanceResource 。我們看到,實際該方法是把單個同步操作任務提交到其他 Resource ( Controller ) 處理,Eureka-Server 收到 Eureka-Client 請求響應的 Resource ( Controller ) 是相同的邏輯

    • Register :點擊 鏈接 查看 `#handleRegister(…)` 方法。

    • Heartbeat :點擊 鏈接 查看 `#handleHeartbeat(…)` 方法。

    • Cancel :點擊 鏈接 查看 `#handleCancel(…)` 方法。

    • StatusUpdate :點擊 鏈接 查看 `#handleStatusUpdate(…)` 方法。

    • DeleteStatusOverride :點擊 鏈接 查看 `#handleDeleteStatusOverride(…)` 方法。

4.4 處理 Eureka-Server 同步結果

? 想想就有小激動,終於寫到這裡了。

接 ReplicationTaskProcessor#process(tasks) 方法,處理批量提交同步操作任務的響應,代碼如下:

 // ... 省略代碼,太長了。
  • 第 10 行 ,呼叫 #isSuccess(…) 方法,判斷請求是否成功,響應狀態碼是否在 [200, 300) 範圍內。

  • 第 11 至 13 行 :狀態碼 503 ,目前 Eureka-Server 傳回 503 的原因是被限流。在 《Eureka 原始碼解析 —— 基於令牌桶演算法的 RateLimiter》 詳細解析。該情況為瞬時錯誤,會重試該同步操作任務,在 《Eureka 原始碼解析 —— 任務批處理》「3. 任務處理器」 有詳細解析。

  • 第 14 至 18 行 :非預期狀態碼,目前 Eureka-Server 在代碼上看下來,不會傳回這樣的狀態碼。該情況為永久錯誤,會重試該同步操作任務,在 《Eureka 原始碼解析 —— 任務批處理》「3. 任務處理器」 有詳細解析。

  • 第 20 行 :請求成功,呼叫 #handleBatchResponse(…) 方法,逐個處理每個 ReplicationTask 和 ReplicationInstanceResponse 。這裡有一點要註意下,請求成功指的是整個請求成功,實際每個 ReplicationInstanceResponse 可能傳回的狀態碼不在 [200, 300) 範圍內。該方法下文詳細解析。

  • 第 23 至 25 行 :請求發生網絡異常,例如網絡超時,打印網絡異常日誌。目前日誌的打印為部分採樣,條件為網絡發生異常每間隔 10 秒打印一條,避免網絡發生異常打印超級大量的日誌。該情況為永久錯誤,會重試該同步操作任務,在 《Eureka 原始碼解析 —— 任務批處理》「3. 任務處理器」 有詳細解析。

    • #isNetworkConnectException(…) ,點擊 鏈接 查看方法。

    • #logNetworkErrorSample(…) ,點擊 鏈接 查看方法。

  • 第 26 至 29 行 :非預期異常,目前 Eureka-Server 在代碼上看下來,不會丟擲這樣的異常。該情況為永久錯誤,會重試該同步操作任務,在 《Eureka 原始碼解析 —— 任務批處理》「3. 任務處理器」 有詳細解析。


#handleBatchResponse(...) 方法,代碼如下:

 // ... 省略代碼,太長了。
  • ReplicationTask#handleSuccess() 方法,無任務同步操作任務重寫,是個空方法,代碼如下:

    // ReplicationTask.java
    public void handleSuccess() {
    }
  • ReplicationTask#handleFailure() 方法,有兩個同步操作任務重寫:

    • x

    • Cancel :當 Eureka-Server 不存在下線的應用實體時,傳回 404 狀態碼,此時打印錯誤日誌,代碼如下:

      // PeerEurekaNode#cancel(...)
      @Override
      public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
          super.handleFailure(statusCode, responseEntity);
          if (statusCode == 404) {
              logger.warn("{}: missing entry.", getTaskName());
          }
      }   
    • Heartbeat :情況較為複雜,我們換一行繼續說,避免排版有問題,影響閱讀。

噔噔噔恰,本文的重要頭戲來啦!Last But Very Importment !!!

Eureka-Server 是允許同一時刻允許在任意節點被 Eureka-Client 發起寫入相關的操作,網絡是不可靠的資源,Eureka-Client 可能向一個 Eureka-Server 註冊成功,但是網絡波動,導致 Eureka-Client 誤以為失敗,此時恰好 Eureka-Client 變更了應用實體的狀態,重試向另一個 Eureka-Server 註冊,那麼兩個 Eureka-Server 對該應用實體的狀態產生衝突。

再例如…… 我們不要繼續舉例子,網絡波動真的很複雜。我們來看看 Eureka 是怎麼處理的。

應用實體( InstanceInfo ) 的 lastDirtyTimestamp 屬性,使用時間戳,表示應用實體的版本號,當請求方( 不僅僅是 Eureka-Client ,也可能是同步註冊操作的 Eureka-Server ) 向 Eureka-Server 發起註冊時,若 Eureka-Server 已存在擁有更大 lastDirtyTimestamp 該實體( 相同應用並且相同應用實體編號被認為是相同實體 ),則請求方註冊的應用實體( InstanceInfo ) 無法改寫註冊此 Eureka-Server 的該實體( 見 AbstractInstanceRegistry#register(...) 方法 )。例如我們上面舉的例子,第一個 Eureka-Server 向 第二個 Eureka-Server 同步註冊應用實體時,不會註冊改寫,反倒是第二個 Eureka-Server 同步註冊應用到第一個 Eureka-Server ,註冊改寫成功,因為 lastDirtyTimestamp ( 應用實體狀態變更時,可以設置 lastDirtyTimestamp 為當前時間,見 ApplicationInfoManager#setInstanceStatus(status) 方法 )。

但是光靠註冊請求判斷 lastDirtyTimestamp 顯然是不夠的,因為網絡異常情況下時,同步操作任務多次執行失敗到達過期時間後,此時在 Eureka-Server 集群同步起到最終一致性最最最關鍵性出現了:Heartbeat 。因為 Heartbeat 會周期性的執行,通過它一方面可以判斷 Eureka-Server 是否存在心跳對應的應用實體,另外一方面可以比較應用實體的 lastDirtyTimestamp 。當滿足下麵任意條件,Eureka-Server 傳回 404 狀態碼:

  • 1)Eureka-Server 應用實體不存在,點擊 鏈接 查看觸發條件代碼位置。

  • 2)Eureka-Server 應用實體狀態為 UNKNOWN,點擊 鏈接 查看觸發條件代碼位置。為什麼會是 UNKNOWN,在 《Eureka 原始碼解析 —— 應用實體註冊發現(八)之改寫狀態》「 4.3 續租場景」 有詳細解析。

  • 3)請求的 lastDirtyTimestamp 更大,點擊 鏈接 查看觸發條件代碼位置。

請求方接收到 404 狀態碼傳回後,認為 Eureka-Server 應用實體實際是不存在的,重新發起應用實體的註冊。以本文的 Heartbeat 為例子,代碼如下:

// PeerEurekaNode#heartbeat(...)
  1@Override
  2public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
          // ... 省略代碼,太長了。
 17: }
  • 第 4 至 10 行 :接收到 404 狀態碼,呼叫 #register(...) 方法,向該被心跳同步操作失敗的 Eureka-Server 發起註冊本地的應用實體的請求。

    • 上述 3) ,會使用請求引數 overriddenStatus 儲存到 Eureka-Server 的應用實體改寫狀態集合( AbstractInstanceRegistry.overriddenInstanceStatusMap ),點擊 鏈接 查看觸發條件代碼位置。

  • 第 11 至 16 行 :恰好是 3) 反過來的情況,本地的應用實體的 lastDirtyTimestamp 小於 Eureka-Server 該應用實體的,此時 Eureka-Server 傳回 409 狀態碼,點擊 鏈接 查看觸發條件代碼位置。呼叫 #syncInstancesIfTimestampDiffers() 方法,改寫註冊本地應用實體,點擊 鏈接 查看方法。

OK,撒花!記住:Eureka 通過 Heartbeat 實現 Eureka-Server 集群同步的最終一致性。

666. 彩蛋

寫的比較嗨皮,所以就送胖友一隻胖友

胖友,分享我的公眾號( 芋道原始碼 ) 給你的胖友可好?

以下是草稿,可以湊合看

eureka server 集群假定是 s1 s2

1)client 向 s1 註冊,有一個 lastDirtyTime ,正常情況下成功, s1 會向 s2 同步 
2)client 向 s1 註冊(成功,但是網絡波動),然後 client 發生狀態的變化,lastDirtyTime 變化,向 s2 註冊。 
這個時候,s1 s2 是衝突的,但是他們會互相同步,實際 s2 => s1 的註冊會真正成功,s1 => s2 的註冊不會傳回失敗,但是實際 s2 處理的時候,用的是自身的。

心跳只是最終去校驗。

理論來說,心跳不應該帶 lastDirtyTime 引數。帶的原因就是為了做固定周期的比較。

最優解是 註冊 就處理掉資料不一致 
次優解是 心跳 處理掉資料不一致

如果在類比,

註冊,相當於 insertOrUpdate 
心跳,附加了校驗是否要發起【註冊】




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 服務器

16. P2P 服務器

17. HTTP 服務器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

原始碼不易↓↓↓

點贊支持老艿艿↓↓

赞(0)

分享創造快樂