歡迎光臨
每天分享高質量文章

作業調度中間件 Elastic-Job-Cloud 原始碼分析 —— 高可用

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述
  • 2. Scheduler 集群
  • 3. Scheduler 部署
  • 4. Scheduler 故障轉移
  • 5. Scheduler 資料儲存
    • 5.1 RunningService
    • 5.2 ProducerManager
    • 5.3 TaskScheduler
  • 6. Mesos Master 崩潰
  • 7. Mesos Slave 崩潰
  • 8. Scheduler 核對
  • 666. 彩蛋

1. 概述

本文主要分享 Elastic-Job-Cloud 高可用

一個高可用的 Elastic-Job-Cloud 組成如下圖:

  • Mesos Master 集群
  • Mesos Slave 集群
  • Zookeeper 集群
  • Elastic-Job-Cloud-Scheduler 集群
  • Elastic-Job-Cloud-Executor 集群

本文重點分享 Elastic-Job-Cloud-Scheduler 如何實現高可用。

Mesos Master / Mesos Slave / Zookeeper 高可用,同學們可以自行 Google 解決。Elastic-Job-Cloud-Executor 運行在 Mesos Slave 上,通過 Mesos Slave 集群多節點實現高可用。

你行好事會因為得到贊賞而愉悅
同理,開源專案貢獻者會因為 Star 而更加有動力
為 Elastic-Job 點贊!傳送門

2. Scheduler 集群

Elastic-Job-Cloud-Scheduler 通過至少兩個節點實現集群。集群中通過主節點選舉一個主節點,只有主節點提供服務,從實體處於”待命”狀態。當主節點故障時,從節點會選舉出新的主節點繼續提供服務。實現代碼如下:

public final class Bootstrap {

    public static void main(final String[] args) throws InterruptedException {
        // 初始化 註冊中心
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(BootstrapEnvironment.getInstance().getZookeeperConfiguration());
        regCenter.init();
        // 初始化 Zookeeper 選舉服務
        final ZookeeperElectionService electionService = new ZookeeperElectionService(
                BootstrapEnvironment.getInstance().getFrameworkHostPort(), (CuratorFramework) regCenter.getRawClient(), HANode.ELECTION_NODE, new SchedulerElectionCandidate(regCenter));
        electionService.start();
        // 掛起 主行程
        final CountDownLatch latch = new CountDownLatch(1);
        latch.await();
        // Hook 貌似位置不對?
        Runtime.getRuntime().addShutdownHook(new Thread("shutdown-hook") {

            @Override
            public void run() {
                electionService.stop();
                latch.countDown();
            }
        });
    }
}
  • Bootstrap,Elastic-Job-Cloud-Scheduler 啟動器(仿佛在說廢話)。
  • CoordinatorRegistryCenter,用於協調分佈式服務的註冊中心,在《Elastic-Job-Lite 原始碼分析 —— 註冊中心》有詳細解析。
  • ZookeeperElectionService,Zookeeper 選舉服務,本小節的主角。
  • ShutdownHook 關閉行程鉤子,代碼放置的位置不對,需要放在 CountDownLatch#await() 方法上面。目前實際不影響使用。

呼叫 ZookeeperElectionService#start() 方法,初始化 Zookeeper 選舉服務以實現 Elastic-Job-Cloud-Scheduler 主節點選舉。

private final CountDownLatch leaderLatch = new CountDownLatch(1);

private final LeaderSelector leaderSelector;

public ZookeeperElectionService(final String identity, final CuratorFramework client, final String electionPath, final ElectionCandidate electionCandidate) {
   // 創建 LeaderSelector
   leaderSelector = new LeaderSelector(client, electionPath, new LeaderSelectorListenerAdapter() {

       @Override
       public void takeLeadership(final CuratorFramework client) throws Exception {
           // ... 省略【暫時】無關代碼
       }
   });
   // 設置重覆參與選舉主節點
   leaderSelector.autoRequeue();
   // 設置參與節點的編號
   leaderSelector.setId(identity);
}

/**
* 開始選舉.
*/

