歡迎光臨
每天分享高質量文章

作業排程中介軟體 Elastic-Job-Cloud 原始碼分析 —— 高可用

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述
  • 2. Scheduler 叢集
  • 3. Scheduler 部署
  • 4. Scheduler 故障轉移
  • 5. Scheduler 資料儲存
    • 5.1 RunningService
    • 5.2 ProducerManager
    • 5.3 TaskScheduler
  • 6. Mesos Master 崩潰
  • 7. Mesos Slave 崩潰
  • 8. Scheduler 核對
  • 666. 彩蛋

1. 概述

本文主要分享 Elastic-Job-Cloud 高可用

一個高可用的 Elastic-Job-Cloud 組成如下圖:

  • Mesos Master 叢集
  • Mesos Slave 叢集
  • Zookeeper 叢集
  • Elastic-Job-Cloud-Scheduler 叢集
  • Elastic-Job-Cloud-Executor 叢集

本文重點分享 Elastic-Job-Cloud-Scheduler 如何實現高可用。

Mesos Master / Mesos Slave / Zookeeper 高可用,同學們可以自行 Google 解決。Elastic-Job-Cloud-Executor 執行在 Mesos Slave 上,透過 Mesos Slave 叢集多節點實現高可用。

你行好事會因為得到贊賞而愉悅
同理,開源專案貢獻者會因為 Star 而更加有動力
為 Elastic-Job 點贊!傳送門

2. Scheduler 叢集

Elastic-Job-Cloud-Scheduler 透過至少兩個節點實現叢集。叢集中透過主節點選舉一個主節點,只有主節點提供服務,從實體處於”待命”狀態。當主節點故障時,從節點會選舉出新的主節點繼續提供服務。實現程式碼如下:

public final class Bootstrap {

    public static void main(final String[] args) throws InterruptedException {
        // 初始化 註冊中心
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(BootstrapEnvironment.getInstance().getZookeeperConfiguration());
        regCenter.init();
        // 初始化 Zookeeper 選舉服務
        final ZookeeperElectionService electionService = new ZookeeperElectionService(
                BootstrapEnvironment.getInstance().getFrameworkHostPort(), (CuratorFramework) regCenter.getRawClient(), HANode.ELECTION_NODE, new SchedulerElectionCandidate(regCenter));
        electionService.start();
        // 掛起 主行程
        final CountDownLatch latch = new CountDownLatch(1);
        latch.await();
        // Hook 貌似位置不對?
        Runtime.getRuntime().addShutdownHook(new Thread("shutdown-hook") {

            @Override
            public void run() {
                electionService.stop();
                latch.countDown();
            }
        });
    }
}
  • Bootstrap,Elastic-Job-Cloud-Scheduler 啟動器(彷彿在說廢話)。
  • CoordinatorRegistryCenter,用於協調分散式服務的註冊中心,在《Elastic-Job-Lite 原始碼分析 —— 註冊中心》有詳細解析。
  • ZookeeperElectionService,Zookeeper 選舉服務,本小節的主角。
  • ShutdownHook 關閉行程鉤子,程式碼放置的位置不對,需要放在 CountDownLatch#await() 方法上面。目前實際不影響使用。

呼叫 ZookeeperElectionService#start() 方法,初始化 Zookeeper 選舉服務以實現 Elastic-Job-Cloud-Scheduler 主節點選舉。

private final CountDownLatch leaderLatch = new CountDownLatch(1);

private final LeaderSelector leaderSelector;

public ZookeeperElectionService(final String identity, final CuratorFramework client, final String electionPath, final ElectionCandidate electionCandidate) {
   // 建立 LeaderSelector
   leaderSelector = new LeaderSelector(client, electionPath, new LeaderSelectorListenerAdapter() {

       @Override
       public void takeLeadership(final CuratorFramework client) throws Exception {
           // ... 省略【暫時】無關程式碼
       }
   });
   // 設定重覆參與選舉主節點
   leaderSelector.autoRequeue();
   // 設定參與節點的編號
   leaderSelector.setId(identity);
}

/**
* 開始選舉.
*/

