歡迎光臨
每天分享高質量文章

一次 .NET Core 中玩鎖的經歷:ManualResetEventSlim,  SemaphoreSlim

最近同事對  .net core memcached 快取客戶端 EnyimMemcachedCore 進行了高併發下的壓力測試,發現在 linux 上高併發下使用 async 異步方法讀取快取資料會出現大量失敗的情況,比如在一次測試中,100萬次讀取快取,只有12次成功,999988次失敗,好恐怖。如果改為同步方法,沒有一次失敗,100%成功。奇怪的是,同樣的壓力測試程式在 Windows 上異步讀取卻沒問題,100%成功。

排查後發現是2個地方使用的鎖引起的,一個是 ManualResetEventSlim ,一個是 Semaphore ,這2個鎖是在同步方法中使用的,但 aync 異步方法中呼叫了這2個同步方法,我們來分別看一下。

使用 ManualResetEventSlim 是在創建 Socket 連接時用於控制連接超時

var args = new SocketAsyncEventArgs();
using (var mres = new ManualResetEventSlim())
{
    args.Completed += (s, e) => mres.Set();
    if (socket.ConnectAsync(args))
    {
        if (!mres.Wait(timeout))
        {
            throw new TimeoutException("Could not connect to " + endpoint);
        }
    }
}

使用 Semaphore 是在從 EnyimMemcachedCore 自己實現的 Socket 連接池獲取 Socket 連接時

if (!this.semaphore.WaitOne(this.queueTimeout))
{
    message = "Pool is full, timeouting. " + _endPoint;
    if (_isDebugEnabled) _logger.LogDebug(message);
    result.Fail(message, new TimeoutException());

    // everyone is so busy
    return result;
}

為了棄用這個2個鎖造成的異步併發問題,採取了下麵2個改進措施:

1)對於 ManualResetEventSlim ,參考 corefx 中 SqlClient 的 SNITcpHandle 的實現,改用 CancellationTokenSource 控制連接超時

var cts = new CancellationTokenSource();
cts.CancelAfter(timeout);
void Cancel()
{
    if (!socket.Connected)
    {
        socket.Dispose();
    }
}
cts.Token.Register(Cancel);

socket.Connect(endpoint);
if (socket.Connected)
{
    connected = true;
}
else
{
    socket.Dispose();
}

2)對於 Semaphore ,根據同事提交的 PR ,將 Semaphore 換成 SemaphoreSlim ,用 SemaphoreSlim.WaitAsync 方法等待信號量鎖

if (!await this.semaphore.WaitAsync(this.queueTimeout))
{
    message = "Pool is full, timeouting. " + _endPoint;
    if (_isDebugEnabled) _logger.LogDebug(message);
    result.Fail(message, new TimeoutException());

    // everyone is so busy
    return result;
}

改進後,壓力測試結果立馬與同步方法一樣,100% 成功!

為什麼會這樣?

我們到 github 的 coreclr 倉庫(針對 .net core 2.2)中看看 ManualResetEventSlim 與 Semaphore 的實現原始碼,看能否找到一些線索。

(一)

先看看 ManualResetEventSlim.Wait 方法的實現代碼(523開始):

1)先 SpinWait 等待

var spinner = new SpinWait();
while (spinner.Count < spinCount)
{
    spinner.SpinOnce(sleep1Threshold: -1);

    if (IsSet)
    {
        return true;
    }
}

SpinWait 等待時間比較短,不會造成長時間阻塞執行緒。

在高併發下大量執行緒在爭搶鎖,所以大量執行緒在這個階段等不到鎖。

2)然後 Monitor.Wait 等待

try
{
    // ** the actual wait **
    if (!Monitor.Wait(m_lock, realMillisecondsTimeout))
        return false; //return immediately if the timeout has expired.
}
finally
{
    // Clean up: we're done waiting.
    Waiters = Waiters - 1;
}

Monitor.Wait 對應的實現代碼

