歡迎光臨
每天分享高質量文章

說說 MQ 之 Kafka ( 二 )

(點擊上方公眾號,可快速關註)


來源:Valleylord ,

valleylord.github.io/post/201607-mq-kafka/

Kafka 的工具和編程接口

Kafka 的工具

Kafka 提供的工具還是比較全的,bin/ 目錄下的工具有以下一些,

bin/connect-distributed.sh     bin/kafka-consumer-offset-checker.sh     bin/kafka-replica-verification.sh   bin/kafka-verifiable-producer.sh

bin/connect-standalone.sh      bin/kafka-consumer-perf-test.sh          bin/kafka-run-class.sh              bin/zookeeper-security-migration.sh

bin/kafka-acls.sh              bin/kafka-mirror-maker.sh                bin/kafka-server-start.sh           bin/zookeeper-server-start.sh

bin/kafka-configs.sh           bin/kafka-preferred-replica-election.sh  bin/kafka-server-stop.sh            bin/zookeeper-server-stop.sh

bin/kafka-console-consumer.sh  bin/kafka-producer-perf-test.sh          bin/kafka-simple-consumer-shell.sh  bin/zookeeper-shell.sh

bin/kafka-console-producer.sh  bin/kafka-reassign-partitions.sh         bin/kafka-topics.sh

bin/kafka-consumer-groups.sh   bin/kafka-replay-log-producer.sh         bin/kafka-verifiable-consumer.sh

我常用的命令有以下幾個,

bin/kafka-server-start.sh -daemon config/server.properties &

bin/kafka-topics.sh –describe –zookeeper 192.168.232.23:2181 –topic topic1

bin/kafka-topics.sh –list –zookeeper 192.168.232.23:2181

bin/kafka-topics.sh –delete –zookeeper 192.168.232.23:2181 –topic topic1

bin/kafka-topics.sh –create –zookeeper 192.168.232.23:2181 –replication-factor 3 –partitions 2 –topic topic1

bin/kafka-console-consumer.sh –zookeeper 192.168.232.23:2181 –topic topic1 –from-beginning

bin/kafka-console-producer.sh –broker-list 192.168.232.23:9092 –topic topic1

kafka-server-start.sh 是用於 Kafka 的 Broker 啟動的,主要就一個引數 config/server.properties,該檔案中的配置項待會再說.還有一個 -daemon 引數,這個是將 Kafka 放在後臺用守護行程的方式運行,如果不加這個引數,Kafka 會在運行一段時間後自動退出,據說這個是 0.10.0.0 版本才有的問題 5。kafka-topics.sh 是用於管理 Topic 的工具,我主要用的 –describe、–list、–delete、–create 這4個功能,上述的例子基本是不言自明的,–replication-factor 3、–partitions 2 這兩個引數分別表示3個副本(含 Leader),和2個分割槽。kafka-console-consumer.sh 和 kafka-console-producer.sh 是生產者和消費者的簡易終端工具,在除錯的時候比較有用,我常用的是 kafka-console-consumer.sh。我沒有用 Kafka 自帶的 zookeeper,而是用的 zookeeper 官方的發佈版本 3.4.8,端口是預設2181,與 Broker 在同一臺機器上。

下麵說一下 Broker 啟動的配置檔案 config/server.properties,我在預設配置的基礎上,修改了以下一些,

broker.id=0

listeners=PLAINTEXT://192.168.232.23:9092

log.dirs=/tmp/kafka-logs

delete.topic.enable=true

broker.id 是 Kafka 集群中的 Broker ID,不可重覆,我在多副本的實驗中,將他們分別設置為0、1、2;listeners 是 Broker 監聽的地址,預設是監聽 localhost:9092,因為我不是單機實驗,所以修改為本機局域網地址,當然,如果要監聽所有地址的話,也可以設置為 0.0.0.0:9092,多副本實驗中,將監聽端口分別設置為 9092、9093、9094;log.dirs 是 Broker 的 log 的目錄,多副本實驗中,不同的 Broker 需要有不同的 log 目錄;delete.topic.enable 設為 true 後,可以刪除 Topic,並且連帶 Topic 中的訊息也一併刪掉,否則,即使呼叫 kafka-topics.sh –delete 也無法刪除 Topic,這是一個便利性的設置,對於開發環境可以,生產環境一定要設為 false(預設)。實驗中發現, 如果有消費者在消費這個 Topic,那麼也無法刪除,還是比較安全的。

