歡迎光臨
每天分享高質量文章

說說 MQ 之 Kafka ( 三 )

(點選上方公眾號,可快速關註)


來源:Valleylord ,

valleylord.github.io/post/201607-mq-kafka/

Kafka 副本和叢集

在生產環境中,Kafka 總是以“叢集+分割槽”方式執行的,以保證可靠性和效能。下麵是一個3副本的 Kafka 叢集實體。

首先,需要啟動3個 Kafka Broker,Broker 的配置檔案分別如下,

broker.id=0

listeners=PLAINTEXT://192.168.232.23:9092

log.dirs=/tmp/kafka-logs

broker.id=1

listeners=PLAINTEXT://192.168.232.23:9093

log.dirs=/tmp/kafka-logs-1

broker.id=1

listeners=PLAINTEXT://192.168.232.23:9094

log.dirs=/tmp/kafka-logs-2

雖然每個 Broker 只配置了一個埠,實際上,Kafka 會多佔用一個,可能是用來 Broker 之間的複製的。另外,3個 Broker 都配置了,

zookeeper.connect=localhost:2181

delete.topic.enable=true

在同一個 Zookeeper 上的 Broker 會被歸類到一個叢集中。註意,這些配置中並沒有指定哪一個 Broker 是主節點,哪些 Broker 是從節點,Kafka 採用的辦法是從可選的 Broker 中,選出每個分割槽的 Leader。也就是說,對某個 Topic 來說,可能0節點是 Leader,另外一些 Topic,可能1節點是 Leader;甚至,如果 topic1 有2個分割槽的話,分割槽1的 Leader 是0節點,分割槽2的 Leader 是1節點。

這種對等的設計,對於故障恢復是十分有用的,在節點崩潰的時候,Kafka 會自動選舉出可用的從節點,將其升級為主節點。在崩潰的節點恢復,加入叢集之後,Kafka 又會將這個節點加入到可用節點,並自動選舉出新的主節點。

實驗如下,先新建一個3副本,2分割槽的 Topic,

bin/kafka-topics.sh –create –zookeeper 192.168.232.23:2181 –replication-factor 3 –partitions 2 –topic topic1

初始狀況下,topic1 的狀態如下,

$ bin/kafka-topics.sh –describe –zookeeper 192.168.232.23:2181 –topic topic1

Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:

        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2

        Topic: topic1   Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0

對於上面的輸出,即使沒有檔案,也可以看懂大概:topic1 有2個分割槽,Partition 0 和 Partition 1,Leader 分別在 Broker 0 和 1。Replicas 表示副本在哪些 Broker 上,Isr(In-Sync Replicas)表示處於同步狀態中的 Broker,如果有 Broker 宕機了,那麼 Replicas 不會變,但是 Isr 會僅顯示沒有宕機的 Broker,詳見下麵的實驗。

然後分2個執行緒,執行之前寫的 Producer 和 Consumer 的示例程式碼,Producer 採用非同步傳送,訊息採用同步複製。在有訊息傳送的情況下,kill -9 停掉其中2個 Broker(Broker 0 和 Broker 1),模擬突然宕機。此時,topic1 狀態如下,

$ bin/kafka-topics.sh –describe –zookeeper 192.168.232.23:2181 –topic topic1

Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:

        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2

        Topic: topic1   Partition: 1    Leader: 2       Replicas: 1,2,0 Isr: 2

可見,Kafka 已經選出了新的 Leader,訊息傳送沒有中斷。接著再啟動被停掉的那兩個 Broker,並檢視 topic1 的狀態,如下,

$ bin/kafka-topics.sh –describe –zookeeper 192.168.232.23:2181 –topic topic1

Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:

        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,1,0

        Topic: topic1   Partition: 1    Leader: 2       Replicas: 1,2,0 Isr: 2,1,0

$ bin/kafka-topics.sh –describe –zookeeper 192.168.232.23:2181 –topic topic1

Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:

        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,1,0

        Topic: topic1   Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 2,1,0

可以發現, 有一個短暫的時間,topic1 的兩個分割槽的 Leader 都是 Broker 2,但是在 Kafka 重新選舉之後,分割槽1的 Leader 變為 Broker 1。說明 Kafka 傾向於用不同的 Broker 做分割槽的 Leader,這樣更能達到負載均衡的效果。

再來看看 Producer 和 Consumer 的日誌,下麵這個片段是2個 Broker 宕機前後的日誌,

……

Send     message: (00439, Message_00439) at offset 217 to partition(0) in 3 ms

Received message: (00438, Message_00438) at offset 216

Send     message: (00440, Message_00440) at offset 218 to partition(0) in 5 ms

Send     message: (00441, Message_00441) at offset 221 to partition(1) in 5 ms

Received message: (00441, Message_00441) at offset 221

Received message: (00439, Message_00439) at offset 217

Send     message: (00442, Message_00442) at offset 222 to partition(1) in 5 ms

Send     message: (00443, Message_00443) at offset 219 to partition(0) in 3 ms

Received message: (00440, Message_00440) at offset 218

Received message: (00443, Message_00443) at offset 219

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

Received message: (00442, Message_00442) at offset 222

Send     message: (00452, Message_00452) at offset 223 to partition(1) in 7492 ms

Send     message: (00454, Message_00454) at offset 224 to partition(1) in 7485 ms

Send     message: (00455, Message_00455) at offset 225 to partition(1) in 7482 ms

Send     message: (00458, Message_00458) at offset 226 to partition(1) in 7473 ms

Send     message: (00460, Message_00460) at offset 227 to partition(1) in 7467 ms

Send     message: (00461, Message_00461) at offset 228 to partition(1) in 7465 ms

