歡迎光臨
每天分享高質量文章

分佈式作業 Elastic-Job-Lite 原始碼分析 —— 主節點選舉

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/election/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. 為什麼需要選舉主節點

  • 3. 選舉主節點

  • 4. 刪除主節點

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 主節點選舉

建議前置閱讀:

  • 《Elastic-Job-Lite 原始碼分析 —— 註冊中心》

  • 《Elastic-Job-Lite 原始碼分析 —— 作業資料儲存》

  • 《Elastic-Job-Lite 原始碼分析 —— 註冊中心監聽器》

涉及到主要類的類圖如下( 打開大圖 ):

  • 粉色的類在 com.dangdang.ddframe.job.lite.internal.election 包下,實現了 Elastic-Job-Lite 主節點選舉。

你行好事會因為得到贊賞而愉悅 
同理,開源專案貢獻者會因為 Star 而更加有動力 
為 Elastic-Job 點贊!傳送門

2. 為什麼需要選舉主節點

首先我們來看一段官方對 Elastic-Job-Lite 的介紹:

Elastic-Job-Lite 定位為輕量級無中心化解決方案,使用 jar 包的形式提供分佈式任務的協調服務。

無中心化,意味著 Elastic-Job-Lite 不存在一個中心執行一些操作,例如:分配作業分片項。Elastic-Job-Lite 選舉主節點,通過主節點進行作業分片項分配。目前,必須在主節點執行的操作有:分配作業分片項,調解分佈式作業不一致狀態。

另外,主節點的選舉是以作業為維度。例如:有一個 Elastic-Job-Lite 集群有三個作業節點 ABC,存在兩個作業 ab,可能 a 作業的主節點是 Cb 作業的主節點是 A

3. 選舉主節點

呼叫 LeaderService#electLeader() 選舉主節點。

大體流程如下( 打開大圖 ):

實現代碼如下:

// LeaderService.java
/**
* 選舉主節點.
*/

public void electLeader() {
   log.debug("Elect a new leader now.");
   jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
   log.debug("Leader election completed.");
}

// JobNodeStorage.java
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
   try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
       latch.start();
       latch.await();
       callback.execute();
   } catch (final Exception ex) {
       handleException(ex);
   }
}

// LeaderElectionExecutionCallback.java
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {

   @Override
   public void execute() {
       if (!hasLeader()) { // 當前無主節點
           jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
       }
   }
}
  • 使用 Curator LeaderLatch 分佈式鎖,保證同一時間有且僅有一個工作節點能夠呼叫 LeaderElectionExecutionCallback#execute() 方法執行主節點設置。Curator LeaderLatch 在《Elastic-Job-Lite 原始碼分析 —— 註冊中心》「3.1 在主節點執行操作」有詳細解析。

  • 在 LeaderElectionExecutionCallback#execute() 為什麼要呼叫 #hasLeader() 呢?LeaderLatch 只保證同一時間有且僅有一個工作節點,在獲得分佈式鎖的工作節點結束邏輯後,第二個工作節點會開始邏輯,如果不判斷當前是否有主節點,原來的主節點會被改寫。

    // LeaderService.java
    /**
     * 判斷是否已經有主節點.
     * 
     * @return 是否已經有主節點
     */

    public boolean hasLeader() {
        return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
    }
  • 選舉成功後,Zookeeper 儲存作業的主節點:/${JOB_NAME}/leader/electron/instance 為當前節點。該節點為臨時節點。

    bash [zk: localhost:2181(CONNECTED) 7] get /elastic-job-example-lite-java/javaSimpleJob/leader/election/instance [email protected]@82496

選舉主節點時機

第一種,註冊作業啟動信息時。

/**
* 註冊作業啟動信息.

@param enabled 作業是否啟用
*/

public void registerStartUpInfo(final boolean enabled) {
   // .... 省略部分方法
   // 選舉 主節點
   leaderService.electLeader();
   // .... 省略部分方法
}
  • 新的作業啟動時,即能保證選舉出主節點。

    • 當該作業不存在主節點時,當前作業節點成為主節點。

    • 當該作業存在主節點,當前作業節主節點不變

第二種,節點資料發生變化時。