剩下的工具多數在文件中也有提到。如果看一下這些腳本的話,會發現多數腳本的寫法都是一致的,先做一些引數的校驗,最後運行 exec $base_dir/kafka-run-class.sh XXXXXXXXX “$@”,可見,這些工具都是使用運行 Java Class 的方式呼叫的。

Kafka 的 Java API

在編程接口方面,官方提供了 Scala 和 Java 的接口,社區提供了更多的其他語言的接口,基本上,無論用什麼語言開發,都能找到相應的 API。下麵說一下 Java 的 API 接口。

生產者的 API 只有一種,相對比較簡單,代碼如下,

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

import java.util.concurrent.ExecutionException;

public class SimpleProducerDemo {

    public static void main(String[] args){

        Properties props = new Properties();

        props.put(“bootstrap.servers”, “192.168.232.23:9092,192.168.232.23:9093,192.168.232.23:9094”);

        props.put(“zookeeper.connect”, “192.168.232.23:2181”);

        props.put(“client.id”, “DemoProducer”);

        props.put(“key.serializer”, “org.apache.kafka.common.serialization.IntegerSerializer”);

        props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);

        KafkaProducer producer = new KafkaProducer<>(props);

        String topic = “topic1”;

        Boolean isAsync = false;

        int messageNo = 1;

        while (true) {

            String messageStr = “Message_” + String.format(“%05d”,messageNo);

            long startTime = System.currentTimeMillis();

            if (isAsync) { // Send asynchronously

                producer.send(new ProducerRecord<>(topic,

                        messageNo,

                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));

            } else { // Send synchronously

                try {

                    producer.send(new ProducerRecord<>(topic,

                            messageNo,

                            messageStr)).get();

                    System.out.println(“Sent message: (” + messageNo + “, ” + messageStr + “)”);

                } catch (InterruptedException | ExecutionException e) {

                    e.printStackTrace();

                }

            }

            try {

                Thread.sleep(3);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            ++messageNo;

        }

    }

}

class DemoCallBack implements Callback {

    private final long startTime;

    private final int key;

    private final String message;

    public DemoCallBack(long startTime, int key, String message) {

        this.startTime = startTime;

        this.key = key;

        this.message = message;

    }

    public void onCompletion(RecordMetadata metadata, Exception exception) {

        long elapsedTime = System.currentTimeMillis() – startTime;

        if (metadata != null) {

            System.out.println(

                    “Send     message: (” + String.format(“%05d”,key) + “, ” + message + “) at offset “+ metadata.offset() +

                            ” to partition(” + metadata.partition() +

                            “) in ” + elapsedTime + ” ms”);

        } else {

            exception.printStackTrace();

        }

    }

}

上例中使用了同步和異步發送兩種方式。在多副本的情況下,如果要指定同步複製還是異步複製,可以使用 acks 引數,詳細參考官方文件 Producer Configs 部分的內容;在多分割槽的情況下,如果要指定發送到哪個分割槽,可以使用 partitioner.class 引數,其值是一個實現了 org.apache.kafka.clients.producer.Partitioner 接口的類,用於根據不同的訊息指定分割槽6。消費者的 API 有幾種,比較新的 API 如下,

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;

import java.util.Properties;

public class SimpleConsumer {

    public static void main(String[] args){

        Properties props = new Properties();

        props.put(“bootstrap.servers”, “192.168.232.23:9092”);

        props.put(“group.id”, “test”);

        props.put(“enable.auto.commit”, “true”);

        props.put(“auto.commit.interval.ms”, “1000”);

        props.put(“session.timeout.ms”, “30000”);

        props.put(“key.deserializer”, “org.apache.kafka.common.serialization.IntegerDeserializer”);

        props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);