public void start() {
   log.debug("Elastic job: {} start to elect leadership", leaderSelector.getId());
   leaderSelector.start();
}
  • 通過 Apache Curator LeaderSelector 實現分佈式多節點選舉。

    FROM https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/leader/LeaderSelector.html
    Abstraction to select a “leader” amongst multiple contenders in a group of JMVs connected to a Zookeeper cluster. If a group of N thread/processes contends for leadership, one will be assigned leader until it releases leadership at which time another one from the group will be chosen.
    Note that this class uses an underlying InterProcessMutex and as a result leader election is “fair” – each user will become leader in the order originally requested (from ZK’s point of view).

  • 呼叫 LeaderSelector#autoRequeue() 方法,設置重覆參與選舉主節點。預設情況下,自己選舉成為主節點後,不再參與下次選舉。設置重覆參與選舉主節點後,每次選舉都會參與。在 Elastic-Job-Cloud-Scheduler 里,我們顯然要重覆參與選舉。

  • 呼叫 LeaderSelector#setId() 方法,設置參與節點的編號。在 Elastic-Job-Cloud-Scheduler 里暫時沒有實際用途。編號演算法為 BootstrapEnvironment.getInstance().getFrameworkHostPort(),即:HOST:PORT

  • 呼叫 #start() 方法,開始選舉。當自己選舉主節點成功,回呼 LeaderSelector#takeLeadership() 方法。

回呼 LeaderSelector#takeLeadership() 方法,Elastic-Job-Cloud-Scheduler 主節點開始領導狀態。實現代碼如下:

// ZookeeperElectionService.LeaderSelector 內部實現類
@Override
public void takeLeadership(final CuratorFramework client) throws Exception {
    log.info("Elastic job: {} has leadership", identity);
    try {
        // 開始領導狀態
        electionCandidate.startLeadership();
        // 掛起 行程
        leaderLatch.await();
        log.warn("Elastic job: {} lost leadership.", identity);
        // 終止領導狀態
        electionCandidate.stopLeadership();
    } catch (final JobSystemException exception) {
        // 異常退出
        log.error("Elastic job: Starting error", exception);
        System.exit(1);  
    }
}
  • 呼叫 SchedulerElectionCandidate#startLeadership() 方法,開始領導狀態。實現代碼如下:

    // SchedulerElectionCandidate.java
    public final class SchedulerElectionCandidate implements ElectionCandidate {
    private final CoordinatorRegistryCenter regCenter;

    private SchedulerService schedulerService;

    public SchedulerElectionCandidate(final CoordinatorRegistryCenter regCenter) {
        this.regCenter = regCenter;
    }

    @Override
    public void startLeadership() throws Exception {
        try {
            schedulerService = new SchedulerService(regCenter);
            schedulerService.start();
        } catch (final Throwable throwable) {
            throw new JobSystemException(throwable);
        }
    }

    }

    // SchedulerService.java
    /**
    * 以守護行程方式啟動.
    */

    public void start() {
       facadeService.start();
       producerManager.startup();
       statisticManager.startup();
       cloudJobConfigurationListener.start();
       taskLaunchScheduledService.startAsync();
       restfulService.start();
       schedulerDriver.start();
       if (env.getFrameworkConfiguration().isEnabledReconcile()) {
           reconcileService.startAsync();
       }
    }

    • 當 Elastic-Job-Cloud-Scheduler 主節點呼叫 SchedulerService#start() 方法後,各種服務初始化完成,特別是和 Mesos Master 的連接,可以愉快的進行作業調度等等服務。
    • Elastic-Job-Cloud-Scheduler 從節點,因為無法回呼 LeaderSelector#takeLeadership() 方法,處於”待命”狀態。當主節點故障時,從節點會選舉出新的主節點,觸發 LeaderSelector#takeLeadership() 方法回呼,繼續提供服務。
  • 呼叫 CountLatch#await() 方法,掛起主節點 LeaderSelector#takeLeadership() 方法繼續向下執行。為什麼要進行掛起?如果呼叫完該方法,主節點就會讓出主節點身份,這樣會導致 Elastic-Job-Cloud-Scheduler 集群不斷不斷不斷更新主節點,無法正常提供服務。

  • 當 Elastic-Job-Cloud-Scheduler 主節點關閉時,觸發上文代碼看到的 ShutdownHook ,關閉服務。實現代碼如下:

    // Bootstrap.java
    public final class Bootstrap {
    public static void main(final String[] args) throws InterruptedException {
        // ... 省略無關代碼
        Runtime.getRuntime().addShutdownHook(new Thread("shutdown-hook") {

            @Override
            public void run() {
                // 停止選舉
                electionService.stop();
                latch.countDown();
            }
        });
    }

    }

    • 呼叫 ElectionService#stop() 方法,停止選舉,從而終止領導狀態,關閉各種服務。實現代碼如下:

      // ZookeeperElectionService.java
      public void stop() {
          log.info("Elastic job: stop leadership election");
          // 結束 #takeLeadership() 方法的行程掛起
          leaderLatch.countDown();
          try {
              // 關閉 LeaderSelector
              leaderSelector.close();
          } catch (final Exception ignored) {
          }
      }

      // SchedulerElectionCandidate.java
      @Override
      public void stopLeadership() {
          schedulerService.stop();
      }

      // SchedulerService.java
      /**
       * 停止運行.
       */

      public void stop() {
          restfulService.stop();
          taskLaunchScheduledService.stopAsync();
          cloudJobConfigurationListener.stop();
          statisticManager.shutdown();
          producerManager.shutdown();
          schedulerDriver.stop(true);
          facadeService.stop();
          if (env.getFrameworkConfiguration().isEnabledReconcile()) {
              reconcileService.stopAsync();
          }
      }
  • 當發生 JobSystemException 異常時,即呼叫 SchedulerElectionCandidate#startLeadership() 方法發生異常( SchedulerElectionCandidate#stopLeadership() 實際不會丟擲異常 ),呼叫 System.exit(1) 方法,Elastic-Job-Cloud-Scheduler 主節點異常崩潰

    • 目前猜測可能有種情況會導致異常崩潰。(1)一個 Elastic-Job-Cloud-Scheduler 集群有兩個節點 A / B,通過選舉 A 成為主節點;(2)突然 Zookeeper 集群崩潰,恢復後,A 節點選舉恰好又成為主節點,因為未呼叫 SchedulerElectionCandidate#stopLeadership() 關閉原來的各種服務,導致再次呼叫 SchedulerElectionCandidate#startLeadership() 會發生異常,例如說 RestfulService 服務,需要占用一個端口提供服務,重新初始化,會發生端口衝突丟擲異常。筆者嘗試模擬,通過一個 Elastic-Job-Cloud-Scheduler + Zookeeper 的情況,能夠觸發該情況,步驟如下:(1)Zookeeper 啟動;(2)Elastic-Job-Cloud-Scheduler 啟動,選舉成為主節點,正常初始化;(3)重啟 Zookeeper;(4)Elastic-Job-Cloud-Scheduler 再次選舉成為主節點,因為 RestfulService 端口衝突異常初始化崩潰。如果真出現這種情況怎麼辦呢?在「3. Scheduler 部署」揭曉答案。

