歡迎光臨
每天分享高質量文章

.NET Core 使用RabbitMQ

作者:莫問今朝乄

連結:http://www.cnblogs.com/yan7/p/9498685.html

一、什麼是RabbitMQ

RabbitMQ是一個開源的,基於AMQP(Advanced Message Queuing Protocol)協議的完整,可復用的企業級訊息佇列(Message Queue 一種應用程式與應用程式之間的一種通訊方法)系統,RabbitMQ可以實現點對點,釋出訂閱等訊息處理樣式

二、安裝RabbitMQ

網上有許多RabbitMQ的安裝部落格,所以在此不介紹。

三、.NET Core中使用RabbitMQ

RabbitMQ從資訊接收者角度可以看做三種樣式,一對一,一對多(此一對多並不是釋出訂閱,而是每條資訊只有一個接收者)和釋出訂閱。

 

其中一對一是簡單佇列樣式,一對多是Worker樣式,而釋出訂閱包括釋出訂閱樣式,路由樣式和萬用字元樣式,為什麼說釋出訂閱樣式包含三種樣式呢,其實釋出訂閱,路由,萬用字元三種樣式都是使用只是交換機(Exchange)型別不一致

 

3.1、簡單佇列

 

首先,我們需要建立兩個控制檯專案.Send(傳送者)和Receive(接收者),然後為兩個專案安裝RabbitMQ.Client驅動

 

install-package rabbitmq.client

 

然後在Send和Receive專案中編寫我們的訊息佇列程式碼

 

傳送者程式碼

 

using RabbitMQ.Client;
using System;
using System.Text;
namespace Send
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory conFactory = new ConnectionFactory//建立連線工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//埠號
                UserName = "yan",//使用者賬號
                Password = "yan"//使用者密碼
            };
            using (IConnection con = conFactory.CreateConnection())//建立連線物件
            {
                using (IModel channel = con.CreateModel())//建立連線會話物件
                {
                    String queueName = String.Empty;
                    if (args.Length > 0)
                        queueName = args[0];
                    else
                        queueName = "queue1";
                    //宣告一個佇列
                    channel.QueueDeclare(
                      queue: queueName,//訊息佇列名稱
                      durable: false,//是否快取
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                       );
                    while (true)
                    {
                        Console.WriteLine("訊息內容:");
                        String message = Console.ReadLine();
                        //訊息內容
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        //傳送訊息
                        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                        Console.WriteLine("成功傳送訊息:" + message);
                    }
                }
            }
        }
    }
}

 

可以看到RabbitMQ使用了IConnectionFactory,IConnection和IModel來建立連結和通訊管道,IConnection實體物件只負責與Rabbit的連線,而傳送接收這些實際操作全部由會話通道進行,而後使用QueneDeclare方法進行建立訊息佇列,建立完成後可以在RabbitMQ的管理工具中看到此佇列,QueneDelare方法需要一個訊息佇列名稱的必須引數.後面那些引數則代表快取,引數等資訊。

 

最後使用BasicPublish來傳送訊息,在一對一中routingKey必須和 queueName一致

 

接收者程式碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Receive1
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//建立連線工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//埠號
                UserName = "yan",//使用者賬號
                Password = "yan"//使用者密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    String queueName = String.Empty;
                    if (args.Length > 0)
                        queueName = args[0];
                    else
                        queueName = "queue1";
                    //宣告一個佇列
                    channel.QueueDeclare(
                      queue: queueName,//訊息佇列名稱
                      durable: false,//是否快取
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                       );
                    //建立消費者物件
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        byte[] message = ea.Body;//接收到的訊息
                        Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message));
                    };
                    //消費者開啟監聽
                    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}

 

在接收者中是定義一個EventingBasicConsumer物件的消費者(接收者),這個消費者與會話物件關聯,然後定義接收事件,輸出從訊息佇列中接收的資料,最後使用會話物件的BasicConsume方法來啟動消費者監聽.消費者的定義也是如此簡單.

 

不過註意一點,可以看到在接收者程式碼中也有宣告佇列的方法,其實這句程式碼可以去掉,但是如果去掉的話接收者在程式啟動時監聽佇列,而此時這個佇列還未存在,所以會出異常,所以往往會在消費者中也新增一個宣告佇列方法

 

此時,簡單訊息佇列傳輸就算寫好了,我們可以執行程式碼就行測試

 

 

 

