歡迎光臨
每天分享高質量文章

Java執行緒池監控小結

最近我們組楊青同學遇到一個使用執行緒池不當的問題:非同步處理的執行緒池執行緒將主執行緒hang住了,分析程式碼發現是執行緒池的拒絕策略設定得不合理,設定為CallerRunsPolicy。當非同步執行緒的執行效率降低時,阻塞佇列滿了,觸發了拒絕策略,進而導致主執行緒hang死。

從這個問題中,我們學到了兩點:

  • 執行緒池的使用,需要充分分析業務場景後作出選擇,必要的情況下需要自定義執行緒池;

  • 執行緒池的執行狀況,也需要監控

關於執行緒池的監控,我參考了《Java程式設計的藝術》中提供的思路實現的,分享下我的程式碼片段,如下:

  1. public class AsyncThreadExecutor implements AutoCloseable {

  2.    private static final int DEFAULT_QUEUE_SIZE = 1000;

  3.    private static final int DEFAULT_POOL_SIZE = 10;

  4.    @Setter

  5.    private int queueSize = DEFAULT_QUEUE_SIZE;

  6.    @Setter

  7.    private int poolSize = DEFAULT_POOL_SIZE;

  8.    /**

  9.     * 用於週期性監控執行緒池的執行狀態

  10.     */

  11.    private final ScheduledExecutorService scheduledExecutorService =

  12.        Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("async thread executor monitor").build());

  13.    /**

  14.     * 自定義非同步執行緒池

  15.     * (1)任務佇列使用有界佇列

  16.     * (2)自定義拒絕策略

  17.     */

  18.    private final ThreadPoolExecutor threadPoolExecutor =

  19.        new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize),

  20.                               new BasicThreadFactory.Builder().namingPattern("async-thread-%d").build(),

  21.                               (r, executor) -> log.error("the async executor pool is full!!"));

  22.    private final ExecutorService executorService = threadPoolExecutor;

  23.    @PostConstruct

  24.    public void init() {

  25.        scheduledExecutorService.scheduleAtFixedRate(() -> {

  26.            /**

  27.             * 執行緒池需要執行的任務數

  28.             */

  29.            long taskCount = threadPoolExecutor.getTaskCount();

  30.            /**

  31.             * 執行緒池在執行過程中已完成的任務數

  32.             */

  33.            long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();

  34.            /**

  35.             * 曾經建立過的最大執行緒數

  36.             */

  37.            long largestPoolSize = threadPoolExecutor.getLargestPoolSize();

  38.            /**

  39.             * 執行緒池裡的執行緒數量

  40.             */

  41.            long poolSize = threadPoolExecutor.getPoolSize();

  42.            /**

  43.             * 執行緒池裡活躍的執行緒數量

  44.             */

  45.            long activeCount = threadPoolExecutor.getActiveCount();

  46.            log.info("async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{}",

  47.                     taskCount, completedTaskCount, completedTaskCount, largestPoolSize, poolSize, activeCount);

  48.        }, 0, 10, TimeUnit.MINUTES);

  49.    }

  50.    public void execute(Runnable task) {

  51.        executorService.execute(task);

  52.    }

  53.    @Override

  54.    public void close() throws Exception {

  55.        executorService.shutdown();

  56.    }

  57. }

這裡的主要思路是:(1)使用有界佇列的固定數量執行緒池;(2)拒絕策略是將任務丟棄,但是需要記錄錯誤日誌;(3)使用一個排程執行緒池對業務執行緒池進行監控。

在檢視監控日誌的時候,看到下圖所示的監控日誌:

這裡我對largestPooSize的含義比較困惑,按字面理解是“最大的執行緒池數量”,但是按照執行緒池的定義,maximumPoolSize和coreSize相同的時候(在這裡,都是10),一個執行緒池裡的最大執行緒數是10,那麼為什麼largestPooSize可以是39呢?我去翻這塊的原始碼:

  1.    /**

  2.     * Returns the largest number of threads that have ever

  3.     * simultaneously been in the pool.

  4.     *

  5.     * @return the number of threads

  6.     */

  7.    public int getLargestPoolSize() {

  8.        final ReentrantLock mainLock = this.mainLock;

  9.        mainLock.lock();

  10.        try {

  11.            return largestPoolSize;

  12.        } finally {

  13.            mainLock.unlock();

  14.        }

  15.    }

註釋的翻譯是:傳回在這個執行緒池裡曾經同時存在過的執行緒數。再看這個變數largestPoolSize在ThreadExecutor中的賦值的地方,程式碼如下:

  1. private boolean addWorker(Runnable firstTask, boolean core) {

  2.        retry:

  3.        for (;;) {

  4.            int c = ctl.get();

  5.            int rs = runStateOf(c);

  6.            // Check if queue empty only if necessary.

  7.            if (rs >= SHUTDOWN &&

  8.                ! (rs == SHUTDOWN &&

  9.                   firstTask == null &&

  10.                   ! workQueue.isEmpty()))

  11.                return false;

  12.            for (;;) {

  13.                int wc = workerCountOf(c);

  14.                if (wc >= CAPACITY ||

  15.                    wc >= (core ? corePoolSize : maximumPoolSize))

  16.                    return false;

  17.                if (compareAndIncrementWorkerCount(c))

  18.                    break retry;

  19.                c = ctl.get();  // Re-read ctl

  20.                if (runStateOf(c) != rs)

  21.                    continue retry;

  22.                // else CAS failed due to workerCount change; retry inner loop

  23.            }

  24.        }

  25.        boolean workerStarted = false;

  26.        boolean workerAdded = false;

  27.        Worker w = null;

  28.        try {

  29.            w = new Worker(firstTask);

  30.            final Thread t = w.thread;

  31.            if (t != null) {

  32.                final ReentrantLock mainLock = this.mainLock;

  33.                mainLock.lock();

  34.                try {

  35.                    // Recheck while holding lock.

  36.                    // Back out on ThreadFactory failure or if

  37.                    // shut down before lock acquired.

  38.                    int rs = runStateOf(ctl.get());

  39.                    if (rs < SHUTDOWN ||

  40.                        (rs == SHUTDOWN && firstTask == null)) {

  41.                        if (t.isAlive()) // precheck that t is startable

  42.                            throw new IllegalThreadStateException();

  43.                        workers.add(w);

  44.                        int s = workers.size();

  45.                        if (s > largestPoolSize)

  46.                            largestPoolSize = s;//這裡這裡!

  47.                        workerAdded = true;

  48.                    }

  49.                } finally {

  50.                    mainLock.unlock();

  51.                }

  52.                if (workerAdded) {

  53.                    t.start();

  54.                    workerStarted = true;

  55.                }

  56.            }

  57.        } finally {

  58.            if (! workerStarted)

  59.                addWorkerFailed(w);

  60.        }

  61.        return workerStarted;

  62.    }

發現largestPoolSize是worker集合的大小,但是註意,並不是worker集合中的所有worker都處於工作狀態。因此這裡結論就出來了:執行緒池的容量,值得是同時活躍(執行)的執行緒池個數;largestPoolSize的大小是執行緒池曾建立的執行緒個數,跟執行緒池的容量無關。

PS:楊青同學是這篇文章的靈感來源,他做了很多壓測。給了我很多思路,並跟我一起分析了一些程式碼。

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