歡迎光臨
每天分享高質量文章

【訊息佇列 MQ 專欄】RabbitMQ

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

關於訊息佇列,從前年開始斷斷續續看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個朋友聊這塊的技術選型,是時候把這塊的知識整理記錄一下了。

市面上的訊息佇列產品有很多,比如老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,還有 ZeroMQ ,去年底阿裡巴巴捐贈給 Apache 的 RocketMQ ,連 redis 這樣的 NoSQL 資料庫也支援 MQ 功能。總之這塊知名的產品就有十幾種,就我自己的使用經驗和興趣只打算談談 RabbitMQ、Kafka 和 ActiveMQ ,本文先講 RabbitMQ ,在此之前先看下訊息佇列的相關概念。

什麼叫訊息佇列

訊息(Message)是指在應用間傳送的資料。訊息可以非常簡單,比如只包含文字字串,也可以更複雜,可能包含嵌入物件。

訊息佇列(Message Queue)是一種應用間的通訊方式,訊息傳送後可以立即傳回,由訊息系統來確保訊息的可靠傳遞。訊息釋出者只管把訊息釋出到 MQ 中而不用管誰來取,訊息使用者只管從 MQ 中取訊息而不管是誰釋出的。這樣釋出者和使用者都不用知道對方的存在。

為何用訊息佇列

從上面的描述中可以看出訊息佇列是一種應用間的非同步協作機制,那什麼時候需要使用 MQ 呢?

以常見的訂單系統為例,使用者點選【下單】按鈕之後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發簡訊通知。在業務發展初期這些邏輯可能放在一起同步執行,隨著業務的發展訂單量增長,需要提升系統服務的效能,這時可以將一些不需要立即生效的操作拆分出來非同步執行,比如發放紅包、發簡訊通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之後傳送一條訊息到 MQ 讓主流程快速完結,而由另外的單獨執行緒拉取MQ的訊息(或者由 MQ 推送訊息),當發現 MQ 中有發紅包或發簡訊之類的訊息時,執行相應的業務邏輯。

以上是用於業務解耦的情況,其它常見場景包括最終一致性、廣播、錯峰流控等等。

RabbitMQ 特點

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。

AMQP :Advanced Message Queue,高階訊息佇列協議。它是應用層協議的一個開放標準,為面向訊息的中介軟體設計,基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受產品、開發語言等條件的限制。

RabbitMQ 最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。具體特點包括:

  1. 可靠性(Reliability)RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、釋出確認。

  2. 靈活的路由(Flexible Routing)在訊息進入佇列之前,透過 Exchange 來路由訊息的。對於典型的路由功能,RabbitMQ 已經提供了一些內建的 Exchange 來實現。針對更複雜的路由功能,可以將多個 Exchange 系結在一起,也透過外掛機制實現自己的 Exchange 。

  3. 訊息叢集(Clustering)多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。

  4. 高可用(Highly Available Queues)佇列可以在叢集中的機器上進行映象,使得在部分節點出問題的情況下佇列仍然可用。

  5. 多種協議(Multi-protocol)RabbitMQ 支援多種訊息佇列協議,比如 STOMP、MQTT 等等。

  6. 多語言客戶端(Many Clients)RabbitMQ 幾乎支援所有常用語言,比如 Java、.NET、Ruby 等等。

  7. 管理介面(Management UI)RabbitMQ 提供了一個易用的使用者介面,使得使用者可以監控和管理訊息 Broker 的許多方面。

  8. 跟蹤機制(Tracing)如果訊息異常,RabbitMQ 提供了訊息跟蹤機制,使用者可以找出發生了什麼。

  9. 外掛機制(Plugin System)RabbitMQ 提供了許多外掛,來從多方面進行擴充套件,也可以編寫自己的外掛。

RabbitMQ 中的概念

訊息模型

所有 MQ 產品從模型抽象上來說都是一樣的過程:消費者(consumer)訂閱某個佇列。生產者(producer)建立訊息,然後釋出到佇列(queue)中,最後將訊息傳送到監聽的消費者。

訊息流

RabbitMQ 基本概念

上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念需要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,所以其內部實際上也是 AMQP 中的基本概念:

RabbitMQ 內部結構
  1. Message訊息,訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可能需要永續性儲存)等。

  2. Publisher訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式。

  3. Exchange交換器,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列。

  4. Binding系結,用於訊息佇列和交換器之間的關聯。一個系結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交換器理解成一個由系結構成的路由表。

  5. Queue訊息佇列,用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。

  6. Connection網路連線,比如一個TCP連線。

  7. Channel通道,多路復用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內地虛擬連線,AMQP 命令都是透過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是透過通道完成。因為對於作業系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了通道的概念,以復用一條 TCP 連線。

  8. Consumer訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。

  9. Virtual Host虛擬主機,表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、系結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost 是 / 。

  10. Broker表示訊息佇列伺服器物體。

