歡迎光臨
每天分享高質量文章

用 Java 實現 Stream 高效混排與 Spliterator

對 Stream 執行排序操作只要呼叫排序 API 就好了,要實現相反的效果(混排)卻並不簡單。 

本文介紹瞭如何使用 Java Stream `Collectors` 工廠方法與自定義 `Spliterator` 對 Stream 進行 Shuffle(混排),支持 Eager 與 Lazy 兩種樣式。

 

1. Eager Shuffle Collector

 

Heinz [在這篇文章][1]中給出了一種解決方案:將整個 Stream 轉換為 list,對 list 執行 `Collections#shuffle`,再轉為 Stream。像下麵這樣封裝成一個複合操作:

 

[1]:https://www.javaspecialists.eu/archive/Issue258.html

 

```java
public static  Collector> toEagerShuffledStream() {
    return Collectors.collectingAndThen(
      toList(),
      list -> {
          Collections.shuffle(list);
          return list.stream();
      });
}
```

 

這種方法適用於對 Steam 中所有元素進行混排。由於會提前對集合中所有元素進行 Shuffle,如果只處理其中一部分則效果不佳,極端情況比如 Stream 只包含1個元素。

 

讓我們來看看一個簡單基準測試的運行結果:

 

```java
@State(Scope.Benchmark)
public class RandomSpliteratorBenchmark {
    private List source;

    @Param({"1", "10", "100", "1000", "10000", "10000"})
    public int limit;

    @Param({"100000"})
    public int size;

    @Setup(Level.Iteration)
    public void setUp() {
        source = IntStream.range(0, size)
          .boxed()
          .map(Object::toString)
          .collect(Collectors.toList());
    }

    @Benchmark
    public List eager() {
        return source.stream()
          .collect(toEagerShuffledStream())
          .limit(limit)
          .collect(Collectors.toList());
    }
```

 

```shell
            (limit)   Mode  Cnt     Score     Error  Units
eager             1  thrpt    5   467.796 ±   9.074  ops/s
eager            10  thrpt    5   467.694 ±  17.166  ops/s
eager           100  thrpt    5   459.765 ±   8.048  ops/s
eager          1000  thrpt    5   467.934 ±  43.095  ops/s
eager         10000  thrpt    5   449.471 ±   5.549  ops/s
eager        100000  thrpt    5   331.111 ±   5.626  ops/s
```

 

從上面的資料可以看出,儘管運行結果 Stream 中元素不斷增加,運行效果還是相當不錯。因此,對整個集合提前混排太浪費了,尤其是元素較少的時候得分很差。

 

讓我們看看來有什麼好辦法。

 

2. Lazy Shuffle Collector

 

為了節省 CPU 資源,與其對集合中所有元素預處理,不如根據需要只處理其中一部分。

 

為了達到這個效果,需要自定義一個 Spliterator 對所有對元素隨機遍歷,然後通過 `StreamSupport.stream` 構造創建一個 Stream 物件:

 

```java
public class RandomSpliterator implements Spliterator {
    // ...
    public static  Collector> toLazyShuffledStream() {
        return Collectors.collectingAndThen(
          toList(),
          list -> StreamSupport.stream(
            new ShuffledSpliterator<>(list), false));
    }
}
```

 

3. 實現細節

 

即使只取出一個隨機元素,也不能避免計算整個 Steam 中的元素(這意味著不支持無限序列)。因此,可以用 `List` 初始化 `RandomSpliterator`。“註意,這裡有一個陷阱”。

 

如果給定 `List` 不支持在常量時間內完成隨機訪問,這種方案要比 Eager 方案慢得多。為了避免這種情況,可以在實體化 `Spliterator` 的時候進行簡單檢查:

 

```java
private RandomSpliterator(
  List source, Supplier extends Random> random) {
    if (source.isEmpty()) { ... } // throw
    this.source = source instanceof RandomAccess 
      ? source 
      : new ArrayList<>(source);
    this.random = random.get();
}
```

 

相比隨機訪問時間複雜度不是 O(1) 的實現,創建 `ArrayList` 的成本可以忽略不計。

 

現在重寫最重要的 `tryAdvance()` 方法。實現很簡單,每次迭代都從 `source` 集合中隨機挑選並刪除一個元素。

 

不必擔心 `source` 發生改變。這裡不發佈 `RandomSpliterator`,只傳回基於它的一個 `Collector`:

 

```java
@Override
public boolean tryAdvance(Consumer super T> action) {
    int remaining = source.size();
    if (remaining > 0 ) {
        action.accept(source.remove(random.nextInt(remaining)));
        return true;
    } else {
        return false;
    }
}
```

 

除此之外,還需要實現其它3個方法:

 

```java
@Override
public Spliterator trySplit() {
    return null; // 表示 split 可不行
}

@Override
public long estimateSize() {
    return source.size();
}

@Override
public int characteristics() {
    return SIZED;
}
```

 

現在檢查一下是否有效果:

 

```java
IntStream.range(0, 10).boxed()
  .collect(toLazyShuffledStream())
  .forEach(System.out::println);
```

 

