歡迎光臨
每天分享高質量文章

【死磕Java併發】—–J.U.C之讀寫鎖:ReentrantReadWriteLock

此篇部落格所有原始碼均來自JDK 1.8

重入鎖ReentrantLock是排他鎖,排他鎖在同一時刻僅有一個執行緒可以進行訪問,但是在大多數場景下,大部分時間都是提供讀服務,而寫服務佔有的時間較少。然而讀服務不存在資料競爭問題,如果一個執行緒在讀時禁止其他執行緒讀勢必會導致效能降低。所以就提供了讀寫鎖。

讀寫鎖維護著一對鎖,一個讀鎖和一個寫鎖。透過分離讀鎖和寫鎖,使得併發性比一般的排他鎖有了較大的提升:在同一時間可以允許多個讀執行緒同時訪問,但是在寫執行緒訪問時,所有讀執行緒和寫執行緒都會被阻塞。

作者:大明哥
原文地址:http://cmsblogs.com/?p=2213

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【大明哥】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【大明哥】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【大明哥】搞基嗨皮。

讀寫鎖的主要特性:

  1. 公平性:支援公平性和非公平性。

  2. 重入性:支援重入。讀寫鎖最多支援65535個遞迴寫入鎖和65535個遞迴讀取鎖。

  3. 鎖降級:遵循獲取寫鎖、獲取讀鎖在釋放寫鎖的次序,寫鎖能夠降級成為讀鎖

讀寫鎖ReentrantReadWriteLock實現介面ReadWriteLock,該介面維護了一對相關的鎖,一個用於只讀操作,另一個用於寫入操作。只要沒有 writer,讀取鎖可以由多個 reader 執行緒同時保持。寫入鎖是獨佔的。

public interface ReadWriteLock {
   Lock readLock();
   Lock writeLock();
}

ReadWriteLock定義了兩個方法。readLock()傳回用於讀操作的鎖,writeLock()傳回用於寫操作的鎖。ReentrantReadWriteLock定義如下:

    /** 內部類  讀鎖 */
   private final ReentrantReadWriteLock.ReadLock readerLock;
   /** 內部類  寫鎖 */
   private final ReentrantReadWriteLock.WriteLock writerLock;

   final Sync sync;

   /** 使用預設(非公平)的排序屬性建立一個新的 ReentrantReadWriteLock */
   public ReentrantReadWriteLock() {
       this(false);
   }

   /** 使用給定的公平策略建立一個新的 ReentrantReadWriteLock */
   public ReentrantReadWriteLock(boolean fair) {
       sync = fair ? new FairSync() : new NonfairSync();
       readerLock = new ReadLock(this);
       writerLock = new WriteLock(this);
   }

   /** 傳回用於寫入操作的鎖 */
   public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
   /** 傳回用於讀取操作的鎖 */
   public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

   abstract static class Sync extends AbstractQueuedSynchronizer {
       /**
        * 省略其餘原始碼
        */

   }
   public static class WriteLock implements Lock, java.io.Serializable{
       /**
        * 省略其餘原始碼
        */

   }

   public static class ReadLock implements Lock, java.io.Serializable {
       /**
        * 省略其餘原始碼
        */

   }

ReentrantReadWriteLock與ReentrantLock一樣,其鎖主體依然是Sync,它的讀鎖、寫鎖都是依靠Sync來實現的。所以ReentrantReadWriteLock實際上只有一個鎖,只是在獲取讀取鎖和寫入鎖的方式上不一樣而已,它的讀寫鎖其實就是兩個類:ReadLock、writeLock,這兩個類都是lock實現。

在ReentrantLock中使用一個int型別的state來表示同步狀態,該值表示鎖被一個執行緒重覆獲取的次數。但是讀寫鎖ReentrantReadWriteLock內部維護著兩個一對鎖,需要用一個變數維護多種狀態。所以讀寫鎖採用“按位切割使用”的方式來維護這個變數,將其切分為兩部分,高16為表示讀,低16為表示寫。分割之後,讀寫鎖是如何迅速確定讀鎖和寫鎖的狀態呢?透過為運算。假如當前同步狀態為S,那麼寫狀態等於 S & 0x0000FFFF(將高16位全部抹去),讀狀態等於S >>> 16(無符號補0右移16位)。程式碼如下:

        static final int SHARED_SHIFT   = 16;
       static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
       static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
       static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

       static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
       static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