public void start() {
   log.debug("Elastic job: {} start to elect leadership", leaderSelector.getId());
   leaderSelector.start();
}
  • 透過 Apache Curator LeaderSelector 實現分散式多節點選舉。

    FROM https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/leader/LeaderSelector.html
    Abstraction to select a “leader” amongst multiple contenders in a group of JMVs connected to a Zookeeper cluster. If a group of N thread/processes contends for leadership, one will be assigned leader until it releases leadership at which time another one from the group will be chosen.
    Note that this class uses an underlying InterProcessMutex and as a result leader election is “fair” – each user will become leader in the order originally requested (from ZK’s point of view).

  • 呼叫 LeaderSelector#autoRequeue() 方法,設定重覆參與選舉主節點。預設情況下,自己選舉成為主節點後,不再參與下次選舉。設定重覆參與選舉主節點後,每次選舉都會參與。在 Elastic-Job-Cloud-Scheduler 裡,我們顯然要重覆參與選舉。

  • 呼叫 LeaderSelector#setId() 方法,設定參與節點的編號。在 Elastic-Job-Cloud-Scheduler 裡暫時沒有實際用途。編號演演算法為 BootstrapEnvironment.getInstance().getFrameworkHostPort(),即:HOST:PORT

  • 呼叫 #start() 方法,開始選舉。當自己選舉主節點成功,回呼 LeaderSelector#takeLeadership() 方法。

回呼 LeaderSelector#takeLeadership() 方法,Elastic-Job-Cloud-Scheduler 主節點開始領導狀態。實現程式碼如下:

// ZookeeperElectionService.LeaderSelector 內部實現類
@Override
public void takeLeadership(final CuratorFramework client) throws Exception {
    log.info("Elastic job: {} has leadership", identity);
    try {
        // 開始領導狀態
        electionCandidate.startLeadership();
        // 掛起 行程
        leaderLatch.await();
        log.warn("Elastic job: {} lost leadership.", identity);
        // 終止領導狀態
        electionCandidate.stopLeadership();
    } catch (final JobSystemException exception) {
        // 異常退出
        log.error("Elastic job: Starting error", exception);
        System.exit(1);  
    }
}
  • 呼叫 SchedulerElectionCandidate#startLeadership() 方法,開始領導狀態。實現程式碼如下:

    // SchedulerElectionCandidate.java
    public final class SchedulerElectionCandidate implements ElectionCandidate {
    private final CoordinatorRegistryCenter regCenter;

    private SchedulerService schedulerService;

    public SchedulerElectionCandidate(final CoordinatorRegistryCenter regCenter) {
        this.regCenter = regCenter;
    }

    @Override
    public void startLeadership() throws Exception {
        try {
            schedulerService = new SchedulerService(regCenter);
            schedulerService.start();
        } catch (final Throwable throwable) {
            throw new JobSystemException(throwable);
        }
    }

    }

    // SchedulerService.java
    /**
    * 以守護行程方式啟動.
    */

    public void start() {
       facadeService.start();
       producerManager.startup();
       statisticManager.startup();
       cloudJobConfigurationListener.start();
       taskLaunchScheduledService.startAsync();
       restfulService.start();
       schedulerDriver.start();
       if (env.getFrameworkConfiguration().isEnabledReconcile()) {
           reconcileService.startAsync();
       }
    }

    • 當 Elastic-Job-Cloud-Scheduler 主節點呼叫 SchedulerService#start() 方法後,各種服務初始化完成,特別是和 Mesos Master 的連線,可以愉快的進行作業排程等等服務。
    • Elastic-Job-Cloud-Scheduler 從節點,因為無法回呼 LeaderSelector#takeLeadership() 方法,處於”待命”狀態。當主節點故障時,從節點會選舉出新的主節點,觸發 LeaderSelector#takeLeadership() 方法回呼,繼續提供服務。
  • 呼叫 CountLatch#await() 方法,掛起主節點 LeaderSelector#takeLeadership() 方法繼續向下執行。為什麼要進行掛起?如果呼叫完該方法,主節點就會讓出主節點身份,這樣會導致 Elastic-Job-Cloud-Scheduler 叢集不斷不斷不斷更新主節點,無法正常提供服務。

  • 當 Elastic-Job-Cloud-Scheduler 主節點關閉時,觸發上文程式碼看到的 ShutdownHook ,關閉服務。實現程式碼如下:

    // Bootstrap.java
    public final class Bootstrap {
    public static void main(final String[] args) throws InterruptedException {
        // ... 省略無關程式碼
        Runtime.getRuntime().addShutdownHook(new Thread("shutdown-hook") {

            @Override
            public void run() {
                // 停止選舉
                electionService.stop();
                latch.countDown();
            }
        });
    }

    }

    • 呼叫 ElectionService#stop() 方法,停止選舉,從而終止領導狀態,關閉各種服務。實現程式碼如下:

      // ZookeeperElectionService.java
      public void stop() {
          log.info("Elastic job: stop leadership election");
          // 結束 #takeLeadership() 方法的行程掛起
          leaderLatch.countDown();
          try {
              // 關閉 LeaderSelector
              leaderSelector.close();
          } catch (final Exception ignored) {
          }
      }

      // SchedulerElectionCandidate.java
      @Override
      public void stopLeadership() {
          schedulerService.stop();
      }

      // SchedulerService.java
      /**
       * 停止執行.
       */

      public void stop() {
          restfulService.stop();
          taskLaunchScheduledService.stopAsync();
          cloudJobConfigurationListener.stop();
          statisticManager.shutdown();
          producerManager.shutdown();
          schedulerDriver.stop(true);
          facadeService.stop();
          if (env.getFrameworkConfiguration().isEnabledReconcile()) {
              reconcileService.stopAsync();
          }
      }
  • 當發生 JobSystemException 異常時,即呼叫 SchedulerElectionCandidate#startLeadership() 方法發生異常( SchedulerElectionCandidate#stopLeadership() 實際不會丟擲異常 ),呼叫 System.exit(1) 方法,Elastic-Job-Cloud-Scheduler 主節點異常崩潰

    • 目前猜測可能有種情況會導致異常崩潰。(1)一個 Elastic-Job-Cloud-Scheduler 叢集有兩個節點 A / B,透過選舉 A 成為主節點;(2)突然 Zookeeper 叢集崩潰,恢復後,A 節點選舉恰好又成為主節點,因為未呼叫 SchedulerElectionCandidate#stopLeadership() 關閉原來的各種服務,導致再次呼叫 SchedulerElectionCandidate#startLeadership() 會發生異常,例如說 RestfulService 服務,需要佔用一個埠提供服務,重新初始化,會發生埠衝突丟擲異常。筆者嘗試模擬,透過一個 Elastic-Job-Cloud-Scheduler + Zookeeper 的情況,能夠觸發該情況,步驟如下:(1)Zookeeper 啟動;(2)Elastic-Job-Cloud-Scheduler 啟動,選舉成為主節點,正常初始化;(3)重啟 Zookeeper;(4)Elastic-Job-Cloud-Scheduler 再次選舉成為主節點,因為 RestfulService 埠衝突異常初始化崩潰。如果真出現這種情況怎麼辦呢?在「3. Scheduler 部署」揭曉答案。