3.2、Worker樣式

 

Worker樣式其實是一對多的樣式,但是這個一對多並不是像釋出訂閱那種,而是資訊以順序的傳輸給每個接收者,我們可以使用上個例子來執行worker樣式甚至,只需要執行多個接收者即可

 

 

可以看到執行兩個接收者,然後傳送者發送了1-5這五個訊息,第一個接收者接收的是奇數,而第二個接收者接收的是偶數,但是現在的worker存在這很大的問題,

 

1、丟失資料:一旦其中一個宕機,那麼另外接收者的無法接收原本這個接收者所要接收的資料

 

2、無法實現能者多勞:如果其中的接收者接收的較慢,那麼便會極大的浪費效能,所以需要實現接收快的多接收

 

下麵針對上面的兩個問題進行處理

 

首先我們先來看一下所說的宕機丟失資料一說,我們在上個例子Receive接收事件中新增執行緒等待

consumer.Received += (model, ea) =>
 {
     Thread.Sleep(1000);//等待1秒,
     byte[] message = ea.Body;//接收到的訊息
     Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message));
 };

 

然後再次啟動兩個接收者進行測試

 

 

可以看到傳送者發送了1-9的數字,第二個接收者在接收資料途中宕機,第一個接收者也並沒有去接收第二個接收者宕機後的資料,有的時候我們會有當接收者宕機後,其餘資料交給其它接收者進行消費,那麼該怎麼進行處理呢,解決這個問題得方法就是改變其訊息確認樣式

 

在Rabbit中存在兩種訊息確認樣式,

 

自動確認:只要訊息從佇列獲取,無論消費者獲取到訊息後是否成功消費,都認為是訊息成功消費,也就是說上面第二個接收者其實已經消費了它所接收的資料

 

手動確認:消費從佇列中獲取訊息後,伺服器會將該訊息處於不可用狀態,等待消費者反饋

 

也就是說我們只要將訊息確認樣式改為手動即可,改為手動確認方式只需改兩處,1.開啟監聽時將autoAck引數改為false,2.訊息消費成功後傳回確認

 

consumer.Received += (model, ea) =>
{
     Thread.Sleep(1000);//等待1秒,
     byte[] message = ea.Body;//接收到的訊息
     Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message));
     //傳回訊息確認
     channel.BasicAck(ea.DeliveryTag, true);
 };
 //消費者開啟監聽
 //將autoAck設定false 關閉自動確認
 channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

 

然後再次測試便會出現下麵結果

 

 

能者多勞是建立在手動確認基礎上,下麵修改一下程式碼中等待的時間

consumer.Received += (model, ea) =>
{
    Thread.Sleep((new Random().Next(1,6))*1000);//隨機等待,實現能者多勞,
    byte[] message = ea.Body;//接收到的訊息
    Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message));
    //傳回訊息確認
    channel.BasicAck(ea.DeliveryTag, true);
};

 

然後只需要再新增BasicQos方法即可

//宣告一個佇列
channel.QueueDeclare(
  queue: queueName,//訊息佇列名稱
  durable: false,//是否快取
  exclusive: false,
  autoDelete: false,
  arguments: null
 );
//告訴Rabbit每次只能向消費者傳送一條資訊,再消費者未確認之前,不再向他傳送資訊
channel.BasicQos(0, 1, false);

 

 

可以看到此時已實現能者多勞

 

worker樣式接收者完整程式碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace Receive1
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//建立連線工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//埠號
                UserName = "yan",//使用者賬號
                Password = "yan"//使用者密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    String queueName = String.Empty;
                    if (args.Length > 0)
                        queueName = args[0];
                    else
                        queueName = "queue1";
                    //宣告一個佇列
                    channel.QueueDeclare(
                      queue: queueName,//訊息佇列名稱
                      durable: false,//是否快取
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                       );
                    //告訴Rabbit每次只能向消費者傳送一條資訊,再消費者未確認之前,不再向他傳送資訊
                    channel.BasicQos(0, 1, false);
                    //建立消費者物件
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep((new Random().Next(1,6))*1000);//隨機等待,實現能者多勞,
                        byte[] message = ea.Body;//接收到的訊息
                        Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message));
                        //傳回訊息確認
                        channel.BasicAck(ea.DeliveryTag, true);
                    };
                    //消費者開啟監聽
                    //將autoAck設定false 關閉自動確認
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
       }
    }
}

 