寫鎖

寫鎖就是一個支援可重入的排他鎖。

寫鎖的獲取

寫鎖的獲取最終會呼叫tryAcquire(int arg),該方法在內部類Sync中實現:

    protected final boolean tryAcquire(int acquires) {
       Thread current = Thread.currentThread();
       //當前鎖個數
       int c = getState();
       //寫鎖
       int w = exclusiveCount(c);
       if (c != 0) {
           //c != 0 && w == 0 表示存在讀鎖
           //當前執行緒不是已經獲取寫鎖的執行緒
           if (w == 0 || current != getExclusiveOwnerThread())
               return false;
           //超出最大範圍
           if (w + exclusiveCount(acquires) > MAX_COUNT)
               throw new Error("Maximum lock count exceeded");
           setState(c + acquires);
           return true;
       }
       //是否需要阻塞
       if (writerShouldBlock() ||
               !compareAndSetState(c, c + acquires))
           return false;
       //設定獲取鎖的執行緒為當前執行緒
       setExclusiveOwnerThread(current);
       return true;
   }

該方法和ReentrantLock的tryAcquire(int arg)大致一樣,在判斷重入時增加了一項條件:讀鎖是否存在。因為要確保寫鎖的操作對讀鎖是可見的,如果在存在讀鎖的情況下允許獲取寫鎖,那麼那些已經獲取讀鎖的其他執行緒可能就無法感知當前寫執行緒的操作。因此只有等讀鎖完全釋放後,寫鎖才能夠被當前執行緒所獲取,一旦寫鎖獲取了,所有其他讀、寫執行緒均會被阻塞。

寫鎖的釋放

獲取了寫鎖用完了則需要釋放,WriteLock提供了unlock()方法釋放寫鎖:

    public void unlock() {
       sync.release(1);
   }

   public final boolean release(int arg) {
       if (tryRelease(arg)) {
           Node h = head;
           if (h != null && h.waitStatus != 0)
               unparkSuccessor(h);
           return true;
       }
       return false;
   }

寫鎖的釋放最終還是會呼叫AQS的模板方法release(int arg)方法,該方法首先呼叫tryRelease(int arg)方法嘗試釋放鎖,tryRelease(int arg)方法為讀寫鎖內部類Sync中定義了,如下:

    protected final boolean tryRelease(int releases) {
       //釋放的執行緒不為鎖的持有者
       if (!isHeldExclusively())
           throw new IllegalMonitorStateException();
       int nextc = getState() - releases;
       //若寫鎖的新執行緒數為0,則將鎖的持有者設定為null
       boolean free = exclusiveCount(nextc) == 0;
       if (free)
           setExclusiveOwnerThread(null);
       setState(nextc);
       return free;
   }

寫鎖釋放鎖的整個過程和獨佔鎖ReentrantLock相似,每次釋放均是減少寫狀態,當寫狀態為0時表示 寫鎖已經完全釋放了,從而等待的其他執行緒可以繼續訪問讀寫鎖,獲取同步狀態,同時此次寫執行緒的修改對後續的執行緒可見。

讀鎖

讀鎖為一個可重入的共享鎖,它能夠被多個執行緒同時持有,在沒有其他寫執行緒訪問時,讀鎖總是或獲取成功。

讀鎖的獲取

讀鎖的獲取可以透過ReadLock的lock()方法:

        public void lock() {
           sync.acquireShared(1);
       }

Sync的acquireShared(int arg)定義在AQS中:

    public final void acquireShared(int arg) {
       if (tryAcquireShared(arg) < 0)
           doAcquireShared(arg);
   }

