歡迎光臨
每天分享高質量文章

.NET Core 使用RabbitMQ

作者:莫問今朝乄

鏈接:http://www.cnblogs.com/yan7/p/9498685.html

一、什麼是RabbitMQ

RabbitMQ是一個開源的,基於AMQP(Advanced Message Queuing Protocol)協議的完整,可復用的企業級訊息佇列(Message Queue 一種應用程式與應用程式之間的一種通信方法)系統,RabbitMQ可以實現點對點,發佈訂閱等訊息處理樣式

二、安裝RabbitMQ

網上有許多RabbitMQ的安裝博客,所以在此不介紹。

三、.NET Core中使用RabbitMQ

RabbitMQ從信息接收者角度可以看做三種樣式,一對一,一對多(此一對多並不是發佈訂閱,而是每條信息只有一個接收者)和發佈訂閱。

 

其中一對一是簡單佇列樣式,一對多是Worker樣式,而發佈訂閱包括發佈訂閱樣式,路由樣式和通配符樣式,為什麼說發佈訂閱樣式包含三種樣式呢,其實發佈訂閱,路由,通配符三種樣式都是使用只是交換機(Exchange)型別不一致

 

3.1、簡單佇列

 

首先,我們需要創建兩個控制台專案.Send(發送者)和Receive(接收者),然後為兩個專案安裝RabbitMQ.Client驅動

 

install-package rabbitmq.client

 

然後在Send和Receive專案中編寫我們的訊息佇列代碼

 

發送者代碼

 

using RabbitMQ.Client;
using System;
using System.Text;
namespace Send
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory conFactory = new ConnectionFactory//創建連接工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection con = conFactory.CreateConnection())//創建連接物件
            {
                using (IModel channel = con.CreateModel())//創建連接會話物件
                {
                    String queueName = String.Empty;
                    if (args.Length > 0)
                        queueName = args[0];
                    else
                        queueName = "queue1";
                    //宣告一個佇列
                    channel.QueueDeclare(
                      queue: queueName,//訊息佇列名稱
                      durable: false,//是否快取
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                       );
                    while (true)
                    {
                        Console.WriteLine("訊息內容:");
                        String message = Console.ReadLine();
                        //訊息內容
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        //發送訊息
                        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                        Console.WriteLine("成功發送訊息:" + message);
                    }
                }
            }
        }
    }
}

 

可以看到RabbitMQ使用了IConnectionFactory,IConnection和IModel來創建鏈接和通信管道,IConnection實體物件只負責與Rabbit的連接,而發送接收這些實際操作全部由會話通道進行,而後使用QueneDeclare方法進行創建訊息佇列,創建完成後可以在RabbitMQ的管理工具中看到此佇列,QueneDelare方法需要一個訊息佇列名稱的必須引數.後面那些引數則代表快取,引數等信息。

 

最後使用BasicPublish來發送訊息,在一對一中routingKey必須和 queueName一致

 

接收者代碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Receive1
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    String queueName = String.Empty;
                    if (args.Length > 0)
                        queueName = args[0];
                    else
                        queueName = "queue1";
                    //宣告一個佇列
                    channel.QueueDeclare(
                      queue: queueName,//訊息佇列名稱
                      durable: false,//是否快取
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                       );
                    //創建消費者物件
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        byte[] message = ea.Body;//接收到的訊息
                        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
                    };
                    //消費者開啟監聽
                    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}

 

在接收者中是定義一個EventingBasicConsumer物件的消費者(接收者),這個消費者與會話物件關聯,然後定義接收事件,輸出從訊息佇列中接收的資料,最後使用會話物件的BasicConsume方法來啟動消費者監聽.消費者的定義也是如此簡單.

 

不過註意一點,可以看到在接收者代碼中也有宣告佇列的方法,其實這句代碼可以去掉,但是如果去掉的話接收者在程式啟動時監聽佇列,而此時這個佇列還未存在,所以會出異常,所以往往會在消費者中也添加一個宣告佇列方法

 

此時,簡單訊息佇列傳輸就算寫好了,我們可以運行代碼就行測試

 

 

 

