歡迎光臨
每天分享高質量文章

【死磕Sharding-jdbc】—group by結果合併(1)

點選上方“Java技術驛站”,選擇“置頂公眾號”。

有內涵、有價值的文章第一時間送達!

在sharding-jdbc原始碼之結果合併中已經分析了OrderByStreamResultSetMerger、LimitDecoratorResultSetMerger、IteratorStreamResultSetMerger,檢視原始碼目錄下ResultSetMerger的實現類,只剩下GroupByMemoryResultSetMerger和GroupByStreamResultSetMerger兩個實現類的分析,接下來根據原始碼對兩者的實現進行剖析;

ResultSetMerge關係圖.png

如何選擇

GroupBy有兩個ResultSetMerge的實現:GroupByMemoryResultSetMerger和GroupByStreamResultSetMerger,那麼如何選擇呢?在MergeEngine中有一段這樣的程式碼:

  1. private ResultSetMerger build() throws SQLException {

  2.    // 如果有group by或者聚合型別(例如sum, avg等)的SQL條件,就會選擇一個GroupBy***ResultSetMerger

  3.    if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {

  4.        // isSameGroupByAndOrderByItems()原始碼緊隨其後

  5.        if (selectStatement.isSameGroupByAndOrderByItems()) {

  6.            return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);

  7.        } else {

  8.            return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);

  9.        }

  10.    }

  11.    if (!selectStatement.getOrderByItems().isEmpty()) {

  12.        return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems());

  13.    }

  14.    return new IteratorStreamResultSetMerger(resultSets);

  15. }

  16. // 如果只有group by條件,沒有order by,那麼isSameGroupByAndOrderByItems()為true,例如:`SELECT o.* FROM t_order o where o.user_id=? group by o.order_id`(因為這種sql會被改寫為SELECT o.* , o.order_id AS GROUP_BY_DERIVED_0 FROM t_order_0 o where o.user_id=?  group by o.order_id  ORDER BY GROUP_BY_DERIVED_0 ASC,即group by和order by完全相同)

  17. public boolean isSameGroupByAndOrderByItems() {

  18.    return !getGroupByItems().isEmpty() && getGroupByItems().equals(getOrderByItems());

  19. }

由上段原始碼分析可知,如果只有group by條件,那麼選擇GroupByStreamResultSetMerger;那麼如果既有group by,又有order by,那麼就會選擇GroupByStreamResultSetMerger;

接下來分析GroupByStreamResultSetMerger中如何對結果進行group by聚合,假設資料源 js_jdbc_0中實際表 t_order_0和實際表 t_order_1的資料如下:

order_id user_id status
1000 10 INIT
1002 10 INIT
1004 10 VALID
1006 10 NEW
1008 10 INIT
order_id user_id status
1001 10 NEW
1003 10 NEW
1005 10 VALID
1007 10 INIT
1009 10 INIT

GroupByStreamResultSetMerger

