歡迎光臨
每天分享高質量文章

使用 Kafka 和 MongoDB 進行 Go 非同步處理 | Linux 中國

在這個示例中,我將資料的儲存和 MongoDB 分離,並建立另一個微服務去處理它。我還添加了 Kafka 為訊息層服務,這樣微服務就可以非同步處理它自己關心的東西了。
— Melvin Vivas


致謝
編譯自 | 
https://www.melvinvivas.com/developing-microservices-using-kafka-and-mongodb/
 
 作者 | Melvin Vivas
 譯者 | qhwdw ?共計翻譯:145 篇 貢獻時間:287 天

在我前面的部落格文章 “我的第一個 Go 微服務:使用 MongoDB 和 Docker 多階段構建[1]” 中,我建立了一個 Go 微服務示例,它釋出一個 REST 式的 http 端點,並將從 HTTP POST 中接收到的資料儲存到 MongoDB 資料庫。

在這個示例中,我將資料的儲存和 MongoDB 分離,並建立另一個微服務去處理它。我還添加了 Kafka 為訊息層服務,這樣微服務就可以非同步處理它自己關心的東西了。

如果你有時間去看,我將這個部落格文章的整個過程錄製到 這個影片中了[2] 🙂

下麵是這個使用了兩個微服務的簡單的非同步處理示例的上層架構圖。

rest-kafka-mongo-microservice-draw-io

微服務 1 —— 是一個 REST 式微服務,它從一個 /POST http 呼叫中接收資料。接收到請求之後,它從 http 請求中檢索資料,並將它儲存到 Kafka。儲存之後,它透過 /POST 傳送相同的資料去響應呼叫者。

微服務 2 —— 是一個訂閱了 Kafka 中的一個主題的微服務,微服務 1 的資料儲存在該主題。一旦訊息被微服務消費之後,它接著儲存資料到 MongoDB 中。

在你繼續之前,我們需要能夠去執行這些微服務的幾件東西:

☉ 下載 Kafka[3] —— 我使用的版本是 kafka_2.11-1.1.0
☉ 安裝 librdkafka[4] —— 不幸的是,這個庫應該在標的系統中
☉ 安裝 Kafka Go 客戶端[4]
☉ 執行 MongoDB。你可以去看我的 以前的文章[1] 中關於這一塊的內容,那篇文章中我使用了一個 MongoDB docker 映象。

我們開始吧!

首先,啟動 Kafka,在你執行 Kafka 伺服器之前,你需要執行 Zookeeper。下麵是示例:

  1. $ cd /<download path>/kafka_2.11-1.1.0

  2. $ bin/zookeeper-server-start.sh config/zookeeper.properties

接著執行 Kafka —— 我使用 9092 埠連線到 Kafka。如果你需要改變埠,只需要在 config/server.properties 中配置即可。如果你像我一樣是個新手,我建議你現在還是使用預設埠。

  1. $ bin/kafka-server-start.sh config/server.properties

Kafka 跑起來之後,我們需要 MongoDB。它很簡單,只需要使用這個 docker-compose.yml 即可。

  1. version: '3'

  2. services:

  3.  mongodb:

  4.    image: mongo

  5.    ports:

  6.      - "27017:27017"

  7.    volumes:

  8.      - "mongodata:/data/db"

  9.    networks:

  10.      - network1

  11. volumes:

  12.   mongodata:

  13. networks:

  14.   network1:

使用 Docker Compose 去執行 MongoDB docker 容器。

  1. docker-compose up

這裡是微服務 1 的相關程式碼。我只是修改了我前面的示例去儲存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go[5]

  1. func jobsPostHandler(w http.ResponseWriter, r *http.Request) {

  2.    //Retrieve body from http request

  3.    b, err := ioutil.ReadAll(r.Body)

  4.    defer r.Body.Close()

  5.    if err != nil {

  6.        panic(err)

  7.    }

  8.    //Save data into Job struct

  9.    var _job Job

  10.    err = json.Unmarshal(b, &_job)

  11.    if err != nil {

  12.        http.Error(w, err.Error(), 500)

  13.        return

  14.    }

  15.    saveJobToKafka(_job)

  16.    //Convert job struct into json

  17.    jsonString, err := json.Marshal(_job)

  18.    if err != nil {

  19.        http.Error(w, err.Error(), 500)

  20.        return

  21.    }

  22.    //Set content-type http essay-header

  23.    w.Header().Set("content-type", "application/json")

  24.    //Send back data as response

  25.    w.Write(jsonString)

  26. }

  27. func saveJobToKafka(job Job) {

  28.    fmt.Println("save to kafka")

  29.    jsonString, err := json.Marshal(job)

  30.    jobString := string(jsonString)

  31.    fmt.Print(jobString)

  32.    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})

  33.    if err != nil {

  34.        panic(err)

  35.    }

  36.    // Produce messages to topic (asynchronously)

  37.    topic := "jobs-topic1"

  38.    for _, word := range []string{string(jobString)} {

  39.        p.Produce(&kafka.Message{

  40.            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},

  41.            Value:          []byte(word),

  42.        }, nil)

  43.    }

  44. }

