歡迎光臨
每天分享高質量文章

分佈式做系統 Elastic-Job-Lite 原始碼分析 —— 作業初始化

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-init/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. 作業註冊表

  • 3. 作業調度器

    • 3.1 創建

    • 3.2 初始化

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 作業初始化

涉及到主要類的類圖如下( 打開大圖 ):

你行好事會因為得到贊賞而愉悅 
同理,開源專案貢獻者會因為 Star 而更加有動力 
為 Elastic-Job 點贊!傳送門

2. 作業註冊表

作業註冊表( JobRegistry ),維護了單個 Elastic-Job-Lite 行程內作業相關信息,可以理解成其專屬的 Spring IOC 容器。因此,其本身是一個單例

public final class JobRegistry {

    /**
     * 單例
     */

    private static volatile JobRegistry instance;
    /**
     * 作業調度控制器集合
     * key:作業名稱
     */

    private Map schedulerMap = new ConcurrentHashMap<>();
    /**
     * 註冊中心集合
     * key:作業名稱
     */

    private Map regCenterMap = new ConcurrentHashMap<>();
    /**
     * 作業運行實體集合
     * key:作業名稱
     */

    private Map jobInstanceMap = new ConcurrentHashMap<>();
    /**
     * 運行中作業集合
     * key:作業名字
     */

    private Map jobRunningMap = new ConcurrentHashMap<>();
    /**
     * 作業總分片數量集合
     * key:作業名字
     */

    private Map currentShardingTotalCountMap = new ConcurrentHashMap<>();

    /**
     * 獲取作業註冊表實體.
     * 
     * @return 作業註冊表實體
     */

    public static JobRegistry getInstance() {
        if (null == instance) {
            synchronized (JobRegistry.class) {
                if (null == instance) {
                    instance = new JobRegistry();
                }
            }
        }
        return instance;
    }

    // .... 省略方法
}
  • instance 是一個單例,通過 #getInstance() 方法獲取該單例。該單例的創建方式為雙重檢驗鎖樣式

  • Map集合屬性全部作業名稱作為 KEY,通過作業名稱,可以獲得作業相關信息。

  • 省略的方法,下文在實際呼叫時,進行解析。

3. 作業調度器

作業調度器( JobScheduler ),創建並初始化後,進行作業調度。

Elastic-Job-Lite 使用 Quartz 作為調度內核。

3.1 創建

public class JobScheduler {
    /**
     * Lite作業配置
     */

    private final LiteJobConfiguration liteJobConfig;
    /**
     * 註冊中心
     */

    private final CoordinatorRegistryCenter regCenter;
    /**
     * 調度器門面物件
     */

    @Getter
    private final SchedulerFacade schedulerFacade;
    /**
     * 作業門面物件
     */

    private final JobFacade jobFacade;

    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final ElasticJobListener... elasticJobListeners) {
        this(regCenter, liteJobConfig, new JobEventBus(), elasticJobListeners);
    }

    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig, 
                        final ElasticJobListener... elasticJobListeners)
 
{
        this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
    }

    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        // 添加 作業運行實體
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        // 設置 Lite作業配置
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        // 設置 作業監聽器
        List elasticJobListenerList = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        // 設置 調度器門面物件
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        // 設置 作業門面物件
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }
}
  • 呼叫 #JobRegistry#addJobInstance() 方法添加作業運行實體( JobInstance )

    // JobRegistry.java
    /**
    * 作業運行實體集合
    * key:作業名稱
    */

    private Map jobInstanceMap = new ConcurrentHashMap<>();
    /**
    * 添加作業實體.
    *
    @param jobName 作業名稱
    @param jobInstance 作業實體
    */

    public void addJobInstance(final String jobName, final JobInstance jobInstance) {
       jobInstanceMap.put(jobName, jobInstance);
    }

    // JobInstance.java
    public final class JobInstance {
    private static final String DELIMITER = "@-@";

    /**
     * 作業實體主鍵.
     */

    private final String jobInstanceId;

    public JobInstance() {
        jobInstanceId = IpUtils.getIp()
                + DELIMITER
                + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; // PID
    }

    }

    • jobInstanceId 格式:${IP}@-@${PID}。其中 PID 為行程編號。同一個 Elastic-Job-Lite 實體,不同的作業使用相同的作業實體主鍵。

  • 設置作業監聽器,在《Elastic-Job-Lite 原始碼解析 —— 作業監聽器》詳細分享。

  • SchedulerFacade,為調度器提供內部服務的門面類。

    public final class SchedulerFacade {
    /**
     * 作業名稱
     */

    private final String jobName;
    /**
     * 作業配置服務
     */

    private final ConfigurationService configService;
    /**
     * 作業分片服務
     */

    private final ShardingService shardingService;
    /**
     * 主節點服務
     */

    private final LeaderService leaderService;
    /**
     * 作業服務器服務
     */

    private final ServerService serverService;
    /**
     * 作業運行實體服務
     */

    private final InstanceService instanceService;
    /**
     * 執行作業服務
     */

    private final ExecutionService executionService;
    /**
     * 作業監控服務
     */

    private final MonitorService monitorService;
    /**
     * 調解作業不一致狀態服務
     */

    private final ReconcileService reconcileService;
    /**
     * 作業註冊中心的監聽器管理者
     */

    private ListenerManager listenerManager;

    public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List elasticJobListeners) {
        this.jobName = jobName;
        // .... 省略 new XXXXX() 物件
    }

  • LiteJobFacade,為作業提供內部服務的門面類。

    public final class LiteJobFacade implements JobFacade {
        /**
         * 作業配置服務
         */

        private final ConfigurationService configService;
        /**
         * 作業分片服務
         */

        private final ShardingService shardingService;
        /**
         * 執行作業服務
         */

        private final ExecutionService executionService;
        /**
         * 作業運行時背景關係服務
         */

        private final ExecutionContextService executionContextService;
        /**
         * 作業失效轉移服務
         */

        private final FailoverService failoverService;
        /**
         * 作業監聽器陣列
         */

        private final List elasticJobListeners;
        /**
         * 作業事件總線
         */

        private final JobEventBus jobEventBus;
    public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List elasticJobListeners, final JobEventBus jobEventBus) {
        // .... 省略 new XXXXX() 物件
        failoverService = new FailoverService(regCenter, jobName);
        this.elasticJobListeners = elasticJobListeners;
        this.jobEventBus = jobEventBus;
    }

    }

