歡迎光臨
每天分享高質量文章

擴展 ThreadPoolExecutor 的一種辦法

(點擊上方公眾號,可快速關註)


來源:Sam哥哥 ,

blog.csdn.net/linsongbin1/article/details/78275283

概述

在JAVA的世界里,如果想並行的執行一些任務,可以使用ThreadPoolExecutor。

大部分情況下直接使用ThreadPoolExecutor就可以滿足要求了,但是在某些場景下,比如瞬時大流量的,為了提高響應和吞吐量,最好還是擴展一下ThreadPoolExecutor。

全宇宙的JAVA IT人士應該都知道ThreadPoolExecutor的執行流程:

  • core執行緒還能應付的,則不斷的創建新的執行緒;

  • core執行緒無法應付,則將任務扔到佇列裡面;

  • 佇列滿了(意味著插入任務失敗),則開始創建MAX執行緒,執行緒數達到MAX後,佇列還一直是滿的,則丟擲RejectedExecutionException.

這個執行流程有個小問題,就是當core執行緒無法應付請求的時候,會立刻將任務添加到佇列中,如果佇列非常長,而任務又非常多,那麼將會有頻繁的任務入佇列和任務出佇列的操作。

根據實際的壓測發現,這種操作也是有一定消耗的。其實JAVA提供的SynchronousQueue佇列是一個零長度的佇列,任務都是直接由生產者遞交給消費者,中間沒有入佇列的過程,可見JAVA API的設計者也是有考慮過入佇列這種操作的開銷。

另外,任務一多,立刻扔到佇列里,而MAX執行緒又不幹活,如果佇列裡面太多任務了,只有可憐的core執行緒在忙,也是會影響性能的。

當core執行緒無法應付請求的時候,能不能延後入佇列這個操作呢? 讓MAX執行緒儘快啟動起來,幫忙處理任務。

也即是說,當core執行緒無法應付請求的時候,如果當前執行緒池中的執行緒數量還小於MAX執行緒數的時候,繼續創建新的執行緒處理任務,一直到執行緒數量到達MAX後,才將任務插入到佇列里。

我們通過改寫佇列的offer方法來實現這個標的。

  @Override

public  boolean offer(Runnable o) {

    int currentPoolThreadSize = executor.getPoolSize();

    //如果執行緒池裡的執行緒數量已經到達最大,將任務添加到佇列中

    if (currentPoolThreadSize == executor.getMaximumPoolSize()) {

        return super.offer(o);

    }

    //說明有空閑的執行緒,這個時候無需創建core執行緒之外的執行緒,而是把任務直接丟到佇列里即可

    if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {

        return super.offer(o);

    }

 

    //如果執行緒池裡的執行緒數量還沒有到達最大,直接創建執行緒,而不是把任務丟到佇列裡面

    if (currentPoolThreadSize < executor.getMaximumPoolSize()) {

        return false;

    }

 

    return super.offer(o);

}

註意其中的

if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {

        return super.offer(o);

}

是表示core執行緒仍然能處理的來,同時又有空閑執行緒的情況,將任務插入到佇列中。 如何判斷執行緒池中有空閑執行緒呢? 可以使用一個計數器來實現,每當execute方法被執行的時候,計算器加1,當afterExecute被執行後,計數器減1.

@Override

public void execute(Runnable command) {

    submittedTaskCount.incrementAndGet();

    //代碼未完整,待補充。。。。。

}

@Override

   protected void afterExecute(Runnable r, Throwable t) {

       submittedTaskCount.decrementAndGet();

   }

這樣,當

executor.getSubmittedTaskCount() < currentPoolThreadSize

的時候,說明有空閑執行緒。

完整代碼

EnhancedThreadPoolExecutor類

package executer;

 

import java.util.concurrent.*;

import java.util.concurrent.atomic.AtomicInteger;

 

public class EnhancedThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

 

    /**

     * 計數器,用於表示已經提交到佇列裡面的task的數量,這裡task特指還未完成的task。

     * 當task執行完後,submittedTaskCount會減1的。

     */

    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

 

    public EnhancedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue workQueue) {

        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy());

        workQueue.setExecutor(this);

    }

 

    /**

     * 改寫父類的afterExecute方法,當task執行完成後,將計數器減1

     */

    @Override

    protected void afterExecute(Runnable r, Throwable t) {

        submittedTaskCount.decrementAndGet();

    }

 

 

    public int getSubmittedTaskCount() {

        return submittedTaskCount.get();

    }

 

 

    /**

     * 改寫父類的execute方法,在任務開始執行之前,計數器加1。

     */

    @Override

    public void execute(Runnable command) {

        submittedTaskCount.incrementAndGet();

        try {

            super.execute(command);

        } catch (RejectedExecutionException rx) {

            //當發生RejectedExecutionException,嘗試再次將task丟到佇列裡面,如果還是發生RejectedExecutionException,則直接丟擲異常。

            BlockingQueue taskQueue = super.getQueue();

            if (taskQueue instanceof TaskQueue) {

                final TaskQueue queue = (TaskQueue)taskQueue;

                if (!queue.forceTaskIntoQueue(command)) {

                    submittedTaskCount.decrementAndGet();

                    throw new RejectedExecutionException(“佇列已滿”);

                }

            } else {

                submittedTaskCount.decrementAndGet();

                throw rx;

            }

        }

    }

}

TaskQueue

package executer;

 

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.RejectedExecutionException;

 

public class TaskQueue extends LinkedBlockingQueue {

    private EnhancedThreadPoolExecutor executor;

 

