歡迎光臨
每天分享高質量文章

分佈式作業系統 Elastic-Job-Lite 原始碼分析 —— 作業執行

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-execute/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. Lite調度作業

  • 3. 執行器創建

  • 4. 執行器執行

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 作業執行

涉及到主要類的類圖如下( 打開大圖 ):

  • 黃色的類在 elastic-job-common-core 專案里,為 Elastic-Job-Lite、Elastic-Job-Cloud 公用作業執行類。

你行好事會因為得到贊賞而愉悅 
同理,開源專案貢獻者會因為 Star 而更加有動力 
為 Elastic-Job 點贊!傳送門

2. Lite調度作業

Lite調度作業( LiteJob ),作業被調度後,呼叫 #execute() 執行作業。

為什麼是 LiteJob 作為入口呢?

在《Elastic-Job-Lite 原始碼分析 —— 作業初始化》的「3.2.3」創建作業調度控制器里,我們可以看到 Quartz 的 JobDetail 創建代碼如下:

JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();

#newJob() 里的引數是 LiteJob,因此,每次 Quartz 到達調度時間時,會創建該物件進行作業執行。


public final class LiteJob implements Job {

    @Setter
    private ElasticJob elasticJob;

    @Setter
    private JobFacade jobFacade;

    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}
  • LiteJob 通過 JobExecutorFactory 獲得到作業執行器( AbstractElasticJobExecutor ),併進行執行:

    public final class JobExecutorFactory {
    /**
     * 獲取作業執行器.
     *
     * @param elasticJob 分佈式彈性作業
     * @param jobFacade 作業內部服務門面服務
     * @return 作業執行器
     */

    @SuppressWarnings("unchecked")
    public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
        // ScriptJob
        if (null == elasticJob) {
            return new ScriptJobExecutor(jobFacade);
        }
        // SimpleJob
        if (elasticJob instanceof SimpleJob) {
            return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
        }
        // DataflowJob
        if (elasticJob instanceof DataflowJob) {
            return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
        }
        throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
    }

    }

    • JobExecutorFactory,作業執行器工廠,根據不同的作業型別,傳回對應的作業執行器

作業 作業接口 執行器
簡單作業 SimpleJob SimpleJobExecutor
資料流作業 DataflowJob DataflowJobExecutor
腳本作業 ScriptJob ScriptJobExecutor

3. 執行器創建

AbstractElasticJobExecutor,作業執行器抽象類。不同作業執行器都繼承該類,創建的過程是一致的。

// AbstractElasticJobExecutor.java
public abstract class AbstractElasticJobExecutor {

    /**
     * 作業門面物件
     */

    @Getter(AccessLevel.PROTECTED)
    private final JobFacade jobFacade;
    /**
     * 作業配置
     */

    @Getter(AccessLevel.PROTECTED)
    private final JobRootConfiguration jobRootConfig;
    /**
     * 作業名稱
     */

    private final String jobName;
    /**
     * 作業執行執行緒池
     */

    private final ExecutorService executorService;
    /**
     * 作業異常處理器
     */

    private final JobExceptionHandler jobExceptionHandler;
    /**
     * 分片錯誤信息集合
     * key:分片序號
     */

    private final Map itemErrorMessages;

    protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
        this.jobFacade = jobFacade;
        // 加載 作業配置
        jobRootConfig = jobFacade.loadJobRootConfiguration(true);
        jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
        // 獲取 作業執行執行緒池
        executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
        // 獲取 作業異常處理器
        jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
        // 設置 分片錯誤信息集合
        itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
    }
}

// SimpleJobExecutor.java
public final class SimpleJobExecutor extends AbstractElasticJobExecutor {

    /**
     * 簡單作業實現
     */

    private final SimpleJob simpleJob;

    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.simpleJob = simpleJob;
    }
}

// DataflowJobExecutor.java
public final class DataflowJobExecutor extends AbstractElasticJobExecutor {

    /**
     * 資料流作業物件
     */

    private final DataflowJob dataflowJob;

    public DataflowJobExecutor(final DataflowJob dataflowJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.dataflowJob = dataflowJob;
    }
}