3.2、Worker樣式

 

Worker樣式其實是一對多的樣式,但是這個一對多並不是像發佈訂閱那種,而是信息以順序的傳輸給每個接收者,我們可以使用上個例子來運行worker樣式甚至,只需要運行多個接收者即可

 

 

可以看到運行兩個接收者,然後發送者發送了1-5這五個訊息,第一個接收者接收的是奇數,而第二個接收者接收的是偶數,但是現在的worker存在這很大的問題,

 

1、丟失資料:一旦其中一個宕機,那麼另外接收者的無法接收原本這個接收者所要接收的資料

 

2、無法實現能者多勞:如果其中的接收者接收的較慢,那麼便會極大的浪費性能,所以需要實現接收快的多接收

 

下麵針對上面的兩個問題進行處理

 

首先我們先來看一下所說的宕機丟失資料一說,我們在上個例子Receive接收事件中添加執行緒等待

consumer.Received += (model, ea) =>
 {
     Thread.Sleep(1000);//等待1秒,
     byte[] message = ea.Body;//接收到的訊息
     Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
 };

 

然後再次啟動兩個接收者進行測試

 

 

可以看到發送者發送了1-9的數字,第二個接收者在接收資料途中宕機,第一個接收者也並沒有去接收第二個接收者宕機後的資料,有的時候我們會有當接收者宕機後,其餘資料交給其它接收者進行消費,那麼該怎麼進行處理呢,解決這個問題得方法就是改變其訊息確認樣式

 

在Rabbit中存在兩種訊息確認樣式,

 

自動確認:只要訊息從佇列獲取,無論消費者獲取到訊息後是否成功消費,都認為是訊息成功消費,也就是說上面第二個接收者其實已經消費了它所接收的資料

 

手動確認:消費從佇列中獲取訊息後,服務器會將該訊息處於不可用狀態,等待消費者反饋

 

也就是說我們只要將訊息確認樣式改為手動即可,改為手動確認方式只需改兩處,1.開啟監聽時將autoAck引數改為false,2.訊息消費成功後傳回確認

 

consumer.Received += (model, ea) =>
{
     Thread.Sleep(1000);//等待1秒,
     byte[] message = ea.Body;//接收到的訊息
     Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
     //傳回訊息確認
     channel.BasicAck(ea.DeliveryTag, true);
 };
 //消費者開啟監聽
 //將autoAck設置false 關閉自動確認
 channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

 

然後再次測試便會出現下麵結果

 

 

能者多勞是建立在手動確認基礎上,下麵修改一下代碼中等待的時間

consumer.Received += (model, ea) =>
{
    Thread.Sleep((new Random().Next(1,6))*1000);//隨機等待,實現能者多勞,
    byte[] message = ea.Body;//接收到的訊息
    Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
    //傳回訊息確認
    channel.BasicAck(ea.DeliveryTag, true);
};

 

然後只需要再添加BasicQos方法即可

//宣告一個佇列
channel.QueueDeclare(
  queue: queueName,//訊息佇列名稱
  durable: false,//是否快取
  exclusive: false,
  autoDelete: false,
  arguments: null
 );
//告訴Rabbit每次只能向消費者發送一條信息,再消費者未確認之前,不再向他發送信息
channel.BasicQos(0, 1, false);

 

 

可以看到此時已實現能者多勞

 

worker樣式接收者完整代碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace Receive1
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    String queueName = String.Empty;
                    if (args.Length > 0)
                        queueName = args[0];
                    else
                        queueName = "queue1";
                    //宣告一個佇列
                    channel.QueueDeclare(
                      queue: queueName,//訊息佇列名稱
                      durable: false,//是否快取
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                       );
                    //告訴Rabbit每次只能向消費者發送一條信息,再消費者未確認之前,不再向他發送信息
                    channel.BasicQos(0, 1, false);
                    //創建消費者物件
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep((new Random().Next(1,6))*1000);//隨機等待,實現能者多勞,
                        byte[] message = ea.Body;//接收到的訊息
                        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
                        //傳回訊息確認
                        channel.BasicAck(ea.DeliveryTag, true);
                    };
                    //消費者開啟監聽
                    //將autoAck設置false 關閉自動確認
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
       }
    }
}

 

