歡迎光臨
每天分享高質量文章

基於redis分佈式鎖實現“秒殺”(含代碼)

最近在專案中遇到了類似“秒殺”的業務場景,在本篇博客中,我將用一個非常簡單的demo,闡述實現所謂“秒殺”的基本思路。

業務場景

所謂秒殺,從業務角度看,是短時間內多個用戶“爭搶”資源,這裡的資源在大部分秒殺場景里是商品;將業務抽象,技術角度看,秒殺就是多個執行緒對資源進行操作,所以實現秒殺,就必須控制執行緒對資源的爭搶,既要保證高效併發,也要保證操作的正確。

一些可能的實現

剛纔提到過,實現秒殺的關鍵點是控制執行緒對資源的爭搶,根據基本的執行緒知識,可以不加思索的想到下麵的一些方法:

1、秒殺在技術層面的抽象應該就是一個方法,在這個方法里可能的操作是將商品庫存-1,將商品加入用戶的購物車等等,在不考慮快取的情況下應該是要運算元據庫的。那麼最簡單直接的實現就是在這個方法上加上 synchronized關鍵字,通俗的講就是鎖住整個方法;

2、鎖住整個方法這個策略簡單方便,但是似乎有點粗暴。可以稍微優化一下,只鎖住秒殺的代碼塊,比如寫資料庫的部分;

3、既然有併發問題,那我就讓他“不併發”,將所有的執行緒用一個佇列管理起來,使之變成串行操作,自然不會有併發問題。

上面所述的方法都是有效的,但是都不好。為什麼?第一和第二種方法本質上是“加鎖”,但是鎖粒度依然比較高。什麼意思?試想一下,如果兩個執行緒同時執行秒殺方法,這兩個執行緒操作的是不同的商品,從業務上講應該是可以同時進行的,但是如果採用第一二種方法,這兩個執行緒也會去爭搶同一個鎖,這其實是不必要的。第三種方法也沒有解決上面說的問題。

那麼如何將鎖控制在更細的粒度上呢?可以考慮為每個商品設置一個互斥鎖,以和商品ID相關的字串為唯一標識,這樣就可以做到只有爭搶同一件商品的執行緒互斥,不會導致所有的執行緒互斥。分佈式鎖恰好可以幫助我們解決這個問題。

何為分佈式鎖

分佈式鎖是控制分佈式系統之間同步訪問共享資源的一種方式。在分佈式系統中,常常需要協調他們的動作。如果不同的系統或是同一個系統的不同主機之間共享了一個或一組資源,那麼訪問這些資源的時候,往往需要互斥來防止彼此干擾來保證一致性,在這種情況下,便需要使用到分佈式鎖。

我們來假設一個最簡單的秒殺場景:資料庫里有一張表,column分別是商品ID,和商品ID對應的庫存量,秒殺成功就將此商品庫存量-1。現在假設有1000個執行緒來秒殺兩件商品,500個執行緒秒殺第一個商品,500個執行緒秒殺第二個商品。我們來根據這個簡單的業務場景來解釋一下分佈式鎖。

通常具有秒殺場景的業務系統都比較複雜,承載的業務量非常巨大,併發量也很高。這樣的系統往往採用分佈式的架構來均衡負載。那麼這1000個併發就會是從不同的地方過來,商品庫存就是共享的資源,也是這1000個併發爭搶的資源,這個時候我們需要將併發互斥管理起來。這就是分佈式鎖的應用。

而key-value儲存系統,如redis,因為其一些特性,是實現分佈式鎖的重要工具。

具體的實現

先來看看一些redis的基本命令:

  1. SETNX key value

如果key不存在,就設置key對應字串value。在這種情況下,該命令和SET一樣。當key已經存在時,就不做任何操作。SETNX是”SET if Not eXists”。

  1. expire KEY seconds

設置key的過期時間。如果key已過期,將會被自動刪除。

  1. del KEY

刪除key

由於筆者的實現只用到這三個命令,就只介紹這三個命令,更多的命令以及redis的特性和使用,可以參考redis官網。

需要考慮的問題

1、用什麼操作redis?幸虧redis已經提供了jedis客戶端用於java應用程式,直接呼叫jedis API即可。

