歡迎光臨
每天分享高質量文章

併發程式設計 – Concurrent 使用者指南( 下 )

(點選上方公眾號,可快速關註)


來源:高廣超 ,

www.jianshu.com/p/8cb5d816cb69

16. 執行器服務 ExecutorService

java.util.concurrent.ExecutorService 介面表示一個非同步執行機制,使我們能夠在後臺執行任務。因此一個 ExecutorService 很類似於一個執行緒池。實際上,存在於 java.util.concurrent 包裡的 ExecutorService 實現就是一個執行緒池實現。

ExecutorService 例子

以下是一個簡單的 ExecutorService 例子:

ExecutorService executorService = Executors.newFixedThreadPool(10);  

 

executorService.execute(new Runnable() {  

    public void run() {  

        System.out.println(“Asynchronous task”);  

    }  

});  

 

executorService.shutdown();

首先使用 newFixedThreadPool() 工廠方法建立一個 ExecutorService。這裡建立了一個十個執行緒執行任務的執行緒池。然後,將一個 Runnable 介面的匿名實現類傳遞給 execute() 方法。這將導致 ExecutorService 中的某個執行緒執行該 Runnable。

任務委派

下圖說明瞭一個執行緒是如何將一個任務委託給一個 ExecutorService 去非同步執行的:

一個執行緒將一個任務委派給一個 ExecutorService 去非同步執行。

一旦該執行緒將任務委派給 ExecutorService,該執行緒將繼續它自己的執行,獨立於該任務的執行。

ExecutorService 實現

既然 ExecutorService 是個介面,如果你想用它的話就得去使用它的實現類之一。

java.util.concurrent 包提供了 ExecutorService 介面的以下實現類:

  • ThreadPoolExecutor

  • ScheduledThreadPoolExecutor

建立一個 ExecutorService

ExecutorService 的建立依賴於你使用的具體實現。但是你也可以使用 Executors 工廠類來建立 ExecutorService 實體。

以下是幾個建立 ExecutorService 實體的例子:

ExecutorService executorService1 = Executors.newSingleThreadExecutor();  

 

ExecutorService executorService2 = Executors.newFixedThreadPool(10);  

 

ExecutorService executorService3 = Executors.newScheduledThreadPool(10);

ExecutorService 使用

有幾種不同的方式來將任務委託給 ExecutorService 去執行:

  • execute(Runnable)

  • submit(Runnable)

  • submit(Callable)

  • invokeAny(…)

  • invokeAll(…)

接下來我們挨個看一下這些方法。

execute(Runnable)

execute(Runnable) 方法要求一個 java.lang.Runnable 物件,然後對它進行非同步執行。以下是使用 ExecutorService 執行一個 Runnable 的示例:

ExecutorService executorService = Executors.newSingleThreadExecutor();  

 

executorService.execute(new Runnable() {  

    public void run() {  

        System.out.println(“Asynchronous task”);  

    }  

});  

 

executorService.shutdown();

沒有辦法得知被執行的 Runnable 的執行結果。如果有需要的話你得使用一個 Callable(以下將做介紹)。

submit(Runnable)

submit(Runnable) 方法也要求一個 Runnable 實現類,但它傳回一個 Future 物件。這個 Future 物件可以用來檢查 Runnable 是否已經執行完畢。以下是 ExecutorService submit() 示例:

Future future = executorService.submit(new Runnable() {  

    public void run() {  

        System.out.println(“Asynchronous task”);  

    }  

});  

 

future.get();  //returns null if the task has finished correctly.

submit(Callable)

submit(Callable) 方法類似於 submit(Runnable) 方法,除了它所要求的引數型別之外。Callable 實體除了它的 call() 方法能夠傳回一個結果之外和一個 Runnable 很相像。Runnable.run() 不能夠傳回一個結果。Callable 的結果可以透過 submit(Callable) 方法傳回的 Future 物件進行獲取。

以下是一個 ExecutorService Callable 示例:

Future future = executorService.submit(new Callable(){  

    public Object call() throws Exception {  

        System.out.println(“Asynchronous Callable”);  

        return “Callable Result”;  

    }  

});  

 

System.out.println(“future.get() = ” + future.get());

以上程式碼輸出:

Asynchronous Callable

future.get() = Callable Result

invokeAny()

invokeAny() 方法要求一系列的 Callable 或者其子介面的實體物件。呼叫這個方法並不會傳回一個 Future,但它傳回其中一個 Callable 物件的結果。無法保證傳回的是哪個 Callable 的結果 – 只能表明其中一個已執行結束。

如果其中一個任務執行結束(或者拋了一個異常),其他 Callable 將被取消。以下是示例程式碼:

ExecutorService executorService = Executors.newSingleThreadExecutor();  

 

Set> callables = new HashSet>();  

 

callables.add(new Callable() {  

    public String call() throws Exception {  

        return “Task 1”;  

    }  

});  

callables.add(new Callable() {  

    public String call() throws Exception {  

        return “Task 2”;  

    }  

});  

