歡迎光臨
每天分享高質量文章

一些常見的併發程式設計錯誤 | Linux 中國

Go 並不會阻止一些因 Go 程式員粗心大意或者缺乏經驗而造成的併發程式設計錯誤。在本文的下麵部分將展示一些在 Go 程式設計中常見的併發程式設計錯誤,以幫助 Go 程式員們避免再犯類似的錯誤。
— Go101


致謝
編譯自 | https://go101.org/article/concurrent-common-mistakes.html 
 作者 | Go101
 譯者 | qhwdw ? ? ? ? ? 共計翻譯:117 篇 貢獻時間:212 天

Go 是一個內建支援併發程式設計的語言。藉助使用 go 關鍵字去建立協程goroutine(輕量級執行緒)和在 Go 中提供的 使用[1] 通道[2] 和 其它的併發[3] 同步方法[4],使得併發程式設計變得很容易、很靈活和很有趣。

另一方面,Go 並不會阻止一些因 Go 程式員粗心大意或者缺乏經驗而造成的併發程式設計錯誤。在本文的下麵部分將展示一些在 Go 程式設計中常見的併發程式設計錯誤,以幫助 Go 程式員們避免再犯類似的錯誤。

需要同步的時候沒有同步

程式碼行或許 不是按出現的順序執行的[5]

在下麵的程式中有兩個錯誤。

◈ 第一,在 main 協程中讀取 b 和在新的 協程 中寫入 b 可能導致資料爭用。
◈ 第二,條件 b == true 並不能保證在 main 協程 中的 a != nil。在新的協程中編譯器和 CPU 可能會透過 重排序指令[5] 進行最佳化,因此,在執行時 b 賦值可能發生在 a 賦值之前,在 main 協程 中當 a 被修改後,它將會讓部分 a 一直保持為 nil
  1. package main

  2. import (

  3.    "time"

  4.    "runtime"

  5. )

  6. func main() {

  7.    var a []int // nil

  8.    var b bool  // false

  9.    // a new goroutine

  10.    go func () {

  11.        a = make([]int, 3)

  12.        b = true // write b

  13.    }()

  14.    for !b { // read b

  15.        time.Sleep(time.Second)

  16.        runtime.Gosched()

  17.    }

  18.    a[0], a[1], a[2] = 0, 1, 2 // might panic

  19. }

上面的程式或者在一臺計算機上執行的很好,但是在另一臺上可能會引發異常。或者它可能運行了 N 次都很好,但是可能在第 (N+1) 次引發了異常。

我們將使用 sync 標準包中提供的通道或者同步方法去確保記憶體中的順序。例如,

  1. package main

  2. func main() {

  3.    var a []int = nil

  4.    c := make(chan struct{})

  5.    // a new goroutine

  6.    go func () {

  7.        a = make([]int, 3)

  8.        c struct{}{}

  9.    }()

  10.    c

  11.    a[0], a[1], a[2] = 0, 1, 2

  12. }

使用 time.Sleep 呼叫去做同步

我們先來看一個簡單的例子。

  1. package main

  2. import (

  3.    "fmt"

  4.    "time"

  5. )

  6. func main() {

  7.    var x = 123

  8.    go func() {

  9.        x = 789 // write x

  10.    }()

  11.    time.Sleep(time.Second)

  12.    fmt.Println(x) // read x

  13. }

我們預期程式將打印出 789。如果我們執行它,通常情況下,它確定列印的是 789。但是,這個程式使用的同步方式好嗎?No!原因是 Go 執行時並不保證 x 的寫入一定會發生在 x 的讀取之前。在某些條件下,比如在同一個作業系統上,大部分 CPU 資源被其它執行的程式所佔用的情況下,寫入 x 可能就會發生在讀取 x 之後。這就是為什麼我們在正式的專案中,從來不使用 time.Sleep 呼叫去實現同步的原因。

我們來看一下另外一個示例。

  1. package main

  2. import (

  3.    "fmt"

  4.    "time"

  5. )

  6. var x = 0

  7. func main() {

  8.    var num = 123

  9.    var p = &num

  10.    c := make(chan int)

  11.    go func() {

  12.        c *p + x

  13.    }()

  14.    time.Sleep(time.Second)

  15.    num = 789

  16.    fmt.Println(c)

  17. }