Elastic-Job-Lite 在主節點選舉實現方式上略有不同,有興趣的同學可以看下《Elastic-Job-Lite 原始碼分析 —— 主節點選舉》的實現。

3. Scheduler 部署

比較容易想到的一種方式,選擇多臺主機部署 Elastic-Job-Cloud-Executor 多個節點。

But…… 我們要想下,Elastic-Job-Cloud-Executor 執行在 Mesos 之上,可以使用上 Mesos 的資源排程和部署服務。引入 Mesos 上著名的框架 Marathon。它可以帶來所有後臺行程( 例如,Elastic-Job-Cloud-Executor )能夠執行在任意機器上,Marathon 會在後臺已有實體失敗時,自動啟動新實體的好處。是不是很贊 +1024 ?!

FROM 《Mesos 框架構建分散式應用》 P47
Mesos 集群裡的常見方案是在 Marathon 上執行叢集的 Mesos 框架。但是 Marathon 本身就是一種 Mesos 的框架!那麼在 Marathon 上執行 Mesos 框架意味著什麼呢?不用考慮如何將每種框架的排程器部署到特定的主機上並且處理這些主機的故障,Marathon 能夠確保框架的排程器總是在集群裡的某處執行著。這樣大幅簡化了在高可用配置裡部署新框架的複雜度。

嗯…… 當然,Marathon 我們也要做高可用。