3.3、Exchange樣式(發佈訂閱樣式,路由樣式,通配符樣式)

 

前面說過發佈,路由,通配符這三種樣式其實可以算為一種樣式,區別僅僅是交互機型別不同.在這裡出現了一個交換機的東西,發送者將訊息發送發送到交換機,接收者創建各自的訊息佇列系結到交換機,

 

 

通過上面三幅圖可以看出這三種樣式本質就是一種訂閱樣式,路由,通配符樣式只是訂閱樣式的變種樣式。使其可以選擇發送訂閱者中的接收者。

 

註意:交換機本身並不儲存資料,資料儲存在訊息佇列中,所以如果向沒有系結訊息佇列的交換機中發送信息,那麼信息將會丟失

 

下麵依次來看一下這三種樣式

 

發佈訂閱樣式(fanout)

 

發送者代碼

using RabbitMQ.Client;
using System;
using System.Text;

namespace Send3
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using(IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = String.Empty;
                    if (args.Length > 0)
                        exchangeName = args[0];
                    else
                        exchangeName = "exchange1";
                    //宣告交換機
                    channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
                    while (true)
                    {
                        Console.WriteLine("訊息內容:");
                        String message = Console.ReadLine();
                        //訊息內容
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        //發送訊息
                        channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
                       Console.WriteLine("成功發送訊息:" + message);
                    }
                }
            }
        }
    }
}

 

發送者代碼與上面沒有什麼差異,只是由上面的訊息佇列宣告變成了交換機宣告(交換機型別為fanout),也就說發送者發送訊息從原來的直接發送訊息佇列變成了發送到交換機

 

