歡迎光臨
每天分享高質量文章

分佈式作業 Elastic-Job-Lite 原始碼分析 —— 註冊中心

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/reg-center-zookeeper/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. 基於 Zookeeper 註冊中心

  • 3. 作業節點資料訪問類

  • 666. 彩蛋


1. 概述

本文主要分享 Elastic-Job-Lite 註冊中心

涉及到主要類的類圖如下( 打開大圖 ):

  • 黃色的類在 elastic-job-common-core 專案里,為 Elastic-Job-Lite、Elastic-Job-Cloud 公用註冊中心類。

  • 作業節點資料訪問類( JobNodeStorage )的在主節點執行操作在事務中執行操作兩個方法和註冊中心協調分佈式服務有關係,從《Elastic-Job-Lite 原始碼解析 —— 作業資料儲存》摘出來,放本文解析。

你行好事會因為得到贊賞而愉悅 
同理,開源專案貢獻者會因為 Star 而更加有動力 
為 Elastic-Job 點贊!傳送門

2. 基於 Zookeeper 註冊中心

ZookeeperRegistryCenter,基於 Zookeeper 註冊中心。從上面的類圖可以看到,ZookeeperRegistryCenter 實現 CoordinatorRegistryCenter 接口,CoordinatorRegistryCenter 繼承 RegistryCenter 接口。

  • RegistryCenter,註冊中心,定義了簡單的增刪改查註冊資料和查詢時間的接口方法。

  • CoordinatorRegistryCenter,用於協調分佈式服務的註冊中心,定義了持久節點、臨時節點、持久順序節點、臨時順序節點等目錄服務接口方法,隱性的要求提供事務分佈式鎖資料訂閱等特性。

ZookeeperRegistryCenter 使用 Apache Curator 進行 Zookeeper 註冊中心。

2.1 初始化

ZookeeperConfiguration,基於 Zookeeper 的註冊中心配置,註釋完整,點擊鏈接直接查看。

@Override
public void init() {
   log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
   CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
           .connectString(zkConfig.getServerLists())
           .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
           .namespace(zkConfig.getNamespace()); // 命名空間
   if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
       builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds()); // 會話超時時間,預設 60 * 1000 毫秒
   }
   if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
       builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds()); // 連接超時時間,預設 15 * 1000 毫秒
   }
   // 認證
   if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
       builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
               .aclProvider(new ACLProvider() {

                   @Override
                   public List getDefaultAcl() {
                       return ZooDefs.Ids.CREATOR_ALL_ACL;
                   }

                   @Override
                   public List getAclForPath(final String path) {
                       return ZooDefs.Ids.CREATOR_ALL_ACL;
                   }
               });
   }
   client = builder.build();
   client.start();
   // 連接 Zookeeper
   try {
       if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
           client.close();
           throw new KeeperException.OperationTimeoutException();
       }
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}
  • ExponentialBackoffRetry,當 Zookeeper 失去鏈接後重新連接的一種策略:動態計算每次計算重連的間隔,時間間隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))。如果對其它重連策略感興趣,可以看 RetryPolicy 的實現類,本文就不展開了。

  • 相同的作業集群使用相同的 Zookeeper 命名空間( ZookeeperConfiguration.namespace )。

2.2 快取

通過 Curator TreeCache 實現監控整個樹( Zookeeper目錄 )的資料訂閱和快取,包括節點的狀態,子節點的狀態。

初始化作業快取

作業初始化註冊時,初始化快取。

// JobRegistry.java
public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
   schedulerMap.put(jobName, jobScheduleController);
   regCenterMap.put(jobName, regCenter);
   // 添加註冊中心快取
   regCenter.addCacheData("/" + jobName);
}

// ZookeeperRegistryCenter.java
/**
* 快取
* key:/作業名/
*/

private final Map caches = new HashMap<>();

作業服務訂閱資料

每個不同的服務,都會訂閱資料實現功能邏輯。在後續不同服務的文章,我們會詳細解析。?

public void addDataListener(final TreeCacheListener listener) {
   TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
   cache.getListenable().addListener(listener);
}

關閉作業快取

