歡迎光臨
每天分享高質量文章

【死磕Java併發】—–J.U.C之執行緒池:ThreadPoolExecutor

原文出處http://cmsblogs.com/ 『chenssy

作為Executor框架中最核心的類,ThreadPoolExecutor代表著鼎鼎大名的執行緒池,它給了我們足夠的理由來弄清楚它。

下麵我們就透過原始碼來一步一步弄清楚它。

內部狀態

執行緒有五種狀態:新建,就緒,執行,阻塞,死亡,執行緒池同樣有五種狀態:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。

  1.    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  2.    private static final int COUNT_BITS = Integer.SIZE - 3;

  3.    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

  4.    // runState is stored in the high-order bits

  5.    private static final int RUNNING    = -1 << COUNT_BITS;

  6.    private static final int SHUTDOWN   =  0 << COUNT_BITS;

  7.    private static final int STOP       =  1 << COUNT_BITS;

  8.    private static final int TIDYING    =  2 << COUNT_BITS;

  9.    private static final int TERMINATED =  3 << COUNT_BITS;

  10.    // Packing and unpacking ctl

  11.    private static int runStateOf(int c)     { return c & ~CAPACITY; }

  12.    private static int workerCountOf(int c)  { return c & CAPACITY; }

  13.    private static int ctlOf(int rs, int wc) { return rs | wc; }

變數ctl定義為AtomicInteger ,其功能非常強大,記錄了“執行緒池中的任務數量”和“執行緒池的狀態”兩個資訊。共32位,其中高3位表示"執行緒池狀態",低29位表示"執行緒池中的任務數量"。

  1. RUNNING            -- 對應的高3位值是111

  2. SHUTDOWN       -- 對應的高3位值是000

  3. STOP                   -- 對應的高3位值是001

  4. TIDYING              -- 對應的高3位值是010

  5. TERMINATED     -- 對應的高3位值是011

RUNNING:處於RUNNING狀態的執行緒池能夠接受新任務,以及對新新增的任務進行處理。

SHUTDOWN:處於SHUTDOWN狀態的執行緒池不可以接受新任務,但是可以對已新增的任務進行處理。

STOP:處於STOP狀態的執行緒池不接收新任務,不處理已新增的任務,並且會中斷正在處理的任務。

TIDYING:當所有的任務已終止,ctl記錄的"任務數量"為0,執行緒池會變為TIDYING狀態。當執行緒池變為TIDYING狀態時,會執行鉤子函式terminated()。terminated()在ThreadPoolExecutor類中是空的,若使用者想在執行緒池變為TIDYING時,進行相應的處理;可以透過多載terminated()函式來實現。

TERMINATED:執行緒池徹底終止的狀態。

各個狀態的轉換如下:

建立執行緒池

我們可以透過ThreadPoolExecutor建構式來建立一個執行緒池:

  1.    public ThreadPoolExecutor(int corePoolSize,

  2.                              int maximumPoolSize,

  3.                              long keepAliveTime,

  4.                              TimeUnit unit,

  5.                              BlockingQueue<Runnable> workQueue,

  6.                              ThreadFactory threadFactory,

  7.                              RejectedExecutionHandler handler) {

  8.        if (corePoolSize < 0 ||

  9.            maximumPoolSize <= 0 ||

  10.            maximumPoolSize < corePoolSize ||

  11.            keepAliveTime < 0)

  12.            throw new IllegalArgumentException();

  13.        if (workQueue == null || threadFactory == null || handler == null)

  14.            throw new NullPointerException();

  15.        this.corePoolSize = corePoolSize;

  16.        this.maximumPoolSize = maximumPoolSize;

  17.        this.workQueue = workQueue;

  18.        this.keepAliveTime = unit.toNanos(keepAliveTime);

  19.        this.threadFactory = threadFactory;

  20.        this.handler = handler;

  21.    }

共有七個引數,每個引數含義如下:

corePoolSize

執行緒池中核心執行緒的數量。當提交一個任務時,執行緒池會新建一個執行緒來執行任務,直到當前執行緒數等於corePoolSize。如果呼叫了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有基本執行緒。

maximumPoolSize

執行緒池中允許的最大執行緒數。執行緒池的阻塞佇列滿了之後,如果還有任務提交,如果當前的執行緒數小於maximumPoolSize,則會新建執行緒來執行任務。註意,如果使用的是無界佇列,該引數也就沒有什麼效果了。