// ScriptJobExecutor.java
public final class ScriptJobExecutor extends AbstractElasticJobExecutor {

    public ScriptJobExecutor(final JobFacade jobFacade) {
        super(jobFacade);
    }
}

3.1 加載作業配置

快取中讀取作業配置。在《Elastic-Job-Lite 原始碼分析 —— 作業配置》的「3.1」讀取作業配置 已經解析。

3.2 獲取作業執行執行緒池

作業每次執行時,可能分配到多個分片項,需要使用執行緒池實現並行執行。考慮到不同作業之間的隔離性,通過一個作業一個執行緒池實現。執行緒池服務處理器註冊表( ExecutorServiceHandlerRegistry ) 獲取作業執行緒池( #getExecutorServiceHandler(....) )代碼如下:

public final class ExecutorServiceHandlerRegistry {

    /**
     * 執行緒池集合
     * key:作業名字
     */

    private static final Map REGISTRY = new HashMap<>();

    /**
     * 獲取執行緒池服務.
     * 
     * @param jobName 作業名稱
     * @param executorServiceHandler 執行緒池服務處理器
     * @return 執行緒池服務
     */

    public static synchronized ExecutorService getExecutorServiceHandler(final String jobName, final ExecutorServiceHandler executorServiceHandler) {
        if (!REGISTRY.containsKey(jobName)) {
            REGISTRY.put(jobName, executorServiceHandler.createExecutorService(jobName));
        }
        return REGISTRY.get(jobName);
    }
}

ExecutorServiceHandlerRegistry 使用 ExecutorServiceHandler 創建執行緒池。ExecutorServiceHandler 本身是個接口,預設使用 DefaultExecutorServiceHandler 實現:

// ExecutorServiceHandler.java
public interface ExecutorServiceHandler {

    /**
     * 創建執行緒池服務物件.
     * 
     * @param jobName 作業名
     * 
     * @return 執行緒池服務物件
     */

    ExecutorService createExecutorService(final String jobName);
}

// DefaultExecutorServiceHandler.java
public final class DefaultExecutorServiceHandler implements ExecutorServiceHandler {

    @Override
    public ExecutorService createExecutorService(final String jobName) {
        return new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2).createExecutorService();
    }
}
  • 呼叫 ExecutorServiceObject 的 #createExecutorService(....) 方法創建執行緒池:

    public final class ExecutorServiceObject {
    private final ThreadPoolExecutor threadPoolExecutor;
    private final BlockingQueue workQueue;

    public ExecutorServiceObject(final String namingPattern, final int threadSize) {
        workQueue = new LinkedBlockingQueue<>();
        threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 5L, TimeUnit.MINUTES, workQueue, 
                new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
    }

    /**
     * 創建執行緒池服務物件.
     *
     * @return 執行緒池服務物件
     */

    public ExecutorService createExecutorService() {
        return MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(threadPoolExecutor));
    }

    }

    service.shutdown();
    service.awaitTermination(terminationTimeout, timeUnit);
    • MoreExecutors#listeningDecorator(…) 在《Sharding-JDBC 原始碼分析 —— SQL 執行》 已經解析。

    • MoreExecutors#getExitingExecutorService(…) 方法邏輯:將 ThreadPoolExecutor 轉換成 ExecutorService,並增加 JVM 關閉鉤子,實現 120s 等待任務完成:

如何實現自定義 ExecutorServiceHandler ?

先看下 AbstractElasticJobExecutor 是如何獲得每個作業的 ExecutorServiceHandler :

// AbstractElasticJobExecutor.java
/**
* 獲得【自定義】處理器
*
@param jobPropertiesEnum 作業屬性列舉
@return 處理器
*/

private Object getHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum) {
   String handlerClassName = jobRootConfig.getTypeConfig().getCoreConfig().getJobProperties().get(jobPropertiesEnum);
   try {
       Class> handlerClass = Class.forName(handlerClassName);
       if (jobPropertiesEnum.getClassType().isAssignableFrom(handlerClass)) { // 必須是接口實現,才使用【自定義】
           return handlerClass.newInstance();
       }
       return getDefaultHandler(jobPropertiesEnum, handlerClassName);
   } catch (final ReflectiveOperationException ex) {
       return getDefaultHandler(jobPropertiesEnum, handlerClassName);
   }
}