@Override
public void evictCacheData(final String cachePath) {
   TreeCache cache = caches.remove(cachePath + "/");
   if (null != cache) {
       cache.close();
   }
}

對 Curator TreeCache 感興趣的同學,可以點擊鏈接繼續瞭解。

2.3 關閉

public void close() {
   for (Entry each : caches.entrySet()) {
       each.getValue().close();
   }
   waitForCacheClose();
   CloseableUtils.closeQuietly(client);
}

/* 
* 因為異步處理, 可能會導致client先關閉而cache還未關閉結束.
* 等待Curator新版本解決這個bug.
* BUG地址:https://issues.apache.org/jira/browse/CURATOR-157
*/

private void waitForCacheClose() {
   try {
       Thread.sleep(500L); // 等待500ms, cache先關閉再關閉client, 否則會拋異常
   } catch (final InterruptedException ex) {
       Thread.currentThread().interrupt();
   }
}

2.4 獲得資料

@Override
public String get(final String key) {
   TreeCache cache = findTreeCache(key); // 獲取快取
   if (null == cache) {
       return getDirectly(key);
   }
   ChildData resultInCache = cache.getCurrentData(key); // 快取中獲取 value
   if (null != resultInCache) {
       return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);
   }
   return getDirectly(key);
}

@Override
public String getDirectly(final String key) {
   try {
       return new String(client.getData().forPath(key), Charsets.UTF_8);
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
       return null;
   }
}
  • #get(…) 先從 TreeCache快取 獲取,後從 Zookeeper 獲取。

  • #getDirectly(…) 直接從 Zookeeper 獲取。

  • #findTreeCache(...) 代碼如下:

    private TreeCache findTreeCache(final String key) {
       for (Entry entry : caches.entrySet()) {
           if (key.startsWith(entry.getKey())) {
               return entry.getValue();
           }
       }
       return null;
    }

2.5 獲得註冊子節點

獲取子節點名稱集合(降序)

@Override
public List getChildrenKeys(final String key) {
   try {
       List result = client.getChildren().forPath(key);
       Collections.sort(result, new Comparator() {

           @Override
           public int compare(final String o1, final String o2) {
               return o2.compareTo(o1);
           }
       });
       return result;
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
       return Collections.emptyList();
   }
}

獲取子節點數量

@Override
public int getNumChildren(final String key) {
   try {
       Stat stat = client.checkExists().forPath(key);
       if (null != stat) {
           return stat.getNumChildren();
       }
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
   return 0;
}

2.6 儲存註冊資料

@Override
public void persist(final String key, final String value) {
   try {
       if (!isExisted(key)) {
           client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
       } else {
           update(key, value);
       }
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}

@Override
public void persistEphemeral(final String key, final String value) {
   try {
       if (isExisted(key)) {
           client.delete().deletingChildrenIfNeeded().forPath(key);
       }
       client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Charsets.UTF_8));
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}
  • #persist(…) 儲存持久節點資料。邏輯等價於 insertOrUpdate 操作。

  • persistEphemeral(…) 儲存臨時節點資料。節點型別無法變更,因此如果資料已存在,需要先進行刪除。

  • #isExisted(...)#update(...) 代碼如下:

    @Override
    public boolean isExisted(final String key) {
       try {
           return null != client.checkExists().forPath(key);
       } catch (final Exception ex) {
           RegExceptionHandler.handleException(ex);
           return false;
       }
    }

    @Override
    public void update(final String key, final String value) {
       try {
           client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();
       } catch (final Exception ex) {
           RegExceptionHandler.handleException(ex);
       }
    }
    • #update(…) 使用事務校驗鍵( key )存在才進行更新。

2.7 儲存順序註冊資料

實現邏輯和儲存註冊資料類似。Elastic-Job 未使用該方法,跳過。

2.8 移除註冊資料

@Override
public void remove(final String key) {
   try {
       client.delete().deletingChildrenIfNeeded().forPath(key);
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}

2.9 獲取註冊中心當前時間

@Override
public long getRegistryCenterTime(final String key) {
   long result = 0L;
   try {
       persist(key, "");
       result = client.checkExists().forPath(key).getMtime();
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
   Preconditions.checkState(0L != result, "Cannot get registry center time.");
   return result;
}
  • 通過更新節點,獲得該節點的最後更新時間( mtime )獲得 Zookeeper 的時間。six six six。

2.10 註冊中心異常處理器

RegExceptionHandler,註冊中心異常處理器。在上面的操作 Zookeeper 發生異常時,都會呼叫 RegExceptionHandler.handleException(...) 處理異常:

public static void handleException(final Exception cause) {
   if (null == cause) {
       return;
   }
   if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
       log.debug("Elastic job: ignored exception for: {}", cause.getMessage());
   } else if (cause instanceof InterruptedException) {
       Thread.currentThread().interrupt();
   } else {
       throw new RegException(cause);
   }
}

private static boolean isIgnoredException(final Throwable cause) {
   return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException;
}
  • 部分異常會被無視,僅打印異常。例如呼叫 #getDirectly(…) 獲得註冊資料時,可能節點不存在,丟擲 NodeExistsException,這種異常可以無視。

3. 作業節點資料訪問類

JobNodeStorage,作業節點資料訪問類。

3.1 在主節點執行操作

// JobNodeStorage.java
/**
* 在主節點執行操作.

@param latchNode 分佈式鎖使用的節點,例如:leader/election/latch
@param callback 執行操作的回呼
*/

public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
   try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
       latch.start();
       latch.await();
       callback.execute();
   } catch (final Exception ex) {
       handleException(ex);
   }
}

