歡迎光臨
每天分享高質量文章

RocketMQ學習-訊息釋出和訂閱

前面一篇文章分析了broker的啟動過程,瀏覽了broker的基本功能。接下來的幾篇文章,準備按照十分鐘入門RocketMQ一文中提到的一系列特性,依次進行學習。這篇文章準備分析RocketMQ作為MQ的最基本的功能:訊息的釋出(publish)和訂閱(subscribe)。首先,我參考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和監控這篇文章完成了一個簡單的例子。

一、RocketMQ訊息模型

在部署RocketMQ的時候,先啟動name server,再啟動broker,這時候broker會將自己註冊到name server。應用程式中的producer啟動的時候,首先連線一臺name server,獲取broker的地址串列;然後再和broker建立連線,接下來就可以傳送訊息了。其中:一個producer只與一個name server連線,一個producer會跟所有broker建立連線,每個連線都會有心跳檢測機制。

producer會輪詢向指定topic的mq集合傳送訊息。

consumer有兩種消費樣式:叢集消費和廣播消費。叢集消費:多個consumer平均消費該topic下所有mq的訊息,即某個訊息在某個message queue中被一個consumer消費後,其他消費者就不會消費到它;廣播消費:所有consumer可以消費到發到這個topic下的所有訊息。

consumer有兩種獲取訊息的樣式:推樣式和拉樣式,在RocketMQ中,從技術實現角度看,推樣式也是在拉樣式上做了一層封裝。

二、訊息傳送

生產者Demo

首先給出程式碼,

  1. package com.javadu.chapter8rocketmq.message;

  2. import org.apache.rocketmq.client.exception.MQBrokerException;

  3. import org.apache.rocketmq.client.exception.MQClientException;

  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;

  5. import org.apache.rocketmq.client.producer.SendResult;

  6. import org.apache.rocketmq.common.message.Message;

  7. import org.apache.rocketmq.remoting.common.RemotingHelper;

  8. import org.apache.rocketmq.remoting.exception.RemotingException;

  9. import org.springframework.beans.factory.annotation.Value;

  10. import org.springframework.stereotype.Component;

  11. import java.io.UnsupportedEncodingException;

  12. import javax.annotation.PostConstruct;

  13. /**

  14. * 作用: 同步傳送訊息

  15. * User: duqi

  16. * Date: 2018/3/29

  17. * Time: 13:52

  18. */

  19. @Component

  20. public class ProducerDemo {

  21.    @Value("${apache.rocketmq.producer.producerGroup}")

  22.    private String producerGroup;

  23.    @Value("${apache.rocketmq.namesrvAddr}")

  24.    private String namesrvAddr;

  25.    @PostConstruct

  26.    public void defaultMQProducer() {

  27.        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);

  28.        defaultMQProducer.setNamesrvAddr(namesrvAddr);

  29.        try {

  30.            defaultMQProducer.start();

  31.            Message message = new Message("TopicTest", "TagA",

  32.                                          "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));

  33.            for (int i = 0; i < 100; i++) {

  34.                SendResult sendResult = defaultMQProducer.send(message);

  35.                System.out.println("傳送訊息結果, msgId:" + sendResult.getMsgId() +

  36.                                   ", 傳送狀態:" + sendResult.getSendStatus());

  37.            }

  38.        } catch (MQClientException | UnsupportedEncodingException | InterruptedException

  39.            | RemotingException | MQBrokerException e) {

  40.            e.printStackTrace();

  41.        } finally {

  42.            defaultMQProducer.shutdown();

  43.        }

  44.    }

  45. }

生產者中有兩個屬性:

  • name server的地址,用於獲得broker的相關資訊

  • 生產者集合producerGroup,在同一個producer group中有不同的producer實體,如果最早一個producer奔潰,則broker會通知該組內的其他producer實體進行事務提交或回滾。