接收者代碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Receive3
{
    class Program
    {
        static void Main(string[] args)
        {
            //創建一個隨機數,以創建不同的訊息佇列
            int random = new Random().Next(1, 1000);
            Console.WriteLine("Start"+random.ToString());
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = String.Empty;
                    if (args.Length > 0)
                        exchangeName = args[0];
                    else
                        exchangeName = "exchange1";
                    //宣告交換機
                    channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
                    //訊息佇列名稱
                    String queueName = exchangeName + "_" + random.ToString();

                    //宣告佇列
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //將佇列與交換機進行系結
                    channel.QueueBind(queue: queueName, exchange: exchangeName,routingKey:"");
                    //宣告為手動確認
                    channel.BasicQos(0, 1, false);
                    //定義消費者
                    var consumer = new EventingBasicConsumer(channel);
                    //接收事件
                    consumer.Received += (model, ea) =>
                    {
                        byte[] message = ea.Body;//接收到的訊息
                        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
                        //傳回訊息確認
                        channel.BasicAck(ea.DeliveryTag, true);
                    };
                    //開啟監聽
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}

 

可以看到接收者代碼與上面有些差異

 

首先是宣告交換機(同上面一樣,為了防止異常)

 

然後宣告訊息佇列並對交換機進行系結,在這裡使用了隨機數,目的是宣告不重覆的訊息佇列,如果是同一個訊息佇列,則就變成worker樣式,也就是說對於發佈訂閱樣式有多少接收者就有多少個訊息佇列,而這些訊息佇列共同從一個交換機中獲取資料

 

然後同時開兩個接收者,結果就如下

 

 

路由樣式(direct)

 

上面說過路由樣式是訂閱樣式的一個變種樣式,以路由進行匹配發送,例如將訊息1發送給A,B兩個訊息佇列,或者將訊息2發送給B,C兩個訊息佇列,路由樣式的交換機是direct

 

發送者代碼

using RabbitMQ.Client;
using System;
using System.Text;

namespace Send3
{
    class Program
    {
        static void Main(string[] args)
        {
            if (args.Length == 0) throw new ArgumentException("args");
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using(IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = "exchange2";
                    //路由名稱
                    String routeKey = args[0];
                    //宣告交換機   路由交換機型別direct
                    channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
                    while (true)
                    {
                        Console.WriteLine("訊息內容:");
                        String message = Console.ReadLine();
                        //訊息內容
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        //發送訊息  發送到路由匹配的訊息佇列中
                        channel.BasicPublish(exchange: exchangeName, routingKey: routeKey, basicProperties: null, body: body);
                        Console.WriteLine("成功發送訊息:" + message);
                    }
                }
            }
        }
    }
}

 

發送者代碼相比上面只改了兩處

 

1、將交換機型別改為了direct型別

 

2、將運行時的第一個引數改成了路由名稱,然後發送資料時由指定路由的訊息佇列進行獲取資料

 

接收者代碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Receive3
{
    class Program
    {
        static void Main(string[] args)
        {
            if (args.Length == 0) throw new ArgumentException("args");
            //創建一個隨機數,以創建不同的訊息佇列
            int random = new Random().Next(1, 1000);
            Console.WriteLine("Start"+random.ToString());
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠物件
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = "exchange2";
                    //宣告交換機
                    channel.ExchangeDeclare(exchange: exchangeName, type:"direct");
                    //訊息佇列名稱
                    String queueName = exchangeName + "_" + random.ToString();
                    //宣告佇列
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //將佇列與交換機進行系結
                    foreach (var routeKey in args)
                    {
                        //匹配多個路由
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);

                    }
                    //宣告為手動確認
                    channel.BasicQos(0, 1, false);
                    //定義消費者
                    var consumer = new EventingBasicConsumer(channel);

                    //接收事件
                    consumer.Received += (model, ea) =>
                    {
                        byte[] message = ea.Body;//接收到的訊息
                        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
                        //傳回訊息確認
                        channel.BasicAck(ea.DeliveryTag, true);
                    };
                    //開啟監聽
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}

 

 

在接收者代碼中的改動點也是與發送者一致,但是一個接收者訊息佇列可以宣告多個路由與交換機進行系結

 

運行情況如下

 

 

通配符樣式(topic)

 

通配符樣式與路由樣式一致,只不過通配符樣式中的路由可以宣告為模糊查詢,RabbitMQ擁有兩個通配符

 

#:匹配0-n個字符陳述句

 

*:匹配一個字符陳述句

 

註意:RabbitMQ中通配符並不像正則中的單個字符,而是一個以“.”分割的字串,如 ”topic1.*“匹配的規則以topic1開始並且”.”後只有一段陳述句的路由  例:“topic1.aaa”,“topic1.bb”

 

發送者代碼

//交換機名稱
String exchangeName = "exchange3";
//路由名稱
String routeKey = args[0];
//宣告交換機   通配符型別為topic
channel.ExchangeDeclare(exchange: exchangeName, type: "topic");
while (true)
{
    Console.WriteLine("訊息內容:");
    String message = Console.ReadLine();
     //訊息內容
     byte[] body = Encoding.UTF8.GetBytes(message);
     //發送訊息  發送到路由匹配的訊息佇列中
     channel.BasicPublish(exchange: exchangeName, routingKey: routeKey, basicProperties: null, body: body);
     Console.WriteLine("成功發送訊息:" + message);
}

 

修改了兩點:交換機名稱(每個交換機只能宣告一種型別,如果還用exchang2的話就會出異常),交換機型別改為topic

 

接收者代碼

//交換機名稱
String exchangeName = "exchange3";
//宣告交換機    通配符型別為topic
channel.ExchangeDeclare(exchange: exchangeName, type:"topic");
//訊息佇列名稱
String queueName = exchangeName + "_" + random.ToString();
//宣告佇列
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
//將佇列與交換機進行系結
foreach (var routeKey in args)
{//匹配多個路由
    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);
}
//宣告為手動確認
channel.BasicQos(0, 1, false);
//定義消費者
var consumer = new EventingBasicConsumer(channel);
//接收事件
consumer.Received += (model, ea) =>
{
   byte[] message = ea.Body;//接收到的訊息
   Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
    //傳回訊息確認
    channel.BasicAck(ea.DeliveryTag, true);
};
//開啟監聽
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();

 

接收者修改與發送者一致

 

運行結果如下

 

 

 

赞(0)

分享創造快樂