Send     message: (00462, Message_00462) at offset 229 to partition(1) in 7462 ms

Send     message: (00463, Message_00463) at offset 230 to partition(1) in 7459 ms

Send     message: (00464, Message_00464) at offset 231 to partition(1) in 7456 ms

Send     message: (00465, Message_00465) at offset 232 to partition(1) in 7453 ms

……

Send     message: (01103, Message_01103) at offset 543 to partition(1) in 5478 ms

Received message: (00631, Message_00631) at offset 310

Received message: (00633, Message_00633) at offset 311

Send     message: (00451, Message_00451) at offset 220 to partition(0) in 7525 ms

Received message: (00634, Message_00634) at offset 312

Send     message: (00453, Message_00453) at offset 221 to partition(0) in 7518 ms

Received message: (00639, Message_00639) at offset 313

Send     message: (00456, Message_00456) at offset 222 to partition(0) in 7509 ms

Received message: (00641, Message_00641) at offset 314

Send     message: (00457, Message_00457) at offset 223 to partition(0) in 7506 ms

Received message: (00643, Message_00643) at offset 315

……

出現錯誤的時候,Producer 丟擲了 NetworkException 異常。其中有3589條 Received 日誌,3583條 Send 日誌,7條 NetworkException 異常日誌,傳送訊息的最大序號是3590,接收訊息的最大序號是3589,有以下幾個值得註意的地方,

  1. 宕機之前,訊息的接收並不是順序的,這是因為 topic1 有2個分割槽,Kafka 只保證分割槽上的有序;

  2. 宕機之後,出現了長段的傳送日誌而沒有接收日誌,說明 Kafka 此時正在選舉,選舉的過程會阻塞消費者;

  3. 從接收訊息的條數和序號來看,所有的訊息都收到了,沒有丟(沒有收到3590的訊息可能是因為強制退出 client 行程的原因),傳送的過程的7個異常應該只是虛警,7條異常對應序號444~450,3583條 Send 訊息再加上這7條,與總訊息3590條一致;

從這個實驗中,可以看到,雖然 Kafka 不保證訊息重覆傳送,但是卻在儘量保證沒有訊息被重覆傳送,可能我的實驗場景還不夠極端,沒有做出訊息重覆的情況。

如之前所說,如果要保持完全順序性,需要使用單分割槽;如果要避免丟擲 NetworkException 異常,就使用 Producer 同步傳送。下麵,我們重做上面的例子,不同之處是使用單分割槽和 Producer 同步傳送,擷取一段 Broker 宕機時的日誌如下,

……

Sent message: (118, Message_00118)

Received message: (00118, Message_00118) at offset 117

Received message: (00119, Message_00119) at offset 118

Sent message: (119, Message_00119)

Sent message: (120, Message_00120)

Received message: (00120, Message_00120) at offset 119

Sent message: (121, Message_00121)

Received message: (00121, Message_00121) at offset 120

Sent message: (122, Message_00122)

Sent message: (123, Message_00123)

Sent message: (124, Message_00124)

Sent message: (125, Message_00125)

Sent message: (126, Message_00126)

Sent message: (127, Message_00127)

……

可見,由於採用同步傳送,Broker 宕機並沒有造成丟擲異常,另外,由於使用單分割槽,順序性也得到了保證,全域性沒有出現亂序的情況。

綜上,是否使用多分割槽更多的是對順序性的要求,而使用 Producer 同步傳送還是非同步傳送,更多是出於重覆訊息的考慮,如果非同步傳送丟擲異常,在保證不丟訊息的前提下,勢必要重發訊息,這就會導致收到重覆訊息。多分割槽和 Producer 非同步傳送,會帶來效能的提升,但是也會引入非順序性,重覆訊息等問題,如何取捨要看應用的需求。

Kafka 最佳實踐

Kafka 在一些應用場景中,有一些前人總結的最佳實踐 8 9。對最佳實踐,我的看法是,對於自己比較熟悉,有把握的部分,可以按自己的步驟進行;對一些自己不清楚的領域,可以借鑒其中的一些內容,至少不會錯的特別厲害。有文章10說,Kafka 在分割槽比較多的時候,相應時間會變長,這個現象值得在實踐中註意。

後記

在 Kafka 與 RocketMQ 的對比中,RocketMQ 的一個核心功能就是可以支援同步刷盤,此時,即使突然斷電,也可以保證訊息不丟;而 Kafka 採用的是非同步刷盤,即使傳回寫入成功,也只是寫入緩衝區成功,並非已經持久化。因此,如果出現斷電或 kill -9 的情況,Kafka 記憶體中的訊息可能丟失。另外,同步刷盤的效率是比較低下的,一般生產中估計也不會使用,可以用優雅關閉的方式來關閉行程。如果不考慮這些極端情況的話,Kafka 基本是一個很可靠的訊息中介軟體。

參考文章

  • http://kafka.apache.org/documentation.html

  • http://www.jianshu.com/p/453c6e7ff81c

  • http://www.infoq.com/cn/author/%E9%83%AD%E4%BF%8A#文章

  • http://developer.51cto.com/art/201501/464491.htm

  • https://segmentfault.com/q/1010000004292925

  • http://www.cnblogs.com/gnivor/p/5318319.html

  • http://www.cnblogs.com/davidwang456/p/4313784.html

  • http://www.jianshu.com/p/8689901720fd

  • http://zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/

  • How to choose the number of topics/partitions in a Kafka cluster?

系列

【關於投稿】


如果大家有原創好文投稿,請直接給公號傳送留言。


① 留言格式:
【投稿】+《 文章標題》+ 文章連結

② 示例:
【投稿】《不要自稱是程式員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/

③ 最後請附上您的個人簡介哈~



看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