歡迎光臨
每天分享高質量文章

分佈式作業 Elastic-Job-Lite 原始碼分析 —— 作業配置

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-config/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. 作業配置

  • 3. 作業配置服務

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 作業配置

涉及到主要類的類圖如下( 打開大圖 ):

  • 黃色的類在 elastic-job-common-core 專案里,為 Elastic-Job-Lite、Elastic-Job-Cloud 公用作業配置類。

另外建議你已經( 非必須 ):

  • 閱讀過《官方文件 —— 配置手冊》

  • 運行過 JavaMain.java

你行好事會因為得到贊賞而愉悅 
同理,開源專案貢獻者會因為 Star 而更加有動力 
為 Elastic-Job 點贊!傳送門

2. 作業配置

一個作業( ElasticJob )的調度,需要配置獨有的一個作業調度器( JobScheduler ),兩者是 1 : 1 的關係。這點大家要註意下,當然下文看代碼也會看到。

作業調度器的創建可以配置四個引數:

  1. 註冊中心( CoordinatorRegistryCenter ):用於協調分佈式服務。必填

  2. Lite作業配置( LiteJobConfiguration ):必填

  3. 作業事件總線( JobEventBus ):對作業事件異步監聽。選填

  4. 作業監聽器( ElasticJobListener ):對作業執行前,執行後進行同步監聽。選填

2.1 註冊中心配置

Elastic-Job 抽象了註冊中心接口( RegistryCenter ),並提供了預設基於 Zookeeper 的註冊中心實現( ZookeeperRegistryCenter )

ZookeeperRegistryCenter 對應配置類為 ZookeeperConfiguration。該類註釋很完整,可以點擊鏈接直接查看原始碼,這裡我們重點說下 namespace 屬性。如果你有多個不同 Elastic-Job集群 時,使用相同 Zookeeper,可以配置不同的 namespace 進行隔離。

註冊中心的初始化,我們會在《Elastic-Job-Lite 原始碼解析 —— 註冊中心》詳細分享。

2.2 Lite作業配置

LiteJobConfiguration 繼承自接口 JobRootConfiguration,作為 Elastic-Job-Lite 里的作業( LiteJob )配置。Elastic-Job-Cloud 的作業( CloudJob )對應另外的配置類,也實現了該接口。

public final class LiteJobConfiguration implements JobRootConfiguration {

    private final JobTypeConfiguration typeConfig;
    private final boolean monitorExecution;
    private final int maxTimeDiffSeconds;
    private final int monitorPort;
    private final String jobShardingStrategyClass;
    private final int reconcileIntervalMinutes;
    private final boolean disabled;
    private final boolean overwrite;

    // .... 省略部分get方法

    public static class Builder {

        // .... 省略部分屬性

        public final LiteJobConfiguration build() {
            return new LiteJobConfiguration(jobConfig, monitorExecution, maxTimeDiffSeconds, monitorPort, jobShardingStrategyClass, reconcileIntervalMinutes, disabled, overwrite);
        }
    }
}
  • typeConfig:作業型別配置。必填

  • monitorExecution:監控作業運行時狀態。預設為 false。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業執行》詳細分享。

    每次作業執行時間和間隔時間均非常短的情況,建議不監控作業運行時狀態以提升效率。因為是瞬時狀態,所以無必要監控。請用戶自行增加資料堆積監控。並且不能保證資料重覆選取,應在作業中實現冪等性。 
    每次作業執行時間和間隔時間均較長的情況,建議監控作業運行時狀態,可保證資料不會重覆選取。

  • monitorPort:作業監控端口。預設為 -1,不開啟作業監控端口。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業監控服務》詳細分享。

    建議配置作業監控端口, 方便開發者dump作業信息。 
    使用方法: echo “dump” | nc 127.0.0.1 9888

  • maxTimeDiffSeconds:設置最大容忍的本機與註冊中心的時間誤差秒數。預設為 -1,不檢查時間誤差。選填。

  • jobShardingStrategyClass:作業分片策略實現類全路徑。預設為使用分配側路。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業分片策略》詳細分享。

  • reconcileIntervalMinutes:修複作業服務器不一致狀態服務調度間隔時間,配置為小於1的任意值表示不執行修複。預設為 10。在《Elastic-Job-Lite 原始碼解析 —— 自診斷修複 》詳細分享。

  • disabled:作業是否禁用執行。預設為 false。選填。

  • overwrite:設置使用本地作業配置改寫註冊中心的作業配置。預設為 false。選填。建議使用運維平臺( console )配置作業配置,統一管理。

  • Builder 類:使用該類配置 LiteJobConfiguration 屬性,呼叫 #build() 方法最終生成作業配置。參見:《JAVA設計樣式 — 生成器樣式(Builder)》。