以執行SQL SELECT o.status,count(o.user_id)FROM t_order o whereo.user_id=10groupbyo.status為例,分析GroupByStreamResultSetMerger,其部分原始碼如下:

  1. public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMerger {  

  2.    ... ...

  3.    public GroupByStreamResultSetMerger(

  4.            final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {

  5.        // GroupByStreamResultSetMerger的父類是OrderByStreamResultSetMerger,所以呼叫super()就是呼叫OrderByStreamResultSetMerger的構造方法

  6.        super(resultSets, selectStatement.getOrderByItems());

  7.        // 標簽(列名)和位置索引的map關係,例如{order_id:1, status:3, user_id:2}

  8.        this.labelAndIndexMap = labelAndIndexMap;

  9.        // 執行的SQL陳述句

  10.        this.selectStatement = selectStatement;

  11.        currentRow = new ArrayList<>(labelAndIndexMap.size());

  12.        // 如果優先順序佇列不為空,表示where條件中有group by,將佇列中第一個元素的group值賦值給currentGroupByValues,即INIT(預設升序排列,所以INIT > NEW > VALID)

  13.        currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();

  14.    }

  15.    ...

  16. }

備註:OrderByStreamResultSetMerger在5. sharding-jdbc原始碼之結果合併這篇文章中已經分析,不再贅述;

next()方法核心原始碼如下:

  1. @Override

  2. public boolean next() throws SQLException {

  3.    currentRow.clear();

  4.    // 如果優先順序佇列為空,表示沒有任何結果,那麼傳回false

  5.    if (getOrderByValuesQueue().isEmpty()) {

  6.        return false;

  7.    }

  8.    if (isFirstNext()) {

  9.        super.next();

  10.    }

  11.    // 集合的核心邏輯在這裡

  12.    if (aggregateCurrentGroupByRowAndNext()) {

  13.        currentGroupByValues = new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();

  14.    }

  15.    return true;

  16. }

aggregateCurrentGroupByRowAndNext()實現如下:

  1. private boolean aggregateCurrentGroupByRowAndNext() throws SQLException {

  2.    boolean result = false;

  3.    // selectStatement.getAggregationSelectItems()先得到select所有舉行型別的項,例如select count(o.user_id) ***中聚合項是count(o.user_id), 然後轉化成map,key就是聚合項即o.user_id,value就是集合unit實體即AccumulationAggregationUnit;即o.user_id的COUNT集合計算是透過AccumulationAggregationUnit實現的,下麵有對AggregationUnitFactory的分析

  4.    Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {

  5.        @Override

  6.        public AggregationUnit apply(final AggregationSelectItem input) {

  7.            return AggregationUnitFactory.create(input.getType());

  8.        }

  9.    });

  10.    // 接下來準備聚合,如何group by的值相同,則進行聚合(因為SQL可能會在多個資料源以及多個實際表上執行)

  11.    while (currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) {

  12.        // 呼叫aggregate()方法進行䄦

  13.        aggregate(aggregationUnitMap);

  14.        cacheCurrentRow();

  15.        // 呼叫next()方法,實際呼叫OrderByStreamResultSetMerger中的next()方法,currentResultSet會指向下一個元素;

  16.        result = super.next();

  17.        // 如果還有值,那麼繼續遍歷

  18.        if (!result) {

  19.            break;

  20.        }

  21.    }

  22.    setAggregationValueToCurrentRow(aggregationUnitMap);

  23.    return result;

  24. }

AggregationUnitFactory 原始碼如下:

  1. public final class AggregationUnitFactory {

  2.    /**

  3.     * Create aggregation unit instance.

  4.     * 根據這段程式碼可知,select中MAX和MIN這種聚合查詢需要使用ComparableAggregationUnit,SUM和COUNT需要使用AccumulationAggregationUnit,AVG需要使用AverageAggregationUnit;(目前只支援這些聚合操作),

  5.     */

  6.    public static AggregationUnit create(final AggregationType type) {

  7.        switch (type) {

  8.            case MAX:

  9.                return new ComparableAggregationUnit(false);

  10.            case MIN:

  11.                return new ComparableAggregationUnit(true);

  12.            case SUM:

  13.            case COUNT:

  14.                return new AccumulationAggregationUnit();

  15.            case AVG:

  16.                return new AverageAggregationUnit();

  17.            default:

  18.                throw new UnsupportedOperationException(type.name());

  19.        }

  20.    }

  21. }

aggregate()原始碼如下:

  1. private void aggregate(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) throws SQLException {

  2.    for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {

  3.        List<Comparable>> values = new ArrayList<>(2);

  4.        if (entry.getKey().getDerivedAggregationSelectItems().isEmpty()) {

  5.            values.add(getAggregationValue(entry.getKey()));

  6.        } else {

  7.            for (AggregationSelectItem each : entry.getKey().getDerivedAggregationSelectItems()) {

  8.                values.add(getAggregationValue(each));

  9.            }

  10.        }

  11.        // aggregate()的核心就是呼叫AggregationUnit具體實現中的merge()方法,即呼叫AccumulationAggregationUnit.merge()方法(後面會對AggregationUnit的各個實現進行分析)

  12.        entry.getValue().merge(values);

  13.    }

  14. }

執行過程圖解

這一塊的程式碼邏輯稍微有點複雜,下麵透過示意圖分解執行過程,讓sharding-jdbc執行group by整個過程更加清晰: step1. SQL執行 首先在兩個實際表 t_order_0t_order_1中分別執行SQL: SELECT o.status,count(o.user_id)FROM t_order o whereo.user_id=10groupbyo.statust_order_0t_order_1分別得到如下的結果:

status count(o.user_id)
INIT 3
NEW 1
VALID 1
status count(o.user_id)
INIT 2
NEW 2
VALID 1

step2. 執行super(*) 即在GroupByStreamResultSetMerger中呼叫OrderByStreamResultSetMerger的構造方法 super(resultSets,selectStatement.getOrderByItems());,從而得到優先順序佇列,如下圖所示的第一張圖,優先順序中包含兩個元素[(INIT, 3), (INIT 2)]:

powered by afei.png

  1. 先聚合計算(INIT,3)和(INIT,2),由於NEW和INIT不相等,進行下一輪聚合計算;

  2. 再聚合計算(NEW,1)和(NEW,2),由於VALID和NEW不相等,進行下一輪聚合計算;

  3. 再聚合計算(VALID,1)和(VALID,1),兩者的next()為false,聚合計算完成;

step3. aggregationUnitMap 透過轉換得到aggregationUnitMap,key就是count(user_id),value就是COUNT聚合計算的AggregationUnit實現,即AccumulationAggregationUnit;

由於select陳述句中只有COUNT(o.userid涉及到聚合執行,所以這個map的size為1,且key是count(userid);如果SQL是 SELECT o.status,count(o.user_id),max(order_id)FROM t_order o whereo.user_id=?groupbyo.status,那麼aggregationUnitMap的size為2,且第一個entry的key是count(userid),value是AccumulationAggregationUnit;第二個entry的key是max(orderid),value是ComparableAggregationUnit;

step4. 迴圈遍歷並merge 核心程式碼如下,即將(INIT, 3)和(INIT, 2)透過呼叫AccumulationAggregationUnit中的merge方法,從而得到(INIT, 5)。同樣的原因呼叫AccumulationAggregationUnit中的merge方法merge(NEW, 1)和(NEW, 2),從而得到(NEW, 3);merge(VALID, 1)和(VALID, 1),從而得到(VALID, 2)。所以,最終的結果就是[(INIT, 5), (NEW, 3), (VALID, 2)]

  1.    aggregate(aggregationUnitMap);

  2.    cacheCurrentRow();

  3.    result = super.next();

  4.    if (!result) {

  5.        break;

  6.    }

  7. }

AggregationUnit

AggregationUnit即聚合計算介面,總計有三個實現類AccumulationAggregationUnit,ComparableAggregationUnit和AverageAggregationUnit,接下來分別對其簡單介紹;

AccumulationAggregationUnit

實現原始碼如下,SUN和COUNT兩個聚合計算都是用這個AggregationUnit實現,核心實現就是累加:

  1. @Override

  2. public void merge(final List<Comparable>> values) {

  3.    if (null == values || null == values.get(0)) {

  4.        return;

  5.    }

  6.    if (null == result) {

  7.        result = new BigDecimal("0");

  8.    }

  9.    // 核心實現程式碼:累加

  10.    result = result.add(new BigDecimal(values.get(0).toString()));

  11.    log.trace("Accumulation result: {}", result.toString());

  12. }

ComparableAggregationUnit

實現原始碼如下,MAX和MIN兩個聚合計算都是用這個AggregationUnit實現,核心實現就是比較:

  1. @Override

  2. public void merge(final List<Comparable>> values) {

  3.    if (null == values || null == values.get(0)) {

  4.        return;

  5.    }

  6.    if (null == result) {

  7.        result = values.get(0);

  8.        log.trace("Comparable result: {}", result);

  9.        return;

  10.    }

  11.    // 新的值與舊的值比較大小

  12.    int comparedValue = ((Comparable) values.get(0)).compareTo(result);

  13.    // 升序和降序比較方式不同(max聚合計算時asc為false,min聚合計算時asc為true),min聚合計算時找一個更小的值(asc && comparedValue < 0),max聚合計算時找一個更大的值(!asc && comparedValue > 0)

  14.    if (asc && comparedValue < 0 || !asc && comparedValue > 0) {

  15.        result = values.get(0);

  16.        log.trace("Comparable result: {}", result);

  17.    }

  18. }

AverageAggregationUnit

實現原始碼如下,AVG聚合計算就是用的這個AggregationUnit實現,核心實現是將AVG轉化後的SUM/COUNT,累加得到總SUM和總COUNT相除就是最終的AVG結果;

  1. @Override

  2. public void merge(final List<Comparable>> values) {

  3.    if (null == values || null == values.get(0) || null == values.get(1)) {

  4.        return;

  5.    }

  6.    if (null == count) {

  7.        count = new BigDecimal("0");

  8.    }

  9.    if (null == sum) {

  10.        sum = new BigDecimal("0");

  11.    }

  12.    // COUNT累加

  13.    count = count.add(new BigDecimal(values.get(0).toString()));

  14.    // SUM累加

  15.    sum = sum.add(new BigDecimal(values.get(1).toString()));

  16.    log.trace("AVG result COUNT: {} SUM: {}", count, sum);

  17. }

END

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