/**
* 獲得【預設】處理器
*
@param jobPropertiesEnum 作業屬性列舉
@param handlerClassName 處理器類名
@return 處理器
*/

private Object getDefaultHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum, final String handlerClassName) {
   log.warn("Cannot instantiation class '{}', use default '{}' class.", handlerClassName, jobPropertiesEnum.getKey());
   try {
       return Class.forName(jobPropertiesEnum.getDefaultValue()).newInstance();
   } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
       throw new JobSystemException(e);
   }
}
  • 每個處理器都會對應一個 JobPropertiesEnum,使用列舉獲得處理器。優先從 JobProperties.map 獲取自定義的處理器實現類,如果不符合條件( 未實現正確接口 或者 創建處理器失敗 ),使用預設的處理器實現。

  • 每個作業可以配置不同的處理器,在《Elastic-Job-Lite 原始碼分析 —— 作業配置》的「2.2.2」作業核心配置已經解析。

3.3 獲取作業異常執行器

獲取作業異常執行器( JobExceptionHandler )和 ExecutorServiceHandler( ExecutorServiceHandler )相同

// ExecutorServiceHandler.java
public interface JobExceptionHandler {

    /**
     * 處理作業異常.
     * 
     * @param jobName 作業名稱
     * @param cause 異常原因
     */

    void handleException(String jobName, Throwable cause);
}

// DefaultJobExceptionHandler.java
public final class DefaultJobExceptionHandler implements JobExceptionHandler {

    @Override
    public void handleException(final String jobName, final Throwable cause) {
        log.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
    }
}
  • 預設實現 DefaultJobExceptionHandler 打印異常日誌,不會丟擲異常

4. 執行器執行

執行邏輯主流程如下圖( 打開大圖 ):

// AbstractElasticJobExecutor.java
public final void execute() {
   // 檢查 作業執行環境
   try {
       jobFacade.checkJobExecutionEnvironment();
   } catch (final JobExecutionEnvironmentException cause) {
       jobExceptionHandler.handleException(jobName, cause);
   }
   // 獲取 當前作業服務器的分片背景關係
   ShardingContexts shardingContexts = jobFacade.getShardingContexts();
   // 發佈作業狀態追蹤事件(State.TASK_STAGING)
   if (shardingContexts.isAllowSendJobEvent()) {
       jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
   }
   // 跳過 存在運行中的被錯過作業
   if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
       // 發佈作業狀態追蹤事件(State.TASK_FINISHED)
       if (shardingContexts.isAllowSendJobEvent()) {
           jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                   "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                   shardingContexts.getShardingItemParameters().keySet()));
       }
       return;
   }
   // 執行 作業執行前的方法
   try {
       jobFacade.beforeJobExecuted(shardingContexts);
       //CHECKSTYLE:OFF
   } catch (final Throwable cause) {
       //CHECKSTYLE:ON
       jobExceptionHandler.handleException(jobName, cause);
   }
   // 執行 普通觸發的作業
   execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
   // 執行 被跳過觸發的作業
   while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
       jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
       execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
   }
   // 執行 作業失效轉移
   jobFacade.failoverIfNecessary();
   // 執行 作業執行後的方法
   try {
       jobFacade.afterJobExecuted(shardingContexts);
       //CHECKSTYLE:OFF
   } catch (final Throwable cause) {
       //CHECKSTYLE:ON
       jobExceptionHandler.handleException(jobName, cause);
   }
}

代碼步驟比較多,我們一步一步往下看。

4.1 檢查作業執行環境

