歡迎光臨
每天分享高質量文章

分散式作業 Elastic-Job-Lite 原始碼分析 —— 作業分片

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-sharding/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

1. 概述2. 作業分片條件3. 分配作業分片項4. 獲取作業分片背景關係集合666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 作業分片

涉及到主要類的類圖如下( 開啟大圖 ):

  • 粉色的類在 com.dangdang.ddframe.job.lite.internal.sharding 包下,實現了 Elastic-Job-Lite 作業分片。

  • ShardingService,作業分片服務。

  • ShardingNode,作業分片資料儲存路徑。

  • ShardingListenerManager,作業分片監聽管理器。

你行好事會因為得到贊賞而愉悅 
同理,開源專案貢獻者會因為 Star 而更加有動力 
為 Elastic-Job 點贊!傳送門

2. 作業分片條件

當作業滿足分片條件時,不會立即進行作業分片分配,而是設定需要重新進行分片的標記,等到作業分片獲取時,判斷有該標記後執行作業分配。

設定需要重新進行分片的標記的程式碼如下:

// ShardingService.java
/**
* 設定需要重新分片的標記.
*/

public void setReshardingFlag() {
   jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}

// JobNodeStorage.java
/**
* 如果存在則建立作業節點.
* 如果作業根節點不存在表示作業已經停止, 不再繼續建立節點.

@param node 作業節點名稱
*/

public void createJobNodeIfNeeded(final String node) {
   if (isJobRootNodeExisted() && !isJobNodeExisted(node)) {
       regCenter.persist(jobNodePath.getFullPath(node), "");
   }
}
  • 呼叫 #setReshardingFlag() 方法設定需要重新分片的標記/${JOB_NAME}/leader/sharding/necessary。該 Zookeeper 資料節點是永久節點,儲存空串( ""),使用 zkClient 檢視如下:

    [zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/leader/sharding
    [necessary]
    [zk: localhost:2181(CONNECTED) 3] get /elastic-job-example-lite-java/javaSimpleJob/leader/sharding/necessary
     
  • 設定標記之後,透過呼叫 #isNeedSharding() 方法即可判斷是否需要重新分片。

    // ShardingService.java
    /**
    * 判斷是否需要重分片.

    @return 是否需要重分片
    */

    public boolean isNeedSharding() {
       return jobNodeStorage.isJobNodeExisted(ShardingNode.NECESSARY);
    }

    // JobNodeStorage.java
    /**
    * 判斷作業節點是否存在.

    @param node 作業節點名稱
    @return 作業節點是否存在
    */

    public boolean isJobNodeExisted(final String node) {
       return regCenter.isExisted(jobNodePath.getFullPath(node));
    }

設定需要重新進行分片有 4 種情況

第一種,註冊作業啟動資訊時。

// SchedulerFacade.java
public void registerStartUpInfo(final boolean enabled) {
   // ... 省略無關程式碼
   // 設定 需要重新分片的標記
   shardingService.setReshardingFlag();
  // ... 省略無關程式碼
}

第二種,作業分片總數( JobCoreConfiguration.shardingTotalCount )變化時。

// ShardingTotalCountChangedJobListener.java
class ShardingTotalCountChangedJobListener extends AbstractJobListener {

   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (configNode.isConfigPath(path)
               && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
           int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
           if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) { // 作業分片總數變化
               // 設定需要重新分片的標記
               shardingService.setReshardingFlag();
               // 設定當前分片總數
               JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
           }
       }
   }
}

第三種,伺服器變化時。

// ShardingListenerManager.java
class ListenServersChangedJobListener extends AbstractJobListener {

   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (!JobRegistry.getInstance().isShutdown(jobName)
               && (isInstanceChange(eventType, path)
                   || isServerChange(path))) {
           shardingService.setReshardingFlag();
       }
   }

   private boolean isInstanceChange(final Type eventType, final String path) {
       return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
   }

   private boolean isServerChange(final String path) {
       return serverNode.isServerPath(path);
   }
}
  • 伺服器變化有兩種情況。

  • 第一種,#isServerChange(…) 伺服器被開啟或禁用。

  • 第二種,#isInstanceChange(…) 作業節點新增或者移除。

第四種,在《Elastic-Job-Lite 原始碼解析 —— 自診斷修複》詳細分享。

3. 分配作業分片項

呼叫 ShardingService#shardingIfNecessary() 方法,如果需要分片且當前節點為主節點, 則作業分片。

