歡迎光臨
每天分享高質量文章

RabbitMQ一個簡單可靠的方案(.NET Core實現)

來自:編程玩家

鏈接:http://www.cnblogs.com/Erik_Xu/p/9515208.html

前言


最近需要使用到訊息佇列相關技術,於是重新接觸RabbitMQ。其中遇到了不少可靠性方面的問題,歸納了一下,大概有以下幾種:

1、臨時異常,如資料庫網絡閃斷、http請求臨時失效等;

2、時序異常,如A任務依賴於B任務,但可能由於調度或消費者分配的原因,導致A任務先於B任務執行;

3、業務異常,由於系統測試不充分,上線後發現某幾個或某幾種訊息無法正常處理;

4、系統異常,業務中間件無法正常操作,如網絡中斷、資料庫宕機等;

5、非法異常,一些偽造、攻擊型別的訊息。

針對這些異常,我採用了一種基於訊息審計、訊息重試、訊息檢索、訊息重發的方案。

方案

1、訊息均使用Exchange進行通訊,方式可以是direct或topic,不建議fanout。

2、根據業務在Exchange下分配一個或多個Queue,同時設置一個審計執行緒(Audit)監聽所有Queue,用於記錄訊息到MongoDB,同時又不阻塞正常業務處理。

3、生產者(Publisher)在發佈訊息時,基於AMQP協議,生成訊息標識MessageId和時間戳Timestamp,根據訊息業務添加頭信息Headers便於跟蹤。

4、消費者(Comsumer)訊息處理失敗時,則把訊息發送到重試交換機(Retry Exchange),並設置過期(重試)時間及更新重試次數;如果超過重試次數則刪除訊息。

5、重試交換機Exchange設置死信交換機(Dead Letter Exchange),訊息過期後自動轉發到業務交換機(Exchange)。

6、WebApi可以根據訊息標識MessageId、時間戳Timestamp以及頭信息Headers在MongoDB中對訊息進行檢索或重試。

註:選擇MongoDB作為儲存介質的主要原因是其對頭信息(essay-headers)的動態查詢支持較好,同等的替代產品還可以是Elastic Search這些。

生產者(Publisher)


1、設置斷線自動恢復

var factory = new ConnectionFactory

