歡迎光臨
每天分享高質量文章

C#並行編程(6):執行緒同步面面觀

理解執行緒同步

執行緒的資料訪問

在並行(多執行緒)環境中,不可避免地會存在多個執行緒同時訪問某個資料的情況。多個執行緒對共享資料的訪問有下麵3種情形:

  1. 多個執行緒同時讀取資料;
  2. 單個執行緒更新資料,此時其他執行緒讀取資料;
  3. 多個執行緒同時更新資料。

顯而易見,多個執行緒同時讀取資料是不會產生任何問題的。僅有一個執行緒更新資料的時候,貌似也沒有問題,但真的沒有問題嗎?多個執行緒同時更新資料,很明顯,你可能把我的更改改寫掉了,資料從此不再可信。

什麼是執行緒同步

為瞭解決多執行緒同時訪問共享資料可能導致資料被破壞的問題,我們需要採取一些措施來保證資料的一致性,讓每個執行緒都能準確地讀取或更新資料。

問題的根源在於多個執行緒同時訪問資料,那麼只要我們保證同一時間只有一個執行緒訪問資料,就能解決問題。保證同一時間只有一個執行緒訪問資料的處理,就是執行緒同步了。我在訪問資料的時候,你們都先等著,我完事了你們再來。

C#中的執行緒同步

.NET提供了很多執行緒同步的方式,這些方式分為用戶樣式和內核樣式以及混合樣式(即用戶樣式與內核樣式的結合),下麵會總結C#/.NET中各樣式下的執行緒同步。

用戶樣式與內核樣式

Windows操作系統下,CPU跟據所執行代碼的不同,會在兩種樣式下進行切換。CPU執行應用程式代碼(如我們開發的.NET程式)時,一般運行在用戶樣式下;執行操作系統核心代碼(內核函式或者某些設備驅動程式)時,CPU則切換到內核樣式。

用戶樣式的代碼只能訪問自身行程的專有地址空間,代碼異常不會影響到其他程式或者操作系統;內核樣式的所有代碼共享單個地址空間,代碼異常將可能導致系統崩潰。CPU的樣式切換,是為了保證應用程式和操作系統的穩定性。

應用程式中,執行緒可以通過Windows API呼叫操作系統內核函式,這時候執行執行緒的CPU將從用戶樣式切換到內核樣式,執行完操作系統函式後,再由內核樣式切換到用戶樣式。CPU的樣式切換是很耗時的,據《Windows核心編程》中的描述,CPU樣式的切換,要占用1000個以上的CPU周期。因此,在我們的.NET程式中,應該盡可能地避免CPU的樣式切換。

用戶樣式執行緒同步

用戶樣式下,利用特殊的CPU指令來協調執行緒,使同一時間只有一個執行緒能訪問某記憶體地址,這種協調在硬體中發生,速度很快。這種樣式下,CPU指令對執行緒的阻塞很短暫,操作系統調度執行緒時不會認為該執行緒已被阻塞,這種情況下,執行緒池不會創建新的執行緒來替換該執行緒。

用戶樣式下,等待資源的執行緒會一直被操作系統調度,導致執行緒的“自旋”並因此浪費很多的CPU資源。如果某執行緒一直占著資源不釋放,等待該資源的執行緒將一直處於自旋狀態,這樣就造成了“活鎖”,活鎖除了浪費記憶體外,還會浪費大量CPU。

.NET提供兩種用戶樣式的執行緒同步,volatileinterlocked,即易變和互鎖。

volatile關鍵字和Volatile

上面我們遺留了一個問題:只有一個執行緒更新資料,其他執行緒讀取資料,會不會出現問題?先看一個例子:

private static bool _stop;
public static void Run()
{
    Task.Run(() =>
    {
        int number = 1;
        while (!_stop) 
        {
            number++;
        }
        Console.WriteLine($"increase stopped,value = {number}");
    });

    Thread.Sleep(1000);
    _stop = true;
}

