歡迎光臨
每天分享高質量文章

【死磕Sharding-jdbc】—非同步送達JOB

點選上方“Java技術驛站”,選擇“置頂公眾號”。

有內涵、有價值的文章第一時間送達!

最大努力送達型非同步JOB任務

當最大努力送達型監聽器多次失敗嘗試後,把任務交給最大努力送達型非同步JOB任務處理,非同步多次嘗試處理;核心原始碼在模組 sharding-jdbc-transaction-async-job中。該模組是一個獨立非同步處理模組,使用者決定是否需要啟用,原始碼比較少,大概看一下原始碼結構:

原始碼結構

resouces目錄下的指令碼和dubbo非常相似(作者應該也看過dubbo原始碼,哈),start.sh&stop.sh;分別是服務啟動指令碼和服務停止指令碼;根據start.sh指令碼可知,該模組的主方法是BestEffortsDeliveryJobMain

  1. CONTAINER_MAIN=com.dangdang.ddframe.rdb.transaction.soft.bed.BestEffortsDeliveryJobMain

  2. nohup java -classpath $CONF_DIR:$LIB_DIR:. $CONTAINER_MAIN >/dev/null 2>&1 &

Main方法的核心原始碼如下:

  1. public final class BestEffortsDeliveryJobMain {

  2.    public static void main(final String[] args) throws Exception {

  3.        try (InputStreamReader inputStreamReader = new InputStreamReader(BestEffortsDeliveryJobMain.class.getResourceAsStream("/conf/config.yaml"), "UTF-8")) {

  4.            BestEffortsDeliveryConfiguration config = new Yaml(new Constructor(BestEffortsDeliveryConfiguration.class)).loadAs(inputStreamReader, BestEffortsDeliveryConfiguration.class);

  5.            new BestEffortsDeliveryJobFactory(config).init();

  6.        }

  7.    }

  8. }

由原始碼可知,主配置檔案是 config.yaml;將該檔案解析為BestEffortsDeliveryConfiguration,然後呼叫 newBestEffortsDeliveryJobFactory(config).init()

config.yaml配置檔案中job相關配置內容如下:

  1. jobConfig:

  2.  #作業名稱

  3.  name: bestEffortsDeliveryJob

  4.  #觸發作業的cron運算式--每5s重試一次

  5.  cron: 0/5 * * * * ?

  6.  #每次作業獲取的事務日誌最大數量

  7.  transactionLogFetchDataCount: 100

  8.  #事務送達的最大嘗試次數.

  9.  maxDeliveryTryTimes: 3

  10.  #執行送達事務的延遲毫秒數,早於此間隔時間的入庫事務才會被作業執行,其SQL為 where *** AND `creation_time`< (now() - maxDeliveryTryDelayMillis),即至少60000ms,即一分鐘前入庫的事務日誌才會被拉取出來;

  11.  maxDeliveryTryDelayMillis: 60000

maxDeliveryTryDelayMillis:60000這個配置也可以理解為60s內的transaction_log不處理;

BestEffortsDeliveryJobFactory核心原始碼:

  1. @RequiredArgsConstructor

  2. public final class BestEffortsDeliveryJobFactory {

  3.    // 這個屬性賦值透過有參構造方法進行賦值--new BestEffortsDeliveryJobFactory(config),就是透過`config.yaml`配置的屬性

  4.    private final BestEffortsDeliveryConfiguration bedConfig;

  5.    /**

  6.     * BestEffortsDeliveryJobMain中呼叫該init()方法,初始化最大努力嘗試型非同步JOB,該JOB基於elastic-job;

  7.     * Initialize best efforts delivery job.

  8.     */

  9.    public void init() {

  10.        // 根據config.yaml中配置的zkConfig節點,得到協調排程中心CoordinatorRegistryCenter

  11.        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(createZookeeperConfiguration(bedConfig));

  12.        // 排程中心初始化

  13.        regCenter.init();

  14.        // 構造elastic-job排程任務

  15.        JobScheduler jobScheduler = new JobScheduler(regCenter, createBedJobConfiguration(bedConfig));

  16.        jobScheduler.setField("bedConfig", bedConfig);

  17.        jobScheduler.setField("transactionLogStorage", TransactionLogStorageFactory.createTransactionLogStorage(new RdbTransactionLogDataSource(bedConfig.getDefaultTransactionLogDataSource())));

  18.        jobScheduler.init();

  19.    }

  20.    // 根據該方法可知,建立的是BestEffortsDeliveryJob

  21.    private JobConfiguration createBedJobConfiguration(final BestEffortsDeliveryConfiguration bedJobConfig) {

  22.        // 根據config.yaml中配置的jobConfig節點得到job配置資訊,且指定job型別為BestEffortsDeliveryJob

  23.        JobConfiguration result = new JobConfiguration(bedJobConfig.getJobConfig().getName(), BestEffortsDeliveryJob.class, 1, bedJobConfig.getJobConfig().getCron());

  24.        result.setFetchDataCount(bedJobConfig.getJobConfig().getTransactionLogFetchDataCount());

  25.        result.setOverwrite(true);

  26.        return result;

  27.    }

BestEffortsDeliveryJob核心原始碼:

  1. @Slf4j

  2. public class BestEffortsDeliveryJob extends AbstractIndividualThroughputDataFlowElasticJob<TransactionLog> {

  3.    @Setter

  4.    private BestEffortsDeliveryConfiguration bedConfig;

  5.    @Setter

  6.    private TransactionLogStorage transactionLogStorage;

  7.    @Override

  8.    public List<TransactionLog> fetchData(final JobExecutionMultipleShardingContext context) {

  9.        // 從transaction_log表中抓取最多100條事務日誌(相關引數都在config.yaml中jobConfig節點下)

  10.        return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(),

  11.            bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis());

  12.    }

  13.    @Override

  14.    public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) {

  15.        try (

  16.            Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()) {

  17.            // 呼叫事務日誌儲存器的processData()進行處理

  18.            transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes());

  19.        } catch (final SQLException | TransactionCompensationException ex) {

  20.            log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes() + 1,

  21.                bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));

  22.            return false;

  23.        }

  24.        return true;

  25.    }

  26. }

END

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