歡迎光臨
每天分享高質量文章

分佈式作業系統 Elastic-Job-Cloud 原始碼分析 —— 作業失效轉移

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述
  • 2. 記錄作業失效轉移
  • 3. 提交失效轉移作業
  • 666. 彩蛋

1. 概述

本文主要分享 Elastic-Job-Cloud 作業失效轉移。對應到 Elastic-Job-Lite 原始碼解析文章為《Elastic-Job-Lite 作業作業失效轉移》。

你需要對《Elastic-Job-Cloud 原始碼分析 —— 作業調度(一)》有一定的瞭解。

當作業任務在 Elastic-Job-Cloud-Executor 異常崩潰時,該任務在下次調度之前不會被重新執行。開啟失效轉移功能後,該作業任務會立即被 Elastic-Job-Cloud-Scheduler 重新調度,提交 Elastic-Job-Cloud-Executor 立即執行。

在 Elastic-Job-Cloud 里,我們瞭解到作業分成瞬時作業和常駐作業。實際上面失效轉移的定義暫時只適用於瞬時作業。對於常駐作業,作業任務異常崩潰後,無論你是否開啟失效轉移功能,Elastic-Job-Cloud-Scheduler 會立刻提交 Elastic-Job-Cloud-Executor 重新調度執行。

為什麼此處使用的是“重新調度”,而不是“立即執行”呢?目前版本 Elasitc-Job-Cloud 暫時不支持常駐作業的失效轉移,當作業任務異常崩潰,本次執行不會重新執行,但是為了作業任務後續能夠調度執行,所以再次提交 Elastic-Job-Cloud-Scheduler。

你行好事會因為得到贊賞而愉悅 
同理,開源專案貢獻者會因為 Star 而更加有動力 
為 Elastic-Job 點贊!傳送門

OK,下麵我們來看看作業失效轉移的實現方式和作業任務異常崩潰的多重場景。

2. 記錄作業失效轉移