2、怎麼實現加鎖?“鎖”其實是一個抽象的概念,將這個抽象概念變為具體的東西,就是一個儲存在redis里的key-value對,key是於商品ID相關的字串來唯一標識,value其實並不重要,因為只要這個唯一的key-value存在,就表示這個商品已經上鎖。

3、如何釋放鎖?既然key-value對存在就表示上鎖,那麼釋放鎖就自然是在redis里刪除key-value對。

4、阻塞還是非阻塞?筆者採用了阻塞式的實現,若執行緒發現已經上鎖,會在特定時間內輪詢鎖。

5、如何處理異常情況?比如一個執行緒把一個商品上了鎖,但是由於各種原因,沒有完成操作(在上面的業務場景里就是沒有將庫存-1寫入資料庫),自然沒有釋放鎖,這個情況筆者加入了鎖超時機制,利用redis的expire命令為key設置超時時長,過了超時時間redis就會將這個key自動刪除,即強制釋放鎖(可以認為超時釋放鎖是一個異步操作,由redis完成,應用程式只需要根據系統特點設置超時時間即可)。

talk is cheap,show me the code

在代碼實現層面,註解有併發的方法和引數,通過動態代理獲取註解的方法和引數,在代理中加鎖,執行完被代理的方法後釋放鎖。

幾個註解定義:

cachelock是方法級的註解,用於註解會產生併發問題的方法:

  1. @Target(ElementType.METHOD)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface CacheLock {
  5.    String lockedPrefix() default "";//redis 鎖key的前綴
  6.    long timeOut() default 2000;//輪詢鎖的時間
  7.    int expireTime() default 1000;//key在redis里存在的時間,1000S
  8. }

lockedObject是引數級的註解,用於註解商品ID等基本型別的引數:

  1. @Target(ElementType.PARAMETER)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface LockedObject {
  5.    //不需要值
  6. }

LockedComplexObject也是引數級的註解,用於註解自定義型別的引數:

  1. @Target(ElementType.PARAMETER)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface LockedComplexObject {
  5.    String field() default "";//含有成員變數的複雜物件中需要加鎖的成員變數,如一個商品物件的商品ID
  6.  
  7. }

CacheLockInterceptor實現 InvocationHandler接口,在invoke方法中獲取註解的方法和引數,在執行註解的方法前加鎖,執行被註解的方法後釋放鎖:

  1. public class CacheLockInterceptor implements InvocationHandler{
  2.    public static int ERROR_COUNT  = 0;
  3.    private Object proxied;
  4.  
  5.    public CacheLockInterceptor(Object proxied) {
  6.        this.proxied = proxied;
  7.    }
  8.  
  9.    @Override
  10.    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  11.  
  12.        CacheLock cacheLock = method.getAnnotation(CacheLock.class);
  13.        //沒有cacheLock註解,pass
  14.        if(null == cacheLock){
  15.            System.out.println("no cacheLock annotation");          
  16.            return method.invoke(proxied, args);
  17.        }
  18.        //獲得方法中引數的註解
  19.        Annotation[][] annotations = method.getParameterAnnotations();
  20.        //根據獲取到的引數註解和引數串列獲得加鎖的引數
  21.        Object lockedObject = getLockedObject(annotations,args);
  22.        String objectValue = lockedObject.toString();
  23.        //新建一個鎖
  24.        RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
  25.        //加鎖
  26.        boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
  27.        if(!result){//取鎖失敗
  28.            ERROR_COUNT += 1;
  29.            throw new CacheLockException("get lock fail");
  30.  
  31.        }
  32.        try{
  33.            //加鎖成功,執行方法
  34.            return method.invoke(proxied, args);
  35.        }finally{
  36.            lock.unlock();//釋放鎖
  37.        }
  38.  
  39.    }
  40.    /**
  41.     * 
  42.     * @param annotations
  43.     * @param args
  44.     * @return
  45.     * @throws CacheLockException
  46.     */
  47.    private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
  48.        if(null == args || args.length == 0){
  49.            throw new CacheLockException("方法引數為空,沒有被鎖定的物件");
  50.        }
  51.  
  52.        if(null == annotations || annotations.length == 0){
  53.            throw new CacheLockException("沒有被註解的引數");
  54.        }
  55.        //不支持多個引數加鎖,只支持第一個註解為lockedObject或者lockedComplexObject的引數
  56.        int index = -1;//標記引數的位置指標
  57.        for(int i = 0;i < annotations.length;i++){
  58.            for(int j = 0;j < annotations[i].length;j++){
  59.                if(annotations[i][j] instanceof LockedComplexObject){//註解為LockedComplexObject
  60.                    index = i;
  61.                    try {
  62.                        return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());
  63.                    } catch (NoSuchFieldException | SecurityException e) {
  64.                        throw new CacheLockException("註解物件中沒有該屬性" + ((LockedComplexObject)annotations[i][j]).field());
  65.                    }
  66.                }
  67.  
  68.                if(annotations[i][j] instanceof LockedObject){
  69.                    index = i;
  70.                    break;
  71.                }
  72.            }
  73.            //找到第一個後直接break,不支持多引數加鎖
  74.            if(index != -1){
  75.                break;
  76.            }
  77.        }
  78.  
  79.        if(index == -1){
  80.            throw new CacheLockException("請指定被鎖定引數");
  81.        }
  82.  
  83.        return args[index];
  84.    }
  85. }