這裡是微服務 2 的程式碼。在這個程式碼中最重要的東西是從 Kafka 中消費資料,儲存部分我已經在前面的部落格文章中討論過了。這裡程式碼的重點部分是從 Kafka 中消費資料:

kafka-to-mongo/kafka-mongo-sample.go[6]

  1. func main() {

  2.    //Create MongoDB session

  3.    session := initialiseMongo()

  4.    mongoStore.session = session

  5.    receiveFromKafka()

  6. }

  7. func receiveFromKafka() {

  8.    fmt.Println("Start receiving from Kafka")

  9.    c, err := kafka.NewConsumer(&kafka.ConfigMap{

  10.        "bootstrap.servers": "localhost:9092",

  11.        "group.id":          "group-id-1",

  12.        "auto.offset.reset": "earliest",

  13.    })

  14.    if err != nil {

  15.        panic(err)

  16.    }

  17.    c.SubscribeTopics([]string{"jobs-topic1"}, nil)

  18.    for {

  19.        msg, err := c.ReadMessage(-1)

  20.        if err == nil {

  21.            fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))

  22.            job := string(msg.Value)

  23.            saveJobToMongo(job)

  24.        } else {

  25.            fmt.Printf("Consumer error: %v (%v)\n", err, msg)

  26.            break

  27.        }

  28.    }

  29.    c.Close()

  30. }

  31. func saveJobToMongo(jobString string) {

  32.    fmt.Println("Save to MongoDB")

  33.    col := mongoStore.session.DB(database).C(collection)

  34.    //Save data into Job struct

  35.    var _job Job

  36.    b := []byte(jobString)

  37.    err := json.Unmarshal(b, &_job)

  38.    if err != nil {

  39.        panic(err)

  40.    }

  41.    //Insert job into MongoDB

  42.    errMongo := col.Insert(_job)

  43.    if errMongo != nil {

  44.        panic(errMongo)

  45.    }

  46.    fmt.Printf("Saved to MongoDB : %s", jobString)

  47. }

我們來演示一下,執行微服務 1。確保 Kafka 已經運行了。

  1. $ go run rest-kafka-sample.go

我使用 Postman 向微服務 1 傳送資料。

Screenshot-2018-04-29-22.20.33

這裡是日誌,你可以在微服務 1 中看到。當你看到這些的時候,說明已經接收到了來自 Postman 傳送的資料,並且已經儲存到了 Kafka。

Screenshot-2018-04-29-22.22.00

因為我們尚未執行微服務 2,資料被微服務 1 只儲存在了 Kafka。我們來消費它並透過執行的微服務 2 來將它儲存到 MongoDB。

  1. $ go run kafka-mongo-sample.go

現在,你將在微服務 2 上看到消費的資料,並將它儲存到了 MongoDB。

Screenshot-2018-04-29-22.24.15

檢查一下資料是否儲存到了 MongoDB。如果有資料,我們成功了!

Screenshot-2018-04-29-22.26.39

完整的原始碼可以在這裡找到:

https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice

現在是廣告時間:如果你喜歡這篇文章,請在 Twitter @donvito[8] 上關註我。我的 Twitter 上有關於 Docker、Kubernetes、GoLang、Cloud、DevOps、Agile 和 Startups 的內容。歡迎你們在 GitHub 和 LinkedIn[10] 關註我。

開心地玩吧!


via: https://www.melvinvivas.com/developing-microservices-using-kafka-and-mongodb/

作者:Melvin Vivas [12] 譯者:qhwdw 校對:wxy

本文由 LCTT 原創編譯,Linux中國 榮譽推出

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