tryAcqurireShared(int arg)嘗試獲取讀同步狀態,該方法主要用於獲取共享式同步狀態,獲取成功傳回 >= 0的傳回結果,否則傳回 < 0 的傳回結果。

    protected final int tryAcquireShared(int unused) {
       //當前執行緒
       Thread current = Thread.currentThread();
       int c = getState();
       //exclusiveCount(c)計算寫鎖
       //如果存在寫鎖,且鎖的持有者不是當前執行緒,直接傳回-1
       //存在鎖降級問題,後續闡述
       if (exclusiveCount(c) != 0 &&
               getExclusiveOwnerThread() != current)
           return -1;
       //讀鎖
       int r = sharedCount(c);

       /*
        * readerShouldBlock():讀鎖是否需要等待(公平鎖原則)
        * r < MAX_COUNT:持有執行緒小於最大數(65535)
        * compareAndSetState(c, c + SHARED_UNIT):設定讀取鎖狀態
        */

       if (!readerShouldBlock() &&
               r < MAX_COUNT &&
               compareAndSetState(c, c + SHARED_UNIT)) {
           /*
            * holdCount部分後面講解
            */

           if (r == 0) {
               firstReader = current;
               firstReaderHoldCount = 1;
           } else if (firstReader == current) {
               firstReaderHoldCount++;
           } else {
               HoldCounter rh = cachedHoldCounter;
               if (rh == null || rh.tid != getThreadId(current))
                   cachedHoldCounter = rh = readHolds.get();
               else if (rh.count == 0)
                   readHolds.set(rh);
               rh.count++;
           }
           return 1;
       }
       return fullTryAcquireShared(current);
   }

讀鎖獲取的過程相對於獨佔鎖而言會稍微複雜下,整個過程如下:

  1. 因為存在鎖降級情況,如果存在寫鎖且鎖的持有者不是當前執行緒則直接傳回失敗,否則繼續

  2. 依據公平性原則,判斷讀鎖是否需要阻塞,讀鎖持有執行緒數小於最大值(65535),且設定鎖狀態成功,執行以下程式碼(對於HoldCounter下麵再闡述),並傳回1。如果不滿足改條件,執行fullTryAcquireShared()。

    final int fullTryAcquireShared(Thread current) {
       HoldCounter rh = null;
       for (;;) {
           int c = getState();
           //鎖降級
           if (exclusiveCount(c) != 0) {
               if (getExclusiveOwnerThread() != current)
                   return -1;
           }
           //讀鎖需要阻塞
           else if (readerShouldBlock()) {
               //列頭為當前執行緒
               if (firstReader == current) {
               }
               //HoldCounter後面講解
               else {
                   if (rh == null) {
                       rh = cachedHoldCounter;
                       if (rh == null || rh.tid != getThreadId(current)) {
                           rh = readHolds.get();
                           if (rh.count == 0)
                               readHolds.remove();
                       }
                   }
                   if (rh.count == 0)
                       return -1;
               }
           }
           //讀鎖超出最大範圍
           if (sharedCount(c) == MAX_COUNT)
               throw new Error("Maximum lock count exceeded");
           //CAS設定讀鎖成功
           if (compareAndSetState(c, c + SHARED_UNIT)) {
               //如果是第1次獲取“讀取鎖”,則更新firstReader和firstReaderHoldCount
               if (sharedCount(c) == 0) {
                   firstReader = current;
                   firstReaderHoldCount = 1;
               }
               //如果想要獲取鎖的執行緒(current)是第1個獲取鎖(firstReader)的執行緒,則將firstReaderHoldCount+1
               else if (firstReader == current) {
                   firstReaderHoldCount++;
               } else {
                   if (rh == null)
                       rh = cachedHoldCounter;
                   if (rh == null || rh.tid != getThreadId(current))
                       rh = readHolds.get();
                   else if (rh.count == 0)
                       readHolds.set(rh);
                   //更新執行緒的獲取“讀取鎖”的共享計數
                   rh.count++;
                   cachedHoldCounter = rh; // cache for release
               }
               return 1;
           }
       }
   }

fullTryAcquireShared(Thread current)會根據“是否需要阻塞等待”,“讀取鎖的共享計數是否超過限制”等等進行處理。如果不需要阻塞等待,並且鎖的共享計數沒有超過限制,則透過CAS嘗試獲取鎖,並傳回1

讀鎖的釋放