// LiteJobFacade.java
@Override
public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException {
   configService.checkMaxTimeDiffSecondsTolerable();
}
  • 呼叫 ConfigService#checkMaxTimeDiffSecondsTolerable() 方法校驗本機時間是否合法,在《Elastic-Job-Lite 原始碼分析 —— 作業配置》的「3.3」校驗本機時間是否合法 已經解析。

  • 當校驗本機時間不合法時,丟擲異常。若使用 DefaultJobExceptionHandler 作為異常處理,只打印日誌,不會終止作業執行。如果你的作業對時間精準度有比較高的要求,期望作業終止執行,可以自定義 JobExceptionHandler 實現對異常的處理。

4.2 獲取當前作業服務器的分片背景關係

呼叫 LiteJobFacade#getShardingContexts() 方法獲取當前作業服務器的分片背景關係。通過這個方法,作業獲得其所分配執行的分片項,在《Elastic-Job-Lite 原始碼解析 —— 作業分片》詳細分享。

4.3 發佈作業狀態追蹤事件

呼叫 LiteJobFacade#postJobStatusTraceEvent() 方法發佈作業狀態追蹤事件,在《Elastic-Job-Lite 原始碼解析 —— 作業事件追蹤》詳細分享。

4.4 跳過正在運行中的被錯過執行的作業

該邏輯和「4.7」執行被錯過執行的作業,一起解析,可以整體性的理解 Elastic-Job-Lite 對被錯過執行( misfired )的作業處理。

4.5 執行作業執行前的方法

// LiteJobFacade.java
@Override
public void beforeJobExecuted(final ShardingContexts shardingContexts) {
   for (ElasticJobListener each : elasticJobListeners) {
       each.beforeJobExecuted(shardingContexts);
   }
}
  • 呼叫作業監聽器執行作業執行前的方法,在《Elastic-Job-Lite 原始碼解析 —— 作業監聽器》詳細分享。

4.6 執行普通觸發的作業

這個小節的標題不太準確,其他作業來源( ExecutionSource )也是執行這樣的邏輯。本小節執行作業會經歷 4 個方法,方法順序往下呼叫,我們逐個來看。

// AbstractElasticJobExecutor.java
/**
* 執行多個作業的分片
*
@param shardingContexts 分片背景關係集合
@param executionSource 執行來源
*/

private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
}

/**
* 執行多個作業的分片
*
@param shardingContexts 分片背景關係集合
@param executionSource 執行來源
*/

private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
}

/**
* 執行單個作業的分片
*
@param shardingContexts 分片背景關係集合
@param item 分片序號
@param startEvent 執行事件(開始)
*/

private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
}

/**
* 執行單個作業的分片【子類實現】
*
@param shardingContext 分片背景關係集合
*/

protected abstract void process(ShardingContext shardingContext);

ps:作業事件相關邏輯,先統一跳過,在《Elastic-Job-Lite 原始碼解析 —— 作業事件追蹤》詳細分享。


private void execute(shardingContexts, executionSource)

