歡迎光臨
每天分享高質量文章

Java 執行緒池詳解

(點擊上方公眾號,可快速關註)


來源:CarpenterLee ,

www.cnblogs.com/CarpenterLee/p/9558026.html

構造一個執行緒池為什麼需要幾個引數?如果避免執行緒池出現OOM?Runnable和Callable的區別是什麼?本文將對這些問題一一解答,同時還將給出使用執行緒池的常見場景和代碼片段。

基礎知識

Executors創建執行緒池

Java中創建執行緒池很簡單,只需要呼叫Executors中相應的便捷方法即可,比如Executors.newFixedThreadPool(int nThreads),但是便捷不僅隱藏了複雜性,也為我們埋下了潛在的隱患(OOM,執行緒耗盡)。

Executors創建執行緒池便捷方法串列:

小程式使用這些快捷方法沒什麼問題,對於服務端需要長期運行的程式,創建執行緒池應該直接使用ThreadPoolExecutor的構造方法。沒錯,上述Executors方法創建的執行緒池就是ThreadPoolExecutor。

ThreadPoolExecutor構造方法

Executors中創建執行緒池的快捷方法,實際上是呼叫了ThreadPoolExecutor的構造方法(定時任務使用的是ScheduledThreadPoolExecutor),該類構造方法引數串列如下:

// Java執行緒池的完整建構式

public ThreadPoolExecutor(

  int corePoolSize, // 執行緒池長期維持的執行緒數,即使執行緒處於Idle狀態,也不會回收。

  int maximumPoolSize, // 執行緒數的上限

  long keepAliveTime, TimeUnit unit, // 超過corePoolSize的執行緒的idle時長,

                                     // 超過這個時間,多餘的執行緒會被回收。

  BlockingQueue workQueue, // 任務的排隊佇列

  ThreadFactory threadFactory, // 新執行緒的產生方式

  RejectedExecutionHandler handler) // 拒絕策略

竟然有7個引數,很無奈,構造一個執行緒池確實需要這麼多引數。這些引數中,比較容易引起問題的有corePoolSize, maximumPoolSize, workQueue以及handler:

  • corePoolSize和maximumPoolSize設置不當會影響效率,甚至耗盡執行緒;

  • workQueue設置不當容易導致OOM;

  • handler設置不當會導致提交任務時丟擲異常。

正確的引數設置方式會在下文給出。

執行緒池的工作順序

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.

If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

corePoolSize -> 任務佇列 -> maximumPoolSize -> 拒絕策略

Runnable和Callable

可以向執行緒池提交的任務有兩種:Runnable和Callable,二者的區別如下:

  1. 方法簽名不同,void Runnable.run(), V Callable.call() throws Exception

  2. 是否允許有傳回值,Callable允許有傳回值

  3. 是否允許丟擲異常,Callable允許丟擲異常。

Callable是JDK1.5時加入的接口,作為Runnable的一種補充,允許有傳回值,允許丟擲異常。

三種提交任務的方式:

如何正確使用執行緒池

避免使用無界佇列

不要使用Executors.newXXXThreadPool()快捷方法創建執行緒池,因為這種方式會使用無界的任務佇列,為避免OOM,我們應該使用ThreadPoolExecutor的構造方法手動指定佇列的最大長度:

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 

                0, TimeUnit.SECONDS, 

                new ArrayBlockingQueue<>(512), // 使用有界佇列,避免OOM

                new ThreadPoolExecutor.DiscardPolicy());

明確拒絕任務時的行為

任務佇列總有占滿的時候,這是再submit()提交新的任務會怎麼樣呢?RejectedExecutionHandler接口為我們提供了控制方式,接口定義如下:

public interface RejectedExecutionHandler {

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

}

執行緒池給我們提供了幾種常見的拒絕策略:

執行緒池預設的拒絕行為是AbortPolicy,也就是丟擲RejectedExecutionHandler異常,該異常是非受檢異常,很容易忘記捕獲。如果不關心任務被拒絕的事件,可以將拒絕策略設置成DiscardPolicy,這樣多餘的任務會悄悄的被忽略。

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 

                0, TimeUnit.SECONDS, 

                new ArrayBlockingQueue<>(512), 

                new ThreadPoolExecutor.DiscardPolicy());// 指定拒絕策略

獲取處理結果和異常

執行緒池的處理結果、以及處理過程中的異常都被包裝到Future中,併在呼叫Future.get()方法時獲取,執行過程中的異常會被包裝成ExecutionException,submit()方法本身不會傳遞結果和任務執行過程中的異常。獲取執行結果的代碼可以這樣寫:

ExecutorService executorService = Executors.newFixedThreadPool(4);

