歡迎光臨
每天分享高質量文章

說說 MQ 之 RocketMQ ( 三 )

(點選上方公眾號,可快速關註)


來源:Valleylord ,

valleylord.github.io/post/201607-mq-rocketmq/

RocketMQ 的主備樣式

按之前所說,只有 RocketMQ 的多主多從非同步複製是可以生產使用的,因此只在這個場景下測試。另外,訊息採用 Push 順序樣式消費。

假設叢集採用2主2備的樣式,需要啟動4個 Broker,配置檔案如下,

brokerName=broker-a

brokerId=0

listenPort=10911

storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async

storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlog

brokerRole=ASYNC_MASTER

brokerName=broker-a

brokerId=1

listenPort=10921

storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async-slave

storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlog

brokerRole=SLAVE

brokerName=broker-b

brokerId=0

listenPort=20911

storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async

storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async/commitlog

brokerRole=ASYNC_MASTER

brokerRole=ASYNC_MASTER

brokerName=broker-b

brokerId=1

listenPort=20921

storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async-slave

storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlog

brokerRole=SLAVE

另外,每個機構共通的配置項如下,

brokerClusterName=DefaultCluster

brokerIP1=192.168.232.23

namesrvAddr=192.168.232.23:9876

deleteWhen=04

fileReservedTime=120

flushDiskType=ASYNC_FLUSH

其他設定均採用預設。啟動 NameServer 和所有 Broker,並試執行一下 Producer,然後看一下 TestTopic1 當前的情況,

$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1

{

        “brokerDatas”:[

                {

                        “brokerAddrs”:{0:”192.168.232.23:20911″,1:”192.168.232.23:20921″

                        },

                        “brokerName”:”broker-b”

                },

                {

                        “brokerAddrs”:{0:”192.168.232.23:10911″,1:”192.168.232.23:10921″

                        },

                        “brokerName”:”broker-a”

                }

        ],

        “filterServerTable”:{},

        “queueDatas”:[

                {

                        “brokerName”:”broker-a”,

                        “perm”:6,

                        “readQueueNums”:4,

                        “topicSynFlag”:0,

                        “writeQueueNums”:4

                },

                {

                        “brokerName”:”broker-b”,

                        “perm”:6,

                        “readQueueNums”:4,

                        “topicSynFlag”:0,

                        “writeQueueNums”:4

                }

        ]

}

可見,TestTopic1 在2個 Broker 上,且每個 Broker 備機也在執行。下麵開始主備切換的實驗,分別啟動 Consumer 和 Producer 行程,訊息採用 Pull 順序樣式消費。在訊息傳送接收過程中,使用 kill -9 停掉 broker-a 的主行程,模擬突然宕機。此時,TestTopic1 的狀態如下,

$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1

{

        “brokerDatas”:[

                {

                        “brokerAddrs”:{0:”192.168.232.23:20911″,1:”192.168.232.23:20921″

                        },

                        “brokerName”:”broker-b”

                },

                {

                        “brokerAddrs”:{1:”192.168.232.23:10921″

                        },

                        “brokerName”:”broker-a”

                }

        ],

        “filterServerTable”:{},

        “queueDatas”:[

                {

                        “brokerName”:”broker-a”,

                        “perm”:6,

                        “readQueueNums”:4,

                        “topicSynFlag”:0,

                        “writeQueueNums”:4

                },

                {

                        “brokerName”:”broker-b”,

                        “perm”:6,

                        “readQueueNums”:4,

                        “topicSynFlag”:0,

                        “writeQueueNums”:4

                }

        ]

}

broker-a 的節點已經減少為只有1個從節點。然後啟動broker-a 的主節點,模擬恢復,再看一下 TestTopic1 的狀態,

$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1

{

        “brokerDatas”:[

                {

                        “brokerAddrs”:{0:”192.168.232.23:20911″,1:”192.168.232.23:20921″

                        },

                        “brokerName”:”broker-b”

                },

                {

                        “brokerAddrs”:{0:”192.168.232.23:10911″,1:”192.168.232.23:10921″

                        },

                        “brokerName”:”broker-a”

                }

        ],

        “filterServerTable”:{},

        “queueDatas”:[

                {

                        “brokerName”:”broker-a”,

                        “perm”:6,

                        “readQueueNums”:4,

                        “topicSynFlag”:0,

                        “writeQueueNums”:4

                },

                {

                        “brokerName”:”broker-b”,

                        “perm”:6,

                        “readQueueNums”:4,

                        “topicSynFlag”:0,

                        “writeQueueNums”:4

                }

        ]

}

此時,RocketMQ 已經恢復。

再來看看 Producer 和 Consumer 的日誌,先看 Producer 的,如下,

……

00578SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F08, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=141]

00579SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F9F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=141]

00580SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078D47, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=700]

00581SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078DDE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=700]

00582SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078E75, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=699]

00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]

00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]

00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]

00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]

00597SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079296, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=702]

00598SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007932D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=701]

00599SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000793C4, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=701]

00600SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007945B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=703]

00601SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000794F2, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=703]

00602SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079589, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=702]

00603SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079620, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=702]

……

01389SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000965BE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=900]

01390SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096655, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=899]

01391SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000966EC, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=899]

01392SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127036, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=143]

01393SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001270CD, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=141]

01394SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127164, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=142]

01395SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001271FB, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=142]

01396SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096783, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=901]

01397SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000009681A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=901]

