歡迎光臨
每天分享高質量文章

震驚了!原來這才是 Kafka!(多圖+深入)

  • 簡介
  • 生產
  • 消費
  • 訊息投遞語意
  • 檔案組織
  • 常用配置項

簡介

kafka是一個分散式訊息佇列。具有高效能、持久化、多副本備份、橫向擴充套件能力。生產者往佇列裡寫訊息,消費者從佇列裡取訊息進行業務邏輯。一般在架構設計中起到解耦、削峰、非同步處理的作用。

kafka對外使用topic的概念,生產者往topic裡寫訊息,消費者從讀訊息。為了做到水平擴充套件,一個topic實際是由多個partition組成的,遇到瓶頸時,可以透過增加partition的數量來進行橫向擴容。單個parition內是保證訊息有序。

每新寫一條訊息,kafka就是在對應的檔案append寫,所以效能非常高。

kafka的總體資料流是這樣的:

kafka data flow

大概用法就是,Producers往Brokers裡面的指定Topic中寫訊息,Consumers從Brokers裡面拉去指定Topic的訊息,然後進行業務處理。
圖中有兩個topic,topic 0有兩個partition,topic 1有一個partition,三副本備份。可以看到consumer gourp 1中的consumer 2沒有分到partition處理,這是有可能出現的,下麵會講到。

關於broker、topics、partitions的一些元資訊用zk來存,監控和路由啥的也都會用到zk。


生產

基本流程是這樣的:

kafka sdk product flow.png

建立一條記錄,記錄中一個要指定對應的topic和value,key和partition可選。 先序列化,然後按照topic和partition,放進對應的傳送佇列中。kafka produce都是批次請求,會積攢一批,然後一起傳送,不是調send()就進行立刻進行網路發包。
如果partition沒填,那麼情況會是這樣的:

  1. key有填
    按照key進行雜湊,相同key去一個partition。(如果擴充套件了partition的數量那麼就不能保證了)
  2. key沒填
    round-robin來選partition

這些要發往同一個partition的請求按照配置,攢一波,然後由一個單獨的執行緒一次性發過去。

API

有high level api,替我們把很多事情都幹了,offset,路由啥都替我們幹了,用以來很簡單。
還有simple api,offset啥的都是要我們自己記錄。

partition

當存在多副本的情況下,會儘量把多個副本,分配到不同的broker上。kafka會為partition選出一個leader,之後所有該partition的請求,實際操作的都是leader,然後再同步到其他的follower。當一個broker歇菜後,所有leader在該broker上的partition都會重新選舉,選出一個leader。(這裡不像分散式檔案儲存系統那樣會自動進行複製保持副本數)

然後這裡就涉及兩個細節:怎麼分配partition,怎麼選leader。

關於partition的分配,還有leader的選舉,總得有個執行者。在kafka中,這個執行者就叫controller。kafka使用zk在broker中選出一個controller,用於partition分配和leader選舉。

partition的分配

  1. 將所有Broker(假設共n個Broker)和待分配的Partition排序
  2. 將第i個Partition分配到第(i mod n)個Broker上 (這個就是leader)
  3. 將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上

leader容災

controller會在Zookeeper的/brokers/ids節點上註冊Watch,一旦有broker宕機,它就能知道。當broker宕機後,controller就會給受到影響的partition選出新leader。controller從zk的/brokers/topics/[topic]/partitions/[partition]/state中,讀取對應partition的ISR(in-sync replica已同步的副本)串列,選一個出來做leader。
選出leader後,更新zk,然後傳送LeaderAndISRRequest給受影響的broker,讓它們改變知道這事。為什麼這裡不是使用zk通知,而是直接給broker傳送rpc請求,我的理解可能是這樣做zk有效能問題吧。

如果ISR串列是空,那麼會根據配置,隨便選一個replica做leader,或者乾脆這個partition就是歇菜。如果ISR串列的有機器,但是也歇菜了,那麼還可以等ISR的機器活過來。

多副本同步

這裡的策略,服務端這邊的處理是follower從leader批次拉取資料來同步。但是具體的可靠性,是由生產者來決定的。
生產者生產訊息的時候,透過request.required.acks引數來設定資料的可靠性。

acks what happen
0 which means that the producer never waits for an acknowledgement from the broker.發過去就完事了,不關心broker是否處理成功,可能丟資料。
1 which means that the producer gets an acknowledgement after the leader replica has received the data. 當寫Leader成功後就傳回,其他的replica都是透過fetcher去同步的,所以kafka是非同步寫,主備切換可能丟資料。
-1 which means that the producer gets an acknowledgement after all in-sync replicas have received the data. 要等到isr裡所有機器同步成功,才能傳回成功,延時取決於最慢的機器。強一致,不會丟資料。