keepAliveTime

執行緒空閑的時間。執行緒的建立和銷毀是需要代價的。執行緒執行完任務後不會立即銷毀,而是繼續存活一段時間:keepAliveTime。預設情況下,該引數只有在執行緒數大於corePoolSize時才會生效。

unit

keepAliveTime的單位。TimeUnit

workQueue

用來儲存等待執行的任務的阻塞佇列,等待的任務必須實現Runnable介面。我們可以選擇如下幾種:

  • ArrayBlockingQueue:基於陣列結構的有界阻塞佇列,FIFO。【死磕Java併發】----J.U.C之阻塞佇列:ArrayBlockingQueue

  • LinkedBlockingQueue:基於連結串列結構的有界阻塞佇列,FIFO。

  • SynchronousQueue:不儲存元素的阻塞佇列,每個插入操作都必須等待一個移出操作,反之亦然。【死磕Java併發】----J.U.C之阻塞佇列:SynchronousQueue

  • PriorityBlockingQueue:具有優先界別的阻塞佇列。【死磕Java併發】----J.U.C之阻塞佇列:PriorityBlockingQueue

threadFactory

用於設定建立執行緒的工廠。該物件可以透過Executors.defaultThreadFactory(),如下:

  1.    public static ThreadFactory defaultThreadFactory() {

  2.        return new DefaultThreadFactory();

  3.    }

傳回的是DefaultThreadFactory物件,原始碼如下:

  1.    static class DefaultThreadFactory implements ThreadFactory {

  2.        private static final AtomicInteger poolNumber = new AtomicInteger(1);

  3.        private final ThreadGroup group;

  4.        private final AtomicInteger threadNumber = new AtomicInteger(1);

  5.        private final String namePrefix;

  6.        DefaultThreadFactory() {

  7.            SecurityManager s = System.getSecurityManager();

  8.            group = (s != null) ? s.getThreadGroup() :

  9.                                  Thread.currentThread().getThreadGroup();

  10.            namePrefix = "pool-" +

  11.                          poolNumber.getAndIncrement() +

  12.                         "-thread-";

  13.        }

  14.        public Thread newThread(Runnable r) {

  15.            Thread t = new Thread(group, r,

  16.                                  namePrefix + threadNumber.getAndIncrement(),

  17.                                  0);

  18.            if (t.isDaemon())

  19.                t.setDaemon(false);

  20.            if (t.getPriority() != Thread.NORM_PRIORITY)

  21.                t.setPriority(Thread.NORM_PRIORITY);

  22.            return t;

  23.        }

  24.    }

ThreadFactory的左右就是提供建立執行緒的功能的執行緒工廠。他是透過newThread()方法提供建立執行緒的功能,newThread()方法建立的執行緒都是“非守護執行緒”而且“執行緒優先順序都是Thread.NORM_PRIORITY”。

handler

RejectedExecutionHandler,執行緒池的拒絕策略。所謂拒絕策略,是指將任務新增到執行緒池中時,執行緒池拒絕該任務所採取的相應策略。當向執行緒池中提交任務時,如果此時執行緒池中的執行緒已經飽和了,而且阻塞佇列也已經滿了,則執行緒池會選擇一種拒絕策略來處理該任務。

執行緒池提供了四種拒絕策略:

  1. AbortPolicy:直接丟擲異常,預設策略;

  2. CallerRunsPolicy:用呼叫者所在的執行緒來執行任務;

  3. DiscardOldestPolicy:丟棄阻塞佇列中靠最前的任務,並執行當前任務;

  4. DiscardPolicy:直接丟棄任務; 當然我們也可以實現自己的拒絕策略,例如記錄日誌等等,實現RejectedExecutionHandler介面即可。

執行緒池

Executor框架提供了三種執行緒池,他們都可以透過工具類Executors來建立。

FixedThreadPool

FixedThreadPool,可重用固定執行緒數的執行緒池,其定義如下:

  1.    public static ExecutorService newFixedThreadPool(int nThreads) {

  2.        return new ThreadPoolExecutor(nThreads, nThreads,

  3.                                      0L, TimeUnit.MILLISECONDS,

  4.                                      new LinkedBlockingQueue<Runnable>());

  5.    }