Elastic-Job-Lite 在主節點選舉實現方式上略有不同,有興趣的同學可以看下《Elastic-Job-Lite 原始碼分析 —— 主節點選舉》的實現。

3. Scheduler 部署

比較容易想到的一種方式,選擇多台主機部署 Elastic-Job-Cloud-Executor 多個節點。

But…… 我們要想下,Elastic-Job-Cloud-Executor 運行在 Mesos 之上,可以使用上 Mesos 的資源調度和部署服務。引入 Mesos 上著名的框架 Marathon。它可以帶來所有後臺行程( 例如,Elastic-Job-Cloud-Executor )能夠運行在任意機器上,Marathon 會在後臺已有實體失敗時,自動啟動新實體的好處。是不是很贊 +1024 ?!

FROM 《Mesos 框架構建分佈式應用》 P47
Mesos 集群里的常見方案是在 Marathon 上運行集群的 Mesos 框架。但是 Marathon 本身就是一種 Mesos 的框架!那麼在 Marathon 上運行 Mesos 框架意味著什麼呢?不用考慮如何將每種框架的調度器部署到特定的主機上並且處理這些主機的故障,Marathon 能夠確保框架的調度器總是在集群里的某處運行著。這樣大幅簡化了在高可用配置里部署新框架的複雜度。

嗯…… 當然,Marathon 我們也要做高可用。

