歡迎光臨
每天分享高質量文章

分散式做系統 Elastic-Job-Lite 原始碼分析 —— 作業初始化

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-init/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. 作業登錄檔

  • 3. 作業排程器

    • 3.1 建立

    • 3.2 初始化

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 作業初始化

涉及到主要類的類圖如下( 開啟大圖 ):

你行好事會因為得到贊賞而愉悅 
同理,開源專案貢獻者會因為 Star 而更加有動力 
為 Elastic-Job 點贊!傳送門

2. 作業登錄檔

作業登錄檔( JobRegistry ),維護了單個 Elastic-Job-Lite 行程內作業相關資訊,可以理解成其專屬的 Spring IOC 容器。因此,其本身是一個單例

public final class JobRegistry {

    /**
     * 單例
     */

    private static volatile JobRegistry instance;
    /**
     * 作業排程控制器集合
     * key:作業名稱
     */

    private Map schedulerMap = new ConcurrentHashMap<>();
    /**
     * 註冊中心集合
     * key:作業名稱
     */

    private Map regCenterMap = new ConcurrentHashMap<>();
    /**
     * 作業執行實體集合
     * key:作業名稱
     */

    private Map jobInstanceMap = new ConcurrentHashMap<>();
    /**
     * 執行中作業集合
     * key:作業名字
     */

    private Map jobRunningMap = new ConcurrentHashMap<>();
    /**
     * 作業總分片數量集合
     * key:作業名字
     */

    private Map currentShardingTotalCountMap = new ConcurrentHashMap<>();

    /**
     * 獲取作業登錄檔實體.
     * 
     * @return 作業登錄檔實體
     */

    public static JobRegistry getInstance() {
        if (null == instance) {
            synchronized (JobRegistry.class) {
                if (null == instance) {
                    instance = new JobRegistry();
                }
            }
        }
        return instance;
    }

    // .... 省略方法
}
  • instance 是一個單例,透過 #getInstance() 方法獲取該單例。該單例的建立方式為雙重檢驗鎖樣式

  • Map集合屬性全部作業名稱作為 KEY,透過作業名稱,可以獲得作業相關資訊。

  • 省略的方法,下文在實際呼叫時,進行解析。

3. 作業排程器

作業排程器( JobScheduler ),建立並初始化後,進行作業排程。

Elastic-Job-Lite 使用 Quartz 作為排程核心。

3.1 建立

public class JobScheduler {
    /**
     * Lite作業配置
     */

    private final LiteJobConfiguration liteJobConfig;
    /**
     * 註冊中心
     */

    private final CoordinatorRegistryCenter regCenter;
    /**
     * 排程器門面物件
     */

    @Getter
    private final SchedulerFacade schedulerFacade;
    /**
     * 作業門面物件
     */

    private final JobFacade jobFacade;

    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final ElasticJobListener... elasticJobListeners) {
        this(regCenter, liteJobConfig, new JobEventBus(), elasticJobListeners);
    }

    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig, 
                        final ElasticJobListener... elasticJobListeners)
 