編譯器和CPU會對上面的代碼進行優化(除錯樣式不會優化),任務執行緒在執行時,會把_stop讀取到CPU暫存器中,while迴圈的時候,每次都從當前CPU暫存器中讀取_stop;同樣,主執行緒執行的時候CPU也會把_stop讀取到暫存器,更新_stop時,先更新是CPU暫存器中的_stop值,再把值存到變數_stop;在並行環境中,主執行緒和任務執行緒獨立執行,主執行緒對_stop的更新並不會公開到任務執行緒,這樣,任務執行緒的while迴圈便不會停止,永遠無法得到輸出。

把變數讀到暫存器只是CPU優化代碼的一種方式,CPU還可能調整代碼的執行順序,當前,CPU任務這種調整不會改變代碼的意圖。上面的代碼說明,由於編譯器和CPU的優化,只有一個執行緒更新資料,也可能存在問題

這種情況,我們可以使用volatile關鍵字或者類System.Threading.Volatile來阻止編譯器和CPU的優化,這種阻止利用的是記憶體屏障MemoryBarrier,它告訴CPU在執行完屏障之前的記憶體存取後才能執行屏障後面的記憶體存取。上面代碼的問題在於,while迴圈讀取到的值總是CPU暫存器中的false。我們把while迴圈的條件改成!Volatile.Read(ref _stop)或者把用volatile宣告變數_stop,while條件直接讀取記憶體中的值,問題就能得到解決。

Interlocked原子訪問

.NET提供的另一種用戶樣式執行緒同步方式是System.Threading.InterlockedInterlocked的工作依賴於代碼運行的CPU平臺,如果是X86的CPU,Interlocked函式會在總線上維持一個硬體信號,來阻止其他CPU訪問同一記憶體地址(《Windows核心編程第五版》)。計算機對變數的修改一般來說並不是原子性的,而是分為3個步驟:

  1. 將變數值加載到CPU暫存器
  2. 改變值
  3. 將更新後的值儲存到記憶體中

假如執行了前兩個步驟後,CPU被搶占,變數在之前執行緒中的修改將丟失。Interlocked函式保證對值的修改是原子性的,一個執行緒完成變數的修改和儲存後,另一個執行緒才能修改變數

System.Threading.Interlocked提供了很多方法,例如遞增、遞減、求和等,下麵用Interlocked的遞增方法展示其執行緒同步功能。

public static void Run()
{
    DoIncrease(100000);
}

private static void DoIncrease(int incrementPerThread)
{
    int number1 = 0;
    int number2 = 0;

    Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");

    IList increaseTasks = new List();

    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number1.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            Interlocked.Increment(ref number1);
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number1.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            Interlocked.Increment(ref number1);
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number2.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            number2++;
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number2.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            number2++;
        }
    }));

    Task.WaitAll(increaseTasks.ToArray());

    Console.WriteLine($"use interlocked: number1 result = {number1}");
    Console.WriteLine($"normal increase: number2 result = {number2}");
}

運行上面的代碼多次(每個執行緒增加的數量儘量大,否則不容易體現結果),每次number1的結果都一樣,number2的結果都不同,足以體現Interlocked的執行緒同步功能。

SpinLock自旋鎖

System.Threading.SpinLock是基於InterLocked和SpinWait實現的輕量級自旋鎖,具體的實現方式這裡不去關心。SpinLock的簡單用法如下:

private static SpinLock _spinlock = new SpinLock();
public static void DoWork()
{
    bool lockTaken = false;
    try
    {
        _spinlock.Enter(ref lockTaken);
        
    }
    finally
    {
        if (lockTaken)
        {
            _spinlock.Exit(false);
        }
    }
}

SpinLock很輕量級,性能較高,但由於是自旋鎖,鎖定的操作應該是很快完成,否則會因執行緒自旋而浪費CPU。

內核樣式執行緒同步