    public TaskQueue(int capacity) {

        super(capacity);

    }

 

    public void setExecutor(EnhancedThreadPoolExecutor exec) {

        executor = exec;

    }

 

    public boolean forceTaskIntoQueue(Runnable o) {

        if (executor.isShutdown()) {

            throw new RejectedExecutionException(“Executor已經關閉了,不能將task添加到佇列裡面”);

        }

        return super.offer(o);

    }

 

    @Override

    public  boolean offer(Runnable o) {

        int currentPoolThreadSize = executor.getPoolSize();

        //如果執行緒池裡的執行緒數量已經到達最大,將任務添加到佇列中

        if (currentPoolThreadSize == executor.getMaximumPoolSize()) {

            return super.offer(o);

        }

        //說明有空閑的執行緒,這個時候無需創建core執行緒之外的執行緒,而是把任務直接丟到佇列里即可

        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {

            return super.offer(o);

        }

 

        //如果執行緒池裡的執行緒數量還沒有到達最大,直接創建執行緒,而不是把任務丟到佇列裡面

        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {

            return false;

        }

 

        return super.offer(o);

    }

}

TestExecuter

package executer;

 

import java.util.concurrent.TimeUnit;

 

public class TestExecuter {

    private static final int CORE_SIZE = 5;

 

    private static final int MAX_SIZE = 10;

 

    private static final long KEEP_ALIVE_TIME = 30;

 

    private static final int QUEUE_SIZE = 5;

 

    static EnhancedThreadPoolExecutor executor = new EnhancedThreadPoolExecutor(CORE_SIZE,MAX_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS , new TaskQueue(QUEUE_SIZE));

 

    public static void main(String[] args){

        for (int i = 0; i < 15; i++) {

            executor.execute(new Runnable() {

                @Override

                public void run() {

                    try {

                        Thread.currentThread().sleep(1000);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

            });

 

            System.out.println(“執行緒池中現在的執行緒數目是:”+executor.getPoolSize()+”,  佇列中正在等待執行的任務數量為:”+ executor.getQueue().size());

        }

    }

}

先運行一下代碼,看看是否如何預期。直接執行TestExecuter類中的main方法,運行結果如下:

執行緒池中現在的執行緒數目是:1,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:2,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:3,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:4,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:5,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:6,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:7,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:8,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:9,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:10,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:10,  佇列中正在等待執行的任務數量為:1

執行緒池中現在的執行緒數目是:10,  佇列中正在等待執行的任務數量為:2

執行緒池中現在的執行緒數目是:10,  佇列中正在等待執行的任務數量為:3

執行緒池中現在的執行緒數目是:10,  佇列中正在等待執行的任務數量為:4

執行緒池中現在的執行緒數目是:10,  佇列中正在等待執行的任務數量為:5

可以看到當執行緒數增加到core數量的時候,佇列中是沒有任務的。一直到執行緒數量增加到MAX數量,也即是10的時候,佇列中才開始有任務。符合我們的預期。

如果我們註釋掉TaskQueue類中的offer方法,也即是不改寫佇列的offer方法,那麼運行結果如下:

執行緒池中現在的執行緒數目是:1,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:2,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:3,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:4,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:5,  佇列中正在等待執行的任務數量為:0

執行緒池中現在的執行緒數目是:5,  佇列中正在等待執行的任務數量為:1

執行緒池中現在的執行緒數目是:5,  佇列中正在等待執行的任務數量為:2

執行緒池中現在的執行緒數目是:5,  佇列中正在等待執行的任務數量為:3

執行緒池中現在的執行緒數目是:5,  佇列中正在等待執行的任務數量為:4

執行緒池中現在的執行緒數目是:5,  佇列中正在等待執行的任務數量為:5

執行緒池中現在的執行緒數目是:6,  佇列中正在等待執行的任務數量為:5

執行緒池中現在的執行緒數目是:7,  佇列中正在等待執行的任務數量為:5

執行緒池中現在的執行緒數目是:8,  佇列中正在等待執行的任務數量為:5

執行緒池中現在的執行緒數目是:9,  佇列中正在等待執行的任務數量為:5

執行緒池中現在的執行緒數目是:10,  佇列中正在等待執行的任務數量為:5

可以看到當執行緒數增加到core數量的時候,佇列中已經有任務了。

進一步思考

在使用ThreadPoolExecutor的時候,如果發生了RejectedExecutionException,該如何處理?本文中的代碼是採用了重新將任務嘗試插入到佇列中,如果還是失敗則直接將reject異常丟擲去。

@Override

    public void execute(Runnable command) {

        submittedTaskCount.incrementAndGet();

        try {

            super.execute(command);

        } catch (RejectedExecutionException rx) {

            //當發生RejectedExecutionException,嘗試再次將task丟到佇列裡面,如果還是發生RejectedExecutionException,則直接丟擲異常。

            BlockingQueue taskQueue = super.getQueue();

            if (taskQueue instanceof TaskQueue) {

                final TaskQueue queue = (TaskQueue)taskQueue;

                if (!queue.forceTaskIntoQueue(command)) {

                    submittedTaskCount.decrementAndGet();

                    throw new RejectedExecutionException(“佇列已滿”);

                }

            } else {

                submittedTaskCount.decrementAndGet();

                throw rx;

            }

        }

    }

TaskQueue類提供了forceTaskIntoQueue方法,將任務插入到佇列中。

還有另一種解決方案,就是使用另外一個執行緒池來執行任務,當第一個執行緒池丟擲Reject異常時,catch住它,並使用第二個執行緒池處理任務。

看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

赞(0)

分享創造快樂