{

  Uri = new Uri(“amqp://guest:[email protected]:5672”),

  AutomaticRecoveryEnabled = true

};

2、定義Exchange,樣式為direct

channel.ExchangeDeclare(“Exchange”, “direct”);

3、根據業務定義QueueA和QueueB

channel.QueueDeclare(“QueueA”, true, false, false);

channel.QueueBind(“QueueA”, “Exchange”, “RouteA”);

channel.QueueDeclare(“QueueB”, true, false, false);

channel.QueueBind(“QueueB”, “Exchange”, “RouteB”);

4、啟動訊息發送確認機制,即需要收到RabbitMQ服務端的確認訊息

channel.ConfirmSelect();

5、設置訊息持久化

var properties = channel.CreateBasicProperties();

properties.Persistent = true;

6、生成訊息標識MessageId、時間戳Timestamp以及頭信息Headers

properties.MessageId = Guid.NewGuid().ToString(“N”);

properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

properties.Headers = new Dictionary

{

  { “key”, “value” + i}

};

7、發送訊息,偶數序列發送到QueueA(RouteA),奇數序列發送到QueueB(RouteB)

channel.BasicPublish(“Exchange”, i % 2 == 0 ? “RouteA” : “RouteB”, properties, body);

8、確定收到RabbitMQ服務端的確認訊息

var isOk = channel.WaitForConfirms();

if (!isOk)

{

  throw new Exception(“The message is not reached to the server!”);

}

完整代碼

var factory = new ConnectionFactory

{

    Uri = new Uri(“amqp://guest:[email protected]:5672”),

    AutomaticRecoveryEnabled = true

};

using (var connection = factory.CreateConnection())

{

    using (var channel = connection.CreateModel())

    {

        channel.ExchangeDeclare(“Exchange”, “direct”);

        channel.QueueDeclare(“QueueA”, true, false, false);

        channel.QueueBind(“QueueA”, “Exchange”, “RouteA”);

        channel.QueueDeclare(“QueueB”, true, false, false);

        channel.QueueBind(“QueueB”, “Exchange”, “RouteB”);

        channel.ConfirmSelect();

        for (var i = 0; i < 2; i++)

        {

            var properties = channel.CreateBasicProperties();

            properties.Persistent = true;

            properties.MessageId = Guid.NewGuid().ToString(“N”);

            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

            properties.Headers = new Dictionary

            {

                { “key”, “value” + i}

            };

            var message = “Hello ” + i;

            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(“Exchange”, i % 2 == 0 ? “RouteA” : “RouteB”, properties, body);

            var isOk = channel.WaitForConfirms();

            if (!isOk)

            {

                throw new Exception(“The message is not reached to the server!”);

            }

        }

    }

}

效果:QueueA和QueueB各一條訊息,QueueAudit兩條訊息

註:Exchange下必須先宣告Queue才能接收到訊息,上述代碼並沒有QueueAudit的宣告;需要手動宣告,或者先執行下麵的消費者程式進行宣告。

正常消費者(ComsumerA)


1、設置預取訊息,避免公平輪訓問題,可以根據需要設置預取訊息數,這裡是1

_channel.BasicQos(0, 1, false);

2、宣告Exchange和Queue

_channel.ExchangeDeclare(“Exchange”, “direct”);

_channel.QueueDeclare(“QueueA”, true, false, false);

_channel.QueueBind(“QueueA”, “Exchange”, “RouteA”);

3、編寫回呼函式

var consumer = new EventingBasicConsumer(_channel);

consumer.Received += (model, ea) =>

{

  //The QueueA is always successful.

  try

  {

      _channel.BasicAck(ea.DeliveryTag, false);

  }

  catch (AlreadyClosedException ex)

  {

      _logger.LogCritical(ex, “RabbitMQ is closed!”);

  }

};

_channel.BasicConsume(“QueueA”, false, consumer);

註:設置了RabbitMQ的斷線恢復機制,當RabbitMQ連接不可用時,與MQ通訊的操作會丟擲AlreadyClosedException的異常,導致主執行緒退出,哪怕連接恢復了,程式也無法恢復,因此,需要捕獲處理該異常。

異常消費者(ComsumerB)


1、設置預取訊息

_channel.BasicQos(0, 1, false);

2、宣告Exchange和Queue

_channel.ExchangeDeclare(“Exchange”, “direct”);

_channel.QueueDeclare(“QueueB”, true, false, false);

_channel.QueueBind(“QueueB”, “Exchange”, “RouteB”);

3、設置死信交換機(Dead Letter Exchange)

var retryDic = new Dictionary

{

  {“x-dead-letter-exchange”, “Exchange”},

  {“x-dead-letter-routing-key”, “RouteB”}

};

_channel.ExchangeDeclare(“Exchange_Retry”, “direct”);

_channel.QueueDeclare(“QueueB_Retry”, true, false, false, retryDic);

_channel.QueueBind(“QueueB_Retry”, “Exchange_Retry”, “RouteB_Retry”);

4、重試設置,3次重試;第一次1秒,第二次10秒,第三次30秒

_retryTime = new List

{

  1 * 1000,

  10 * 1000,

  30 * 1000

};

5、獲取當前重試次數

var retryCount = 0;

if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey(“retryCount”))

{

  retryCount = (int)ea.BasicProperties.Headers[“retryCount”];

  _logger.LogWarning($”[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started…”);

}

6、發生異常,判斷是否可以重試

private bool CanRetry(int retryCount)

{

  return retryCount <= _retryTime.Count - 1;

}

7、可以重試,則啟動重試機制

private void SetupRetry(int retryCount, string retryExchange, string retryRoute, BasicDeliverEventArgs ea)

{

  var body = ea.Body;

  var properties = ea.BasicProperties;

  properties.Headers = properties.Headers ?? new Dictionary();

  properties.Headers[“retryCount”] = retryCount;

  properties.Expiration = _retryTime[retryCount].ToString();

  try

  {

      _channel.BasicPublish(retryExchange, retryRoute, properties, body);

  }

  catch (AlreadyClosedException ex)

  {

      _logger.LogCritical(ex, “RabbitMQ is closed!”);

  }

}


完整代碼

_channel.BasicQos(0, 1, false);

_channel.ExchangeDeclare(“Exchange”, “direct”);

_channel.QueueDeclare(“QueueB”, true, false, false);

_channel.QueueBind(“QueueB”, “Exchange”, “RouteB”);

var retryDic = new Dictionary

{

    {“x-dead-letter-exchange”, “Exchange”},

    {“x-dead-letter-routing-key”, “RouteB”}

};

_channel.ExchangeDeclare(“Exchange_Retry”, “direct”);

_channel.QueueDeclare(“QueueB_Retry”, true, false, false, retryDic);

_channel.QueueBind(“QueueB_Retry”, “Exchange_Retry”, “RouteB_Retry”);

var consumer = new EventingBasicConsumer(_channel);

consumer.Received += (model, ea) =>

{

    //The QueueB is always failed.

    bool canAck;

    var retryCount = 0;

    if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey(“retryCount”))

    {

        retryCount = (int)ea.BasicProperties.Headers[“retryCount”];

        _logger.LogWarning($”[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started…”);

    }

    try

    {

        Handle();

        canAck = true;

    }

    catch (Exception ex)

    {

        _logger.LogCritical(ex, “Error!”);

        if (CanRetry(retryCount))

        {

            SetupRetry(retryCount, “Exchange_Retry”, “RouteB_Retry”, ea);

            canAck = true;

        }

        else

        {

            canAck = false;

        }

    }

    try

    {

        if (canAck)

        {

            _channel.BasicAck(ea.DeliveryTag, false);

        }

        else

        {

            _channel.BasicNack(ea.DeliveryTag, false, false);

        }

    }

    catch (AlreadyClosedException ex)

    {

        _logger.LogCritical(ex, “RabbitMQ is closed!”);

    }

};

_channel.BasicConsume(“QueueB”, false, consumer);

審計消費者(Audit Comsumer)


1、宣告Exchange和Queue

_channel.ExchangeDeclare(“Exchange”, “direct”);

_channel.QueueDeclare(“QueueAudit”, true, false, false);

_channel.QueueBind(“QueueAudit”, “Exchange”, “RouteA”);

_channel.QueueBind(“QueueAudit”, “Exchange”, “RouteB”);

2、排除死信Exchange轉發過來的重覆訊息

if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey(“x-death”))

{

  …

}

 

3、生成訊息物體

var message = new Message

{

  MessageId = ea.BasicProperties.MessageId,

  Body = ea.Body,

  Exchange = ea.Exchange,

  Route = ea.RoutingKey

};

4、RabbitMQ會用bytes來儲存字串,因此,要把頭中bytes轉回字串

if (ea.BasicProperties.Headers != null)

{

  var essay-headers = new Dictionary();

  foreach (var essay-header in ea.BasicProperties.Headers)

  {

      if (essay-header.Value is byte[] bytes)

      {

          essay-headers[essay-header.Key] = Encoding.UTF8.GetString(bytes);

      }

      else

      {

          essay-headers[essay-header.Key] = essay-header.Value;

      }

  }

  message.Headers = essay-headers;

}

5、把Unix格式的Timestamp轉成UTC時間

if (ea.BasicProperties.Timestamp.UnixTime > 0)

{

  message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime;

  var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);

  message.Timestamp = offset.UtcDateTime;

}

