歡迎光臨
每天分享高質量文章

訊息驅動—— Spring Cloud Stream

(給ImportNew加星標,提高Java技能)

作者:sprinkle_liz

www.jianshu.com/p/1621becd3c4a

 

在進入正題之前,我們先來設想一個場景。有兩個人在對話,其中一個人問了另一個人一個問題,這個問題比不簡單,得出正確答案需要花點時間。那麼問題來了,問問題的一方,在對方給出答案之前,會怎麼做呢?是一直做在那裡等呢,還是去做其它待會必須去做的事呢,比如boss昨天才交代今天解決的bug;等到對方回覆後,再根據回覆的內容繼續談話或者做其它。如果是我,我會選擇第二種(我還是很敬業的);當然若對方是美女,那我不介意在坐在她面前欣賞她思考的樣子(哈哈,不小心暴露了)。

 

設想上面的場景的目的,並不是想說明我有多敬業(饑渴),我想表達的是:我們與世界的互動並不是同步的、線性的、嚴格按照提問-回應的樣式進行的,而是訊息驅動,不斷的接發信息。當我們接收到訊息,會對這些訊息做出反應,當然,我們也經常會被打斷正在執行的主要工作。

 

這一教程將會介紹的是,如何設計並實現,能通過異步訊息與其它微服務互相交互的微服務。使用異步訊息在應用間互相通信並不是什麼新概念,新的概念是使用訊息來傳達事件狀態的改變——Event Driven Architecture(EDA),即事件驅動架構,也可以稱為Message Driven Architecture(MDA),訊息驅動架構。基於事件驅動架構,我們可以構建高度解耦的系統,需要互相通信的服務不用通過特定的庫或其它服務緊密耦合在一起。當與微服務結合時,我們只需讓服務監聽應用程式發出的事件(訊息)流,接收到事件(訊息)後作出對應的響應,就可以在應用程式中快速添加新功能。

 

Spring Cloud的子專案Spring Cloud Stream,能讓基於訊息驅動的應用開發變得更加簡單,使用它,我們可以很容易地就能實現“訊息發佈和消費”,而且會對底層訊息傳遞平臺(後文會介紹)屏蔽服務(包括發佈者和消費者)的實現細節。

 

訊息傳遞、事件驅動架構和微服務

 

在基於微服務的應用中,為什麼訊息傳遞的地位如此重要?回答這個問題時,我們會用到整個教程都涉及到的兩個服務——license和organization服務。不妨想象一下,當這些服務部署到生產環境中後,發現license服務在訪問organization服務獲取organization信息的時候,需要花費很長時間。不過幸運的是,發現了organization資料的使用樣式是:organization的資料很少變更,而且是使用主鍵從organization服務獲取資料的。所以將organization服務獲取的資料快取起來,這樣一來就能大大降低訪問license服務時的響應時間(獲取license資料時需要獲取對應的organizationId的organization資料)。

 

在使用實現快取的解決方案時,需要特別註意的有三點:

 

  1. 需要保證license服務所有實體獲取到的快取資料保持一致。這意味著不能將資料快取到license服務每個實體中,因為我們需要保證不管哪個實體來獲取資料,都要保證傳回的organization資料是一致的。
  2. 不能將organization資料快取在托管license服務的容器的記憶體中。服務托管在的容器的資源一般都是有限的,而且本地快取會引入更大的複雜性,因為必須確保本地快取與集群中的其它服務實體的快取是同步的。
  3. 當一個organization記錄發生改變,比如更新或刪除,license服務就需要去確認organization服務的哪些資料狀態發生了變更。然後license服務需要讓發生變更的organization快取資料無效並從快取中移除或更新。

 

針對上面說的三點,現在有2個解決方案。第一種,使用同步的“請求-響應”樣式;當organization資料狀態發生變更,license和organization服務通過它們各自的REST端點來回通信,比如organization呼叫license服務的端點通知license某個organization的狀態發生改變,你那邊的快取需要做處理,license處理完後響應organization服務。第二種,organization服務在資料發生變更後,organization服務將這一變更通過異步事件(訊息)發送出去;換句話說,organization服務會將某個organization記錄發生了變更這一事件發佈到一個佇列中,而license服務一直在監聽訊息傳遞平臺,當監聽到佇列中有新的事件(來自organization資料變更的事件)後,會“消費”這一事件,即對快取做對應的處理——更新或移除。下麵進一步介紹這兩種方案的實現細節。

 

