歡迎光臨
每天分享高質量文章

Core Java 併發:理解併發概念

精品專欄

 

來自:唐尤華

https://dzone.com/refcardz/core-java-concurrency

1. 簡介

從誕生開始,Java 就支援執行緒、鎖等關鍵的併發概念。這篇文章旨在為使用了多執行緒的 Java 開發者理解 Core Java 中的併發概念以及使用方法。

2. 概念

2.1 競爭條件

多個執行緒對共享資源執行一系列操作,根據每個執行緒的操作順序可能存在幾種結果,這時出現競爭條件。下麵的程式碼不是執行緒安全的,而且可以不止一次地初始化 value,因為 check-then-act(檢查 null,然後初始化),所以延遲初始化的欄位不具備原子性:

  1. class Lazy <T> {
  2.  private volatile T value;
  3.  T get() {
  4.    if (value == null)
  5.      value = initialize();
  6.    return value;
  7.  }
  8. }

2.2 資料競爭

兩個或多個執行緒試圖訪問同一個非 final 變數並且不加上同步機制,這時會發生資料競爭。沒有同步機制可能導致這樣的情況,執行緒執行過程中做出其他執行緒無法看到的更改,因而導致讀到修改前的資料。這樣反過來可能又會導致無限迴圈、破壞資料結構或得到錯誤的計算結果。下麵這段程式碼可能會無限迴圈,因為讀執行緒可能永遠不知道寫執行緒所做的更改:

  1. class Waiter implements Runnable {
  2.  private boolean shouldFinish;
  3.  void finish() { shouldFinish = true; }
  4.  public void run() {
  5.    long iteration = 0;
  6.    while (!shouldFinish) {
  7.      iteration++;
  8.    }
  9.    System.out.println("Finished after: " + iteration);
  10.  }
  11. }
  12.  
  13. class DataRace {
  14.  public static void main(String[] args) throws InterruptedException {
  15.    Waiter waiter = new Waiter();
  16.    Thread waiterThread = new Thread(waiter);
  17.    waiterThread.start();
  18.    waiter.finish();
  19.    waiterThread.join();
  20.  }
  21. }

3. Java 記憶體模型:happens-before 關係

Java 記憶體模型定義基於一些操作,比如讀寫欄位、 Monitor 同步等。這些操作可以按照 happens-before 關係進行排序。這種關係可用來推斷一個執行緒何時看到另一個執行緒的操作結果,以及構成一個程式同步後的所有資訊。

happens-before 關係具備以下特性:

  • 在執行緒開始所有操作前呼叫 Thread#start
  • 在獲取 Monitor 前,釋放該 Monitor
  • 在讀取 volatile 變數前,對該變數執行一次寫操作
  • 在寫入 final 變數前,確保在物件取用已存在
  • 執行緒中的所有操作應在 Thread#join 傳回之前完成

4. 標準同步特性

4.1 synchronized 關鍵字

使用 synchronized 關鍵字可以防止不同執行緒同時執行相同程式碼塊。由於進入同步執行的程式碼塊之前加鎖,受該鎖保護的資料可以在排他樣式下操作,從而讓操作具備原子性。此外,其他執行緒在獲得相同的鎖後也能看到操作結果。

  1. class AtomicOperation {
  2.  private int counter0;
  3.  private int counter1;
  4.  void increment() {
  5.    synchronized (this) {
  6.      counter0++;
  7.      counter1++;
  8.    }
  9.  }
  10. }

也可以在方法上加 synchronized 關鍵字。

表2 當整個方法都標記 synchronized 時使用的 Monitor

鎖是可重入的。如果執行緒已經持有鎖,它可以再次成功地獲得該鎖。

  1. class Reentrantcy {
  2.  synchronized void doAll() {
  3.    doFirst();
  4.    doSecond();
  5.  }
  6.  synchronized void doFirst() {
  7.    System.out.println("First operation is successful.");
  8.  }
  9.  synchronized void doSecond() {
  10.    System.out.println("Second operation is successful.");
  11.  }
  12. }

競爭的程度對獲取 Monitor 的方式有影響:

表3: Monitor 狀態

4.2 wait/notify