2.2.1 作業型別配置

作業型別配置接口( JobTypeConfiguration ) 有三種配置實現,針對三種作業型別:

配置實現 作業 說明
SimpleJobConfiguration SimpleJob 簡單作業。例如:訂單過期作業
DataflowJobConfiguration DataflowJob 資料流作業。TODO:筆者暫時未瞭解流式處理資料,不誤人子弟
ScriptJobConfiguration ScriptJob 腳本作業。例如:呼叫 shell 腳本備份資料庫作業

三種配置類屬性對比如:

屬性 SimpleJob DataflowJob ScriptJob 說明
coreConfig 作業核心配置
jobType JobType.SIMPLE JobType.DATAFLOW JobType.SCRIPT 作業型別
jobClass √ (預設:ScriptJob.class) 作業實現類全路徑
streamingProcess 是否流式處理資料
scriptCommandLine 腳本型作業執行命令列

作業型別配置不僅僅適用於 Elastic-Job-Lite,也適用於 Elastic-Job-Cloud。

2.2.2 作業核心配置

作業核心配置( JobCoreConfiguration ),我們可以看到在每種作業型別配置都有該屬性( coreConfig )。

public final class JobCoreConfiguration {

    private final String jobName;
    private final String cron;
    private final int shardingTotalCount;
    private final String shardingItemParameters;
    private final String jobParameter;
    private final boolean failover;
    private final boolean misfire;
    private final String description;
    private final JobProperties jobProperties;

    public static class Builder {

        // .... 省略部分屬性

        public final JobCoreConfiguration build() {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(jobName), "jobName can not be empty.");
            Preconditions.checkArgument(!Strings.isNullOrEmpty(cron), "cron can not be empty.");
            Preconditions.checkArgument(shardingTotalCount > 0"shardingTotalCount should larger than zero.");
            return new JobCoreConfiguration(jobName, cron, shardingTotalCount, shardingItemParameters, jobParameter, failover, misfire, description, jobProperties);
        }
    }
}
  • jobName:作業名稱。必填。

  • cron:cron運算式,用於控製作業觸發時間。必填。

  • shardingTotalCount:作業分片總數。如果一個作業啟動超過作業分片總數的節點,只有 shardingTotalCount會執行作業。必填。在《Elastic-Job-Lite 原始碼解析 —— 作業分片策略 》詳細分享。

  • shardingItemParameters:分片序列號和引數。選填。

    分片序列號和引數用等號分隔,多個鍵值對用逗號分隔 
    分片序列號從0開始,不可大於或等於作業分片總數 
    如: 
    0=a,1=b,2=c

  • jobParameter:作業自定義引數。選填。

    作業自定義引數,可通過傳遞該引數為作業調度的業務方法傳參,用於實現帶引數的作業 
    例:每次獲取的資料量、作業實體從資料庫讀取的主鍵等

  • failover:是否開啟作業執行失效轉移。開啟表示如果作業在一次作業執行中途宕機,允許將該次未完成的作業在另一作業節點上補償執行。預設為 false。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業失效轉移 》詳細分享。

  • misfire:是否開啟錯過作業重新執行。預設為 true。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業執行 》詳細分享。

  • description:作業描述。選填。

  • jobProperties:作業屬性配置。選填。在《Elastic-Job-Lite 原始碼解析 —— 作業執行 》詳細分享。

    public final class JobProperties {
    private EnumMap map = new EnumMap<>(JobPropertiesEnum.class);

       public enum JobPropertiesEnum {

        /**
         * 作業異常處理器.
         */

        JOB_EXCEPTION_HANDLER("job_exception_handler", JobExceptionHandler.class, DefaultJobExceptionHandler.class.getCanonicalName()),

        /**
         * 執行緒池服務處理器.
         */

        EXECUTOR_SERVICE_HANDLER("executor_service_handler", ExecutorServiceHandler.class, DefaultExecutorServiceHandler.class.getCanonicalName());

        private final String key;

        private final Class> classType;

        private final String defaultValue;

       }
    }

    • JOB_EXCEPTION_HANDLER:用於擴展異常處理類。

    • EXECUTOR_SERVICE_HANDLER:用於擴展作業處理執行緒池類。

    • 通過這個屬性,我們可以自定義每個作業的異常處理和執行緒池服務。