最關鍵的RedisLock類中的lock方法和unlock方法:

  1. /**
  2.     * 加鎖
  3.     * 使用方式為:
  4.     * lock();
  5.     * try{
  6.     *    executeMethod();
  7.     * }finally{
  8.     *   unlock();
  9.     * }
  10.     * @param timeout timeout的時間範圍內輪詢鎖
  11.     * @param expire 設置鎖超時時間
  12.     * @return 成功 or 失敗
  13.     */
  14.    public boolean lock(long timeout,int expire){
  15.        long nanoTime = System.nanoTime();
  16.        timeout *= MILLI_NANO_TIME;
  17.        try {
  18.            //在timeout的時間範圍內不斷輪詢鎖
  19.            while (System.nanoTime() - nanoTime < timeout) {
  20.                //鎖不存在的話,設置鎖並設置鎖過期時間,即加鎖
  21.                if (this.redisClient.setnx(this.key, LOCKED) == 1) {
  22.                    this.redisClient.expire(key, expire);//設置鎖過期時間是為了在沒有釋放
  23.                    //鎖的情況下鎖過期後消失,不會造成永久阻塞
  24.                    this.lock = true;
  25.                    return this.lock;
  26.                }
  27.                System.out.println("出現鎖等待");
  28.                //短暫休眠,避免可能的活鎖
  29.                Thread.sleep(3, RANDOM.nextInt(30));
  30.            } 
  31.        } catch (Exception e) {
  32.            throw new RuntimeException("locking error",e);
  33.        }
  34.        return false;
  35.    }
  36.  
  37.    public  void unlock() {
  38.        try {
  39.            if(this.lock){
  40.                redisClient.delKey(key);//直接刪除
  41.            }
  42.        } catch (Throwable e) {
  43.  
  44.        }
  45.    }

上述的代碼是框架性的代碼,現在來講解如何使用上面的簡單框架來寫一個秒殺函式。

先定義一個接口,接口裡定義了一個秒殺方法:

  1. public interface SeckillInterface {
  2. /**
  3. *現在暫時只支持在接口方法上註解
  4. */
  5.    //cacheLock註解可能產生併發的方法
  6.    @CacheLock(lockedPrefix="TEST_PREFIX")
  7.    public void secKill(String userID,@LockedObject Long commidityID);//最簡單的秒殺方法,引數是用戶ID和商品ID。可能有多個執行緒爭搶一個商品,所以商品ID加上LockedObject註解
  8. }

上述 SeckillInterface接口的實現類,即秒殺的具體實現:

  1. public class SecKillImpl implements SeckillInterface{
  2.    static Map<Long, Long> inventory ;
  3.    static{
  4.        inventory = new HashMap<>();
  5.        inventory.put(10000001L10000l);
  6.        inventory.put(10000002L10000l);
  7.    }
  8.  
  9.    @Override
  10.    public void secKill(String arg1, Long arg2) {
  11.        //最簡單的秒殺,這裡僅作為demo示例
  12.        reduceInventory(arg2);
  13.    }
  14.    //模擬秒殺操作,姑且認為一個秒殺就是將庫存減一,實際情景要複雜的多
  15.    public Long reduceInventory(Long commodityId){
  16.        inventory.put(commodityId,inventory.get(commodityId) - 1);
  17.        return inventory.get(commodityId);
  18.    }
  19.  
  20. }