wait/notify/notifyAll 方法在 Object 類中宣告。如果之前設定了超時,執行緒進入 WAITING 或 TIMED_WAITING 狀態前保持 wait狀態。要喚醒一個執行緒,可以執行下列任何操作:

  • 另一個執行緒呼叫 notify 將喚醒任意一個在 Monitor 上等待的執行緒。
  • 另一個執行緒呼叫 notifyAll 將喚醒所有在等待 Monitor 上等待的執行緒。
  • 呼叫 Thread#interrupt 後會丟擲 InterruptedException 異常。

最常見的樣式是條件迴圈:

  1. class ConditionLoop {
  2.  private boolean condition;
  3.  synchronized void waitForCondition() throws InterruptedException {
  4.    while (!condition) {
  5.      wait();
  6.    }
  7.  }
  8.  synchronized void satisfyCondition() {
  9.    condition = true;
  10.    notifyAll();
  11.  }
  12. }
  • 請記住,在物件上呼叫 wait/notify/notifyAll,需要首先獲得該物件的鎖
  • 在檢查等待條件的迴圈中保持等待:這解決了另一個執行緒在等待開始之前即滿足條件時的計時問題。 此外,這樣做還可以讓你的程式碼免受可能(也的確會)發生的虛假喚醒
  • 在呼叫 notify/notifyAll 前,要確保滿足等待條件。如果不這樣做會引發通知,然而沒有執行緒能夠避免等待迴圈

4.3 volatile 關鍵字

volatile 解決了可見性問題,讓修改成為原子操作。由於存在 happens-before 關係,在接下來讀取 volatile 變數前,先對 volatile 變數進行寫操作。 從而保證了對該欄位的任何讀操作都能督讀到最近一次修改後的值。

  1. class VolatileFlag implements Runnable {
  2.  private volatile boolean shouldStop;
  3.  public void run() {
  4.    while (!shouldStop) {
  5.      // 執行操作
  6.    }
  7.    System.out.println("Stopped.");
  8.  }
  9.  void stop() {
  10.    shouldStop = true;
  11.  }
  12.  public static void main(String[] args) throws InterruptedException {
  13.    VolatileFlag flag = new VolatileFlag();
  14.    Thread thread = new Thread(flag);
  15.    thread.start();
  16.    flag.stop();
  17.    thread.join();
  18.  }
  19. }

4.4 Atomic

java.util.concurrent.atomic package 包含了一組類,它們用類似 volatile 的無鎖方式支援單個值的原子複合操作。

使用 AtomicXXX 類,可以實現 check-then-act 原子操作:

  1. class CheckThenAct {
  2.  private final AtomicReference<String> value = new AtomicReference<>();
  3.  void initialize() {
  4.    if (value.compareAndSet(null, "Initialized value")) {
  5.      System.out.println("Initialized only once.");
  6.    }
  7.  }
  8. }

AtomicInteger 和 AtomicLong 都提供原子 increment/decrement 操作:

  1. class Increment {
  2.  private final AtomicInteger state = new AtomicInteger();
  3.  void advance() {
  4.    int oldState = state.getAndIncrement();
  5.    System.out.println("Advanced: '" + oldState + "' -> '" + (oldState + 1) + "'.");
  6.  }
  7. }

如果你希望有這樣一個計數器,不需要在獲取計數的時候具備原子性,可以考慮用 LongAdder 取代 AtomicLong/AtomicInteger。 LongAdder 能在多個單元中存值併在需要時增加計數,因此在競爭激烈的情況下表現更好。

4.5 ThreadLocal

一種在執行緒中包含資料但不用鎖的方法是使用 ThreadLocal 儲存。從概念上講,ThreadLocal 可以看做每個 Thread 存有一份自己的變數。Threadlocal 通常用於儲存每個執行緒的值,比如“當前事務”或其他資源。 此外,還可以用於維護每個執行緒的計數器、統計資訊或 ID 生成器。

  1. class TransactionManager {
  2.  private final ThreadLocal<Transaction> currentTransaction
  3.      = ThreadLocal.withInitial(NullTransaction::new);
  4.  Transaction currentTransaction() {
  5.    Transaction current = currentTransaction.get();
  6.    if (current.isNull()) {
  7.      current = new TransactionImpl();
  8.      currentTransaction.set(current);
  9.    }
  10.    return current;
  11.  }
  12. }