RocketMQ中的訊息,使用Message表示,程式碼定義如下:

  1. public class Message implements Serializable {

  2.    private static final long serialVersionUID = 8445773977080406428L;

  3.    private String topic;

  4.    private int flag;

  5.    private Map<String, String> properties;

  6.    private byte[] body;

  7.    public Message() {

  8.    }

  9.    //省略了getter和setter方法

  10. }

  • topic:該訊息將要往哪個topic發

  • flag:可以用作訊息過濾

  • properties:暫時沒理解【TODO】

  • body:訊息內容

每個訊息傳送完後,會得到一個SendResult物件,看下該物件的結構:

  1. public class SendResult {

  2.    //傳送狀態

  3.    private SendStatus sendStatus;

  4.    //訊息ID,用於訊息去重、訊息跟蹤

  5.    private String msgId;

  6.    private MessageQueue messageQueue;

  7.    private long queueOffset;

  8.    //事務ID

  9.    private String transactionId;

  10.    private String offsetMsgId;

  11.    private String regionId;

  12.    //是否需要跟蹤

  13.    private boolean traceOn = true;

  14.    public SendResult() {

  15.    }

  16.    //省略了建構式、getter和setter等一系列方法

  17. }

在這個demo中,我們是將訊息內容和訊息狀態一併列印到控制檯。

訊息傳送原始碼分析

在RocketMQ中的client模組的包結構如下,可以看出,作者並沒有將介面的定義和實現放在一個包下(這在我們的業務應用中是常見的做法,不一定合理)。producer和consumer包下分別定義了生產者和消費者的介面,將具體的實現放在impl包中。

首先關註producer包裡的內容,幾個主要的類如下:DefaultMQProducer是生產者的預設實現、MQAdmin用於定義一些管理介面、MQProducer用於定義一些生產者特有的介面。

