歡迎光臨
每天分享高質量文章

程式員修神之路–高併發優雅的做限流(有福利)

點擊上方藍色字體,關註我們

菜菜哥,有時間嗎?

YY妹,什麼事?

我最近的任務是做個小的秒殺活動,我怕把後端接口壓垮,X總說這可關係到公司的存亡

簡單呀,你就做個限流唄

這個沒做過呀,菜菜哥,幫妹子寫一個唄,事成了,以後有什麼要求隨便說

那好呀,先把我工資漲一下

那算了,我找別人幫忙吧

跟你開玩笑呢,給哥2個小時時間

謝謝菜菜哥,以後你什麼要求我都答應你

好嘞,年輕人就是豪爽

◆◆
技術分析
◆◆
如果你比較關註現在的技術形式,就會知道微服務現在火的一塌糊塗,當然,事物都有兩面性,微服務也不是解決技術,架構等問題的萬能鑰匙。如果服務化帶來的利大於弊,菜菜還是推薦將系統服務化。隨著服務化的行程的不斷演化,各種概念以及技術隨之而來。任何一種方案都是為瞭解決問題而存在。比如:熔斷設計,接口冪等性設計,重試機制設計,還有今天菜菜要說的限流設計,等等這些技術幾乎都充斥在每個系統中。

        就今天來說的限流,書面意思和作用一致,就是為了限制,通過對併發訪問或者請求進行限速或者一個時間視窗內的請求進行限速來保護系統。一旦達到了限制的臨界點,可以用拒絕服務、排隊、或者等待的方式來保護現有系統,不至於發生雪崩現象。

        限流就像做帝都的地鐵一般,如果你住在西二旗或者天通苑也許會體會的更深刻一些。我更習慣在技術角度用消費者的角度來闡述,需要限流的一般原因是消費者能力有限,目的為了避免超過消費者能力而出現系統故障。當然也有其他類似的情況也可以用限流來解決。

限流的表現形式上大部分可以分為兩大類:

1.  限制消費者數量。也可以說消費的最大能力值。比如:資料庫的連接池是側重的是總的連接數。還有菜菜以前寫的執行緒池,本質上也是限制了消費者的最大消費能力。

2.  可以被消費的請求數量。這裡的數量可以是瞬時併發數,也可以是一段時間內的總併發數。菜菜今天要幫YY妹子做的也是這個。

        除此之外,限流還有別的表現形式,例如按照網絡流量來限流,按照cpu使用率來限流等。按照限流的範圍又可以分為分佈式限流,應用限流,接口限流等。無論怎麼變化,限流都可以用以下圖來表示:

◆◆
常用技術實現
◆◆
 
令牌桶演算法
        令牌桶是一個存放固定容量令牌的桶,按照固定速率往桶里添加令牌,填滿了就丟棄令牌,請求是否被處理要看桶中令牌是否足夠,當令牌數減為零時則拒絕新的請求。令牌桶允許一定程度突發流量,只要有令牌就可以處理,支持一次拿多個令牌。令牌桶中裝的是令牌。

 

漏桶演算法
        漏桶一個固定容量的漏桶,按照固定常量速率流出請求,流入請求速率任意,當流入的請求數累積到漏桶容量時,則新流入的請求被拒絕。漏桶可以看做是一個具有固定容量、固定流出速率的佇列,漏桶限制的是請求的流出速率。漏桶中裝的是請求。

計數器
        有時我們還會使用計數器來進行限流,主要用來限制一定時間內的總併發數,比如資料庫連接池、執行緒池、秒殺的併發數;計數器限流只要一定時間內的總請求數超過設定的閥值則進行限流,是一種簡單粗暴的總數量限流,而不是平均速率限流。 

除此之外,其實根據不同的業務場景,還可以出現很多不同的限流演算法,但是總的規則只有一條:只要符合當前業務場景的限流策略就是最好的
限流的其他基礎知識請百度!!
 
