歡迎光臨
每天分享高質量文章

在Asp.Net Core中集成Kafka

  在我們的業務中,我們通常需要在自己的業務子系統之間相互發送訊息,一端去發送訊息另一端去消費當前訊息,這就涉及到使用訊息佇列MQ的一些內容,訊息佇列成熟的框架有多種,這裡你可以讀這篇文章來瞭解這些MQ的不同,這篇文章的主要目的是用來系統講述如何在Asp.Net Core中使用Kafka,整篇文章將介紹如何寫訊息發送方代碼、消費方代碼、配套的工具的使用,希望讀完這篇文章之後對整個訊息的運行機制有一定的理解,在這裡通過一張圖來簡要瞭解一下訊息佇列中的一些概念。

圖一 Kafka訊息佇列

一 安裝NUGET包

在寫代碼之前首先要做的就是安裝nuget包了,我們這裡使用的是Confluent.Kafka 1.0.0-RC4版本,具體專案要根據具體的時間來確定取用包的版本,這些包可能更新比較快。

圖二 取用Kafka包依賴

二 訊息發送方(Producer)

1 在專案中添加所有觸發事件的接口 IIntegrationEvent,後面所有的觸發事件都是繼承自這個接口。

///
    /// 集成事件的接口定義
    ///
    public interface IIntegrationEvent {
        string Key { get; set; }
    }

2 定義Kafka生產者

///
    /// Kafka 生產者的 Domain Service
    ///
    public class KafkaProducer : DomainService {
        private readonly IConfiguration _config;
        private readonly ILogger _logger;
        public KafkaProducer(IConfiguration config,
                             ILogger logger) {
            _config = config;
            _logger = logger;
        }
        ///
        /// 發送事件
        ///
        ///
        public void Produce(IIntegrationEvent @event) {
            var topic = _config.GetValue($”Kafka:Topics:{@event.GetType().Name}”);
            var producerConfig = new ProducerConfig {
                BootstrapServers = _config.GetValue(“Kafka:BootstrapServers”),
                MessageTimeoutMs = _config.GetValue(“Kafka:MessageTimeoutMs”)
            };
            var builder = new ProducerBuilder(producerConfig);
            using (var producer = builder.Build()) {
                try {
                    var json = JsonConvert.SerializeObject(@event);
                    var dr = producer.ProduceAsync(topic, new Message { Key = @event.Key, Value = json }).GetAwaiter().GetResult();
                    _logger.LogDebug(“發送事件 {0} 到 {1} 成功”, dr.Value, dr.TopicPartitionOffset);
                } catch (ProduceException ex) {
                    _logger.LogError(ex, “發送事件到 {0} 失敗,原因 {1} “, topic, ex.Error.Reason);
                }
            }
        }
    }

在這裡我們的Producer根據業務的需要定義在領域服務中,這裡面最關鍵的就是Produce方法了,該方法的引數是繼承自IIntegrationEvent 接口的各種各樣事件,在這個方法中,我們獲取配置在appsetting.json中配置的各種Topic以及Kafka服務器的地址,具體的配置如下方截圖所示。

圖三 配置服務器地址以及各種Topic

通過當前配置我們就知道我們的訊息要發往何處,然後我們就可以創建一個producer來將我們的事件(實際上是定義的資料結構)序列化成Json,然後通過異步的方式發送出去,這裡需要註意我們創建的Producer要放在一個using塊中,這樣在創建完成併發送訊息之後就會釋放當前生產者。這裡如果發送失敗會在當前日誌中記錄發送的值以及錯誤的原因從而便於進行除錯。這裡舉出其中的一個事件RepairContractFinishedEvent為例來說明。

///
    /// 維修合同完成的事件
    ///
    public class RepairContractFinishedEvent : IIntegrationEvent {
        public RepairContract RepairContract { get; set; }
        //一個維修合同會對應多個調整單
        public List RepairContractAdjusts { get; set; }
        public string Key { get; set; }
    }

這個裡面RepairContract以及List集合都是我們定義的一種資料結構。

最後我們來看看在具體的領域層中我們該如何觸發此事件的,這裡我們也定義了一個叫做IRepairContractEventManager接口的領域服務,併在裡面定義了一個叫做Finished的接口,然後在RepairContractEventManager中實現該方法。