當作業任務異常崩潰時,Elastic-Job-Cloud-Scheduler 通過 Mesos 任務狀態變更接口( #statusUpdate() )實現對任務狀態的監聽處理,實現代碼如下:

public final class SchedulerEngine implements Scheduler {
    @Override
    public void statusUpdate(final SchedulerDriver schedulerDriver, final Protos.TaskStatus taskStatus) {
        String taskId = taskStatus.getTaskId().getValue();
        TaskContext taskContext = TaskContext.from(taskId);
        String jobName = taskContext.getMetaInfo().getJobName();
        log.trace("call statusUpdate task state is: {}, task id is: {}", taskStatus.getState(), taskId);
        jobEventBus.post(new JobStatusTraceEvent(jobName, taskContext.getId(), taskContext.getSlaveId(), Source.CLOUD_SCHEDULER, 
                taskContext.getType(), String.valueOf(taskContext.getMetaInfo().getShardingItems()), State.valueOf(taskStatus.getState().name()), taskStatus.getMessage()));
        switch (taskStatus.getState()) {
            case TASK_RUNNING:
                // ... 省略無關代碼
                break;
            case TASK_FINISHED:
                // ... 省略無關代碼
                break;
            case TASK_KILLED:
                // ... 省略無關代碼
                break;
            case TASK_LOST:
            case TASK_DROPPED:
            case TASK_GONE:
            case TASK_GONE_BY_OPERATOR:
            case TASK_FAILED: // 執行作業任務被錯誤終止
            case TASK_ERROR: // 任務錯誤
                log.warn("task id is: {}, status is: {}, message is: {}, source is: {}", taskId, taskStatus.getState(), taskStatus.getMessage(), taskStatus.getSource());
                // 將任務從運行時佇列刪除
                facadeService.removeRunning(taskContext);
                // 記錄失效轉移佇列
                facadeService.recordFailoverTask(taskContext);
                // 通知 TaskScheduler 任務不分配在對應主機上
                unAssignTask(taskId);
                // 統計
                statisticManager.taskRunFailed();
                break;
            case TASK_UNKNOWN:
            case TASK_UNREACHABLE:
                log.error("task id is: {}, status is: {}, message is: {}, source is: {}", taskId, taskStatus.getState(), taskStatus.getMessage(), taskStatus.getSource());
                statisticManager.taskRunFailed();
                break;
            default:
                break;
        }
    }
}

一共有 6 種狀態判定為作業任務崩潰,我們來一個一個看看:

  • TASK_DROPPED / TASK_GONE / TASK_GONE_BY_OPERATOR

    這三個狀態,筆者暫時不太瞭解,這裡先取用一些資料,歡迎有瞭解的同學指教一下。

    FROM http://mesos.apache.org/api/latest/java/org/apache/mesos/Protos.TaskState.html 
    TASK_DROPPED:The task failed to launch because of a transient error. 
    TASK_GONE:The task is no longer running. 
    TASK_GONE_BY_OPERATOR:The task was running on an agent that the master cannot contact; the operator has asserted that the agent has been shutdown, but this has not been directly confirmed by the master.

    FROM http://mesos.apache.org/blog/mesos-1-1-0-released/ 
    [MESOS-5344] – Experimental support for partition-aware Mesos frameworks. In previous Mesos releases, when an agent is partitioned from the master and then reregisters with the cluster, all tasks running on the agent are terminated and the agent is shutdown. In Mesos 1.1, partitioned agents will no longer be shutdown when they reregister with the master. By default, tasks running on such agents will still be killed (for backward compatibility); however, frameworks can opt-in to the new PARTITION_AWARE capability. If they do this, their tasks will not be killed when a partition is healed. This allows frameworks to define their own policies for how to handle partitioned tasks. Enabling the PARTITION_AWARE capability also introduces a new set of task states: TASK_UNREACHABLE, TASK_DROPPED, TASK_GONE, TASK_GONE_BY_OPERATOR, and TASK_UNKNOWN. These new states are intended to eventually replace the TASK_LOST state.

  • TASK_FAILED

    執行作業任務被錯誤終止。例如,執行器( Elastic-Job-Cloud-Executor )異常崩潰,或者被殺死。

  • TASK_ERROR

    任務啟動嘗試失敗錯誤。例如,執行器( Elastic-Job-Cloud-Executor ) 接收到的任務的作業配置不正確。實現代碼如下:

       @Override
    public void run() {
       // 更新 Mesos 任務狀態,運行中。
       executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_RUNNING).build());
       //
       Map data = SerializationUtils.deserialize(taskInfo.getData().toByteArray());
       ShardingContexts shardingContexts = (ShardingContexts) data.get("shardingContext");
       @SuppressWarnings("unchecked")
       JobConfigurationContext jobConfig = new JobConfigurationContext((Map) data.get("jobConfigContext"));
       try {
           // 獲得 分佈式作業
           ElasticJob elasticJob = getElasticJobInstance(jobConfig);
           // 調度器提供內部服務的門面物件
           final CloudJobFacade jobFacade = new CloudJobFacade(shardingContexts, jobConfig, jobEventBus);
           // 執行作業
           if (jobConfig.isTransient()) {
               // 執行作業
               JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
               // 更新 Mesos 任務狀態,已完成。
               executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_FINISHED).build());
           } else {
               // 初始化 常駐作業調度器
               new DaemonTaskScheduler(elasticJob, jobConfig, jobFacade, executorDriver, taskInfo.getTaskId()).init();
           }
           // CHECKSTYLE:OFF
       } catch (final Throwable ex) {
           // CHECKSTYLE:ON
           log.error("Elastic-Job-Cloud-Executor error", ex);
           // 更新 Mesos 任務狀態,錯誤。
           executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_ERROR).setMessage(ExceptionUtil.transform(ex)).build());
           // 停止自己
           executorDriver.stop();
           throw ex;
       }
    }

    • *
    • 呼叫 #getElasticJobInstance() 方法,因為任務的作業配置不正確丟擲異常。例如,任務類不存在;Spring 的 配置檔案不存在;Spring 容器初始化出錯;Spring Bean 物件初始化或獲取出錯;以及等等。

    • 瞬時作業,呼叫 AbstractElasticJobExecutor#execute(...) 方法,發生異常,並且異常被丟擲。預設情況下,AbstractElasticJobExecutor 內部使用 DefaultJobExceptionHandler 處理髮生的異常,不會丟擲異常,實現代碼如下:

      public final class DefaultJobExceptionHandler implements JobExceptionHandler {

      @Override
      public void handleException(final String jobName, final Throwable cause) {
          log.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
      }

      }

    • 常駐作業,呼叫 DaemonTaskScheduler#(…) 方法,初始化發生異常

    • 因為上述的種種異常,呼叫 ExecutorDriver#sendStatusUpdate(…),更新 Mesos 任務狀態為 TASK_ERROR。另外,呼叫 ExecutorDriver#stop() 方法,關閉自己。這意味著,一個執行器上如果存在一個作業任務發生 TASK_ERROR,其他作業任務即使是正常的,也會更新作業任務狀態為 TASK_FAILED。這塊千萬要註意。

  • TASK_LOST

    執行作業任務的 Elastic-Job-Cloud-Executor 所在的 Mesos Slave 與 Mesos Master 因為網絡問題或 Mesos Slave 崩潰引起丟失連接,可能導致其上的所有作業任務狀態變為 TASK_LOST。

    當 Slave 宕機後重啟,導致 TASK_LOST 時,Mesos又是怎麼來處理的呢?

    FROM http://dockone.io/article/2513 
    在 Master 和 Slave 之間,一般都是由 Master 主動向每一個 Slave 發送Ping訊息,如果在設定時間內(flag.slave_ping_timeout,預設15s)沒有收到Slave 的回覆,並且達到一定次數(flag.max_slave_ping_timeouts,預設次數為5),那麼 Master 會操作以下幾個步驟:

    But……………… 
    筆者嘗試如上流程,使用 kill -9 模擬 Mesos Slave 異常崩潰,等待 Mesos Master 發現 Mesos Slave 已經關閉,重啟 Mesos Slave,結果執行器( Elastic-Job-Cloud-Executor )未關閉,調度器( Elastic-Job-Cloud-Scheduler )並未收到任務的 TASK_LOST。???什麼情況???翻查如下文件:

    因為 Elastic-Job-Cloud-Scheduler 註冊到 Mesos Master 時,開啟了 checkpoint 和 PARTITION_AWARE。開啟 checkpoint 後,Mesos Slave 會將記錄檢查點信息, Mesos Slave 重啟後,會讀取檢查點檢查信息,重新連接上( 不會關閉 )運行在它上面的執行器( Elastic-Job-Cloud-Scheduler )。開啟 PARTITION_AWARE 後,TASK_LOST 會被區分成 TASK_UNREACHABLE, TASK_DROPPED, TASK_GONE, TASK_GONE_BY_OPERATOR, and TASK_UNKNOWN。表現如下:

    • 《Mesos 官方文件 —— high-availability-framework-guide》搜索標題 “Dealing with Partitioned or Failed Agents”。
    • 《Mesos 官方文件 —— agent-recovery》搜索關標題 “Agent Recovery”。
    • 必須 Slave 進行重啟,因為對執行器的相關操作只能通過 Mesos Slave,即 Scheduler <=> Mesos Master <=> Mesos Slave <=> Executor。如果 Slave 一直不進行重啟,執行器會一直運行,除非有另外的機制,通知到執行器。
  1. 將該 Slave 從 Master 中刪除,此時該 Slave 的資源將不會再分配給Scheduler。
  2. 遍歷該 Slave 上運行的所有任務,向對應的 Framework 發送任務的 Task_Lost 狀態更新,同時把這些任務從Master中刪除。
  3. 遍歷該 Slave 上的所有 Executor,並刪除。
  4. 觸發 Rescind Offer,把這個 Slave 上已經分配給 Scheduler 的 Offer 撤銷。
  5. 把這個 Slave 從 Master 的 Replicated log 中刪除(Mesos Master 依賴 Replicated log 中的部分持久化集群配置信息進行 failer over / recovery)。
  • kill -9 模擬 Mesos Slave 異常崩潰,等待 Mesos Master 發現 Mesos Slave 已經關閉

  • 調度器( Elastic-Job-Cloud-Scheduler ) 接收直接由 Mesos Master 發送的該 Mesos Slave 上的每個任務 TASK_UNREACHABLE。

  • Mesos Slave 重啟完成。

  • 執行器( Elastic-Job-Cloud-Executor ) 重新註冊到重啟好的 Mesos Slave ,並繼續運行任務。

    如果 Elastic-Job-Cloud-Scheduler 註冊到 Mesos Master 時,關閉了 PARTITION_AWARE 和 checkpoint,表現同 TASK_LOST 描述的過程。