模擬秒殺場景,1000個執行緒來爭搶兩個商品:

  1. @Test
  2.    public void testSecKill(){
  3.        int threadCount = 1000;
  4.        int splitPoint = 500;
  5.        CountDownLatch endCount = new CountDownLatch(threadCount);
  6.        CountDownLatch beginCount = new CountDownLatch(1);
  7.        SecKillImpl testClass = new SecKillImpl();
  8.  
  9.        Thread[] threads = new Thread[threadCount];
  10.        //起500個執行緒,秒殺第一個商品
  11.        for(int i= 0;i < splitPoint;i++){
  12.            threads[i] = new Thread(new  Runnable() {
  13.                public void run() {
  14.                    try {
  15.                        //等待在一個信號量上,掛起
  16.                        beginCount.await();
  17.                        //用動態代理的方式呼叫secKill方法
  18.                        SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), 
  19.                            new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
  20.                        proxy.secKill("test", commidityId1);
  21.                        endCount.countDown();
  22.                    } catch (InterruptedException e) {
  23.                        // TODO Auto-generated catch block
  24.                        e.printStackTrace();
  25.                    }
  26.                }
  27.            });
  28.            threads[i].start();
  29.  
  30.        }
  31.        //再起500個執行緒,秒殺第二件商品
  32.        for(int i= splitPoint;i < threadCount;i++){
  33.            threads[i] = new Thread(new  Runnable() {
  34.                public void run() {
  35.                    try {
  36.                        //等待在一個信號量上,掛起
  37.                        beginCount.await();
  38.                        //用動態代理的方式呼叫secKill方法
  39.                        SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), 
  40.                            new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
  41.                        proxy.secKill("test", commidityId2);
  42.                        //testClass.testFunc("test", 10000001L);
  43.                        endCount.countDown();
  44.                    } catch (InterruptedException e) {
  45.                        // TODO Auto-generated catch block
  46.                        e.printStackTrace();
  47.                    }
  48.                }
  49.            });
  50.            threads[i].start();
  51.  
  52.        }
  53.  
  54.        long startTime = System.currentTimeMillis();
  55.        //主執行緒釋放開始信號量,並等待結束信號量,這樣做保證1000個執行緒做到完全同時執行,保證測試的正確性
  56.        beginCount.countDown();
  57.  
  58.        try {
  59.            //主執行緒等待結束信號量
  60.            endCount.await();
  61.            //觀察秒殺結果是否正確
  62.            System.out.println(SecKillImpl.inventory.get(commidityId1));
  63.            System.out.println(SecKillImpl.inventory.get(commidityId2));
  64.            System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);
  65.            System.out.println("total cost " + (System.currentTimeMillis() - startTime));
  66.        } catch (InterruptedException e) {
  67.            // TODO Auto-generated catch block
  68.            e.printStackTrace();
  69.        }
  70.    }

在正確的預想下,應該每個商品的庫存都減少了500,在多次試驗後,實際情況符合預想。如果不採用鎖機制,會出現庫存減少499,498的情況。

這裡採用了動態代理的方法,利用註解和反射機制得到分佈式鎖ID,進行加鎖和釋放鎖操作。當然也可以直接在方法進行這些操作,採用動態代理也是為了能夠將鎖操作代碼集中在代理中,便於維護。

通常秒殺場景發生在web專案中,可以考慮利用spring的AOP特性將鎖操作代碼置於切麵中,當然AOP本質上也是動態代理。

小結

這篇文章從業務場景出發,從抽象到實現闡述瞭如何利用redis實現分佈式鎖,完成簡單的秒殺功能,也記錄了筆者思考的過程,希望能給閱讀到本篇文章的人一些啟發。

原始碼:

https://github.com/lsfire/redisframework

赞(0)

分享創造快樂