? Marathon 原來中文是馬拉松。哈哈哈,很適合的名字。

4. Scheduler 故障轉移

當原有 Elastic-Job-Cloud-Scheduler 主節點崩潰時,從節點重新進行主節點選舉,完成故障轉移。那麼此時會有一個問題,新主節點如何接管已經在執行中的 Elastic-Job-Cloud-Executer 們呢?

第一種方案,關閉原有的所有 Elastic-Job-Cloud-Executor 們,然後重新調度啟動。顯然,這個方式太過暴力。如果有些作業任務運行時間較長,直接中斷不是很友好。再比如,Elastic-Job-Cloud-Scheduler 節點需要進行升級,也關閉 Elastic-Job-Cloud-Executor,也不合理,和使用高可用性集群操作系統的初衷是背離的。該方案,不推薦

第二種方案,重用原主節點的 Mesos FrameworkID。原理如下:

FROM 《Mesos 框架構建分佈式應用》 P72
在 Mesos 里,調度器由其 FrameworkID、FrameworkInfo 里的可選值唯一確定。FrameworkID 必須由 Mesos 分配,從而確保對於每個框架來說該值是唯一確定的。現在,需要在分配 FrameworkID 時儲存該值,這樣未來的主實體才可以重用該值。

在 Elastic-Job-Cloud-Scheduler 使用註冊中心( Zookeeper ) 的持久資料節點 /${NAMESPACE}/ha/framework_id 儲存 FrameworkID,儲存值為 ${FRAMEWORK_ID}。使用 zkClient 查看如下:

[zk: localhost:2181(CONNECTED) 1] get /elastic-job-cloud/ha/framework_id
d31e7faa-aa72-4d0a-8941-512984d5af49-0001

呼叫 SchedulerService#getSchedulerDriver() 方法,初始化 Mesos Scheduler Driver 時,從 Zookeeper 獲取是否已經存在 FrameworkID。實現代碼如下:

// SchedulerService.java
private SchedulerDriver getSchedulerDriver(final TaskScheduler taskScheduler, final JobEventBus jobEventBus, final FrameworkIDService frameworkIDService) {
   // 獲取 FrameworkID
   Optional frameworkIDOptional = frameworkIDService.fetch();
   Protos.FrameworkInfo.Builder builder = Protos.FrameworkInfo.newBuilder();
   // 如果存在,設置 FrameworkID
   if (frameworkIDOptional.isPresent()) {
       builder.setId(Protos.FrameworkID.newBuilder().setValue(frameworkIDOptional.get()).build());
   }
   // ... 省略無關代碼
   Protos.FrameworkInfo frameworkInfo = builder.setUser(mesosConfig.getUser()).setName(frameworkName)
                .setHostname(mesosConfig.getHostname())
                .setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT_SECONDS)
                .setWebuiUrl(WEB_UI_PROTOCOL + env.getFrameworkHostPort()).setCheckpoint(true).build();
   // ... 省略無關代碼
}
  • 呼叫 FrameworkIDService#fetch() 方法,從註冊中心獲取 FrameworkID 。實現代碼如下:

    public Optional fetch() {
       String frameworkId = regCenter.getDirectly(HANode.FRAMEWORK_ID_NODE);
       return Strings.isNullOrEmpty(frameworkId) ? Optional.absent() : Optional.of(frameworkId);
    }
  • 呼叫 Protos.FrameworkInfo.Builder#setId(…) 方法,當 FrameworkID 存在時,設置 FrameworkID。

  • 呼叫 Protos.FrameworkInfo.Builder#setFailoverTimeout(…) 方法,設置 Scheduler 最大故障轉移時間,即 FrameworkID 過期時間。Elastic-Job-Cloud-Scheduler 預設設置一周。

當 Elastic-Job-Cloud-Scheduler 集群第一次初始化,上面的邏輯顯然獲取不到 FrameworkID,在向 Mesos Master 初始化成功後,回呼 SchedulerEngine#registered(...) 方法進行儲存,實現代碼如下:

// SchedulerEngine.java
public final class SchedulerEngine implements Scheduler {