Apache Curator 使用 Zookeeper 實現了兩種分佈式鎖,LeaderLatch 是其中的一種。使用一個 Zookeeper 節點路徑創建一個 LeaderLatch,#start() 後,呼叫 #await() 等待拿到這把。如果有多個執行緒執行了相同節點路徑的 LeaderLatch 的 #await() 後,同一時刻有且僅有一個執行緒可以繼續執行,其他執行緒需要等待。當該執行緒釋放( LeaderLatch#close() )後,下一個執行緒可以拿到該繼續執行。用 Java 併發包 Lock 舉例子:

public void executeInLeader(Lock lock) {
    try {
        lock.lock();
        // doSomething();
    } finally {
        lock.unlock();
    }
}

《官方文件 —— LeaderLatch》,有興趣的同學可以看看。在《Elastic-Job-Lite 原始碼解析 —— 主節點選舉》中,我們會看到 #executeInLeader(...) 的使用。

另一種分佈式鎖實現,《官方文件 —— LeaderElection》,有興趣也可以看看。在 Elastic-Job-Cloud 中使用到了,後續進行解析。

3.2 在事務中執行操作

// JobNodeStorage.java
public void executeInTransaction(final TransactionExecutionCallback callback) {
   try {
       CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
       callback.execute(curatorTransactionFinal);
       curatorTransactionFinal.commit();
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}
  • 開啟事務,執行 TransactionExecutionCallback 回呼邏輯,提交事務。

666. 彩蛋

旁白君:煞筆芋道君,又在水更 
芋道君:人艱不拆,好不好。

道友,趕緊上車,分享一波朋友圈!




如果你對 Dubbo / Netty 等等原始碼與原理感興趣,歡迎加入我的知識星球一起交流。長按下方二維碼噢

目前在知識星球更新了《Dubbo 原始碼解析》目錄如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 服務器

16. P2P 服務器

17. HTTP 服務器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing

... 一共 69+ 篇

目前在知識星球更新了《Netty 原始碼解析》目錄如下:

01. 除錯環境搭建
02. NIO 基礎
03. Netty 簡介
04. 啟動 Bootstrap

05. 事件輪詢 EventLoop

06. 通道管道 ChannelPipeline

07. 通道 Channel

08. 位元組緩衝區 ByteBuf

09. 通道處理器 ChannelHandler

10. 編解碼 Codec

11. 工具類 Util

... 一共 61+ 篇

目前在知識星球更新了《資料庫物體設計》目錄如下:


01. 商品模塊
02. 交易模塊
03. 營銷模塊
04. 公用模塊

... 一共 17+ 篇

原始碼不易↓↓↓

點贊支持老艿艿↓↓

赞(0)

分享創造快樂