callables.add(new Callable() {  

    public String call() throws Exception {  

        return “Task 3”;  

    }  

});  

 

String result = executorService.invokeAny(callables);  

 

System.out.println(“result = ” + result);  

 

executorService.shutdown();

上述程式碼將會打印出給定 Callable 集合中的一個的執行結果。我自己試著執行了它幾次,結果始終在變。有時是 “Task 1″,有時是 “Task 2″ 等等。

invokeAll()

invokeAll() 方法將呼叫你在集合中傳給 ExecutorService 的所有 Callable 物件。invokeAll() 傳回一系列的 Future 物件,透過它們你可以獲取每個 Callable 的執行結果。記住,一個任務可能會由於一個異常而結束,因此它可能沒有 “成功”。

無法透過一個 Future 物件來告知我們是兩種結束中的哪一種。以下是一個程式碼示例:

ExecutorService executorService = Executors.newSingleThreadExecutor();  

 

Set> callables = new HashSet>();  

 

callables.add(new Callable() {  

    public String call() throws Exception {  

        return “Task 1”;  

    }  

});  

callables.add(new Callable() {  

    public String call() throws Exception {  

        return “Task 2”;  

    }  

});  

callables.add(new Callable() {  

    public String call() throws Exception {  

        return “Task 3”;  

    }  

});  

 

List> futures = executorService.invokeAll(callables);  

 

for(Future future : futures){  

    System.out.println(“future.get = ” + future.get());  

}  

 

executorService.shutdown();

ExecutorService 關閉

使用完 ExecutorService 之後你應該將其關閉,以使其中的執行緒不再執行。

比如,如果你的應用是透過一個 main() 方法啟動的,之後 main 方法退出了你的應用,如果你的應用有一個活動的 ExexutorService 它將還會保持執行。ExecutorService 裡的活動執行緒阻止了 JVM 的關閉。

要終止 ExecutorService 裡的執行緒你需要呼叫 ExecutorService 的 shutdown() 方法。ExecutorService 並不會立即關閉,但它將不再接受新的任務,而且一旦所有執行緒都完成了當前任務的時候,ExecutorService 將會關閉。在 shutdown() 被呼叫之前所有提交給 ExecutorService 的任務都被執行。如果你想要立即關閉 ExecutorService,你可以呼叫 shutdownNow() 方法。這樣會立即嘗試停止所有執行中的任務,並忽略掉那些已提交但尚未開始處理的任務。無法擔保執行任務的正確執行。可能它們被停止了,也可能已經執行結束。

17. 執行緒池執行者 ThreadPoolExecutor

java.util.concurrent.ThreadPoolExecutor 是 ExecutorService 介面的一個實現。ThreadPoolExecutor 使用其內部池中的執行緒執行給定任務(Callable 或者 Runnable)。

ThreadPoolExecutor 包含的執行緒池能夠包含不同數量的執行緒。池中執行緒的數量由以下變數決定:

  • corePoolSize

  • maximumPoolSize

當一個任務委託給執行緒池時,如果池中執行緒數量低於 corePoolSize,一個新的執行緒將被建立,即使池中可能尚有空閑執行緒。如果內部任務佇列已滿,而且有至少 corePoolSize 正在執行,但是執行執行緒的數量低於 maximumPoolSize,一個新的執行緒將被建立去執行該任務。

ThreadPoolExecutor 圖解:

一個 ThreadPoolExecutor

建立一個 ThreadPoolExecutor

ThreadPoolExecutor 有若干個可用構造子。比如:

int  corePoolSize  =    5;  

int  maxPoolSize   =   10;  

long keepAliveTime = 5000;  

 

ExecutorService threadPoolExecutor =  

        new ThreadPoolExecutor(  

                corePoolSize,  

                maxPoolSize,  

                keepAliveTime,  

                TimeUnit.MILLISECONDS,  

                new LinkedBlockingQueue()  

                );

但是,除非你確實需要顯式為 ThreadPoolExecutor 定義所有引數,使用 java.util.concurrent.Executors 類中的工廠方法之一會更加方便,正如 ExecutorService 小節所述。

18. 定時執行者服務 ScheduledExecutorService

java.util.concurrent.ScheduledExecutorService 是一個 ExecutorService, 它能夠將任務延後執行,或者間隔固定時間多次執行。 任務由一個工作者執行緒非同步執行,而不是由提交任務給 ScheduledExecutorService 的那個執行緒執行。

ScheduledExecutorService 例子

以下是一個簡單的 ScheduledExecutorService 示例:

ScheduledExecutorService scheduledExecutorService =  

        Executors.newScheduledThreadPool(5);  

 

ScheduledFuture scheduledFuture =  

    scheduledExecutorService.schedule(new Callable() {  

        public Object call() throws Exception {  

            System.out.println(“Executed!”);  

            return “Called!”;  

        }  

    },  

    5,  

    TimeUnit.SECONDS);

首先一個內建 5 個執行緒的 ScheduledExecutorService 被建立。之後一個 Callable 介面的匿名類示例被建立然後傳遞給 schedule() 方法。後邊的倆引數定義了 Callable 將在 5 秒鐘之後被執行。

