歡迎光臨
每天分享高質量文章

【死磕Sharding-jdbc】—group by結果合併(2)

點擊上方“Java技術驛站”,選擇“置頂公眾號”。

有內涵、有價值的文章第一時間送達!

在sharding-jdbc原始碼之group by結果合併(1)中主要分析了sharding-jdbc如何在GroupByStreamResultSetMergerGroupByMemoryResultSetMerger中選擇,並分析了GroupByStreamResultSetMerger的實現;接下來分析GroupByMemoryResultSetMerger的實現原理;

通過sharding-jdbc原始碼之group by結果合併(1)的分析可知,如果要走GroupByMemoryResultSetMerger,那麼需要這樣的SQL: SELECT o.status,count(o.user_id)count_user_id FROM t_order o whereo.user_id=10groupbyo.status order bycount_user_id asc,即group by和order by的欄位不一樣;接下來的分析都是基於這條SQL;

ExecutorEngine.build()方法中通過 returnnewGroupByMemoryResultSetMerger(columnLabelIndexMap,resultSets,selectStatement);呼叫GroupByMemoryResultSetMerger,GroupByMemoryResultSetMerger的構造方法原始碼如下:

  1. public GroupByMemoryResultSetMerger(

  2.        final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {

  3.    // labelAndIndexMap就是select結果列與位置索引的map,例如{count_user_id:2, status:1}

  4.    super(labelAndIndexMap);

  5.    // select查詢陳述句

  6.    this.selectStatement = selectStatement;

  7.    // resultSets就是併發在多個實際表執行傳回的結果集合,在多少個實際表上執行,resultSets的size就有多大;

  8.    memoryResultSetRows = init(resultSets);

  9. }

在實際表torder0和torder1上執行SQL傳回的結果如下:

torder0和torder1結果.png

知道實際表的傳回結果後,後面的分析更容易理解;假定這些傳回結果用json表示為:{[{“status”:”NEW”, “countuserid”:1},{“status”:”VALID”, “countuserid”:1},{“status”:INIT, “countuserid”:2}],[{“status”:”VALID”, “countuserid”:1},{“status”:”INIT”, “countuserid”:1},{“status”:””NEW, “countuserid”:3}]}

init()方法原始碼如下:

  1. private Iterator<MemoryResultSetRow> init(final List<ResultSet> resultSets) throws SQLException {

  2.    Map<GroupByValue, MemoryResultSetRow> dataMap = new HashMap<>(1024);

  3.    Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap = new HashMap<>(1024);

  4.    // 遍歷多個實際表執行傳回的結果集合中所有的結果,即2個實際表每個實際表3條結果,總計6條結果

  5.    for (ResultSet each : resultSets) {

  6.        while (each.next()) {

  7.            // each就是遍歷過程中的一條結果,selectStatement.getGroupByItems()即group by項,即status,將結果和group by項組成一個GroupByValue物件--實際是從ResultSet中取出group by項的值,例如NEW,VALID,INIT等

  8.            GroupByValue groupByValue = new GroupByValue(each, selectStatement.getGroupByItems());

  9.            // initForFirstGroupByValue()分析如下

  10.            initForFirstGroupByValue(each, groupByValue, dataMap, aggregationMap);

  11.            aggregate(each, groupByValue, aggregationMap);

  12.        }

  13.    }

  14.    // 將aggregationMap中的聚合計算結果封裝到dataMap中

  15.    setAggregationValueToMemoryRow(dataMap, aggregationMap);

  16.    // 將結果轉換成List形式

  17.    List<MemoryResultSetRow> result = getMemoryResultSetRows(dataMap);

  18.    if (!result.isEmpty()) {

  19.        // 如果有結果,再將currentResultSetRow置為List的第一個元素

  20.        setCurrentResultSetRow(result.get(0));

  21.    }

  22.    // 傳回List的迭代器,後面的取結果,實際上就是迭代這個集合;

  23.    return result.iterator();

  24. }

initForFirstGroupByValue()原始碼如下:

  1. private void initForFirstGroupByValue(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, MemoryResultSetRow> dataMap,

  2.                                      final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException {

  3.    // groupByValue如果是第一次出現,那麼在dataMap中初始化一條資料,key就是groupByValue,例如NEW;value就是new MemoryResultSetRow(resultSet),即將ResultSet中的結果取出來封裝到MemoryResultSetRow中,MemoryResultSetRow實際就一個屬性Object[] data,那麼data值就是這樣的["NEW", 1]

  4.    if (!dataMap.containsKey(groupByValue)) {

  5.        dataMap.put(groupByValue, new MemoryResultSetRow(resultSet));

  6.    }

  7.    // groupByValue如果是第一次出現,那麼在aggregationMap中初始化一條資料,key就是groupByValue,例如NEW;value又是一個map,這個map的key就是select中有聚合計算的列,例如count(user_id),即count_user_id;value就是AggregationUnit的實現,count聚合計算的實現是AccumulationAggregationUnit

  8.    if (!aggregationMap.containsKey(groupByValue)) {

  9.        Map<AggregationSelectItem, AggregationUnit> map = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {

  10.            @Override

  11.            public AggregationUnit apply(final AggregationSelectItem input) {

  12.                // 根據聚合計算型別得到AggregationUnit的實現

  13.                return AggregationUnitFactory.create(input.getType());

  14.            }

  15.        });

  16.        aggregationMap.put(groupByValue, map);

  17.    }

  18. }

該方法都是為了接下來的聚合計算做準備工作;

aggregate()原始碼如下–即在記憶體中將多個實際表中傳回的結果進行聚合:

  1. private void aggregate(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException {

  2.    // 遍歷select中所有的聚合型別,例如COUNT(o.user_id)

  3.    for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) {

  4.        List<Comparable>> values = new ArrayList<>(2);

  5.        if (each.getDerivedAggregationSelectItems().isEmpty()) {

  6.            values.add(getAggregationValue(resultSet, each));

  7.        } else {

  8.            for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) {

  9.                values.add(getAggregationValue(resultSet, derived));

  10.            }

  11.        }

  12.        // 通過AggregationUnit實現類即AccumulationAggregationUnit進行聚合,實際上就是聚合本次遍歷到的ResultSet,聚合的臨時結果就在AccumulationAggregationUnit的屬性result中(AccumulationAggregationUnit聚合的本質就是累加)

  13.        aggregationMap.get(groupByValue).get(each).merge(values);

  14.    }

  15. }