在ProducerDemo中,透過`defaultMQProducer.start();啟動生產者,接下來看下start()方法的過程:

  • 根據服務狀態決定接下來的動作

  • 對於CREATE_JUST狀態

  • 設定服務狀態

  • 檢查配置

  • 獲取或建立MQClientInstance實體

  • 將生產者註冊到指定的producerGroup,即producerTable這個資料結構中,是一個map

  • 填充topicPublishInfoTable資料結構

  • 啟動生產者

  • 對於RUNNING、STARTFAILED和SHUTDOWNALREADY,丟擲異常

  1. public void start(final boolean startFactory) throws MQClientException {

  2.         //根據當前的服務狀態決定接下來的動作

  3.        switch (this.serviceState) {

  4.            case CREATE_JUST:

  5.                this.serviceState = ServiceState.START_FAILED;

  6.                this.checkConfig();

  7.                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {

  8.                    this.defaultMQProducer.changeInstanceNameToPID();

  9.                }

  10.                //建立一個客戶端工廠

  11.                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

  12.                //將生產者註冊到指定producer group

  13.                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

  14.                if (!registerOK) {

  15.                    this.serviceState = ServiceState.CREATE_JUST;

  16.                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()

  17.                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),

  18.                        null);

  19.                }

  20.                //填充topicPublishInfoTable

  21.                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

  22.                if (startFactory) {

  23.                    mQClientFactory.start();

  24.                }

  25.                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),

  26.                    this.defaultMQProducer.isSendMessageWithVIPChannel());

  27.                this.serviceState = ServiceState.RUNNING;

  28.                break;

  29.            case RUNNING:

  30.            case START_FAILED:

  31.            case SHUTDOWN_ALREADY:

  32.                throw new MQClientException("The producer service state not OK, maybe started once, "

  33.                    + this.serviceState

  34.                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),

  35.                    null);

  36.            default:

  37.                break;

  38.        }

  39.         //給該producer連線的所有broker傳送心跳訊息

  40.        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

  41.    }

順著 mQClientFactory.start()往下跟,可以進一步瞭解生產者的細節,主要步驟有:

  • 建立請求響應通道

  • 啟動各種定時任務,例如:每隔2分鐘向name server拉取一次broker叢集的地址,這意味著如果某個broker宕機了,生產者在這兩分鐘之內的訊息是投遞失敗的;定期從name server拉取topic等路由資訊;定期清理失效的broker以及向broker傳送心跳訊息等。

  • 啟動拉服務、負載均衡服務、推服務等服務,這三個服務跟消費者有關。這裡設計上不太明瞭,將消費者和生產者的啟動邏輯放在一起了。看pullMessageService和rebalanceService和初始化,它們是根據MQClientInstance初始化的,而MQClientInstance又是根據ClientConfig來配置的。

  1.  public void start() throws MQClientException {

  2.        synchronized (this) {

  3.            switch (this.serviceState) {

  4.                case CREATE_JUST:

  5.                    this.serviceState = ServiceState.START_FAILED;

  6.                    // If not specified,looking address from name server

  7.                    if (null == this.clientConfig.getNamesrvAddr()) {

  8.                        this.mQClientAPIImpl.fetchNameServerAddr();

  9.                    }

  10.                    // Start request-response channel

  11.                    this.mQClientAPIImpl.start();

  12.                    // Start various schedule tasks

  13.                    this.startScheduledTask();

  14.                    // Start pull service

  15.                    this.pullMessageService.start();

  16.                    // Start rebalance service

  17.                    this.rebalanceService.start();

  18.                    // Start push service

  19.                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

  20.                    log.info("the client factory [{}] start OK", this.clientId);

  21.                    this.serviceState = ServiceState.RUNNING;

  22.                    break;

  23.                case RUNNING:

  24.                    break;

  25.                case SHUTDOWN_ALREADY:

  26.                    break;

  27.                case START_FAILED:

  28.                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);

  29.                default:

  30.                    break;

  31.            }

  32.        }

  33.    }

生產者啟動後,接下來看下訊息的傳送過程,如下圖所示,DefaultMQProducer提供了很多傳送訊息的方法,可以實現同步發訊息、非同步發訊息、指定訊息佇列、OneWay訊息、事務訊息等。

這裡我們只看最簡單的 send(Messagemessage)方法,最終在DefaultMQProducerImpl中實現:

  1.    private SendResult sendDefaultImpl(

  2.        Message msg,

  3.        final CommunicationMode communicationMode,

  4.        final SendCallback sendCallback,

  5.        final long timeout

  6.    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

  7.        //確認生產者狀態正常

  8.        this.makeSureStateOK();

  9.        //檢查訊息的合法性

  10.        Validators.checkMessage(msg, this.defaultMQProducer);

  11.        final long invokeID = random.nextLong();

  12.        long beginTimestampFirst = System.currentTimeMillis();

  13.        long beginTimestampPrev = beginTimestampFirst;

  14.        long endTimestamp = beginTimestampFirst;

  15.        //獲取訊息的目的地:Topic資訊

  16.        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

  17.        if (topicPublishInfo != null && topicPublishInfo.ok()) {

  18.            MessageQueue mq = null;

  19.            Exception exception = null;

  20.            SendResult sendResult = null;

  21.            //計算出訊息的投遞次數,如果是同步投遞,則是1+重試次數,如果不是同步投遞,則只需要投遞一次

  22.            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

  23.            int times = 0;

  24.            String[] brokersSent = new String[timesTotal];

  25.            //一個broker叢集有不同的broker節點,lastBrokerName記錄了上次投遞的broker節點,每個broker節點

  26.            for (; times < timesTotal; times++) {

  27.                String lastBrokerName = null == mq ? null : mq.getBrokerName();

  28.                //選擇一個要傳送的訊息佇列

  29.                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

  30.                if (mqSelected != null) {

  31.                    mq = mqSelected;

  32.                    brokersSent[times] = mq.getBrokerName();

  33.                    try {

  34.                        beginTimestampPrev = System.currentTimeMillis();

  35.                        //投遞訊息

  36.                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);

  37.                        endTimestamp = System.currentTimeMillis();

  38.                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

  39.                        //根據訊息傳送樣式,對訊息傳送結果做不同的處理

  40.                        switch (communicationMode) {

  41.                            case ASYNC:

  42.                                return null;

  43.                            case ONEWAY:

  44.                                return null;

  45.                            case SYNC:

  46.                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {

  47.                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {

  48.                                        continue;

  49.                                    }

  50.                                }

  51.                                return sendResult;

  52.                            default:

  53.                                break;

  54.                        }

  55.                    } catch (RemotingException e) {

  56.                        endTimestamp = System.currentTimeMillis();

  57.                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

  58.                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

  59.                        log.warn(msg.toString());

  60.                        exception = e;

  61.                        continue;

  62.                    } catch (MQClientException e) {

  63.                        endTimestamp = System.currentTimeMillis();

  64.                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

  65.                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

  66.                        log.warn(msg.toString());

  67.                        exception = e;

  68.                        continue;

  69.                    } catch (MQBrokerException e) {

  70.                        endTimestamp = System.currentTimeMillis();

  71.                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

  72.                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

  73.                        log.warn(msg.toString());

  74.                        exception = e;

  75.                        switch (e.getResponseCode()) {

  76.                            case ResponseCode.TOPIC_NOT_EXIST:

  77.                            case ResponseCode.SERVICE_NOT_AVAILABLE:

  78.                            case ResponseCode.SYSTEM_ERROR:

  79.                            case ResponseCode.NO_PERMISSION:

  80.                            case ResponseCode.NO_BUYER_ID:

  81.                            case ResponseCode.NOT_IN_CURRENT_UNIT:

  82.                                continue;

  83.                            default:

  84.                                if (sendResult != null) {

  85.                                    return sendResult;

  86.                                }

  87.                                throw e;

  88.                        }

  89.                    } catch (InterruptedException e) {

  90.                        endTimestamp = System.currentTimeMillis();

  91.                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

  92.                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

  93.                        log.warn(msg.toString());

  94.                        log.warn("sendKernelImpl exception", e);

  95.                        log.warn(msg.toString());

  96.                        throw e;

  97.                    }

  98.                } else {

  99.                    break;

  100.                }

  101.            }

  102.            if (sendResult != null) {

  103.                return sendResult;

  104.            }

  105.            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",

  106.                times,

  107.                System.currentTimeMillis() - beginTimestampFirst,

  108.                msg.getTopic(),

  109.                Arrays.toString(brokersSent));

  110.            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

  111.            MQClientException mqClientException = new MQClientException(info, exception);

  112.            if (exception instanceof MQBrokerException) {

  113.                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());

  114.            } else if (exception instanceof RemotingConnectException) {

  115.                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);

  116.            } else if (exception instanceof RemotingTimeoutException) {

  117.                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);

  118.            } else if (exception instanceof MQClientException) {

  119.                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);

  120.            }

  121.            throw mqClientException;

  122.        }

  123.        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();

  124.        if (null == nsList || nsList.isEmpty()) {

  125.            throw new MQClientException(

  126.                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);

  127.        }

  128.        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),

  129.            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);

  130.    }

傳送訊息的主要過程如下:

  • 首先檢查生產者和訊息的合法性

  • 然後獲取訊息傳送的資訊,該資訊存放在TopicPublishInfo物件中:

  1. public class TopicPublishInfo {

  2.    //是否順序訊息

  3.    private boolean orderTopic = false;

  4.    private boolean haveTopicRouterInfo = false;

  5.    //維護該topic下用於的訊息佇列串列

  6.    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();

  7.    //計算下一次該投遞的佇列,這裡應用ThreadLocal,即使是同一臺機器中,每個producer實體都有自己的佇列

  8.    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

  9.    private TopicRouteData topicRouteData;

  10.    //省略了getter和setter方法

  11.    //選擇指定lastBrokerName上的下一個mq

  12.    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {

  13.        if (lastBrokerName == null) {

  14.            return selectOneMessageQueue();

  15.        } else {

  16.            int index = this.sendWhichQueue.getAndIncrement();

  17.            for (int i = 0; i < this.messageQueueList.size(); i++) {

  18.                int pos = Math.abs(index++) % this.messageQueueList.size();

  19.                if (pos < 0)

  20.                    pos = 0;

  21.                MessageQueue mq = this.messageQueueList.get(pos);

  22.                if (!mq.getBrokerName().equals(lastBrokerName)) {

  23.                    return mq;

  24.                }

  25.            }

  26.            return selectOneMessageQueue();

  27.        }

  28.    }

  29.    //選擇當前broker節點的下一個mq

  30.    public MessageQueue selectOneMessageQueue() {

  31.        int index = this.sendWhichQueue.getAndIncrement();

  32.        int pos = Math.abs(index) % this.messageQueueList.size();

  33.        if (pos < 0)

  34.            pos = 0;

  35.        return this.messageQueueList.get(pos);

  36.    }

  37. }

  • 選擇要傳送給該topic下的那個MessageQueue,選擇的邏輯分兩種情況:(1)預設情況,在上次投遞的broker節點上,輪詢到下一個message queue來傳送;(2)sendLatencyFaultEnable這個值設定為true的時候,這塊沒太看懂。

  • 投遞訊息

  • 根據訊息佇列執行樣式,針對投遞結果做不同的處理。

二、訊息消費

消費者Demo

消費者裡有個屬性需要看下:

  • consumerGroup:位於同一個consumerGroup中的consumer實體和producerGroup中的各個produer實體承擔的角色類似;consumerGroup中的實體還可以實現負載均衡和容災。PS:處於同一個consumerGroup裡的consumer實體一定是訂閱了同一個topic。

  • nameServer的地址:name server地址,用於獲取broker、topic資訊

消費者Demo裡做了以下幾個事情:

  • 設定配置屬性

  • 設定訂閱的topic,可以指定tag

  • 設定第一次啟動的時候,從message queue的哪裡開始消費

  • 設定訊息處理器

  • 啟動消費者

  1. package com.javadu.chapter8rocketmq.message;

  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

  5. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

  6. import org.apache.rocketmq.common.message.MessageExt;

  7. import org.apache.rocketmq.remoting.common.RemotingHelper;

  8. import org.springframework.beans.factory.annotation.Value;

  9. import org.springframework.stereotype.Component;

  10. import javax.annotation.PostConstruct;

  11. /**

  12. * 作用:

  13. * User: duqi

  14. * Date: 2018/3/29

  15. * Time: 14:00

  16. */

  17. @Component

  18. public class ConsumerDemo {

  19.    /**

  20.     * 消費者的組名

  21.     */

  22.    @Value("${apache.rocketmq.consumer.consumerGroup}")

  23.    private String consumerGroup;

  24.    /**

  25.     * NameServer 地址

  26.     */

  27.    @Value("${apache.rocketmq.namesrvAddr}")

  28.    private String namesrvAddr;

  29.    @PostConstruct

  30.    public void defaultMQPushConsumer() {

  31.        //消費者的組名

  32.        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

  33.        //指定NameServer地址,多個地址以 ; 隔開

  34.        consumer.setNamesrvAddr(namesrvAddr);

  35.        try {

  36.            //訂閱PushTopic下Tag為push的訊息

  37.            consumer.subscribe("TopicTest", "TagA");

  38.            //設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費

  39.            //如果非第一次啟動,那麼按照上次消費的位置繼續消費

  40.            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  41.            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {

  42.                try {

  43.                    for (MessageExt messageExt : list) {

  44.                        //輸出訊息內容

  45.                        System.out.println("messageExt: " + messageExt);

  46.                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

  47.                        //輸出訊息內容

  48.                        System.out.println("消費響應:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);

  49.                    }

  50.                } catch (Exception e) {

  51.                    e.printStackTrace();

  52.                    //稍後再試

  53.                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

  54.                }

  55.                //消費成功

  56.                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

  57.            });

  58.            consumer.start();

  59.        } catch (Exception e) {

  60.            e.printStackTrace();

  61.        }

  62.    }

  63. }

消費者原始碼分析

前面分析過了,RocketMQ中的client模組統一提供了生產者和消費者客戶端,這塊我們看下消費者的幾個主要的類。前面提到過,RocketMQ實際上都是拉樣式,這裡的DefaultMQPushConsumer實現了推樣式,也只是對拉訊息服務做了一層封裝,即拉到訊息的時候觸發業務消費者註冊到這裡的callback,而具體拉訊息的服務是由PullMessageService實現的,這個細節後續再研究。

在ConsumerDemo中,設定好配置資訊後,會進行topic訂閱,呼叫了DefaultMQPushConsumer的subscribe方法,原始碼如下:

  1.    /**

  2.     * Subscribe a topic to consuming subscription.

  3.     *

  4.     * @param topic topic to subscribe.

  5.     * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"

  6.     * if null or * expression,meaning subscribe all

  7.     * @throws MQClientException if there is any client error.

  8.     */

  9.    @Override

  10.    public void subscribe(String topic, String subExpression) throws MQClientException {

  11.        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);

  12.    }

第一個引數是topic資訊,第二個引數用於用於訊息過濾tag欄位。真正的訂閱發生在DefaultMQPushConsumerImpl中,程式碼如下:

  1.    public void subscribe(String topic, String subExpression) throws MQClientException {

  2.        try {

  3.            //構建包含訂閱資訊的物件,並放入負載平衡元件維護的map中,以topic為key

  4.            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),

  5.                topic, subExpression);

  6.            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

  7.            //如果已經跟broker叢集建立連線,則給所有的broker節點傳送心跳訊息

  8.            if (this.mQClientFactory != null) {

  9.                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

  10.            }

  11.        } catch (Exception e) {

  12.            throw new MQClientException("subscription exception", e);

  13.        }

  14.    }

在ConsumerDemo中,接下里會設定消費者首次啟動時消費訊息的起始位置,這涉及到DefaultMQPushConsumer中的一個屬性——consumeFromWhere,這個值有三個可能的值

  • CONSUMEFROMLAST_OFFSET,預設值,表示從上次停止時的地方開始消費

  • CONSUMEFROMFIRST_OFFSET,從佇列的頭部開始消費

  • CONSUMEFROMTIMESTAMP,從指定的時間點開始消費

ConsumerDemo接下來會註冊一個callback,當訊息到達的時候就處理訊息(最新的訊息監聽者支援併發消費):

  1.    /**

  2.     * Register a callback to execute on message arrival for concurrent consuming.

  3.     *

  4.     * @param messageListener message handling callback.

  5.     */

  6.    @Override

  7.    public void registerMessageListener(MessageListenerConcurrently messageListener) {

  8.        this.messageListener = messageListener;

  9.        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);

  10.    }

最後,我們看下ConsumerDemo的啟動過程,即DefaultMQPushConsumerImpl的start方法,主要做了下麵幾件事:

  • 檢查配置

  • 將訂閱資訊複製到負載均衡元件(rebalanceImpl)中;

  • 負載均衡元件的幾個屬性的設定

  • 處理不同訊息樣式(叢集樣式或廣播樣式)的配置

  • 處理順序消費和併發消費的不同配置

  • 將消費者資訊和consumer group註冊到MQ客戶端實體的consumerTable中

  • 啟動消費者客戶端

參考資料

  1. 分散式開放訊息系統(RocketMQ)的原理與實踐

  2. 買好車提供的rocketmq-spring-boot-starter

  3. Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和監控

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