corePoolSize 和 maximumPoolSize都設定為建立FixedThreadPool時指定的引數nThreads,意味著當執行緒池滿時且阻塞佇列也已經滿時,如果繼續提交任務,則會直接走拒絕策略,該執行緒池不會再新建執行緒來執行任務,而是直接走拒絕策略。FixedThreadPool使用的是預設的拒絕策略,即AbortPolicy,則直接丟擲異常。

keepAliveTime設定為0L,表示空閑的執行緒會立刻終止。

workQueue則是使用LinkedBlockingQueue,但是沒有設定範圍,那麼則是最大值(Integer.MAX_VALUE),這基本就相當於一個無界佇列了。使用該“無界佇列”則會帶來哪些影響呢?當執行緒池中的執行緒數量等於corePoolSize 時,如果繼續提交任務,該任務會被新增到阻塞佇列workQueue中,當阻塞佇列也滿了之後,則執行緒池會新建執行緒執行任務直到maximumPoolSize。由於FixedThreadPool使用的是“無界佇列”LinkedBlockingQueue,那麼maximumPoolSize引數無效,同時指定的拒絕策略AbortPolicy也將無效。而且該執行緒池也不會拒絕提交的任務,如果客戶端提交任務的速度快於任務的執行,那麼keepAliveTime也是一個無效引數。

其執行圖如下(參考《Java併發程式設計的藝術》):

SingleThreadExecutor

SingleThreadExecutor是使用單個worker執行緒的Executor,定義如下:

  1.    public static ExecutorService newSingleThreadExecutor() {

  2.        return new FinalizableDelegatedExecutorService

  3.            (new ThreadPoolExecutor(1, 1,

  4.                                    0L, TimeUnit.MILLISECONDS,

  5.                                    new LinkedBlockingQueue<Runnable>()));

  6.    }

作為單一worker執行緒的執行緒池,SingleThreadExecutor把corePool和maximumPoolSize均被設定為1,和FixedThreadPool一樣使用的是無界佇列LinkedBlockingQueue,所以帶來的影響和FixedThreadPool一樣。

CachedThreadPool

CachedThreadPool是一個會根據需要建立新執行緒的執行緒池 ,他定義如下:

  1.    public static ExecutorService newCachedThreadPool() {

  2.        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

  3.                                      60L, TimeUnit.SECONDS,

  4.                                      new SynchronousQueue<Runnable>());

  5.    }

CachedThreadPool的corePool為0,maximumPoolSize為Integer.MAXVALUE,這就意味著所有的任務一提交就會加入到阻塞佇列中。keepAliveTime這是為60L,unit設定為TimeUnit.SECONDS,意味著空閑執行緒等待新任務的最長時間為60秒,空閑執行緒超過60秒後將會被終止。阻塞佇列採用的SynchronousQueue,而我們在【死磕Java併發】----J.U.C之阻塞佇列:SynchronousQueue中瞭解到SynchronousQueue是一個沒有元素的阻塞佇列,加上corePool = 0 ,maximumPoolSize = Integer.MAXVALUE,這樣就會存在一個問題,如果主執行緒提交任務的速度遠遠大於CachedThreadPool的處理速度,則CachedThreadPool會不斷地建立新執行緒來執行任務,這樣有可能會導致系統耗盡CPU和記憶體資源,所以在使用該執行緒池是,一定要註意控制併發的任務數,否則建立大量的執行緒可能導致嚴重的效能問題

任務提交

執行緒池根據業務不同的需求提供了兩種方式提交任務:Executor.execute()、ExecutorService.submit()。其中ExecutorService.submit()可以獲取該任務執行的Future。
我們以Executor.execute()為例,來看看執行緒池的任務提交經歷了那些過程。

定義:

  1. public interface Executor {

  2.    void execute(Runnable command);

  3. }

ThreadPoolExecutor提供實現:

  1.    public void execute(Runnable command) {

  2.        if (command == null)

  3.            throw new NullPointerException();

  4.        int c = ctl.get();

  5.        if (workerCountOf(c) < corePoolSize) {

  6.            if (addWorker(command, true))

  7.                return;

  8.            c = ctl.get();

  9.        }

  10.        if (isRunning(c) && workQueue.offer(command)) {

  11.            int recheck = ctl.get();

  12.            if (! isRunning(recheck) && remove(command))

  13.                reject(command);

  14.            else if (workerCountOf(recheck) == 0)

  15.                addWorker(null, false);

  16.        }

  17.        else if (!addWorker(command, false))

  18.            reject(command);

  19.    }