AMQP 中的訊息路由

AMQP 中訊息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和 Binding 的角色。生產者把訊息釋出到 Exchange 上,訊息最終到達佇列並被消費者接收,而 Binding 決定交換器的訊息應該傳送到那個佇列。

AMQP 的訊息路由過程

Exchange 型別

Exchange分發訊息時根據型別的不同分發策略有區別,目前共四種型別:direct、fanout、topic、essay-headers 。essay-headers 匹配 AMQP 訊息的 essay-header 而不是路由鍵,此外 essay-headers 交換器和 direct 交換器完全一致,但效能差很多,目前幾乎用不到了,所以直接看另外三種型別:

  1. direct

    direct 交換器

    訊息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將訊息發到對應的佇列中。路由鍵與佇列名完全匹配,如果一個佇列系結到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的訊息,不會轉發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全匹配、單播的樣式。

  2. fanout

    fanout 交換器

    每個發到 fanout 型別交換器的訊息都會分到所有系結的佇列上去。fanout 交換器不處理路由鍵,只是簡單的將佇列系結到交換器上,每個傳送到交換器的訊息都會被轉發到與該交換器系結的所有佇列上。很像子網廣播,每檯子網內的主機都獲得了一份複製的訊息。fanout 型別轉發訊息是最快的。

  3. topic

    topic 交換器

    topic 交換器透過樣式匹配分配訊息的路由鍵屬性,將路由鍵和某個樣式進行匹配,此時佇列需要系結到一個樣式上。它將路由鍵和系結鍵的字串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個萬用字元:符號“#”和符號“”。#匹配0個或多個單詞,匹配不多不少一個單詞。

RabbitMQ 安裝

一般來說安裝 RabbitMQ 之前要安裝 Erlang ,可以去Erlang官網下載。接著去RabbitMQ官網下載安裝包,之後解壓縮即可。根據作業系統不同官網提供了相應的安裝說明:Windows、Debian / Ubuntu、RPM-based Linux、Mac

如果是Mac 使用者,個人推薦使用 HomeBrew 來安裝,安裝前要先更新 brew:

brew update

接著安裝 rabbitmq 伺服器:

brew install rabbitmq

這樣 RabbitMQ 就安裝好了,安裝過程中會自動其所依賴的 Erlang 。

RabbitMQ 執行和管理

  1. 啟動啟動很簡單,找到安裝後的 RabbitMQ 所在目錄下的 sbin 目錄,可以看到該目錄下有6個以 rabbitmq 開頭的可執行檔案,直接執行 rabbitmq-server 即可,下麵將 RabbitMQ 的安裝位置以 . 代替,啟動命令就是:

./sbin/rabbitmq-server

啟動正常的話會看到一些啟動過程資訊和最後的 completed with 7 plugins,這也說明啟動的時候預設載入了7個外掛。

正常啟動
  1. 後臺啟動如果想讓 RabbitMQ 以守護程式的方式在後臺執行,可以在啟動的時候加上 -detached 引數:

./sbin/rabbitmq-server -detached
  1. 查詢伺服器狀態sbin 目錄下有個特別重要的檔案叫 rabbitmqctl ,它提供了 RabbitMQ 管理需要的幾乎一站式解決方案,絕大部分的運維命令它都可以提供。查詢 RabbitMQ 伺服器的狀態資訊可以用引數 status :

./sbin/rabbitmqctl status

該命令將輸出伺服器的很多資訊,比如 RabbitMQ 和 Erlang 的版本、OS 名稱、記憶體等等

  1. 關閉 RabbitMQ 節點我們知道 RabbitMQ 是用 Erlang 語言寫的,在Erlang 中有兩個概念:節點和應用程式。節點就是 Erlang 虛擬機器的每個實體,而多個 Erlang 應用程式可以執行在同一個節點之上。節點之間可以進行本地通訊(不管他們是不是執行在同一臺伺服器之上)。比如一個執行在節點A上的應用程式可以呼叫節點B上應用程式的方法,就好像呼叫本地函式一樣。如果應用程式由於某些原因奔潰,Erlang 節點會自動嘗試重啟應用程式。如果要關閉整個 RabbitMQ 節點可以用引數 stop :

./sbin/rabbitmqctl stop

它會和本地節點通訊並指示其乾凈的關閉,也可以指定關閉不同的節點,包括遠端節點,只需要傳入引數 -n :

./sbin/rabbitmqctl -n rabbit@server.example.com stop 

