歡迎光臨
每天分享高質量文章

【訊息佇列 MQ 專欄】訊息佇列之 Kafka

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

Kafka 特點

Kafka 最早是由 LinkedIn 公司開發一種分散式的基於釋出/訂閱的訊息系統,之後成為 Apache 的頂級專案。主要特點如下:

1. 同時為釋出和訂閱提供高吞吐量

Kafka 的設計標的是以時間複雜度為 O(1) 的方式提供訊息持久化能力,即使對TB 級以上資料也能保證常數時間的訪問效能。即使在非常廉價的商用機器上也能做到單機支援每秒 100K 條訊息的傳輸。

2. 訊息持久化

將訊息持久化到磁碟,因此可用於批次消費,例如 ETL 以及實時應用程式。透過將資料持久化到硬碟以及 replication 防止資料丟失。

3. 分散式

支援 Server 間的訊息分割槽及分散式消費,同時保證每個 partition 內的訊息順序傳輸。這樣易於向外擴充套件,所有的producer、broker 和 consumer 都會有多個,均為分散式的。無需停機即可擴充套件機器。

4. 消費訊息採用 pull 樣式

訊息被處理的狀態是在 consumer 端維護,而不是由 server 端維護,broker 無狀態,consumer 自己儲存 offset。

5. 支援 online 和 offline 的場景。

同時支援離線資料處理和實時資料處理。

Kafka 中的基本概念

img

1. Broker

Kafka 叢集中的一臺或多臺伺服器統稱為 Broker

2. Topic

每條釋出到 Kafka 的訊息都有一個類別,這個類別被稱為 Topic 。(物理上不同Topic 的訊息分開儲存。邏輯上一個 Topic 的訊息雖然儲存於一個或多個broker上,但使用者只需指定訊息的 Topic 即可生產或消費資料而不必關心資料存於何處)

3. Partition

Topic 物理上的分組,一個 Topic 可以分為多個 Partition ,每個 Partition 是一個有序的佇列。Partition 中的每條訊息都會被分配一個有序的 id(offset)

4. Producer

訊息和資料的生產者,可以理解為往 Kafka 發訊息的客戶端

5. Consumer

訊息和資料的消費者,可以理解為從 Kafka 取訊息的客戶端

6. Consumer Group

每個 Consumer 屬於一個特定的 Consumer Group(可為每個 Consumer 指定Group Name,若不指定 Group Name 則屬於預設的 Group)。這是 Kafka 用來實現一個 Topic 訊息的廣播(發給所有的 Consumer )和單播(發給任意一個 Consumer )的手段。一個 Topic 可以有多個 Consumer Group。Topic 的訊息會複製(不是真的複製,是概念上的)到所有的 Consumer Group,但每個 Consumer Group 只會把訊息發給該 Consumer Group 中的一個 Consumer。如果要實現廣播,只要每個 Consumer 有一個獨立的 Consumer Group 就可以了。如果要實現單播只要所有的 Consumer 在同一個 Consumer Group 。用 Consumer Group 還可以將 Consumer 進行自由的分組而不需要多次傳送訊息到不同的 Topic 。

Kafka 安裝

Mac 使用者用 HomeBrew 來安裝,安裝前要先更新 brew

brew update

接著安裝 kafka

brew install kafka

安裝完成之後可以檢視 kafka 的配置檔案

cd /usr/local/etc/kafka
kafka 配置檔案

kafka 需要用到 zookeeper,HomeBrew 安裝kafka 的時候會同時安裝 zookeeper。下麵先啟動 zookeeper:

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

接著啟動 kafka

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-server-start /usr/local/etc/kafka/server.properties

建立 topic,設定 partition 數量為2,topic 的名字叫 test-topic,下麵的例子都用這個 topic

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic

檢視建立的 topic

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-topics --list --zookeeper localhost:2181

Kafka 命令列測試

傳送訊息

cd /usr/local/Cellar/kafka/0.11.0.1/bin
kafka-console-producer --broker-list localhost:9092 --topic test-topic

消費訊息

cd /usr/local/Cellar/kafka/0.11.0.1/bin
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning

刪除 topic

cd /usr/local/Cellar/kafka/0.11.0.1/bin
./bin/kafka-topics --delete --zookeeper localhost:2181 --topic test-topic

如果 kafka 啟動時載入的配置檔案中 server.properties 沒有配置delete.topic.enable=true,那麼此時的刪除並不是真正的刪除,而是把 topic 標記為:marked for deletion