你認為程式的預期輸出是什麼?123 還是 789?事實上它的輸出與編譯器有關。對於標準的 Go 編譯器 1.10 來說,這個程式很有可能輸出是 123。但是在理論上,它可能輸出的是 789,或者其它的隨機數。

現在,我們來改變 c  為 c ,然後再次執行這個程式。你將會發現輸出變成了 789 (使用標準的 Go 編譯器 1.10)。這再次說明它的輸出是與編譯器相關的。

是的,在上面的程式中存在資料爭用。運算式 *p 可能會被先計算、後計算、或者在處理賦值陳述句 num = 789 時計算。time.Sleep 呼叫並不能保證 *p 發生在賦值陳述句處理之前進行。

對於這個特定的示例,我們將在新的協程建立之前,將值儲存到一個臨時值中,然後在新的協程中使用臨時值去消除資料爭用。

  1. ...

  2.    tmp := *p + x

  3.    go func() {

  4.        c tmp

  5.    }()

  6. ...

使協程掛起

掛起協程是指讓協程一直處於阻塞狀態。導致協程被掛起的原因很多。比如,

◈ 一個協程嘗試從一個 nil 通道中或者從一個沒有其它協程給它傳送值的通道中檢索資料。
◈ 一個協程嘗試去傳送一個值到 nil 通道,或者傳送到一個沒有其它的協程接收值的通道中。
◈ 一個協程被它自己死鎖。
◈ 一組協程彼此死鎖。
◈ 當執行一個沒有 default 分支的 select 程式碼塊時,一個協程被阻塞,以及在 select 程式碼塊中  case 關鍵字後的所有通道操作保持阻塞狀態。

除了有時我們為了避免程式退出,特意讓一個程式中的 main 協程保持掛起之外,大多數其它的協程掛起都是意外情況。Go 執行時很難判斷一個協程到底是處於掛起狀態還是臨時阻塞。因此,Go 執行時並不會去釋放一個掛起的協程所佔用的資源。

在 誰先響應誰獲勝[6] 的通道使用案例中,如果使用的 future 通道容量不夠大,當嘗試向 Future 通道傳送結果時,一些響應較慢的通道將被掛起。比如,如果呼叫下麵的函式,將有 4 個協程處於永遠阻塞狀態。

  1. func request() int {

  2.    c := make(chan int)

  3.    for i := 0; i < 5; i++ {

  4.        i := i

  5.        go func() {

  6.            c i // 4 goroutines will hang here.

  7.        }()

  8.    }

  9.    return c

  10. }

為避免這 4 個協程一直處於掛起狀態, c 通道的容量必須至少是  4

在 實現誰先響應誰獲勝的第二種方法[7] 的通道使用案例中,如果將 future 通道用做非緩衝通道,那麼有可能這個資訊將永遠也不會有響應而掛起。例如,如果在一個協程中呼叫下麵的函式,協程可能會掛起。原因是,如果接收操作   準備就緒之前,五個傳送操作全部嘗試傳送,那麼所有的嘗試傳送的操作將全部失敗,因此那個呼叫者協程將永遠也不會接收到值。

  1. func request() int {

  2.    c := make(chan int)

  3.    for i := 0; i < 5; i++ {

  4.        i := i

  5.        go func() {

  6.            select {

  7.            case c i:

  8.            default:

  9.            }

  10.        }()

  11.    }

  12.    return c

  13. }

將通道 c 變成緩衝通道將保證五個傳送操作中的至少一個操作會傳送成功,這樣,上面函式中的那個呼叫者協程將不會被掛起。

在 sync 標準包中複製型別值

在實踐中,sync 標準包中的型別值不會被複製。我們應該只複製這個值的指標。