2.3 作業事件配置

通過作業事件配置( JobEventConfiguration ),實現對作業事件的異步監聽、處理。在《Elastic-Job-Lite 原始碼解析 —— 作業事件追蹤》詳細分享。

2.4 作業監聽器

通過配置作業監聽器( ElasticJobListener ),實現對作業執行的同步監聽、處理。在《Elastic-Job-Lite 原始碼解析 —— 作業監聽器》詳細分享。

3. 作業配置服務

多個 Elastic-Job-Lite 使用相同註冊中心和相同 namespace 組成集群,實現高可用。集群中,使用作業配置服務( ConfigurationService ) 共享作業配置。

public final class ConfigurationService {

    /**
     * 時間服務
     */

    private final TimeService timeService;
    /**
     * 作業節點資料訪問類
     */

    private final JobNodeStorage jobNodeStorage;

    public ConfigurationService(final CoordinatorRegistryCenter regCenter, final String jobName) {
        jobNodeStorage = new JobNodeStorage(regCenter, jobName);
        timeService = new TimeService();
    }
}
  • JobNodeStorage,封裝註冊中心,提供儲存服務。在《Elastic-Job-Lite 原始碼解析 —— 作業資料儲存》詳細分享。

  • TimeService,時間服務,提供當前時間查詢。

    public final class TimeService {
    /**
     * 獲取當前時間的毫秒數.
     * 
     * @return 當前時間的毫秒數
     */

    public long getCurrentMillis() {
        return System.currentTimeMillis();
    }

    }

3.1 讀取作業配置

/**
* 讀取作業配置.

@param fromCache 是否從快取中讀取
@return 作業配置
*/

public LiteJobConfiguration load(final boolean fromCache) {
   String result;
   if (fromCache) { // 快取
       result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
       if (null == result) {
           result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
       }
   } else {
       result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
   }
   return LiteJobConfigurationGsonFactory.fromJson(result);
}

3.2 持久化作業配置

/**
* 持久化分佈式作業配置信息.

@param liteJobConfig 作業配置
*/

public void persist(final LiteJobConfiguration liteJobConfig) {
   checkConflictJob(liteJobConfig);
   if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
       jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
   }
}
  • 呼叫 #checkConflictJob(...) 方法校驗註冊中心儲存的作業配置的作業實現類全路徑( jobClass )和當前的是否相同,如果不同,則認為是衝突,不允許儲存:

    private void checkConflictJob(final LiteJobConfiguration liteJobConfig) {
       Optional liteJobConfigFromZk = find();
       if (liteJobConfigFromZk.isPresent()
               && !liteJobConfigFromZk.get().getTypeConfig().getJobClass().equals(liteJobConfig.getTypeConfig().getJobClass())) { // jobClass 是否相同
           throw new JobConfigurationException("Job conflict with register center. The job '%s' in register center's class is '%s', your job class is '%s'"
                   liteJobConfig.getJobName(), liteJobConfigFromZk.get().getTypeConfig().getJobClass(), liteJobConfig.getTypeConfig().getJobClass());
       }
    }
  • 當註冊中心未儲存該作業配置 或者 當前作業配置允許替換註冊中心作業配置( overwrite = true )時,持久化作業配置。

3.3 校驗本機時間是否合法

/**
* 檢查本機與註冊中心的時間誤差秒數是否在允許範圍.

@throws JobExecutionEnvironmentException 本機與註冊中心的時間誤差秒數不在允許範圍所丟擲的異常
*/

public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
   int maxTimeDiffSeconds =  load(true).getMaxTimeDiffSeconds();
   if (-1  == maxTimeDiffSeconds) {
       return;
   }
   long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
   if (timeDiff > maxTimeDiffSeconds * 1000L) {
       throw new JobExecutionEnvironmentException(
               "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
   }
}
  • Elastic-Job-Lite 作業觸發是依賴本機時間,相同集群使用註冊中心時間為基準,校驗本機與註冊中心的時間誤差是否在允許範圍內( LiteJobConfiguration.maxTimeDiffSeconds )。

666. 彩蛋

Elastic-Job-Lite 原始碼解析系列第一篇文章,希望大家多多支持,預計全部更新完會有 15+ 篇。Elastic-Job-Cloud 原始碼系列後續也會更新。

道友,分享一波微信朋友圈支持支持支持,可好?




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 服務器

16. P2P 服務器

17. HTTP 服務器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

原始碼不易↓↓↓

點贊支持老艿艿↓↓

赞(0)

分享創造快樂