同步的“請求-響應”樣式

 

為快取organization資料,我們使用Redis資料庫來儲存。redis是一個分佈式的以鍵值對的形式來儲存資料的資料庫。下圖說明瞭如何使用“請求-響應”樣式來實現快取的。

 

 

上圖中,當用戶請求license服務,license服務也需要查詢獲取organization資料。license服務會優先根據organization Id從集群redis中獲取資料,如果找不到,license服務則會向organization服務發送請求,得到正確傳回結果之後,傳回license資料給用戶之前,license服務會把organization資料儲存到redis中。此時,若有人通過organization的端點更新或刪除organization記錄,那麼organization服務在完成業務邏輯後,還需要訪問license服務,告知對應的organization記錄已經無效,需要更新或從快取中刪除。這一步中,至少存在三個問題/隱患:

 

  1. organization和license服務高度耦合。
  2. 這種耦合會使服務間交互的靈活性變脆弱。如果license服務提供的更新快取的端點變更,那麼organization服務也需要跟著改變。
  3. 這種方案極不靈活。因為我們無法在不修改organization服務原始碼的情況下,給“organization資料的變更”這一事件添加新的消費者,即當organization資料變更後,別的服務也做相應的業務邏輯。

 

下麵對這3點做進一步分析。

 

服務間高度耦合

 

在之前的圖中,我們可以看到license和organization服務間緊密耦合。license服務依賴organization服務獲取資料,另外,當organization記錄發生變更後,organization服務還需要訪問license服務,通知快取中的哪個organization已經失效,所以organization服務需要license服務暴露一個端點來完成這一需求,這樣一來,organization服務也跟license服務耦合在一起。當然,還有另一種做法,organization服務直接與license服務的redis服務器交互,讓redis中的某條記錄失效。

 

但是,organization服務直接與license服務的redis服務器交互這一做法在微服務環境中本身就是一大禁忌。當然,肯定會有人反駁說:快取中的資料本身就是屬於organization服務的,license服務只是在特定背景關係使用它們或者圍繞這些資料構建業務。但是,讓organization服務直接license服務redis中的資料打交道,會使organization服務依賴於license服務的redis,也極易打破license服務已經建立實現的規則。

 

服務變得脆弱

 

license和organization服務的高度耦合會使這兩個服務變得更脆弱。如果license服務掛掉或處理能力下降,organization服務也會因此受到影響,因為organization服務現在需要直接與license服務交互,即依賴於license服務。

 

為organization服務添加消費者不靈活

 

如果有另一個服務也需要“監聽”organization資料的改變,那麼必須在organization服務為這一服務添加一個遠程呼叫邏輯。這意味著organization必須添加新的代碼並重新編譯、部署。想一想,如果以後有多個這樣的服務(需要消費organization資料變更事件),甚至許多類似organization服務和license服務這樣高度耦合在一起的服務群,並且使用同步的“請求-響應”樣式,那麼應用內服務的“織網”表演將從此開始。最後你會發現,這個應用的失敗,這張“網”占了很重的分量。

 

使用訊息傳遞實現服務間的交互

 

使用訊息傳遞方案,將會在license服務和organization服務間引入一個訊息佇列。該佇列並不是用來從organization服務獲取資料,而是當organization服務資料發生變更後,可以將這一訊息發佈到佇列中。下圖說明瞭具體細節:

 

 

上圖展示的樣式,每一次organization資料發生變更,organization服務會發佈訊息到佇列中。而license一直在監聽來自organization的資料變更訊息,當發現佇列中發佈新的訊息後會立刻消費,即根據訊息內容執行相應的邏輯,如更新快取或直接失效。在這一樣式中,訊息佇列扮演的是一個處於license服務和organization服務的中間人的角色,即前文提到的“底層訊息傳遞平臺”。這一樣式能帶來許多好處,可以簡單概括為以下4點:

 

  • 低耦合
  • 持久性
  • 高可擴展性
  • 高靈活性

 

低耦合

 

一個微服務應用可以由許多個小的、分散的服務組成,這些服務間大都需要與其他服務交互,而且可能對其它服務管理的資料“感興趣”。之前提到使用同步的方式,同步的HTTP響應會讓license和organization兩個服務對彼此產生極大的依賴。雖然,我們無法完全消除這種依賴,但可以儘量讓這種依賴減弱,服務只暴露直接管理自己資料的端點。訊息傳遞模型可以解耦兩個服務間的依賴,因為對organization服務來說,當需要發佈資料狀態變更的訊息,只需將該訊息發佈到訊息佇列中;而對license服務而言,只負責對訊息進行消費,並不關心是誰發佈的訊息。

 