3.3、Exchange樣式(釋出訂閱樣式,路由樣式,萬用字元樣式)

 

前面說過釋出,路由,萬用字元這三種樣式其實可以算為一種樣式,區別僅僅是互動機型別不同.在這裡出現了一個交換機的東西,傳送者將訊息傳送傳送到交換機,接收者建立各自的訊息佇列系結到交換機,

 

 

透過上面三幅圖可以看出這三種樣式本質就是一種訂閱樣式,路由,萬用字元樣式只是訂閱樣式的變種樣式。使其可以選擇傳送訂閱者中的接收者。

 

註意:交換機本身並不儲存資料,資料儲存在訊息佇列中,所以如果向沒有系結訊息佇列的交換機中傳送資訊,那麼資訊將會丟失

 

下麵依次來看一下這三種樣式

 

釋出訂閱樣式(fanout)

 

傳送者程式碼

using RabbitMQ.Client;
using System;
using System.Text;

namespace Send3
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//建立連線工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//埠號
                UserName = "yan",//使用者賬號
                Password = "yan"//使用者密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using(IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = String.Empty;
                    if (args.Length > 0)
                        exchangeName = args[0];
                    else
                        exchangeName = "exchange1";
                    //宣告交換機
                    channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
                    while (true)
                    {
                        Console.WriteLine("訊息內容:");
                        String message = Console.ReadLine();
                        //訊息內容
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        //傳送訊息
                        channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
                       Console.WriteLine("成功傳送訊息:" + message);
                    }
                }
            }
        }
    }
}

 

傳送者程式碼與上面沒有什麼差異,只是由上面的訊息佇列宣告變成了交換機宣告(交換機型別為fanout),也就說傳送者傳送訊息從原來的直接傳送訊息佇列變成了傳送到交換機

 

接收者程式碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Receive3
{
    class Program
    {
        static void Main(string[] args)
        {
            //建立一個隨機數,以建立不同的訊息佇列
            int random = new Random().Next(1, 1000);
            Console.WriteLine("Start"+random.ToString());
            IConnectionFactory connFactory = new ConnectionFactory//建立連線工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//埠號
                UserName = "yan",//使用者賬號
                Password = "yan"//使用者密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = String.Empty;
                    if (args.Length > 0)
                        exchangeName = args[0];
                    else
                        exchangeName = "exchange1";
                    //宣告交換機
                    channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
                    //訊息佇列名稱
                    String queueName = exchangeName + "_" + random.ToString();

                    //宣告佇列
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //將佇列與交換機進行系結
                    channel.QueueBind(queue: queueName, exchange: exchangeName,routingKey:"");
                    //宣告為手動確認
                    channel.BasicQos(0, 1, false);
                    //定義消費者
                    var consumer = new EventingBasicConsumer(channel);
                    //接收事件
                    consumer.Received += (model, ea) =>
                    {
                        byte[] message = ea.Body;//接收到的訊息
                        Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message));
                        //傳回訊息確認
                        channel.BasicAck(ea.DeliveryTag, true);
                    };
                    //開啟監聽
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}

 

可以看到接收者程式碼與上面有些差異

 

首先是宣告交換機(同上面一樣,為了防止異常)

 

然後宣告訊息佇列並對交換機進行系結,在這裡使用了隨機數,目的是宣告不重覆的訊息佇列,如果是同一個訊息佇列,則就變成worker樣式,也就是說對於釋出訂閱樣式有多少接收者就有多少個訊息佇列,而這些訊息佇列共同從一個交換機中獲取資料

 

然後同時開兩個接收者,結果就如下

 

 

路由樣式(direct)

 

上面說過路由樣式是訂閱樣式的一個變種樣式,以路由進行匹配傳送,例如將訊息1傳送給A,B兩個訊息佇列,或者將訊息2傳送給B,C兩個訊息佇列,路由樣式的交換機是direct

 

傳送者程式碼

using RabbitMQ.Client;
using System;
using System.Text;

namespace Send3
{
    class Program
    {
        static void Main(string[] args)
        {
            if (args.Length == 0) throw new ArgumentException("args");
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//建立連線工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//埠號
                UserName = "yan",//使用者賬號
                Password = "yan"//使用者密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using(IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = "exchange2";
                    //路由名稱
                    String routeKey = args[0];
                    //宣告交換機   路由交換機型別direct
                    channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
                    while (true)
                    {
                        Console.WriteLine("訊息內容:");
                        String message = Console.ReadLine();
                        //訊息內容
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        //傳送訊息  傳送到路由匹配的訊息佇列中
                        channel.BasicPublish(exchange: exchangeName, routingKey: routeKey, basicProperties: null, body: body);
                        Console.WriteLine("成功傳送訊息:" + message);
                    }
                }
            }
        }
    }
}

 