經過 for(ResultSeteach :resultSets){while(each.next()){...遍歷所有結果並聚合計算後,aggregationMap這個map中已經聚合計算完後的結果,如下所示:

  1. {

  2.    "VALID": {

  3.        "COUNT(user_id)": 2

  4.    },

  5.    "INIT": {

  6.        "COUNT(user_id)": 5

  7.    },

  8.    "NEW": {

  9.        "COUNT(user_id)": 3

  10.    }

  11. }

再將aggregationMap中的結果封裝到 Map<GroupByValue,MemoryResultSetRow>dataMap這個map中,結果形式如下所示:

  1. {

  2.    "VALID": ["VALID", 2],

  3.    "INIT": ["INIT", 5],

  4.    "NEW": ["NEW", 3]

  5. }

MemoryResultSetRow的本質就是一個 Object[]data,所以其值是[“VALID”, 2],[“INIT”, 5]這種形式

將結果轉成 List<MemoryResultSetRow>,並且排序–如果有order by,那麼根據order by的值進行排序,否則根據group by的值排序:

  1. private List<MemoryResultSetRow> getMemoryResultSetRows(final Map<GroupByValue, MemoryResultSetRow> dataMap) {

  2.    List<MemoryResultSetRow> result = new ArrayList<>(dataMap.values());

  3.    Collections.sort(result, new GroupByRowComparator(selectStatement));

  4.    return result;

  5. }

  6. @RequiredArgsConstructor

  7. public final class GroupByRowComparator implements Comparator<MemoryResultSetRow> {

  8.    private final SelectStatement selectStatement;

  9.    @Override

  10.    public int compare(final MemoryResultSetRow o1, final MemoryResultSetRow o2) {

  11.        if (!selectStatement.getOrderByItems().isEmpty()) {

  12.            return compare(o1, o2, selectStatement.getOrderByItems());

  13.        }

  14.        return compare(o1, o2, selectStatement.getGroupByItems());

  15.    }

  16.    ...

  17. }

到這裡,GroupByMemoryResultSetMerger即記憶體GROUP聚合計算已經分析完成,依舊通過運行過程圖解加深對GroupByMemoryResultSetMerger的理解,運行過程圖如下圖所示:

總結

正如GroupByMemoryResultSetMerger的名字一樣,其實現原理是把所有結果加載到記憶體中,在記憶體中進行計算,而GroupByMemoryResultSetMerger是流式計算方法,並不需要加載所有實際表傳回的結果到記憶體中。這樣的話,如果SQL傳回的總結果數比較多,GroupByMemoryResultSetMerger的處理方式就可能會撐爆記憶體;這個是使用sharding-jdbc一個非常需要註意的地方;

END

赞(0)

分享創造快樂

© 2021 知識星球   网站地图