歡迎光臨
每天分享高質量文章

Java 8 並行流介紹

(給ImportNew加星標,提高Java技能)

編譯:ImportNew/唐尤華

medium.com/javarevisited/java-8-parallel-stream-java2blog-e1254e593763

 

在這篇文章中,我們將介紹 Java 並行流(Parallel Stream)。

 

[Java 8][1] 引入了”並行流“概念實現並行處理。隨著硬體成本降低,現在的 CPU 大都擁有多個核心,因此可以使用並行處理加快操作執行。

 

[1]:https://java2blog.com/java-8-tutorial/

 

讓我們通過一個簡單的例子來幫助理解:

 

```java
package org.arpit.java2blog.java8;
import java.util.Arrays;
import java.util.stream.IntStream;
public class Java8ParallelStreamMain {
  public static void main(String[] args) {
    System.out.println("=================================");
    System.out.println("Using Sequential Stream");
    System.out.println("=================================");
    int[] array = {1,2,3,4,5,6,7,8,9,10};
    IntStream intArrStream = Arrays.stream(array);
    intArrStream.forEach(s->
    {
      System.out.println(s+" "+Thread.currentThread().getName());
    });
    System.out.println("=================================");
    System.out.println("Using Parallel Stream");
    System.out.println("=================================");
    IntStream intParallelStream=Arrays.stream(array).parallel();
    intParallelStream.forEach(s->
    {
      System.out.println(s+" "+Thread.currentThread().getName());
    });
  }
}
```

 

運行上面的程式,輸出結果如下:

 

```shell
=================================
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 ForkJoinPool.commonPool-worker-3
3 ForkJoinPool.commonPool-worker-1
9 ForkJoinPool.commonPool-worker-2
2 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-2
1 ForkJoinPool.commonPool-worker-3
8 ForkJoinPool.commonPool-worker-2
4 ForkJoinPool.commonPool-worker-1
```

 

仔細觀察輸出結果可以看到,在順序流情況下,主執行緒將完成所有工作。主執行緒會等待當前迭代完成,然後接著進行下一次迭代。

 

在[並行流][2]的情況下,會同時產生4個執行緒,在內部使用 `ForkJoinPool` 創建和管理執行緒。並行流通過 `ForkJoinPool.commonPool()` 靜態方法創建 `ForkJoinPool` 實體。

 

[2]:http://www.java67.com/2018/10/java-8-stream-and-functional-programming-interview-questions-answers.html

 

並行流會利用所有可用 CPU 核心並行處理任務。如果任務數超過了 CPU 核心數量,那麼其餘任務將等待當前正在運行的任務完成。

 

並行流很酷,可以一直用嗎?

 

“當然不是!”

 

雖然只加上 `.parallel` 就可以把[一個流][3]轉為並行流,但這並不表示所有情況都適用。

 

[3]:https://java2blog.com/java-8-stream-filter-examples/

 

在使用並行流時,需要考慮許多因素,否則將會遭遇並行流帶來的負面影響。

 

並行流的開銷遠高於順序流,而且協調執行緒需要花費大量時間。

 

[4]:http://www.java67.com/2014/04/java-8-stream-examples-and-tutorial.html

 

當且僅當以出現下麵的情況時,需要考慮使用並行流:

 

  •  需要處理大量資料;
  • 正如你所知道的那樣,Java 使用了 [ForkJoinPool][5] 實現並行。`ForkJoinPool` 會把輸入流拆分後提交執行,因此需要確保輸入流是可拆分的。

 

[5]:http://javarevisited.blogspot.sg/2016/12/difference-between-executor-framework-and-ForkJoinPool-in-Java.html

 

例如:

 

[ArrayList][6] 非常易於拆分,因為可以通過索引找到一個中間元素進行拆分。但 `LinkedList` 很難拆分,而且大多數情況下運行效果並不好。

 

  • 實際使用中會遇到麻煩的性能問題;
  • 需要確保執行緒之間的所有共享資源都能正確同步,否則可能會產生意外的結果。

 

[6]:https://javarevisited.blogspot.com/2011/05/example-of-arraylist-in-java-tutorial.html

 

衡量並行性最簡單的公式是 Brian Goetz 在他的演講中提到的 “NQ” 模型。

 

NQ 模型:

 

```
N x Q >10000
```

 

這裡,

 

  • N = 資料元素數量
  • Q = 每個元素執行的工作量

 

這意味著,如果資料元素數量很大且每個元素執行的工作量較小(如加法操作),並行性可能會讓程式更快地運行,反之亦然。如果資料元素數量較少且每個元素執行的工作量較大(比如做一些複雜的計算工作),並行處理也可能會更快地得到結果。

 

讓我們看看另一個例子。

 

下麵的示例將展示在執行長時間計算時,採用並行流或順序流 CPU 的不同表現。我們將做一些耗時的計算讓 CPU 忙碌起來。

 

```java
package org.arpit.java2blog.java8;
import java.util.ArrayList;
import java.util.List;
public class PerformanceComparisonMain {
  public static void main(String[] args) {
    
    long currentTime=System.currentTimeMillis();
    List data=new ArrayList();
    for (int i = 0; i < 100000; i++) {
      data.add(i);
    }
    
    long sum=data.stream()
        .map(i ->(int)Math.sqrt(i))
        .map(number->performComputation(number))
        .reduce(0,Integer::sum);
    
    System.out.println(sum);
    long endTime=System.currentTimeMillis();
    System.out.println("Time taken to complete:"+(endTime-currentTime)/(1000*60)+" minutes");
    
  }
  
  public static int performComputation(int number)
  {
    int sum=0;
    for (int i = 1; i < 1000000; i++) {
      int div=(number/i);
      sum+=div;
      
    }
    return sum;
  }
}
```

 

運行上面的程式結果如下:

 

>>>
117612733
Time taken to complete:6 minutes
>>>

 

然而這裡不關心輸出,而是關心在執行操作時 CPU 的表現。

 

 

如圖所示,在順序流情況下,CPU 沒有得到充分利用。

 

讓我們修改第16行使並行流,再次運行程式。

 

```java
long sum=data.stream()
        .parallel()
        .map(i ->(int)Math.sqrt(i))
        .map(number->performComputation(number))
        .reduce(0,Integer::sum);
```

 

使用並行流執行,輸出的結果中耗時明顯減少。

 

>>>
117612733
Time taken to complete:3 minutes
>>>

 

查看使用並行流運行程式時 CPU 的歷史記錄。

 

 

如圖所示,並行流使用了所有4個 CPU 核心進行計算。

 

以上是有關 Java 並行流的所有介紹。

赞(0)

分享創造快樂