? Marathon 原來中文是馬拉松。哈哈哈,很適合的名字。

4. Scheduler 故障轉移

當原有 Elastic-Job-Cloud-Scheduler 主節點崩潰時,從節點重新進行主節點選舉,完成故障轉移。那麼此時會有一個問題,新主節點如何接管已經在執行中的 Elastic-Job-Cloud-Executer 們呢?

第一種方案,關閉原有的所有 Elastic-Job-Cloud-Executor 們,然後重新排程啟動。顯然,這個方式太過暴力。如果有些作業任務執行時間較長,直接中斷不是很友好。再比如,Elastic-Job-Cloud-Scheduler 節點需要進行升級,也關閉 Elastic-Job-Cloud-Executor,也不合理,和使用高可用性叢集作業系統的初衷是背離的。該方案,不推薦

第二種方案,重用原主節點的 Mesos FrameworkID。原理如下:

FROM 《Mesos 框架構建分散式應用》 P72
在 Mesos 裡,排程器由其 FrameworkID、FrameworkInfo 裡的可選值唯一確定。FrameworkID 必須由 Mesos 分配,從而確保對於每個框架來說該值是唯一確定的。現在,需要在分配 FrameworkID 時儲存該值,這樣未來的主實體才可以重用該值。

在 Elastic-Job-Cloud-Scheduler 使用註冊中心( Zookeeper ) 的持久資料節點 /${NAMESPACE}/ha/framework_id 儲存 FrameworkID,儲存值為 ${FRAMEWORK_ID}。使用 zkClient 檢視如下:

[zk: localhost:2181(CONNECTED) 1] get /elastic-job-cloud/ha/framework_id
d31e7faa-aa72-4d0a-8941-512984d5af49-0001

呼叫 SchedulerService#getSchedulerDriver() 方法,初始化 Mesos Scheduler Driver 時,從 Zookeeper 獲取是否已經存在 FrameworkID。實現程式碼如下:

// SchedulerService.java
private SchedulerDriver getSchedulerDriver(final TaskScheduler taskScheduler, final JobEventBus jobEventBus, final FrameworkIDService frameworkIDService) {
   // 獲取 FrameworkID
   Optional frameworkIDOptional = frameworkIDService.fetch();
   Protos.FrameworkInfo.Builder builder = Protos.FrameworkInfo.newBuilder();
   // 如果存在,設定 FrameworkID
   if (frameworkIDOptional.isPresent()) {
       builder.setId(Protos.FrameworkID.newBuilder().setValue(frameworkIDOptional.get()).build());
   }
   // ... 省略無關程式碼
   Protos.FrameworkInfo frameworkInfo = builder.setUser(mesosConfig.getUser()).setName(frameworkName)
                .setHostname(mesosConfig.getHostname())
                .setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT_SECONDS)
                .setWebuiUrl(WEB_UI_PROTOCOL + env.getFrameworkHostPort()).setCheckpoint(true).build();
   // ... 省略無關程式碼
}
  • 呼叫 FrameworkIDService#fetch() 方法,從註冊中心獲取 FrameworkID 。實現程式碼如下:

    public Optional fetch() {
       String frameworkId = regCenter.getDirectly(HANode.FRAMEWORK_ID_NODE);
       return Strings.isNullOrEmpty(frameworkId) ? Optional.absent() : Optional.of(frameworkId);
    }
  • 呼叫 Protos.FrameworkInfo.Builder#setId(…) 方法,當 FrameworkID 存在時,設定 FrameworkID。

  • 呼叫 Protos.FrameworkInfo.Builder#setFailoverTimeout(…) 方法,設定 Scheduler 最大故障轉移時間,即 FrameworkID 過期時間。Elastic-Job-Cloud-Scheduler 預設設定一週。

當 Elastic-Job-Cloud-Scheduler 叢集第一次初始化,上面的邏輯顯然獲取不到 FrameworkID,在向 Mesos Master 初始化成功後,回呼 SchedulerEngine#registered(...) 方法進行儲存,實現程式碼如下:

// SchedulerEngine.java
public final class SchedulerEngine implements Scheduler {