// AbstractElasticJobExecutor.java
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
   // 無可執行的分片,發佈作業狀態追蹤事件(State.TASK_FINISHED)
   if (shardingContexts.getShardingItemParameters().isEmpty()) {
       if (shardingContexts.isAllowSendJobEvent()) {
           jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
       }
       return;
   }
   // 註冊作業啟動信息
   jobFacade.registerJobBegin(shardingContexts);
   // 發佈作業狀態追蹤事件(State.TASK_RUNNING)
   String taskId = shardingContexts.getTaskId();
   if (shardingContexts.isAllowSendJobEvent()) {
       jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
   }
   //
   try {
       process(shardingContexts, executionSource);
   } finally {
       // TODO 考慮增加作業失敗的狀態,並且考慮如何處理作業失敗的整體迴路
       // 註冊作業完成信息
       jobFacade.registerJobCompleted(shardingContexts);
       // 根據是否有異常,發佈作業狀態追蹤事件(State.TASK_FINISHED / State.TASK_ERROR)
       if (itemErrorMessages.isEmpty()) {
           if (shardingContexts.isAllowSendJobEvent()) {
               jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
           }
       } else {
           if (shardingContexts.isAllowSendJobEvent()) {
               jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
           }
       }
   }
}
  • 方法引數 executionSource 代表執行來源( ExecutionSource ),一共有三種:

    public enum ExecutionSource {
       /**
        * 普通觸發執行
        */

       NORMAL_TRIGGER,
       /**
        * 被錯過執行
        */

       MISFIRE,
       /**
        * 失效轉移執行
        */

       FAILOVER
    }
  • 呼叫 LiteJobFacade#registerJobBegin(...) 方法註冊作業啟動信息:

    // LiteJobFacade.java
    @Override
    public void registerJobBegin(final ShardingContexts shardingContexts) {
       executionService.registerJobBegin(shardingContexts);
    }

    // ExecutionService.java
    public void registerJobBegin(final ShardingContexts shardingContexts) {
       JobRegistry.getInstance().setJobRunning(jobName, true);
       if (!configService.load(true).isMonitorExecution()) {
           return;
       }
       for (int each : shardingContexts.getShardingItemParameters().keySet()) {
           jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
       }
    }
    • 僅當作業配置設置監控作業運行時狀態LiteJobConfiguration.monitorExecution = true )時,記錄作業運行狀態。

    • 呼叫 JobNodeStorage#fillEphemeralJobNode(…) 方法記錄分配的作業分片項正在運行中。如何記錄的,在《Elastic-Job-Lite 原始碼解析 —— 作業資料儲存》詳細分享。

  • 呼叫 LiteJobFacade#registerJobCompleted(...) 方法註冊作業完成信息:

    // LiteJobFacade.java
    @Override
    public void registerJobCompleted(final ShardingContexts shardingContexts) {
       executionService.registerJobCompleted(shardingContexts);
       if (configService.load(true).isFailover()) {
           failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());
       }
    }

    // ExecutionService.java
    /**
    * 註冊作業完成信息.

    @param shardingContexts 分片背景關係
    */

    public void registerJobCompleted(final ShardingContexts shardingContexts) {
       JobRegistry.getInstance().setJobRunning(jobName, false);
       if (!configService.load(true).isMonitorExecution()) {
           return;
       }
       for (int each : shardingContexts.getShardingItemParameters().keySet()) {
           jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(each));
       }
    }
    • 僅當作業配置設置監控作業運行時狀態LiteJobConfiguration.monitorExecution = true ),移除作業運行狀態。

    • 呼叫 JobNodeStorage#removeJobNodeIfExisted(…) 方法移除分配的作業分片項正在運行中的標記,表示作業分片項不在運行中狀態。

    • 呼叫 FailoverService#updateFailoverComplete(…) 方法更新執行完畢失效轉移的分片項狀態,在《Elastic-Job-Lite 原始碼解析 —— 作業失效轉移》詳細分享。


private void process(shardingContexts, executionSource)

// AbstractElasticJobExecutor.java
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
   Collection items = shardingContexts.getShardingItemParameters().keySet();
   // 單分片,直接執行
   if (1 == items.size()) {
       int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
       JobExecutionEvent jobExecutionEvent =  new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
       // 執行一個作業
       process(shardingContexts, item, jobExecutionEvent);
       return;
   }
   // 多分片,並行執行
   final CountDownLatch latch = new CountDownLatch(items.size());
   for (final int each : items) {
       final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
       if (executorService.isShutdown()) {
           return;
       }
       executorService.submit(new Runnable() {

           @Override
           public void run() {
               try {
                   // 執行一個作業
                   process(shardingContexts, each, jobExecutionEvent);
               } finally {
                   latch.countDown();
               }
           }
       });
   }
   // 等待多分片全部完成
   try {
       latch.await();
   } catch (final InterruptedException ex) {
       Thread.currentThread().interrupt();
   }
}
  • 分配分片項時,直接執行,無需使用執行緒池,性能更優。

  • 分配分片項時,使用執行緒池併發執行,通過 CountDownLatch 實現等待分片項全部執行完成。


private void process(shardingContexts, item, startEvent) 
protected abstract void process(shardingContext)