{
        this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
    }

    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        // 新增 作業執行實體
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        // 設定 Lite作業配置
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        // 設定 作業監聽器
        List elasticJobListenerList = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        // 設定 排程器門面物件
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        // 設定 作業門面物件
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }
}
  • 呼叫 #JobRegistry#addJobInstance() 方法添加作業執行實體( JobInstance )

    // JobRegistry.java
    /**
    * 作業執行實體集合
    * key:作業名稱
    */

    private Map jobInstanceMap = new ConcurrentHashMap<>();
    /**
    * 新增作業實體.
    *
    @param jobName 作業名稱
    @param jobInstance 作業實體
    */

    public void addJobInstance(final String jobName, final JobInstance jobInstance) {
       jobInstanceMap.put(jobName, jobInstance);
    }

    // JobInstance.java
    public final class JobInstance {
    private static final String DELIMITER = "@-@";

    /**
     * 作業實體主鍵.
     */

    private final String jobInstanceId;

    public JobInstance() {
        jobInstanceId = IpUtils.getIp()
                + DELIMITER
                + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; // PID
    }

    }

    • jobInstanceId 格式:${IP}@-@${PID}。其中 PID 為行程編號。同一個 Elastic-Job-Lite 實體,不同的作業使用相同的作業實體主鍵。

  • 設定作業監聽器,在《Elastic-Job-Lite 原始碼解析 —— 作業監聽器》詳細分享。

  • SchedulerFacade,為排程器提供內部服務的門面類。

    public final class SchedulerFacade {
    /**
     * 作業名稱
     */

    private final String jobName;
    /**
     * 作業配置服務
     */

    private final ConfigurationService configService;
    /**
     * 作業分片服務
     */

    private final ShardingService shardingService;
    /**
     * 主節點服務
     */

    private final LeaderService leaderService;
    /**
     * 作業伺服器服務
     */

    private final ServerService serverService;
    /**
     * 作業執行實體服務
     */

    private final InstanceService instanceService;
    /**
     * 執行作業服務
     */

    private final ExecutionService executionService;
    /**
     * 作業監控服務
     */

    private final MonitorService monitorService;
    /**
     * 調解作業不一致狀態服務
     */

    private final ReconcileService reconcileService;
    /**
     * 作業註冊中心的監聽器管理者
     */

    private ListenerManager listenerManager;

    public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List elasticJobListeners) {
        this.jobName = jobName;
        // .... 省略 new XXXXX() 物件
    }

  • LiteJobFacade,為作業提供內部服務的門面類。

    public final class LiteJobFacade implements JobFacade {
        /**
         * 作業配置服務
         */

        private final ConfigurationService configService;
        /**
         * 作業分片服務
         */

        private final ShardingService shardingService;
        /**
         * 執行作業服務
         */

        private final ExecutionService executionService;
        /**
         * 作業執行時背景關係服務
         */

        private final ExecutionContextService executionContextService;
        /**
         * 作業失效轉移服務
         */

        private final FailoverService failoverService;
        /**
         * 作業監聽器陣列
         */

        private final List elasticJobListeners;
        /**
         * 作業事件匯流排
         */

        private final JobEventBus jobEventBus;
    public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List elasticJobListeners, final JobEventBus jobEventBus) {
        // .... 省略 new XXXXX() 物件
        failoverService = new FailoverService(regCenter, jobName);
        this.elasticJobListeners = elasticJobListeners;
        this.jobEventBus = jobEventBus;
    }

    }

SchedulerFacade 和 LiteJobFacade,看起來很相近,實際差別很大。它們分別為排程器、作業提供需要的方法。下文也會體現這一特點。

3.2 初始化

作業排程器建立後,呼叫 #init() 方法初始化,作業方開始排程。

/**
* 初始化作業.
*/

public void init() {
   // 更新 作業配置
   LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
   // 設定 當前作業分片總數
   JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
   // 建立 作業排程控制器
   JobScheduleController jobScheduleController = new JobScheduleController(
           createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
   // 新增 作業排程控制器
   JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
   // 註冊 作業啟動資訊
   schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
   // 排程作業
   jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}

3.2.1 更新作業配置

// SchedulerFacade.java
/**
* 更新作業配置.
*
@param liteJobConfig 作業配置
@return 更新後的作業配置
*/

public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
   // 更新 作業配置
   configService.persist(liteJobConfig);
   // 讀取 作業配置
   return configService.load(false);
}
  • 從《Elastic-Job 原始碼分析 —— 作業配置》的「3.2 持久化作業配置」,呼叫 ConfigService#persist(…) 方法也不一定會更新作業配置,因此呼叫 ConfigService#load(…) 方法傳回的可能是本地的作業配置,也可能是註冊中心儲存的作業配置。

