點選上方“芋道原始碼”,選擇“置頂公眾號”
技術文章第一時間送達!
原始碼精品專欄
摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/reg-center-zookeeper/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!
本文基於 Elastic-Job V2.1.5 版本分享
- 
1. 概述 
- 
2. 基於 Zookeeper 註冊中心 
- 
3. 作業節點資料訪問類 
- 
666. 彩蛋 
1. 概述
本文主要分享 Elastic-Job-Lite 註冊中心。
涉及到主要類的類圖如下( 開啟大圖 ):

- 
黃色的類在 elastic-job-common-core專案裡,為 Elastic-Job-Lite、Elastic-Job-Cloud 公用註冊中心類。
- 
作業節點資料訪問類( JobNodeStorage )的在主節點執行操作、在事務中執行操作兩個方法和註冊中心協調分散式服務有關係,從《Elastic-Job-Lite 原始碼解析 —— 作業資料儲存》摘出來,放本文解析。 
你行好事會因為得到贊賞而愉悅
同理,開源專案貢獻者會因為 Star 而更加有動力
為 Elastic-Job 點贊!傳送門
2. 基於 Zookeeper 註冊中心
ZookeeperRegistryCenter,基於 Zookeeper 註冊中心。從上面的類圖可以看到,ZookeeperRegistryCenter 實現 CoordinatorRegistryCenter 介面,CoordinatorRegistryCenter 繼承 RegistryCenter 介面。
- 
RegistryCenter,註冊中心,定義了簡單的增刪改查註冊資料和查詢時間的介面方法。 
- 
CoordinatorRegistryCenter,用於協調分散式服務的註冊中心,定義了持久節點、臨時節點、持久順序節點、臨時順序節點等目錄服務介面方法,隱性的要求提供事務、分散式鎖、資料訂閱等特性。 
ZookeeperRegistryCenter 使用 Apache Curator 進行 Zookeeper 註冊中心。
2.1 初始化
ZookeeperConfiguration,基於 Zookeeper 的註冊中心配置,註釋完整,點選連結直接檢視。
@Override
public void init() {
   log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
   CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
           .connectString(zkConfig.getServerLists())
           .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
           .namespace(zkConfig.getNamespace()); // 名稱空間
   if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
       builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds()); // 會話超時時間,預設 60 * 1000 毫秒
   }
   if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
       builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds()); // 連線超時時間,預設 15 * 1000 毫秒
   }
   // 認證
   if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
       builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
               .aclProvider(new ACLProvider() {
                   @Override
                   public List getDefaultAcl()  {
                       return ZooDefs.Ids.CREATOR_ALL_ACL;
                   }
                   @Override
                   public List getAclForPath(final String path)  {
                       return ZooDefs.Ids.CREATOR_ALL_ACL;
                   }
               });
   }
   client = builder.build();
   client.start();
   // 連線 Zookeeper
   try {
       if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
           client.close();
           throw new KeeperException.OperationTimeoutException();
       }
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}
- 
ExponentialBackoffRetry,當 Zookeeper 失去連結後重新連線的一種策略:動態計算每次計算重連的間隔,時間間隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))。如果對其它重連策略感興趣,可以看 RetryPolicy 的實現類,本文就不展開了。
- 
相同的作業叢集使用相同的 Zookeeper 名稱空間( ZookeeperConfiguration.namespace)。
2.2 快取
透過 Curator TreeCache 實現監控整個樹( Zookeeper目錄 )的資料訂閱和快取,包括節點的狀態,子節點的狀態。
初始化作業快取
作業初始化註冊時,初始化快取。
// JobRegistry.java
public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
   schedulerMap.put(jobName, jobScheduleController);
   regCenterMap.put(jobName, regCenter);
   // 新增註冊中心快取
   regCenter.addCacheData("/" + jobName);
}
// ZookeeperRegistryCenter.java
/**
* 快取
* key:/作業名/
*/
private final Map caches = new HashMap<>();
 作業服務訂閱資料