        KafkaConsumer consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(“topic1”, “topic2”, “topic3”));

        while (true) {

            ConsumerRecords records = consumer.poll(100);

            for (ConsumerRecord record : records) {

                System.out.println(“Received message: (” + String.format(“%05d”, record.key()) + “, ” + record.value() + “) at offset ” + record.offset());

            }

        }

    }

}

消費者還有舊的 API,比如 Consumer 和 SimpleConsumer API,這些都可以從 Kafka 代碼的 kafka-example 中找到,上述的兩個例子也是改寫自 kafka-example。使用新舊 API 在功能上都能滿足訊息收發的需要,但新 API 只依賴 kafka-clients,打包出來的 jar 包會小很多,以我的測試,新 API 的消費者 jar 包大約有 2M 左右,而舊 API 的消費者 jar 包接近 16M。

其實,Kafka 也提供了按分割槽訂閱,可以一次訂閱多個分割槽 TopicPartition[];也支持手動提交 offset,需要呼叫 consumer.commitSync。

Kafka 似乎沒有公開 Topic 創建以及修改的 API(至少我沒有找到),如果生產者向 Broker 寫入的 Topic 是一個新 Topic,那麼 Broker 會創建這個 Topic。創建的過程中會使用預設引數,例如,分割槽個數,會使用 Broker 配置中的 num.partitions 引數(預設1);副本個數,會使用 default.replication.factor 引數。但是通常情況下,我們會需要創建自定義的 Topic,那官方的途徑是使用 Kafka 的工具。也有一些非官方的途徑 7,例如可以這樣寫,

String[] options = new String[]{

        “–create”,

        “–zookeeper”,

        “192.168.232.23:2181”,

        “–partitions”,

        “2”,

        “–replication-factor”,

        “3”,

        “–topic”,

        “topic1”

};

TopicCommand.main(options);

但是這樣寫有一個問題,在執行完 TopicCommand.main(options); 之後,系統會自動退出,原因是執行完指令之後,會呼叫 System.exit(exitCode); 系統直接退出。這樣當然不行,我的辦法是,把相關的執行代碼挖出來,寫一個 TopicUtils 類,如下,

import joptsimple.OptionSpecBuilder;

import kafka.admin.TopicCommand;

import kafka.admin.TopicCommand$;

import kafka.utils.ZkUtils;

import org.apache.kafka.common.security.JaasUtils;

import scala.runtime.Nothing$;

public class TopicUtils {

    // from: http://blog.csdn.net/changong28/article/details/39325079

    // from: http://www.cnblogs.com/davidwang456/p/4313784.html

    public static void createTopic(){

        String[] options = new String[]{

                “–create”,

                “–zookeeper”,

                KafkaProperties.ZOOKEEPER_URL,

                “–partitions”,

                “2”,

                “–replication-factor”,

                “3”,

                “–topic”,

                KafkaProperties.TOPIC

        };

//        TopicCommand.main(options);

        oper(options);

    }

    public static void listTopic(){

        String[] options = new String[]{

                “–list”,

                “–zookeeper”,

                KafkaProperties.ZOOKEEPER_URL

        };

//        TopicCommand.main(options);

        oper(options);

    }

    public static void deleteTopic(){

        String[] options = new String[]{

                “–delete”,

                “–zookeeper”,

                KafkaProperties.ZOOKEEPER_URL,

                “–topic”,

                KafkaProperties.TOPIC

        };

//        TopicCommand.main(options);

        oper(options);

    }

    public static void describeTopic(){

        String[] options = new String[]{

                “–describe”,

                “–zookeeper”,

                KafkaProperties.ZOOKEEPER_URL,

                “–topic”,

                KafkaProperties.TOPIC

        };

//        TopicCommand.main(options);

        oper(options);

    }

