歡迎光臨
每天分享高質量文章

Kafka 原始碼分析 5 :KafkaConsumer 消費處理

(點選上方公眾號,可快速關註)


來源:劉正陽 ,

liuzhengyang.github.io/2018/02/21/kafka-source-5-kafka-consumer/

Kafka消費者客戶端從Kafka cluster中讀取訊息並處理。

Kafka消費者可以手動系結自己到某個topic的某些partition上或者透過subscribe方法監聽某個topic自動系結。Kafka消費者系結到某個parition後就和這個partition的leader連線,然後發出fetch request, 獲取訊息後進行處理。

offset管理

kafka的消費模型是一個partition最多被一個consumer消費,而offset可以有consumer控制,例如透過seek前進或後退到某個offset位置。

首次連線時,可以透過KafkaConsumer配置引數裡的auto.offset.reset引數決定是從最新的位置(預設)還是從就早的位置開始消費。

預設情況下, enable.auto.commit引數是true,即KafkaConsumer客戶端會定時commit offset,所有要註意的一點是如果poll函式得到ConsumerRecords後如果處理是非同步的,則可能出現消費處理還沒有完成但是卻commit offset了,這時如果行程掛掉則重啟後則會發生丟訊息的情況。這裡有兩種解決方案,1是poll後的處理是同步的,這樣下一次poll會嘗試commit offset,則能保證at least one語意。2是關閉enable.auto.commit, 然後透過KafkaConsumer.commitSync方法來手動commit offset。

max.poll.interval.ms引數用於設定kafka消費者處理一次poll的消費結果的最大時間(預設300s),如果超過了這個時間則consumer被認為掛了會重新rebalance。

Consumer執行緒相關

消費者多執行緒處理有幾種方式

  1. 每個consumer只由一個執行緒處理,優點是能保證partition內有序和實現簡單,缺點是併發能力受限於partition的數量

  2. 將consumption和process過程分離,即consumer拉到一個訊息後傳遞給另一個執行緒或執行緒池處理,這裡提高了併發能力但是需要註意多執行緒處理中的順序問題不再保證以及可能出現consumer提交了offset而執行緒池沒處理完的情況,另外執行緒池要註意處理慢導致的記憶體佇列積壓問題。

KafkaConsumer.subscribe

監聽某個topic

subscribe(Collection topics, ConsumerRebalanceListener listener)

當消費者使用kafka cluster來管理group membership時,ConsumerRebalanceListener會在consumer rebalance時呼叫,consumer rebalance發生在消費者或消費關係變化的時候

  1. 某個消費行程掛掉

  2. 新消費行程加入

  3. partition數量發生變化時

這個Listener的常見用途是儲存這個partition的最新消費offset,在void onPartitionsRevoked(java.util.Collection partitions)裡儲存當前的partition和offset到資料庫中。然後reassign完成後,void onPartitionsAssigned(java.util.Collection partitions)中從資料庫讀取之前的消費位置,透過seek方法設定消費位置繼續消費。

Kafka.poll

public ConsumerRecords poll(long timeout) {

        // KafkaConsumer不是執行緒安全的

       acquireAndEnsureOpen();

       try {

           if (timeout < 0)

               throw new IllegalArgumentException(“Timeout must not be negative”);

           if (this.subscriptions.hasNoSubscriptionOrUserAssignment())

               throw new IllegalStateException(“Consumer is not subscribed to any topics or assigned any partitions”);

           // poll for new data until the timeout expires

           long start = time.milliseconds();

           long remaining = timeout;

           do {

               Map>> records = pollOnce(remaining);

               if (!records.isEmpty()) {

                   // before returning the fetched records, we can send off the next round of fetches

                   // and avoid block waiting for their responses to enable pipelining while the user

                   // is handling the fetched records.

                   //

                   // NOTE: since the consumed position has already been updated, we must not allow

                   // wakeups or any other errors to be triggered prior to returning the fetched records.

                   if (fetcher.sendFetches() > 0 || client.hasPendingRequests())

                       client.pollNoWakeup();

                   if (this.interceptors == null)

                       return new ConsumerRecords<>(records);

                   else

                       return this.interceptors.onConsume(new ConsumerRecords<>(records));

               }

               long elapsed = time.milliseconds() – start;

               remaining = timeout – elapsed;

           } while (remaining > 0);

           return ConsumerRecords.empty();

       } finally {

           release();

       }

   }

pollOnce處理

private Map>> pollOnce(long timeout) {

        client.maybeTriggerWakeup();

        // 協調者進行一次poll,裡面會根據auto.commit.interval.ms決定是否自動提交offset

        coordinator.poll(time.milliseconds(), timeout);

        // fetch positions if we have partitions we’re subscribed to that we

        // don’t know the offset for

        if (!subscriptions.hasAllFetchPositions())

            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // 如果已經有record資料了直接傳回

        // if data is available already, return it immediately

        Map>> records = fetcher.fetchedRecords();

        if (!records.isEmpty())

            return records;

        // 傳送一次fetch請求

        // send any new fetches (won’t resend pending fetches)

        fetcher.sendFetches();

        long now = time.milliseconds();

        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

        // 等待fetch請求結果

        client.poll(pollTimeout, now, new PollCondition() {

            @Override

            public boolean shouldBlock() {

                // since a fetch might be completed by the background thread, we need this poll condition

                // to ensure that we do not block unnecessarily in poll()

                return !fetcher.hasCompletedFetches();

            }

        });

        // after the long poll, we should check whether the group needs to rebalance

        // prior to returning data so that the group can stabilize faster

        if (coordinator.needRejoin())

            return Collections.emptyMap();

        // 傳回fetch結果

        return fetcher.fetchedRecords();

    }

看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