檢視所有 topic

cd /usr/local/Cellar/kafka/0.11.0.1/bin
./bin/kafka-topics --zookeeper localhost:2181 --list

物理刪除 topic

登入zookeeper客戶端:/usr/local/Cellar/zookeeper/3.4.10/bin/zkCli
找到topic所在的目錄:ls /brokers/topics
找到要刪除的topic,執行命令:rmr /brokers/topics/test-topic 即可,此時topic被徹底刪除

Java 客戶端訪問

1. maven工程的pom檔案中新增依賴

<dependency>
   <groupId>org.apache.kafkagroupId>


   <artifactId>kafka-clientsartifactId>
   <version>0.11.0.1version>
dependency>

2. 訊息生產者

package org.study.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
public class ProducerSample {
   public static void main(String[] args) {
       Map<String, Object> props = new HashMap<String, Object>();
       props.put("zk.connect", "127.0.0.1:2181");//zookeeper 的地址
       props.put("bootstrap.servers", "localhost:9092");//用於建立與 kafka 叢集連線的 host/port 組。
       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       String topic = "test-topic";
       Producer<String, String> producer = new KafkaProducer<String, String>(props);
       producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 1"));
       producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 2"));
       producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 3"));
       producer.close();
   }
}

3. 訊息消費者

package org.study.kafka;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerSample {
   public static void main(String[] args) {
       String topic = "test-topic";// topic name
       Properties props = new Properties();
       props.put("bootstrap.servers", "localhost:9092");//用於建立與 kafka 叢集連線的 host/port 組。
       props.put("group.id", "testGroup1");// Consumer Group Name
       props.put("enable.auto.commit", "true");// Consumer 的 offset 是否自動提交
       props.put("auto.commit.interval.ms", "1000");// 自動提交 offset 到 zookeeper 的時間間隔,時間是毫秒
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       Consumer<String, String> consumer = new KafkaConsumer(props);
       consumer.subscribe(Arrays.asList(topic));
       while (true) {
           ConsumerRecords<String, String> records = consumer.poll(100);
           for (ConsumerRecord<String, String> record : records)
               System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
       }
   }
}

4. 啟動 zookeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

5. 啟動 kafka 伺服器

kafka-server-start /usr/local/etc/kafka/server.properties

6. 執行 Consumer

先執行 Consumer ,這樣當生產者傳送訊息的時候能在消費者後端看到訊息記錄。

7. 執行 Producer

執行 Producer,釋出幾條訊息,在 Consumer 的控制檯能看到接收的訊息

Consumer 控制檯

Kafka 叢集配置

kafka 的叢集配置一般有三種,即: single node – single broker ,single node – multiple broker ,multiple node – multiple broker

前兩種實際上官網有介紹。

single node – single broker

單節點單 broker

1. 啟動 zookeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

2. 啟動 kafka broker

kafka-server-start /usr/local/etc/kafka/server.properties

3. 建立一個 kafka topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic-singlenode-singlebroker

4. 啟動 producer 傳送資訊

kafka-console-producer --broker-list localhost:9092 --topic topic-singlenode-singlebroker

broker-list 和 topic 這兩個引數是必須的,broker-list 指定要連線的 broker 的地址,格式為 node_address:port 。topic 是必須的,因為需要傳送訊息給訂閱了該topic 的 consumer group 。現在可以在命令列裡輸入一些資訊,每一行會被作為一個訊息。

傳送訊息

5. 啟動 consumer 消費訊息

kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-singlenode-singlebroker

在不同的終端視窗裡分別啟動 zookeeper、broker、producer、consumer 後,在producer 終端裡輸入訊息,訊息就會在 consumer 終端中顯示了。

訊息顯示

single node – multiple broker

單節點多 broker

1. 啟動 zookeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

2. 啟動broker

如果需要在單個節點(即一臺機子)上面啟動多個 broker(這裡作為例子啟動三個 broker),需要準備多個server.properties檔案即可,所以需要複製 /usr/local/etc/kafka/server.properties 檔案。因為需要為每個 broker 指定單獨的屬性配置檔案,其中 broker.id 、 port 、 log.dir 這三個屬性必須是不同的。

新建一個 kafka-example 目錄和三個存放日誌的目錄

mkdir kafka-example
mkdir kafka-logs-1
mkdir kafka-logs-2
mkdir kafka-logs-3