持久性

 

訊息佇列的存在,可以確保訊息一定會被傳遞出去,即使消費方服務已經掛掉。也就是說,即使license服務已經不可用,organization服務依然可以將訊息發佈到訊息佇列中。這些訊息會被儲存在訊息佇列中,直到license服務可用後才開始消費這些積攢了許多的訊息。相反的,快取和訊息佇列的結合,當organization服務掛掉,license服務可以優雅的降級,因為至少有部分organization資料在license服務的快取中。有時,過時的資料總比缺少資料來得強。

 

高可擴展性

 

由於訊息發佈出去後會儲存在訊息佇列中,所以小心的發佈者並不需要等待消費者消費後傳回的響應訊息,因此它們可以繼續它們的工作。同樣的,當消費方的一個消費者(實體)已經無法儘快地從佇列中讀取訊息,那麼可以啟動更多的消費方服務來處理這些“上限溢位”的訊息。而傳統的擴展機制是增加處理訊息的執行緒數量,這樣,一個消費者就能足以應對。不幸的是,若採用這種辦法,那麼訊息的消費者的處理性能最後將受限CPU的核心數量,當服務發佈訊息頻率再次增高,處理熟讀又無法滿足需求,最後只能通過部署到性能更強大的機器上。而通過啟動更多消費者實體這種可擴展性極強的方法則非常適用於微服務樣式,因為啟動一個微服務的更多實體來相對於性能強大的機器來說是微不足道的,畢竟這些微服務隨便部署在普通機器上就能很好地運行。

 

高靈活性

 

訊息的發佈者並不知道誰會去消費這些訊息,這意味著可以很輕鬆地加入新的訊息消費者,重要的是這並不會對訊息的發佈者有任何影響。這是一個非常強大的優勢,因為完全可以在添加擁有新功能的微服務到應用中的情況下,不會影響到其它已存在的微服務。新添加的服務只需監聽事件的發佈然後對其做出響應即可。

 

Spring Cloud Stream

 

Spring Cloud Stream可以很容易實現將訊息傳遞樣式應用到基於Spring的微服務應用中。Spring Cloud Stream子專案官網https://cloud.spring.io/spring-cloud-stream/。Spring Cloud Stream是一個註解驅動框架,所以可以使用簡單的幾個註解就能在應用中構建訊息的發佈者和消費者。

 

Spring Cloud Stream還支持我們將訊息傳遞平臺的實現細節抽象出來,Spring Cloud 只提供與平臺無關的接口。這意味著可以將訊息傳遞平臺詳細的實現細節從應用代碼中抽離出來,然後使用已經實現訊息傳遞的平臺,Spring Cloud Stream支持的訊息傳遞平臺包括Apache Kafka和RabbitMQ,這樣,應用中就可以直接使用與具體平臺無關Spring接口實現訊息的發佈和消費(本教程會對RabbitMQ訊息總線的使用進行介紹,因為Kafka我也不怎麼熟悉,囧)。

 

為了瞭解Spring Cloud Stream,我們首先對Spring Cloud Stream的架構進行介紹並熟悉Spring Cloud Steam相關術語的含義。如果是第一次接觸訊息傳遞平臺,先打個預防針,接下來涉及的東西學習起來可能會有點吃力。

 

Spring  Cloud Stream架構

 

首先,我們假設兩個服務通過訊息傳遞進行交互來介紹Spring Cloud Steam的架構。一個是訊息發佈者,另一個是訊息消費者。如下圖,借助Spring Cloud Stream來實現訊息的傳遞:

 

Spring Cloud Stream架構

 

Spring Cloud中訊息的發佈和消費涉及到4個組件:

 

  • Source
  • Channel
  • Binder
  • Sink

 

Source

 

當服務發佈訊息前的前置業務完成後會通過Source將訊息發佈出去。Source是一個Spring註解接口,它可以將代表訊息主體的POJO物件發佈到訊息管道(Channel)中,發佈之前會把該訊息物件序列化(預設使用JSON)。

 

Channel

 