◆◆
優雅解決妹子問題
◆◆
 
        回歸問題,YY妹子的問題,菜菜不准備用以上所說的幾種演算法來幫助她。菜菜準備用一個按照時間段限制請求總數的方式來限流。 總體思路是這樣:1.  用一個環形來代表通過的請求容器。

2.  用一個指標指向當前請求所到的位置索引,來判斷當前請求時間和當前位置上次請求的時間差,依此來判斷是否被限制。

3.  如果請求通過,則當前指標向前移動一個位置,不通過則不移動位置

4.  重覆以上步驟 直到永遠…….

 
◆◆
用代碼說話才是王道
◆◆
 
以下代碼不改或者稍微修改可用於生產環境
 
以下代碼的核心思路是這樣的:指標當前位置的時間元素和當前時間的差來決定是否允許此次請求,這樣通過的請求在時間上表現的比較平滑。
思路遠比語言重要,任何語言也可為之,請phper,golanger,javaer 自行實現一遍即可
 
//限流組件,採用陣列做為一個環
    class LimitService
    {
        //當前指標的位置
        int currentIndex = 0;
        //限制的時間的秒數,即:x秒允許多少請求
        int limitTimeSencond = 1;
        //請求環的容器陣列
        DateTime?[] requestRing = null;
        //容器改變或者移動指標時候的鎖
        object objLock = new object();

        public LimitService(int countPerSecond,int  _limitTimeSencond)
        {
            requestRing = new DateTime?[countPerSecond];
            limitTimeSencond= _limitTimeSencond;
        }

        //程式是否可以繼續
        public bool IsContinue()
        {
            lock (objLock)
            {
                var currentNode = requestRing[currentIndex];
                //如果當前節點的值加上設置的秒 超過當前時間,說明超過限制
                if (currentNode != null&& currentNode.Value.AddSeconds(limitTimeSencond) >DateTime.Now)
                {
                    return false;
                }
                //當前節點設置為當前時間
                requestRing[currentIndex] = DateTime.Now;
                //指標移動一個位置
                MoveNextIndex(ref currentIndex);
            }            
            return true;
        }
        //改變每秒可以通過的請求數
        public bool ChangeCountPerSecond(int countPerSecond)
        {
            lock (objLock)
            {
                requestRing = new DateTime?[countPerSecond];
                currentIndex = 0;
            }
            return true;
        }

        //指標往前移動一個位置
        private void MoveNextIndex(ref int currentIndex)
        {
            if (currentIndex != requestRing.Length - 1)
            {
                currentIndex = currentIndex + 1;
            }
            else
            {
                currentIndex = 0;
            }
        }
    }
 
測試程式如下:
static  LimitService l = new LimitService(10001);
        static void Main(string[] args)
        {
            int threadCount = 50;
            while (threadCount >= 0)
            {
                Thread t = new Thread(s =>
                {
                    Limit();
                });
                t.Start();
                threadCount--;
            }           

            Console.Read();
        }

        static void Limit()
        {
            int i = 0;
            int okCount = 0;
            int noCount = 0;
            Stopwatch w = new Stopwatch();
            w.Start();
            while (i 1000000)
            {
                var ret = l.IsContinue();
                if (ret)
                {
                    okCount++;
                }
                else
                {
                    noCount++;
                }
                i++;
            }
            w.Stop();
            Console.WriteLine($"共用{w.ElapsedMilliseconds},允許:{okCount},  攔截:{noCount}");
        }

 

測試結果如下: 

 

 

最大用時15秒,共處理請求1000000*50=50000000 次並未發生GC操作,記憶體使用率非常低,每秒處理 300萬次+請求 。以上程式修改為10個執行緒,大約用時4秒之內

 

如果是強勁的服務器或者執行緒數較少情況下處理速度將會更快
寫在最後
以上代碼雖然簡單,但是卻為限流的核心代碼(其實還有優化餘地),經過其他封裝可以適用於Webapi的filter或其他場景。妹子問題解決了,要不要讓她請我吃個飯呢?

    赞(0)

    分享創造快樂