ScheduledExecutorService 實現

既然 ScheduledExecutorService 是一個介面,你要用它的話就得使用 java.util.concurrent 包裡對它的某個實現類。ScheduledExecutorService 具有以下實現類:ScheduledThreadPoolExecutor

建立一個 ScheduledExecutorService

如何建立一個 ScheduledExecutorService 取決於你採用的它的實現類。但是你也可以使用 Executors 工廠類來建立一個 ScheduledExecutorService 實體。比如:

ScheduledExecutorService scheduledExecutorService =  

 

        Executors.newScheduledThreadPool(5);

ScheduledExecutorService 使用

一旦你建立了一個 ScheduledExecutorService,你可以透過呼叫它的以下方法:

  • schedule (Callable task, long delay, TimeUnit timeunit)

  • schedule (Runnable task, long delay, TimeUnit timeunit)

  • scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)

  • scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)

下麵我們就簡單看一下這些方法。

schedule (Callable task, long delay, TimeUnit timeunit)

這個方法計劃指定的 Callable 在給定的延遲之後執行。這個方法傳回一個 ScheduledFuture,透過它你可以在它被執行之前對它進行取消,或者在它執行之後獲取結果。以下是一個示例:

ScheduledExecutorService scheduledExecutorService =  

        Executors.newScheduledThreadPool(5);  

 

ScheduledFuture scheduledFuture =  

    scheduledExecutorService.schedule(new Callable() {  

        public Object call() throws Exception {  

            System.out.println(“Executed!”);  

            return “Called!”;  

        }  

    },  

    5,  

    TimeUnit.SECONDS);  

 

System.out.println(“result = ” + scheduledFuture.get());  

 

scheduledExecutorService.shutdown();

示例輸出結果:

Executed!

result = Called!

schedule (Runnable task, long delay, TimeUnit timeunit)

除了 Runnable 無法傳回一個結果之外,這一方法工作起來就像以一個 Callable 作為一個引數的那個版本的方法一樣,因此 ScheduledFuture.get() 在任務執行結束之後傳回 null。

scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)

這一方法規劃一個任務將被定期執行。該任務將會在首個 initialDelay 之後得到執行,然後每個 period 時間之後重覆執行。如果給定任務的執行丟擲了異常,該任務將不再執行。如果沒有任何異常的話,這個任務將會持續迴圈執行到 ScheduledExecutorService 被關閉。如果一個任務佔用了比計劃的時間間隔更長的時候,下一次執行將在當前執行結束執行才開始。計劃任務在同一時間不會有多個執行緒同時執行。

scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)

除了 period 有不同的解釋之外這個方法和 scheduleAtFixedRate() 非常像。

scheduleAtFixedRate() 方法中,period 被解釋為前一個執行的開始和下一個執行的開始之間的間隔時間。而在本方法中,period 則被解釋為前一個執行的結束和下一個執行的結束之間的間隔。因此這個延遲是執行結束之間的間隔,而不是執行開始之間的間隔。

ScheduledExecutorService 關閉

正如 ExecutorService,在你使用結束之後你需要把 ScheduledExecutorService 關閉掉。否則他將導致 JVM 繼續執行,即使所有其他執行緒已經全被關閉。

你可以使用從 ExecutorService 介面繼承來的 shutdown() 或 shutdownNow() 方法將 ScheduledExecutorService 關閉。參見 ExecutorService 關閉部分以獲取更多資訊。

19. 使用 ForkJoinPool 進行分叉和合併

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一點不同。ForkJoinPool 讓我們可以很方便地把任務分裂成幾個更小的任務,這些分裂出來的任務也將會提交給 ForkJoinPool。任務可以繼續分割成更小的子任務,只要它還能分割。可能聽起來有些抽象,因此本節中我們將會解釋 ForkJoinPool 是如何工作的,還有任務分割是如何進行的。

分叉和合併解釋

在我們開始看 ForkJoinPool 之前我們先來簡要解釋一下分叉和合併的原理。

分叉和合併原理包含兩個遞迴進行的步驟。兩個步驟分別是分叉步驟和合併步驟。

分叉

一個使用了分叉和合併原理的任務可以將自己分叉(分割)為更小的子任務,這些子任務可以被併發執行。如下圖所示:

透過把自己分割成多個子任務,每個子任務可以由不同的 CPU 並行執行,或者被同一個 CPU 上的不同執行緒執行。只有當給的任務過大,把它分割成幾個子任務才有意義。把任務分割成子任務有一定開銷,因此對於小型任務,這個分割的消耗可能比每個子任務併發執行的消耗還要大。

什麼時候把一個任務分割成子任務是有意義的,這個界限也稱作一個閥值。這要看每個任務對有意義閥值的決定。很大程度上取決於它要做的工作的種類。

合併

當一個任務將自己分割成若干子任務之後,該任務將進入等待所有子任務的結束之中。一旦子任務執行結束,該任務可以把所有結果合併到同一個結果。圖示如下:

當然,並非所有型別的任務都會傳回一個結果。如果這個任務並不傳回一個結果,它只需等待所有子任務執行完畢。也就不需要結果的合併啦。

ForkJoinPool

ForkJoinPool 是一個特殊的執行緒池,它的設計是為了更好的配合 分叉-和-合併 任務分割的工作。ForkJoinPool 也在 java.util.concurrent 包中,其完整類名為 java.util.concurrent.ForkJoinPool。

建立一個 ForkJoinPool

你可以透過其構造子建立一個 ForkJoinPool。作為傳遞給 ForkJoinPool 構造子的一個引數,你可以定義你期望的並行級別。並行級別表示你想要傳遞給 ForkJoinPool 的任務所需的執行緒或 CPU 數量。以下是一個 ForkJoinPool 示例:

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

這個示例建立了一個並行級別為 4 的 ForkJoinPool。

提交任務到 ForkJoinPool

就像提交任務到 ExecutorService 那樣,把任務提交到 ForkJoinPool。你可以提交兩種型別的任務。一種是沒有任何傳回值的(一個 “行動”),另一種是有傳回值的(一個”任務”)。這兩種型別分別由 RecursiveAction 和 RecursiveTask 表示。接下來介紹如何使用這兩種型別的任務,以及如何對它們進行提交。

RecursiveAction

RecursiveAction 是一種沒有任何傳回值的任務。它只是做一些工作,比如寫資料到磁碟,然後就退出了。一個 RecursiveAction 可以把自己的工作分割成更小的幾塊,這樣它們可以由獨立的執行緒或者 CPU 執行。你可以透過繼承來實現一個 RecursiveAction。示例如下:

import java.util.ArrayList;  

import java.util.List;  

import java.util.concurrent.RecursiveAction;  

 

public class MyRecursiveAction extends RecursiveAction {  

 

    private long workLoad = 0;  

 

    public MyRecursiveAction(long workLoad) {  

        this.workLoad = workLoad;  

    }  

 

    @Override 

    protected void compute() {  

 

        //if work is above threshold, break tasks up into smaller tasks  

        if(this.workLoad > 16) {  

            System.out.println(“Splitting workLoad : ” + this.workLoad);  

 

            List subtasks =  

                new ArrayList();  

 

            subtasks.addAll(createSubtasks());  

 

            for(RecursiveAction subtask : subtasks){  

                subtask.fork();  

            }  

 

        } else {  

            System.out.println(“Doing workLoad myself: ” + this.workLoad);  

        }  

    }  

 

    private List createSubtasks() {  

        List subtasks =  

            new ArrayList();  

 

        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);  

        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);  

 

        subtasks.add(subtask1);  

        subtasks.add(subtask2);  

 

        return subtasks;  

    }  

}

例子很簡單。MyRecursiveAction 將一個虛構的 workLoad 作為引數傳給自己的構造子。如果 workLoad 高於一個特定閥值,該工作將被分割為幾個子工作,子工作繼續分割。如果 workLoad 低於特定閥值,該工作將由 MyRecursiveAction 自己執行。你可以這樣規劃一個 MyRecursiveAction 的執行:

MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);  

forkJoinPool.invoke(myRecursiveAction);

RecursiveTask

RecursiveTask 是一種會傳回結果的任務。它可以將自己的工作分割為若干更小任務,並將這些子任務的執行結果合併到一個集體結果。可以有幾個水平的分割和合併。以下是一個 RecursiveTask 示例:

import java.util.ArrayList;  

import java.util.List;  

import java.util.concurrent.RecursiveTask;  

 

 

public class MyRecursiveTask extends RecursiveTask {  

 

    private long workLoad = 0;  

 

    public MyRecursiveTask(long workLoad) {  

        this.workLoad = workLoad;  

    }  

 

    protected Long compute() {  

 

        //if work is above threshold, break tasks up into smaller tasks  

        if(this.workLoad > 16) {  

            System.out.println(“Splitting workLoad : ” + this.workLoad);  

 

            List subtasks =  

                new ArrayList();  

            subtasks.addAll(createSubtasks());  

 

            for(MyRecursiveTask subtask : subtasks){  

                subtask.fork();  

            }  

 

            long result = 0;  

            for(MyRecursiveTask subtask : subtasks) {  

                result += subtask.join();  

            }  

            return result;  

 

        } else {  

            System.out.println(“Doing workLoad myself: ” + this.workLoad);  

            return workLoad * 3;  

        }  

    }  

 

    private List createSubtasks() {  

        List subtasks =  

        new ArrayList();  

 

        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);  

        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);  

 

        subtasks.add(subtask1);  

        subtasks.add(subtask2);  

 

        return subtasks;  

    }  

}

除了有一個結果傳回之外,這個示例和 RecursiveAction 的例子很像。MyRecursiveTask 類繼承自 RecursiveTask,這也就意味著它將傳回一個 Long 型別的結果。

MyRecursiveTask 示例也會將工作分割為子任務,並透過 fork() 方法對這些子任務計劃執行。