    @Override
    public void registered(final SchedulerDriver schedulerDriver, final Protos.FrameworkID frameworkID, final Protos.MasterInfo masterInfo) {
        log.info("call registered");
        // 儲存FrameworkID
        frameworkIDService.save(frameworkID.getValue());
        // 過期 TaskScheduler Lease
        taskScheduler.expireAllLeases();
        // 註冊 Mesos Master 資訊
        MesosStateService.register(masterInfo.getHostname(), masterInfo.getPort());
    }

}

// FrameworkIDService.java
public void save(final String id) {
   if (!regCenter.isExisted(HANode.FRAMEWORK_ID_NODE)) { // 不存在才儲存
       regCenter.persist(HANode.FRAMEWORK_ID_NODE, id);
   }
}

5. Scheduler 資料儲存

新的 Elastic-Job-Cloud-Scheduler 主節點在故障轉移,不僅僅接管 Elastic-Job-Cloud-Executor,還需要接管資料儲存

Elastic-Job-Cloud-Executor 使用註冊中心( Zookeeper )儲存資料。資料儲存分成兩部分:

  • config,雲作業應用配置、雲作業配置。
  • state,作業狀態資訊。

整體如下圖:

Elastic-Job-Cloud-Scheduler 各個服務根據資料儲存啟動初始化。下麵來看看依賴資料儲存進行初始化的服務程式碼實現。

5.1 RunningService

RunningService,任務執行時服務。呼叫 RunningService#start() 方法,啟動任務執行佇列。實現程式碼如下:

public final class RunningService {

    /**
     * 執行中作業對映
     * key:作業名稱
     * value:任務執行時背景關係集合
     */

    @Getter
    private static final ConcurrentHashMap> RUNNING_TASKS = new ConcurrentHashMap<>(TASK_INITIAL_SIZE);