執行流程如下:

  1. 如果執行緒池當前執行緒數小於corePoolSize,則呼叫addWorker建立新執行緒執行任務,成功傳回true,失敗執行步驟2。

  2. 如果執行緒池處於RUNNING狀態,則嘗試加入阻塞佇列,如果加入阻塞佇列成功,則嘗試進行Double Check,如果加入失敗,則執行步驟3。

  3. 如果執行緒池不是RUNNING狀態或者加入阻塞佇列失敗,則嘗試建立新執行緒直到maxPoolSize,如果失敗,則呼叫reject()方法執行相應的拒絕策略。

在步驟2中如果加入阻塞佇列成功了,則會進行一個Double Check的過程。Double Check過程的主要目的是判斷加入到阻塞隊裡中的執行緒是否可以被執行。如果執行緒池不是RUNNING狀態,則呼叫remove()方法從阻塞佇列中刪除該任務,然後呼叫reject()方法處理任務。否則需要確保還有執行緒執行。

addWorker當執行緒中的當前執行緒數小於corePoolSize,則呼叫addWorker()建立新執行緒執行任務,當前執行緒數則是根據ctl變數來獲取的,呼叫workerCountOf(ctl)獲取低29位即可:

  1.    private static int workerCountOf(int c)  { return c & CAPACITY; }

addWorker(Runnable firstTask, boolean core)方法用於建立執行緒執行任務,原始碼如下:

  1.    private boolean addWorker(Runnable firstTask, boolean core) {

  2.        retry:

  3.        for (;;) {

  4.            int c = ctl.get();

  5.            // 獲取當前執行緒狀態

  6.            int rs = runStateOf(c);

  7.            if (rs >= SHUTDOWN &&

  8.                    ! (rs == SHUTDOWN &&

  9.                            firstTask == null &&

  10.                            ! workQueue.isEmpty()))

  11.                return false;

  12.            // 內層迴圈,worker + 1

  13.            for (;;) {

  14.                // 執行緒數量

  15.                int wc = workerCountOf(c);

  16.                // 如果當前執行緒數大於執行緒最大上限CAPACITY  return false

  17.                // 若core == true,則與corePoolSize 比較,否則與maximumPoolSize ,大於 return false

  18.                if (wc >= CAPACITY ||

  19.                        wc >= (core ? corePoolSize : maximumPoolSize))

  20.                    return false;

  21.                // worker + 1,成功跳出retry迴圈

  22.                if (compareAndIncrementWorkerCount(c))

  23.                    break retry;

  24.                // CAS add worker 失敗,再次讀取ctl

  25.                c = ctl.get();

  26.                // 如果狀態不等於之前獲取的state,跳出內層迴圈,繼續去外層迴圈判斷

  27.                if (runStateOf(c) != rs)

  28.                    continue retry;

  29.            }

  30.        }

  31.        boolean workerStarted = false;

  32.        boolean workerAdded = false;

  33.        Worker w = null;

  34.        try {

  35.            // 新建執行緒:Worker

  36.            w = new Worker(firstTask);

  37.            // 當前執行緒

  38.            final Thread t = w.thread;

  39.            if (t != null) {

  40.                // 獲取主鎖:mainLock

  41.                final ReentrantLock mainLock = this.mainLock;

  42.                mainLock.lock();

  43.                try {

  44.                    // 執行緒狀態

  45.                    int rs = runStateOf(ctl.get());

  46.                    // rs < SHUTDOWN ==> 執行緒處於RUNNING狀態

  47.                    // 或者執行緒處於SHUTDOWN狀態,且firstTask == null(可能是workQueue中仍有未執行完成的任務,建立沒有初始任務的worker執行緒執行)

  48.                    if (rs < SHUTDOWN ||

  49.                            (rs == SHUTDOWN && firstTask == null)) {

  50.                        // 當前執行緒已經啟動,丟擲異常

  51.                        if (t.isAlive()) // precheck that t is startable

  52.                            throw new IllegalThreadStateException();

  53.                        // workers是一個HashSet

  54.                        workers.add(w);

  55.                        // 設定最大的池大小largestPoolSize,workerAdded設定為true

  56.                        int s = workers.size();

  57.                        if (s > largestPoolSize)

  58.                            largestPoolSize = s;

  59.                        workerAdded = true;

  60.                    }

  61.                } finally {

  62.                    // 釋放鎖

  63.                    mainLock.unlock();

  64.                }

  65.                // 啟動執行緒

  66.                if (workerAdded) {

  67.                    t.start();

  68.                    workerStarted = true;

  69.                }

  70.            }

  71.        } finally {

  72.            // 執行緒啟動失敗

  73.            if (! workerStarted)

  74.                addWorkerFailed(w);

  75.        }

  76.        return workerStarted;

  77.    }

  1. 判斷當前執行緒是否可以新增任務,如果可以則進行下一步,否則return false;

  2. rs >= SHUTDOWN ,表示當前執行緒處於SHUTDOWN ,STOP、TIDYING、TERMINATED狀態

  3. rs == SHUTDOWN , firstTask != null時不允許新增執行緒,因為執行緒處於SHUTDOWN 狀態,不允許新增任務

  4. rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() == true,不允許新增執行緒,因為firstTask == null是為了新增一個沒有任務的執行緒然後再從workQueue中獲取任務的,如果workQueue == null,則說明新增的任務沒有任何意義。

  5. 內嵌迴圈,透過CAS worker + 1

  6. 獲取主鎖mailLock,如果執行緒池處於RUNNING狀態獲取處於SHUTDOWN狀態且 firstTask == null,則將任務新增到workers Queue中,然後釋放主鎖mainLock,然後啟動執行緒,然後return true,如果中途失敗導致workerStarted= false,則呼叫addWorkerFailed()方法進行處理。

