歡迎光臨
每天分享高質量文章

如何在ASP.NET Core中使用Azure Service Bus Queue

原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES

作者:damienbod[1]

譯文:如何在ASP.NET Core中使用Azure Service Bus Queue

地址:https://www.cnblogs.com/lwqlun/p/10760227.html

作者:Lamond Lu

原始碼:https://github.com/lamondlu/AzureServiceBusMessaging

本文展示瞭如何使用Azure Service Bus Queue, 實現2個ASP.NET Core Api應用之間的訊息傳輸。

配置Azure Service Bus Queue

你可以從官網文件中瞭解到如何配置一個Azure Service Bus Queue.

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal

這裡我們使用Queue或者Topic來實現訊息傳輸。Queue是一種訊息傳輸型別,一旦一個訊息被一個消費者接收了,該訊息就會從Queue中被移除。

與Queue不同,Topic提供的是一對多的通訊方式。

架構圖

整個應用的實現如下:

Api 1負責發送訊息Api 2負責監聽Azure Service Bus,並處理接收到的訊息

實現一個Service Bus Queue

這裡我們首先需要引入Microsoft.Azure.ServiceBus[2]  程式集。Microsoft.Azure.ServiceBus[3]是Azure Service Bus的客戶端庫。針對Service Bus的連接字串我們儲存在專案的User Secret中。當部署專案的時候,我們可以使用Azure Key Valut來設置這個Secret值。

在Visual Studio中,右鍵點擊API1, API2專案屬性,選擇Manage User Secrets就可以管理當前專案使用的所有私密信息。

為了發送向Azure Service Bus Queue發送訊息,我們需要創建一個SendMessage方法,並接收一個訊息引數。這裡我們創建了一個我們自己的訊息內容型別MyPayload, 將當前該MyPayload物件序列化成Json字串, 添加到一個Message物件中。

using Microsoft.Azure.ServiceBus;using Microsoft.Extensions.Configuration;using Newtonsoft.Json;using System.Text;using System.Threading.Tasks;
namespace ServiceBusMessaging{    public class ServiceBusSender    {        private readonly QueueClient _queueClient;        private readonly IConfiguration _configuration;        private const string QUEUE_NAME = "simplequeue";
        public ServiceBusSender(IConfiguration configuration)        {            _configuration = configuration;            _queueClient = new QueueClient(            _configuration                .GetConnectionString("ServiceBusConnectionString"),                 QUEUE_NAME);        }
        public async Task SendMessage(MyPayload payload)        {            string data = JsonConvert.SerializeObject(payload);            Message message = new Message(Encoding.UTF8.GetBytes(data));
            await _queueClient.SendAsync(message);        }    }}

在API 1和API 2中,我們需要將ServiceBusSender註冊到應用程式的IOC容器中。這裡為了測試方便,我們同時註冊Swagger服務。

public void ConfigureServices(IServiceCollection services){    services.AddMvc();
    services.AddScoped();
    services.AddSwaggerGen(c =>    {        c.SwaggerDoc("v1", new Info        {            Version = "v1",            Title = "Payload View API",        });    });}

接下來,我們就可以在控制器中通過建構式註入的方式使用這個服務了。

在API1中,我們創建一個POST方法,這個方法會將API接收到Payload物件發送到Azure Service Bus Queue中。

[HttpPost][ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)][ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]public async Task Create([FromBody][Required]Payload request){    if (data.Any(d => d.Id == request.Id))    {        return Conflict($"data with id {request.Id} already exists");    }
    data.Add(request);
    // Send this to the bus for the other services    await _serviceBusSender.SendMessage(new MyPayload    {        Goals = request.Goals,        Name = request.Name,        Delete = false    });
    return Ok(request);}

從Queue中獲取訊息

為了監聽Azure Service Bus Queue, 並處理接收到的訊息,我們創建了一個新類ServiceBusConsumerServiceBusConsumer實現了IServiceBusConsumer接口。