除了用戶樣式的兩種執行緒同步方式,我們還會利用Windows系統的內核物件實現執行緒的同步。使用系統內核物件將會導致執行執行緒的CPU運行樣式的切換,這會有很大的消耗,所以能夠使用用戶樣式的執行緒同步就儘量避免使用內核樣式。

內核樣式下,執行緒在等待資源時會被系統阻塞,避免了CPU的浪費,這是內核樣式優勢。假如執行緒等待的資源一直被占用則執行緒將一直處於阻塞狀態,造成“死鎖”。相對於活鎖,死鎖只會浪費記憶體資源。

我們使用系統內核中的事件、信號量和互斥量進行內核樣式的執行緒同步。

利用內核事件實現執行緒同步

事件實際上是由系統內核維護的一個布林值。

.NET提供System.Threading.EventWaitHandle進行執行緒的信號交互。EventWaitHandle繼承WaitHandle(封裝等待對共享資源獨占訪問的操作系統特定的物件),有三個關鍵方法:

  • Set():將事件狀態設置為終止狀態,允許一個或多個等待執行緒繼續。
  • Reset():將事件狀態設置為非終止狀態,導致執行緒阻塞
  • WaitOne():阻塞執行緒直到收到事件狀態信號

執行緒交互事件有自動重置和手動重置兩種型別,分別由AutoResetEventManualResetEvent繼承EventWaitHandle得到。自動重置事件在Set喚醒第一個阻塞執行緒之後,會自動Reset事件,其他阻塞執行緒仍保持阻塞狀態;而手動重置事件Set時,會喚醒所有被該事件阻塞的執行緒,手動Reset後,事件才會繼續起作用。手動重置事件的這種性質,導致它不能用於執行緒同步,因為不能保證同一時間只有一個執行緒訪問資源;相反,自動重置時間則很適合用來處理執行緒同步。

下麵的例子演示了利用自動重置時間進行的執行緒同步。

public static void Run()
{
    DoIncrease(100000);
}

private static void DoIncrease(int incrementPerThread)
{
    int number = 0;
    Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");

    AutoResetEvent are = new AutoResetEvent(true);

    IList increaseTasks = new List();
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            are.WaitOne();
            number++;
            are.Set();
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            are.WaitOne();
            number++;
            are.Set();
        }
    }));

    Task.WaitAll(increaseTasks.ToArray());
    are.Dispose();
    Console.WriteLine($"use AutoResetEvent: result = {number}");
}

利用信號量進行執行緒同步

信號量是系統內核維護的一個整型變數。

信號量值為0時,所有等待信號量的執行緒會被阻塞;信號量值大於零0,等待的執行緒會被解除阻塞,每喚醒一個阻塞的執行緒,系統內核就會把信號量的值減1。此外,我們能夠對信號量進行最大值限制,從而控制訪問同一資源的最大執行緒數量。

.Net中,利用System.Threading.Semaphore進行信號量操作。下麵時利用信號量實現執行緒同步的一個例子。

public static void Run()
{
    DoIncrease(100000);
}

private static void DoIncrease(int incrementPerThread)
{
    int number = 0;
    Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");

    Semaphore semaphore = new Semaphore(1,1);

    IList increaseTasks = new List();
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            semaphore.WaitOne();
            number++;
            semaphore.Release(1);

        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            semaphore.WaitOne();
            number++;
            semaphore.Release(1);

        }
    }));

    Task.WaitAll(increaseTasks.ToArray());
    semaphore.Dispose();
    Console.WriteLine($"use Semaphore: result = {number}");
}

利用互斥體行程執行緒同步

互斥體Mutex的使用與自動重置事件和信號量類似,這裡不再進行詳細的總結。

互斥體常被用來保證應用程式只有一個實體運行,具體用法如下:

bool createNew;
using (new Mutex(true, Assembly.GetExecutingAssembly().FullName, out createNew))
{
    if (!createNew)
    {
        
        Environment.Exit(0);
    }
    else
    {
        
    }
}