// AbstractElasticJobExecutor.java
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
   // 發佈執行事件(開始)
   if (shardingContexts.isAllowSendJobEvent()) {
       jobFacade.postJobExecutionEvent(startEvent);
   }
   log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
   JobExecutionEvent completeEvent;
   try {
       // 執行單個作業
       process(new ShardingContext(shardingContexts, item));
       // 發佈執行事件(成功)
       completeEvent = startEvent.executionSuccess();
       log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
       if (shardingContexts.isAllowSendJobEvent()) {
           jobFacade.postJobExecutionEvent(completeEvent);
       }
       // CHECKSTYLE:OFF
   } catch (final Throwable cause) {
       // CHECKSTYLE:ON
       // 發佈執行事件(失敗)
       completeEvent = startEvent.executionFailure(cause);
       jobFacade.postJobExecutionEvent(completeEvent);
       // 設置該分片執行異常信息
       itemErrorMessages.put(item, ExceptionUtil.transform(cause));
       //
       jobExceptionHandler.handleException(jobName, cause);
   }
}

protected abstract void process(ShardingContext shardingContext);
  • 不同作業執行器實現類通過實現 #process(shardingContext) 抽象方法,實現對單個分片項作業的處理。

  • 不同作業執行器實現類通過實現 #process(shardingContext) 抽象方法,實現對單個分片項作業的處理。

  • 不同作業執行器實現類通過實現 #process(shardingContext) 抽象方法,實現對單個分片項作業的處理。

4.6.1 簡單作業執行器

SimpleJobExecutor,簡單作業執行器

public final class SimpleJobExecutor extends AbstractElasticJobExecutor {

    /**
     * 簡單作業實現
     */

    private final SimpleJob simpleJob;

    @Override
    protected void process(final ShardingContext shardingContext) {
        simpleJob.execute(shardingContext);
    }
}
  • 呼叫 SimpleJob#execute() 方法對單個分片項作業進行處理。

4.6.2 資料流作業執行器

DataflowJobExecutor,資料流作業執行器。

public final class DataflowJobExecutor extends AbstractElasticJobExecutor {

    /**
     * 資料流作業物件
     */

    private final DataflowJob dataflowJob;

    @Override
    protected void process(final ShardingContext shardingContext) {
        DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration) getJobRootConfig().getTypeConfig();
        if (dataflowConfig.isStreamingProcess()) { // 流式處理資料
            streamingExecute(shardingContext);
        } else {
            oneOffExecute(shardingContext);
        }
    }

    /**
     * 流式處理
     *
     * @param shardingContext 分片背景關係
     */

    private void streamingExecute(final ShardingContext shardingContext) {
        List data = fetchData(shardingContext);
        while (null != data && !data.isEmpty()) {
            processData(shardingContext, data);
            if (!getJobFacade().isEligibleForJobRunning()) {
                break;
            }
            data = fetchData(shardingContext);
        }
    }

    /**
     * 一次處理
     *
     * @param shardingContext 分片背景關係
     */

    private void oneOffExecute(final ShardingContext shardingContext) {
        List data = fetchData(shardingContext);
        if (null != data && !data.isEmpty()) {
            processData(shardingContext, data);
        }
    }

}
  • 當作業配置設置流式處理資料( DataflowJobConfiguration.streamingProcess = true ) 時,呼叫 #streamingExecute() 不斷加載資料,不斷處理資料,直到資料為空 或者 作業不適合繼續運行

    // LiteJobFacade.java
    @Override
    public boolean isEligibleForJobRunning() {
       LiteJobConfiguration liteJobConfig = configService.load(true);
       if (liteJobConfig.getTypeConfig() instanceof DataflowJobConfiguration) {
           return !shardingService.isNeedSharding() // 作業不需要重新分片
                   && ((DataflowJobConfiguration) liteJobConfig.getTypeConfig()).isStreamingProcess();
       }
       return !shardingService.isNeedSharding(); // 作業不需要重新分片
    }

    如果採用流式作業處理方式,建議processData處理資料後更新其狀態,避免fetchData再次抓取到,從而使得作業永不停止。 流式資料處理參照TbSchedule設計,適用於不間歇的資料處理。

    • 作業需要重新分片,所以不適合繼續流式資料處理。

  • 當作業配置設置流式處理資料( DataflowJobConfiguration.streamingProcess = false ) 時,呼叫 #oneOffExecute() 一次加載資料,一次處理資料。

  • 呼叫 #fetchData() 方法加載資料;呼叫 #processData(...) 方法處理資料:

    // DataflowJobExecutor.java
    /**
    * 加載資料
    *
    @param shardingContext 分片背景關係
    @return 資料
    */

    private List fetchData(final ShardingContext shardingContext) {
       return dataflowJob.fetchData(shardingContext);
    }

    /**
    * 處理資料
    *
    @param shardingContext 分片背景關係
    @param data 資料
    */

    private void processData(final ShardingContext shardingContext, final List data) {
       dataflowJob.processData(shardingContext, data);
    }

