歡迎光臨
每天分享高質量文章

【死磕Java併發】—- 深入分析CAS

CAS,Compare And Swap,即比較並交換。Doug lea大神在同步組件中大量使用CAS技術鬼斧神工地實現了Java多執行緒的併發操作。整個AQS同步組件、Atomic原子類操作等等都是以CAS實現的,甚至ConcurrentHashMap在1.8的版本中也調整為了CAS+Synchronized。可以說CAS是整個JUC的基石。

這裡寫圖片描述

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群討論技術和原始碼。
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群討論技術和原始碼。
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群討論技術和原始碼。

CAS分析

在CAS中有三個引數:記憶體值V、舊的預期值A、要更新的值B,當且僅當記憶體值V的值等於舊的預期值A時才會將記憶體值V的值修改為B,否則什麼都不乾。其偽代碼如下:

if(this.value == A){
   this.value = B
   return true;
}else{
   return false;
}

JUC下的atomic類都是通過CAS來實現的,下麵就以AtomicInteger為例來闡述CAS的實現。如下:

    private static final Unsafe unsafe = Unsafe.getUnsafe();
   private static final long valueOffset;

   static {
       try {
           valueOffset = unsafe.objectFieldOffset
               (AtomicInteger.class.getDeclaredField("value"));
       } catch (Exception ex) { throw new Error(ex); }
   }

   private volatile int value;

Unsafe是CAS的核心類,Java無法直接訪問底層操作系統,而是通過本地(native)方法來訪問。不過儘管如此,JVM還是開了一個後門:Unsafe,它提供了硬體級別的原子操作。

valueOffset為變數值在記憶體中的偏移地址,unsafe就是通過偏移地址來得到資料的原值的。

value當前值,使用volatile修飾,保證多執行緒環境下看見的是同一個。

我們就以AtomicInteger的addAndGet()方法來做說明,先看原始碼:

    public final int addAndGet(int delta) {
       return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
   }

   public final int getAndAddInt(Object var1, long var2, int var4) {
       int var5;
       do {
           var5 = this.getIntVolatile(var1, var2);
       } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

       return var5;
   }

內部呼叫unsafe的getAndAddInt方法,在getAndAddInt方法中主要是看compareAndSwapInt方法:

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

該方法為本地方法,有四個引數,分別代表:物件、物件的地址、預期值、修改值(有位伙伴告訴我他面試的時候就問到這四個變數是啥意思…+_+)。該方法的實現這裡就不做詳細介紹了,有興趣的伙伴可以看看openjdk的原始碼。

CAS可以保證一次的讀-改-寫操作是原子操作,在單處理器上該操作容易實現,但是在多處理器上實現就有點兒複雜了。

CPU提供了兩種方法來實現多處理器的原子操作:總線加鎖或者快取加鎖。

總線加鎖:總線加鎖就是就是使用處理器提供的一個LOCK#信號,當一個處理器在總線上輸出此信號時,其他處理器的請求將被阻塞住,那麼該處理器可以獨占使用共享記憶體。但是這種處理方式顯得有點兒霸道,不厚道,他把CPU和記憶體之間的通信鎖住了,在鎖定期間,其他處理器都不能其他記憶體地址的資料,其開銷有點兒大。所以就有了快取加鎖。

快取加鎖:其實針對於上面那種情況我們只需要保證在同一時刻對某個記憶體地址的操作是原子性的即可。快取加鎖就是快取在記憶體區域的資料如果在加鎖期間,當它執行鎖操作寫回記憶體時,處理器不在輸出LOCK#信號,而是修改內部的記憶體地址,利用快取一致性協議來保證原子性。快取一致性機制可以保證同一個記憶體區域的資料僅能被一個處理器修改,也就是說當CPU1修改快取行中的i時使用快取鎖定,那麼CPU2就不能同時快取了i的快取行。

CAS缺陷

CAS雖然高效地解決了原子操作,但是還是存在一些缺陷的,主要表現在三個方法:迴圈時間太長、只能保證一個共享變數原子操作、ABA問題。

迴圈時間太長

如果CAS一直不成功呢?這種情況絕對有可能發生,如果自旋CAS長時間地不成功,則會給CPU帶來非常大的開銷。在JUC中有些地方就限制了CAS自旋的次數,例如BlockingQueue的SynchronousQueue。

只能保證一個共享變數原子操作

看了CAS的實現就知道這隻能針對一個共享變數,如果是多個共享變數就只能使用鎖了,當然如果你有辦法把多個變數整成一個變數,利用CAS也不錯。例如讀寫鎖中state的高地位

ABA問題

CAS需要檢查操作值有沒有發生改變,如果沒有發生改變則更新。但是存在這樣一種情況:如果一個值原來是A,變成了B,然後又變成了A,那麼在CAS檢查的時候會發現沒有改變,但是實質上它已經發生了改變,這就是所謂的ABA問題。對於ABA問題其解決方案是加上版本號,即在每個變數都加上一個版本號,每次改變時加1,即A —> B —> A,變成1A —> 2B —> 3A。