在這裡需要好好理論addWorker中的引數,在execute()方法中,有三處呼叫了該方法:
第一次:workerCountOf(c) < corePoolSize ==> addWorker(command, true),這個很好理解,當然執行緒池的執行緒數量小於 corePoolSize ,則新建執行緒執行任務即可,在執行過程core == true,內部與corePoolSize比較即可。
第二次:加入阻塞佇列進行Double Check時,else if (workerCountOf(recheck) == 0) ==>addWorker(null, false)。如果執行緒池中的執行緒==0,按照道理應該該任務應該新建執行緒執行任務,但是由於已經該任務已經新增到了阻塞佇列,那麼就在執行緒池中新建一個空執行緒,然後從阻塞佇列中取執行緒即可。
第三次:執行緒池不是RUNNING狀態或者加入阻塞佇列失敗:else if (!addWorker(command, false)),這裡core == fase,則意味著是與maximumPoolSize比較。

在新建執行緒執行任務時,將講Runnable包裝成一個Worker,Woker為ThreadPoolExecutor的內部類

Woker內部類

Woker的原始碼如下:

  1.    private final class Worker extends AbstractQueuedSynchronizer

  2.            implements Runnable {

  3.        private static final long serialVersionUID = 6138294804551838833L;

  4.        // task 的thread

  5.        final Thread thread;

  6.        // 執行的任務task

  7.        Runnable firstTask;

  8.        volatile long completedTasks;

  9.        Worker(Runnable firstTask) {

  10.            //設定AQS的同步狀態private volatile int state,是一個計數器,大於0代表鎖已經被獲取

  11.            setState(-1);

  12.            this.firstTask = firstTask;

  13.            // 利用ThreadFactory和 Worker這個Runnable建立的執行緒物件

  14.            this.thread = getThreadFactory().newThread(this);

  15.        }

  16.        // 任務執行

  17.        public void run() {

  18.            runWorker(this);

  19.        }

  20.    }

從Worker的原始碼中我們可以看到Woker繼承AQS,實現Runnable介面,所以可以認為Worker既是一個可以執行的任務,也可以達到獲取鎖釋放鎖的效果。這裡繼承AQS主要是為了方便執行緒的中斷處理。這裡註意兩個地方:建構式、run()。建構式主要是做三件事:1.設定同步狀態state為-1,同步狀態大於0表示就已經獲取了鎖,2.設定將當前任務task設定為firstTask,3.利用Worker本身物件this和ThreadFactory建立執行緒物件。

當執行緒thread啟動(呼叫start()方法)時,其實就是執行Worker的run()方法,內部呼叫runWorker()。