    public void start() {
        clear();
        List jobKeys = regCenter.getChildrenKeys(RunningNode.ROOT);
        for (String each : jobKeys) {
            // 從執行中佇列移除不存在配置的作業任務
            if (!configurationService.load(each).isPresent()) {
                remove(each);
                continue;
            }
            // 新增 執行中作業對映
            RUNNING_TASKS.put(each, Sets.newCopyOnWriteArraySet(Lists.transform(regCenter.getChildrenKeys(RunningNode.getRunningJobNodePath(each)), new Function() {

                @Override
                public TaskContext apply(final String input) {
                    return TaskContext.from(regCenter.get(RunningNode.getRunningTaskNodePath(TaskContext.MetaInfo.from(input).toString())));
                }
            })));
        }
    }
}
  • 因為執行中作業對映( RUNNING_TASKS )使用的頻次很多,Elastic-Job-Cloud-Scheduler 快取在記憶體中。每次初始化時,使用從資料儲存執行中作業佇列載入到記憶體。

  • 這裡我們在看下執行中作業佇列的新增( #add() )方法,實現程式碼如下:

    public void add(final TaskContext taskContext) {
       if (!configurationService.load(taskContext.getMetaInfo().getJobName()).isPresent()) {
           return;
       }
       // 新增到執行中的任務集合
       getRunningTasks(taskContext.getMetaInfo().getJobName()).add(taskContext);
       // 判斷是否為常駐任務
       if (!isDaemon(taskContext.getMetaInfo().getJobName())) {
           return;
       }
       // 新增到執行中佇列
       String runningTaskNodePath = RunningNode.getRunningTaskNodePath(taskContext.getMetaInfo().toString());
       if (!regCenter.isExisted(runningTaskNodePath)) {
           regCenter.persist(runningTaskNodePath, taskContext.getId());
       }
    }
    • 執行中作業佇列只儲存常駐作業的任務。所以瞬時作業,在故障轉移時,可能存在相同作業相同分片任務同時排程執行。舉個慄子?,Elastic-Job-Cloud-Scheduler 叢集有兩個節點 A( 主節點 ) / B( 從節點 ),(1)A 節點每 5 分鐘排程一次瞬時作業任務 T ,T 每次執行消耗時間實際超過 5 分鐘( 先不要考慮是否合理 )。(2)A 節點崩潰,B 節點成為主節點,5 分鐘後排程 T 作業,因為執行中作業佇列只儲存常駐作業的任務,恢復後的 RUNNING_TASKS 不存在該作業任務,因此可以排程 T 作業,實際 T 作業正在 Elastic-Job-Cloud-Executor 執行中。

5.2 ProducerManager

ProducerManager,釋出任務作業排程管理器。呼叫 ProducerManager#startup() 方法,啟動作業排程器。實現程式碼如下:

public final class ProducerManager {

    public void startup() {
        log.info("Start producer manager");
        // 釋出瞬時作業任務的排程器
        transientProducerScheduler.start();
        // 初始化排程作業
        for (CloudJobConfiguration each : configService.loadAll()) {
            schedule(each);
        }
    }

}
  • 呼叫 ConfigService#loadAll() 方法,從資料儲存讀取所有作業配置。
  • 呼叫 #schedule() 方法,初始化排程作業。
    • 瞬時作業,在 Elastic-Job-Cloud-Scheduler 計時排程,類似每 XX 秒 / 分 / 時 / 天之類的作業需要重新計時,這個請註意。
    • 常駐作業,在 Elastic-Job-Cloud-Executor 計時排程,暫無影響。
    • 在《Elastic-Job-Cloud 原始碼分析 —— 作業排程(一)》「3. Producer 釋出任務」有詳細解析。

5.3 TaskScheduler

TaskScheduler,Fenzo 作業排程器,根據 Mesos Offer 和作業任務的最佳化分配。因為其分配是依賴當前實際 Mesos Offer 和 作業任務執行的情況,猜測可能對最佳化分配有影響,但不影響正確性。筆者對 TaskScheduler 瞭解不是很深入,僅僅作為猜測。

在《Elastic-Job-Cloud 原始碼分析 —— 作業排程(一)》「4.1」「4.2」「4.3」有和 TaskScheduler 相關的內容解析。

6. Mesos Master 崩潰

Mesos Master 叢集,Mesos Master 主節點崩潰後,Mesos Master 叢集重新選舉後,Scheduler、Mesos Slave 從 Zookeeper 獲取到最新的 Mesos Master 主節點重新進行註冊,不影響 Scheduler 、Mesos Slave 、任務執行。

呼叫 SchedulerService#getSchedulerDriver(...) 方法,設定 SchedulerDriver 從 Mesos Zookeeper Address 讀取當前 Mesos Master 地址,實現程式碼如下:

// SchedulerService.java
private SchedulerDriver getSchedulerDriver(final TaskScheduler taskScheduler, final JobEventBus jobEventBus, final FrameworkIDService frameworkIDService) {
    // ... 省略無關程式碼
    MesosConfiguration mesosConfig = env.getMesosConfiguration();
    return new MesosSchedulerDriver(new SchedulerEngine(taskScheduler, facadeService, jobEventBus, frameworkIDService, statisticManager), frameworkInfo, mesosConfig.getUrl() // Mesos Master URL
    );
}

// MesosSchedulerDriver.java
public MesosSchedulerDriver(Scheduler scheduler,
                            FrameworkInfo framework,
                            String master)
 
{
    // ... 省略無關程式碼     
}
  • MesosSchedulerDriver 構造方法第三個引數 master,代表 Mesos 使用的 Zookeeper 地址,例如:zk://127.0.0.1:2181/mesos。生產環境請配置多 Zookeeper 節點,例如:zk://host1:port1,host2:port2,…/path

  • 使用 zkClient 檢視如下:

    [zk: localhost:2181(CONNECTED) 10] ls /mesos
    [log_replicas, json.info_0000000017]
    [zk: localhost:2181(CONNECTED) 11] get /mesos/json.info_0000000017
    {"address":{"hostname":"localhost","ip":"127.0.0.1","port":5050},"hostname":"localhost","id":"685fe32d-e30c-4df7-b891-3d96b06fee88","ip":16777343,"pid":"master@127.0.0.1:5050","port":5050,"version":"1.4.0"}

Elastic-Job-Cloud-Scheduler 註冊上、重新註冊上、斷開 Mesos Master 實現程式碼如下:

public final class SchedulerEngine implements Scheduler {

    @Override
    public void registered(final SchedulerDriver schedulerDriver, final Protos.FrameworkID frameworkID, final Protos.MasterInfo masterInfo) {
        log.info("call registered");
        // ... 省略無關程式碼
        // 註冊 Mesos Master 資訊
        MesosStateService.register(masterInfo.getHostname(), masterInfo.getPort());
    }

    @Override
    public void reregistered(final SchedulerDriver schedulerDriver, final Protos.MasterInfo masterInfo) {
        // ... 省略無關程式碼
        // 註冊 Mesos Master 資訊
        MesosStateService.register(masterInfo.getHostname(), masterInfo.getPort());
    }

    @Override
    public void disconnected(final SchedulerDriver schedulerDriver) {
        log.warn("call disconnected");
        MesosStateService.deregister();
    }
}
  • MesosStateService,Mesos狀態服務,提供呼叫 Mesos Master API 服務,例如獲取所有執行器。

  • 呼叫 MesosStateService#register(...) 方法,註冊 Mesos Master 資訊,實現程式碼如下:

    public class MesosStateService {
    private static String stateUrl;

    public static synchronized void register(final String hostName, final int port) {
        stateUrl = String.format("http://%s:%d/state", hostName, port);
    }

    }

  • 呼叫 MesosStateService#deregister(...) 方法,登出 Mesos Master 資訊,實現程式碼如下:

    public static synchronized void deregister() {
        stateUrl = null;
    }

《Mesos 框架構建分散式應用》P110 如何處理 master 的故障,有興趣的同學也可以仔細看看。

7. Mesos Slave 崩潰

在《Elastic-Job-Cloud 原始碼分析 —— 作業失效轉移》中,搜尋關鍵字 “TASK_LOST”,有 Mesos Slave 崩潰後,對 Elastic-Job-Cloud-Scheduler 和 Elastic-Job-Cloud-Executor 的影響。

《Mesos 框架構建分散式應用》P109 如何處理 slave 的故障,有興趣的同學也可以仔細看看。

8. Scheduler 核對

FROM http://mesos.apache.org/documentation/latest/reconciliation/
Messages between framework schedulers and the Mesos master may be dropped due to failures and network partitions. This may cause a framework scheduler and the master to have different views of the current state of the cluster. For example, consider a launch task request sent by a framework. There are many ways that failures can prevent the task launch operation from succeeding, such as:

  • Framework fails after persisting its intent to launch the task, but before the launch task message was sent.
  • Master fails before receiving the message.
  • Master fails after receiving the message but before sending it to the agent.

透過核對特性解決這個問題。核對是協調器如何和 Mesos Master 一起檢查排程器所認為的叢集狀態是否和 Mesos Master 所認為的叢集狀態完成匹配。

呼叫 SchedulerDriver#reconcileTasks(...) 方法,查詢任務狀態。程式碼介面如下:

public interface SchedulerDriver {
    Status reconcileTasks(Collection statuses);
}
  • 只能查詢非終止狀態( non-terminal )的任務。核對的主要原因,確認任務是否還在執行,或者已經進入了中斷狀態。
    • terminal:TASK_ERROR、TASK_FAILED、TASK_FINISHED、TASK_KILLED
    • non-terminal:TASK_DROPPED、TASK_GONE、TASK_GONE_BY_OPERATOR、TASK_KILLING、TASK_LOST、TASK_RUNNING、TASK_STAGING、TASK_STARTING、TASK_UNREACHABLE、TASK_UNKNOWN
  • 當 statuses 非空時,顯示查詢,透過回呼 Scheduler#statusUpdate(…) 方法非同步傳回指定的任務的狀態。
  • 當 statuses 為空時,隱式查詢,透過回呼 Scheduler#statusUpdate(…) 方法非同步傳回全部的任務的狀態。

ReconcileService,核對 Mesos 與 Scheduler 之間的任務狀態。實現程式碼如下:

public class ReconcileService extends AbstractScheduledService {

    private final ReentrantLock lock = new ReentrantLock();

    @Override
    protected void runOneIteration() throws Exception {
        lock.lock();
        try {
            explicitReconcile();
            implicitReconcile();
        } finally {
            lock.unlock();
        }
    }

    @Override
    protected Scheduler scheduler() {
        FrameworkConfiguration configuration = BootstrapEnvironment.getInstance().getFrameworkConfiguration();
        return Scheduler.newFixedDelaySchedule(configuration.getReconcileIntervalMinutes(), configuration.getReconcileIntervalMinutes(), TimeUnit.MINUTES);
    }
}
  • 透過配置 FrameworkConfiguration#reconcileIntervalMinutes 設定,每隔多少分鐘執行一次核對。若配置時間大於 0 才開啟任務狀態核對功能。

  • 呼叫 #explicitReconcile() 方法,查詢執行中的任務。實現程式碼如下:

    public void explicitReconcile() {
       lock.lock();
       try {
           // 獲取執行中的作業任務背景關係集合
           Set runningTask = new HashSet<>();
           for (Set each : facadeService.getAllRunningTasks().values()) {
               runningTask.addAll(each);
           }
           if (runningTask.isEmpty()) {
               return;
           }
           log.info("Requesting {} tasks reconciliation with the Mesos master", runningTask.size());
           // 查詢指定任務
           schedulerDriver.reconcileTasks(Collections2.transform(runningTask, new Function() {
               @Override
               public Protos.TaskStatus apply(final TaskContext input) {
                   return Protos.TaskStatus.newBuilder()
                           .setTaskId(Protos.TaskID.newBuilder().setValue(input.getId()).build())
                           .setSlaveId(Protos.SlaveID.newBuilder().setValue(input.getSlaveId()).build())
                           .setState(Protos.TaskState.TASK_RUNNING)
                           .build();
               }
           }));
       } finally {
           lock.unlock();
       }
    }
  • 呼叫 #implicitReconcile() 方法,查詢所有任務。實現程式碼如下:

    public void implicitReconcile() {
       lock.lock();
       try {
           // 查詢全部任務
           schedulerDriver.reconcileTasks(Collections.emptyList());
       } finally {
           lock.unlock();
       }
    }
  • 為什麼這裡要使用 ReentrantLock 鎖呢?Elastic-Job-Cloud-Scheduler 提供 CloudOperationRestfulApi,支援使用 HTTP Restful API 主動觸發 #explicitReconcile() 和 #implicitReconcile() 方法,透過鎖避免併發核對。對 CloudOperationRestfulApi 有興趣的同學,直接點選連結檢視實現。

  • 雖然 #implicitReconcile() 方法,能查詢到所有 Mesos 任務狀的態,但是效能較差,而 #explicitReconcile() 方法顯式查詢執行中的 Mesos 任務的狀態,效能更好,所以先進行呼叫。

  • 最佳化點(目前暫未實現):Elastic-Job-Cloud-Scheduler 註冊到 Mesos 和 重註冊到 Mesos,都執行一次核對。

    FROM 《Elastic-Job-Lite 原始碼分析 —— 自診斷修複》
    This reconciliation algorithm must be run after each (re-)registration.

其他 Scheduler 核對資料,有興趣的同學可以看看:

  • 《Mesos 框架構建分散式應用》P76 新增核對 、P111 故障轉移期間的核對
  • 《Mesos 官方檔案 —— reconciliation》

Elastic-Job-Lite 也會存在作業節點 和 Zookeeper 資料不一致的情況,有興趣的同學可以看下《Elastic-Job-Lite 原始碼分析 —— 自診斷修複》的實現。

666. 彩蛋

給英文和我一樣半斤八兩的同學一本葵花寶典+辟邪劍譜:

  • 《Mesos中文手冊》。
  • 《Mesos 容錯、故障》

整個 Elastic-Job-Cloud 完結,撒花!

收穫蠻多的,學習的第一套基於雲原生( CloudNative )實現的中介軟體,期待有基於雲原生的服務化中介軟體。

一開始因為 Elastic-Job-Cloud 基於 Mesos 實現,內心還是有點恐懼感,後面硬啃 + 搭配《Mesos 框架構建分散式應用》,比預想的時間快了一半完成這個系列。在這裡強烈推薦這本書。另外,等時間相對空,會研究下另外一個滬江開源的基於 Mesos 實現的分散式排程系統 Juice。不是很確定會不會出原始碼解析的文章,儘量輸出噶。

後面會繼續更新原始碼解析系列,下一個系列應該是《tcc-transaction 原始碼解析》。在選擇要研究的 tcc 中介軟體還是蠻糾結的,哈哈,這裡聽從 zhisheng 的建議。如果不好,我保證會打死你的。

希望堅持不懈的分享原始碼解析會有更多的同行者閱讀。確實,原始碼解析的受眾略小。

道友,趕緊上車,分享一波朋友圈!

贊(0)

分享創造快樂