開啟 checkpoint 和 PARTITION_AWARE 實現代碼如下:

  ```Java
  // SchedulerService.java
  private SchedulerDriver getSchedulerDriver(final TaskScheduler taskScheduler, final JobEventBus jobEventBus, final FrameworkIDService frameworkIDService) {
        Protos.FrameworkInfo.Builder builder = Protos.FrameworkInfo.newBuilder();
        // PARTITION_AWARE
        builder.addCapabilitiesBuilder().setType(Protos.FrameworkInfo.Capability.Type.PARTITION_AWARE);
        Protos.FrameworkInfo frameworkInfo = builder.setUser(mesosConfig.getUser()).setName(frameworkName)
            .setHostname(mesosConfig.getHostname()).setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT_SECONDS)
            .setWebuiUrl(WEB_UI_PROTOCOL + env.getFrameworkHostPort())
            .setCheckpoint(true// checkpoint
            .build();
        // ... 省略無關代碼
   }
  ```

是不是開啟了 checkpoint,Mesos Slave 重啟不會關閉執行器?

答案當然是不是的。當 Mesos Slave 配置 recover = cleanup 或者 重啟時間超過 recovery_timeout ( 預設,15 分鐘 )時,重啟完成後,Mesos Slave 關閉運行在它上面的執行器( Elastic-Job-Cloud-Executor ),調度器( Elastic-Job-Cloud-Scheduler ) 接收到的該 Mesos Slave 上的每個任務 TASK_FAILED。

  • 參考文件:《Mesos 官方文件 —— agent-recovery》搜索標題 “Agent Configuration”。