此外,本示例還透過呼叫每個子任務的 join() 方法收集它們傳回的結果。子任務的結果隨後被合併到一個更大的結果,並最終將其傳回。對於不同級別的遞迴,這種子任務的結果合併可能會發生遞迴。

你可以這樣規劃一個 RecursiveTask:

MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);  

long mergedResult = forkJoinPool.invoke(myRecursiveTask);  

System.out.println(“mergedResult = ” + mergedResult);

註意是如何透過 ForkJoinPool.invoke() 方法的呼叫來獲取最終執行結果的。

ForkJoinPool 評論

貌似並非每個人都對 Java 7 裡的 ForkJoinPool 滿意:《一個 Java 分叉-合併 帶來的災禍》。

在你計劃在自己的專案裡使用 ForkJoinPool 之前最好讀一下該篇文章。

20. 鎖 Lock

java.util.concurrent.locks.Lock 是一個類似於 synchronized 塊的執行緒同步機制。但是 Lock 比 synchronized 塊更加靈活、精細。順便說一下,在我的《Java 併發指南》中我對如何實現你自己的鎖進行了描述。

Java Lock 例子

既然 Lock 是一個介面,在你的程式裡需要使用它的實現類之一來使用它。以下是一個簡單示例:

Lock lock = new ReentrantLock();  

lock.lock();  

//critical section  

lock.unlock();

首先建立了一個 Lock 物件。之後呼叫了它的 lock() 方法。這時候這個 lock 實體就被鎖住啦。任何其他再過來呼叫 lock() 方法的執行緒將會被阻塞住,直到鎖定 lock 實體的執行緒呼叫了 unlock() 方法。最後 unlock() 被呼叫了,lock 物件解鎖了,其他執行緒可以對它進行鎖定了。

Java Lock 實現

java.util.concurrent.locks 包提供了以下對 Lock 介面的實現類:

  • ReentrantLock

Lock 和 synchronized 程式碼塊的主要不同點

一個 Lock 物件和一個 synchronized 程式碼塊之間的主要不同點是:

  • synchronized 程式碼塊不能夠保證進入訪問等待的執行緒的先後順序。

  • 你不能夠傳遞任何引數給一個 synchronized 程式碼塊的入口。因此,對於 synchronized 程式碼塊的訪問等待設定超時時間是不可能的事情。

  • synchronized 塊必須被完整地包含在單個方法裡。而一個 Lock 物件可以把它的 lock() 和 unlock() 方法的呼叫放在不同的方法裡。

Lock 的方法

Lock 介面具有以下主要方法:

  • lock()

  • lockInterruptibly()

  • tryLock()

  • tryLock(long timeout, TimeUnit timeUnit)

  • unlock()

lock() 將 Lock 實體鎖定。如果該 Lock 實體已被鎖定,呼叫 lock() 方法的執行緒將會阻塞,直到 Lock 實體解鎖。

lockInterruptibly() 方法將會被呼叫執行緒鎖定,除非該執行緒被打斷。此外,如果一個執行緒在透過這個方法來鎖定 Lock 物件時進入阻塞等待,而它被打斷了的話,該執行緒將會退出這個方法呼叫。

tryLock() 方法試圖立即鎖定 Lock 實體。如果鎖定成功,它將傳回 true,如果 Lock 實體已被鎖定該方法傳回 false。這一方法永不阻塞。tryLock(long timeout, TimeUnit timeUnit) 的工作類似於 tryLock() 方法,除了它在放棄鎖定 Lock 之前等待一個給定的超時時間之外。

unlock() 方法對 Lock 實體解鎖。一個 Lock 實現將只允許鎖定了該物件的執行緒來呼叫此方法。其他(沒有鎖定該 Lock 物件的執行緒)執行緒對 unlock() 方法的呼叫將會拋一個未檢查異常(RuntimeException)。

21. 讀寫鎖 ReadWriteLock

java.util.concurrent.locks.ReadWriteLock 讀寫鎖是一種先進的執行緒鎖機制。它能夠允許多個執行緒在同一時間對某特定資源進行讀取,但同一時間內只能有一個執行緒對其進行寫入。

讀寫鎖的理念在於多個執行緒能夠對一個共享資源進行讀取,而不會導致併發問題。併發問題的發生場景在於對一個共享資源的讀和寫操作的同時進行,或者多個寫操作併發進行。

本節只討論 Java 內建 ReadWriteLock。如果你想瞭解 ReadWriteLock 背後的實現原理,請參考我的《Java 併發指南》主題中的《讀寫鎖》小節。

ReadWriteLock 鎖規則

一個執行緒在對受保護資源在讀或者寫之前對 ReadWriteLock 鎖定的規則如下:

  • 讀鎖:如果沒有任何寫操作執行緒鎖定 ReadWriteLock,並且沒有任何寫操作執行緒要求一個寫鎖(但還沒有獲得該鎖)。因此,可以有多個讀操作執行緒對該鎖進行鎖定。

  • 寫鎖:如果沒有任何讀操作或者寫操作。因此,在寫操作的時候,只能有一個執行緒對該鎖進行鎖定。

