歡迎光臨
每天分享高質量文章

說說 MQ 之 RocketMQ ( 二 )

(點擊上方公眾號,可快速關註)


來源:Valleylord ,

valleylord.github.io/post/201607-mq-rocketmq/

RocketMQ 的 Java API

RocketMQ 是用 Java 語言開發的,因此,其 Java API 相對是比較豐富的,當然也有部分原因是 RocketMQ 本身提供的功能就比較多。RocketMQ API 提供的功能包括,

  1. 廣播消費,這個在之前已經提到過;

  2. 訊息過濾,支持簡單的 Message Tag 過濾,也支持按 Message Header、body 過濾;

  3. 順序消費和亂序消費,之前也提到過,這裡的順序消費應該指的是普通順序性,這一點與 Kafka 相同;

  4. Pull 樣式消費,這個是相對 Push 樣式來說的,Kafka 就是 Pull 樣式消費;

  5. 事務訊息,這個好像沒有開源,但是 example 代碼中有示例,總之,不推薦用;

  6. Tag,RocketMQ 在 Topic 下麵又分了一層 Tag,用於表示訊息類別,可以用來過濾,但是順序性還是以 Topic 來看;

單看功能的話,即使不算事務訊息,也不算 Tag,RocketMQ 也遠超 Kafka,Kafka 應該只實現了 Pull 樣式消費 + 順序消費這2個功能。RocketMQ 的代碼示例在 rocketmq-example 中,註意,代碼是不能直接運行的,因為所有的代碼都少了設置 name server 的部分,需要自己手動加上,例如,producer.setNamesrvAddr(“192.168.232.23:9876”);。

先來看一下生產者的 API,比較簡單,只有一種,如下,

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.MessageQueueSelector;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

import com.alibaba.rocketmq.common.message.MessageQueue;

import java.util.List;

public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName”);

        producer.setNamesrvAddr(“192.168.232.23:9876”);

        producer.start();

        for (int i = 0; i < 10; i++)

            try {

                {

                    Message msg = new Message(“TopicTest1”,// topic

                        “TagA”,// tag

                        “OrderID188”,// key

                        (“RocketMQ “+String.format(“%05d”, i)).getBytes());// body

                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

                        @Override

                        public MessageQueue select(List mqs, Message msg, Object arg) {

                            Integer id = (Integer) arg;

                            int index = id % mqs.size();

                            return mqs.get(index);

                        }

                    }, i));

                    System.out.println(String.format(“%05d”, i)+sendResult);

                }

            }

            catch (Exception e) {

                e.printStackTrace();

            }

        producer.shutdown();

    }

}

可以發現,相比 Kafka 的 API,只多了 Tag,但實際上行為有很大不同。Kafka 的生產者客戶端,有同步和異步兩種樣式,但都是阻塞樣式,send 方法傳回發送狀態的 Future,可以通過 Future 的 get 方法阻塞獲得發送狀態。而 RocketMQ 採用的是同步非阻塞樣式,發送之後立刻傳回發送狀態(而不是 Future)。正常情況下,兩者使用上差別不大,但是在高可用場景中發生主備切換的時候,Kafka 的同步可以等待切換完成並重連,最後傳回;而 RocketMQ 只能立刻報錯,由生產者選擇是否重發。所以,在生產者的 API 上,其實 Kafka 是要強一些的。

另外,RocketMQ 可以通過指定 MessageQueueSelector 類的實現來指定將訊息發送到哪個分割槽去,Kafka 是通過指定生產者的 partitioner.class 引數來實現的,靈活性上 RocketMQ 略勝一籌。

再來看消費者的API,由於 RocketMQ 的功能比較多,我們先看 Pull 樣式消費的API,如下,

import java.util.HashMap;

import java.util.Map;

import java.util.Set;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;

import com.alibaba.rocketmq.client.consumer.PullResult;

import com.alibaba.rocketmq.client.consumer.store.OffsetStore;

import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.message.MessageExt;

import com.alibaba.rocketmq.common.message.MessageQueue;

public class PullConsumer {

    private static final Map offseTable = new HashMap();

    public static void main(String[] args) throws MQClientException {

        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(“please_rename_unique_group_name_5”);

        consumer.setNamesrvAddr(“192.168.232.23:9876”);

        consumer.start();

        Set mqs = consumer.fetchSubscribeMessageQueues(“TopicTest1”);

        for (MessageQueue mq : mqs) {

            System.out.println(“Consume from the queue: ” + mq);

            SINGLE_MQ: while (true) {

                try {

                    long offset = consumer.fetchConsumeOffset(mq, true);

                    PullResult pullResult =

                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);

                    if (null != pullResult.getMsgFoundList()) {

                        for (MessageExt messageExt : pullResult.getMsgFoundList()) {

                            System.out.print(new String(messageExt.getBody()));

                            System.out.print(pullResult);

                            System.out.println(messageExt);

                        }

                    }

                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

                    switch (pullResult.getPullStatus()) {

                    case FOUND:

                        // TODO

                        break;

                    case NO_MATCHED_MSG:

                        break;

                    case NO_NEW_MSG:

                        break SINGLE_MQ;

                    case OFFSET_ILLEGAL:

                        break;

                    default:

                        break;

                    }

                }

                catch (Exception e) {

                    e.printStackTrace();

                }

            }

        }

        consumer.shutdown();

    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {

        offseTable.put(mq, offset);

    }