日誌中顯示,在傳送完00583條訊息之後,開始發生異常 connect to <192.168.232.23:10911> failed,原因應該是 broker-a 的主節點被 kill 掉。之後,從00596條訊息開始,RocketMQ 又恢復正常,原因是 broker-b 已經開始提供服務,承擔了所有的工作。然後,又重新啟動了 broker-a 主節點,由於該節點的加入,從01392條訊息開始,broker-a 又開始恢復工作。實驗中可以驗證,RocketMQ 所謂的多主多備樣式,實際上,備機被弱化到無以復加,在主節點宕機的時候,備機無法接替主機的工作,而只是將尚未傳送的資料發送出去,由剩下的主節點接替工作。也就是說,N 主 N 備的 RocketMQ 叢集中,總共有 2N 臺機器,實際工作的只有 N 臺,如果有一臺掛了,就只有 N-1 臺工作了,機器的利用率太低了。

再來看一下 Consumer 的日誌,如下,

RocketMQ 00551PullResult [pullStatus=FOUND, nextBeginOffset=696, minOffset=0, maxOffset=696, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=695, sysFlag=0, bornTimestamp=1469175032446, bornHost=/192.168.234.98:51987, storeTimestamp=1469175020973, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007859C, commitLogOffset=492956, bodyCRC=943070764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=696, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]

RocketMQ 00559PullResult [pullStatus=FOUND, nextBeginOffset=697, minOffset=0, maxOffset=697, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=696, sysFlag=0, bornTimestamp=1469175032720, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021247, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000787F8, commitLogOffset=493560, bodyCRC=921540126, reconsumeTimes=0, 



RocketMQ 01408PullResult [pullStatus=FOUND, nextBeginOffset=211, minOffset=0, maxOffset=211, msgFoundList=1]MessageExt [queueId=0, storeSize=151, queueOffset=145, sysFlag=0, bornTimestamp=1469175072078, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060614, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000001274EE, commitLogOffset=1209582, bodyCRC=98787231, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=211, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]

RocketMQ 01416PullResult [pullStatus=FOUND, nextBeginOffset=214, minOffset=0, maxOffset=214, msgFoundList=3]MessageExt [queueId=0, storeSize=151, queueOffset=146, sysFlag=0, bornTimestamp=1469175072405, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060934, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000012774A, commitLogOffset=1210186, bodyCRC=2067809241, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=214, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]

詳細日誌請檢視原文。

可以看到,Consumer 在 broker-a 宕機時間的附近,也出現了異常,connect to <192.168.232.23:10911> failed。雖然還能保持分割槽上的順序性,但是已經某種程度上出現了一些紊亂,例如,將我在實驗之前的資料給取了出來(Hello MetaQ的訊息)。可是,我在實驗前,明明做過刪除這個 Topic 的動作,看來 RocketMQ 所謂的刪除,並未刪除 Topic 的資料。之後,broker-a 主機重啟之後,又恢復正常。

RocketMQ Pull樣式消費需要手動管理 offset 和指定分割槽,這個在呼叫的時候不覺得,實際執行的時候才會發現每次總是消費一個分割槽,消費完之後,才開始消費下一個分割槽,而下一個分割槽可能已經堆積了很多訊息了,手動做訊息分配又比較費事。或許,Push 順序樣式消費才是更好的選擇。

另外還有幾個比較異常的情況,實驗中有幾次出現了 CODE: 17 DESC: topic[TopicTest1] not exist, apply first please! 這樣的錯誤,實際上,這時候我只是關掉了 Producer;還有,sh mqadmin updateTopic –n 192.168.232.23:9876 –c DefaultCluster –t TopicTest1 明明檔案中說可以用來新增 Topic,而實際上不行。

補充一下:之後,我又使用 Push 順序樣式消費重做了上述實驗,結論差不多。只是因為有多執行緒的原因,日誌看起來偶爾有錯位,這個問題不大,可以解決。而且,在關閉重啟 Broker 的附近,往往伴隨著多次的訊息重發,不過,RocketMQ 也不保證訊息只收到一次就是了。訊息重覆的問題,Kafka 要比 RocketMQ 顯得不那麼嚴重一些。Push 順序樣式消費不需要指定 offset,不需要指定分割槽,第二次啟動可以自動從前一次的 offset 後開始消費。功能上這個與 Kafka 的 Consumer 更類似,雖然 RocketMQ 採用的是非同步樣式。

RocketMQ 最佳實踐

實際上,RocketMQ 自己就有一份《RocketMQ 最佳實踐》的檔案,裡面提到了一些系統設計的問題,例如消費者要冪等,一個應用對應一個 Topic,如此等等。這些經驗不僅僅是對 RocketMQ 有用,對 Kafka 也頗有借鑒意義。

後記

這裡談談我對選擇 RocketMQ 還是 Kafka 的個人建議。以上已經做了多處 RocketMQ 和 Kafka 的對比,我個人覺得,Kafka 是一個不斷發展中的系統,開源社群比 RocketMQ 要大,也要更活躍一些;另外,Kafka 最新版本已經有了同步複製,訊息可靠性更有保障;還有,Kafka 的分割槽機制,幾乎實現了自動負載均衡,這絕對是個殺手級特性;RocketMQ 雖然提供了很多易用的功能,遠超出 Kafka,但這些功能並不一定都能用得上,而且多數可以繞過。相比之下,Kafka 的基本功能更加吸引我,再處理故障恢復的時候,細節上要勝過 RocketMQ。當然,如果是 A 公司內部,或者所在公司使用了 A 公司的雲產品,那麼 RocketMQ 的企業級特性更多一些,或許我會選擇 RocketMQ。

系列

【關於投稿】


如果大家有原創好文投稿,請直接給公號傳送留言。


① 留言格式:
【投稿】+《 文章標題》+ 文章連結

② 示例:
【投稿】《不要自稱是程式員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/

③ 最後請附上您的個人簡介哈~



看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