5. 安全地釋出物件

想讓一個物件在當前作用域外使用可以釋出物件,例如從 getter 傳回該物件的取用。 要確保安全地釋出物件,僅在物件完全構造好後釋出,可能需要同步。 可以透過以下方式安全地釋出:

  • 靜態初始化器。只有一個執行緒可以初始化靜態變數,因為類的初始化在獲取排他鎖條件下完成。
  1. class StaticInitializer {
  2.  // 無需額外初始化條件,釋出一個不可變物件
  3.  public static final Year year = Year.of(2017);
  4.  public static final Set<String> keywords;
  5.  // 使用靜態初始化器構造複雜物件
  6.  static {
  7.    // 建立可變集合
  8.    Set<String> keywordsSet = new HashSet<>();
  9.    // 初始化狀態
  10.    keywordsSet.add("java");
  11.    keywordsSet.add("concurrency");
  12.    // 設定 set 不可修改
  13.    keywords = Collections.unmodifiableSet(keywordsSet);
  14.  }
  15. }
  • volatile 欄位。由於寫入 volatile 變數發生在讀操作之前,因此讀執行緒總能讀到最新的值。
  1. class Volatile {
  2.  private volatile String state;
  3.  void setState(String state) {
  4.    this.state = state;
  5.  }
  6.  String getState() {
  7.    return state;
  8.  }
  9. }
  • Atomic。例如 AtomicInteger 將值儲存在 volatile 欄位中,所以 volatile 變數的規則在這裡也適用。
  1. class Atomics {
  2.  private final AtomicInteger state = new AtomicInteger();
  3.  void initializeState(int state) {
  4.    this.state.compareAndSet(0, state);
  5.  }
  6.  int getState() {
  7.    return state.get();
  8.  }
  9. }
  • final 欄位
  1. class Final {
  2.  private final String state;
  3.  Final(String state) {
  4.    this.state = state;
  5.  }
  6.  String getState() {
  7.    return state;
  8.  }
  9. }

確保在物件構造期間不會修改此取用。

  1. class ThisEscapes {
  2. private final String name;
  3. ThisEscapes(String name) {
  4.   Cache.putIntoCache(this);
  5.   this.name = name;
  6. }
  7. String getName() { return name; }
  8. }
  9. class Cache {
  10. private static final Map<String, ThisEscapes> CACHE = new ConcurrentHashMap<>();
  11. static void putIntoCache(ThisEscapes thisEscapes) {
  12.   // 'this' 取用在物件完全構造之前發生了改變
  13.   CACHE.putIfAbsent(thisEscapes.getName(), thisEscapes);
  14. }
  15. }
  • 正確同步欄位
  1. class Synchronization {
  2.  private String state;
  3.  synchronized String getState() {
  4.    if (state == null)
  5.      state = "Initial";
  6.    return state;
  7.  }
  8. }

6. 不可變物件

不可變物件的一個重要特徵是執行緒安全,因此不需要同步。要成為不可變物件:

  • 所有欄位都標記 final
  • 所有欄位必須是可變或不可變的物件,註意不要改變物件作用域,否則構造後不能改變物件狀態
  • this 取用在構造物件時不要洩露
  • 類標記 final,子類無法多載改變類的行為
  • 不可變物件示例:
  1. // 標記為 final,禁止繼承
  2. public final class Artist {
  3.  // 不可變變數,欄位標記 final
  4.  private final String name;
  5.  // 不可變變數集合, 欄位標記 final
  6.  private final List<Track> tracks;
  7.  public Artist(String name, List<Track> tracks) {
  8.    this.name = name;
  9.    // 防禦性複製
  10.    List<Track> copy = new ArrayList<>(tracks);
  11.    // 使可變集合不可修改
  12.    this.tracks = Collections.unmodifiableList(copy);
  13.    // 構造物件期間,'this' 不傳遞到任何其他地方
  14.  }
  15.  // getter、equals、hashCode、toString 方法
  16. }
  17. // 標記為 final,禁止繼承
  18. public final class Track {
  19.  // 不可變變數,欄位標記 final
  20.  private final String title;
  21.  public Track(String title) {
  22.    this.title = title;
  23.  }
  24.  // getter、equals、hashCode、toString 方法
  25. }