呼叫 FacadeService#recordFailoverTask(...) 方法,記錄失效轉移佇列,實現代碼如下:

public void recordFailoverTask(final TaskContext taskContext) {
   Optional jobConfigOptional = jobConfigService.load(taskContext.getMetaInfo().getJobName());
   if (!jobConfigOptional.isPresent()) {
       return;
   }
   if (isDisable(jobConfigOptional.get())) {
       return;
   }
   CloudJobConfiguration jobConfig = jobConfigOptional.get();
   if (jobConfig.getTypeConfig().getCoreConfig().isFailover() // 開啟失效轉移
           || CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType()) { // 常駐作業
       failoverService.add(taskContext);
   }
}

  • 對於瞬時作業,必須開啟 JobCoreConfiguration.failover = true,才能失效轉移,這個比較好理解。
  • 對於常駐作業,暫時不支持失效轉移。因為常駐作業是在執行器( Elastic-Job-Executor ) 進行調度執行,如果不添加到失效轉移作業佇列,重新提交到執行器( Elastic-Job-Executor ),後續就不能調度執行該作業了。
  • 呼叫 FailoverService#add(…) 方法,將任務放入失效轉移佇列,實現代碼如下:

// FailoverService.java
public void add(final TaskContext taskContext) {
   if (regCenter.getNumChildren(FailoverNode.ROOT) > env.getFrameworkConfiguration().getJobStateQueueSize()) {
       log.warn("Cannot add job, caused by read state queue size is larger than {}.", env.getFrameworkConfiguration().getJobStateQueueSize());
       return;
   }
   String failoverTaskNodePath = FailoverNode.getFailoverTaskNodePath(taskContext.getMetaInfo().toString());
   if (!regCenter.isExisted(failoverTaskNodePath) // 判斷不在失效轉移佇列
           && !runningService.isTaskRunning(taskContext.getMetaInfo())) { // 判斷不在運行中
       regCenter.persist(failoverTaskNodePath, taskContext.getId());
   }
}

// FailoverNode.java
final class FailoverNode {

    static final String ROOT = StateNode.ROOT + "/failover";

    private static final String FAILOVER_JOB = ROOT + "/%s"// %s=${JOB_NAME}

    private static final String FAILOVER_TASK = FAILOVER_JOB + "/%s"// %s=${TASK_META_INFO}
}

  • FailoverService,失效轉移佇列服務。

  • 失效轉移佇列儲存在註冊中心( Zookeeper )的持久資料節點 /${NAMESPACE}/state/failover/${JOB_NAME}/${TASK_META_INFO},儲存值為任務編號。使用 zkClient 查看如下:

    [zk: localhost:2181(CONNECTED) 2] ls /elastic-job-cloud/state/failover/test_job_simple
    [[email protected]@0]
    [zk: localhost:2181(CONNECTED) 3] get /elastic-job-cloud/state/failover/test_job_simple/[email protected]@0
    [email protected]@[email protected]@[email protected]@[email protected]@8f2a5bb5-2941-4ece-b192-0f936e60faa7

  • 在運維平臺,我們可以看到失效轉移佇列:

3. 提交失效轉移作業