-n node 預設 node 名稱是 rabbit@server ,如果你的主機名是 server.example.com ,那麼 node 名稱就是 rabbit@server.example.com 。

  1. 關閉 RabbitMQ 應用程式如果只想關閉應用程式,同時保持 Erlang 節點執行則可以用 stop_app:

./sbin/rabbitmqctl stop_app

這個命令在後面要講的叢集樣式中將會很有用。

  1. 啟動 RabbitMQ 應用程式

./sbin/rabbitmqctl start_app
  1. 重置 RabbitMQ 節點

./sbin/rabbitmqctl reset

該命令將清除所有的佇列。

  1. 檢視已宣告的佇列

./sbin/rabbitmqctl list_queues
  1. 檢視交換器

./sbin/rabbitmqctl list_exchanges

該命令還可以附加引數,比如列出交換器的名稱、型別、是否持久化、是否自動刪除:

./sbin/rabbitmqctl list_exchanges name type durable auto_delete
  1. 檢視系結

./sbin/rabbitmqctl list_bindings

Java 客戶端訪問

RabbitMQ 支援多種語言訪問,以 Java 為例看下一般使用 RabbitMQ 的步驟。

  1. maven工程的pom檔案中新增依賴

<dependency>
   <groupId>com.rabbitmqgroupId>


   <artifactId>amqp-clientartifactId>
   <version>4.1.0version>
dependency>

  1. 訊息生產者

package org.study.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
   public static void main(String[] args) throws IOException, TimeoutException {
       //建立連線工廠
       ConnectionFactory factory = new ConnectionFactory();
       factory.setUsername("guest");
       factory.setPassword("guest");
       //設定 RabbitMQ 地址
       factory.setHost("localhost");
       //建立到代理伺服器到連線
       Connection conn = factory.newConnection();
       //獲得通道
       Channel channel = conn.createChannel();
       //宣告交換器
       String exchangeName = "hello-exchange";
       channel.exchangeDeclare(exchangeName, "direct", true);
       String routingKey = "hola";
       //釋出訊息
       byte[] messageBodyBytes = "quit".getBytes();
       channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
       channel.close();
       conn.close();
   }
}
  1. 訊息消費者

package org.study.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
   public static void main(String[] args) throws IOException, TimeoutException {
       ConnectionFactory factory = new ConnectionFactory();
       factory.setUsername("guest");
       factory.setPassword("guest");
       factory.setHost("localhost");
       //建立到代理伺服器到連線
       Connection conn = factory.newConnection();
       //獲得通道
       final Channel channel = conn.createChannel();
       //宣告交換器
       String exchangeName = "hello-exchange";
       channel.exchangeDeclare(exchangeName, "direct", true);
       //宣告佇列
       String queueName = channel.queueDeclare().getQueue();
       String routingKey = "hola";
       //系結佇列,透過鍵 hola 將佇列和交換器系結起來
       channel.queueBind(queueName, exchangeName, routingKey);
       while(true) {
           //消費訊息
           boolean autoAck = false;
           String consumerTag = "";
           channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
               @Override
               public void handleDelivery(String consumerTag,
                                          Envelope envelope,
                                          AMQP.BasicProperties properties,
                                          byte[] body)
throws IOException
{
                   String routingKey = envelope.getRoutingKey();
                   String contentType = properties.getContentType();
                   System.out.println("消費的路由鍵:" + routingKey);
                   System.out.println("消費的內容型別:" + contentType);
                   long deliveryTag = envelope.getDeliveryTag();
                   //確認訊息
                   channel.basicAck(deliveryTag, false);
                   System.out.println("消費的訊息體內容:");
                   String bodyStr = new String(body, "UTF-8");
                   System.out.println(bodyStr);
               }
           });
       }
   }
}
  1. 啟動 RabbitMQ 伺服器

./sbin/rabbitmq-server
  1. 執行 Consumer先執行 Consumer ,這樣當生產者傳送訊息的時候能在消費者後端看到訊息記錄。

  2. 執行 Producer接著執行 Producer ,釋出一條訊息,在 Consumer 的控制檯能看到接收的訊息:

    Consumer 控制檯

RabbitMQ 叢集

RabbitMQ 最優秀的功能之一就是內建叢集,這個功能設計的目的是允許消費者和生產者在節點崩潰的情況下繼續執行,以及透過新增更多的節點來線性擴充套件訊息通訊吞吐量。RabbitMQ 內部利用 Erlang 提供的分散式通訊框架 OTP 來滿足上述需求,使客戶端在失去一個 RabbitMQ 節點連線的情況下,還是能夠重新連線到叢集中的任何其他節點繼續生產、消費訊息。

RabbitMQ 叢集中的一些概念