class LeaderElectionJobListener extends AbstractJobListener {

   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
           leaderService.electLeader();
       }
   }
}
  • 符合重新選舉主節點分成兩種情況。

  • 主動選舉 #isActiveElection(...)

    private boolean isActiveElection(final String path, final String data) {
        return !leaderService.hasLeader() // 不存在主節點
              && isLocalServerEnabled(path, data); // 開啟作業
    }

    private boolean isLocalServerEnabled(final String path, final String data) {
        return serverNode.isLocalServerPath(path) 
           && !ServerStatus.DISABLED.name().equals(data);
    }
    • 當作業被禁用( LiteJobConfiguration.disabled = true )時,作業是不存在主節點的。那有同學就有疑問了?LeaderService#electLeader() 沒做這個限制呀,作業註冊作業啟動信息時也進行了選舉。在「4. 刪除主節點」小結,我們會解開這個答案。這裡大家先記住這個結論。

    • 根據上面我們說的結論,這裡就很好理解了,#isActiveElection() 方法判斷了兩個條件:( 1 ) 不存在主節點;( 2 ) 開啟作業,不再禁用,因此需要進行主節點選舉落。

    • 這裡判斷開啟作業的方法 #isLocalServerEnabled(…) 有點特殊,它不是通過作業節點是否處於開啟狀態,而是該資料不是將作業節點更新成關閉狀態。舉個例子:作業節點處於禁用狀態,使用運維平臺設置作業節點開啟,會進行主節點選舉;作業節點處於開啟狀態,使用運維平臺設置作業節點禁用,不會進行主節點選舉。

  • 被動選舉 #isPassiveElection(...)

    private boolean isPassiveElection(final String path, final Type eventType) {
      return isLeaderCrashed(path, eventType) // 主節點 Crashed
              && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()); // 當前節點正在運行中(未掛掉)
    }

    private boolean isLeaderCrashed(final String path, final Type eventType) {
      return leaderNode.isLeaderInstancePath(path) && Type.NODE_REMOVED == eventType;
    }
    • 當主節點因為各種情況( 「4. 刪除主節點」會列舉 )被刪除,需要重新進行選舉。對的,必須主節點被刪除後才可以重新進行選舉

    • #isPassiveElection(…) 方法判斷了兩個條件:( 1 ) 原主節點被刪除;( 2 ) 當前節點正在運行中(未掛掉),可以參加主節點選舉。

    • #isLeaderCrashed(…) 方法雖然命名帶有 Crashed 英文,實際主作業節點正常退出也符合被動選舉條件。

等待主節點選舉完成

必須在主節點執行的操作,執行之前,需要判斷當前節點是否為主節點。如果主節點已經選舉好,可以直接進行判斷。但是,不排除主節點還沒選舉到,因而需要阻塞等待到主節點選舉完成後才能進行判斷。

實現代碼如下:

// LeaderService.java

/**
* 判斷當前節點是否是主節點.

* 如果主節點正在選舉中而導致取不到主節點, 則阻塞至主節點選舉完成再傳回.

@return 當前節點是否是主節點
*/

public boolean isLeaderUntilBlock() {
   // 不存在主節點 && 有可用的服務器節點
   while (!hasLeader() && serverService.hasAvailableServers()) {
       log.info("Leader is electing, waiting for {} ms"100);
       BlockUtils.waitingShortTime();
       if (!JobRegistry.getInstance().isShutdown(jobName)
               && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) { // 當前服務器節點可用
           electLeader();
       }
   }
   // 傳回當前節點是否是主節點
   return isLeader();
}
  • 呼叫 BlockUtils#waitingShortTime() 方法,選舉不到主節點進行等待,避免不間斷、無間隔的進行主節點選舉。

4. 刪除主節點

有主節點的選舉,必然有主節點的刪除,否則怎麼進行重新選舉

實現代碼如下:

// LeaderService.java
/**
* 刪除主節點供重新選舉.
*/

public void removeLeader() {
   jobNodeStorage.removeJobNodeIfExisted(LeaderNode.INSTANCE);
}    

刪除主節點時機

第一種,主節點行程正常關閉時。