7. 執行緒

java.lang.Thread 類用於表示應用程式執行緒或 JVM 執行緒。 程式碼始終在某個 Thread 類的背景關係中執行,使用 Thread#currentThread() 可傳回自己的當前執行緒。

表4 執行緒狀態

表5 執行緒協調方法

7.1 如何處理 InterruptedException?

  • 清理所有資源,併在當前執行級別盡可能能完成執行緒執行
  • 當前方法宣告丟擲 InterruptedException。
  • 如果方法沒有宣告丟擲 InterruptedException,那麼應該透過呼叫 Thread.currentThread().interrupt() 將中斷標誌恢復為 true。 並且在這個級別上丟擲更合適的異常。為了能在更高呼叫級別上處理中斷,把中斷標誌設定為 true 非常重要

7.2 處理意料之外的異常

執行緒可以指定一個 UncaughtExceptionHandler 接收由於發生未捕獲異常導致執行緒突然終止的通知。

  1. Thread thread = new Thread(runnable);
  2. thread.setUncaughtExceptionHandler((failedThread, exception) -> {
  3.  logger.error("Caught unexpected exception in thread '{}'.",
  4.      failedThread.getName(), exception);
  5. });
  6. thread.start();

8. 活躍度

8.1 死鎖

有多個執行緒,每個執行緒都在等待另一個執行緒持有的資源,形成一個獲取資源的執行緒迴圈,這時會發生死鎖。最典型的資源是物件 Monitor ,但也可能是任何可能導致阻塞的資源,例如 wait/notify。

下麵的程式碼可能產生死鎖:

  1. class Account {
  2.  private long amount;
  3.  void plus(long amount) { this.amount += amount; }
  4.  void minus(long amount) {
  5.    if (this.amount < amount)
  6.      throw new IllegalArgumentException();
  7.    else
  8.      this.amount -= amount;
  9.  }
  10.  static void transferWithDeadlock(long amount, Account first, Account second){
  11.    synchronized (first) {
  12.      synchronized (second) {
  13.        first.minus(amount);
  14.        second.plus(amount);
  15.      }
  16.    }
  17.  }
  18. }

如果同時出現以下情況,就會發生死鎖:

  • 一個執行緒正試圖從第一個帳戶切換到第二個帳戶,並已獲得了第一個帳戶的鎖
  • 另一個執行緒正試圖從第二個帳戶切換到第一個帳戶,並已獲得第二個帳戶的鎖