public class RepairContractEventManager : DomainService, IRepairContractEventManager {
       private readonly KafkaProducer _producer;
       private readonly IRepository _repairContractRepository;
       private readonly IRepository _repairContractAdjustRepository;
       public RepairContractEventManager(KafkaProducer producer,
                                         IRepository repairContractRepository,
                                         IRepository repairContractAdjustRepository) {
           _producer = producer;
           _repairContractRepository = repairContractRepository;
           _repairContractAdjustRepository = repairContractAdjustRepository;
       }
       public void Finished(Guid repairContractId) {
           var repairContract = _repairContractRepository.GetAll()
               .Include(c => c.RepairContractWorkItems).ThenInclude(w => w.Materials)
               .SingleOrDefaultAsync(c => c.Id == repairContractId).GetAwaiter().GetResult();
           var repairContractAdjusts = _repairContractAdjustRepository.GetAll()
               .Include(a => a.WorkItems).ThenInclude(w => w.Materials)
               .Where(a => a.RepairContractId == repairContractId).ToListAsync().GetAwaiter().GetResult();
           var @event = new RepairContractFinishedEvent {
               Key = repairContract?.Code,
               RepairContract = repairContract,
               RepairContractAdjusts = repairContractAdjusts
           };
           _producer.Produce(@event);
       }
   }

 這段代碼就是組裝RepairContractFinishedEvent的具體實現過程,然後呼叫我們之前創建的KafkaProducer物件然後將訊息發送出去,這樣在需要觸發當前RepairContractFinishedEvent 的地方來註入IRepairContractEventManager接口,然後調對應的Finished方法,這樣就完成了整個訊息的發送的過程了。

三 查看訊息的發送

在發送完訊息後我們可以到Kafka 集群 Control Center中查找我們發送的所有訊息。選擇其中的一條訊息,雙擊,然後選擇INSPECT來查看發送的訊息

圖四 Kafka Control Center中查看發送訊息

四 訊息的接收方(Consumer)

在正確創建訊息的發送方後緊接著就是定義訊息的接收方了,訊息的接收方顧名思義就是消費剛纔訊息的一方,這裡的步驟和發送類似,但是也有很大的不同,訊息的消費方核心是一個後臺服務,並且在單獨的執行緒中監聽來自發送方的訊息,併進行消費,這裡我們先定義一個叫做KafkaConsumerHostedService的基類,我們具體來看看代碼。