    public static void main(String[] args){

        listTopic();

        createTopic();

        listTopic();

        describeTopic();

        deleteTopic();

        try {

            Thread.sleep(3*1000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        listTopic();

    }

    /** copied & modified from kafka.admin.TopicCommand$.main

     *

     * @param args

     */

    public static void oper(String args[]){

        try {

        TopicCommand$ topicCommand$ = TopicCommand$.MODULE$;

        final TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(args);

        if(args.length == 0) {

            throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), “Create, delete, describe, or change a topic.”);

        } else {

            int actions =0;

            OptionSpecBuilder[] optionSpecBuilders = {opts.createOpt(), opts.listOpt(), opts.alterOpt(), opts.describeOpt(), opts.deleteOpt()};

            for (OptionSpecBuilder temp:optionSpecBuilders){

                if (opts.options().has(temp)) {

                    actions++;

                }

            }

            if(actions != 1) {

                throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), “Command must include exactly one action: –list, –describe, –create, –alter or –delete”);

            } else {

                opts.checkArgs();

                ZkUtils zkUtils = kafka.utils.ZkUtils$.MODULE$.apply((String)opts.options().valueOf(opts.zkConnectOpt()), 30000, 30000, JaasUtils.isZkSecurityEnabled());

                byte exitCode = 0;

                try {

                    try {

                        if(opts.options().has(opts.createOpt())) {

                            topicCommand$.createTopic(zkUtils, opts);

                        } else if(opts.options().has(opts.alterOpt())) {

                            topicCommand$.alterTopic(zkUtils, opts);

                        } else if(opts.options().has(opts.listOpt())) {

                            topicCommand$.listTopics(zkUtils, opts);

                        } else if(opts.options().has(opts.describeOpt())) {

                            topicCommand$.describeTopic(zkUtils, opts);

                        } else if(opts.options().has(opts.deleteOpt())) {

                            topicCommand$.deleteTopic(zkUtils, opts);

                        }

                    } catch (final Throwable var12) {

                        scala.Predef$.MODULE$.println((new StringBuilder()).append(“Error while executing topic command : “).append(var12.getMessage()).toString());

                        System.out.println(var12);

                        exitCode = 1;

                        return;

                    }

                } finally {

                    zkUtils.close();

//                    System.exit(exitCode);

                }

            }

        }

        } catch (Nothing$ nothing$) {

            nothing$.printStackTrace();

        }

    }

}

以上的 oper 方法改寫自 kafka.admin.TopicCommand$.main 方法。可以發現這部分代碼非常怪異,原因是 TopicCommand$ 是 Scala 寫的,再編譯成 Java class 位元組碼,然後我根據這些位元組碼反編譯得到 Java 代碼,並以此為基礎進行修改,等於是我在用 Java 的方式改寫 Scala 的代碼,難免會覺得詭異。當然,這種寫法用在生產環境的話是不太合適的,因為呼叫的 topicCommand$.createTopic 等方法都沒有丟擲異常,例如引數不合法的情況,而且也沒有使用 log4j 之類的 log 庫,只是用 System.out.println 這樣的方法屏顯,在出現錯誤的時候,比較難以定位。

參考文章

  • http://kafka.apache.org/documentation.html

  • http://www.jianshu.com/p/453c6e7ff81c

  • http://www.infoq.com/cn/author/%E9%83%AD%E4%BF%8A#文章

  • http://developer.51cto.com/art/201501/464491.htm

  • https://segmentfault.com/q/1010000004292925

  • http://www.cnblogs.com/gnivor/p/5318319.html

  • http://www.cnblogs.com/davidwang456/p/4313784.html

  • http://www.jianshu.com/p/8689901720fd

  • http://zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/

  • How to choose the number of topics/partitions in a Kafka cluster?

系列

【關於投稿】


如果大家有原創好文投稿,請直接給公號發送留言。


① 留言格式:
【投稿】+《 文章標題》+ 文章鏈接

② 示例:
【投稿】《不要自稱是程式員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/

③ 最後請附上您的個人簡介哈~



看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

赞(0)

分享創造快樂