在acks=-1的時候,如果ISR少於min.insync.replicas指定的數目,那麼就會傳回不可用。

這裡ISR串列中的機器是會變化的,根據配置replica.lag.time.max.ms,多久沒同步,就會從ISR串列中剔除。以前還有根據落後多少條訊息就踢出ISR,在1.0版本後就去掉了,因為這個值很難取,在高峰的時候很容易出現節點不斷的進出ISR串列。

從ISA中選出leader後,follower會從把自己日誌中上一個高水位後面的記錄去掉,然後去和leader拿新的資料。因為新的leader選出來後,follower上面的資料,可能比新leader多,所以要擷取。這裡高水位的意思,對於partition和leader,就是所有ISR中都有的最新一條記錄。消費者最多隻能讀到高水位;

從leader的角度來說高水位的更新會延遲一輪,例如寫入了一條新訊息,ISR中的broker都fetch到了,但是ISR中的broker只有在下一輪的fetch中才能告訴leader。

也正是由於這個高水位延遲一輪,在一些情況下,kafka會出現丟資料和主備資料不一致的情況,0.11開始,使用leader epoch來代替高水位。(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection)

思考:
當acks=-1時

  1. 是follwers都來fetch就傳回成功,還是等follwers第二輪fetch?
  2. leader已經寫入本地,但是ISR中有些機器失敗,那麼怎麼處理呢?

消費

訂閱topic是以一個消費組來訂閱的,一個消費組裡面可以有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個partition。換句話來說,就是一個partition,只能被消費組裡的一個消費者消費,但是可以同時被多個消費組消費。因此,如果消費組內的消費者如果比partition多的話,那麼就會有個別消費者一直空閑。

untitled_page.png

API

訂閱topic時,可以用正則運算式,如果有新topic匹配上,那能自動訂閱上。

offset的儲存

一個消費組消費partition,需要儲存offset記錄消費到哪,以前儲存在zk中,由於zk的寫效能不好,以前的解決方法都是consumer每隔一分鐘上報一次。這裡zk的效能嚴重影響了消費的速度,而且很容易出現重覆消費。
在0.10版本後,kafka把這個offset的儲存,從zk總剝離,儲存在一個名叫__consumeroffsets topic的topic中。寫進訊息的key由groupid、topic、partition組成,value是偏移量offset。topic配置的清理策略是compact。總是保留最新的key,其餘刪掉。一般情況下,每個key的offset都是快取在記憶體中,查詢的時候不用遍歷partition,如果沒有快取,第一次就會遍歷partition建立快取,然後查詢傳回。

確定consumer group位移資訊寫入__consumers_offsets的哪個partition,具體計算公式:

__consumers_offsets partition =
           Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)   
//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,預設是50個分割槽。

思考:
如果正在跑的服務,修改了offsets.topic.num.partitions,那麼offset的儲存是不是就亂套了?

分配partition–reblance

生產過程中broker要分配partition,消費過程這裡,也要分配partition給消費者。類似broker中選了一個controller出來,消費也要從broker中選一個coordinator,用於分配partition。
下麵從頂向下,分別闡述一下

  1. 怎麼選coordinator。
  2. 互動流程。
  3. reblance的流程。

選coordinator

  1. 看offset儲存在那個partition
  2. 該partition leader所在的broker就是被選定的coordinator

這裡我們可以看到,consumer group的coordinator,和儲存consumer group offset的partition leader是同一臺機器。

互動流程

把coordinator選出來之後,就是要分配了
整個流程是這樣的:

  1. consumer啟動、或者coordinator宕機了,consumer會任意請求一個broker,傳送ConsumerMetadataRequest請求,broker會按照上面說的方法,選出這個consumer對應coordinator的地址。
  2. consumer 傳送heartbeat請求給coordinator,傳回IllegalGeneration的話,就說明consumer的資訊是舊的了,需要重新加入進來,進行reblance。傳回成功,那麼consumer就從上次分配的partition中繼續執行。

reblance流程

  1. consumer給coordinator傳送JoinGroupRequest請求。
  2. 這時其他consumer發heartbeat請求過來時,coordinator會告訴他們,要reblance了。
  3. 其他consumer傳送JoinGroupRequest請求。
  4. 所有記錄在冊的consumer都發了JoinGroupRequest請求之後,coordinator就會在這裡consumer中隨便選一個leader。然後回JoinGroupRespone,這會告訴consumer你是follower還是leader,對於leader,還會把follower的資訊帶給它,讓它根據這些資訊去分配partition