每個不同的服務,都會訂閱資料實現功能邏輯。在後續不同服務的文章,我們會詳細解析。?
public void addDataListener(final TreeCacheListener listener) {
   TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
   cache.getListenable().addListener(listener);
}
關閉作業快取
@Override
public void evictCacheData(final String cachePath) {
   TreeCache cache = caches.remove(cachePath + "/");
   if (null != cache) {
       cache.close();
   }
}
對 Curator TreeCache 感興趣的同學,可以點選連結繼續瞭解。
2.3 關閉
public void close() {
   for (Entry each : caches.entrySet()) {
       each.getValue().close();
   }
   waitForCacheClose();
   CloseableUtils.closeQuietly(client);
}
/* 
* 因為非同步處理, 可能會導致client先關閉而cache還未關閉結束.
* 等待Curator新版本解決這個bug.
* BUG地址:https://issues.apache.org/jira/browse/CURATOR-157
*/
private void waitForCacheClose() {
   try {
       Thread.sleep(500L); // 等待500ms, cache先關閉再關閉client, 否則會拋異常
   } catch (final InterruptedException ex) {
       Thread.currentThread().interrupt();
   }
}
 2.4 獲得資料
@Override
public String get(final String key) {
   TreeCache cache = findTreeCache(key); // 獲取快取
   if (null == cache) {
       return getDirectly(key);
   }
   ChildData resultInCache = cache.getCurrentData(key); // 快取中獲取 value
   if (null != resultInCache) {
       return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);
   }
   return getDirectly(key);
}
@Override
public String getDirectly(final String key) {
   try {
       return new String(client.getData().forPath(key), Charsets.UTF_8);
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
       return null;
   }
}
- 
#get(…)先從 TreeCache快取 獲取,後從 Zookeeper 獲取。
- 
#getDirectly(…)直接從 Zookeeper 獲取。
- 
#findTreeCache(...)程式碼如下:private TreeCache findTreeCache(final String key) {
 for (Entryentry : caches.entrySet()) { 
 if (key.startsWith(entry.getKey())) {
 return entry.getValue();
 }
 }
 return null;
 }
2.5 獲得註冊子節點
獲取子節點名稱集合(降序)
@Override
public List getChildrenKeys(final String key)  {
   try {
       List result = client.getChildren().forPath(key);
       Collections.sort(result, new Comparator() {
           @Override
           public int compare(final String o1, final String o2) {
               return o2.compareTo(o1);
           }
       });
       return result;
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
       return Collections.emptyList();
   }
}
  獲取子節點數量