結果如下:

 

```shell
3
4
8
1
7
6
5
0
2
9
```

 

4. 性能考慮

 

在這個實現中,我們把大小為 N 的陣列換成 M 查找或刪除:

 

  • N:集合大小
  • M:挑選元素的數量

 

從 `ArrayList` 中查找或刪除單個元素通常比交換開銷大,因此方案的可擴展性不夠好。但是對於 M 值較小的時候性能會好很多。

 

現在對比 Eager 方案(都包含100000個物件):

 

```shell
            (limit)   Mode  Cnt     Score     Error  Units
eager             1  thrpt    5   467.796 ±   9.074  ops/s
eager            10  thrpt    5   467.694 ±  17.166  ops/s
eager           100  thrpt    5   459.765 ±   8.048  ops/s
eager          1000  thrpt    5   467.934 ±  43.095  ops/s
eager         10000  thrpt    5   449.471 ±   5.549  ops/s
eager        100000  thrpt    5   331.111 ±   5.626  ops/s
lazy              1  thrpt    5  1530.763 ±  72.096  ops/s
lazy             10  thrpt    5  1462.305 ±  23.860  ops/s
lazy            100  thrpt    5   823.212 ± 119.771  ops/s
lazy           1000  thrpt    5   166.786 ±  16.306  ops/s
lazy          10000  thrpt    5    19.475 ±   4.052  ops/s
lazy         100000  thrpt    5     4.097 ±   0.416  ops/s
```

 

(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart.png)

 

可以明顯看到,如果資料流元素較少,新方案的性能優於前者。但隨著“處理數量/集合大小”增加,吞吐量急劇下降。

 

這是因為從 `ArrayList` 中移除元素會帶來額外開銷,每次移除都會呼叫 `System#arraycopy` 對內部陣列執行移位操作,開銷較大。

 

對於較大的集合(1000000個元素)可以看到類似的樣式:

 

```shell
      (limit)    (size)   Mode  Cnt  Score   Err  Units
eager       1  10000000  thrpt    5  0.915        ops/s
eager      10  10000000  thrpt    5  0.783        ops/s
eager     100  10000000  thrpt    5  0.965        ops/s
eager    1000  10000000  thrpt    5  0.936        ops/s
eager   10000  10000000  thrpt    5  0.860        ops/s
lazy        1  10000000  thrpt    5  4.338        ops/s
lazy       10  10000000  thrpt    5  3.149        ops/s
lazy      100  10000000  thrpt    5  2.060        ops/s
lazy     1000  10000000  thrpt    5  0.370        ops/s
lazy    10000  10000000  thrpt    5  0.05         ops/s
```

 

(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-2.png)

 

在更小集合(128個元素)上的表現:

 

```shell
       (limit)    (size)   Mode  Cnt       Score   Error  Units
eager        2     128    thrpt    5  246439.459          ops/s
eager        4     128    thrpt    5  333866.936          ops/s
eager        8     128    thrpt    5  340296.188          ops/s
eager       16     128    thrpt    5  345533.673          ops/s
eager       32     128    thrpt    5  231725.156          ops/s
eager       64     128    thrpt    5  314324.265          ops/s
eager      128     128    thrpt    5  270451.992          ops/s
lazy         2     128    thrpt    5  765989.718          ops/s
lazy         4     128    thrpt    5  659421.041          ops/s
lazy         8     128    thrpt    5  652685.515          ops/s
lazy        16     128    thrpt    5  470346.570          ops/s
lazy        32     128    thrpt    5  324174.691          ops/s
lazy        64     128    thrpt    5  186472.090          ops/s
lazy       128     128    thrpt    5  108105.699          ops/s
```

 

(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-3.png)

 

能不能進一步優化?

 

5. 進一步提高性能

 

不幸的是,現有的解決方案擴展性不盡如人意,讓我們試著改進。但在此之前,先對現有操作進行測評:

 

(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.36.58.png)

 

不出意外,`Arraylist#remove` 是開銷最大的操作之一。換句話說,從 `ArrayList` 中刪除元素耗費了大量 CPU 資源。

 

為什麼呢?從 `ArrayList` 中刪除元素會對底層實現的陣列執行移除操作。問題是,Java 陣列不會自動調整大小,每次移除都會創建一個更小的新陣列:

 

```java
private void fastRemove(Object[] es, int i) {
    modCount++;
    final int newSize;
    if ((newSize = size - 1) > i)
        System.arraycopy(es, i + 1, es, i, newSize - i);
    es[size = newSize] = null;
}
```

 

接下來該怎麼辦?避免從 `ArrayList` 中移除元素。

 

為了達到這個效果,可以用一個陣列儲存剩餘的元素並記錄它的大小:

 

```java
public class ImprovedRandomSpliterator implements Spliterator {
    private final Random random;
    private final T[] source;
    private int size;
    private ImprovedRandomSpliterator(
      List source, Supplier extends Random> random) {
        if (source.isEmpty()) {
            throw new IllegalArgumentException(...);
        }
        this.source = (T[]) source.toArray();
        this.random = random.get();
        this.size = this.source.length;
    }
}
```

 