RabbitMQ 會始終記錄以下四種型別的內部元資料:

  1. 佇列元資料包括佇列名稱和它們的屬性,比如是否可持久化,是否自動刪除

  2. 交換器元資料交換器名稱、型別、屬性

  3. 系結元資料內部是一張表格記錄如何將訊息路由到佇列

  4. vhost 元資料為 vhost 內部的佇列、交換器、系結提供名稱空間和安全屬性

在單一節點中,RabbitMQ 會將所有這些資訊儲存在記憶體中,同時將標記為可持久化的佇列、交換器、系結儲存到硬碟上。存到硬碟上可以確保佇列和交換器在節點重啟後能夠重建。而在叢集樣式下同樣也提供兩種選擇:存到硬碟上(獨立節點的預設設定),存在記憶體中。

如果在叢集中建立佇列,叢集只會在單個節點而不是所有節點上建立完整的佇列資訊(元資料、狀態、內容)。結果是隻有佇列的所有者節點知道有關佇列的所有資訊,因此當叢集節點崩潰時,該節點的佇列和系結就消失了,並且任何匹配該佇列的系結的新訊息也丟失了。還好RabbitMQ 2.6.0之後提供了映象佇列以避免叢集節點故障導致的佇列內容不可用。

RabbitMQ 叢集中可以共享 user、vhost、exchange等,所有的資料和狀態都是必須在所有節點上複製的,例外就是上面所說的訊息佇列。RabbitMQ 節點可以動態的加入到叢集中。

當在叢集中宣告佇列、交換器、系結的時候,這些操作會直到所有叢集節點都成功提交元資料變更後才傳回。叢集中有記憶體節點和磁碟節點兩種型別,記憶體節點雖然不寫入磁碟,但是它的執行比磁碟節點要好。記憶體節點可以提供出色的效能,磁碟節點能保障配置資訊在節點重啟後仍然可用,那叢集中如何平衡這兩者呢?

RabbitMQ 只要求叢集中至少有一個磁碟節點,所有其他節點可以是記憶體節點,當節點加入或離開叢集時,它們必須要將該變更通知到至少一個磁碟節點。如果只有一個磁碟節點,剛好又是該節點崩潰了,那麼叢集可以繼續路由訊息,但不能建立佇列、建立交換器、建立系結、新增使用者、更改許可權、新增或刪除叢集節點。換句話說叢集中的唯一磁碟節點崩潰的話,叢集仍然可以執行,但直到該節點恢復,否則無法更改任何東西。

RabbitMQ 叢集配置和啟動

如果是在一臺機器上同時啟動多個 RabbitMQ 節點來組建叢集的話,只用上面介紹的方式啟動第二、第三個節點將會因為節點名稱和埠衝突導致啟動失敗。所以在每次呼叫 rabbitmq-server 命令前,設定環境變數 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 來明確指定唯一的節點名稱和埠。下麵的例子埠號從5672開始,每個新啟動的節點都加1,節點也分別命名為test_rabbit_1、test_rabbit_2、test_rabbit_3。

啟動第1個節點:

RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached

啟動第2個節點:

RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached

啟動第2個節點前建議將 RabbitMQ 預設啟用的外掛關掉,否則會存在使用了某個外掛的埠號衝突,導致節點啟動不成功。

現在第2個節點和第1個節點都是獨立節點,它們並不知道其他節點的存在。叢集中除第一個節點外後加入的節點需要獲取叢集中的元資料,所以要先停止 Erlang 節點上執行的 RabbitMQ 應用程式,並重置該節點元資料,再加入並且獲取叢集的元資料,最後重新啟動 RabbitMQ 應用程式。

停止第2個節點的應用程式:

./sbin/rabbitmqctl -n test_rabbit_2 stop_app

重置第2個節點元資料:

./sbin/rabbitmqctl -n test_rabbit_2 reset

第2節點加入第1個節點組成的叢集:

./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost

啟動第2個節點的應用程式

./sbin/rabbitmqctl -n test_rabbit_2 start_app

第3個節點的配置過程和第2個節點類似:

RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached
./sbin/rabbitmqctl -n test_rabbit_3 stop_app
./sbin/rabbitmqctl -n test_rabbit_3 reset
./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost
./sbin/rabbitmqctl -n test_rabbit_3 start_app

RabbitMQ 叢集運維

停止某個指定的節點,比如停止第2個節點:

RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop

檢視節點3的叢集狀態:

./sbin/rabbitmqctl -n test_rabbit_3 cluster_status


知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. API 配置(一)之應用
04. API 配置(二)之服務提供者
05. API 配置(三)之服務消費者
06. 屬性配置
07. XML 配置
08. 核心流程一覽

一共 60 篇++

贊(0)

分享創造快樂