與寫鎖相同,讀鎖也提供了unlock()釋放讀鎖:

        public void unlock() {
           sync.releaseShared(1);
       }

unlcok()方法內部使用Sync的releaseShared(int arg)方法,該方法定義在AQS中:

    public final boolean releaseShared(int arg) {
       if (tryReleaseShared(arg)) {
           doReleaseShared();
           return true;
       }
       return false;
   }

呼叫tryReleaseShared(int arg)嘗試釋放讀鎖,該方法定義在讀寫鎖的Sync內部類中:

    protected final boolean tryReleaseShared(int unused) {
       Thread current = Thread.currentThread();
       //如果想要釋放鎖的執行緒為第一個獲取鎖的執行緒
       if (firstReader == current) {
           //僅獲取了一次,則需要將firstReader 設定null,否則 firstReaderHoldCount - 1
           if (firstReaderHoldCount == 1)
               firstReader = null;
           else
               firstReaderHoldCount--;
       }
       //獲取rh物件,並更新“當前執行緒獲取鎖的資訊”
       else {
           HoldCounter rh = cachedHoldCounter;
           if (rh == null || rh.tid != getThreadId(current))
               rh = readHolds.get();
           int count = rh.count;
           if (count <= 1) {
               readHolds.remove();
               if (count <= 0)
                   throw unmatchedUnlockException();
           }
           --rh.count;
       }
       //CAS更新同步狀態
       for (;;) {
           int c = getState();
           int nextc = c - SHARED_UNIT;
           if (compareAndSetState(c, nextc))
               return nextc == 0;
       }
   }

HoldCounter

在讀鎖獲取鎖和釋放鎖的過程中,我們一直都可以看到一個變數rh (HoldCounter ),該變數在讀鎖中扮演著非常重要的作用。

我們瞭解讀鎖的內在機制其實就是一個共享鎖,為了更好理解HoldCounter ,我們暫且認為它不是一個鎖的機率,而相當於一個計數器。一次共享鎖的操作就相當於在該計數器的操作。獲取共享鎖,則該計數器 + 1,釋放共享鎖,該計數器 – 1。只有當執行緒獲取共享鎖後才能對共享鎖進行釋放、重入操作。所以HoldCounter的作用就是當前執行緒持有共享鎖的數量,這個數量必須要與執行緒系結在一起,否則操作其他執行緒鎖就會丟擲異常。我們先看HoldCounter的定義:

        static final class HoldCounter {
           int count = 0;
           final long tid = getThreadId(Thread.currentThread());
       }

HoldCounter 定義非常簡單,就是一個計數器count 和執行緒 id tid 兩個變數。按照這個意思我們看到HoldCounter 是需要和某給執行緒進行系結了,我們知道如果要將一個物件和執行緒系結僅僅有tid是不夠的,而且從上面的程式碼我們可以看到HoldCounter 僅僅只是記錄了tid,根本起不到系結執行緒的作用。那麼怎麼實現呢?答案是ThreadLocal,定義如下:

        static final class ThreadLocalHoldCounter
           extends ThreadLocal<HoldCounter>
{
           public HoldCounter initialValue() {
               return new HoldCounter();
           }
       }

透過上面程式碼HoldCounter就可以與執行緒進行系結了。故而,HoldCounter應該就是系結執行緒上的一個計數器,而ThradLocalHoldCounter則是執行緒系結的ThreadLocal。從上面我們可以看到ThreadLocal將HoldCounter系結到當前執行緒上,同時HoldCounter也持有執行緒Id,這樣在釋放鎖的時候才能知道ReadWriteLock裡面快取的上一個讀取執行緒(cachedHoldCounter)是否是當前執行緒。這樣做的好處是可以減少ThreadLocal.get()的次數,因為這也是一個耗時操作。需要說明的是這樣HoldCounter系結執行緒id而不繫結執行緒物件的原因是避免HoldCounter和ThreadLocal互相系結而GC難以釋放它們(儘管GC能夠智慧的發現這種取用而回收它們,但是這需要一定的代價),所以其實這樣做只是為了幫助GC快速回收物件而已。