複製 /usr/local/etc/kafka/server.properties 檔案三份

cp server.properties /Users/niwei/Downloads/kafka-example/server-1.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-2.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-3.properties

在 broker1 的配置檔案 server-1.properties 中,相關要修改的引數為:

broker.id=1
port=9093
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1

broker2 的配置檔案 server-2.properties 中,相關要修改的引數為:

broker.id=2
port=9094
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-2

broker3 的配置檔案 server-3.properties 中,相關要修改的引數為:

broker.id=3
port=9095
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-3

啟動每個 broker

cd /Users/niwei/Downloads/kafka-example
kafka-server-start server-1.properties
kafka-server-start server-2.properties
kafka-server-start server-3.properties

3. 建立 topic

建立一個名為 topic-singlenode-multiplebroker 的topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topic-singlenode-multiplebroker

4. 啟動 producer 傳送資訊

如果一個 producer 需要連線多個 broker 則需要傳遞引數 broker-list

kafka-console-producer --broker-list localhost:9093, localhost:9094, localhost:9095 --topic topic-singlenode-multiplebroker

5. 啟動 consumer 消費訊息

kafka-console-consumer --zookeeper localhost:2181 --topic topic-singlenode-multiplebroker
單節點多 broker 消費訊息

multiple node – multiple broker

多節點多 broker

搭建 zookeeper 叢集

1. Kafka 的叢集配置

broker.id=1  #當前機器在叢集中的唯一標識
port=9093 #當前 kafka 對外提供服務的埠,預設是 9092
host.name=192.168.121.101 #這個引數預設是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1 #訊息存放的目錄,這個目錄可以配置為逗號分割的運算式
zookeeper.connect=192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 #設定 zookeeper 叢集的連線埠
num.network.threads=3 #這個是 borker 進行網路處理的執行緒數
num.io.threads=5 #這個是 borker 進行 IO 處理的執行緒數
socket.send.buffer.bytes=102400 #傳送緩衝區的大小,資料先回儲存到緩衝區了到達一定的大小後在傳送能提高效能
socket.receive.buffer.bytes=102400 #接收緩衝區的大小,當資料到達一定大小後在序列化到磁碟
socket.request.max.bytes=104857600 #這個引數是向 kafka 請求訊息或者向 kafka 傳送訊息的請求的最大數,這個值不能超過 jvm 的堆疊大小
num.partitions=1 #預設的分割槽數,一個 topic 預設1個分割槽數
log.retention.hours=24 #預設訊息的最大持久化時間,24小時
message.max.byte=5242880  #訊息儲存的最大值5M
default.replication.factor=2  #kafka 儲存訊息的副本數,如果一個副本失效了,另一個還可以繼續提供服務
replica.fetch.max.bytes=5242880  #取訊息的最大直接數
log.segment.bytes=1073741824 #這個引數是因為 kafka 的訊息是以追加的形式落地到檔案,當超過這個值的時候,kafka 會新建一個檔案
log.retention.check.interval.ms=300000 #每隔 300000 毫秒去檢查上面配置的 log 失效時間(log.retention.hours=24 ),到目錄檢視是否有過期的訊息如果有則刪除
log.cleaner.enable=false #是否啟用 log 壓縮,一般不用啟用,啟用的話可以提高效能

由於是多節點多 broker 的,所以每個 broker 的配置檔案 server.properties 都要按以上說明修改

2. producer 的配置修改

kafka-console-producer --broker-list 192.168.21.1:9092,192.168.21.2:9092,192.168.21.3:9092 --topic topic-multiplenode-multiplebroker

3. consumer 的配置修改

kafka-console-consumer --zookeeper 192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 --topic topic-multiplenode-multiplebroker

Kafka 高可靠性配置

Kafka 提供了很高的資料冗餘彈性,對於需要資料高可靠性的場景可以增加資料冗餘備份數(replication.factor),調高最小寫入副本數的個數(min.insync.replicas)等等,但是這樣會影響效能。反之,效能提高而可靠性則降低,使用者需要自身業務特性在彼此之間做一些權衡性選擇。

要保證資料寫入到 Kafka 是安全的、高可靠的,需要如下的配置:

1. topic 的配置

replication.factor>=3,即副本數至少是3個2<=min.insync.replicas<=replication.factor

2. broker 的配置

leader 的選舉條件 unclean.leader.election.enable=false

3. producer 的配置