runWorker

  1.    final void runWorker(Worker w) {

  2.        // 當前執行緒

  3.        Thread wt = Thread.currentThread();

  4.        // 要執行的任務

  5.        Runnable task = w.firstTask;

  6.        w.firstTask = null;

  7.        // 釋放鎖,執行中斷

  8.        w.unlock(); // allow interrupts

  9.        boolean completedAbruptly = true;

  10.        try {

  11.            while (task != null || (task = getTask()) != null) {

  12.                // worker 獲取鎖

  13.                w.lock();

  14.                // 確保只有當執行緒是stoping時,才會被設定為中斷,否則清楚中斷標示

  15.                // 如果執行緒池狀態 >= STOP ,且當前執行緒沒有設定中斷狀態,則wt.interrupt()

  16.                // 如果執行緒池狀態 < STOP,但是執行緒已經中斷了,再次判斷執行緒池是否 >= STOP,如果是 wt.interrupt()

  17.                if ((runStateAtLeast(ctl.get(), STOP) ||

  18.                        (Thread.interrupted() &&

  19.                                runStateAtLeast(ctl.get(), STOP))) &&

  20.                        !wt.isInterrupted())

  21.                    wt.interrupt();

  22.                try {

  23.                    // 自定義方法

  24.                    beforeExecute(wt, task);

  25.                    Throwable thrown = null;

  26.                    try {

  27.                        // 執行任務

  28.                        task.run();

  29.                    } catch (RuntimeException x) {

  30.                        thrown = x; throw x;

  31.                    } catch (Error x) {

  32.                        thrown = x; throw x;

  33.                    } catch (Throwable x) {

  34.                        thrown = x; throw new Error(x);

  35.                    } finally {

  36.                        afterExecute(task, thrown);

  37.                    }

  38.                } finally {

  39.                    task = null;

  40.                    // 完成任務數 + 1

  41.                    w.completedTasks++;

  42.                    // 釋放鎖

  43.                    w.unlock();

  44.                }

  45.            }

  46.            completedAbruptly = false;

  47.        } finally {

  48.            processWorkerExit(w, completedAbruptly);

  49.        }

  50.    }

執行流程

  1. 根據worker獲取要執行的任務task,然後呼叫unlock()方法釋放鎖,這裡釋放鎖的主要目的在於中斷,因為在new Worker時,設定的state為-1,呼叫unlock()方法可以將state設定為0,這裡主要原因就在於interruptWorkers()方法只有在state >= 0時才會執行;

  2. 透過getTask()獲取執行的任務,呼叫task.run()執行,當然在執行之前會呼叫worker.lock()上鎖,執行之後呼叫worker.unlock()放鎖;

  3. 在任務執行前後,可以根據業務場景自定義beforeExecute() 和 afterExecute()方法,則兩個方法在ThreadPoolExecutor中是空實現;

  4. 如果執行緒執行完成,則會呼叫getTask()方法從阻塞佇列中獲取新任務,如果阻塞佇列為空,則根據是否超時來判斷是否需要阻塞;

  5. task == null或者丟擲異常(beforeExecute()、task.run()、afterExecute()均有可能)導致worker執行緒終止,則呼叫processWorkerExit()方法處理worker退出流程。

getTask()

  1.    private Runnable getTask() {

  2.        boolean timedOut = false; // Did the last poll() time out?

  3.        for (;;) {

  4.            // 執行緒池狀態

  5.            int c = ctl.get();

  6.            int rs = runStateOf(c);

  7.            // 執行緒池中狀態 >= STOP 或者 執行緒池狀態 == SHUTDOWN且阻塞佇列為空,則worker - 1,return null

  8.            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

  9.                decrementWorkerCount();

  10.                return null;

  11.            }

  12.            int wc = workerCountOf(c);

  13.            // 判斷是否需要超時控制

  14.            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

  15.            if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {

  16.                if (compareAndDecrementWorkerCount(c))

  17.                    return null;

  18.                continue;

  19.            }

  20.            try {

  21.                // 從阻塞佇列中獲取task

  22.                // 如果需要超時控制,則呼叫poll(),否則呼叫take()

  23.                Runnable r = timed ?

  24.                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

  25.                        workQueue.take();

  26.                if (r != null)

  27.                    return r;

  28.                timedOut = true;

  29.            } catch (InterruptedException retry) {

  30.                timedOut = false;

  31.            }

  32.        }

  33.    }

timed == true,呼叫poll()方法,如果在keepAliveTime時間內還沒有獲取task的話,則傳回null,繼續迴圈。timed == false,則呼叫take()方法,該方法為一個阻塞方法,沒有任務時會一直阻塞掛起,直到有任務加入時對該執行緒喚醒,傳回任務。