Channel(訊息管道)是訊息佇列的進一步抽象,它會儲存訊息生產者發佈的或者訊息消費者接收到的訊息。訊息管道的名稱一般與標的佇列名稱相關聯。然而,訊息佇列的名稱不會直接在代碼中暴露,相反管道名稱則會被用在代碼中,所以只能在配置檔案中配置,為訊息管道選取正確的訊息佇列進行讀和寫,而不是在代碼中體現。

 

Binder

 

Binder則是Spring Cloud Stream框架的一部分。它是由Spring Cloud Stream實現的用來與特殊的訊息平臺交互。因為Binder是由Spring Cloud Stream實現的,所以我們可以在不需要暴露特殊訊息平臺的類庫和API的情況下就能實現對訊息的發佈和消費。下文你將會看到它的強大之處。

 

Sink

 

在Spring Cloud Stream中,當從訊息佇列接收到一條訊息後,需要通過Sink。Sink能監聽進入管道中的訊息並將訊息反序列化成一個POJO物件。之後,訊息就能給業務邏輯使用了。

 

安裝RabbitMQ 

本教程主要使用RabbitMQ做示例。其實無論使用RabbitMQ或是Kafka,代碼是一樣的,不一樣的只是配置,在配置訊息中間件時有一點點不一樣。

 

在學習接下來的內容之前,需要先在本地安裝RabbitMQ,至於RabbitMQ的安裝,這裡就不給出了,網上的入門教程大把。煩請第一次接觸RabbitMQ的童鞋自己在網上找找。註意,安裝RabbitMQ之前還需安裝eralang。

 

訊息發佈和消費的實現

 

上面我們已經簡單介紹了Spring Cloud Stream涉及到的幾個組件,下麵開始編寫一個簡單的Spring Cloud 例子。在該例子中,我們會使用organization服務發佈訊息然後license服務消費訊息,license服務接收到訊息後只做最簡單的消費——在控制台打印日誌。

 

在organization服務實現訊息發佈者

 

接下來我們會實現,每當服務維護的organization資料發生變更(添加、更新或刪除)時,organization服務會向一個RabbitMQ topic發佈一條訊息,表明organization資料變更事件已經發生。

 

發佈出去的訊息包含與該資料變更事件相關的organization ID和資料變更行為(添加、更新或刪除)。

 

pom檔案

 

實現訊息的發佈,第一件事就是在pom檔案引入需要的啟動依賴。啟動依賴很簡單,只有一個,在organization服務的pom檔案中添加如下依賴:

 


    org.springframework.cloud
    spring-cloud-starter-stream-rabbit

 

核心類

 

引入需要的依賴後,我們就可以大展拳腳了。Show Time!

 

首先,來看下在訊息發佈端需要創建的幾個類或接口,如下圖:

 

 

其中包含兩個類一個接口,看它們的名字大概就能猜出各自的作用了。下麵來看具體原始碼:

 

OrgChangeModel類:

 

public class OrgChangeModel {
    private String type;
    private String action;
    private String organizationId;
    private String correlationId;

    public OrgChangeModel() {}

    public OrgChangeModel(String type, String action, String organizationId, String correlationId) {
        super();
        this.type   = type;
        this.action = action;
        this.organizationId = organizationId;
        this.correlationId = correlationId;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }


    public String getOrganizationId() {
        return organizationId;
    }

    public void setOrganizationId(String organizationId) {
        this.organizationId = organizationId;
    }


    public String getCorrelationId() {
        return correlationId;
    }

    public void setCorrelationId(String correlationId) {
        this.correlationId = correlationId;
    }
}
若仔細觀察OrgChangePublisher的代碼,大概可以猜出,該類其實就是一個訊息模型,是一個POJO,是用來承載需要傳遞的訊息,換句話說就是訊息的載體。另外該模型在被髮送出去的時候,會被序列化成json(預設)。

 

OrgChangeSource接口:

 

public interface OrgChangeSource {

    @Output("orgChangeOutput")
    MessageChannel output();
}

 

OrgChangeSource接口很簡單,現在只有一個方法,該方法傳回的是一個MessageChannel,但不簡單的是該方法上面的註解——@output,加上@output註解,Spring Cloud Stream會自動實現一個傳回MessageChannel(訊息管道)的方法。另外,註解@output有一個屬性——value,用來自定義方法傳回的訊息管道的名稱。

 

OrgChangePublisher類:

 

@EnableBinding(OrgChangeSource.class)
public class OrgChangePublisher {