    private static long getMessageQueueOffset(MessageQueue mq) {

        Long offset = offseTable.get(mq);

        if (offset != null)

            return offset;

        return 0;

    }

}

這部分的 API 其實是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分割槽,而 Kafka 可以自動管理(當然也可以手動管理),並且不需要指定分割槽(分割槽是在 Kafka 訂閱的時候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用 OffsetStore 接口,提供了兩種管理方式,本地檔案和遠程 Broker。這部分感覺兩者差不多。

下麵再看看 Push 樣式順序消費,代碼如下,

import java.util.List;

import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“please_rename_unique_group_name_3”);

        consumer.setNamesrvAddr(“192.168.232.23:9876”);

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe(“TopicTest1”, “TagA || TagC || TagD”);

        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);

            @Override

            public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {

                context.setAutoCommit(false);

                System.out.println(Thread.currentThread().getName() + ” Receive New Messages: ” + msgs);

                this.consumeTimes.incrementAndGet();

                if ((this.consumeTimes.get() % 2) == 0) {

                    return ConsumeOrderlyStatus.SUCCESS;

                }

                else if ((this.consumeTimes.get() % 3) == 0) {

                    return ConsumeOrderlyStatus.ROLLBACK;

                }

                else if ((this.consumeTimes.get() % 4) == 0) {

                    return ConsumeOrderlyStatus.COMMIT;

                }

                else if ((this.consumeTimes.get() % 5) == 0) {

                    context.setSuspendCurrentQueueTimeMillis(3000);

                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;

                }

                return ConsumeOrderlyStatus.SUCCESS;

            }

        });

        consumer.start();

        System.out.println(“Consumer Started.”);

    }

}

雖然提供了 Push 樣式,RocketMQ 內部實際上還是 Pull 樣式的 MQ,Push 樣式的實現應該採用的是長輪詢,這點與 Kafka 一樣。使用該方式有幾個註意的地方,

  1. 接收訊息的監聽類要使用 MessageListenerOrderly;

  2. ConsumeFromWhere 有幾個引數,表示從頭開始消費,從尾開始消費,還是從某個 TimeStamp 開始消費;

  3. 可以控制 offset 的提交,應該就是 context.setAutoCommit(false); 的作用;

控制 offset 提交這個特性非常有用,某種程度上擴展一下,就可以當做事務來用了,看代碼 ConsumeMessageOrderlyService 的實現,其實並沒有那麼複雜,在不啟用 AutoCommit 的時候,只有傳回 COMMIT 才 commit offset;啟用 AutoCommit 的時候,傳回 COMMIT、ROLLBACK(這個比較扯)、SUCCESS 的時候,都 commit offset。

後來發現,commit offset 功能在 Kafka 裡面也有提供,使用新的 API,呼叫 consumer.commitSync。

再看一個 Push 樣式亂序消費 + 訊息過濾的例子,消費者的代碼如下,

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“ConsumerGroupNamecc4”);

        consumer.setNamesrvAddr(“192.168.232.23:9876”);

        consumer.subscribe(“TopicTest1”, MessageFilterImpl.class.getCanonicalName());

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override

            public ConsumeConcurrentlyStatus consumeMessage(List msgs,

                    ConsumeConcurrentlyContext context) {

                System.out.println(Thread.currentThread().getName() + ” Receive New Messages: ” + msgs);

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }

        });

        consumer.start();

        System.out.println(“Consumer Started.”);

    }

}

這個例子與之前順序消費不同的地方在於,

  1. 接收訊息的監聽類使用的是 MessageListenerConcurrently;

  2. 回呼方法中,使用的是自動 offset commit;

  3. 訂閱的時候增加了訊息過濾類 MessageFilterImpl;

訊息過濾類 MessageFilterImpl 的代碼如下,

import com.alibaba.rocketmq.common.filter.MessageFilter;

import com.alibaba.rocketmq.common.message.MessageExt;

public class MessageFilterImpl implements MessageFilter {

    @Override

    public boolean match(MessageExt msg) {

        String property = msg.getUserProperty(“SequenceId”);

        if (property != null) {

            int id = Integer.parseInt(property);

            if ((id % 3) == 0 && (id > 10)) {

                return true;

            }

        }

        return false;

    }

}

RocketMQ 執行過濾是在 Broker 端,Broker 所在的機器會啟動多個 FilterServer 過濾行程;Consumer 啟動後,會向 FilterServer 上傳一個過濾的 Java 類;Consumer 從 FilterServer 拉訊息,FilterServer 將請求轉發給 Broker,FilterServer 從 Broker 收到訊息後,按照 Consumer 上傳的 Java 過濾程式做過濾,過濾完成後傳回給 Consumer。這種過濾方法可以節省網絡流量,但是增加了 Broker 的負擔。可惜我沒有實驗出來使用過濾的效果,即使是用 github wiki 上的例子8也沒成功,不糾結了。RocketMQ 的按 Tag 過濾的功能也是在 Broker 上做的過濾,能用,是個很方便的功能。

還有一種廣播消費樣式,比較簡單,可以去看代碼,不再列出。

總之,RocketMQ 提供的功能比較多,比 Kafka 多很多易用的 API。

【關於投稿】


如果大家有原創好文投稿,請直接給公號發送留言。


① 留言格式:
【投稿】+《 文章標題》+ 文章鏈接

② 示例:
【投稿】《不要自稱是程式員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/

③ 最後請附上您的個人簡介哈~



看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

赞(0)

分享創造快樂