歡迎光臨
每天分享高質量文章

C#並行程式設計(2):.NET執行緒池

執行緒 Thread

在總結執行緒池之前,先來看一下.NET執行緒。

.NET執行緒與作業系統(Windows)執行緒有什麼區別?

.NET利用Windows的執行緒處理功能。在C#程式編寫中,我們首先會新建一個執行緒物件System.Threading.Thread,併為其指定一個回呼方法;當我們呼叫執行緒物件的Start方法啟動執行緒時,會建立一個作業系統執行緒來執行回呼方法。.NET中的執行緒實際上等價於Windows系統執行緒,都是CPU排程和分配的物件。

前臺執行緒和後臺執行緒

.NET把執行緒分為前臺執行緒和後臺執行緒,兩者幾乎相同,唯一的區別是,前臺執行緒會阻止行程的正常退出,後臺執行緒則不會。下麵用一個例子描述前、後臺執行緒的區別:


class Program
{
    static void Main(string[] args)
    {
        ThreadDemo threadDemo = new ThreadDemo();
        
        threadDemo.RunBackgroundThread();

        {
            Thread.Sleep(5000);
            Thread.CurrentThread.Abort();
        }

        Console.ReadKey();
    }
}

public class ThreadDemo
{
    private readonly Thread _foregroundThread;
    private readonly Thread _backgroundThread;

    public ThreadDemo()
    {
        this._foregroundThread = new Thread(WriteNumberWorker) { Name = "ForegroundThread"};
        this._backgroundThread = new Thread(WriteNumberWorker) { Name = "BackgroundThread", IsBackground = true };
    }

    
    
    
    private static void WriteNumberWorker()
    {
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine($"{DateTime.Now}=> {Thread.CurrentThread.Name} writes {i + 1}.");
            Thread.Sleep(500);
        }
    }

    
    
    
    public void RunForegroundThread()
    {
        this._foregroundThread?.Start();
    }

    
    
    
    public void RunBackgroundThread()
    {
        this._backgroundThread?.Start();
    }
}

執行緒池 ThreadPool

執行緒的建立和銷毀要耗費很多時間,而且過多的執行緒不僅會浪費記憶體空間,還會導致執行緒背景關係切換頻繁,影響程式效能。為改善這些問題,.NET執行時(CLR)會為每個行程開闢一個全域性唯一的執行緒池來管理其執行緒。

執行緒池內部維護一個操作請求佇列,程式執行非同步操作時,新增標的操作到執行緒池的請求佇列;執行緒池程式碼提取記錄項並派發給執行緒池中的一個執行緒;如果執行緒池中沒有可用執行緒,就建立一個新執行緒,建立的新執行緒不會隨任務的完成而銷毀,這樣就可以避免執行緒的頻繁建立和銷毀。如果執行緒池中大量執行緒長時間無所事事,空閑執行緒會進行自我終結以釋放資源。

執行緒池透過保持行程中執行緒的少量和高效來最佳化程式的效能。

C#中執行緒池是一個靜態類,維護兩種執行緒,工作執行緒非同步IO執行緒,這些執行緒都是後臺執行緒。執行緒池不會影響行程的正常退出。

執行緒池的使用

執行緒池提供兩個靜態方法SetMaxThreadsSetMinThreads讓我們設定執行緒池的最大執行緒數和最小執行緒數。最大執行緒數指的是,該執行緒池能夠建立的最大執行緒數,當執行緒數達到設定值且忙碌,非同步任務將進入請求佇列,直到有執行緒空閑才會執行;最小執行緒數指的是,執行緒池優先嘗試以設定數量的執行緒處理請求,當請求數達到一定量(未做深入研究)時,才會建立新的執行緒。

下麵的例子展示了執行緒池的特性及常見使用方式。

class Program
{
    static void Main(string[] args)
    {
        
        RunCancellableWork();

        Console.ReadKey();
    }

    static void RunThreadPoolDemo()
    {
        ThreadPoolDemo.ThreadPoolDemo.ShowThreadPoolInfo();
        ThreadPool.SetMaxThreads(100, 100);
        ThreadPool.SetMinThreads(8, 8);
        ThreadPoolDemo.ThreadPoolDemo.ShowThreadPoolInfo();
        ThreadPoolDemo.ThreadPoolDemo.MakeThreadPoolDoSomeWork(100);
        ThreadPoolDemo.ThreadPoolDemo.MakeThreadPoolDoSomeIOWork();
    }

    static void RunCancellableWork()
    {
        Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] started a work");
        Console.WriteLine("Press 'Esc' to cancel the work.");
        Console.WriteLine();
        ThreadPoolDemo.ThreadPoolDemo.DoSomeWorkWithCancellation();
        if (Console.ReadKey(true).Key == ConsoleKey.Escape)
        {
            ThreadPoolDemo.ThreadPoolDemo.CTSource.Cancel();
        }
    }
}

public class ThreadPoolDemo
{
    
    
    