避免死鎖的方法:

  • 按順序加鎖:總是以相同的順序獲取鎖
  1. class Account {
  2.  private long id;
  3.  private long amount;
  4.  // 此處略去了一些方法
  5.  static void transferWithLockOrdering(long amount, Account first, Account second){
  6.    boolean lockOnFirstAccountFirst = first.id < second.id;
  7.    Account firstLock = lockOnFirstAccountFirst  ? first  : second;
  8.    Account secondLock = lockOnFirstAccountFirst ? second : first;
  9.    synchronized (firstLock) {
  10.      synchronized (secondLock) {
  11.        first.minus(amount);
  12.        second.plus(amount);
  13.      }
  14.    }
  15.  }
  16. }
  • 鎖定超時:獲取鎖時不要無限期阻塞,而是釋放所有鎖並重試
  1. class Account {
  2.  private long amount;
  3.  // 此處略去了一些方法
  4.  static void transferWithTimeout(
  5.      long amount, Account first, Account second, int retries, long timeoutMillis
  6.  ) throws InterruptedException {
  7.    for (int attempt = 0; attempt < retries; attempt++) {
  8.      if (first.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
  9.      {
  10.        try {
  11.          if (second.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
  12.          {
  13.            try {
  14.              first.minus(amount);
  15.              second.plus(amount);
  16.            }
  17.            finally {
  18.              second.lock.unlock();
  19.            }
  20.          }
  21.        }
  22.        finally {
  23.          first.lock.unlock();
  24.        }
  25.      }
  26.    }
  27.  }
  28. }

Jvm 能夠檢測 Monitor 死鎖,並以執行緒轉儲的形式列印死鎖資訊。

8.2 活鎖與執行緒饑餓

當執行緒將所有時間用於協商資源訪問或者檢測避免死鎖,以至於沒有執行緒能夠訪問資源時,會造成活鎖(Livelock)。 執行緒饑餓發生在執行緒長時間持鎖,導致一些執行緒無法繼續執行被“餓死”。

9. java.util.concurrent

9.1 執行緒池

執行緒池的核心介面是 ExecutorService。 java.util.concurrent 還提供了一個靜態工廠類 Executors,其中包含了新建執行緒池的工廠方法,新建的執行緒池引數採用最常見的配置。

表6 靜態工廠方法

譯註:在平行計算中,work-stealing 是一種針對多執行緒計算機程式的排程策略。 它解決了在具有固定數量處理器或內核的靜態多執行緒計算機上執行動態多執行緒計算的問題,這種計算可以“產生”新的執行執行緒。 在執行時間、記憶體使用和處理器間通訊方面都能夠高效地完成任務。

在調整執行緒池的大小時,通常需要根據執行應用程式的計算機中的邏輯核心數量來確定執行緒池的大小。 在 Java 中,可以透過呼叫 Runtime.getRuntime().availableProcessors() 讀取。

表7 執行緒池實現

可透過 ExecutorService#submit、ExecutorService#invokeAll 或 ExecutorService#invokeAny 提交任務,可根據不同任務進行多次多載。

表8 任務的功能介面

9.2 Future

Future 是對非同步計算的一種抽象,代表計算結果。計算結果可能是某個計算值或異常。ExecutorService 的大多數方法都使用 Future 作為傳回型別。使用 Future 時,可透過提供的介面檢查當前狀態,或者一直阻塞直到結果計算完成。

  1. ExecutorService executorService = Executors.newSingleThreadExecutor();
  2. Future<String> future = executorService.submit(() -> "result");
  3. try {
  4.  String result = future.get(1L, TimeUnit.SECONDS);
  5.  System.out.println("Result is '" + result + "'.");
  6. }
  7. catch (InterruptedException e) {
  8.  Thread.currentThread().interrupt();
  9.  throw new RuntimeException(e);
  10. }
  11. catch (ExecutionException e) {
  12.  throw new RuntimeException(e.getCause());
  13. }
  14. catch (TimeoutException e) {
  15.  throw new RuntimeException(e);
  16. }
  17. assert future.isDone();

9.3 鎖

9.3.1 Lock

java.util.concurrent.locks package 提供了標準 Lock 介面。ReentrantLock 在實現 synchronized 關鍵字功能的同時還包含了其他功能,例如獲取鎖的狀態資訊、非阻塞 tryLock() 和可中斷鎖定。直接使用 ReentrantLock 示例如下:

  1. class Counter {
  2.  private final Lock lock = new ReentrantLock();
  3.  private int value;
  4.  int increment() {
  5.    lock.lock();
  6.    try {
  7.      return ++value;
  8.    } finally {
  9.      lock.unlock();
  10.    }
  11.  }
  12. }

9.3.2 ReadWriteLock

java.util.concurrent.locks package 還包含 ReadWriteLock 介面(以及 Reentrantreadelock 實現)。該介面定義了一對鎖進行讀寫操作,通常支援多個併發讀取,但只允許一個寫入。

  1. class Statistic {
  2.  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  3.  private int value;
  4.  void increment() {
  5.    lock.writeLock().lock();
  6.    try {
  7.      value++;
  8.    } finally {
  9.      lock.writeLock().unlock();
  10.    }
  11.  }
  12.  int current() {
  13.    lock.readLock().lock();
  14.    try {
  15.      return value;
  16.    } finally {
  17.      lock.readLock().unlock();
  18.    }
  19.  }
  20. }

9.3.3 CountDownLatch

CountDownLatch 用一個計數器初始化。執行緒可以呼叫 await() 等待計數歸零。其他執行緒(或同一執行緒)可能會呼叫 countDown() 來減小計數。一旦計數歸零即不可重用。CountDownLatch 用於發生某些操作時觸發一組未知的執行緒。

9.3.4 CompletableFuture

CompletableFuture 是對非同步計算的一種抽象。 與普通 Future 不同,CompletableFuture 僅支援阻塞方式獲得結果。當結果產生或發生異常時,執行由已註冊的回呼函式建立的任務管道。無論是建立過程中(透過 CompletableFuture#supplyAsync/runAsync),還是在加入回呼過程中( *async 系列方法),如果沒有指定標準的全域性 ForkJoinPool#commonPool 都可以設定執行計算的執行器。

考慮 CompletableFuture 已執行完畢,那麼透過非 async 方法註冊的回呼將在呼叫者的執行緒中執行。

如果程式中有幾個 future,可以使用 CompletableFuture#allOf 獲得一個 future,這個 future 在所有 future 完成時結束。也可以呼叫 CompletableFuture#anyOf 獲得一個 future,這個 future 在其中任何一個 future 完成時結束。

  1. ExecutorService executor0 = Executors.newWorkStealingPool();
  2. ExecutorService executor1 = Executors.newWorkStealingPool();
  3. // 當這兩個 future 完成時結束
  4. CompletableFuture<String> waitingForAll = CompletableFuture
  5.    .allOf(
  6.        CompletableFuture.supplyAsync(() -> "first"),
  7.        CompletableFuture.supplyAsync(() -> "second", executor1)
  8.    )
  9.    .thenApply(ignored -> " is completed.");
  10. CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Concurrency Refcard", executor0)
  11.    // 使用同一個 executor
  12.    .thenApply(result -> "Java " + result)
  13.    // 使用不同的 executor
  14.    .thenApplyAsync(result -> "Dzone " + result, executor1)
  15.    // 當前與其他 future 完成後結束
  16.    .thenCombine(waitingForAll, (first, second) -> first + second)
  17.    // 預設使用 ForkJoinPool#commonPool 作為 executor
  18.    .thenAcceptAsync(result -> {
  19.      System.out.println("Result is '" + result + "'.");
  20.    })
  21.    // 通用處理
  22.    .whenComplete((ignored, exception) -> {
  23.      if (exception != null)
  24.        exception.printStackTrace();
  25.    });
  26. // 第一個阻塞呼叫:在 future 完成前保持阻塞
  27. future.join();
  28. future
  29.    // 在當前執行緒(main)中執行
  30.    .thenRun(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))
  31.    // 預設使用 ForkJoinPool#commonPool 作為 executor
  32.    .thenRunAsync(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))

9.4 併發集合

使集合執行緒安全最簡單方法是使用 Collections#synchronized* 系列方法。 由於這種解決方案在競爭激烈的情況下效能很差,所以 java.util.concurrent 提供了多種針對併發最佳化的資料結構。

9.4.1 List

表9:java.util.concurrent 中的 Lists

譯註:copy-on-write(寫入時複製)是一種計算機程式設計領域的最佳化策略。其核心思想是,如果有多個呼叫者同時請求相同資源(如記憶體或磁碟上的資料儲存),他們會共同獲取相同的指標指向相同的資源,直到某個呼叫者試圖修改資源的內容時,系統才會真正複製一份專用副本給該呼叫者,而其他呼叫者所見到的最初的資源仍然保持不變。這個過程對其他的呼叫者透明。這種做法的主要優點是如果呼叫者沒有修改該資源,就不會新建副本,因此多個呼叫者只是讀取操作可以共享同一份資源。

9.4.2 Map

表10 java.util.concurrent 中的 Map

9.4.3 Set

表11 java.util.concurrent 中的 Set

封裝 concurrent map 進而建立 concurrent set 的另一種方法:

  1. Set<T> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());

9.4.4 Queue

佇列就像是“生產者”和“消費者”之間的管道。按照“先進先出(FIFO)”順序,將物件從管道的一端加入,從管道的另一端取出。BlockingQueue 介面繼承了 Queue介面,並且增加了(生產者新增物件時)佇列滿或(消費者讀取或移除物件時)佇列空的處理。 在這些情況下,BlockingQueue 提供的方法可以一直保持或在一段時間內保持阻塞狀態,直到等待的條件由另一個執行緒的操作改變。

表12 java.util.concurrent 中的 Queue

 

    閱讀原文

    贊(0)

    分享創造快樂