3.2.2 設定當前作業分片總數

// JobRegistry.java
private Map currentShardingTotalCountMap = new ConcurrentHashMap<>();
/**
* 設定當前分片總數.
*
@param jobName 作業名稱
@param currentShardingTotalCount 當前分片總數
*/

public void setCurrentShardingTotalCount(final String jobName, final int currentShardingTotalCount) {
   currentShardingTotalCountMap.put(jobName, currentShardingTotalCount);
}

3.2.3 建立作業排程控制器

public void init() {
   // .... 省略
   // 建立 作業排程控制器
   JobScheduleController jobScheduleController = new JobScheduleController(
           createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
   // .... 省略
}
  • JobScheduleController,作業排程控制器,提供對 Quartz 方法的封裝:

    public final class JobScheduleController {
    /**
     * Quartz 排程器
     */

    private final Scheduler scheduler;
    /**
     * 作業資訊
     */

    private final JobDetail jobDetail;
    /**
     * 觸發器編號
     * 目前使用工作名字( jobName )
     */

    private final String triggerIdentity;

    public void scheduleJob(final String cron) {} // 排程作業
    public synchronized void rescheduleJob(final String cron) {} // 重新排程作業
    private CronTrigger createTrigger(final String cron) {} // 建立觸發器
    public synchronized boolean isPaused() {} // 判斷作業是否暫停
    public synchronized void pauseJob() {} // 暫停作業
    public synchronized void resumeJob() {} // 恢復作業
    public synchronized void triggerJob() {} // 立刻啟動作業
    public synchronized void shutdown() {} // 關閉排程器

    }

  • 呼叫 #createScheduler() 方法建立 Quartz 排程器:

    // JobScheduler.java
    private Scheduler createScheduler() {
       Scheduler result;
       try {
           StdSchedulerFactory factory = new StdSchedulerFactory();
           factory.initialize(getBaseQuartzProperties());
           result = factory.getScheduler();
           result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
       } catch (final SchedulerException ex) {
           throw new JobSystemException(ex);
       }
       return result;
    }

    private Properties getBaseQuartzProperties() {
       Properties result = new Properties();
       result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
       result.put("org.quartz.threadPool.threadCount""1"); // Quartz 執行緒數:1
       result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
       result.put("org.quartz.jobStore.misfireThreshold""1");
       result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName()); // 作業關閉鉤子
       result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString()); // 關閉時,清理所有資源
       return result;
    }
    • org.quartz.threadPool.threadCount = 1,即 Quartz 執行作業執行緒數量為 1。原因:一個作業( ElasticJob )的排程,需要配置獨有的一個作業排程器( JobScheduler ),兩者是 1 : 1 的關係。

    • org.quartz.plugin.shutdownhook.class 設定作業優雅關閉鉤子:JobShutdownHookPlugin。

    • 觸發器監聽器( TriggerListener ),在《Elastic-Job-Lite 原始碼解析 —— 作業執行》詳細分享。

  • 呼叫 #createJobDetail() 方法建立 Quartz 作業:

    // JobScheduler.java
    private JobDetail createJobDetail(final String jobClass) {
       // 建立 Quartz 作業
       JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
       //
       result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
       // 建立 Elastic-Job 物件
       Optional elasticJobInstance = createElasticJobInstance();
       if (elasticJobInstance.isPresent()) {
           result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
       } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
           try {
               result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
           } catch (final ReflectiveOperationException ex) {
               throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
           }
       }
       return result;
    }

    protected Optional createElasticJobInstance() {
       return Optional.absent();
    }

    // SpringJobScheduler.java
    @Override
    protected Optional createElasticJobInstance() {
       return Optional.fromNullable(elasticJob);
    }
    • 建立 Quartz 作業設定了 LiteJob 類,這樣 Quartz 觸發作業執行時,LiteJob 會去呼叫 Elastic-Job 作業物件。在《Elastic-Job-Lite 原始碼解析 —— 作業執行》詳細分享。

    • 在 Spring 裡,Elastic-Job 如果已經建立好註入到 SpringJobScheduler,無需進行建立。

    • Jodetail.jobDataMap 屬性裡添加了作業門面物件( LiteJobFacade )、Elastic-Job 物件,Quartz 觸發作業時,會設定到 LiteJob 物件裡。