5、consumer向coordinator傳送SyncGroupRequest,其中leader的SyncGroupRequest會包含分配的情況。
6、coordinator回包,把分配的情況告訴consumer,包括leader。

當partition或者消費者的數量發生變化時,都得進行reblance。
列舉一下會reblance的情況:

  1. 增加partition
  2. 增加消費者
  3. 消費者主動關閉
  4. 消費者宕機了
  5. coordinator自己也宕機了

訊息投遞語意

kafka支援3種訊息投遞語意
At most once:最多一次,訊息可能會丟失,但不會重覆
At least once:最少一次,訊息不會丟失,可能會重覆
Exactly once:只且一次,訊息不丟失不重覆,只且消費一次(0.11中實現,僅限於下游也是kafka)

在業務中,常常都是使用At least once的模型,如果需要可重入的話,往往是業務自己實現。

At least once

先獲取資料,再進行業務處理,業務處理成功後commit offset。
1、生產者生產訊息異常,訊息是否成功寫入不確定,重做,可能寫入重覆的訊息
2、消費者處理訊息,業務處理成功後,更新offset失敗,消費者重啟的話,會重覆消費

At most once

先獲取資料,再commit offset,最後進行業務處理。
1、生產者生產訊息異常,不管,生產下一個訊息,訊息就丟了
2、消費者處理訊息,先更新offset,再做業務處理,做業務處理失敗,消費者重啟,訊息就丟了

Exactly once

思路是這樣的,首先要保證訊息不丟,再去保證不重覆。所以盯著At least once的原因來搞。 首先想出來的:

  1. 生產者重做導致重覆寫入訊息—-生產保證冪等性
  2. 消費者重覆消費—消滅重覆消費,或者業務介面保證冪等性重覆消費也沒問題

由於業務介面是否冪等,不是kafka能保證的,所以kafka這裡提供的exactly once是有限制的,消費者的下游也必須是kafka。所以一下討論的,沒特殊說明,消費者的下游系統都是kafka(註:使用kafka conector,它對部分系統做了適配,實現了exactly once)。

生產者冪等性好做,沒啥問題。

解決重覆消費有兩個方法:

  1. 下游系統保證冪等性,重覆消費也不會導致多條記錄。
  2. 把commit offset和業務處理系結成一個事務。

本來exactly once實現第1點就ok了。

但是在一些使用場景下,我們的資料源可能是多個topic,處理後輸出到多個topic,這時我們會希望輸出時要麼全部成功,要麼全部失敗。這就需要實現事務性。既然要做事務,那麼乾脆把重覆消費的問題從根源上解決,把commit offset和輸出到其他topic系結成一個事務。

生產冪等性

思路是這樣的,為每個producer分配一個pid,作為該producer的唯一標識。producer會為每一個維護一個單調遞增的seq。類似的,broker也會為每個記錄下最新的seq。當req_seq == broker_seq+1時,broker才會接受該訊息。因為:

  1. 訊息的seq比broker的seq大超過時,說明中間有資料還沒寫入,即亂序了。

  2. 訊息的seq不比broker的seq小,那麼說明該訊息已被儲存。

    解決重覆生產

事務性/原子性廣播

場景是這樣的:

  1. 先從多個源topic中獲取資料。
  2. 做業務處理,寫到下游的多個目的topic。
  3. 更新多個源topic的offset。

其中第2、3點作為一個事務,要麼全成功,要麼全失敗。這裡得益與offset實際上是用特殊的topic去儲存,這兩點都歸一為寫多個topic的事務性處理。

基本思路是這樣的:
引入tid(transaction id),和pid不同,這個id是應用程式提供的,用於標識事務,和producer是誰並沒關係。就是任何producer都可以使用這個tid去做事務,這樣進行到一半就死掉的事務,可以由另一個producer去恢復。
同時為了記錄事務的狀態,類似對offset的處理,引入transaction coordinator用於記錄transaction log。在叢集中會有多個transaction coordinator,每個tid對應唯一一個transaction coordinator。
註:transaction log刪除策略是compact,已完成的事務會標記成null,compact後不保留。

做事務時,先標記開啟事務,寫入資料,全部成功就在transaction log中記錄為prepare commit狀態,否則寫入prepare abort的狀態。之後再去給每個相關的partition寫入一條marker(commit或者abort)訊息,標記這個事務的message可以被讀取或已經廢棄。成功後在transaction log記錄下commit/abort狀態,至此事務結束。

資料流:

Kafka Transactions Data Flow.png
  1. 首先使用tid請求任意一個broker(程式碼中寫的是負載最小的broker),找到對應的transaction coordinator。

  2. 請求transaction coordinator獲取到對應的pid,和pid對應的epoch,這個epoch用於防止僵死行程複活導致訊息錯亂,當訊息的epoch比當前維護的epoch小時,拒絕掉。tid和pid有一一對應的關係,這樣對於同一個tid會傳回相同的pid。

  3. client先請求transaction coordinator記錄的事務狀態,初始狀態是BEGIN,如果是該事務中第一個到達的,同時會對事務進行計時;client輸出資料到相關的partition中;client再請求transaction coordinator記錄offset的事務狀態;client傳送offset commit到對應offset partition。

  4. client傳送commit請求,transaction coordinator記錄prepare commit/abort,然後傳送marker給相關的partition。全部成功後,記錄commit/abort的狀態,最後這個記錄不需要等待其他replica的ack,因為prepare不丟就能保證最終的正確性了。

這裡prepare的狀態主要是用於事務恢復,例如給相關的partition傳送控制訊息,沒發完就宕機了,備機起來後,producer傳送請求獲取pid時,會把未完成的事務接著完成。

當partition中寫入commit的marker後,相關的訊息就可被讀取。所以kafka事務在prepare commit到commit這個時間段內,訊息是逐漸可見的,而不是同一時刻可見。

詳細細節可看:https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-TransactionalGuarantees

消費事務

前面都是從生產的角度看待事務。還需要從消費的角度去考慮一些問題。
消費時,partition中會存在一些訊息處於未commit狀態,即業務方應該看不到的訊息,需要過濾這些訊息不讓業務看到,kafka選擇在消費者行程中進行過來,而不是在broker中過濾,主要考慮的還是效能。kafka高效能的一個關鍵點是zero copy,如果需要在broker中過濾,那麼勢必需要讀取訊息內容到記憶體,就會失去zero copy的特性。


檔案組織

kafka的資料,實際上是以檔案的形式儲存在檔案系統的。topic下有partition,partition下有segment,segment是實際的一個個檔案,topic和partition都是抽象概念。

在目錄/

不能識別此Latex公式:
{topicName}-{

partitionid}/下,儲存著實際的log檔案(即segment),還有對應的索引檔案。

每個segment檔案大小相等,檔案名以這個segment中最小的offset命名,檔案副檔名是.log;segment對應的索引的檔案名字一樣,副檔名是.index。有兩個index檔案,一個是offset index用於按offset去查message,一個是time index用於按照時間去查,其實這裡可以優化合到一起,下麵只說offset index。總體的組織是這樣的:

kafka 檔案組織.png

為了減少索引檔案的大小,降低空間使用,方便直接載入進記憶體中,這裡的索引使用稀疏矩陣,不會每一個message都記錄下具體位置,而是每隔一定的位元組數,再建立一條索引。 索引包含兩部分,分別是baseOffset,還有position。

baseOffset:意思是這條索引對應segment檔案中的第幾條message。這樣做方便使用數值壓縮演演算法來節省空間。例如kafka使用的是varint。

position:在segment中的絕對位置。

查詢offset對應的記錄時,會先用二分法,找出對應的offset在哪個segment中,然後使用索引,在定位出offset在segment中的大概位置,再遍歷查詢message。


常用配置項

broker配置

配置項 作用
broker.id broker的唯一標識
auto.create.topics.auto 設定成true,就是遇到沒有的topic自動建立topic。
log.dirs log的目錄數,目錄裡面放partition,當生成新的partition時,會挑目錄裡partition數最少的目錄放。

topic配置

配置項 作用
num.partitions 新建一個topic,會有幾個partition。
log.retention.ms 對應的還有minutes,hours的單位。日誌保留時間,因為刪除是檔案維度而不是訊息維度,看的是日誌檔案的mtime。
log.retention.bytes partion最大的容量,超過就清理老的。註意這個是partion維度,就是說如果你的topic有8個partition,配置1G,那麼平均分配下,topic理論最大值8G。
log.segment.bytes 一個segment的大小。超過了就滾動。
log.segment.ms 一個segment的開啟時間,超過了就滾動。
message.max.bytes message最大多大

關於日誌清理,預設當前正在寫的日誌,是怎麼也不會清理掉的。
還有0.10之前的版本,時間看的是日誌檔案的mtime,但這個指是不準確的,有可能檔案被touch一下,mtime就變了。因此在0.10版本開始,改為使用該檔案最新一條訊息的時間來判斷。
按大小清理這裡也要註意,Kafka在定時任務中嘗試比較當前日誌量總大小是否超過閾值至少一個日誌段的大小。如果超過但是沒超過一個日誌段,那麼就不會刪除。

贊(0)

分享創造快樂