ReadWriteLock 實現

ReadWriteLock 是個介面,如果你想用它的話就得去使用它的實現類之一。java.util.concurrent.locks 包提供了 ReadWriteLock 介面的以下實現類:

  • ReentrantReadWriteLock

ReadWriteLock 程式碼示例

以下是 ReadWriteLock 的建立以及如何使用它進行讀、寫鎖定的簡單示例程式碼:

ReadWriteLock readWriteLock = new ReentrantReadWriteLock();  

readWriteLock.readLock().lock();  

 

    // multiple readers can enter this section  

    // if not locked for writing, and not writers waiting  

    // to lock for writing.  

 

readWriteLock.readLock().unlock();  

 

 

readWriteLock.writeLock().lock();  

 

    // only one writer can enter this section,  

    // and only if no threads are currently reading.  

 

readWriteLock.writeLock().unlock();

註意如何使用 ReadWriteLock 對兩種鎖實體的持有。一個對讀訪問進行保護,一個隊寫訪問進行保護。

22. 原子性布林 AtomicBoolean

AtomicBoolean 類為我們提供了一個可以用原子方式進行讀和寫的布林值,它還擁有一些先進的原子性操作,比如 compareAndSet()。AtomicBoolean 類位於 java.util.concurrent.atomic 包,完整類名是為 java.util.concurrent.atomic.AtomicBoolean。本小節描述的 AtomicBoolean 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。

建立一個 AtomicBoolean

你可以這樣建立一個 AtomicBoolean:

AtomicBoolean atomicBoolean = new AtomicBoolean();

以上示例新建了一個預設值為 false 的 AtomicBoolean。如果你想要為 AtomicBoolean 實體設定一個顯式的初始值,那麼你可以將初始值傳給 AtomicBoolean 的構造子:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);

獲取 AtomicBoolean 的值

你可以透過使用 get() 方法來獲取一個 AtomicBoolean 的值。示例如下:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);  

boolean value = atomicBoolean.get();

以上程式碼執行後 value 變數的值將為 true。

設定 AtomicBoolean 的值

你可以透過使用 set() 方法來設定一個 AtomicBoolean 的值。

示例如下:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);  

 

atomicBoolean.set(false);

以上程式碼執行後 AtomicBoolean 的值為 false。

交換 AtomicBoolean 的值

你可以透過 getAndSet() 方法來交換一個 AtomicBoolean 實體的值。getAndSet() 方法將傳回 AtomicBoolean 當前的值,並將為 AtomicBoolean 設定一個新值。示例如下:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);  

 

boolean oldValue = atomicBoolean.getAndSet(false);

以上程式碼執行後 oldValue 變數的值為 true,atomicBoolean 實體將持有 false 值。程式碼成功將 AtomicBoolean 當前值 ture 交換為 false。

比較並設定 AtomicBoolean 的值

compareAndSet() 方法允許你對 AtomicBoolean 的當前值與一個期望值進行比較,如果當前值等於期望值的話,將會對 AtomicBoolean 設定一個新值。compareAndSet() 方法是原子性的,因此在同一時間之內有單個執行緒執行它。因此 compareAndSet() 方法可被用於一些類似於鎖的同步的簡單實現。以下是一個 compareAndSet() 示例:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);  

 

boolean expectedValue = true;  

boolean newValue      = false;  

 

boolean wasNewValueSet = atomicBoolean.compareAndSet(  

    expectedValue, newValue);

本示例對 AtomicBoolean 的當前值與 true 值進行比較,如果相等,將 AtomicBoolean 的值更新為 false。

23. 原子性整型 AtomicInteger

AtomicInteger 類為我們提供了一個可以進行原子性讀和寫操作的 int 變數,它還包含一系列先進的原子性操作,比如 compareAndSet()。AtomicInteger 類位於 java.util.concurrent.atomic 包,因此其完整類名為 java.util.concurrent.atomic.AtomicInteger。本小節描述的 AtomicInteger 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。

建立一個 AtomicInteger

建立一個 AtomicInteger 示例如下:

AtomicInteger atomicInteger = new AtomicInteger();

本示例將建立一個初始值為 0 的 AtomicInteger。如果你想要建立一個給定初始值的 AtomicInteger,你可以這樣:

AtomicInteger atomicInteger = new AtomicInteger(123);

本示例將 123 作為引數傳給 AtomicInteger 的構造子,它將設定 AtomicInteger 實體的初始值為 123。

獲取 AtomicInteger 的值

你可以使用 get() 方法獲取 AtomicInteger 實體的值。示例如下:

AtomicInteger atomicInteger = new AtomicInteger(123);

int theValue = atomicInteger.get();

設定 AtomicInteger 的值

你可以透過 set() 方法對 AtomicInteger 的值進行重新設定。以下是 AtomicInteger.set() 示例:

AtomicInteger atomicInteger = new AtomicInteger(123);  

atomicInteger.set(234);

以上示例建立了一個初始值為 123 的 AtomicInteger,而在第二行將其值更新為 234。