    private static final Logger logger = LoggerFactory.getLogger(OrgChangePublisher.class);

    @Autowired
    private OrgChangeSource source;

    public void publish(String action, String orgId) {
        OrgChangeModel model = new OrgChangeModel(
                OrgChangeModel.class.getTypeName(),
                action,
                orgId,
                UserContextHolder.getContext().getCorrelationId());
        logger.info("sending rabbitmq message {} for Organization Id: {}",
                action, orgId);
        source.output().send(MessageBuilder.withPayload(model).build());
    }
}

 

首先可以看到該類上面有一個@EnableBinding註解,下圖是官方文件對該註解的介紹:

 

 

從上圖可以知道,@EnableBinding可以讓一個Spring應用變成一個Spring Cloud Stream應用,該註解可以加在應用中的其中一個配置類上。另外,該註解中也只有一個value屬性,用來接收一個Class類(在我們看來其實是接口)陣列,這些Class類包含一個或多個接口方法,如上面的OrgChangeSource,這些方法都傳回可系結的組件(Channel)。

 

接著再來看OrgChangePublisher#publish()方法的邏輯。其中最關鍵的一步就是:

 

source.output().send(MessageBuilder.withPayload(model).build());

 

我們知道source是Spring註入的由Spring Cloud Stream幫我們實現的Source實體,(註意,只有@EnableBinding的value包含了對應的Source接口,此處為OrgChangeSource,Spring Cloud Stream才會知道需要幫我們實現,若去掉@EnableBinding註解中的OrgChangeSource.class,將無法成功啟動服務),output()方法是我們之前在OrgChangeSource中定義的接口方法,傳回的是一個實現MessageChannel接口的物件,其中只有兩個send方法。方法簽名分別為:

 

boolean send(Message> message);

boolean send(Message> message, long timeout);

 

Channel#send()方法能將一個Message發送到這個Channel中。如果發送成功傳回true。這個方法可能會無限期阻塞,取決於使用哪種實現。所以第二個方法的timeout引數就是來控制超時時間的。send方法在發送訊息時會阻塞執行緒,直到訊息發送成功或超時發送失敗。

 

而MessageBuilder是預設的訊息構建器,它的靜態方法withPayload()接收的就是我們需要發送的訊息(負載),傳回的是一個MessageBuilder物件,MessageBuilder物件呼叫build()方法最終產生一個Message物件。最後由send()方法發送到Channel中。

 

發佈訊息

 

我們的目的是在organization資料發生變更後,通過訊息傳遞將這一信息通知license服務。接下來介紹如何使用Publisher來發佈訊息。

 

修改OrganizationService類的updateOrg()和deleteOrg()方法,如下:

 

@Service
public class OrganizationService {
    @Autowired
    private OrganizationRepository orgRepository;

    @Autowired
    private OrgChangePublisher orgChangePublisher;

    public Organization getOrg(String organizationId) {
        return orgRepository.findById(organizationId);
    }
    ...
    public void updateOrg(Organization org) {
        orgRepository.save(org);
        orgChangePublisher.publish("update", org.getId());
    }

    public void deleteOrg(Organization org){
        orgRepository.delete( org.getId());
        orgChangePublisher.publish("delete", org.getId());
    }
}

 

可以看到,在OrganizationService中註入了OrgChangePublisher,然後在update()和delete()方法中使用,即organization記錄更新或刪除成功後會將訊息發佈出去。

 

配置檔案

 

當所有核心類實現後,最後一步是配置使用哪種訊息中間件及其環境,還有訊息管道的系結關係。配置如下:

 

spring:
  ...
  cloud:
    stream:
      binders:
        rabbitmq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        orgChangeOutput:
          destination: orgChangeTopic
          content-type: application/json
          binder: rabbitmq

 

可以看到,主要涉及到兩個屬性的配置:spring.cloud.stream.binders和spring.cloud.stream.bindings,這兩個屬性都接收一個Map集合,即可以配置多個binder和 管道(Channel)。binders Map的鍵值是binder的名稱,這個名稱需要在bindings配置中用到;binding的鍵值是定義的管道的名稱,取註解@Output和@Input的value值,如OrgChangeSource#output()方法上註解@Output的value值”orgChangeOutput”。其它屬性的含義如下圖所示:

 

這樣,訊息發佈者的工作就全部完成,接下來是實現訊息消費者。

 

在license服務實現訊息消費者

 

pom檔案

 