執行緒同步的混合樣式

通過上面的總結我們知道,用戶樣式和內核樣式由各自的優缺點,需要有一種樣式既能兼顧用戶和內核樣式的優點又能避免他們的缺點,這就是混合樣式。

混合樣式會優先使用用戶樣式的執行緒同步處理,當多個執行緒競爭同步鎖的時候,才會使用內核物件進行處理。如果多個執行緒一直不產生資源競爭,就不會發生CPU用戶樣式到內核樣式的轉換,開始資源競爭時,又會通過執行緒阻塞來防止CPU資源的浪費。

.NET中提供了多種混合樣式的執行緒同步方式。例如手工重置事件和信號量的簡化版本ManualResetEventSlimSemaphoreSlim,他們是執行緒在用戶樣式中自旋,直到發生資源競爭。具體使用與各自的內核樣式一樣,這裡不再贅述。

lock關鍵字和Monitor

相信lock加鎖是很多人做常用的執行緒同步方式。lock的使用很簡單,如下:

private static readonly object _syncObject = new object();
public static void DoWork()
{
    lock (_syncObject)
    {
        
    }
}

實際上,lock語法是對System.Threading.Monitor使用的一種簡化,Monitor的用法如下:

private static readonly object _syncObject = new object();
public static void DoWork()
{
    Monitor.Enter(_syncObject);
    
    Monitor.Exit(_syncObject);
}

使用Monitor的可能會出先一些意象不到的問題。例如,如果不相關的業務代碼在使用Monitor進行執行緒同步的時候,鎖定了同一字串,將會造成不相關業務代碼的同步執行;此外需要註意的是,Monitor不能使用值型別作為鎖物件,值型別會被裝箱,裝箱後的物件不同,將導致無法同步。

讀寫鎖ReaderWriterLockSlim

ReaderWriterLockSlim可以用來實現多執行緒讀取或獨占寫入的資源訪問。讀寫鎖的執行緒控制邏輯如下:

  • 一個執行緒寫資料時,其他請求資源的執行緒全部被阻塞;
  • 一個執行緒讀資料時,寫執行緒被阻塞,其他讀執行緒能繼續運行;
  • 寫結束時,解除其他某個寫執行緒的阻塞,或者解除所有讀執行緒的阻塞;
  • 讀結束時,解除一個寫執行緒的阻塞。

下麵是讀寫鎖的簡單用法,詳細用法可參考msdn文件。

private static readonly ReaderWriterLockSlim _rwlock = new ReaderWriterLockSlim();
public static void DoWork()
{
_rwlock.EnterWriteLock();

_rwlock.ExitWriteLock();
}

ReaderWriterLockSlim還有一個比較老的版本ReaderWriterLock,據說存在較多問題應儘量避免使用。

執行緒安全集合

.NET除了提供包含上面總結到的各種執行緒同步的諸多方式外,還封裝了一些執行緒安全集合。這些集合在內部實現了執行緒同步,我們直接使用即可,很友好。執行緒安全集合在命名空間System.Collections.Concurrent下,包括ConcurrentQueue (T),ConcurrentStack,ConcurrentDictionary,ConcurrentBag,BlockingCollection,具體可閱讀《何時使用執行緒安全集合》。

各種執行緒同步性能對比

下麵我們對整數零進行多執行緒遞增操作,每個執行緒固定遞增量,來測試以下各種同步方式的性能對比。測試代碼如下。




private static int _numberToIncrease;

public static void Run()
{
    int increment = 100000;
    int threadCount = 4;
    DoIncrease(increment, threadCount, DoIncreaseByInterLocked);
    DoIncrease(increment, threadCount, DoIncreaseWithSpinLock);
    DoIncrease(increment, threadCount, DoIncreaseWithEvent);
    DoIncrease(increment, threadCount, DoIncreaseWithSemaphore);
    DoIncrease(increment, threadCount, DoIncreaseWithMonitor);
    DoIncrease(increment, threadCount, DoIncreaseWithReaderWriterLockSlim);

}