3.2.4 註冊作業啟動資訊

/**
* 註冊作業啟動資訊.

@param enabled 作業是否啟用
*/

public void registerStartUpInfo(final boolean enabled) {
   // 開啟 所有監聽器
   listenerManager.startAllListeners();
   // 選舉 主節點
   leaderService.electLeader();
   // 持久化 作業伺服器上線資訊
   serverService.persistOnline(enabled);
   // 持久化 作業執行實體上線相關資訊
   instanceService.persistOnline();
   // 設定 需要重新分片的標記
   shardingService.setReshardingFlag();
   // 初始化 作業監聽服務
   monitorService.listen();
   // 初始化 調解作業不一致狀態服務
   if (!reconcileService.isRunning()) {
       reconcileService.startAsync();
   }
}
  • 開啟所有監聽器。每個功能模組都有其相應的監聽器,在模組對應「文章」詳細分享。

  • 選舉主節點,在《Elastic-Job-Lite 原始碼解析 —— 主節點選舉》詳細分享。

  • 呼叫 ServerService#persistOnline() 方法,持久化作業伺服器上線資訊。

    public final class ServerService {
        /**
         * 持久化作業伺服器上線資訊.
         * 
         * @param enabled 作業是否啟用
         */

        public void persistOnline(final boolean enabled) {
            if (!JobRegistry.getInstance().isShutdown(jobName)) {
                jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());
            }
        }
    }
    • 當作業配置設定作業禁用時( LiteJobConfiguration.disabled = true ),作業排程但排程作業分片為空。不太好理解?《Elastic-Job-Lite 原始碼解析 —— 作業分片》詳細分享。

  • 呼叫 InstanceService#persistOnline() 方法,持久化作業執行實體上線相關資訊:

    public final class InstanceService {
        /**
         * 持久化作業執行實體上線相關資訊.
         */

        public void persistOnline() {
            jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
        }
    }
  • 設定需要重新分片的標記,在《Elastic-Job-Lite 原始碼解析 —— 作業分片》詳細分享。

  • 初始化作業監聽服務,在《Elastic-Job-Lite 原始碼解析 —— 作業監控服務》詳細分享。

  • 初始化調解作業不一致狀態服務,在《Elastic-Job-Lite 原始碼解析 —— 自診斷修複》詳細分享。

3.2.5 排程作業

// JobScheduler.java
public void init() {
   // .... 省略部分程式碼
   // 排程作業
   jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}

// JobScheduleController.java
/**
* 排程作業.
*
@param cron CRON運算式
*/

public void scheduleJob(final String cron) {
   try {
       if (!scheduler.checkExists(jobDetail.getKey())) {
           scheduler.scheduleJob(jobDetail, createTrigger(cron));
       }
       scheduler.start();
   } catch (final SchedulerException ex) {
       throw new JobSystemException(ex);
   }
}
  • 呼叫 #scheduleJob() 方法後,該 Elastic-Job 作業開始被排程。

666. 彩蛋

作業初始化,如果你對 Quartz 不是特別瞭解,可以再看 Quartz 再重新理解。

下一篇,《Elastic-Job-Lite 原始碼解析 —— 作業執行》 起航!

道友,分享一波微信朋友圈支援支援支援,可好?




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 伺服器

16. P2P 伺服器

17. HTTP 伺服器

18. 序列化 Serialization

19. 叢集容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

原始碼不易↓↓↓

點贊支援老艿艿↓↓

贊(0)

分享創造快樂