歡迎光臨
每天分享高質量文章

Go併發呼叫的超時處理

之前有聊過 golang 的協程,我發覺似乎還很理論,特別是在併發安全上,所以特結合網上的一些例子,來試驗下go routine中 的 channel, select, context 的妙用。

場景-微服務呼叫

我們用 gin(一個web框架) 作為處理請求的工具,沒有安裝過的話,需求是這樣的:
一個請求 X 會去並行呼叫 A, B, C 三個方法,並把三個方法傳回的結果加起來作為 X 請求的 Response。
但是我們這個 Response 是有時間要求的(不能超過3秒的響應時間)。

可能 A, B, C 中任意一個或兩個,處理邏輯十分複雜,或者資料量超大,導致處理時間超出預期,
那麼我們就馬上切斷,並傳回已經拿到的任意個傳回結果之和。

我們先來定義主函式:

 

func main() {
  r := gin.New()
  r.GET("/calculate", calHandler)
  http.ListenAndServe(":8008", r)
}

非常簡單,普通的請求接受和 handler 定義。其中 calHandler 是我們用來處理請求的函式。

分別定義三個假的微服務,其中第三個將會是我們超時的哪位~

func microService1() int {
  time.Sleep(1*time.Second)
  return 1
}

func microService2() int {
  time.Sleep(2*time.Second)
  return 2
}

func microService3() int {
  time.Sleep(10*time.Second)
  return 3
}

接下來,我們看看 calHandler 里到底是什麼?

 

func calHandler(c *gin.Context) {
    ...
}

 

要點1–併發呼叫

直接用 go 就好了嘛~ 所以一開始我們可能就這麼寫:

go microService1()
go microService2()
go microService3()

 

很簡單有沒有,但是等等,說好的傳回值我怎麼接呢? 為了能夠並行地接受處理結果,我們很容易想到用 channel 去接。 所以我們把呼叫服務改成這樣:

 

var resChan = make(chan int, 3) // 因為有3個結果,所以我們創建一個可以容納3個值的 int channel。
go func() {
    resChan }()

go func() {
    resChan }()

go func() {
    resChan }()

 

有東西接,那也要有方法去算,所以我們加一個一直迴圈拿 resChan 中結果並計算的方法:

 

var resContainer, sum int
for {
    resContainer =     sum += resContainer
}

 

這樣一來我們就有一個 sum 來計算每次從 resChan 中拿出的結果了。

要點2–超時信號。

還沒結束,說好的超時處理呢?
為了實現超時處理,我們需要引入一個東西,就是 context,
什麼是 context ?我們這裡只使用 context 的一個特性,超時通知(其實這個特性完全可以用 channel 來替代)。

可以看在定義 calHandler 的時候我們已經將 c *gin.Context 作為引數傳了進來,那我們就不用自己在宣告了。
gin.Context 簡單理解為貫穿整個 gin 宣告周期的背景關係容器,有點像是分身,亦或是量子糾纏的感覺。

有了這個 gin.Context, 我們就能在一個地方對 context 做出操作,而其他正在使用 context 的函式或方法,也會感受到 context 做出的變化。

 

ctx, _ := context.WithTimeout(c, 3*time.Second) //定義一個超時的 context

 

只要時間到了,我們就能用 ctx.Done() 獲取到一個超時的 channel(通知),然後其他用到這個 ctx 的地方也會停掉,並釋放 ctx。
一般來說,ctx.Done() 是結合 select 使用的。
所以我們又需要一個迴圈來監聽 ctx.Done()

 