6、訊息存入MongoDB

_mongoDbContext.Collection().InsertOne(message, cancellationToken: cancellationToken);

MongoDB記錄:

重試記錄:

訊息檢索及重發(WebApi)


1、通過訊息Id檢索訊息

2、通過頭訊息檢索訊息

3、訊息重發,會重新生成MessageId

Ack,Nack,Reject的關係


1、訊息處理成功,執行Ack,RabbitMQ會把訊息從佇列中刪除。

2、訊息處理失敗,執行Nack或者Reject:

a) 當requeue=true時,訊息會重新回到佇列,然後當前消費者會馬上再取回這條訊息;

b) 當requeue=false時,如果Exchange有設置Dead Letter Exchange,則訊息會去到Dead Letter Exchange;

c) 當requeue=false時,如果Exchange沒設置Dead Letter Exchange,則訊息從佇列中刪除,效果與Ack相同。

3、Nack與Reject的區別在於:Nack可以批量操作,Reject只能單條操作。

RabbitMQ自動恢復


連接(Connection)恢復

1、重連(Reconnect)

2、恢復連接監聽(Listeners)

3、重新打開通道(Channels)

4、恢復通道監聽(Listeners)

5、恢復basic.qos,publisher confirms以及transaction設置

拓撲(Topology)恢復

1、重新宣告交換機(Exchanges)

2、重新宣告佇列(Queues)

3、恢復所有系結(Bindings)

4、恢復所有消費者(Consumers)

異常處理機制


1、臨時異常,如資料庫網絡閃斷、http請求臨時失效等通過短時間重試(如1秒後)的方式處理,也可以考慮Nack/Reject來實現重試(時效性更高)。

2、時序異常,如A任務依賴於B任務,但可能由於調度或消費者分配的原因,導致A任務先於B任務執行通過長時間重試(如1分鐘、30分鐘、1小時、1天等),等待B任務先執行完的方式處理。

3、業務異常,由於系統測試不充分,上線後發現某幾個或某幾種訊息無法正常處理

等系統修正後,通過訊息重發的方式處理。

4、系統異常,業務中間件無法正常操作,如網絡中斷、資料庫宕機等等系統恢復後,通過訊息重發的方式處理。

5、非法異常,一些偽造、攻擊型別的訊息多次重試失敗後,訊息從佇列中被刪除,也可以針對此業務做進一步處理。

原始碼地址:https://github.com/ErikXu/RabbitMesage


看完本文有收穫?請轉發分享給更多人



●編號137,輸入編號直達本文

●輸入m獲取文章目錄

推薦↓↓↓

Web開發

更多推薦18個技術類公眾微信

涵蓋:程式人生、演算法與資料結構、黑客技術與網絡安全、大資料技術、前端開發、Java、Python、Web開發、安卓開發、iOS開發、C/C++、.NET、Linux、資料庫、運維等。

赞(0)

分享創造快樂