@Override
public int getNumChildren(final String key) {
   try {
       Stat stat = client.checkExists().forPath(key);
       if (null != stat) {
           return stat.getNumChildren();
       }
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
   return 0;
}
2.6 儲存註冊資料
@Override
public void persist(final String key, final String value) {
   try {
       if (!isExisted(key)) {
           client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
       } else {
           update(key, value);
       }
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}
@Override
public void persistEphemeral(final String key, final String value) {
   try {
       if (isExisted(key)) {
           client.delete().deletingChildrenIfNeeded().forPath(key);
       }
       client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Charsets.UTF_8));
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}
- 
#persist(…)儲存持久節點資料。邏輯等價於 insertOrUpdate 操作。
- 
persistEphemeral(…)儲存臨時節點資料。節點型別無法變更,因此如果資料已存在,需要先進行刪除。
- 
#isExisted(...)、#update(...)程式碼如下:@Override
 public boolean isExisted(final String key) {
 try {
 return null != client.checkExists().forPath(key);
 } catch (final Exception ex) {
 RegExceptionHandler.handleException(ex);
 return false;
 }
 }
 @Override
 public void update(final String key, final String value) {
 try {
 client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();
 } catch (final Exception ex) {
 RegExceptionHandler.handleException(ex);
 }
 }
- 
#update(…)使用事務校驗鍵( key )存在才進行更新。
2.7 儲存順序註冊資料
實現邏輯和儲存註冊資料類似。Elastic-Job 未使用該方法,跳過。
2.8 移除註冊資料
@Override
public void remove(final String key) {
   try {
       client.delete().deletingChildrenIfNeeded().forPath(key);
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}
2.9 獲取註冊中心當前時間
@Override
public long getRegistryCenterTime(final String key) {
   long result = 0L;
   try {
       persist(key, "");
       result = client.checkExists().forPath(key).getMtime();
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
   Preconditions.checkState(0L != result, "Cannot get registry center time.");
   return result;
}
- 
透過更新節點,獲得該節點的最後更新時間( mtime)獲得 Zookeeper 的時間。six six six。
2.10 註冊中心異常處理器
RegExceptionHandler,註冊中心異常處理器。在上面的操作 Zookeeper 發生異常時,都會呼叫 RegExceptionHandler.handleException(...) 處理異常:
public static void handleException(final Exception cause) {
   if (null == cause) {
       return;
   }
   if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
       log.debug("Elastic job: ignored exception for: {}", cause.getMessage());
   } else if (cause instanceof InterruptedException) {
       Thread.currentThread().interrupt();
   } else {
       throw new RegException(cause);
   }
}
private static boolean isIgnoredException(final Throwable cause) {
   return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException;
}
- 
部分異常會被無視,僅列印異常。例如呼叫 #getDirectly(…)獲得註冊資料時,可能節點不存在,丟擲 NodeExistsException,這種異常可以無視。
3. 作業節點資料訪問類
JobNodeStorage,作業節點資料訪問類。
3.1 在主節點執行操作
// JobNodeStorage.java
/**
* 在主節點執行操作.
* 
* @param latchNode 分散式鎖使用的節點,例如:leader/election/latch
* @param callback 執行操作的回呼
*/
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
   try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
       latch.start();
       latch.await();
       callback.execute();
   } catch (final Exception ex) {
       handleException(ex);
   }
}
Apache Curator 使用 Zookeeper 實現了兩種分散式鎖,LeaderLatch 是其中的一種。使用一個 Zookeeper 節點路徑建立一個 LeaderLatch,#start() 後,呼叫 #await() 等待拿到這把鎖。如果有多個執行緒執行了相同節點路徑的 LeaderLatch 的 #await() 後,同一時刻有且僅有一個執行緒可以繼續執行,其他執行緒需要等待。當該執行緒釋放( LeaderLatch#close() )後,下一個執行緒可以拿到該鎖繼續執行。用 Java 併發包 Lock 舉例子:
public void executeInLeader(Lock lock) {
    try {
        lock.lock();
        // doSomething();
    } finally {
        lock.unlock();
    }
}
《官方檔案 —— LeaderLatch》,有興趣的同學可以看看。在《Elastic-Job-Lite 原始碼解析 —— 主節點選舉》中,我們會看到 #executeInLeader(...) 的使用。
另一種分散式鎖實現,《官方檔案 —— LeaderElection》,有興趣也可以看看。在 Elastic-Job-Cloud 中使用到了,後續進行解析。
3.2 在事務中執行操作
// JobNodeStorage.java
public void executeInTransaction(final TransactionExecutionCallback callback) {
   try {
       CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
       callback.execute(curatorTransactionFinal);
       curatorTransactionFinal.commit();
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}
- 
開啟事務,執行 TransactionExecutionCallback 回呼邏輯,提交事務。 
666. 彩蛋
旁白君:煞筆芋道君,又在水更 
芋道君:人艱不拆,好不好。
道友,趕緊上車,分享一波朋友圈!
如果你對 Dubbo / Netty 等等原始碼與原理感興趣,歡迎加入我的知識星球一起交流。長按下方二維碼噢:

目前在知識星球更新了《Dubbo 原始碼解析》目錄如下:
01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽
05. 拓展機制 SPI
06. 執行緒池
07. 服務暴露 Export
08. 服務取用 Refer
09. 註冊中心 Registry
10. 動態編譯 Compile
11. 動態代理 Proxy
12. 服務呼叫 Invoke
13. 呼叫特性
14. 過濾器 Filter
15. NIO 伺服器
16. P2P 伺服器
17. HTTP 伺服器
18. 序列化 Serialization
19. 叢集容錯 Cluster
20. 優雅停機
21. 日誌適配
22. 狀態檢查
23. 監控中心 Monitor
24. 管理中心 Admin
25. 運維命令 QOS
26. 鏈路追蹤 Tracing
... 一共 69+ 篇
目前在知識星球更新了《Netty 原始碼解析》目錄如下:
01. 除錯環境搭建
02. NIO 基礎
03. Netty 簡介
04. 啟動 Bootstrap
05. 事件輪詢 EventLoop
06. 通道管道 ChannelPipeline
07. 通道 Channel
08. 位元組緩衝區 ByteBuf
09. 通道處理器 ChannelHandler
10. 編解碼 Codec
11. 工具類 Util
... 一共 61+ 篇
目前在知識星球更新了《資料庫物體設計》目錄如下:
01. 商品模組
02. 交易模組
03. 營銷模組
04. 公用模組
... 一共 17+ 篇
原始碼不易↓↓↓↓↓
點贊支援老艿艿↓↓
 知識星球
知識星球