在runWorker()方法中,無論最終結果如何,都會執行processWorkerExit()方法對worker進行退出處理。

processWorkerExit()

  1.    private void processWorkerExit(Worker w, boolean completedAbruptly) {

  2.        // true:使用者執行緒執行異常,需要扣減

  3.        // false:getTask方法中扣減執行緒數量

  4.        if (completedAbruptly)

  5.            decrementWorkerCount();

  6.        // 獲取主鎖

  7.        final ReentrantLock mainLock = this.mainLock;

  8.        mainLock.lock();

  9.        try {

  10.            completedTaskCount += w.completedTasks;

  11.            // 從HashSet中移出worker

  12.            workers.remove(w);

  13.        } finally {

  14.            mainLock.unlock();

  15.        }

  16.        // 有worker執行緒移除,可能是最後一個執行緒退出需要嘗試終止執行緒池

  17.        tryTerminate();

  18.        int c = ctl.get();

  19.        // 如果執行緒為running或shutdown狀態,即tryTerminate()沒有成功終止執行緒池,則判斷是否有必要一個worker

  20.        if (runStateLessThan(c, STOP)) {

  21.            // 正常退出,計算min:需要維護的最小執行緒數量

  22.            if (!completedAbruptly) {

  23.                // allowCoreThreadTimeOut 預設false:是否需要維持核心執行緒的數量

  24.                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

  25.                // 如果min ==0 或者workerQueue為空,min = 1

  26.                if (min == 0 && ! workQueue.isEmpty())

  27.                    min = 1;

  28.                // 如果執行緒數量大於最少數量min,直接傳回,不需要新增執行緒

  29.                if (workerCountOf(c) >= min)

  30.                    return; // replacement not needed

  31.            }

  32.            // 新增一個沒有firstTask的worker

  33.            addWorker(null, false);

  34.        }

  35.    }

首先completedAbruptly的值來判斷是否需要對執行緒數-1處理,如果completedAbruptly == true,說明在任務執行過程中出現了異常,那麼需要進行減1處理,否則不需要,因為減1處理在getTask()方法中處理了。然後從HashSet中移出該worker,過程需要獲取mainlock。然後呼叫tryTerminate()方法處理,該方法是對最後一個執行緒退出做終止執行緒池動作。如果執行緒池沒有終止,那麼執行緒池需要保持一定數量的執行緒,則透過addWorker(null,false)新增一個空的執行緒。

addWorkerFailed()

在addWorker()方法中,如果執行緒t==null,或者在add過程出現異常,會導致workerStarted == false,那麼在最後會呼叫addWorkerFailed()方法:

  1.    private void addWorkerFailed(Worker w) {

  2.        final ReentrantLock mainLock = this.mainLock;

  3.        mainLock.lock();

  4.        try {

  5.            // 從HashSet中移除該worker

  6.            if (w != null)

  7.                workers.remove(w);

  8.            // 執行緒數 - 1

  9.            decrementWorkerCount();

  10.            // 嘗試終止執行緒

  11.            tryTerminate();

  12.        } finally {

  13.            mainLock.unlock();

  14.        }

  15.    }

整個邏輯顯得比較簡單。

tryTerminate()

