歡迎光臨
每天分享高質量文章

分散式作業 Elastic-Job-Lite 原始碼分析 —— 註冊中心監聽器

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/reg-center-zookeeper-listener/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. ListenerManager

  • 3. AbstractListenerManager

  • 4. AbstractJobListener

  • 5. RegistryCenterConnectionStateListener

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 註冊中心監聽器

建議前置閱讀:

  • 《Elastic-Job-Lite 原始碼分析 —— 註冊中心》

涉及到主要類的類圖如下( 開啟大圖 ):

你行好事會因為得到贊賞而愉悅  
同理,開源專案貢獻者會因為 Star 而更加有動力  
為 Elastic-Job 點贊!傳送門

2. ListenerManager

ListenerManager,作業註冊中心的監聽器管理者。管理者兩類元件:

  • 監聽管理器

  • 註冊中心連線狀態監聽器

其中監聽管理器管理著自己的作業註冊中心監聽器。

一起從程式碼層面看看:

public final class ListenerManager {

    private final JobNodeStorage jobNodeStorage;

    private final ElectionListenerManager electionListenerManager;

    private final ShardingListenerManager shardingListenerManager;

    private final FailoverListenerManager failoverListenerManager;

    private final MonitorExecutionListenerManager monitorExecutionListenerManager;

    private final ShutdownListenerManager shutdownListenerManager;

    private final TriggerListenerManager triggerListenerManager;

    private final RescheduleListenerManager rescheduleListenerManager;

    private final GuaranteeListenerManager guaranteeListenerManager;

    private final RegistryCenterConnectionStateListener regCenterConnectionStateListener;
}
  • 第一類:electionListenerManager / shardingListenerManager / failoverListenerManager / MonitorExecutionListenerManager / shutdownListenerManager / triggerListenerManager / rescheduleListenerManager / guaranteeListenerManager 是不同服務的監聽管理器,都繼承作業註冊中心的監聽器管理者的抽象類( AbstractListenerManager )。我們以下一篇文章會涉及到的分片監聽管理器( ShardingListenerManager ) 來瞅瞅內部整體實現:

    public final class ShardingListenerManager extends AbstractListenerManager {
        @Override
        public void start() {
            addDataListener(new ShardingTotalCountChangedJobListener());
            addDataListener(new ListenServersChangedJobListener());
        }
    class ShardingTotalCountChangedJobListener extends AbstractJobListener {
        // .... 省略方法
    }

    class ListenServersChangedJobListener extends AbstractJobListener {
        // .... 省略方法
    }

    }

    • ShardingListenerManager 內部管理了 ShardingTotalCountChangedJobListener / ListenServersChangedJobListener 兩個作業註冊中心監聽器。具體作業註冊中心監聽器是什麼,有什麼用途,下文會詳細解析。

  • 第二類:regCenterConnectionStateListener 是註冊中心連線狀態監聽器。下文也會詳細解析。

在《Elastic-Job-Lite 原始碼分析 —— 作業初始化》「3.2.4」註冊作業啟動資訊,我們看到作業初始化時,會開啟所有註冊中心監聽器:

// SchedulerFacade.java
/**
* 註冊作業啟動資訊.

@param enabled 作業是否啟用
*/

public void registerStartUpInfo(final boolean enabled) {
   // 開啟 所有監聽器
   listenerManager.startAllListeners();
   // .... 省略方法
}

// ListenerManager.java
/**
* 開啟所有監聽器.
*/

public void startAllListeners() {
   // 開啟 不同服務監聽管理器
   electionListenerManager.start();
   shardingListenerManager.start();
   failoverListenerManager.start();
   monitorExecutionListenerManager.start();
   shutdownListenerManager.start();
   triggerListenerManager.start();
   rescheduleListenerManager.start();
   guaranteeListenerManager.start();
   // 開啟 註冊中心連線狀態監聽器
   jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}

3. AbstractListenerManager

AbstractListenerManager,作業註冊中心的監聽器管理者的抽象類

public abstract class AbstractListenerManager {

    private final JobNodeStorage jobNodeStorage;

    protected AbstractListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName) {
        jobNodeStorage = new JobNodeStorage(regCenter, jobName);
    }

    /**
     * 開啟監聽器.
     */

    public abstract void start();

    /**
     * 新增註冊中心監聽器
     *
     * @param listener 註冊中心監聽器
     */

    protected void addDataListener(final TreeCacheListener listener) {
        jobNodeStorage.addDataListener(listener);
    }
}
  • #addDataListener(),將作業註冊中心的監聽器新增到註冊中心 TreeCache 的監聽者裡。JobNodeStorage#addDataListener(…) 在《Elastic-Job-Lite 原始碼分析 —— 作業初始化》「2.2」快取已經詳細解析。

  • 子類實現 #start() 方法實現監聽器初始化。目前所有子類的實現都是將自己管理的註冊中心監聽器呼叫 #addDataListener(...),還是以 ShardingListenerManager 舉例子:

    public final class ShardingListenerManager extends AbstractListenerManager {
    @Override
    public void start() {
        addDataListener(new ShardingTotalCountChangedJobListener());
        addDataListener(new ListenServersChangedJobListener());
    }

    }