同樣,license服務也需要引入訊息驅動的啟動依賴。跟訊息發佈者引入的依賴一樣:

 


    org.springframework.cloud
    spring-cloud-starter-stream-rabbit

 

核心類

 

下圖是訊息消費端需要創建的幾個類或接口:

 

 

OrgChangeModel類:

 

OrgChangeModel.java必須與organization服務的一樣,因為license服務會在接收到訊息後將訊息內容(Model)反序列化,所以訊息發佈端和消費端的訊息模型(Model)必須保持一致。一般會將訊息模型提取到一個公共庫,然後發佈端和消費端就可以取用同一個了。這裡就不給出原始碼了。

 

OrgChangeSink接口

 

public interface OrgChangeSink {

    @Input("orgChangeInput")
    SubscribableChannel input();
}

 

上面的接口同樣只有一個方法,與OrgChangeSource不同的是,該接口方法上的註解變成@Input,但原理與@Output差不多,註解中的”orgChangeInput”同樣的Channel(管道)的名稱。該接口同樣是交由Spring Cloud Stream來實現。

 

OrgChangeHandler

 

@EnableBinding(OrgChangeSink.class)
public class OrgChangeHandler {

    private static final Logger logger = LoggerFactory.getLogger(OrgChangeHandler.class);

    @StreamListener("orgChangeInput")
    public void handle(OrgChangeModel model) {
        logger.info("Received a message of type " + model.getType());
        switch(model.getAction()){
            case "get":
                logger.info("Received a GET event from the organization service for organization id {}", model.getOrganizationId());
                break;
            case "save":
                logger.info("Received a SAVE event from the organization service for organization id {}", model.getOrganizationId());
                break;
            case "update":
                logger.info("Received a UPDATE event from the organization service for organization id {}", model.getOrganizationId());
                break;
            case "delete":
                logger.info("Received a DELETE event from the organization service for organization id {}", model.getOrganizationId());
                break;
            default:
                logger.error("Received an UNKNOWN event from the organization service of type {}", model.getType());
                break;
        }
    }
}

 

將接收到的訊息反序列化成對應的Model後,就可以使用Model做相應的業務了。這裡只是在控制臺中打印。實際開發中就不會這麼簡單了,比如需要運算元據庫等等。

 

原始碼中還出現了一個註解@StreamListener,該註解能用來接聽某個管道,當監聽到管道中有訊息到來,機會接收然後將訊息反序列化,最後執行相應的業務,這裡為handle()方法。

 

消費端的核心代碼已經介紹完,最後還需要配置一些必要的屬性。

 

配置檔案

 

配置如下:

 

spring:
  ...
  cloud:
    stream:
      binders:
        rabbitmq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:
        orgChangeInput:
          destination: orgChangeTopic
          content-type: application/json
          group: licenseGroup
          binder: rabbitmq

 

消費端的配置與發佈端的配置絕大多數是一樣的,不一樣的是在配置OrgChangInput Channel的時候,多了一個屬性:OrgChangInput.group。實際上,一個訊息發佈者可以有多個訊息消費者,也就是說,還可以有另外的服務也去監聽organization服務發佈的同一個訊息事件。分組的作用是將不同的服務隔離開,服務間的監聽互不影響,若不分組,那麼發佈的訊息,只有其中的某個服務的能接收到並消費;而分組後,會將訊息的副本分別發送到所有監聽該訊息事件的服務對應的管道中,最後訊息會由服務的某個實體消費。

 

 

發佈並消費訊息

 

最後啟動organization服務和license服務。然後使用postman訪問http://localhost:11000/v1/organizations/e254f8c-c442-4ebe-a82a-e2fc1d1ff78a,http方法為PUT,請求體為:

 

{
    "id": "e254f8c-c442-4ebe-a82a-e2fc1d1ff78a",
    "name": "customer-crm-co",
    "contactName": "Mark Balster",
    "contactEmail": "[email protected]",
    "contactPhone": "823-555-1213"
}

 

 

然後,觀察organization服務和license服務的控制台,可以看到類似如下輸出:

 

organization服務控制台輸出:

 

 

license服務控制台輸出:

 

 

證明license服務接收到由organization服務發佈的訊息,並消費(打印日誌)了。

 

完!

 

(git上的原始碼:https://gitee.com/rain7564/spring_microservices_study/tree/master/sixth-spring-cloud-stream)

已同步到看一看
赞(0)

分享創造快樂