歡迎光臨
每天分享高質量文章

C#並行編程(3):並行迴圈

初識並行迴圈

並行迴圈主要用來處理資料並行的,如,同時對陣列或串列中的多個資料執行相同的操作。

在C#編程中,我們使用並行類System.Threading.Tasks.Parallel提供的靜態方法Parallel.ForParallel.ForEach來實現並行迴圈。從方法名可以看出,這兩個方法是對常規迴圈forforeach的並行化。

簡單用法

使用並行迴圈時需要傳入迴圈範圍(集合)和運算元據的委托Action

Parallel.For(0, 100, i => { Console.WriteLine(i); });

Parallel.ForEach(Enumerable.Range(0, 100), i => { Console.WriteLine(i); });

使用場景

對於資料的處理需要耗費較長時間的迴圈適宜使用並行迴圈,利用多執行緒加快執行速度。

對於簡單的迭代操作,且迭代範圍較小,使用常規迴圈更好好,因為並行迴圈涉及到執行緒的創建、背景關係切換和銷毀,使用並行迴圈反而影響執行效率。

對於迭代操作簡單但迭代範圍很大的情況,我們可以對資料進行分割槽,再執行並行迴圈,減少執行緒數量。

迴圈結果

Parallel.ForParallel.ForEach方法的所有多載有著同樣的傳回值型別ParallelLoopResult,並行迴圈結果包含迴圈是否完成以及最低迭代次數兩項信息。

下麵的例子使用Parallel.ForEach展示了並行迴圈的結果。

ParallelLoopResult result = Parallel.ForEach(Enumerable.Range(0, 100), (i,loop) =>
{
    Console.WriteLine(i + 1);
    Thread.Sleep(100);
    if (i == 30)
    {
        loop.Break();
        
    }
});
Console.WriteLine($"{result.IsCompleted}-{result.LowestBreakIteration}");

值得一提的是,迴圈的Break()Stop()只能儘早地跳出或者停止迴圈,而不能立即停止。

取消迴圈操作

有時候,我們需要在中途取消迴圈操作,但又不知道確切條件是什麼,比如用戶觸發的取消。這時候,可以利用迴圈的ParallelOptions傳入一個CancellationToken,同時使用異常處理捕獲OperationCanceledException以進行取消後的處理。下麵是一個簡單的例子。




public static CancellationTokenSource CTSource { get; set; } = new CancellationTokenSource();




public static void CancelParallelLoop()
{
    Task.Factory.StartNew(() =>
    {
        try
        {
            Parallel.ForEach(Enumerable.Range(0, 100), new ParallelOptions { CancellationToken = CTSource.Token },
                i =>
                {
                    Console.WriteLine(i + 1);
                    Thread.Sleep(1000);
                });
        }
        catch (OperationCanceledException oce)
        {
            Console.WriteLine(oce.Message);
        }
    });
}
static void Main(string[] args)
{
    ParallelDemo.CancelParallelLoop();
    Thread.Sleep(3000);
    ParallelDemo.CTSource.Cancel();

    Console.ReadKey();
}

迴圈異常收集

並行迴圈執行過程中,可以捕獲並收集迭代操作引發的異常,迴圈結束時丟擲一個AggregateException異常,並將收集到的異常賦給它的內部異常集合InnerExceptions。外部使用時,捕獲AggregateException,即可進行並行迴圈的異常處理。

下麵的例子模擬了並行迴圈的異常丟擲、收集及處理的過程。




public static void CaptureTheLoopExceptions()
{
    ConcurrentQueue exceptions = new ConcurrentQueue();
    Parallel.ForEach(Enumerable.Range(0, 100), i =>
    {
        try
        {
            if (i % 10 == 0)
            {
                throw new Exception($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] had thrown a exception. [{i}]");
            }
            Console.WriteLine(i + 1);
            Thread.Sleep(100);
        }
        catch (Exception ex)
        {
            exceptions.Enqueue(ex);
        }
    });

    if (!exceptions.IsEmpty)
    {
        throw new AggregateException(exceptions);
    }
}

外部處理方式

try
{
    ParallelDemo.CaptureTheLoopExceptions();
}
catch (AggregateException aex)
{
    foreach (Exception ex in aex.InnerExceptions)
    {
        Console.WriteLine(ex.Message);
    }
}

分割槽並行處理

當迴圈操作很簡單,迭代範圍很大的時候,ParallelLoop提供一種分割槽的方式來優化迴圈性能。下麵的例子展示了分割槽迴圈的使用,同時也能比較幾種迴圈方式的執行效率。






public static void PartationParallelLoop(int rangeSize = 10000, int opDuration = 1)
{
    
    Stopwatch watch0 = Stopwatch.StartNew();
    Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize), EnumerablePartitionerOptions.None),
        i =>
        {
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch0.Stop();

    
    Stopwatch watch1 = Stopwatch.StartNew();
    Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize),EnumerablePartitionerOptions.NoBuffering),
        i =>
        {
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch1.Stop();

    
    Stopwatch watch2 = Stopwatch.StartNew();
    Parallel.ForEach(Enumerable.Range(0, rangeSize),
        i =>
        {
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch2.Stop();

    
    Stopwatch watch3 = Stopwatch.StartNew();
    foreach (int i in Enumerable.Range(0, rangeSize))
    {
        Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
        Thread.Sleep(opDuration);
    }
    watch2.Stop();
            
    Console.WriteLine();
    Console.WriteLine($"PartationParallelLoopWithBuffer    => {watch0.ElapsedMilliseconds}ms");
    Console.WriteLine($"PartationParallelLoopWithoutBuffer => {watch1.ElapsedMilliseconds}ms");
    Console.WriteLine($"NormalParallelLoop                 => {watch2.ElapsedMilliseconds}ms");
    Console.WriteLine($"NormalLoop                         => {watch3.ElapsedMilliseconds}ms");
}

在 I7-7700HQ + 16GB 配置 VS除錯樣式下得到下麵一組測試結果。

10000,1 10527 11799 11155 19434
10000,1 9513 11442 11048 19354
10000,1 9871 11391 14782 19154
100,1000 9107 5951 5081 100363
100,1000 9086 5974 5187 100162
100,1000 9208 5125 5255 100239
100,1 350 439 243 200
100,1 390 227 166 198
100,1 466 225 84 197

應該根據不同的應用場景選擇合適的迴圈策略,具體如何選擇,朋友們可自行體會~

原文地址:https://www.cnblogs.com/chenbaoshun/p/10572639.html

已同步到看一看
赞(0)

分享創造快樂