public final class JobShutdownHookPlugin extends ShutdownHookPlugin {

    @Override
    public void shutdown() {
        CoordinatorRegistryCenter regCenter = JobRegistry.getInstance().getRegCenter(jobName);
        if (null == regCenter) {
            return;
        }
        LeaderService leaderService = new LeaderService(regCenter, jobName);
        if (leaderService.isLeader()) {
            leaderService.removeLeader(); // 移除主節點
        }
        new InstanceService(regCenter, jobName).removeInstance();
    }
}
  • 這個比較好理解,退出行程,若該行程為主節點,需要將自己移除。

第二種,主節點行程 CRASHED 時。

${JOB_NAME}/leader/electron/instance 是臨時節點,主節點行程 CRASHED 後,超過最大會話時間,Zookeeper 自動進行刪除,觸發重新選舉邏輯。

第三種,作業被禁用時。

class LeaderAbdicationJobListener extends AbstractJobListener {

   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
           leaderService.removeLeader();
       }
   }

   private boolean isLocalServerDisabled(final String path, final String data) {
       return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
   }
}
  • 這裡就解答上面我們遺留的疑問。被禁用的作業註冊作業啟動信息時即使進行了主節點選舉,也會被該監聽器處理,移除該選舉的主節點。

第四種,主節點行程遠程關閉。

// InstanceShutdownStatusJobListener.java
class InstanceShutdownStatusJobListener extends AbstractJobListener {

   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (!JobRegistry.getInstance().isShutdown(jobName)
               && !JobRegistry.getInstance().getJobScheduleController(jobName).isPaused() // 作業未暫停調度
               && isRemoveInstance(path, eventType) // 移除【運行實體】事件
               && !isReconnectedRegistryCenter()) { // 運行實體被移除
           schedulerFacade.shutdownInstance();
       }
   }

   private boolean isRemoveInstance(final String path, final Type eventType) {
       return instanceNode.isLocalInstancePath(path) && Type.NODE_REMOVED == eventType;
   }

   private boolean isReconnectedRegistryCenter() {
       return instanceService.isLocalJobInstanceExisted();
   }
}

// SchedulerFacade.java
/**
* 終止作業調度.
*/

public void shutdownInstance() {
   if (leaderService.isLeader()) {
       leaderService.removeLeader(); // 移除主節點
   }
   monitorService.close();
   if (reconcileService.isRunning()) {
       reconcileService.stopAsync();
   }
   JobRegistry.getInstance().shutdown(jobName);
}
  • 遠程關閉作業節點有兩種方式:

    • zkClient 發起命令:`rmr /${NAMESPACE}/${JOB_NAME}/instances/${JOB_INSTANCE_ID}`。

    • 運維平臺發起 `Shutdown` 操作。`Shutdown` 操作實質上就是第一種。



如果你對 Dubbo / Netty 等等原始碼與原理感興趣,歡迎加入我的知識星球一起交流。長按下方二維碼噢

目前在知識星球更新了《Dubbo 原始碼解析》目錄如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 服務器

16. P2P 服務器

17. HTTP 服務器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing

… 一共 69+ 篇

目前在知識星球更新了《Netty 原始碼解析》目錄如下:

01. 除錯環境搭建
02. NIO 基礎
03. Netty 簡介
04. 啟動 Bootstrap

05. 事件輪詢 EventLoop

06. 通道管道 ChannelPipeline

07. 通道 Channel

08. 位元組緩衝區 ByteBuf

09. 通道處理器 ChannelHandler

10. 編解碼 Codec

11. 工具類 Util

… 一共 61+ 篇

目前在知識星球更新了《資料庫物體設計》目錄如下:


01. 商品模塊
02. 交易模塊
03. 營銷模塊
04. 公用模塊

… 一共 17+ 篇


目前在知識星球更新了《Spring 原始碼解析》目錄如下:


01. 除錯環境搭建
02. IoC Resource 定位
03. IoC BeanDefinition 載入

04. IoC BeanDefinition 註冊

05. IoC Bean 獲取

06. IoC Bean 生命周期

… 一共 35+ 篇


原始碼不易↓↓↓

點贊支持老艿艿↓↓

赞(0)

分享創造快樂