歡迎光臨
每天分享高質量文章

分佈式作業 Elastic-Job-Lite 原始碼分析 —— 作業資料儲存

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-storage/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. JobNodePath

  • 3. JobNodeStorage

  • 4. ConfigurationNode

  • 5. ServerNode

  • 6. InstanceNode

  • 7. ShardingNode

  • 8. LeaderNode

  • 9. FailoverNode

  • 10. GuaranteeNode

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 作業資料儲存

涉及到主要類的類圖如下( 打開大圖 ):

你行好事會因為得到贊賞而愉悅 
同理,開源專案貢獻者會因為 Star 而更加有動力 
為 Elastic-Job 點贊!傳送門

2. JobNodePath

JobNodePath,作業節點路徑類。作業節點是在普通的節點前加上作業名稱的前綴

在 Zookeeper 看一個作業的資料儲存:

[zk: localhost:2181(CONNECTED) 65] ls /elastic-job-example-lite-java/javaSimpleJob
[leader, servers, config, instances, sharding]
  • elastic-job-example-lite-java:作業節點集群名,使用 ZookeeperConfiguration.namespace 屬性配置。

  • javaSimpleJob:作業名字,使用 JobCoreConfiguration.jobName 屬性配置。

  • config / servers / instances / sharding / leader:不同服務的資料儲存節點路徑。

JobNodePath,註釋很易懂,點擊鏈接查看。這裡我們梳理下 JobNodePath 和其它節點路徑類的關係:

Zookeeper 路徑 JobNodePath 靜態屬性 JobNodePath 方法 節點路徑類
config CONFIG_NODE #getConfigNodePath() ConfigurationNode
servers SERVERS_NODE #getServerNodePath() ServerNode
instances INSTANCES_NODE #getInstancesNodePath() InstanceNode
sharding SHARDING_NODE #getShardingNodePath() ShardingNode
leader / #getFullPath(node) LeaderNode
leader/failover / #getFullPath(node) FailoverNode
guarantee / #getFullPath(node) GuaranteeNode

3. JobNodeStorage

JobNodeStorage,作業節點資料訪問類。

Elastic-Job-Lite 使用註冊中心儲存作業節點資料,JobNodeStorage 對註冊中心提供的方法做下簡單的封裝提供呼叫。舉個例子:

// JobNodeStorage.java
private final CoordinatorRegistryCenter regCenter;
private final JobNodePath jobNodePath;

/**
* 判斷作業節點是否存在.

@param node 作業節點名稱
@return 作業節點是否存在
*/

public boolean isJobNodeExisted(final String node) {
   return regCenter.isExisted(jobNodePath.getFullPath(node));
}

// JobNodePath.java
/**
* 獲取節點全路徑.

@param node 節點名稱
@return 節點全路徑
*/

public String getFullPath(final String node) {
   return String.format("/%s/%s", jobName, node);
}
  • 傳遞的引數 node 只是簡單的作業節點名稱,通過呼叫 JobNodePath#getFullPath(…) 方法獲取節點全路徑。

  • 其它方法類似,有興趣的同學點擊鏈接查看。

4. ConfigurationNode

ConfigurationNode,配置節點路徑。

在 Zookeeper 看一個作業的配置節點資料儲存:

[zk: localhost:2181(CONNECTED) 67] get /elastic-job-example-lite-java/javaSimpleJob/config
{"jobName":"javaSimpleJob","jobClass":"com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob","jobType":"SIMPLE","cron":"0/5 * * * * ?","shardingTotalCount":3,"shardingItemParameters":"0\u003dBeijing,1\u003dShanghai,2\u003dGuangzhou","jobParameter":"","failover":true,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution":true,"maxTimeDiffSeconds":-1,"monitorPort":-1,"jobShardingStrategyClass":"","reconcileIntervalMinutes":10,"disabled":false,"overwrite":true}
  • /config 是持久節點,儲存Lite作業配置( LiteJobConfiguration ) JSON化字串。

ConfigurationNode 代碼如下:

public final class ConfigurationNode {

    static final String ROOT = "config";
}

ConfigurationNode 如何讀取、儲存,在《Elastic-Job-Lite 原始碼分析 —— 作業配置》的「3.」作業配置服務已經詳細解析。

5. ServerNode

ServerNode,服務器節點路徑。

在 Zookeeper 看一個作業的服務器節點資料儲存:

[zk: localhost:2181(CONNECTED) 72] ls /elastic-job-example-lite-java/javaSimpleJob/servers
[192.168.16.164, 169.254.93.156, 192.168.252.57, 192.168.16.137, 192.168.3.2, 192.168.43.31]
[zk: localhost:2181(CONNECTED) 73] get /elastic-job-example-lite-java/javaSimpleJob/servers/192.168.16.164
  • /servers/ 目錄下以 IP 為資料節點路徑儲存每個服務器節點。如果相同IP服務器有多個服務器節點,只儲存一個 IP 資料節點。

  • /servers/${IP} 是持久節點,不儲存任何信息,只是空串( "");

ServerNode 代碼如下:

public final class ServerNode {

    /**
     * 服務器信息根節點.
     */

    public static final String ROOT = "servers";

    private static final String SERVERS = ROOT + "/%s";
}

ServerNode 如何儲存,在《Elastic-Job-Lite 原始碼分析 —— 作業初始化》的「3.2.4」註冊作業啟動信息已經詳細解析。

6. InstanceNode

InstanceNode,運行實體節點路徑。

在 Zookeeper 看一個作業的運行實體節點資料儲存:

[zk: localhost:2181(CONNECTED) 81] ls /elastic-job-example-lite-java/javaSimpleJob/instances
[[email protected]@56010]
[zk: localhost:2181(CONNECTED) 82] get /elastic-job-example-lite-java/javaSimpleJob/instances
  • /instances 目錄下以作業實體主鍵( JOB_INSTANCE_ID ) 為資料節點路徑儲存每個運行實體節點。

  • /instances/${JOB_INSTANCE_ID} 是臨時節點,不儲存任何信息,只是空串( "");

  • JOB_INSTANCE_ID 生成方式:

    // JobInstance.java

    jobInstanceId = IpUtils.getIp()
                    + DELIMITER
                    + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; // PID

InstanceNode 代碼如下:

public final class InstanceNode {

    /**
     * 運行實體信息根節點.
     */

    public static final String ROOT = "instances";

    private static final String INSTANCES = ROOT + "/%s";

    /**
     * 獲取當前運行實體節點路徑
     *
     * @return 當前運行實體節點路徑
     */

    String getLocalInstanceNode() {
        return String.format(INSTANCES, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
    }
}

InstanceNode 如何儲存,在《Elastic-Job-Lite 原始碼分析 —— 作業初始化》的「3.2.4」註冊作業啟動信息已經詳細解析。

7. ShardingNode

ShardingNode,分片節點路徑。

在 Zookeeper 看一個作業的分片節點資料儲存:

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/sharding
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/sharding/0
[running, instance, misfire]
[zk: localhost:2181(CONNECTED) 3] get /elastic-job-example-lite-java/javaSimpleJob/sharding/0/instance
[email protected]@56010
  • /sharding/${ITEM_ID} 目錄下以作業分片項序號( ITEM_ID ) 為資料節點路徑儲存作業分片項的 instancerunning / misfire / disable 資料節點信息。

  • /sharding/${ITEM_ID}/instance 是臨時節點,儲存該作業分片項分配到的作業實體主鍵JOB_INSTANCE_ID)。在《Elastic-Job-Lite 原始碼分析 —— 作業分片》詳細解析。

  • /sharding/${ITEM_ID}/running 是臨時節點,當該作業分片項正在運行,儲存空串( "" );當該作業分片項不在運行,移除該資料節點。《Elastic-Job-Lite 原始碼分析 —— 作業執行》的「4.6」執行普通觸發的作業已經詳細解析。

  • /sharding/${ITEM_ID}/misfire 是永久節點,當該作業分片項被錯過執行,儲存空串( "" );當該作業分片項重新執行,移除該資料節點。《Elastic-Job-Lite 原始碼分析 —— 作業執行》的「4.7」執行被錯過觸發的作業已經詳細解析。

  • /sharding/${ITEM_ID}/disable 是永久節點,當該作業分片項被禁用,儲存空串( "" );當該作業分片項被開啟,移除資料節點。

ShardingNode,代碼如下:

public final class ShardingNode {

    /**
     * 執行狀態根節點.
     */

    public static final String ROOT = "sharding";

    static final String INSTANCE_APPENDIX = "instance";

    public static final String INSTANCE = ROOT + "/%s/" + INSTANCE_APPENDIX;

    static final String RUNNING_APPENDIX = "running";

    static final String RUNNING = ROOT + "/%s/" + RUNNING_APPENDIX;

    static final String MISFIRE = ROOT + "/%s/misfire";

    static final String DISABLED = ROOT + "/%s/disabled";

    static final String LEADER_ROOT = LeaderNode.ROOT + "/" + ROOT;

    static final String NECESSARY = LEADER_ROOT + "/necessary";

    static final String PROCESSING = LEADER_ROOT + "/processing";
}
  • LEADER_ROOT / NECESSARY / PROCESSING 放在「4.7」LeaderNode 解析。

8. LeaderNode

LeaderNode,主節點路徑。

在 leader 目錄下一共有三個儲存子節點:

  • election:主節點選舉。

  • sharding:作業分片項分配。

  • failover:作業失效轉移。

主節點選舉

在 Zookeeper 看一個作業的 leader/election 節點資料儲存:

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/leader/election
[latch, instance]
[zk: localhost:2181(CONNECTED) 2] get /elastic-job-example-lite-java/javaSimpleJob/leader/election/instance
[email protected]@1910
  • /leader/election/instance 是臨時節點,當作業集群完成選舉後,儲存主作業實體主鍵( JOB_INSTANCE_ID)。

  • /leader/election/latch 主節點選舉分佈式鎖,是 Apache Curator 針對 Zookeeper 實現的分佈式鎖的一種,筆者暫未瞭解儲存形式,無法解釋。在《Elastic-Job-Lite 原始碼分析 —— 註冊中心》的「3.1」在主節點執行操作進行了簡單解析。

作業分片項分配

在 Zookeeper 看一個作業的 leader/sharding 節點資料儲存:

[zk: localhost:2181(CONNECTED) 1] ls /elastic-job-example-lite-java/javaSimpleJob/leader/sharding
[necessary, processing]
[zk: localhost:2181(CONNECTED) 2] 個get /elastic-job-example-lite-java/javaSimpleJob/leader/sharding

[zk: localhost:2181(CONNECTED) 3] 個get /elastic-job-example-lite-java/javaSimpleJob/leader/processing
  • /leader/sharding/necessary 是永久節點,當相同作業有新的作業節點加入或者移除時,儲存空串( "" ),標記需要進行作業分片項重新分配;當重新分配完成後,移除該資料節點。

  • /leader/sharding/processing 是臨時節點,當開始重新分配作業分片項時,儲存空串( "" ),標記正在進行重新分配;當重新分配完成後,移除該資料節點。

  • 當且僅當作業節點為主節點時,才可以執行作業分片項分配,《Elastic-Job-Lite 原始碼分析 —— 作業分片》詳細解析。

作業失效轉移

作業失效轉移資料節點在 FailoverNode,放在「9」FailoverNode 解析。

這裡大家可能會和我一樣比較疑惑,為什麼 /leader/failover 放在 /leader 目錄下,而不獨立成為一個根目錄?經過確認,作業失效轉移 設計到分佈式鎖,統一儲存在 /leader 目錄下。


LeaderNode,代碼如下:

public final class LeaderNode {

    /**
     * 主節點根路徑.
     */

    public static final String ROOT = "leader";

    static final String ELECTION_ROOT = ROOT + "/election";

    static final String INSTANCE = ELECTION_ROOT + "/instance";

    static final String  LATCH = ELECTION_ROOT + "/latch";
}

9. FailoverNode

FailoverNode,失效轉移節點路徑。

在 Zookeeper 看一個作業的失效轉移節點資料儲存:

[zk: localhost:2181(CONNECTED) 2] ls /elastic-job-example-lite-java/javaSimpleJob/leader/failover
[latch, items]
[zk: localhost:2181(CONNECTED) 4] ls /elastic-job-example-lite-java/javaSimpleJob/leader/failover/items
[0]
  • /leader/failover/latch 作業失效轉移分佈式鎖,和 /leader/failover/latch 是一致的。

  • /leader/items/${ITEM_ID} 是永久節點,當某台作業節點 CRASH 時,其分配的作業分片項標記需要進行失效轉移,儲存其分配的作業分片項的 /leader/items/${ITEM_ID} 為空串( "" );當失效轉移標記,移除 /leader/items/${ITEM_ID},儲存 /sharding/${ITEM_ID}/failover 為空串( "" ),臨時節點,需要進行失效轉移執行。《Elastic-Job-Lite 原始碼分析 —— 作業失效轉移》詳細解析。

FailoverNode 代碼如下:

public final class FailoverNode {

    static final String FAILOVER = "failover";

    static final String LEADER_ROOT = LeaderNode.ROOT + "/" + FAILOVER;

    static final String ITEMS_ROOT = LEADER_ROOT + "/items";

    static final String ITEMS = ITEMS_ROOT + "/%s";

    static final String LATCH = LEADER_ROOT + "/latch";

    private static final String EXECUTION_FAILOVER = ShardingNode.ROOT + "/%s/" + FAILOVER;

    static String getItemsNode(final int item) {
        return String.format(ITEMS, item);
    }

    static String getExecutionFailoverNode(final int item) {
        return String.format(EXECUTION_FAILOVER, item);
    }
}

10. GuaranteeNode

GuaranteeNode,保證分佈式任務全部開始和結束狀態節點路徑。在《Elastic-Job-Lite 原始碼分析 —— 作業監聽器》詳細解析。

666. 彩蛋

旁白君:芋道君,你又水更了! 
芋道君:屁屁屁,勞資懟死你!如下是作業資料儲存整理,哼哼哈兮!

道友,趕緊上車,分享一波朋友圈!




如果你對 Dubbo / Netty 等等原始碼與原理感興趣,歡迎加入我的知識星球一起交流。長按下方二維碼噢

目前在知識星球更新了《Dubbo 原始碼解析》目錄如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 服務器

16. P2P 服務器

17. HTTP 服務器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing

… 一共 69+ 篇

目前在知識星球更新了《Netty 原始碼解析》目錄如下:

01. 除錯環境搭建
02. NIO 基礎
03. Netty 簡介
04. 啟動 Bootstrap

05. 事件輪詢 EventLoop

06. 通道管道 ChannelPipeline

07. 通道 Channel

08. 位元組緩衝區 ByteBuf

09. 通道處理器 ChannelHandler

10. 編解碼 Codec

11. 工具類 Util

… 一共 61+ 篇

目前在知識星球更新了《資料庫物體設計》目錄如下:


01. 商品模塊
02. 交易模塊
03. 營銷模塊
04. 公用模塊

… 一共 17+ 篇

原始碼不易↓↓↓

點贊支持老艿艿↓↓

赞(0)

分享創造快樂