歡迎光臨
每天分享高質量文章

【死磕Sharding-jdbc】—結果合併

點擊上方“Java技術驛站”,選擇“置頂公眾號”。

有內涵、有價值的文章第一時間送達!

單表查詢之結果合併

接下來以執行 SELECT o.*FROM t_order o whereo.user_id=10order byo.order_id desc limit 2,3分析下麵這段Java代碼是如何對結果進行合併的:

  1. result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge());code>

  2. MergeEngine.merge()方法的原始碼如下:

  3. <code class="java">public ResultSetMerger merge() throws SQLException {

  4.    selectStatement.setIndexForItems(columnLabelIndexMap);

  5.    return decorate(build());

  6. }

build()方法原始碼如下:

  1. private ResultSetMerger build() throws SQLException {

  2.    // 說明:GroupBy***ResultSetMerger在第六篇文章單獨講解,所以此次分析的SQL條件中沒有group by

  3.    if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {

  4.        if (selectStatement.isSameGroupByAndOrderByItems()) {

  5.            return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);

  6.        } else {

  7.            return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);

  8.        }

  9.    }

  10.    // 如果select陳述句中有order by欄位,那麼需要OrderByStreamResultSetMerger對結果處理

  11.    if (!selectStatement.getOrderByItems().isEmpty()) {

  12.        return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems());

  13.    }

  14.    return new IteratorStreamResultSetMerger(resultSets);

  15. }

根據這段代碼可知,其作用是根據sql陳述句選擇多個不同的ResultSetMerger對結果進行合併處理,ResultSetMerger實現有這幾種:GroupByStreamResultSetMerger,GroupByMemoryResultSetMerger,OrderByStreamResultSetMerger,IteratorStreamResultSetMerger,LimitDecoratorResultSetMerger;以測試SQL SELECT o.*FROM t_order o whereo.user_id=10order byo.order_id desc limit 2,3為例,沒有group by,但是有order by,所以使用到了OrderByStreamResultSetMerger和LimitDecoratorResultSetMerger對結果進行合併(GroupByStreamResultSetMerger&GroupByMemoryResultSetMerger;後面單獨講解)

decorate()原始碼如下:

  1. private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException {

  2.    ResultSetMerger result = resultSetMerger;

  3.    // 如果SQL陳述句中有limist,還需要LimitDecoratorResultSetMerger配合進行結果歸併;

  4.    if (null != selectStatement.getLimit()) {

  5.        result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit());

  6.    }

  7.    return result;

  8. }

接下來將以執行SQL: SELECT o.*FROM t_order o whereo.user_id=10order byo.order_id desc limit 2,3(該SQL會被改寫成 SELECT o.*,o.order_id AS ORDER_BY_DERIVED_0 FROM t_order_0 o whereo.user_id=?order byo.order_id desc limit 2,3)為例,一一講解OrderByStreamResultSetMerger,LimitDecoratorResultSetMerger和IteratorStreamResultSetMerger,瞭解這幾個ResultSetMerger的原理;

OrderByStreamResultSetMerger

OrderByStreamResultSetMerger的核心原始碼如下:

  1. private final Queue<OrderByValue> orderByValuesQueue;

  2. public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems) throws SQLException {

  3.    // sql中order by列的信息,實體sql是order by order_id desc,即此處就是order_id

  4.    this.orderByItems = orderByItems;

  5.    // 初始化一個優先級佇列,優先級佇列中的元素會根據OrderByValue中compareTo()方法排序,並且SQL重寫後發送到多少個標的實際表,List的size就有多大,Queue的capacity就有多大;

  6.    this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());

  7.    // 將結果壓入佇列中

  8.    orderResultSetsToQueue(resultSets);

  9.    isFirstNext = true;

  10. }

  11. private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException {

  12.    // 遍歷resultSets--在多少個標的實際表上執行SQL,該集合的size就有多大

  13.    for (ResultSet each : resultSets) {

  14.        // 將ResultSet和排序列信息封裝成一個OrderByValue型別

  15.        OrderByValue orderByValue = new OrderByValue(each, orderByItems);

  16.        // 如果值存在,那麼壓入佇列中

  17.        if (orderByValue.next()) {

  18.            orderByValuesQueue.offer(orderByValue);

  19.        }

  20.    }

  21.    // 重置currentResultSet的位置:如果佇列不為空,那麼將佇列的頂部(peek)位置設置為currentResultSet的位置

  22.    setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet());

  23. }

  24. @Override

  25. public boolean next() throws SQLException {

  26.    // 呼叫next()判斷是否還有值, 如果佇列為空, 表示沒有任何值, 那麼直接傳回false

  27.    if (orderByValuesQueue.isEmpty()) {

  28.        return false;

  29.    }

  30.    // 如果佇列不為空, 那麼第一次一定傳回true;即有結果可取(且將isFirstNext置為false,表示接下來的請求都不是第一次請求next()方法)

  31.    if (isFirstNext) {

  32.        isFirstNext = false;

  33.        return true;

  34.    }

  35.    // 從佇列中彈出第一個元素(因為是優先級佇列,所以poll()傳回的值,就是此次要取的值)

  36.    OrderByValue firstOrderByValue = orderByValuesQueue.poll();

  37.    // 如果它的next()存在,那麼將它的next()再添加到佇列中

  38.    if (firstOrderByValue.next()) {

  39.        orderByValuesQueue.offer(firstOrderByValue);

  40.    }

  41.    // 佇列中所有元素全部處理完後就傳回false

  42.    if (orderByValuesQueue.isEmpty()) {

  43.        return false;

  44.    }

  45.    // 再次重置currentResultSet的位置為佇列的頂部位置;

  46.    setCurrentResultSet(orderByValuesQueue.peek().getResultSet());

  47.    return true;

  48. }