SchedulerFacade 和 LiteJobFacade,看起來很相近,實際差別很大。它們分別為調度器、作業提供需要的方法。下文也會體現這一特點。

3.2 初始化

作業調度器創建後,呼叫 #init() 方法初始化,作業方開始調度。

/**
* 初始化作業.
*/

public void init() {
   // 更新 作業配置
   LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
   // 設置 當前作業分片總數
   JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
   // 創建 作業調度控制器
   JobScheduleController jobScheduleController = new JobScheduleController(
           createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
   // 添加 作業調度控制器
   JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
   // 註冊 作業啟動信息
   schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
   // 調度作業
   jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}

3.2.1 更新作業配置

// SchedulerFacade.java
/**
* 更新作業配置.
*
@param liteJobConfig 作業配置
@return 更新後的作業配置
*/

public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
   // 更新 作業配置
   configService.persist(liteJobConfig);
   // 讀取 作業配置
   return configService.load(false);
}
  • 從《Elastic-Job 原始碼分析 —— 作業配置》的「3.2 持久化作業配置」,呼叫 ConfigService#persist(…) 方法也不一定會更新作業配置,因此呼叫 ConfigService#load(…) 方法傳回的可能是本地的作業配置,也可能是註冊中心儲存的作業配置。

3.2.2 設置當前作業分片總數

// JobRegistry.java
private Map currentShardingTotalCountMap = new ConcurrentHashMap<>();
/**
* 設置當前分片總數.
*
@param jobName 作業名稱
@param currentShardingTotalCount 當前分片總數
*/

public void setCurrentShardingTotalCount(final String jobName, final int currentShardingTotalCount) {
   currentShardingTotalCountMap.put(jobName, currentShardingTotalCount);
}

3.2.3 創建作業調度控制器

public void init() {
   // .... 省略
   // 創建 作業調度控制器
   JobScheduleController jobScheduleController = new JobScheduleController(
           createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
   // .... 省略
}
  • JobScheduleController,作業調度控制器,提供對 Quartz 方法的封裝:

    public final class JobScheduleController {
    /**
     * Quartz 調度器
     */

    private final Scheduler scheduler;
    /**
     * 作業信息
     */

    private final JobDetail jobDetail;
    /**
     * 觸發器編號
     * 目前使用工作名字( jobName )
     */

    private final String triggerIdentity;

    public void scheduleJob(final String cron) {} // 調度作業
    public synchronized void rescheduleJob(final String cron) {} // 重新調度作業
    private CronTrigger createTrigger(final String cron) {} // 創建觸發器
    public synchronized boolean isPaused() {} // 判斷作業是否暫停
    public synchronized void pauseJob() {} // 暫停作業
    public synchronized void resumeJob() {} // 恢復作業
    public synchronized void triggerJob() {} // 立刻啟動作業
    public synchronized void shutdown() {} // 關閉調度器

    }

  • 呼叫 #createScheduler() 方法創建 Quartz 調度器:

    // JobScheduler.java
    private Scheduler createScheduler() {
       Scheduler result;
       try {
           StdSchedulerFactory factory = new StdSchedulerFactory();
           factory.initialize(getBaseQuartzProperties());
           result = factory.getScheduler();
           result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
       } catch (final SchedulerException ex) {
           throw new JobSystemException(ex);
       }
       return result;
    }

    private Properties getBaseQuartzProperties() {
       Properties result = new Properties();
       result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
       result.put("org.quartz.threadPool.threadCount""1"); // Quartz 執行緒數:1
       result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
       result.put("org.quartz.jobStore.misfireThreshold""1");
       result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName()); // 作業關閉鉤子
       result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString()); // 關閉時,清理所有資源
       return result;
    }
    • org.quartz.threadPool.threadCount = 1,即 Quartz 執行作業執行緒數量為 1。原因:一個作業( ElasticJob )的調度,需要配置獨有的一個作業調度器( JobScheduler ),兩者是 1 : 1 的關係。

    • org.quartz.plugin.shutdownhook.class 設置作業優雅關閉鉤子:JobShutdownHookPlugin。

    • 觸發器監聽器( TriggerListener ),在《Elastic-Job-Lite 原始碼解析 —— 作業執行》詳細分享。

  • 呼叫 #createJobDetail() 方法創建 Quartz 作業:

    // JobScheduler.java
    private JobDetail createJobDetail(final String jobClass) {
       // 創建 Quartz 作業
       JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
       //
       result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
       // 創建 Elastic-Job 物件
       Optional elasticJobInstance = createElasticJobInstance();
       if (elasticJobInstance.isPresent()) {
           result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
       } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
           try {
               result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
           } catch (final ReflectiveOperationException ex) {
               throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
           }
       }
       return result;
    }

    protected Optional createElasticJobInstance() {
       return Optional.absent();
    }

    // SpringJobScheduler.java
    @Override
    protected Optional createElasticJobInstance() {
       return Optional.fromNullable(elasticJob);
    }
    • 創建 Quartz 作業設置了 LiteJob 類,這樣 Quartz 觸發作業執行時,LiteJob 會去呼叫 Elastic-Job 作業物件。在《Elastic-Job-Lite 原始碼解析 —— 作業執行》詳細分享。

    • 在 Spring 里,Elastic-Job 如果已經創建好註入到 SpringJobScheduler,無需進行創建。

    • Jodetail.jobDataMap 屬性里添加了作業門面物件( LiteJobFacade )、Elastic-Job 物件,Quartz 觸發作業時,會設置到 LiteJob 物件里。