下麵是一個錯誤的併發程式設計示例。在這個示例中,當呼叫 Counter.Value 方法時,將複製一個 Counter 接收值。作為接收值的一個欄位,Counter 接收值的各個 Mutex 欄位也會被複製。複製不是同步發生的,因此,複製的 Mutex 值可能會出錯。即便是沒有錯誤,複製的 Counter 接收值的訪問保護也是沒有意義的。

  1. import "sync"

  2. type Counter struct {

  3.    sync.Mutex

  4.    n int64

  5. }

  6. // This method is okay.

  7. func (c *Counter) Increase(d int64) (r int64) {

  8.    c.Lock()

  9.    c.n += d

  10.    r = c.n

  11.    c.Unlock()

  12.    return

  13. }

  14. // The method is bad. When it is called, a Counter

  15. // receiver value will be copied.

  16. func (c Counter) Value() (r int64) {

  17.    c.Lock()

  18.    r = c.n

  19.    c.Unlock()

  20.    return

  21. }

我們只需要改變 Value 接收型別方法為指標型別 *Counter,就可以避免複製 Mutex 值。

在官方的 Go SDK 中提供的 go vet 命令將會報告潛在的錯誤值複製。

在錯誤的地方呼叫 sync.WaitGroup 的方法

每個 sync.WaitGroup 值維護一個內部計數器,這個計數器的初始值為 0。如果一個 WaitGroup 計數器的值是 0,呼叫 WaitGroup 值的 Wait 方法就不會被阻塞,否則,在計數器值為 0 之前,這個呼叫會一直被阻塞。

為了讓 WaitGroup 值的使用有意義,當一個 WaitGroup 計數器值為 0 時,必須在相應的 WaitGroup 值的  Wait 方法呼叫之前,去呼叫 WaitGroup 值的 Add 方法。

例如,下麵的程式中,在不正確位置呼叫了 Add 方法,這將使最後打印出的數字不總是 100。事實上,這個程式最後列印的數字可能是在 [0, 100) 範圍內的一個隨意數字。原因就是 Add 方法的呼叫並不保證一定會發生在 Wait 方法呼叫之前。

  1. package main

  2. import (

  3.    "fmt"

  4.    "sync"

  5.    "sync/atomic"

  6. )

  7. func main() {

  8.    var wg sync.WaitGroup

  9.    var x int32 = 0

  10.    for i := 0; i < 100; i++ {

  11.        go func() {

  12.            wg.Add(1)

  13.            atomic.AddInt32(&x, 1)

  14.            wg.Done()

  15.        }()

  16.    }

  17.    fmt.Println("To wait ...")

  18.    wg.Wait()

  19.    fmt.Println(atomic.LoadInt32(&x))

  20. }

為讓程式的表現符合預期,在 for 迴圈中,我們將把 Add 方法的呼叫移動到建立的新協程的範圍之外,修改後的程式碼如下。

  1. ...

  2.    for i := 0; i < 100; i++ {

  3.        wg.Add(1)

  4.        go func() {

  5.            atomic.AddInt32(&x, 1)

  6.            wg.Done()

  7.        }()

  8.    }

  9. ...

不正確使用 futures 通道

在 通道使用案例[1] 的文章中,我們知道一些函式將傳回 futures 通道[8]。假設 fa 和 fb 就是這樣的兩個函式,那麼下麵的呼叫就使用了不正確的 future 引數。

  1. doSomethingWithFutureArguments(fa(), fb())

在上面的程式碼行中,兩個通道接收操作是順序進行的,而不是併發的。我們做如下修改使它變成併發操作。

  1. ca, cb := fa(), fb()

  2. doSomethingWithFutureArguments(c1, c2)

沒有等協程的最後的活動的傳送結束就關閉通道

Go 程式員經常犯的一個錯誤是,還有一些其它的協程可能會傳送值到以前的通道時,這個通道就已經被關閉了。當這樣的傳送(傳送到一個已經關閉的通道)真實發生時,將引發一個異常。

這種錯誤在一些以往的著名 Go 專案中也有發生,比如在 Kubernetes 專案中的 這個 bug[9] 和 這個 bug[10]

如何安全和優雅地關閉通道,請閱讀 這篇文章[11]

在值上做 64 位原子操作時沒有保證值地址 64 位對齊