看到這裡我們明白了HoldCounter作用了,我們在看一個獲取讀鎖的程式碼段:

                else if (firstReader == current) {
                   firstReaderHoldCount++;
               } else {
                   if (rh == null)
                       rh = cachedHoldCounter;
                   if (rh == null || rh.tid != getThreadId(current))
                       rh = readHolds.get();
                   else if (rh.count == 0)
                       readHolds.set(rh);
                   rh.count++;
                   cachedHoldCounter = rh; // cache for release
               }

這段程式碼涉及了幾個變數:firstReader 、firstReaderHoldCount、cachedHoldCounter 。我們先理清楚這幾個變數:

private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
private transient HoldCounter cachedHoldCounter;

firstReader 看名字就明白了為第一個獲取讀鎖的執行緒,firstReaderHoldCount為第一個獲取讀鎖的重入數,cachedHoldCounter為HoldCounter的快取。

理清楚上面所有的變數了,HoldCounter也明白了,我們就來給上面那段程式碼標明註釋,如下:

    //如果獲取讀鎖的執行緒為第一次獲取讀鎖的執行緒,則firstReaderHoldCount重入數 + 1
   else if (firstReader == current) {
       firstReaderHoldCount++;
   } else {
       //非firstReader計數
       if (rh == null)
           rh = cachedHoldCounter;
       //rh == null 或者 rh.tid != current.getId(),需要獲取rh
       if (rh == null || rh.tid != getThreadId(current))
           rh = readHolds.get();
           //加入到readHolds中
       else if (rh.count == 0)
           readHolds.set(rh);
       //計數+1
       rh.count++;
       cachedHoldCounter = rh; // cache for release
   }

這裡解釋下為何要引入firstRead、firstReaderHoldCount。這是為了一個效率問題,firstReader是不會放入到readHolds中的,如果讀鎖僅有一個的情況下就會避免查詢readHolds。

鎖降級

上開篇是LZ就闡述了讀寫鎖有一個特性就是鎖降級,鎖降級就意味著寫鎖是可以降級為讀鎖的,但是需要遵循先獲取寫鎖、獲取讀鎖在釋放寫鎖的次序。註意如果當前執行緒先獲取寫鎖,然後釋放寫鎖,再獲取讀鎖這個過程不能稱之為鎖降級,鎖降級一定要遵循那個次序。

在獲取讀鎖的方法tryAcquireShared(int unused)中,有一段程式碼就是來判讀鎖降級的:

        int c = getState();
       //exclusiveCount(c)計算寫鎖
       //如果存在寫鎖,且鎖的持有者不是當前執行緒,直接傳回-1
       //存在鎖降級問題,後續闡述
       if (exclusiveCount(c) != 0 &&
               getExclusiveOwnerThread() != current)
           return -1;
       //讀鎖
       int r = sharedCount(c);

鎖降級中讀鎖的獲取釋放為必要?肯定是必要的。試想,假如當前執行緒A不獲取讀鎖而是直接釋放了寫鎖,這個時候另外一個執行緒B獲取了寫鎖,那麼這個執行緒B對資料的修改是不會對當前執行緒A可見的。如果獲取了讀鎖,則執行緒B在獲取寫鎖過程中判斷如果有讀鎖還沒有釋放則會被阻塞,只有當前執行緒A釋放讀鎖後,執行緒B才會獲取寫鎖成功。

推薦閱讀

因為裡面很多地方涉及到了AQS部分,推薦閱讀如下部分:

  1. 【死磕Java併發】—–J.U.C之AQS:AQS簡介

  2. 【死磕Java併發】—–J.U.C之AQS:CLH同步佇列

  3. 【死磕Java併發】—–J.U.C之AQS:同步狀態的獲取與釋放

  4. 【死磕Java併發】—–J.U.C之AQS:阻塞和喚醒執行緒

參考資料

  1. Doug Lea:《Java併發程式設計實戰》

  2. 方騰飛:《Java併發程式設計的藝術》

  3. 【Java併發程式設計實戰】—–“J.U.C”:ReentrantReadWriteLock

  4. Java多執行緒(十)之ReentrantReadWriteLock深入分析

贊(0)

分享創造快樂