Queue的連接字串是使用IConfiguration讀取的。RegisterOnMessageHandlerAndReceiveMessages方法負責註冊訊息處理程式ProcessMessagesAsync處理訊息。ProcessMessagesAsync方法會將得到的訊息轉換成物件,並呼叫IProcessData接口完成最終的訊息處理。

using Microsoft.Azure.ServiceBus;using Microsoft.Extensions.Configuration;using Microsoft.Extensions.Logging;using Newtonsoft.Json;using System.Text;using System.Threading;using System.Threading.Tasks;
namespace ServiceBusMessaging{    public interface IServiceBusConsumer    {        void RegisterOnMessageHandlerAndReceiveMessages();        Task CloseQueueAsync();    }
    public class ServiceBusConsumer : IServiceBusConsumer    {        private readonly IProcessData _processData;        private readonly IConfiguration _configuration;        private readonly QueueClient _queueClient;        private const string QUEUE_NAME = "simplequeue";        private readonly ILogger _logger;
        public ServiceBusConsumer(IProcessData processData,             IConfiguration configuration,             ILogger logger)        {            _processData = processData;            _configuration = configuration;            _logger = logger;            _queueClient = new QueueClient(              _configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME);        }
        public void RegisterOnMessageHandlerAndReceiveMessages()        {            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)            {                MaxConcurrentCalls = 1,                AutoComplete = false            };
            _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);        }
        private async Task ProcessMessagesAsync(Message message, CancellationToken token)        {            var myPayload = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(message.Body));            _processData.Process(myPayload);            await _queueClient.CompleteAsync(message.SystemProperties.LockToken);        }
        private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)        {            _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
            _logger.LogDebug($"- Endpoint: {context.Endpoint}");            _logger.LogDebug($"- Entity Path: {context.EntityPath}");            _logger.LogDebug($"- Executing Action: {context.Action}");
            return Task.CompletedTask;        }
        public async Task CloseQueueAsync()        {            await _queueClient.CloseAsync();        }    }}

其中IProcessData接口存在於類庫專案ServiceBusMessaging中,它是用來處理訊息的。

public interface IProcessData{    void Process(MyPayload myPayload);}

在Api 2中,我們創建一個ProcessData類,它實現了IProcessData接口。

public class ProcessData : IProcessData{    public void Process(MyPayload myPayload)    {        DataServiceSimi.Data.Add(new Payload        {            Name = myPayload.Name,            Goals = myPayload.Goals        });    }}

這裡為了簡單測試,我們創建了一個靜態類DataServiceSimi,其中存放了API2中所有儲存Payload物件。同時,我們還創建了一個新的控制器ViewPayloadMessagesController,在其中添加了一個GET Action,並傳回了靜態類DataServiceSimi中的所有資料。

[Route("api/[controller]")][ApiController]public class ViewPayloadMessagesController : ControllerBase{    [HttpGet]    [ProducesResponseType(StatusCodes.Status200OK)]    public ActionResult> Get()    {        return Ok(DataServiceSimi.Data);    }}

最後我們還需要將ProcessData註冊到API2的IOC容器中。

public void ConfigureServices(IServiceCollection services){    services.AddMvc();
    services.AddSingleton();    services.AddTransient();}

最終效果

現在我們分別啟用2個Api專案,併在Api 1的Swagger文件界面,呼叫POST請求,添加一個Payload

操作完成之後,我們訪問Api 2的/api/ViewPayloadMessages, 獲得結果如下,Api 1發出的訊息出現在了Api 2的結果集中,這說明Api 2從Azure Service Bus Queue中獲取了訊息,並儲存在了自己的靜態類DataServiceSimi中。

References

[1] damienbod: https://damienbod.com/author/damienbod/
[2] Microsoft.Azure.ServiceBus: https://www.nuget.org/packages/Microsoft.Azure.ServiceBus
[3] Microsoft.Azure.ServiceBus: https://www.nuget.org/packages/Microsoft.Azure.ServiceBus

    已同步到看一看
    赞(0)

    分享創造快樂