    @Override
    public void registered(final SchedulerDriver schedulerDriver, final Protos.FrameworkID frameworkID, final Protos.MasterInfo masterInfo) {
        log.info("call registered");
        // 儲存FrameworkID
        frameworkIDService.save(frameworkID.getValue());
        // 過期 TaskScheduler Lease
        taskScheduler.expireAllLeases();
        // 註冊 Mesos Master 信息
        MesosStateService.register(masterInfo.getHostname(), masterInfo.getPort());
    }

}

// FrameworkIDService.java
public void save(final String id) {
   if (!regCenter.isExisted(HANode.FRAMEWORK_ID_NODE)) { // 不存在才儲存
       regCenter.persist(HANode.FRAMEWORK_ID_NODE, id);
   }
}

5. Scheduler 資料儲存

新的 Elastic-Job-Cloud-Scheduler 主節點在故障轉移,不僅僅接管 Elastic-Job-Cloud-Executor,還需要接管資料儲存

Elastic-Job-Cloud-Executor 使用註冊中心( Zookeeper )儲存資料。資料儲存分成兩部分:

  • config,雲作業應用配置、雲作業配置。
  • state,作業狀態信息。

整體如下圖:

Elastic-Job-Cloud-Scheduler 各個服務根據資料儲存啟動初始化。下麵來看看依賴資料儲存進行初始化的服務代碼實現。

5.1 RunningService

RunningService,任務運行時服務。呼叫 RunningService#start() 方法,啟動任務運行佇列。實現代碼如下:

public final class RunningService {

    /**
     * 運行中作業映射
     * key:作業名稱
     * value:任務運行時背景關係集合
     */

    @Getter
    private static final ConcurrentHashMap> RUNNING_TASKS = new ConcurrentHashMap<>(TASK_INITIAL_SIZE);