4.6.3 腳本作業執行器

ScriptJobExecutor,腳本作業執行器。

public final class ScriptJobExecutor extends AbstractElasticJobExecutor {

    @Override
    protected void process(final ShardingContext shardingContext) {
        final String scriptCommandLine = ((ScriptJobConfiguration) getJobRootConfig().getTypeConfig()).getScriptCommandLine();
        if (Strings.isNullOrEmpty(scriptCommandLine)) {
            throw new JobConfigurationException("Cannot find script command line for job '%s', job is not executed.", shardingContext.getJobName());
        }
        executeScript(shardingContext, scriptCommandLine);
    }

    /**
     * 執行腳本
     *
     * @param shardingContext 分片背景關係
     * @param scriptCommandLine 執行腳本路徑
     */

    private void executeScript(final ShardingContext shardingContext, final String scriptCommandLine) {
        CommandLine commandLine = CommandLine.parse(scriptCommandLine);
        // JSON 格式傳遞引數
        commandLine.addArgument(GsonFactory.getGson().toJson(shardingContext), false);
        try {
            new DefaultExecutor().execute(commandLine);
        } catch (final IOException ex) {
            throw new JobConfigurationException("Execute script failure.", ex);
        }
    }
}
  • scriptCommandLine 傳遞的是腳本路徑。使用 Apache Commons Exec 工具包實現腳本呼叫:

    Script型別作業意為腳本型別作業,支持shell,python,perl等所有型別腳本。只需通過控制台或代碼配置scriptCommandLine即可,無需編碼。執行腳本路徑可包含引數,引數傳遞完畢後,作業框架會自動追加最後一個引數為作業運行時信息。

  • 腳本引數傳遞使用 JSON 格式。

4.7 執行被錯過觸發的作業

當作業執行過久,導致到達下次執行時間未進行下一次作業執行,Elastic-Job-Lite 會設置該作業分片項為被錯過執行( misfired )。下一次作業執行時,會補充執行被錯過執行的作業分片項。

標記作業被錯過執行

// JobScheduler.java
private Scheduler createScheduler() {
   Scheduler result;
   // 省略部分代碼
   result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
   return result;
}

private Properties getBaseQuartzProperties() {
   // 省略部分代碼
   result.put("org.quartz.jobStore.misfireThreshold""1");
   return result;
}

// JobScheduleController.class
private CronTrigger createTrigger(final String cron) {
   return TriggerBuilder.newTrigger()
           .withIdentity(triggerIdentity)
           .withSchedule(CronScheduleBuilder.cronSchedule(cron)
           .withMisfireHandlingInstructionDoNothing())
           .build();
}
  • org.quartz.jobStore.misfireThreshold 設置最大允許超過 1 毫秒,作業分片項即被視為錯過執行。

  • #withMisfireHandlingInstructionDoNothing() 設置 Quartz 系統不會立刻再執行任務,而是等到距離目前時間最近的預計時間執行。重新執行被錯過執行的作業交給 Elastic-Job-Lite 處理

  • 使用 TriggerListener 監聽被錯過執行的作業分片項:

    // JobTriggerListener.java
    public final class JobTriggerListener extends TriggerListenerSupport {
    @Override
    public void triggerMisfired(final Trigger trigger) {
        if (null != trigger.getPreviousFireTime()) {
            executionService.setMisfire(shardingService.getLocalShardingItems());
        }
    }

    }

    // ExecutionService.java
    public void setMisfire(final Collection items) {
       for (int each : items) {
           jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
       }
    }

    • 呼叫 #setMisfire(…) 設置作業分片項被錯過執行。

