歡迎光臨
每天分享高質量文章

RedLock 實現分佈式鎖

併發是程式開發中不可避免的問題,根據系統面向用戶、功能場景的不同,併發的重視程度會有不同。從程式的角度來說,併發意味著相同的時間點執行了相同的代碼,而有些情況是不被允許的,比如:轉賬、搶購占庫存等,如果沒有做好臨界條件的驗證,會帶來非常嚴重的後果。追根結底是因為併發引起的資料不一致問題,面對併發,我們通常會採用鎖來優化。

場景模擬

如下模擬搶購的示例代碼(C#):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 有10個商品庫存
private static int stockCount = 10;

public bool Buy()
{
	// 模擬執行的邏輯代碼花費的時間
	Thread.Sleep(new Random().Next(100,500));
	if (stockCount > 0)
	{
		stockCount--;
		return true;
	}
	return false;
}
1
2
3
4
5
6
7
8
9
10
11
var test = new Test();

Parallel.For(1, 16, (i) =>
{
	var stopwatch = new Stopwatch();
	stopwatch.Start();
	var data = test.Buy();
	stopwatch.Stop();
	Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}");
});
Console.ReadKey();

模擬並行呼叫 Buy 方法 15 次(內部使用的是執行緒池,所以 ThreadId 會有重覆),實際上只有 10 個庫存,傳回結果卻顯示 11 個請求都購買成功了。

單機部署樣式解決方案

在單機部署樣式下,我們只需要加 lock(){} 就可以解決問題:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 有10個商品庫存
private static int stockCount = 10;

private static object obj = new object();

public bool Buy()
{
	lock (obj)
	{
		// 模擬執行的邏輯代碼花費的時間
		Thread.Sleep(new Random().Next(100, 500));
		if (stockCount > 0)
		{
			stockCount--;
			return true;
		}
		return false;
	}
}

從輸出結果中可以看出,確實只有10個請求是顯示購買成功,但同時發現部分請求的執行時間明顯變長,這就是加鎖帶來的最直觀影響,當某個執行緒獲得鎖之後,在沒有釋放之前,其他執行緒只能繼續等待,併發越高,更多的執行緒需要等待輪流被處理。

各種語言一般都提供了鎖的實現,用法大同小異,語言本身實現的鎖只能作用於當前行程內,所以在單機樣式部署的系統中使用基本沒什麼問題。

集群部署樣式解決方案(分佈式鎖)

在集群樣式下,系統部署於多台機器(一個系統運行在多個行程中),語言本身實現的鎖只能確保當前行程內有效(基於記憶體),多行程就沒辦法共享鎖狀態,這時我們就得考慮採用分佈式鎖,分佈式鎖可以採用 資料庫ZooKeeperRedis 等來實現,最終都是為了達到在不同的行程、執行緒內能共享鎖狀態的目的。

這裡將介紹基於 Redis 的 RedLock.net 來解決分佈式下的併發問題,RedLock.net 是 RedLock 分佈式鎖演算法的 .NET 版實現 (大部分語言都有對應的實現,查看) ,RedLock 分佈式鎖演算法是由 Redis 的作者提出。

RedLock 簡介

RedLock 的思想是使用多台 Redis Master ,節點完全獨立,節點間不需要進行資料同步,因為 Master-Slave 架構一旦 Master 發生故障時資料沒有複製到 Slave,被選為 Master 的 Slave 就丟掉了鎖,另一個客戶端就可以再次拿到鎖。鎖通過 setNX(原子操作) 命令設置,在有效時間內當獲得鎖的數量大於 (n/2+1) 代表成功,失敗後需要向所有節點發送釋放鎖的訊息。

獲取鎖:

1
SET resource_name my_random_value NX PX 30000

釋放鎖:

1
2
3
4
5
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