[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern bool ObjWait(bool exitContext, int millisecondsTimeout, object obj);

public static bool Wait(object obj, int millisecondsTimeout, bool exitContext)
{
    if (obj == null)
        throw (new ArgumentNullException(nameof(obj)));
    return ObjWait(exitContext, millisecondsTimeout, obj);
}

最終呼叫的是一個本地庫的 ObjWait 方法。

查閱一下 Monitor.Wait 方法的說明檔案:

Releases the lock on an object and blocks the current thread until it reacquires the lock. If the specified time-out interval elapses, the thread enters the ready queue.

Monitor.Wait 的確會阻塞當前執行緒,這在異步高併發下會帶來問題,詳見一碼阻塞,萬碼等待:ASP.NET Core 同步方法呼叫異步方法“死鎖”的真相。

(二)

再看看 Semaphore 的實現代碼,它繼承自 WaitHandle , Semaphore.Wait 實際呼叫的是 WaitHandle.Wait ,後者呼叫的是 WaitOneNative ,這是一個本地庫的方法

[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern int WaitOneNative(SafeHandle waitableSafeHandle, uint millisecondsTimeout, bool hasThreadAffinity, bool exitContext);

.net core 3.0 中有些變化,這裡呼叫的是 WaitOneCore 方法

[MethodImpl(MethodImplOptions.InternalCall)]
private static extern int WaitOneCore(IntPtr waitHandle, int millisecondsTimeout);

查閱一下 WaitHandle.Wait 方法的說明檔案:

Blocks the current thread until the current WaitHandle receives a signal, using a 32-bit signed integer to specify the time interval in milliseconds.

WaitHandle.Wait 也會阻塞當前執行緒。

2個地方在等待鎖時都會阻塞執行緒,難怪高併發下會出問題。

(三)

接著閱讀 SemaphoreSlim 的原始碼學習它是如何在 WaitAsync 中實現異步等待鎖的?

public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken)
{
    //...

    lock (m_lockObj!)
    {
        // If there are counts available, allow this waiter to succeed.
        if (m_currentCount > 0)
        {
            --m_currentCount;
            if (m_waitHandle != null && m_currentCount == 0) m_waitHandle.Reset();
            return s_trueTask;
        }
        else if (millisecondsTimeout == 0)
        {
            // No counts, if timeout is zero fail fast
            return s_falseTask;
        }
        // If there aren't, create and return a task to the caller.
        // The task will be completed either when they've successfully acquired
        // the semaphore or when the timeout expired or cancellation was requested.
        else
        {
            Debug.Assert(m_currentCount == 0, "m_currentCount should never be negative");
            var asyncWaiter = CreateAndAddAsyncWaiter();
            return (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled) ?
                asyncWaiter :
                WaitUntilCountOrTimeoutAsync(asyncWaiter, millisecondsTimeout, cancellationToken);
        }
    }
}

重點看 else 部分的代碼,SemaphoreSlim.WaitAsync 造了一個專門用於等待鎖的 Task —— TaskNode ,CreateAndAddAsyncWaiter 就用於創建 TaskNode 的實體

private TaskNode CreateAndAddAsyncWaiter()
{
    // Create the task
    var task = new TaskNode();

    // Add it to the linked list
    if (m_asyncHead == null)
    {
        m_asyncHead = task;
        m_asyncTail = task;
    }
    else
    {
        m_asyncTail.Next = task;
        task.Prev = m_asyncTail;
        m_asyncTail = task;
    }

    // Hand it back
    return task;
}

從上面的代碼看到 TaskNode 用到了鏈表,神奇的等鎖專用 Task —— TaskNode 是如何實現的呢?

private sealed class TaskNode : Task<bool>
{
    internal TaskNode? Prev, Next;
    internal TaskNode() : base((object?)null, TaskCreationOptions.RunContinuationsAsynchronously) { }
}

好簡單!

那 SemaphoreSlim.WaitAsync 如何用 TaskNode 實現指定了超時時間的鎖等待?

看 WaitUntilCountOrTimeoutAsync 方法的實現原始碼:

private async Task<bool> WaitUntilCountOrTimeoutAsync(TaskNode asyncWaiter, int millisecondsTimeout, CancellationToken cancellationToken)
{
    // Wait until either the task is completed, timeout occurs, or cancellation is requested.
    // We need to ensure that the Task.Delay task is appropriately cleaned up if the await
    // completes due to the asyncWaiter completing, so we use our own token that we can explicitly
    // cancel, and we chain the caller's supplied token into it.
    using (var cts = cancellationToken.CanBeCanceled ?
        CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, default(CancellationToken)) :
        new CancellationTokenSource())
    {
        var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token));
        if (asyncWaiter == await waitCompleted.ConfigureAwait(false))
        {
            cts.Cancel(); // ensure that the Task.Delay task is cleaned up
            return true; // successfully acquired
        }
    }

    // If we get here, the wait has timed out or been canceled.

    // If the await completed synchronously, we still hold the lock.  If it didn't,
    // we no longer hold the lock.  As such, acquire it.
    lock (m_lockObj)
    {
        // Remove the task from the list.  If we're successful in doing so,
        // we know that no one else has tried to complete this waiter yet,
        // so we can safely cancel or timeout.
        if (RemoveAsyncWaiter(asyncWaiter))
        {
            cancellationToken.ThrowIfCancellationRequested(); // cancellation occurred
            return false; // timeout occurred
        }
    }

    // The waiter had already been removed, which means it's already completed or is about to
    // complete, so let it, and don't return until it does.
    return await asyncWaiter.ConfigureAwait(false);
}

用 Task.WhenAny 等待 TaskNode 與 Task.Delay ,等其中任一者先完成,簡單到可怕。

又一次通過 .net core 原始碼欣賞了高手是怎麼玩轉 Task 的。

【2019-5-6更新】

今天將 Task.WhenAny + Task.Delay 的招式用到了異步連接 Socket 的超時控制中

var connTask = _socket.ConnectAsync(_endpoint);
if (await Task.WhenAny(connTask, Task.Delay(_connectionTimeout)) == connTask)
{
    await connTask;
}
已同步到看一看
赞(0)

分享創造快樂