當執行緒池涉及到要移除worker時候都會呼叫tryTerminate(),該方法主要用於判斷執行緒池中的執行緒是否已經全部移除了,如果是的話則關閉執行緒池。

  1.    final void tryTerminate() {

  2.        for (;;) {

  3.            int c = ctl.get();

  4.            // 執行緒池處於Running狀態

  5.            // 執行緒池已經終止了

  6.            // 執行緒池處於ShutDown狀態,但是阻塞佇列不為空

  7.            if (isRunning(c) ||

  8.                    runStateAtLeast(c, TIDYING) ||

  9.                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

  10.                return;

  11.            // 執行到這裡,就意味著執行緒池要麼處於STOP狀態,要麼處於SHUTDOWN且阻塞佇列為空

  12.            // 這時如果執行緒池中還存在執行緒,則會嘗試中斷執行緒

  13.            if (workerCountOf(c) != 0) {

  14.                // /執行緒池還有執行緒,但是佇列沒有任務了,需要中斷喚醒等待任務的執行緒

  15.                // (runwoker的時候首先就透過w.unlock設定執行緒可中斷,getTask最後面的catch處理中斷)

  16.                interruptIdleWorkers(ONLY_ONE);

  17.                return;

  18.            }

  19.            final ReentrantLock mainLock = this.mainLock;

  20.            mainLock.lock();

  21.            try {

  22.                // 嘗試終止執行緒池

  23.                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {

  24.                    try {

  25.                        terminated();

  26.                    } finally {

  27.                        // 執行緒池狀態轉為TERMINATED

  28.                        ctl.set(ctlOf(TERMINATED, 0));

  29.                        termination.signalAll();

  30.                    }

  31.                    return;

  32.                }

  33.            } finally {

  34.                mainLock.unlock();

  35.            }

  36.        }

  37.    }

在關閉執行緒池的過程中,如果執行緒池處於STOP狀態或者處於SHUDOWN狀態且阻塞佇列為null,則執行緒池會呼叫interruptIdleWorkers()方法中斷所有執行緒,註意ONLY_ONE== true,表示僅中斷一個執行緒。

interruptIdleWorkers

  1.    private void interruptIdleWorkers(boolean onlyOne) {

  2.        final ReentrantLock mainLock = this.mainLock;

  3.        mainLock.lock();

  4.        try {

  5.            for (Worker w : workers) {

  6.                Thread t = w.thread;

  7.                if (!t.isInterrupted() && w.tryLock()) {

  8.                    try {

  9.                        t.interrupt();

  10.                    } catch (SecurityException ignore) {

  11.                    } finally {

  12.                        w.unlock();

  13.                    }

  14.                }

  15.                if (onlyOne)

  16.                    break;

  17.            }

  18.        } finally {

  19.            mainLock.unlock();

  20.        }

  21.    }

onlyOne==true僅終止一個執行緒,否則終止所有執行緒。

執行緒終止

執行緒池ThreadPoolExecutor提供了shutdown()和shutDownNow()用於關閉執行緒池。

shutdown():按過去執行已提交任務的順序發起一個有序的關閉,但是不接受新任務。

shutdownNow() :嘗試停止所有的活動執行任務、暫停等待任務的處理,並傳回等待執行的任務串列。

shutdown

  1.    public void shutdown() {

  2.        final ReentrantLock mainLock = this.mainLock;

  3.        mainLock.lock();

  4.        try {

  5.            checkShutdownAccess();

  6.            // 推進執行緒狀態

  7.            advanceRunState(SHUTDOWN);

  8.            // 中斷空閑的執行緒

  9.            interruptIdleWorkers();

  10.            // 交給子類實現

  11.            onShutdown();

  12.        } finally {

  13.            mainLock.unlock();

  14.        }

  15.        tryTerminate();

  16.    }

shutdownNow

  1.    public List<Runnable> shutdownNow() {

  2.        List<Runnable> tasks;

  3.        final ReentrantLock mainLock = this.mainLock;

  4.        mainLock.lock();

  5.        try {

  6.            checkShutdownAccess();

  7.            advanceRunState(STOP);

  8.            // 中斷所有執行緒

  9.            interruptWorkers();

  10.            // 傳回等待執行的任務串列

  11.            tasks = drainQueue();

  12.        } finally {

  13.            mainLock.unlock();

  14.        }

  15.        tryTerminate();

  16.        return tasks;

  17.    }

與shutdown不同,shutdownNow會呼叫interruptWorkers()方法中斷所有執行緒。

  1.    private void interruptWorkers() {

  2.        final ReentrantLock mainLock = this.mainLock;

  3.        mainLock.lock();

  4.        try {

  5.            for (Worker w : workers)

  6.                w.interruptIfStarted();

  7.        } finally {

  8.            mainLock.unlock();

  9.        }

  10.    }

同時會呼叫drainQueue()方法傳回等待執行到任務串列。

  1.    private List<Runnable> drainQueue() {

  2.        BlockingQueue<Runnable> q = workQueue;

  3.        ArrayList<Runnable> taskList = new ArrayList<Runnable>();

  4.        q.drainTo(taskList);

  5.        if (!q.isEmpty()) {

  6.            for (Runnable r : q.toArray(new Runnable[0])) {

  7.                if (q.remove(r))

  8.                    taskList.add(r);

  9.            }

  10.        }

  11.        return taskList;

  12.    }

END

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