總體流程如下順序圖:( 開啟大圖 ):

實現程式碼如下:

// ShardingService.java
/**
* 如果需要分片且當前節點為主節點, 則作業分片.

* 如果當前無可用節點則不分片.
*/

public void shardingIfNecessary() {
   List availableJobInstances = instanceService.getAvailableJobInstances();
   if (!isNeedSharding() // 判斷是否需要重新分片
           || availableJobInstances.isEmpty()) {
       return;
   }
   // 【非主節點】等待 作業分片項分配完成
   if (!leaderService.isLeaderUntilBlock()) { // 判斷是否為【主節點】
       blockUntilShardingCompleted();
       return;
   }
   // 【主節點】作業分片項分配
   // 等待 作業未在執行中狀態
   waitingOtherJobCompleted();
   //
   LiteJobConfiguration liteJobConfig = configService.load(false);
   int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
   // 設定 作業正在重分片的標記
   log.debug("Job '{}' sharding begin.", jobName);
   jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
   // 重置 作業分片項資訊
   resetShardingInfo(shardingTotalCount);
   // 【事務中】設定 作業分片項資訊
   JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
   jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
   log.debug("Job '{}' sharding complete.", jobName);
}
  • 呼叫 #isNeedSharding() 方法判斷是否需要重新分片。

  • 呼叫 LeaderService#isLeaderUntilBlock() 方法判斷是否為主節點。作業分片項的分配過程:

    • 【主節點】執行作業分片項分配。

    • 【非主節點】等待作業分片項分配完成。

    • LeaderService#isLeaderUntilBlock() 方法在《Elastic-Job-Lite 原始碼分析 —— 主節點選舉》「3. 選舉主節點」有詳細分享。

  • 呼叫 #blockUntilShardingCompleted() 方法【非主節點】等待作業分片項分配完成。

    private void blockUntilShardingCompleted() {
       while (!leaderService.isLeaderUntilBlock() // 當前作業節點不為【主節點】
               && (jobNodeStorage.isJobNodeExisted(ShardingNode.NECESSARY) // 存在作業需要重分片的標記
                   || jobNodeStorage.isJobNodeExisted(ShardingNode.PROCESSING))) { // 存在作業正在重分片的標記
           log.debug("Job '{}' sleep short time until sharding completed.", jobName);
           BlockUtils.waitingShortTime();
       }
    }
    • 呼叫 #LeaderService#isLeaderUntilBlock() 方法判斷是否為主節點。為什麼上面判斷了一次,這裡又判斷一次?主節點作業分片項分配過程中,不排除自己掛掉了,此時【非主節點】若選舉成主節點,無需繼續等待,當然也不能等待,因為已經沒節點在執行作業分片項分配,所有節點都會卡在這裡。

    • 當 作業需要重分片的標記作業正在重分片的標記 都不存在時,意味著作業分片項分配已經完成,下文 PersistShardingInfoTransactionExecutionCallback 類裡我們會看到。

  • 呼叫 #waitingOtherJobCompleted() 方法等待作業未在執行中狀態。作業是否在執行中需要 LiteJobConfiguration.monitorExecution = true,《Elastic-Job-Lite 原始碼分析 —— 作業執行》「4.6 執行普通觸發的作業」有詳細分享。

  • 呼叫 ConfigurationService#load(…) 方法從註冊中心獲取作業配置( 非快取 ),避免主節點本地作業配置可能非最新的,主要目的是獲得作業分片總數( shardingTotalCount )。

  • 呼叫 jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "") 設定作業正在重分片的標記 /${JOB_NAME}/leader/sharding/processing。該 Zookeeper 資料節點是臨時節點,儲存空串( "" ),僅用於標記作業正在重分片,無特別業務邏輯。

  • 呼叫 #resetShardingInfo(...) 方法重置作業分片資訊。

    private void resetShardingInfo(final int shardingTotalCount) {
      // 重置 有效的作業分片項
      for (int i = 0; i       jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getInstanceNode(i)); // 移除 /${JOB_NAME}/sharding/${ITEM_ID}/instance
          jobNodeStorage.createJobNodeIfNeeded(ShardingNode.ROOT + "/" + i); // 建立 /${JOB_NAME}/sharding/${ITEM_ID}
      }
      // 移除 多餘的作業分片項
      int actualShardingTotalCount = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT).size();
      if (actualShardingTotalCount > shardingTotalCount) {
          for (int i = shardingTotalCount; i           jobNodeStorage.removeJobNodeIfExisted(ShardingNode.ROOT + "/" + i); // 移除 /${JOB_NAME}/sharding/${ITEM_ID}
          }
      }
    }
  • 呼叫 JobShardingStrategy#sharding(…) 方法計算每個節點分配的作業分片項。《Elastic-Job-Lite 原始碼分析 —— 作業分片策略》有詳細分享。

  • 呼叫 JobNodeStorage#executeInTransaction(...) + PersistShardingInfoTransactionExecutionCallback#execute() 方法實現在事務設定每個節點分配的作業分片項。

    // PersistShardingInfoTransactionExecutionCallback.java
    class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {

       /**
        * 作業分片項分配結果
        * key:作業節點
        * value:作業分片項
        */

       private final Map> shardingResults;

       @Override
       public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
           // 設定 每個節點分配的作業分片項
           for (Map.Entry> entry : shardingResults.entrySet()) {
               for (int shardingItem : entry.getValue()) {
                   curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem))
                           , entry.getKey().getJobInstanceId().getBytes()).and();
               }
           }
           // 移除 作業需要重分片的標記、作業正在重分片的標記
           curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();
           curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();
       }
    }

    // JobNodeStorage.java
    /**
    * 在事務中執行操作.

    @param callback 執行操作的回呼
    */

    public void executeInTransaction(final TransactionExecutionCallback callback) {
       try {
           CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
           callback.execute(curatorTransactionFinal);
           curatorTransactionFinal.commit();
       } catch (final Exception ex) {
           RegExceptionHandler.handleException(ex);
       }
    }
    [zk: localhost:2181(CONNECTED) 0] get /elastic-job-example-lite-java/javaSimpleJob/sharding/0/instance
    192.168.3.2@-@31492
    • 設定臨時資料節點 /${JOB_NAME}/sharding/${ITEM_ID}/instance 為分配的作業節點的作業實體主鍵( jobInstanceId )。使用 zkClient 檢視如下:

作業分片項分配整體流程有點長,耐著心看,畢竟是核心程式碼喲。如果中間有任何疑問,歡迎給我公眾號:芋道原始碼 留言。

4. 獲取作業分片背景關係集合

在《Elastic-Job-Lite 原始碼分析 —— 作業執行的》「4.2 獲取當前作業伺服器的分片背景關係」中,我們可以看到作業執行器( AbstractElasticJobExecutor ) 執行作業時,會獲取當前作業伺服器的分片背景關係進行執行。獲取過程總體如下順序圖( 開啟大圖 ):

  • 橘色叉叉在《Elastic-Job-Lite 原始碼解析 —— 作業失效轉移》有詳細分享。

實現程式碼如下:

// LiteJobFacade.java
@Override
public ShardingContexts getShardingContexts() {
   // 【忽略,作業失效轉移詳解】獲得 失效轉移的作業分片項
   boolean isFailover = configService.load(true).isFailover();
   if (isFailover) {
       List failoverShardingItems = failoverService.getLocalFailoverItems();
       if (!failoverShardingItems.isEmpty()) {
           return executionContextService.getJobShardingContext(failoverShardingItems);
       }
   }
   // 作業分片,如果需要分片且當前節點為主節點
   shardingService.shardingIfNecessary();
   // 獲得 分配在本機的作業分片項
   List shardingItems = shardingService.getLocalShardingItems();
   // 【忽略,作業失效轉移詳解】移除 分配在本機的失效轉移的作業分片專案
   if (isFailover) {
       shardingItems.removeAll(failoverService.getLocalTakeOffItems());
   }
   // 移除 被禁用的作業分片項
   shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
   // 獲取當前作業伺服器分片背景關係
   return executionContextService.getJobShardingContext(shardingItems);
}
  • 呼叫 ShardingService#shardingIfNecessary() 方法,如果需要分片且當前節點為主節點,作業分片項分配不是每次都需要作業分片,必須滿足「2. 作業分片條件」才執行作業分片

  • 呼叫 ShardingService#getLocalShardingItems()方法,獲得分配在本機的作業分片項,即 /${JOB_NAME}/sharding/${ITEM_ID}/instance 為本機的作業分片項。

    // ShardingService.java
    /**
    * 獲取執行在本作業實體的分片項集合.

    @return 執行在本作業實體的分片項集合
    */

    public List getLocalShardingItems() {
       if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
           return Collections.emptyList();
       }
       return getShardingItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
    }

    /**
    * 獲取作業執行實體的分片項集合.
    *
    @param jobInstanceId 作業執行實體主鍵
    @return 作業執行實體的分片項集合
    */

    public List getShardingItems(final String jobInstanceId) {
       JobInstance jobInstance = new JobInstance(jobInstanceId);
       if (!serverService.isAvailableServer(jobInstance.getIp())) {
           return Collections.emptyList();
       }
       List result = new LinkedList<>();
       int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
       for (int i = 0; i        // /${JOB_NAME}/sharding/${ITEM_ID}/instance
           if (jobInstance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
               result.add(i);
           }
       }
       return result;
    }
  • 呼叫 shardingItems.removeAll(executionService.getDisabledItems(shardingItems)),移除被禁用的作業分片項,即 /${JOB_NAME}/sharding/${ITEM_ID}/disabled 存在的作業分片項。

    // ExecutionService.java
    /**
    * 獲取禁用的任務分片項.
    *
    @param items 需要獲取禁用的任務分片項
    @return 禁用的任務分片項
    */

    public List getDisabledItems(final List items) {
       List result = new ArrayList<>(items.size());
       for (int each : items) {
           // /¨E123EJOB¨E95ENAME¨E125E/sharding/¨E123EJOB¨E95ENAME¨E125E/sharding/{ITEM_ID}/disabled
           if (jobNodeStorage.isJobNodeExisted(ShardingNode.getDisabledNode(each))) {
               result.add(each);
           }
       }
       return result;
    }
  • 呼叫 ExecutionContextService#getJobShardingContext(…) 方法,獲取當前作業伺服器分片背景關係。

  • 獲取當前作業伺服器分片背景關係

    呼叫 ExecutionContextService#getJobShardingContext(...) 方法,獲取當前作業伺服器分片背景關係:

    // ExecutionContextService.java
    public ShardingContexts getJobShardingContext(final List shardingItems) {
       LiteJobConfiguration liteJobConfig = configService.load(false);
       // 移除 正在執行中的作業分片項
       removeRunningIfMonitorExecution(liteJobConfig.isMonitorExecution(), shardingItems);
       //
       if (shardingItems.isEmpty()) {
           return new ShardingContexts(buildTaskId(liteJobConfig, shardingItems), liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 
                   liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter(), Collections.emptyMap());
       }
       // 解析分片引數
       Map shardingItemParameterMap = new ShardingItemParameters(liteJobConfig.getTypeConfig().getCoreConfig().getShardingItemParameters()).getMap();
       // 建立 分片背景關係集合
       return new ShardingContexts(buildTaskId(liteJobConfig, shardingItems), //
               liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(),
               liteJobConfig.getTypeConfig().getCoreConfig().getJobParameter(),
               getAssignedShardingItemParameterMap(shardingItems, shardingItemParameterMap)); // 獲得當前作業節點的分片引數
    }
    • 呼叫 #removeRunningIfMonitorExecution() 方法,移除正在執行中的作業分片項。

      private void removeRunningIfMonitorExecution(final boolean monitorExecution, final List shardingItems) {
         if (!monitorExecution) {
             return;
         }
         List runningShardingItems = new ArrayList<>(shardingItems.size());
         for (int each : shardingItems) {
             if (isRunning(each)) {
                 runningShardingItems.add(each); // /¨E123EJOB¨E95ENAME¨E125E/sharding/¨E123EJOB¨E95ENAME¨E125E/sharding/{ITEM_ID}/running
             }
         }
         shardingItems.removeAll(runningShardingItems);
      }

      private boolean isRunning(final int shardingItem) {
         return jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(shardingItem));
      }
  • 使用 ShardingItemParameters 解析作業分片引數。例如作業分片引數( JobCoreConfiguration.shardingItemParameters="0=Beijing,1=Shanghai,2=Guangzhou") 解析結果:

    • ShardingItemParameters 程式碼清晰易懂,點選連結直接檢視。

  • 呼叫 #buildTaskId(...) 方法,建立作業任務ID( ShardingContexts.taskId ):

    private String buildTaskId(final LiteJobConfiguration liteJobConfig, final List shardingItems) {
       JobInstance jobInstance = JobRegistry.getInstance().getJobInstance(jobName);
       return Joiner.on("@-@").join(liteJobConfig.getJobName(), Joiner.on(",").join(shardingItems), "READY"
               null == jobInstance.getJobInstanceId() ? "127.0.0.1@-@1" : jobInstance.getJobInstanceId()); 
    }
    • taskId = ${JOB_NAME} + @-@ + ${SHARDING_ITEMS} + @-@ + READY + @-@ + ${IP} + @-@ + ${PID}。例如:javaSimpleJob@-@0,1,2@-@READY@-@192.168.3.2@-@38330

  • 呼叫 #getAssignedShardingItemParameterMap(...) 方法,獲得當前作業節點的分片引數。

    private Map getAssignedShardingItemParameterMap(final List shardingItems, final Map shardingItemParameterMap) {
       Map result = new HashMap<>(shardingItemParameterMap.size(), 1);
       for (int each : shardingItems) {
           result.put(each, shardingItemParameterMap.get(each));
       }
       return result;
    }

  • ShardingContexts,分片背景關係集合。

    public final class ShardingContexts implements Serializable {
    private static final long serialVersionUID = -4585977349142082152L;

    /**
     * 作業任務ID.
     */

    private final String taskId;
    /**
     * 作業名稱.
     */

    private final String jobName;
    /**
     * 分片總數.
     */

    private final int shardingTotalCount;
    /**
     * 作業自定義引數.
     * 可以配置多個相同的作業, 但是用不同的引數作為不同的排程實體.
     */

    private final String jobParameter;
    /**
     * 分配於本作業實體的分片項和引數的Map.
     */

    private final Map shardingItemParameters;
    /**
     * 作業事件取樣統計數.
     */

    private int jobEventSamplingCount;
    /**
     * 當前作業事件取樣統計數.
     */

    @Setter
    private int currentJobEventSamplingCount;
    /**
     * 是否允許可以傳送作業事件.
     */

    @Setter
    private boolean allowSendJobEvent = true;

    }

    • jobEventSamplingCountcurrentJobEventSamplingCount 在 Elastic-Job-Lite 暫未還使用,在 Elastic-Job-Cloud 使用。

    666. 彩蛋

    旁白君:小夥伴,更新了乾貨嘛,雙擊 666。 
    芋道君:那必須的嘛,而且這麼勤快更新!是不是應該分享一波朋友圈。

    道友,趕緊上車,分享一波朋友圈!



    如果你對 Dubbo / Netty 等等原始碼與原理感興趣,歡迎加入我的知識星球一起交流。長按下方二維碼噢

    目前在知識星球更新了《Dubbo 原始碼解析》目錄如下:

    01. 除錯環境搭建
    02. 專案結構一覽
    03. 配置 Configuration
    04. 核心流程一覽

    05. 拓展機制 SPI

    06. 執行緒池

    07. 服務暴露 Export

    08. 服務取用 Refer

    09. 註冊中心 Registry

    10. 動態編譯 Compile

    11. 動態代理 Proxy

    12. 服務呼叫 Invoke

    13. 呼叫特性 

    14. 過濾器 Filter

    15. NIO 伺服器

    16. P2P 伺服器

    17. HTTP 伺服器

    18. 序列化 Serialization

    19. 叢集容錯 Cluster

    20. 優雅停機

    21. 日誌適配

    22. 狀態檢查

    23. 監控中心 Monitor

    24. 管理中心 Admin

    25. 運維命令 QOS

    26. 鏈路追蹤 Tracing

    … 一共 69+ 篇

    目前在知識星球更新了《Netty 原始碼解析》目錄如下:

    01. 除錯環境搭建
    02. NIO 基礎
    03. Netty 簡介
    04. 啟動 Bootstrap

    05. 事件輪詢 EventLoop

    06. 通道管道 ChannelPipeline

    07. 通道 Channel

    08. 位元組緩衝區 ByteBuf

    09. 通道處理器 ChannelHandler

    10. 編解碼 Codec

    11. 工具類 Util

    … 一共 61+ 篇

    目前在知識星球更新了《資料庫物體設計》目錄如下:


    01. 商品模組
    02. 交易模組
    03. 營銷模組
    04. 公用模組

    … 一共 17+ 篇


    目前在知識星球更新了《Spring 原始碼解析》目錄如下:


    01. 除錯環境搭建
    02. IoC Resource 定位
    03. IoC BeanDefinition 載入

    04. IoC BeanDefinition 註冊

    05. IoC Bean 獲取

    06. IoC Bean 生命週期

    … 一共 35+ 篇


    原始碼不易↓↓↓

    點贊支援老艿艿↓↓

    贊(0)

    分享創造快樂