RedLock.net 集成

  1. 創建 .NETCore API 專案

  2. Nuget 安裝 RedLock.net

    1
    Install-Package RedLock.net
  3. appsettings.json 添加 redis 配置

    1
    2
    3
    4
    {
      "RedisUrl": "127.0.0.1:6379", // 多個用,分割
      ...
    }
  4. 添加 ProductService.cs,模擬商品購買

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 有10個商品庫存,如果同時啟動多個API服務進行測試,這裡改成存資料庫或其他方式
    private static int stockCount = 10;
    public async Task BuyAsync()
    {
    	// 模擬執行的邏輯代碼花費的時間
    	await Task.Delay(new Random().Next(100, 500));
    	if (stockCount > 0)
    	{
    		stockCount--;
    		return true;
    	}
    	return false;
    }
  5. 修改 Startup.cs ,創建 RedLockFactory

    定義 RedLockFactory 變數:

    1
    private RedLockFactory lockFactory;

    添加方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    private RedLockFactory GetRedLockFactory()
    {
    	var redisUrl = Configuration["RedisUrl"];
    	if (string.IsNullOrEmpty(redisUrl))
    	{
    		throw new ArgumentException("RedisUrl 不能為空");
    	}
    	var urls = redisUrl.Split(",").ToList();
    	var endPoints = new List();
    	foreach (var item in urls)
    	{
    		var arr = item.Split(":");
    		endPoints.Add(new DnsEndPoint(arr[0], Convert.ToInt32(arr[1])));
    	}
    	return RedLockFactory.Create(endPoints);
    }

    在 ConfigureServices 註入 IDistributedLockFactory:

    1
    2
    3
    lockFactory = GetRedLockFactory();
    services.AddSingleton(typeof(IDistributedLockFactory), lockFactory);
    services.AddScoped(typeof(ProductService));

    修改 Configure,應用程式結束時釋放 lockFactory :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
    	...
    
    	lifetime.ApplicationStopping.Register(() =>
    	{
    		lockFactory.Dispose();
    	});
    }
  6. 在 Controller 添加方法 DistributedLockTest

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    private readonly IDistributedLockFactory _distributedLockFactory;
    private readonly ProductService _productService;
    
    public HomeController(IDistributedLockFactory distributedLockFactory,
    	ProductService productService)
    {
    	_distributedLockFactory = distributedLockFactory;
    	_productService = productService;
    }
    
    [HttpGet]
    public async Task DistributedLockTest()
    {
    	var productId = "id";
    	// resource 鎖定的物件
    	// expiryTime 鎖定過期時間,鎖區域內的邏輯執行如果超過過期時間,鎖將被釋放
    	// waitTime 等待時間,相同的 resource 如果當前的鎖被其他執行緒占用,最多等待時間
    	// retryTime 等待時間內,多久嘗試獲取一次
    	using (var redLock = await _distributedLockFactory.CreateLockAsync(productId, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(20)))
    	{
    		if (redLock.IsAcquired)
    		{
    			var result = await _productService.BuyAsync();
    			return result;
    		}
    		else
    		{
    			Console.WriteLine($"獲取鎖失敗:{DateTime.Now}");
    		}
    	}
    	return false;
    }
  7. 呼叫接口測試

    1
    2
    3
    4
    5
    6
    7
    8
       Parallel.For(1, 16, (i) =>
    {
    	var stopwatch = new Stopwatch();
    	stopwatch.Start();
    	var data = GetAsync($"http://localhost:5000/home/distributedLockTest").Result;
    	stopwatch.Stop();
    	Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}");
    });

關於 RedLock 分佈式鎖演算法的爭議大家可以參考:

How to do distributed locking
Is Redlock safe?

總結

如果使用鎖,必然對性能上會有一定影響,我們需要根據實際場景來判斷是真正需要。在指定鎖過期時間時要相對合理,避免出現鎖已過期,但邏輯還沒執行完成,這樣就失去了鎖的意義,當然這種情況下我們還可以考慮重入鎖。

最後推薦一下微軟開源的一個基於 Actor 模型的分佈式框架 Orleans,也可以達到分佈式鎖的效果。

參考鏈接

  • Distributed locks with Redis
  • RedLock.net
赞(0)

分享創造快樂