比較並設定 AtomicInteger 的值

AtomicInteger 類也透過了一個原子性的 compareAndSet() 方法。這一方法將 AtomicInteger 實體的當前值與期望值進行比較,如果二者相等,為 AtomicInteger 實體設定一個新值。AtomicInteger.compareAndSet() 程式碼示例:

AtomicInteger atomicInteger = new AtomicInteger(123);  

 

int expectedValue = 123;  

int newValue      = 234;  

atomicInteger.compareAndSet(expectedValue, newValue);

本示例首先新建一個初始值為 123 的 AtomicInteger 實體。然後將 AtomicInteger 與期望值 123 進行比較,如果相等,將 AtomicInteger 的值更新為 234。

增加 AtomicInteger 值

AtomicInteger 類包含有一些方法,透過它們你可以增加 AtomicInteger 的值,並獲取其值。這些方法如下:

  • addAndGet()

  • getAndAdd()

  • getAndIncrement()

  • incrementAndGet()

第一個 addAndGet() 方法給 AtomicInteger 增加了一個值,然後傳回增加後的值。getAndAdd() 方法為 AtomicInteger 增加了一個值,但傳回的是增加以前的 AtomicInteger 的值。具體使用哪一個取決於你的應用場景。以下是這兩種方法的示例:

AtomicInteger atomicInteger = new AtomicInteger();  

System.out.println(atomicInteger.getAndAdd(10));  

System.out.println(atomicInteger.addAndGet(10));

本示例將打印出 0 和 20。例子中,第二行拿到的是加 10 之前的 AtomicInteger 的值。加 10 之前的值是 0。第三行將 AtomicInteger 的值再加 10,並傳回加操作之後的值。該值現在是為 20。你當然也可以使用這倆方法為 AtomicInteger 新增負值。結果實際是一個減法操作。getAndIncrement() 和 incrementAndGet() 方法類似於 getAndAdd() 和 addAndGet(),但每次只將 AtomicInteger 的值加 1。

減小 AtomicInteger 的值

AtomicInteger 類還提供了一些減小 AtomicInteger 的值的原子性方法。這些方法是:

  • decrementAndGet()

  • getAndDecrement()

decrementAndGet() 將 AtomicInteger 的值減一,並傳回減一後的值。getAndDecrement() 也將 AtomicInteger 的值減一,但它傳回的是減一之前的值。

24. 原子性長整型 AtomicLong

AtomicLong 類為我們提供了一個可以進行原子性讀和寫操作的 long 變數,它還包含一系列先進的原子性操作,比如 compareAndSet()AtomicLong 類位於 java.util.concurrent.atomic 包,因此其完整類名為 java.util.concurrent.atomic.AtomicLong。本小節描述的 AtomicLong 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。

建立一個 AtomicLong

建立一個 AtomicLong 如下:

AtomicLong atomicLong = new AtomicLong();

將建立一個初始值為 0 的 AtomicLong。如果你想建立一個指定初始值的 AtomicLong,可以:

AtomicLong atomicLong = new AtomicLong(123);

本示例將 123 作為引數傳遞給 AtomicLong 的構造子,後者將 AtomicLong 實體的初始值設定為 123。

獲取 AtomicLong 的值

你可以透過 get() 方法獲取 AtomicLong 的值。AtomicLong.get() 示例:

AtomicLong atomicLong = new AtomicLong(123);

long theValue = atomicLong.get();

設定 AtomicLong 的值

你可以透過 set() 方法設定 AtomicLong 實體的值。一個 AtomicLong.set() 的示例:

AtomicLong atomicLong = new AtomicLong(123);  

atomicLong.set(234);

本示例新建了一個初始值為 123 的 AtomicLong,第二行將其值設定為 234。

比較並設定 AtomicLong 的值

AtomicLong 類也有一個原子性的 compareAndSet() 方法。這一方法將 AtomicLong 實體的當前值與一個期望值進行比較,如果兩種相等,為 AtomicLong 實體設定一個新值。AtomicLong.compareAndSet() 使用示例:

AtomicLong atomicLong = new AtomicLong(123);  

 

long expectedValue = 123;  

long newValue      = 234;  

atomicLong.compareAndSet(expectedValue, newValue);

本示例新建了一個初始值為 123 的 AtomicLong。然後將 AtomicLong 的當前值與期望值 123 進行比較,如果相等的話,AtomicLong 的新值將變為 234。

增加 AtomicLong 值

AtomicLong 具備一些能夠增加 AtomicLong 的值並傳回自身值的方法。這些方法如下:

  • addAndGet()

  • getAndAdd()

  • getAndIncrement()

  • incrementAndGet()

第一個方法 addAndGet() 將 AtomicLong 的值加一個數字,並傳回增加後的值。第二個方法 getAndAdd() 也將 AtomicLong 的值加一個數字,但傳回的是增加前的 AtomicLong 的值。具體使用哪一個取決於你自己的場景。示例如下:

AtomicLong atomicLong = new AtomicLong();  

System.out.println(atomicLong.getAndAdd(10));  