4. AbstractJobListener

AbstractJobListener,作業註冊中心的監聽器抽象類

public abstract class AbstractJobListener implements TreeCacheListener {

    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        ChildData childData = event.getData();
        // 忽略掉非資料變化的事件,例如 event.type 為 CONNECTION_SUSPENDED、CONNECTION_RECONNECTED、CONNECTION_LOST、INITIALIZED 事件
        if (null == childData) {
            return;
        }
        String path = childData.getPath();
        if (path.isEmpty()) {
            return;
        }
        dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
    }

    /**
     * 節點資料變化
     *
     * @param path 節點路徑
     * @param eventType 事件型別
     * @param data 資料
     */

    protected abstract void dataChanged(final String path, final Type eventType, final String data);
}
  • 作業註冊中心的監聽器實現類實現 #dataChanged(…),對節點資料變化進行處理。

  • #childEvent(…) 遮蔽掉非節點資料變化事件,例如:CONNECTION_SUSPENDED、CONNECTION_RECONNECTED、CONNECTION_LOST、INITIALIZED 事件,只處理 NODE_ADDED、NODE_UPDATED、NODE_REMOVED 事件。

我們再拿 ShardingListenerManager 舉例子:

public final class ShardingListenerManager extends AbstractListenerManager {

    class ShardingTotalCountChangedJobListener extends AbstractJobListener {

        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
                if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                    shardingService.setReshardingFlag();
                    JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
                }
            }
        }
    }

    class ListenServersChangedJobListener extends AbstractJobListener {

        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
                shardingService.setReshardingFlag();
            }
        }

        private boolean isInstanceChange(final Type eventType, final String path) {
            return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
        }

        private boolean isServerChange(final String path) {
            return serverNode.isServerPath(path);
        }
    }

}
  • 在《Elastic-Job-Lite 原始碼解析 —— 任務分片》詳細解析。

5. RegistryCenterConnectionStateListener

RegistryCenterConnectionStateListener,實現 Curator ConnectionStateListener 介面,註冊中心連線狀態監聽器。

public final class RegistryCenterConnectionStateListener implements ConnectionStateListener {

    @Override
    public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
        if (JobRegistry.getInstance().isShutdown(jobName)) {
            return;
        }
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) { // Zookeeper 連線終端 或 連線丟失
            // 暫停作業排程
            jobScheduleController.pauseJob();
        } else if (ConnectionState.RECONNECTED == newState) { // Zookeeper 重新連上
            // 持久化作業伺服器上線資訊
            serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
            // 持久化作業執行實體上線相關資訊
            instanceService.persistOnline();
            // 清除本地分配的作業分片項執行中的標記
            executionService.clearRunningInfo(shardingService.getLocalShardingItems());
            // 恢復作業排程
            jobScheduleController.resumeJob();
        }
    }

}
  • 當註冊中心連線 SUSPENDED 或 LOST 時,暫停本地作業排程:

    // JobScheduleController.java
    public synchronized void pauseJob() {
       try {
           if (!scheduler.isShutdown()) {
               scheduler.pauseAll();
           }
       } catch (final SchedulerException ex) {
           throw new JobSystemException(ex);
       }
    }
  • 當註冊中心重新連線成功( RECONNECTED ),恢復本地作業排程:

    /**
    * 恢復作業.
    */

    public synchronized void resumeJob() {
      try {
          if (!scheduler.isShutdown()) {
              scheduler.resumeAll();
          }
      } catch (final SchedulerException ex) {
          throw new JobSystemException(ex);
      }
    }

666. 彩蛋

旁白君:芋道君,你又水更了!  
芋道君:是是是,是是是!

道友,趕緊上車,分享一波朋友圈!



如果你對 Dubbo / Netty 等等原始碼與原理感興趣,歡迎加入我的知識星球一起交流。長按下方二維碼噢

目前在知識星球更新了《Dubbo 原始碼解析》目錄如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 伺服器

16. P2P 伺服器

17. HTTP 伺服器

18. 序列化 Serialization

19. 叢集容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing

… 一共 69+ 篇

目前在知識星球更新了《Netty 原始碼解析》目錄如下:

01. 除錯環境搭建
02. NIO 基礎
03. Netty 簡介
04. 啟動 Bootstrap

05. 事件輪詢 EventLoop

06. 通道管道 ChannelPipeline

07. 通道 Channel

08. 位元組緩衝區 ByteBuf

09. 通道處理器 ChannelHandler

10. 編解碼 Codec

11. 工具類 Util

… 一共 61+ 篇

目前在知識星球更新了《資料庫物體設計》目錄如下:


01. 商品模組
02. 交易模組
03. 營銷模組
04. 公用模組

… 一共 17+ 篇

原始碼不易↓↓↓

點贊支援老艿艿↓↓

贊(0)

分享創造快樂