跳過正在運行中的被錯過執行的作業

// LiteJobFacade.java
@Override
public boolean misfireIfRunning(final Collection shardingItems) {
   return executionService.misfireIfHasRunningItems(shardingItems);
}

// ExecutionService.java
public boolean misfireIfHasRunningItems(final Collection items) {
   if (!hasRunningItems(items)) {
       return false;
   }
   setMisfire(items);
   return true;
}

public boolean hasRunningItems(final Collection items) {
   LiteJobConfiguration jobConfig = configService.load(true);
   if (null == jobConfig || !jobConfig.isMonitorExecution()) {
       return false;
   }
   for (int each : items) {
       if (jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(each))) {
           return true;
       }
   }
   return false;
}
  • 當分配的作業分片項里存在任意一個分片正在運行中,設置分片項被錯過執行( misfired ),並不執行這些作業分片。如果不進行跳過,則可能導致同時運行某個作業分片。

  • 該功能依賴作業配置監控作業運行時狀態LiteJobConfiguration.monitorExecution = true )時生效。

執行被錯過執行的作業分片項

// AbstractElasticJobExecutor.java
public final void execute() {
   // .... 省略部分代碼
   // 執行 被跳過觸發的作業
   while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
       jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
       execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
   }
   // .... 省略部分代碼
}

// LiteJobFacade.java
@Override
public boolean isExecuteMisfired(final Collection shardingItems) {
   return isEligibleForJobRunning() // 合適繼續運行
           && configService.load(true).getTypeConfig().getCoreConfig().isMisfire() // 作業配置開啟作業被錯過觸發
           && !executionService.getMisfiredJobItems(shardingItems).isEmpty(); // 所執行的作業分片存在被錯過( misfired )
}

@Override
public void clearMisfire(final Collection shardingItems) {
   executionService.clearMisfire(shardingItems);
}
  • 清除分配的作業分片項被錯過執行的標識,並執行作業分片項。

  • 為什麼此處使用 while(…)防禦性編程#isExecuteMisfired(…) 判斷使用記憶體快取的資料,而該資料的更新依賴 Zookeeper 通知進行異步更新,可能因為各種情況,例如網絡,資料可能未及時更新導致資料不一致。使用 while(…) 進行防禦編程,保證記憶體快取的資料已經更新。

4.8 執行作業失效轉移

// LiteJobFacade.java
@Override
public void failoverIfNecessary() {
   if (configService.load(true).isFailover()) {
       failoverService.failoverIfNecessary();
   }
}
  • 呼叫作業失效轉移服務( FailoverService )執行作業失效轉移( #failoverIfNecessary() ),在《Elastic-Job-Lite 原始碼解析 —— 作業失效轉移》詳細分享。

4.9 執行作業執行後的方法

// LiteJobFacade.java
@Override
public void afterJobExecuted(final ShardingContexts shardingContexts) {
   for (ElasticJobListener each : elasticJobListeners) {
       each.afterJobExecuted(shardingContexts);
   }
}
  • 呼叫作業監聽器執行作業執行後的方法,在《Elastic-Job-Lite 原始碼解析 —— 作業監聽器》詳細分享。

666. 彩蛋

呼!略長略長略長!

下麵會更新如下兩篇文章,為後續的主節點選舉、失效轉移、作業分片策略等文章做鋪墊:

  • 《Elastic-Job-Lite 原始碼解析 —— 註冊中心》

  • 《Elastic-Job-Lite 原始碼解析 —— 作業資料儲存》

道友,趕緊上車,分享一波朋友圈!

啊啊啊,我好想馬上拜讀 Elastic-Job-Cloud。為了你們,我忍住了心碎。

旁白君:煞筆筆者已經偷偷在讀了。 
芋道君:旁白君,你大爺!




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 服務器

16. P2P 服務器

17. HTTP 服務器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

原始碼不易↓↓↓

點贊支持老艿艿↓↓

赞(0)

分享創造快樂