到目前為止(Go 1.10),在標準的 Go 編譯器中,在一個 64 位原子操作中涉及到的值的地址要求必須是 64 位對齊的。如果沒有對齊則導致當前的協程異常。對於標準的 Go 編譯器來說,這種失敗僅發生在 32 位的架構上。請閱讀 記憶體佈局[12] 去瞭解如何在一個 32 位作業系統上保證 64 位對齊。

沒有註意到大量的資源被 time.After 函式呼叫佔用

在 time 標準包中的 After 函式傳回 一個延遲通知的通道[13]。這個函式在某些情況下用起來很便捷,但是,每次呼叫它將建立一個 time.Timer 型別的新值。這個新建立的 Timer 值在透過傳遞引數到  After 函式指定期間保持啟用狀態,如果在這個期間過多的呼叫了該函式,可能會有太多的 Timer 值保持啟用,這將佔用大量的記憶體和計算資源。

例如,如果呼叫了下列的 longRunning 函式,將在一分鐘內產生大量的訊息,然後在某些週期內將有大量的 Timer 值保持啟用,即便是大量的這些 Timer 值已經沒用了也是如此。

  1. import (

  2.    "fmt"

  3.    "time"

  4. )

  5. // The function will return if a message arrival interval

  6. // is larger than one minute.

  7. func longRunning(messages chan string) {

  8.    for {

  9.        select {

  10.        case time.After(time.Minute):

  11.            return

  12.        case msg := messages:

  13.            fmt.Println(msg)

  14.        }

  15.    }

  16. }

為避免在上述程式碼中建立過多的 Timer 值,我們將使用一個單一的 Timer 值去完成同樣的任務。

  1. func longRunning(messages chan string) {

  2.    timer := time.NewTimer(time.Minute)

  3.    defer timer.Stop()

  4.    for {

  5.        select {

  6.        case timer.C:

  7.            return

  8.        case msg := messages:

  9.            fmt.Println(msg)

  10.            if !timer.Stop() {

  11.                timer.C

  12.            }

  13.        }

  14.        // The above "if" block can also be put here.

  15.        timer.Reset(time.Minute)

  16.    }

  17. }

不正確地使用 time.Timer 值

在最後,我們將展示一個符合語言使用習慣的 time.Timer 值的使用示例。需要註意的一個細節是,那個 Reset 方法總是在停止或者 time.Timer 值釋放時被使用。

在 select 塊的第一個 case 分支的結束部分,time.Timer 值被釋放,因此,我們不需要去停止它。但是必須在第二個分支中停止定時器。如果在第二個分支中 if 程式碼塊缺失,它可能至少在 Reset 方法呼叫時,會(透過 Go 執行時)傳送到 timer.C 通道,並且那個 longRunning 函式可能會早於預期傳回,對於 Reset 方法來說,它可能僅僅是重置內部定時器為 0,它將不會清理(耗盡)那個傳送到 timer.C 通道的值。

例如,下麵的程式很有可能在一秒內而不是十秒時退出。並且更重要的是,這個程式並不是 DRF 的(LCTT 譯註:data race free,多執行緒程式的一種同步程度)。

  1. package main

  2. import (

  3.    "fmt"

  4.    "time"

  5. )

  6. func main() {

  7.    start := time.Now()

  8.    timer := time.NewTimer(time.Second/2)

  9.    select {

  10.    case timer.C:

  11.    default:

  12.        time.Sleep(time.Second) // go here

  13.    }

  14.    timer.Reset(time.Second * 10)

  15.    timer.C

  16.    fmt.Println(time.Since(start)) // 1.000188181s

  17. }

當 time.Timer 的值不再被其它任何一個東西使用時,它的值可能被停留在一種非停止狀態,但是,建議在結束時停止它。

在多個協程中如果不按建議使用 time.Timer 值併發,可能會有 bug 隱患。

我們不應該依賴一個 Reset 方法呼叫的傳回值。Reset 方法傳回值的存在僅僅是為了相容性目的。


via: https://go101.org/article/concurrent-common-mistakes.html

作者:go101.org[15] 譯者:qhwdw 校對:wxy

本文由 LCTT 原創編譯,Linux中國 榮譽推出

贊(0)

分享創造快樂

© 2022 知識星球   網站地圖