    public static void ShowThreadPoolInfo()
    {
        int workThreads, completionPortThreads;

        
        ThreadPool.GetAvailableThreads(out workThreads, out completionPortThreads);
        Console.WriteLine($"GetAvailableThreads => workThreads:{workThreads};completionPortThreads:{completionPortThreads}");
        
        ThreadPool.GetMaxThreads(out workThreads, out completionPortThreads);
        Console.WriteLine($"GetMaxThreads => workThreads:{workThreads};completionPortThreads:{completionPortThreads}");
        
        ThreadPool.GetMinThreads(out workThreads, out completionPortThreads);
        Console.WriteLine($"GetMinThreads => workThreads:{workThreads};completionPortThreads:{completionPortThreads}");
        Console.WriteLine();
    }

    
    
    
    public static void MakeThreadPoolDoSomeWork(int workCount = 10)
    {
        for (int i = 0; i < workCount; i++)
        {
            int index = i;
            
            ThreadPool.QueueUserWorkItem(s =>
            {
                Thread.Sleep(100);
                Debug.Print($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] is running. [{index}]");
                ShowAvailableThreads("WorkerThread");
            });
        }
    }

    
    
    
    public static void MakeThreadPoolDoSomeIOWork()
    {
        
        IList<string> uriList = new List<string>()
        {
            "http://news.baidu.com/",
            "https://www.hao123.com/",
            "https://map.baidu.com/",
            "https://tieba.baidu.com/",
            "https://wenku.baidu.com/",
            "http://fanyi-pro.baidu.com",
            "http://bit.baidu.com/",
            "http://xueshu.baidu.com/",
            "http://www.cnki.net/",
            "http://www.wanfangdata.com.cn",
        };

        foreach (string uri in uriList)
        {
            WebRequest request = WebRequest.Create(uri);
            request.BeginGetResponse(ac =>
            {
                try
                {
                    WebResponse response = request.EndGetResponse(ac);
                    ShowAvailableThreads("IOThread");
                    Debug.Print($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] is running. [{response.ContentLength}]");
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }, request);
        }
    }

    
    
    
    private static void ShowAvailableThreads(string sourceTag = null)
    {
        int workThreads, completionPortThreads;
        ThreadPool.GetAvailableThreads(out workThreads, out completionPortThreads);
        Console.WriteLine($"{sourceTag} GetAvailableThreads => workThreads:{workThreads};completionPortThreads:{completionPortThreads}");
        Console.WriteLine();
    }

    
    
    
    public static CancellationTokenSource CTSource { get; set; } = new CancellationTokenSource();

    
    
    
    public static void DoSomeWorkWithCancellation()
    {
        ThreadPool.QueueUserWorkItem(t =>
        {
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] begun running. [0 - 9999]");

            for (int i = 0; i < 10000; i++)
            {
                if (CTSource.Token.IsCancellationRequested)
                {
                    Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] recived the cancel token. [{i}]");
                    break;
                }
                Thread.Sleep(100);
            }

            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was cancelled.");
        });
    }
}

執行緒池的排程

前面提到,執行緒池內部維護者一個工作項佇列,這個佇列指的是執行緒池全域性佇列。實際上,除了全域性佇列,執行緒池會給每個工作者執行緒維護一個本地佇列。

當我們呼叫ThreadPool.QueueUserWorkItem方法時,工作項會被放入全域性佇列;使用定時器Timer的時候,也會將工作項放入全域性佇列;但是,當我們使用任務Task的時候,假如使用預設的任務排程器,任務會被排程到工作者執行緒的本地佇列中。

工作者執行緒優先執行本地佇列中最新進入的任務,如果本地佇列中已經沒有任務,執行緒會嘗試從其他工作者執行緒任務佇列的隊尾取任務執行,這裡需要進行同步。如果所有工作者執行緒的本地佇列都沒有任務可以執行,工作者執行緒才會從全域性佇列取最新的工作項來執行。所有任務執行完畢後,執行緒睡眠,睡眠一定時間後,執行緒醒來並銷毀自己以釋放資源。

執行緒池處理非同步IO的內部原理

上面的例子中,從網站獲取資訊需要用到執行緒池的非同步IO執行緒,執行緒池內部利用IOCP(IO完成埠)與硬體裝置建立連線。非同步IO實現過程如下:

  1. 託管的IO請求執行緒呼叫Win32原生代碼ReadFile方法
  2. ReadFile方法分配IO請求包IRP併傳送至Windows核心
  3. Windows核心把收到的IRP放入對應裝置驅動程式的IRP佇列中,此時IO請求執行緒已經可以傳回託管程式碼
  4. 驅動程式處理IRP並將處理結果放入.NET執行緒池的IRP結果佇列中
  5. 執行緒池分配IO執行緒處理IRP結果

小結

.NET執行緒池是併發程式設計的主要實現方式。C#中TimerParallelTask在內部都是利用執行緒池實現的非同步功能,深入理解執行緒池在並行程式設計中十分重要。

已同步到看一看
贊(0)

分享創造快樂