在《Elastic-Job-Cloud 原始碼分析 —— 作業調度(一)》「4.1 創建 Fenzo 任務請求」里,呼叫 FacadeService#getEligibleJobContext() 方法,獲取有資格運行的作業時。FacadeService#getEligibleJobContext() 不僅呼叫 ReadyService#getAllEligibleJobContexts(...) 方法,從待執行佇列中獲取所有有資格執行的作業背景關係,也呼叫 FailoverService#getAllEligibleJobContexts() 方法,從失效轉移佇列中獲取所有有資格執行的作業背景關係。實現代碼如下:

// FailoverService.java
public Collection getAllEligibleJobContexts() {
   // 不存在 失效轉移佇列
   if (!regCenter.isExisted(FailoverNode.ROOT)) {
       return Collections.emptyList();
   }
   // 獲取 失效轉移佇列 的作業們
   List jobNames = regCenter.getChildrenKeys(FailoverNode.ROOT);
   Collection result = new ArrayList<>(jobNames.size());
   Set assignedTasks = new HashSet<>(jobNames.size() * 101);
   for (String each : jobNames) {
       // 為空時,移除 失效轉移佇列 的作業
       List taskIdList = regCenter.getChildrenKeys(FailoverNode.getFailoverJobNodePath(each));
       if (taskIdList.isEmpty()) {
           regCenter.remove(FailoverNode.getFailoverJobNodePath(each));
           continue;
       }
       // 排除 作業配置 不存在的作業
       Optional jobConfig = configService.load(each);
       if (!jobConfig.isPresent()) {
           regCenter.remove(FailoverNode.getFailoverJobNodePath(each));
           continue;
       }
       // 獲得待執行的分片集合
       List assignedShardingItems = getAssignedShardingItems(each, taskIdList, assignedTasks);
       //
       if (!assignedShardingItems.isEmpty() && jobConfig.isPresent()) {
           result.add(new JobContext(jobConfig.get(), assignedShardingItems, ExecutionType.FAILOVER));    
       }
   }
   return result;
}

private List getAssignedShardingItems(final String jobName, final List taskIdList, final Set assignedTasks) {
   List result = new ArrayList<>(taskIdList.size());
   for (String each : taskIdList) {
       TaskContext.MetaInfo metaInfo = TaskContext.MetaInfo.from(each);
       if (assignedTasks.add(Hashing.md5().newHasher().putString(jobName, Charsets.UTF_8).putInt(metaInfo.getShardingItems().get(0)).hash()) // 排重
               && !runningService.isTaskRunning(metaInfo)) { // 排除正在運行中
           result.add(metaInfo.getShardingItems().get(0));
       }
   }
   return result;
}


在《Elastic-Job-Cloud 原始碼分析 —— 作業調度(一)》「4.4 創建 Mesos 任務信息」里,呼叫 LaunchingTasks#getIntegrityViolationJobs() 方法,獲得作業分片不完整的作業集合。實現代碼如下:

// LaunchingTasks.java
/**
* 獲得作業分片不完整的作業集合
*
@param vmAssignmentResults 主機分配任務結果集合
@return 作業分片不完整的作業集合
*/

Collection getIntegrityViolationJobs(final Collection vmAssignmentResults) {
   Map assignedJobShardingTotalCountMap = getAssignedJobShardingTotalCountMap(vmAssignmentResults);
   Collection result = new HashSet<>(assignedJobShardingTotalCountMap.size(), 1);
   for (Map.Entry entry : assignedJobShardingTotalCountMap.entrySet()) {
       JobContext jobContext = eligibleJobContextsMap.get(entry.getKey());
       if (ExecutionType.FAILOVER != jobContext.getType() // 不包括 FAILOVER 執行型別的作業
               && !entry.getValue().equals(jobContext.getJobConfig().getTypeConfig().getCoreConfig().getShardingTotalCount())) {
           log.warn("Job {} is not assigned at this time, because resources not enough to run all sharding instances.", entry.getKey());
           result.add(entry.getKey());
       }
   }
   return result;
}

  • 一個作業可能存在部分分片需要失效轉移,不需要考慮完整性。

在《Elastic-Job-Cloud 原始碼分析 —— 作業調度(一)》「4.7 從佇列中刪除已運行的作業」里,呼叫 FailoverService#remove(...) 方法,從失效轉移佇列中刪除相關任務。實現代碼如下:

public void remove(final Collection metaInfoList) {
   for (TaskContext.MetaInfo each : metaInfoList) {
       regCenter.remove(FailoverNode.getFailoverTaskNodePath(each.toString()));
   }
}

赞(0)

分享創造快樂