幸運的是,由於 `Spliterator` 的實體不會在執行緒之間共享,因此不會遇到併發問題。

 

現在嘗試移除元素時,實際上不需要創建縮小後的新陣列。相反,只要減小 `size` 並忽略陣列的其餘部分即可。

 

在此之前,把最後一個元素與傳回的元素交換:

 

```java
@Override
public boolean tryAdvance(Consumer super T> action) {
    if (size > 0) {
        int nextIdx = random.nextInt(size);
        int lastIdx = size - 1;
        action.accept(source[nextIdx]);
        source[nextIdx] = source[lastIdx];
        source[lastIdx] = null; // let object be GCed
        size--;
        return true;
    } else {
        return false;
    }
}
```

 

對改進後的方案進行評測,可以看到開銷最大的呼叫已經消失了:

 

(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.38.47.png)

 

準備在此運行基準測試進行比較:

 

```shell
               (limit)  (size)   Mode  Cnt     Score     Error  Units
eager                1  100000  thrpt    3   456.811 ±  20.585  ops/s
eager               10  100000  thrpt    3   469.635 ±  23.281  ops/s
eager              100  100000  thrpt    3   466.486 ±  68.820  ops/s
eager             1000  100000  thrpt    3   454.459 ±  13.103  ops/s
eager            10000  100000  thrpt    3   443.640 ±  96.929  ops/s
eager           100000  100000  thrpt    3   335.134 ±  21.944  ops/s
lazy                 1  100000  thrpt    3  1587.536 ± 389.128  ops/s
lazy                10  100000  thrpt    3  1452.855 ± 406.879  ops/s
lazy               100  100000  thrpt    3   814.978 ± 242.077  ops/s
lazy              1000  100000  thrpt    3   167.825 ± 129.559  ops/s
lazy             10000  100000  thrpt    3    19.782 ±   8.596  ops/s
lazy            100000  100000  thrpt    3     3.970 ±   0.408  ops/s
lazy_improved        1  100000  thrpt    3  1509.264 ± 170.423  ops/s
lazy_improved       10  100000  thrpt    3  1512.150 ± 143.927  ops/s
lazy_improved      100  100000  thrpt    3  1463.093 ± 593.370  ops/s
lazy_improved     1000  100000  thrpt    3  1451.007 ±  58.948  ops/s
lazy_improved    10000  100000  thrpt    3  1148.581 ± 232.218  ops/s
lazy_improved   100000  100000  thrpt    3   383.022 ±  97.082  ops/s
```

 

(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-5.png)

 

從上面的結果可以看出,改進後的方案性能受元素數量變化影響顯著減小。

 

實際上,即使遇到最差情況,改進方案的性能也比基於 `Collections#shuffle` 的方案略好一些。

 

6. 完整示例

 

完整示例可以在 [GitHub][2] 上找到。

 

[2]:https://github.com/pivovarit/articles/tree/master/java-random-stream

 

```java
package com.pivovarit.stream;
import java.util.List;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class ImprovedRandomSpliterator implements Spliterator {
    private final Random random;
    private final T[] source;
    private int size;
    ImprovedRandomSpliterator(List source, Supplier extends Random> random) {
        if (source.isEmpty()) {
            throw new IllegalArgumentException("RandomSpliterator can't be initialized with an empty collection");
        }
        this.source = (T[]) source.toArray();
        this.random = random.get();
        this.size = this.source.length;
    }
     @Override
    public boolean tryAdvance(Consumer super T> action) {
        if (size > 0) {
            int nextIdx = random.nextInt(size);
            int lastIdx = size - 1;
            action.accept(source[nextIdx]);
            source[nextIdx] = source[lastIdx];
            source[lastIdx] = null; // let object be GCed
            size--;
            return true;
        } else {
            return false;
        }
    }
    @Override
    public Spliterator trySplit() {
        return null;
    }
    @Override
    public long estimateSize() {
        return source.length;
    }
    @Override
    public int characteristics() {
        return SIZED;
    }
}
```

 

```java
package com.pivovarit.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.toCollection;
public final class RandomCollectors {

    private RandomCollectors() {
    }

    public static  Collector> toImprovedLazyShuffledStream() {
        return Collectors.collectingAndThen(
          toCollection(ArrayList::new),
          list -> !list.isEmpty()
            ? StreamSupport.stream(new ImprovedRandomSpliterator<>(list, Random::new), false)
            : Stream.empty());
    }

    public static  Collector> toLazyShuffledStream() {
        return Collectors.collectingAndThen(
          toCollection(ArrayList::new),
          list -> !list.isEmpty()
            ? StreamSupport.stream(new RandomSpliterator<>(list, Random::new), false)
            : Stream.empty());
    }

    public static  Collector> toEagerShuffledStream() {
        return Collectors.collectingAndThen(
          toCollection(ArrayList::new),
          list -> {
              Collections.shuffle(list);
              return list.stream();
          });
    }
}
```

    赞(0)

    分享創造快樂