傳送者程式碼相比上面只改了兩處

 

1、將交換機型別改為了direct型別

 

2、將執行時的第一個引數改成了路由名稱,然後傳送資料時由指定路由的訊息佇列進行獲取資料

 

接收者程式碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Receive3
{
    class Program
    {
        static void Main(string[] args)
        {
            if (args.Length == 0) throw new ArgumentException("args");
            //建立一個隨機數,以建立不同的訊息佇列
            int random = new Random().Next(1, 1000);
            Console.WriteLine("Start"+random.ToString());
            IConnectionFactory connFactory = new ConnectionFactory//建立連線工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//埠號
                UserName = "yan",//使用者賬號
                Password = "yan"//使用者密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = "exchange2";
                    //宣告交換機
                    channel.ExchangeDeclare(exchange: exchangeName, type:"direct");
                    //訊息佇列名稱
                    String queueName = exchangeName + "_" + random.ToString();
                    //宣告佇列
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //將佇列與交換機進行系結
                    foreach (var routeKey in args)
                    {
                        //匹配多個路由
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);

                    }
                    //宣告為手動確認
                    channel.BasicQos(0, 1, false);
                    //定義消費者
                    var consumer = new EventingBasicConsumer(channel);

                    //接收事件
                    consumer.Received += (model, ea) =>
                    {
                        byte[] message = ea.Body;//接收到的訊息
                        Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message));
                        //傳回訊息確認
                        channel.BasicAck(ea.DeliveryTag, true);
                    };
                    //開啟監聽
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}

 

 

在接收者程式碼中的改動點也是與傳送者一致,但是一個接收者訊息佇列可以宣告多個路由與交換機進行系結

 

執行情況如下

 

 

萬用字元樣式(topic)

 

萬用字元樣式與路由樣式一致,只不過萬用字元樣式中的路由可以宣告為模糊查詢,RabbitMQ擁有兩個萬用字元

 

#:匹配0-n個字元陳述句

 

*:匹配一個字元陳述句

 

註意:RabbitMQ中萬用字元並不像正則中的單個字元,而是一個以“.”分割的字串,如 ”topic1.*“匹配的規則以topic1開始並且”.”後只有一段陳述句的路由  例:“topic1.aaa”,“topic1.bb”

 

傳送者程式碼

//交換機名稱
String exchangeName = "exchange3";
//路由名稱
String routeKey = args[0];
//宣告交換機   萬用字元型別為topic
channel.ExchangeDeclare(exchange: exchangeName, type: "topic");
while (true)
{
    Console.WriteLine("訊息內容:");
    String message = Console.ReadLine();
     //訊息內容
     byte[] body = Encoding.UTF8.GetBytes(message);
     //傳送訊息  傳送到路由匹配的訊息佇列中
     channel.BasicPublish(exchange: exchangeName, routingKey: routeKey, basicProperties: null, body: body);
     Console.WriteLine("成功傳送訊息:" + message);
}

 

修改了兩點:交換機名稱(每個交換機只能宣告一種型別,如果還用exchang2的話就會出異常),交換機型別改為topic

 

接收者程式碼

//交換機名稱
String exchangeName = "exchange3";
//宣告交換機    萬用字元型別為topic
channel.ExchangeDeclare(exchange: exchangeName, type:"topic");
//訊息佇列名稱
String queueName = exchangeName + "_" + random.ToString();
//宣告佇列
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
//將佇列與交換機進行系結
foreach (var routeKey in args)
{//匹配多個路由
    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);
}
//宣告為手動確認
channel.BasicQos(0, 1, false);
//定義消費者
var consumer = new EventingBasicConsumer(channel);
//接收事件
consumer.Received += (model, ea) =>
{
   byte[] message = ea.Body;//接收到的訊息
   Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message));
    //傳回訊息確認
    channel.BasicAck(ea.DeliveryTag, true);
};
//開啟監聽
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();

 

接收者修改與傳送者一致

 

執行結果如下

 

 

 

贊(0)

分享創造快樂