Futurefuture = executorService.submit(new Callable() {

        @Override

        public Object call() throws Exception {

            throw new RuntimeException(“exception in call~”);// 該異常會在呼叫Future.get()時傳遞給呼叫者

        }

    });

     

try {

  Object result = future.get();

} catch (InterruptedException e) {

  // interrupt

} catch (ExecutionException e) {

  // exception in Callable.call()

  e.printStackTrace();

}

上述代碼輸出類似如下:

執行緒池的常用場景

正確構造執行緒池

int poolSize = Runtime.getRuntime().availableProcessors() * 2;

BlockingQueue queue = new ArrayBlockingQueue<>(512);

RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();

executorService = new ThreadPoolExecutor(poolSize, poolSize,

    0, TimeUnit.SECONDS,

            queue,

            policy);

獲取單個結果

過submit()向執行緒池提交任務後會傳回一個Future,呼叫V Future.get()方法能夠阻塞等待執行結果,V get(long timeout, TimeUnit unit)方法可以指定等待的超時時間。

獲取多個結果

如果向執行緒池提交了多個任務,要獲取這些任務的執行結果,可以依次呼叫Future.get()獲得。但對於這種場景,我們更應該使用ExecutorCompletionService,該類的take()方法總是阻塞等待某一個任務完成,然後傳回該任務的Future物件。向CompletionService批量提交任務後,只需呼叫相同次數的CompletionService.take()方法,就能獲取所有任務的執行結果,獲取順序是任意的,取決於任務的完成順序:

void solve(Executor executor, Collection> solvers)

   throws InterruptedException, ExecutionException {

    

   CompletionService ecs = new ExecutorCompletionService(executor);// 建構式

    

   for (Callable s : solvers)// 提交所有任務

       ecs.submit(s);

        

   int n = solvers.size();

   for (int i = 0; i < n; ++i) {// 獲取每一個完成的任務

       Result r = ecs.take().get();

       if (r != null)

           use(r);

   }

}

單個任務的超時時間

V Future.get(long timeout, TimeUnit unit)方法可以指定等待的超時時間,超時未完成會丟擲TimeoutException。

多個任務的超時時間

等待多個任務完成,並設置最大等待時間,可以通過CountDownLatch完成:

public void testLatch(ExecutorService executorService, List tasks) 

    throws InterruptedException{

       

    CountDownLatch latch = new CountDownLatch(tasks.size());

      for(Runnable r : tasks){

          executorService.submit(new Runnable() {

              @Override

              public void run() {

                  try{

                      r.run();

                  }finally {

                      latch.countDown();// countDown

                  }

              }

          });

      }

      latch.await(10, TimeUnit.SECONDS); // 指定超時時間

  }

執行緒池和裝修公司

以運營一家裝修公司做個比喻。公司在辦公地點等待客戶來提交裝修請求;公司有固定數量的正式工以維持運轉;旺季業務較多時,新來的客戶請求會被排期,比如接單後告訴用戶一個月後才能開始裝修;當排期太多時,為避免用戶等太久,公司會通過某些渠道(比如人才市場、熟人介紹等)雇佣一些臨時工(註意,招聘臨時工是在排期排滿之後);如果臨時工也忙不過來,公司將決定不再接收新的客戶,直接拒單。

執行緒池就是程式中的“裝修公司”,代勞各種臟活累活。上面的過程對應到執行緒池上:

// Java執行緒池的完整建構式

public ThreadPoolExecutor(

  int corePoolSize, // 正式工數量

  int maximumPoolSize, // 工人數量上限,包括正式工和臨時工

  long keepAliveTime, TimeUnit unit, // 臨時工游手好閑的最長時間,超過這個時間將被解雇

  BlockingQueue workQueue, // 排期佇列

  ThreadFactory threadFactory, // 招人渠道

  RejectedExecutionHandler handler) // 拒單方式

總結

Executors為我們提供了構造執行緒池的便捷方法,對於服務器程式我們應該杜絕使用這些便捷方法,而是直接使用執行緒池ThreadPoolExecutor的構造方法,避免無界佇列可能導致的OOM以及執行緒個數限制不當導致的執行緒數耗盡等問題。ExecutorCompletionService提供了等待所有任務執行結束的有效方式,如果要設置等待的超時時間,則可以通過CountDownLatch完成。

參考

  • ThreadPoolExecutor API Doc

    https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html

【關於投稿】


如果大家有原創好文投稿,請直接給公號發送留言。


① 留言格式:
【投稿】+《 文章標題》+ 文章鏈接

② 示例:
【投稿】《不要自稱是程式員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/

③ 最後請附上您的個人簡介哈~



看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

赞(0)

分享創造快樂