用一個例子來闡述ABA問題所帶來的影響。

有如下鏈表

這裡寫圖片描述

假如我們想要把B替換為A,也就是compareAndSet(this,A,B)。執行緒1執行B替換A操作,執行緒2主要執行如下動作,A 、B出棧,然後C、A入棧,最終該鏈表如下:

這裡寫圖片描述

完成後執行緒1發現仍然是A,那麼compareAndSet(this,A,B)成功,但是這時會存在一個問題就是B.next = null,compareAndSet(this,A,B)後,會導致C丟失,改棧僅有一個B元素,平白無故把C給丟失了。

CAS的ABA隱患問題,解決方案則是版本號,Java提供了AtomicStampedReference來解決。AtomicStampedReference通過包裝[E,Integer]的元組來對物件標記版本戳stamp,從而避免ABA問題。對於上面的案例應該執行緒1會失敗。

AtomicStampedReference的compareAndSet()方法定義如下:

    public boolean compareAndSet(V   expectedReference,
                                V   newReference,
                                int expectedStamp,
                                int newStamp)
{
       Pair current = pair;
       return
           expectedReference == current.reference &&
           expectedStamp == current.stamp &&
           ((newReference == current.reference &&
             newStamp == current.stamp) ||
            casPair(current, Pair.of(newReference, newStamp)));
   }

compareAndSet有四個引數,分別表示:預期取用、更新後的取用、預期標誌、更新後的標誌。原始碼部門很好理解預期的取用 == 當前取用,預期的標識 == 當前標識,如果更新後的取用和標誌和當前的取用和標誌相等則直接傳回true,否則通過Pair生成一個新的pair物件與當前pair CAS替換。Pair為AtomicStampedReference的內部類,主要用於記錄取用和版本戳信息(標識),定義如下:

    private static class Pair<T> {
       final T reference;
       final int stamp;
       private Pair(T reference, int stamp) {
           this.reference = reference;
           this.stamp = stamp;
       }
       static Pair of(T reference, int stamp) {
           return new Pair(reference, stamp);
       }
   }

   private volatile Pair pair;

Pair記錄著物件的取用和版本戳,版本戳為int型,保持自增。同時Pair是一個不可變物件,其所有屬性全部定義為final,對外提供一個of方法,該方法傳回一個新建的Pari物件。pair物件定義為volatile,保證多執行緒環境下的可見性。在AtomicStampedReference中,大多方法都是通過呼叫Pair的of方法來產生一個新的Pair物件,然後賦值給變數pair。如set方法:

    public void set(V newReference, int newStamp) {
       Pair current = pair;
       if (newReference != current.reference || newStamp != current.stamp)
           this.pair = Pair.of(newReference, newStamp);
   }

下麵我們將通過一個例子可以可以看到AtomicStampedReference和AtomicInteger的區別。我們定義兩個執行緒,執行緒1負責將100 —> 110 —> 100,執行緒2執行 100 —>120,看兩者之間的區別。

public class Test {
   private static AtomicInteger atomicInteger = new AtomicInteger(100);
   private static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1);

   public static void main(String[] args) throws InterruptedException {

       //AtomicInteger
       Thread at1 = new Thread(new Runnable() {
           @Override
           public void run() {
               atomicInteger.compareAndSet(100,110);
               atomicInteger.compareAndSet(110,100);
           }
       });

       Thread at2 = new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   TimeUnit.SECONDS.sleep(2);      // at1,執行完
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               System.out.println("AtomicInteger:" + atomicInteger.compareAndSet(100,120));
           }
       });

       at1.start();
       at2.start();

       at1.join();
       at2.join();

       //AtomicStampedReference

       Thread tsf1 = new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   //讓 tsf2先獲取stamp,導致預期時間戳不一致
                   TimeUnit.SECONDS.sleep(2);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               // 預期取用:100,更新後的取用:110,預期標識getStamp() 更新後的標識getStamp() + 1
               atomicStampedReference.compareAndSet(100,110,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);
               atomicStampedReference.compareAndSet(110,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);
           }
       });

       Thread tsf2 = new Thread(new Runnable() {
           @Override
           public void run() {
               int stamp = atomicStampedReference.getStamp();

               try {
                   TimeUnit.SECONDS.sleep(2);      //執行緒tsf1執行完
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               System.out.println("AtomicStampedReference:" +atomicStampedReference.compareAndSet(100,120,stamp,stamp + 1));
           }
       });

       tsf1.start();
       tsf2.start();
   }

}

運行結果:

這裡寫圖片描述

運行結果充分展示了AtomicInteger的ABA問題和AtomicStampedReference解決ABA問題。

赞(0)

分享創造快樂