    public void start() {
        clear();
        List jobKeys = regCenter.getChildrenKeys(RunningNode.ROOT);
        for (String each : jobKeys) {
            // 從運行中佇列移除不存在配置的作業任務
            if (!configurationService.load(each).isPresent()) {
                remove(each);
                continue;
            }
            // 添加 運行中作業映射
            RUNNING_TASKS.put(each, Sets.newCopyOnWriteArraySet(Lists.transform(regCenter.getChildrenKeys(RunningNode.getRunningJobNodePath(each)), new Function() {

                @Override
                public TaskContext apply(final String input) {
                    return TaskContext.from(regCenter.get(RunningNode.getRunningTaskNodePath(TaskContext.MetaInfo.from(input).toString())));
                }
            })));
        }
    }
}
  • 因為運行中作業映射( RUNNING_TASKS )使用的頻次很多,Elastic-Job-Cloud-Scheduler 快取在記憶體中。每次初始化時,使用從資料儲存運行中作業佇列加載到記憶體。

  • 這裡我們在看下運行中作業佇列的添加( #add() )方法,實現代碼如下:

    public void add(final TaskContext taskContext) {
       if (!configurationService.load(taskContext.getMetaInfo().getJobName()).isPresent()) {
           return;
       }
       // 添加到運行中的任務集合
       getRunningTasks(taskContext.getMetaInfo().getJobName()).add(taskContext);
       // 判斷是否為常駐任務
       if (!isDaemon(taskContext.getMetaInfo().getJobName())) {
           return;
       }
       // 添加到運行中佇列
       String runningTaskNodePath = RunningNode.getRunningTaskNodePath(taskContext.getMetaInfo().toString());
       if (!regCenter.isExisted(runningTaskNodePath)) {
           regCenter.persist(runningTaskNodePath, taskContext.getId());
       }
    }
    • 運行中作業佇列只儲存常駐作業的任務。所以瞬時作業,在故障轉移時,可能存在相同作業相同分片任務同時調度執行。舉個慄子?,Elastic-Job-Cloud-Scheduler 集群有兩個節點 A( 主節點 ) / B( 從節點 ),(1)A 節點每 5 分鐘調度一次瞬時作業任務 T ,T 每次執行消耗時間實際超過 5 分鐘( 先不要考慮是否合理 )。(2)A 節點崩潰,B 節點成為主節點,5 分鐘後調度 T 作業,因為運行中作業佇列只儲存常駐作業的任務,恢復後的 RUNNING_TASKS 不存在該作業任務,因此可以調度 T 作業,實際 T 作業正在 Elastic-Job-Cloud-Executor 執行中。

5.2 ProducerManager

ProducerManager,發佈任務作業調度管理器。呼叫 ProducerManager#startup() 方法,啟動作業調度器。實現代碼如下:

public final class ProducerManager {

    public void startup() {
        log.info("Start producer manager");
        // 發佈瞬時作業任務的調度器
        transientProducerScheduler.start();
        // 初始化調度作業
        for (CloudJobConfiguration each : configService.loadAll()) {
            schedule(each);
        }
    }

}
  • 呼叫 ConfigService#loadAll() 方法,從資料儲存讀取所有作業配置。
  • 呼叫 #schedule() 方法,初始化調度作業。
    • 瞬時作業,在 Elastic-Job-Cloud-Scheduler 計時調度,類似每 XX 秒 / 分 / 時 / 天之類的作業需要重新計時,這個請註意。
    • 常駐作業,在 Elastic-Job-Cloud-Executor 計時調度,暫無影響。
    • 在《Elastic-Job-Cloud 原始碼分析 —— 作業調度(一)》「3. Producer 發佈任務」有詳細解析。

5.3 TaskScheduler

TaskScheduler,Fenzo 作業調度器,根據 Mesos Offer 和作業任務的優化分配。因為其分配是依賴當前實際 Mesos Offer 和 作業任務運行的情況,猜測可能對優化分配有影響,但不影響正確性。筆者對 TaskScheduler 瞭解不是很深入,僅僅作為猜測。

在《Elastic-Job-Cloud 原始碼分析 —— 作業調度(一)》「4.1」「4.2」「4.3」有和 TaskScheduler 相關的內容解析。

6. Mesos Master 崩潰

Mesos Master 集群,Mesos Master 主節點崩潰後,Mesos Master 集群重新選舉後,Scheduler、Mesos Slave 從 Zookeeper 獲取到最新的 Mesos Master 主節點重新進行註冊,不影響 Scheduler 、Mesos Slave 、任務執行。

呼叫 SchedulerService#getSchedulerDriver(...) 方法,設置 SchedulerDriver 從 Mesos Zookeeper Address 讀取當前 Mesos Master 地址,實現代碼如下:

// SchedulerService.java
private SchedulerDriver getSchedulerDriver(final TaskScheduler taskScheduler, final JobEventBus jobEventBus, final FrameworkIDService frameworkIDService) {
    // ... 省略無關代碼
    MesosConfiguration mesosConfig = env.getMesosConfiguration();
    return new MesosSchedulerDriver(new SchedulerEngine(taskScheduler, facadeService, jobEventBus, frameworkIDService, statisticManager), frameworkInfo, mesosConfig.getUrl() // Mesos Master URL
    );
}

// MesosSchedulerDriver.java
public MesosSchedulerDriver(Scheduler scheduler,
                            FrameworkInfo framework,
                            String master)
 
{
    // ... 省略無關代碼     
}
  • MesosSchedulerDriver 構造方法第三個引數 master,代表 Mesos 使用的 Zookeeper 地址,例如:zk://127.0.0.1:2181/mesos。生產環境請配置多 Zookeeper 節點,例如:zk://host1:port1,host2:port2,…/path

  • 使用 zkClient 查看如下:

    [zk: localhost:2181(CONNECTED) 10] ls /mesos
    [log_replicas, json.info_0000000017]
    [zk: localhost:2181(CONNECTED) 11] get /mesos/json.info_0000000017
    {"address":{"hostname":"localhost","ip":"127.0.0.1","port":5050},"hostname":"localhost","id":"685fe32d-e30c-4df7-b891-3d96b06fee88","ip":16777343,"pid":"[email protected]:5050","port":5050,"version":"1.4.0"}

Elastic-Job-Cloud-Scheduler 註冊上、重新註冊上、斷開 Mesos Master 實現代碼如下:

public final class SchedulerEngine implements Scheduler {

    @Override
    public void registered(final SchedulerDriver schedulerDriver, final Protos.FrameworkID frameworkID, final Protos.MasterInfo masterInfo) {
        log.info("call registered");
        // ... 省略無關代碼
        // 註冊 Mesos Master 信息
        MesosStateService.register(masterInfo.getHostname(), masterInfo.getPort());
    }

    @Override
    public void reregistered(final SchedulerDriver schedulerDriver, final Protos.MasterInfo masterInfo) {
        // ... 省略無關代碼
        // 註冊 Mesos Master 信息
        MesosStateService.register(masterInfo.getHostname(), masterInfo.getPort());
    }

    @Override
    public void disconnected(final SchedulerDriver schedulerDriver) {
        log.warn("call disconnected");
        MesosStateService.deregister();
    }
}
  • MesosStateService,Mesos狀態服務,提供呼叫 Mesos Master API 服務,例如獲取所有執行器。

  • 呼叫 MesosStateService#register(...) 方法,註冊 Mesos Master 信息,實現代碼如下:

    public class MesosStateService {
    private static String stateUrl;

    public static synchronized void register(final String hostName, final int port) {
        stateUrl = String.format("http://%s:%d/state", hostName, port);
    }

    }

  • 呼叫 MesosStateService#deregister(...) 方法,註銷 Mesos Master 信息,實現代碼如下:

    public static synchronized void deregister() {
        stateUrl = null;
    }

《Mesos 框架構建分佈式應用》P110 如何處理 master 的故障,有興趣的同學也可以仔細看看。

7. Mesos Slave 崩潰

在《Elastic-Job-Cloud 原始碼分析 —— 作業失效轉移》中,搜索關鍵字 “TASK_LOST”,有 Mesos Slave 崩潰後,對 Elastic-Job-Cloud-Scheduler 和 Elastic-Job-Cloud-Executor 的影響。

《Mesos 框架構建分佈式應用》P109 如何處理 slave 的故障,有興趣的同學也可以仔細看看。

8. Scheduler 核對

FROM http://mesos.apache.org/documentation/latest/reconciliation/
Messages between framework schedulers and the Mesos master may be dropped due to failures and network partitions. This may cause a framework scheduler and the master to have different views of the current state of the cluster. For example, consider a launch task request sent by a framework. There are many ways that failures can prevent the task launch operation from succeeding, such as:

  • Framework fails after persisting its intent to launch the task, but before the launch task message was sent.
  • Master fails before receiving the message.
  • Master fails after receiving the message but before sending it to the agent.

通過核對特性解決這個問題。核對是協調器如何和 Mesos Master 一起檢查調度器所認為的集群狀態是否和 Mesos Master 所認為的集群狀態完成匹配。

呼叫 SchedulerDriver#reconcileTasks(...) 方法,查詢任務狀態。代碼接口如下:

public interface SchedulerDriver {
    Status reconcileTasks(Collection statuses);
}
  • 只能查詢非終止狀態( non-terminal )的任務。核對的主要原因,確認任務是否還在運行,或者已經進入了中斷狀態。
    • terminal:TASK_ERROR、TASK_FAILED、TASK_FINISHED、TASK_KILLED
    • non-terminal:TASK_DROPPED、TASK_GONE、TASK_GONE_BY_OPERATOR、TASK_KILLING、TASK_LOST、TASK_RUNNING、TASK_STAGING、TASK_STARTING、TASK_UNREACHABLE、TASK_UNKNOWN
  • 當 statuses 非空時,顯示查詢,通過回呼 Scheduler#statusUpdate(…) 方法異步傳回指定的任務的狀態。
  • 當 statuses 為空時,隱式查詢,通過回呼 Scheduler#statusUpdate(…) 方法異步傳回全部的任務的狀態。

ReconcileService,核對 Mesos 與 Scheduler 之間的任務狀態。實現代碼如下:

public class ReconcileService extends AbstractScheduledService {

    private final ReentrantLock lock = new ReentrantLock();

    @Override
    protected void runOneIteration() throws Exception {
        lock.lock();
        try {
            explicitReconcile();
            implicitReconcile();
        } finally {
            lock.unlock();
        }
    }

    @Override
    protected Scheduler scheduler() {
        FrameworkConfiguration configuration = BootstrapEnvironment.getInstance().getFrameworkConfiguration();
        return Scheduler.newFixedDelaySchedule(configuration.getReconcileIntervalMinutes(), configuration.getReconcileIntervalMinutes(), TimeUnit.MINUTES);
    }
}
  • 通過配置 FrameworkConfiguration#reconcileIntervalMinutes 設置,每隔多少分鐘執行一次核對。若配置時間大於 0 才開啟任務狀態核對功能。

  • 呼叫 #explicitReconcile() 方法,查詢運行中的任務。實現代碼如下:

    public void explicitReconcile() {
       lock.lock();
       try {
           // 獲取運行中的作業任務背景關係集合
           Set runningTask = new HashSet<>();
           for (Set each : facadeService.getAllRunningTasks().values()) {
               runningTask.addAll(each);
           }
           if (runningTask.isEmpty()) {
               return;
           }
           log.info("Requesting {} tasks reconciliation with the Mesos master", runningTask.size());
           // 查詢指定任務
           schedulerDriver.reconcileTasks(Collections2.transform(runningTask, new Function() {
               @Override
               public Protos.TaskStatus apply(final TaskContext input) {
                   return Protos.TaskStatus.newBuilder()
                           .setTaskId(Protos.TaskID.newBuilder().setValue(input.getId()).build())
                           .setSlaveId(Protos.SlaveID.newBuilder().setValue(input.getSlaveId()).build())
                           .setState(Protos.TaskState.TASK_RUNNING)
                           .build();
               }
           }));
       } finally {
           lock.unlock();
       }
    }
  • 呼叫 #implicitReconcile() 方法,查詢所有任務。實現代碼如下:

    public void implicitReconcile() {
       lock.lock();
       try {
           // 查詢全部任務
           schedulerDriver.reconcileTasks(Collections.emptyList());
       } finally {
           lock.unlock();
       }
    }
  • 為什麼這裡要使用 ReentrantLock 鎖呢?Elastic-Job-Cloud-Scheduler 提供 CloudOperationRestfulApi,支持使用 HTTP Restful API 主動觸發 #explicitReconcile() 和 #implicitReconcile() 方法,通過鎖避免併發核對。對 CloudOperationRestfulApi 有興趣的同學,直接點擊鏈接查看實現。

  • 雖然 #implicitReconcile() 方法,能查詢到所有 Mesos 任務狀的態,但是性能較差,而 #explicitReconcile() 方法顯式查詢運行中的 Mesos 任務的狀態,性能更好,所以先進行呼叫。

  • 優化點(目前暫未實現):Elastic-Job-Cloud-Scheduler 註冊到 Mesos 和 重註冊到 Mesos,都執行一次核對。

    FROM 《Elastic-Job-Lite 原始碼分析 —— 自診斷修複》
    This reconciliation algorithm must be run after each (re-)registration.

其他 Scheduler 核對資料,有興趣的同學可以看看:

  • 《Mesos 框架構建分佈式應用》P76 添加核對 、P111 故障轉移期間的核對
  • 《Mesos 官方文件 —— reconciliation》

Elastic-Job-Lite 也會存在作業節點 和 Zookeeper 資料不一致的情況,有興趣的同學可以看下《Elastic-Job-Lite 原始碼分析 —— 自診斷修複》的實現。

666. 彩蛋

給英文和我一樣半斤八兩的同學一本葵花寶典+辟邪劍譜:

  • 《Mesos中文手冊》。
  • 《Mesos 容錯、故障》

整個 Elastic-Job-Cloud 完結,撒花!

收穫蠻多的,學習的第一套基於雲原生( CloudNative )實現的中間件,期待有基於雲原生的服務化中間件。

一開始因為 Elastic-Job-Cloud 基於 Mesos 實現,內心還是有點恐懼感,後面硬啃 + 搭配《Mesos 框架構建分佈式應用》,比預想的時間快了一半完成這個系列。在這裡強烈推薦這本書。另外,等時間相對空,會研究下另外一個滬江開源的基於 Mesos 實現的分佈式調度系統 Juice。不是很確定會不會出原始碼解析的文章,儘量輸出噶。

後面會繼續更新原始碼解析系列,下一個系列應該是《tcc-transaction 原始碼解析》。在選擇要研究的 tcc 中間件還是蠻糾結的,哈哈,這裡聽從 zhisheng 的建議。如果不好,我保證會打死你的。

希望堅持不懈的分享原始碼解析會有更多的同行者閱讀。確實,原始碼解析的受眾略小。

道友,趕緊上車,分享一波朋友圈!

赞(0)

分享創造快樂