///
    /// Kafka 消費者的後臺服務基礎類
    ///
    /// 事件型別
    public abstract class KafkaConsumerHostedService : BackgroundService where T : IIntegrationEvent {
        protected readonly IServiceProvider _services;
        protected readonly IConfiguration _config;
        protected readonly ILogger> _logger;
        public KafkaConsumerHostedService(IServiceProvider services, IConfiguration config, ILogger> logger) {
            _services = services;
            _config = config;
            _logger = logger;
        }
        ///
        /// 消費該事件,比如呼叫 Application Service 持久化資料等
        ///
        ///事件內容
        protected abstract void DoWork(T @event);
        ///
        /// 構造 Kafka 消費者實體,監聽指定 Topic,獲得最新的事件
        ///
        ///終止標識
        ///
        protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
            await Task.Factory.StartNew(() => {
                var topic = _config.GetValue($”Kafka:Topics:{typeof(T).Name}”);
                var consumerConfig = new ConsumerConfig {
                    BootstrapServers = _config.GetValue(“Kafka:BootstrapServers”),
                    AutoOffsetReset = AutoOffsetReset.Earliest,
                    GroupId = _config.GetValue(“Application:Name”),
                    EnableAutoCommit = true,
                };
                var builder = new ConsumerBuilder(consumerConfig);
                using (var consumer = builder.Build()) {
                    consumer.Subscribe(topic);
                    while (!stoppingToken.IsCancellationRequested) {
                        try {
                            var result = consumer.Consume(stoppingToken);
                            var @event = JsonConvert.DeserializeObject(result.Value);
                            DoWork(@event);
                            //consumer.StoreOffset(result);
                        } catch (OperationCanceledException ex) {
                            consumer.Close();
                            _logger.LogDebug(ex, “Kafka 消費者結束,退出後臺執行緒”);
                        } catch (AbpValidationException ex) {
                            _logger.LogError(ex, $”Kafka {GetValidationErrorNarrative(ex)}”);
                        } catch (ConsumeException ex) {
                            _logger.LogError(ex, “Kafka 消費者產生異常”);
                        } catch (KafkaException ex) {
                            _logger.LogError(ex, “Kafka 產生異常”);
                        } catch (ValidationException ex) {
                            _logger.LogError(ex, “Kafka 訊息驗證失敗”);
                        } catch (Exception ex) {
                            _logger.LogError(ex, “Kafka 捕獲意外異常”);
                        }
                    }
                }
            }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
        private string GetValidationErrorNarrative(AbpValidationException validationException) {
            var detailBuilder = new StringBuilder();
            detailBuilder.AppendLine(“驗證過程中檢測到以下錯誤”);
            foreach (var validationResult in validationException.ValidationErrors) {
                detailBuilder.AppendFormat(” – {0}”, validationResult.ErrorMessage);
                detailBuilder.AppendLine();
            }
            return detailBuilder.ToString();
        }
    }

這段代碼中我們會創建一個consumer,這裡我們會在一個While迴圈中去訂閱特定Topic訊息,這裡的BootstrapServers是和發送方保持一致,並且也是在當前應用程式中的appsetting.json中進行配置的,而且這裡的consumer.Consume方法是一個阻塞式方法,當發送方發送特定事件後,這裡會接收到同樣名稱的Topic的訊息,然後將接收到的Json資料進行反序列化,然後交由後面的DoWork方法進行處理。這裡還是以之前生成者發送的RepairContractFinished事件為例,這裡也需要定義一個RepairContractFinishedEventHandler來處理生產者發送的訊息。

public class RepairContractFinishedEventHandler : KafkaConsumerHostedService {
        public RepairContractFinishedEventHandler(IServiceProvider services,
            IConfiguration config, ILogger> logger)
            : base(services, config, logger) {
        }
        ///
        /// 呼叫 Application Service,新增或更新維修合同及關聯物體
        ///
        ///待消費的事件
        protected override void DoWork(RepairContractFinishedEvent @event) {
            using (var scope = _services.CreateScope()) {
                var service = scope.ServiceProvider.GetRequiredService();
                service.AddOrUpdateRepairContract(@event.RepairContract, @event.RepairContractAdjusts);
            }
        }
    }

這裡需要特別註意的是在這裡我麽也需要定義一個繼承自IIntegrationEvent接口的事件,這裡也是定義一種資料結構,並且這裡的資料結構和生成者定義的要保持一致,否則消費方在反序列化的時候會丟失不能夠匹配的信息。

public class RepairContractFinishedEvent : IIntegrationEvent {
        public RepairContractDto RepairContract { get; set; }
        public List RepairContractAdjusts { get; set; }
        public string Key { get; set; }
    }

另外在DoWork方法中我們也需要註意代碼也需要用using包裹,從而在消費方消費完後釋放掉當前的應用服務。最後需要註意的就是我們的每一個Handle都是一個後臺服務,我們需要在Asp.Net Core的Startup的ConfigureServices進行配置,從而將當前的後臺服務添加到Asp.Net Core依賴註入容器中。

///
     /// 註冊集成事件的處理器
     ///
     ///
     private void AddIntegrationEventHandlers(IServiceCollection services) {
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
     }

最後我們也看看我們的appsetting.json的配置檔案關於kafka的配置。

“Kafka”: {
    “BootstrapServers”: “127.0.0.1:9092”,
    “MessageTimeoutMs”: 5000,
    “Topics”: {
      “RepairContractFinishedEvent”: “repair-contract-finished”,
      “AddOrUpdateProductCategoryEvent”: “add-update-product-category”,
      “AddOrUpdateDealerEvent”: “add-update-dealer”,
      “ClaimApproveEvent”: “claim-approve”,
      “ProductTransferDataEvent”: “product-update”,
      “PartUpdateEvent”: “part-update”,
      “VehicleSoldFinishedEvent”: “vehiclesold-finished”,
      “CustomerFinishedEvent”: “customer-update”,
      “VehicleInformationUpdateStatusEvent”: “add-update-vehicle-info”,
      “AddCustomerEvent”: “add-customer”
    }
  },

這裡需要註意的是發送方和接收方必須保證Topic一致,並且配置的服務器名稱端口保持一致,這樣才能夠保證訊息的準確發送和接收。最後對於服務端,這裡推薦一個VSCode的插件kafka,能夠創建併發送訊息,這樣就方便我們來發送我們需要的資料了,這裡同樣需要我們先建立一個.kafka的檔案,然後配置Kafka服務的地址和端口號。

圖五 利用VSCode Kafka插件發送訊息

已同步到看一看
赞(0)

分享創造快樂