for {
    select {
    case         // 傳回結果
}

 

現在我們有兩個 for 了,是不是能夠合併下?

 

for {
    select {
    case resContainer =         sum += resContainer
        fmt.Println("add", resContainer)
    case         fmt.Println("result:", sum)
        return
    }
}

 

誒嘿,看上去不錯。 不過我們怎麼在正常完成微服務呼叫的時候輸出結果呢? 看來我們還需要一個 flag。

 

var count int
for {
    select {
    case resContainer =         sum += resContainer
        count ++
        fmt.Println("add", resContainer)
        if count > 2 {
            fmt.Println("result:", sum)
            return
        }
    case         fmt.Println("timeout result:", sum)
        return
    }
}

 

我們加入一個計數器,因為我們只是呼叫3次微服務,所以當 count 大於2的時候,我們就應該結束並輸出結果了。

 

要點3–併發中的等待

 

這是一種偷懶的方法,因為我們知道了呼叫微服務的次數,如果我們並不知道,或者之後還要添加呢?
手動每次改 count 的判斷閾值會不會太沙雕了?這時候我們就要加入 sync 包了。
我們將會使用的 sync 的一個特性是 WaitGroup。它的作用是等待一組協程運行完畢後,執行接下去的步驟。

 

我們來改下之前微服務呼叫的代碼塊:

 

var success = make(chan int, 1) // 成功的通道標識
wg := sync.WaitGroup{} // 創建一個 waitGroup 組
wg.Add(3) // 我們往組裡加3個標識,因為我們要運行3個任務
go func() {
    resChan     wg.Done() // 完成一個,Done()一個
}()

go func() {
    resChan     wg.Done()
}()

go func() {
    resChan     wg.Done()
}()
wg.Wait() // 直到我們前面三個標識都被 Done 了,否則程式一直會阻塞在這裡
success 1 // 我們發送一個成功信號到通道中

 

既然我們有了 success 這個信號,那麼再把它加入到監控 for 迴圈中,並做些修改,刪除原來 count 判斷的部分。

 

go func() {
    for {
        select {
        case resContainer =             sum += resContainer
            fmt.Println("add", resContainer)
        case             fmt.Println("result:", sum)
            return
        case             fmt.Println("result:", sum)
            return
        }
    }
}()

三個 case,分工明確,一個用來拿服務輸出的結果並計算,一個用來做最終的完成輸出,一個是超時輸出。 同時我們將這個迴圈監聽,也作為協程運行。

至此,所有的主要代碼都完成了。下麵是完全版:

package main

import (
  "context"
  "fmt"
  "net/http"
  "sync"
  "time"

  "github.com/gin-gonic/gin"
)

// 一個請求會觸發呼叫三個服務,每個服務輸出一個 int,
// 請求要求結果為三個服務輸出 int 之和
// 請求傳回時間不超過3秒,大於3秒只輸出已經獲得的 int 之和
func calHandler(c *gin.Context) {
  var resContainer, sum int
  var success, resChan = make(chan int), make(chan int, 3)
  ctx, _ := context.WithTimeout(c, 3*time.Second)

  go func() {
    for {
      select {
      case resContainer =         sum += resContainer
        fmt.Println("add", resContainer)
      case         fmt.Println("result:", sum)
        return
      case         fmt.Println("result:", sum)
        return
      }
    }
  }()

  wg := sync.WaitGroup{}
  wg.Add(3)
  go func() {
    resChan     wg.Done()
  }()

  go func() {
    resChan     wg.Done()
  }()

  go func() {
    resChan     wg.Done()
  }()
  wg.Wait()
  success 1

  return
}

func main() {
  r := gin.New()
  r.GET("/calculate", calHandler)
    http.ListenAndServe(":8008", r)
}

func microService1() int {
  time.Sleep(1*time.Second)
  return 1
}

func microService2() int {
  time.Sleep(2*time.Second)
  return 2
}

func microService3() int {
  time.Sleep(10*time.Second)
  return 3
}

 

上面的程式只是簡單描述了一個呼叫其他微服務超時的處理場景。 實際過程中還需要加很多很多調料,才能保證接口的對外完整性。 大家,講究看下吧~啊哈哈哈哈

赞(0)

分享創造快樂