System.out.println(atomicLong.addAndGet(10));

本示例將打印出 0 和 20。例子中,第二行拿到的是加 10 之前的 AtomicLong 的值。加 10 之前的值是 0。第三行將 AtomicLong 的值再加 10,並傳回加操作之後的值。該值現在是為 20。你當然也可以使用這倆方法為 AtomicLong 新增負值。結果實際是一個減法操作。getAndIncrement() 和 incrementAndGet() 方法類似於 getAndAdd() 和 addAndGet(),但每次只將 AtomicLong 的值加 1。

減小 AtomicLong 的值

AtomicLong 類還提供了一些減小 AtomicLong 的值的原子性方法。這些方法是:

  • decrementAndGet()

  • getAndDecrement()

decrementAndGet() 將 AtomicLong 的值減一,並傳回減一後的值。getAndDecrement() 也將 AtomicLong 的值減一,但它傳回的是減一之前的值。

25. 原子性取用型 AtomicReference

AtomicReference 提供了一個可以被原子性讀和寫的物件取用變數。原子性的意思是多個想要改變同一個 AtomicReference 的執行緒不會導致 AtomicReference 處於不一致的狀態。AtomicReference 還有一個 compareAndSet() 方法,透過它你可以將當前取用於一個期望值(取用)進行比較,如果相等,在該 AtomicReference 物件內部設定一個新的取用。

建立一個 AtomicReference

建立 AtomicReference 如下:

AtomicReference atomicReference = new AtomicReference();

如果你需要使用一個指定取用建立 AtomicReference,可以:

String initialReference = “the initially referenced string”;  

AtomicReference atomicReference = new AtomicReference(initialReference);

建立泛型 AtomicReference

你可以使用 Java 泛型來建立一個泛型 AtomicReference。示例:

AtomicReference atomicStringReference =  

    new AtomicReference();

你也可以為泛型 AtomicReference 設定一個初始值。示例:

String initialReference = “the initially referenced string”;  

AtomicReference atomicStringReference =  

    new AtomicReference(initialReference);

獲取 AtomicReference 取用

你可以透過 AtomicReference 的 get() 方法來獲取儲存在 AtomicReference 裡的取用。如果你的 AtomicReference 是非泛型的,get() 方法將傳回一個 Object 型別的取用。如果是泛型化的,get() 將傳回你建立 AtomicReference 時宣告的那個型別。先來看一個非泛型的 AtomicReference get() 示例:

AtomicReference atomicReference = new AtomicReference(“first value referenced”);  

 

String reference = (String) atomicReference.get();

註意如何對 get() 方法傳回的取用強制轉換為 String。泛型化的 AtomicReference 示例:

AtomicReference atomicReference =   

     new AtomicReference(“first value referenced”);  

 

String reference = atomicReference.get();

編譯器知道了取用的型別,所以我們無需再對 get() 傳回的取用進行強制轉換了。

設定 AtomicReference 取用

你可以使用 get() 方法對 AtomicReference 裡邊儲存的取用進行設定。如果你定義的是一個非泛型 AtomicReference,set() 將會以一個 Object 取用作為引數。如果是泛型化的 AtomicReference,set() 方法將只接受你定義給的型別。AtomicReference set() 示例:

new AtomicReference();  

atomicReference.set(“New object referenced”);

這個看起來非泛型和泛型化的沒啥區別。真正的區別在於編譯器將對你能夠設定給一個泛型化的 AtomicReference 引數型別進行限制。

比較並設定 AtomicReference 取用

AtomicReference 類具備了一個很有用的方法:compareAndSet()。compareAndSet() 可以將儲存在 AtomicReference 裡的取用於一個期望取用進行比較,如果兩個取用是一樣的(並非 equals() 的相等,而是 == 的一樣),將會給 AtomicReference 實體設定一個新的取用。

如果 compareAndSet() 為 AtomicReference 設定了一個新的取用,compareAndSet() 將傳回 true。否則 compareAndSet() 傳回 false。AtomicReference compareAndSet() 示例:

String initialReference = “initial value referenced”;  

 

AtomicReference atomicStringReference =  

    new AtomicReference(initialReference);  

 

String newReference = “new value referenced”;  

boolean exchanged = atomicStringReference.compareAndSet(initialReference, newReference);  

System.out.println(“exchanged: ” + exchanged);  

 

exchanged = atomicStringReference.compareAndSet(initialReference, newReference);  

System.out.println(“exchanged: ” + exchanged);

本示例建立了一個帶有一個初始取用的泛型化的 AtomicReference。之後兩次呼叫 comparesAndSet()來對儲存值和期望值進行對比,如果二者一致,為 AtomicReference 設定一個新的取用。第一次比較,儲存的取用(initialReference)和期望的取用(initialReference)一致,所以一個新的取用(newReference)被設定給 AtomicReference,compareAndSet() 方法傳回 true。第二次比較時,儲存的取用(newReference)和期望的取用(initialReference)不一致,因此新的取用沒有被設定給 AtomicReference,compareAndSet() 方法傳回 false。

看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