3.2.4 註冊作業啟動信息

/**
* 註冊作業啟動信息.

@param enabled 作業是否啟用
*/

public void registerStartUpInfo(final boolean enabled) {
   // 開啟 所有監聽器
   listenerManager.startAllListeners();
   // 選舉 主節點
   leaderService.electLeader();
   // 持久化 作業服務器上線信息
   serverService.persistOnline(enabled);
   // 持久化 作業運行實體上線相關信息
   instanceService.persistOnline();
   // 設置 需要重新分片的標記
   shardingService.setReshardingFlag();
   // 初始化 作業監聽服務
   monitorService.listen();
   // 初始化 調解作業不一致狀態服務
   if (!reconcileService.isRunning()) {
       reconcileService.startAsync();
   }
}
  • 開啟所有監聽器。每個功能模塊都有其相應的監聽器,在模塊對應「文章」詳細分享。

  • 選舉主節點,在《Elastic-Job-Lite 原始碼解析 —— 主節點選舉》詳細分享。

  • 呼叫 ServerService#persistOnline() 方法,持久化作業服務器上線信息。

    public final class ServerService {
        /**
         * 持久化作業服務器上線信息.
         * 
         * @param enabled 作業是否啟用
         */

        public void persistOnline(final boolean enabled) {
            if (!JobRegistry.getInstance().isShutdown(jobName)) {
                jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());
            }
        }
    }
    • 當作業配置設置作業禁用時( LiteJobConfiguration.disabled = true ),作業調度但調度作業分片為空。不太好理解?《Elastic-Job-Lite 原始碼解析 —— 作業分片》詳細分享。

  • 呼叫 InstanceService#persistOnline() 方法,持久化作業運行實體上線相關信息:

    public final class InstanceService {
        /**
         * 持久化作業運行實體上線相關信息.
         */

        public void persistOnline() {
            jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
        }
    }
  • 設置需要重新分片的標記,在《Elastic-Job-Lite 原始碼解析 —— 作業分片》詳細分享。

  • 初始化作業監聽服務,在《Elastic-Job-Lite 原始碼解析 —— 作業監控服務》詳細分享。

  • 初始化調解作業不一致狀態服務,在《Elastic-Job-Lite 原始碼解析 —— 自診斷修複》詳細分享。

3.2.5 調度作業

// JobScheduler.java
public void init() {
   // .... 省略部分代碼
   // 調度作業
   jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}

// JobScheduleController.java
/**
* 調度作業.
*
@param cron CRON運算式
*/

public void scheduleJob(final String cron) {
   try {
       if (!scheduler.checkExists(jobDetail.getKey())) {
           scheduler.scheduleJob(jobDetail, createTrigger(cron));
       }
       scheduler.start();
   } catch (final SchedulerException ex) {
       throw new JobSystemException(ex);
   }
}
  • 呼叫 #scheduleJob() 方法後,該 Elastic-Job 作業開始被調度。

666. 彩蛋

作業初始化,如果你對 Quartz 不是特別瞭解,可以再看 Quartz 再重新理解。

下一篇,《Elastic-Job-Lite 原始碼解析 —— 作業執行》 起航!

道友,分享一波微信朋友圈支持支持支持,可好?




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 服務器

16. P2P 服務器

17. HTTP 服務器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

原始碼不易↓↓↓

點贊支持老艿艿↓↓

赞(0)

分享創造快樂