繼續深入剖析:這段代碼初看可能有點繞,假設運行SQL SELECT o.*FROM t_order o whereo.user_id=10order byo.order_id desc limit 3會分發到兩個標的實際表,且第一個實際表傳回的結果是1,3,5,7,9;第二個實際表傳回的結果是2,4,6,8,10;那麼,經過OrderByStreamResultSetMerger的構造方法中的orderResultSetsToQueue()方法後, Queue<OrderByValue>orderByValuesQueue中包含兩個OrderByValue,一個是10,一個是9;接下來取值運行過程如下:

  1. 取得10,並且10的next()是8,然後執行orderByValuesQueue.offer(8);,這時候orderByValuesQueue中包含8和9;

  2. 取得9,並且9的next()是7,然後執行orderByValuesQueue.offer(7);,這時候orderByValuesQueue中包含7和8;

  3. 取得8,並且8的next()是6,然後執行orderByValuesQueue.offer(6);,這時候orderByValuesQueue中包含7和6; 取值數量已經達到limit 3的限制(原始碼在LimitDecoratorResultSetMerger中的next()方法中),退出;

這段代碼運行示意圖如下所示:

LimitDecoratorResultSetMerger

LimitDecoratorResultSetMerger核心原始碼如下:

  1. public LimitDecoratorResultSetMerger(final ResultSetMerger resultSetMerger, final Limit limit) throws SQLException {

  2.    super(resultSetMerger);

  3.    // limit賦值(Limit物件包括limit m,n中的m和n兩個值)

  4.    this.limit = limit;

  5.    // 判斷是否會跳過所有的結果項,即判斷是否有符合條件的結果

  6.    skipAll = skipOffset();

  7. }

  8. private boolean skipOffset() throws SQLException {

  9.    // 假定limit.getOffsetValue()就是offset,實體sql中為limit 2,3,所以offset=2

  10.    for (int i = 0; i < limit.getOffsetValue(); i++) {

  11.        // 嘗試從OrderByStreamResultSetMerger生成的優先級佇列中跳過offset個元素,如果.next()一直為true,表示有足夠符合條件的結果,那麼傳回false;否則沒有足夠符合條件的結果,那麼傳回true;即skilAll=true就表示跳過了所有沒有符合條件的結果;

  12.        if (!getResultSetMerger().next()) {

  13.            return true;

  14.        }

  15.    }

  16.    // limit m,n的sql會被重寫為limit 0, m+n,所以limit.isRowCountRewriteFlag()為true,rowNumber的值為0;

  17.    rowNumber = limit.isRowCountRewriteFlag() ? 0 : limit.getOffsetValue();

  18.    return false;

  19. }

  20. @Override

  21. public boolean next() throws SQLException {

  22.    // 如果skipAll為true,即跳過所有,表示沒有任何符合條件的值,那麼傳回false

  23.    if (skipAll) {

  24.        return false;

  25.    }

  26.    if (limit.getRowCountValue() > -1) {

  27.        // 每次next()獲取值後,rowNumber自增,當自增rowCountValue次後,就不能再往下繼續取值了,因為條件limit 2,3(rowCountValue=3)限制了

  28.        return ++rowNumber <= limit.getRowCountValue() && getResultSetMerger().next();

  29.    }

  30.    return getResultSetMerger().next();

  31. }

IteratorStreamResultSetMerger

構造方法核心原始碼:

  1. private final Iterator<ResultSet> resultSets;

  2. public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) {

  3.    // 將List改成Iterator,方便接下來迭代取得結果;

  4.    this.resultSets = resultSets.iterator();

  5.    // 重置currentResultSet

  6.    setCurrentResultSet(this.resultSets.next());

  7. }

END

赞(0)

分享創造快樂

© 2021 知識星球   网站地图