request.required.acks=-1,producer.type=sync

Kafka 高吞吐量的秘訣

訊息中介軟體從功能上看就是寫入資料、讀取資料兩大類,最佳化也可以從這兩方面來看。

為了最佳化寫入速度 Kafak 採用以下技術:

1. 順序寫入

磁碟大多數都還是機械結構(SSD不在討論的範圍內),如果將訊息以隨機寫的方式存入磁碟,就需要按柱面、磁頭、扇區的方式定址,緩慢的機械運動(相對記憶體)會消耗大量時間,導致磁碟的寫入速度與記憶體寫入速度差好幾個數量級。為了規避隨機寫帶來的時間消耗,Kafka 採取了順序寫的方式儲存資料,如下圖所示:

順序寫
消費訊息

2. 記憶體對映檔案

即便是順序寫入硬碟,硬碟的訪問速度還是不可能追上記憶體。所以 Kafka 的資料並不是實時的寫入硬碟,它充分利用了現代作業系統分頁儲存來利用記憶體提高I/O效率。Memory Mapped Files (後面簡稱mmap)也被翻譯成記憶體對映檔案,在64位作業系統中一般可以表示 20G 的資料檔案,它的工作原理是直接利用作業系統的 Page 來實現檔案到物理記憶體的直接對映。完成對映之後對物理記憶體的操作會被同步到硬碟上(由作業系統在適當的時候)。透過 mmap 行程像讀寫硬碟一樣讀寫記憶體,也不必關心記憶體的大小,有虛擬記憶體為我們兜底。使用這種方式可以獲取很大的 I/O 提升,因為它省去了使用者空間到核心空間複製的開銷(呼叫檔案的 read 函式會把資料先放到核心空間的記憶體中,然後再複製到使用者空間的記憶體中)但這樣也有一個很明顯的缺陷——不可靠,寫到 mmap 中的資料並沒有被真正的寫到硬碟,作業系統會在程式主動呼叫 flush 的時候才把資料真正的寫到硬碟。所以 Kafka 提供了一個引數—— producer.type 來控制是不是主動 flush,如果Kafka 寫入到 mmap 之後就立即 flush 然後再傳回 Producer 叫同步(sync);如果寫入 mmap 之後立即傳回,Producer 不呼叫 flush ,就叫非同步(async)。

3. 標準化二進位制訊息格式

為了避免無效率的位元組複製,尤其是在負載比較高的情況下影響是顯著的。為了避免這種情況,Kafka 採用由 Producer,Broker 和 Consumer 共享的標準化二進位制訊息格式,這樣資料塊就可以在它們之間自由傳輸,無需轉換,降低了位元組複製的成本開銷。

而在讀取速度的最佳化上 Kafak 採取的主要是零複製

零複製(Zero Copy)的技術:

傳統樣式下我們從硬碟讀取一個檔案是這樣的

檔案傳輸到 Socket 的常規方式

(2) 應用將資料從核心空間讀到使用者空間的快取中

(3) 應用將資料寫會核心空間的套接字快取中

(4)作業系統將資料從套接字快取寫到網絡卡快取中,以便將資料經網路發出

這樣做明顯是低效的,這裡有四次複製,兩次系統呼叫。針對這種情況 Unix 作業系統提供了一個最佳化的路徑,用於將資料從頁快取區傳輸到 socket。在 Linux 中,是透過 sendfile 系統呼叫來完成的。Java提供了訪問這個系統呼叫的方法:FileChannel.transferTo API。這種方式只需要一次複製:作業系統將資料直接從頁快取傳送到網路上,在這個最佳化的路徑中,只有最後一步將資料複製到網絡卡快取中是需要的。

零複製方式傳輸到 Socket

Kafka 速度的秘訣在於它把所有的訊息都變成一個的檔案。透過 mmap 提高 I/O的速度,寫入資料的時候是末尾新增所以速度最優;讀取資料的時候配合sendfile 直接暴力輸出。所以單純的去測試 MQ 的速度沒有任何意義,Kafka 的這種暴力的做法已經脫了 MQ 的底褲,更像是一個暴力的資料傳送器。


知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. API 配置(一)之應用
04. API 配置(二)之服務提供者
05. API 配置(三)之服務消費者
06. 屬性配置
07. XML 配置
08. 核心流程一覽

09. 拓展機制 SPI

10. 執行緒池


一共 60 篇++

贊(0)

分享創造快樂