public static void DoIncrease(int increment, int threadCount, Action<int> action)
{
_numberToIncrease = 0;
IList increaseTasks = new List(threadCount);
Stopwatch watch = Stopwatch.StartNew();
for (int i = 0; i < threadCount; i++)
{
increaseTasks.Add(Task.Run(() => action(increment)));
}
Task.WaitAll(increaseTasks.ToArray());
Console.WriteLine($"{action.Method.Name}=> Result: {_numberToIncrease} , Time: {watch.ElapsedMilliseconds} ms.");
}

#region 使用Interlocked,用戶樣式

public static void DoIncreaseByInterLocked(int increment)
{
for (int i = 0; i < increment; i++)
{
Interlocked.Increment(ref _numberToIncrease);
}
}

#endregion

#region 使用SpinLock,用戶樣式

private static SpinLock _spinlock = new SpinLock();
public static void DoIncreaseWithSpinLock(int increment)
{
for (int i = 0; i < increment; i++)
{
bool lockTaken = false;
try
{
_spinlock.Enter(ref lockTaken);
_numberToIncrease++;
}
finally
{
if (lockTaken)
{
_spinlock.Exit(false);
}
}
}
}

#endregion

#region 使用信號量Semaphore,內核樣式

private static readonly Semaphore _semaphore = new Semaphore(1, 10);

public static void DoIncreaseWithSemaphore(int increment)
{
for (int i = 0; i < increment; i++)
{
_semaphore.WaitOne();
_numberToIncrease++;
_semaphore.Release(1);
}
}

#endregion

#region 使用事件AutoResetEvent,內核樣式

private static readonly AutoResetEvent _are = new AutoResetEvent(true);
public static void DoIncreaseWithEvent(int increment)
{
for (int i = 0; i < increment; i++)
{
_are.WaitOne();
_numberToIncrease++;
_are.Set();
}
}

#endregion

#region 使用Monitor,混合樣式

private static readonly object _monitorLocker = new object();
public static void DoIncreaseWithMonitor(int increment)
{
for (int i = 0; i < increment; i++)
{
bool lockTaken = false;
try
{
Monitor.Enter(_monitorLocker, ref lockTaken);
_numberToIncrease++;
}
finally
{
if (lockTaken)
{
Monitor.Exit(_monitorLocker);
}
}
}
}

#endregion

#region 使用ReaderWriterLockSlim,混合樣式

private static readonly ReaderWriterLockSlim _rwlock = new ReaderWriterLockSlim();
public static void DoIncreaseWithReaderWriterLockSlim(int increment)
{
for (int i = 0; i < increment; i++)
{
_rwlock.EnterWriteLock();
_numberToIncrease++;
_rwlock.ExitWriteLock();
}
}

#endregion

下麵是一組測試結果,可以很明顯地看出,內核樣式是相當耗時的,應儘量避免使用。而用戶樣式和混合樣式,也需要根據具體的場景進行選擇。這個測試過於簡單,不具有普遍性。

DoIncreaseByInterLocked=> Result: 400000 , Time: 15 ms.
DoIncreaseWithSpinLock=> Result: 400000 , Time: 75 ms.
DoIncreaseWithEvent=> Result: 400000 , Time: 1892 ms.
DoIncreaseWithSemaphore=> Result: 400000 , Time: 1779 ms.
DoIncreaseWithMonitor=> Result: 400000 , Time: 14 ms.
DoIncreaseWithReaderWriterLockSlim=> Result: 400000 , Time: 22 ms.

小結

本文對C#/.NET中的執行緒同步進行了儘量詳盡的總結,並行環境中在追求程式的高性能、響應性的同時,務必要保證資料的安全性。

C#並行編程系列的文章暫時就告一段落了。剛開始寫博客,文章肯定存在不少問題,歡迎各位博友指出。

已同步到看一看
赞(0)

分享創造快樂