歡迎光臨
每天分享高質量文章

分佈式鏈路追蹤 SkyWalking 原始碼分析 —— Collector Remote 遠程通信服務

摘要: 原創出處 http://www.iocoder.cn/SkyWalking/collector-remote-module/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 SkyWalking 3.2.6 正式版

  • 1. 概述
  • 2. collector-remote-define
    • 2.1 RemoteModule
    • 2.2 RemoteSenderService
    • 2.3 RemoteClientService
    • 2.4 RemoteClient
    • 2.5 CommonRemoteDataRegisterService
    • 2.6 RemoteSerializeService
    • 2.7 RemoteSerializeService
  • 3. collector-remote-grpc-provider
    • 3.1 RemoteModuleGRPCProvider
    • 3.2 GRPCRemoteSenderService
    • 3.3 GRPCRemoteClientService
    • 3.4 GRPCRemoteClient
    • 3.5 RemoteCommonServiceHandler
    • 3.6 GRPCRemoteSerializeService
    • 3.7 GRPCRemoteDeserializeService
  • 4. collector-remote-grpc-provider

     


1. 概述

本文主要分享 SkyWalking Collector Remote 遠程通信服務。該服務用於 Collector 集群內部通信。

目前集群內部通信的目的,跨節點的流式處理。Remote Module 應用在 SkyWalking 架構圖如下位置( 紅框 ) :

FROM https://github.com/apache/incubating-skywalking

下麵我們來看看整體的專案結構,如下圖所示 :

  • collector-remote-define :定義遠程通信接口。
  • collector-remote-kafka-provider :基於 Kafka 的遠程通信實現。目前暫未完成
  • collector-remote-grpc-provider :基於 Google gRPC 的遠程通信實現。生產環境目前使用

下麵,我們從接口到實現的順序進行分享。

2. collector-remote-define

collector-remote-define :定義遠程通信接口。專案結構如下 :

整體流程如下圖:

我們按照整個流程的處理順序,逐個解析涉及到的類與接口。

2.1 RemoteModule

org.skywalking.apm.collector.remote.RemoteModule ,實現 Module 抽象類,遠程通信 Module 。

#name() 實現方法,傳回模塊名為 "remote"

#services() 實現方法,傳回 Service 類名:RemoteSenderService 、RemoteDataRegisterService 。

2.2 RemoteSenderService

org.skywalking.apm.collector.remote.service.RemoteSenderService ,繼承 Service 接口,遠程發送服務接口,定義了 #send(graphId, nodeId, data, selector) 接口方法,呼叫 RemoteClient ,發送資料。

  • graphId 方法引數,Graph 編號。通過 graphId ,可以查找到對應的 Graph 物件。
    • Graph 在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》「2. apm-collector-core/graph」 有詳細解析。
  • nodeId 方法引數,Worker 編號。通過 workerId ,可以查找在 Graph 物件中的 Worker 物件,從而 Graph 中的流式處理。
    • Worker 在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》「3. apm-collector-stream」 有詳細解析。
  • data 方法引數,Data 資料物件。例如,流式處理的具體資料物件。
    • Data 在 《SkyWalking 原始碼分析 —— Collector Storage 儲存組件》「2. apm-collector-core」 有詳細解析。
  • selector 方法引數,org.skywalking.apm.collector.remote.service.Selector 選擇器物件。根據 Selector 物件,使用對應的負載均衡策略,選擇集群內的 Collector 節點,發送資料。
  • RemoteSenderService.Mode 傳回值,發送樣式分成 RemoteLocal 兩種方式。前者,發送資料到遠程的 Collector 節點;後者,發送資料到本地,即本地處理,參見 RemoteWorkerRef#in(message) 方法。

2.3 RemoteClientService

org.skywalking.apm.collector.remote.service.RemoteClientService ,繼承 Service 接口,遠程客戶端服務接口,定義了 #create(host, port, channelSize, bufferSize) 接口方法,創建 RemoteClient 物件。

2.4 RemoteClient

org.skywalking.apm.collector.remote.service.RemoteClient ,繼承 java.lang.Comparable 接口,遠程客戶端接口。定義瞭如下接口方法:

  • #push(graphId, nodeId, data, selector) 接口方法,發送資料。
  • #getAddress() 接口方法,傳回客戶端連接的遠程 Collector 地址。
  • #equals(address) 接口方法,判斷 RemoteClient 是否連接了指定的地址。

2.5 CommonRemoteDataRegisterService

在說 CommonRemoteDataRegisterService 之前,首先來說下 CommonRemoteDataRegisterService 的意圖。

在上文中,我們可以看到發送給 Collector 是 Data 物件,而 Data 是資料的抽象類,在具體反序列化 Data 物件之前,程式是無法得知它是 Data 的哪個實現物件。這個時候,我們可以給 Data 物件的每個實現類,生成一個對應的資料協議編號

  • 在發送資料之前,序列化 Data 物件時,增加該 Data 對應的協議編號,一起發送。
  • 在接收資料之後,反序列化資料時,根據協議編號,創建 Data 對應的實現類物件。

org.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService ,通用遠程資料註冊服務。

  • id 屬性,資料協議自增編號。
  • dataClassMapping 屬性,資料型別( Class extends Data> )與資料協議編號的映射。
  • dataInstanceCreatorMapping 屬性,資料協議編號與資料物件創建器( RemoteDataInstanceCreator )的映射。

2.5.1 RemoteDataRegisterService

org.skywalking.apm.collector.remote.service.RemoteDataRegisterService ,繼承 Service 接口,遠程客戶端服務接口,定義了 #register(Class extends Data>, RemoteDataInstanceCreator) 接口方法,註冊資料型別對應的遠程資料創建器( RemoteDataRegisterService.RemoteDataInstanceCreator )物件。

CommonRemoteDataRegisterService 實現了 RemoteDataRegisterService 接口,#register(Class extends Data>, RemoteDataInstanceCreator) 實現方法。

另外,AgentStreamRemoteDataRegister 會呼叫 RemoteDataRegisterService#register(Class extends Data>, RemoteDataInstanceCreator) 方法,註冊每個資料型別的 RemoteDataInstanceCreator 物件。註意,例如 Application::new 是 RemoteDataInstanceCreator 的匿名實現類

2.5.2 RemoteDataIDGetter

org.skywalking.apm.collector.remote.service.RemoteDataIDGetter ,繼承 Service 接口,遠程資料協議編號獲取器接口,定義了 #getRemoteDataId(Class extends Data>) 接口方法,根據資料型別獲取資料協議編號。

CommonRemoteDataRegisterService 實現了 RemoteDataIDGetter 接口,#getRemoteDataId(Class extends Data>) 實現方法。

2.5.3 RemoteDataInstanceCreatorGetter

org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorGetter ,繼承 Service 接口,遠程資料創建器的獲取器接口,定義了 #getInstanceCreator(remoteDataId 接口方法,根據資料協議編號獲得遠程資料創建器( RemoteDataInstanceCreator )。

CommonRemoteDataRegisterService 實現了 RemoteDataInstanceCreatorGetter 接口,#getInstanceCreator(remoteDataId) 實現方法。

2.6 RemoteSerializeService

org.skywalking.apm.collector.remote.service.RemoteSerializeService ,遠程通信序列化服務接口,定義了 #serialize(Data) 接口方法,序列化資料,生成 Builder 物件。

2.7 RemoteSerializeService

org.skywalking.apm.collector.remote.service.RemoteDeserializeService ,遠程通信序反列化服務接口,定義了 #deserialize(RemoteData, Data) 接口方法,反序列化傳輸資料。

3. collector-remote-grpc-provider

collector-remote-grpc-provider ,基於 Google gRPC 的遠程通信實現。

專案結構如下 :

預設配置,在 application-default.yml 已經配置如下:

remote:
  gRPC:
    host: localhost
    port: 11800

3.1 RemoteModuleGRPCProvider

org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider ,實現 ModuleProvider 抽象類,基於 gRPC 的組件服務提供者實現類。

#name() 實現方法,傳回組件服務提供者名為 "gRPC"

module() 實現方法,傳回組件類為 RemoteModule 。

#requiredModules() 實現方法,傳回依賴組件為 clustergRPC_manager


#prepare(Properties) 實現方法,執行準備階段邏輯。

  • 第 53 至 56 行 :創建 CommonRemoteDataRegisterService 、GRPCRemoteSenderService 物件,並呼叫 #registerServiceImplementation() 父類方法,註冊到 services

#start() 實現方法,執行啟動階段邏輯。

  • Server 相關
    • 第 65 行:創建 gRPC Server 物件。
    • 第 67 行:註冊 RemoteCommonServiceHandler 物件到 gRPC Server 上,用於接收 gRPC 請求後的處理。
    • 《SkyWalking 原始碼分析 —— Collector Server Component 服務器組件》「3. gRPC 實現」
    • 《SkyWalking 原始碼分析 —— Collector gRPC Server Manager》
  • 註冊發現相關
    • 第 70 至 71 行:創建 org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCRegistration 物件,將自己註冊到集群管理。這樣,自己可以被 Collector 集群節點發現,從而被呼叫。
    • 第 73 至 74 行:註冊 GRPCRemoteSenderService 物件到集群管理。這樣,自己可以監聽到 Collector 集群節點的加入或離開,從而呼叫。
    • 《SkyWalking 原始碼分析 —— Collector Cluster 集群管理》

#notifyAfterCompleted() 實現方法,方法為空。

3.2 GRPCRemoteSenderService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService ,繼承 ClusterModuleListener 抽象類,實現 RemoteSenderService 接口,基於 gPRC 的遠程發送服務實現類。

3.2.1 註冊發現

通過繼承 ClusterModuleListener 抽象類,實現了監聽 Collector 集群節點的加入或離開。

  • remoteClients 屬性,連接 Collector 集群節點的客戶端陣列。每個 Collector 集群節點,對應一個客戶端。
  • #path() 實現方法,傳回監聽的目錄 "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME 。Collector 集群中,每個節點的 Remote Server 都會註冊到該目錄下。
  • #serverJoinNotify(serverAddress) 實現方法,當新的節點加入,創建新的客戶端連接。
  • #serverQuitNotify(serverAddress) 實現方法,當老的節點離開,移除對應的客戶端連接。

3.2.2 負載均衡

RemoteModuleGRPCProvider 基於不同的選擇器 ( Selector ) ,提供不同的客戶端選擇( org.skywalking.apm.collector.remote.grpc.service.selector.RemoteClientSelector )實現 :

  • hashCodeSelector 屬性,HashCodeSelector ,基於資料的哈希碼。
  • foreverFirstSelector 屬性,ForeverFirstSelector ,基於客戶端陣列的順序,選擇第一個。
  • rollingSelector 屬性,RollingSelector ,基於客戶端陣列的順序,順序向下選擇。
  • #send(graphId, nodeId, data, selector) 方法,代碼如下:
    • 第 76 至 77 行:當選擇的客戶端連接的是本地時,不發送資料,交給本地處理,參見 RemoteWorkerRef#in(message) 方法。
    • 第 78 至 81 行:當選擇的客戶端連接的是遠程時,呼叫 RemoteClient#push(graphId, nodeId, data) 方法,發送資料。
    • 第 63 、66 、69 行:根據選擇器,呼叫 RemoteClientSelector#select(clients, data) 方法,選擇客戶端。
    • 第 64 、67 、70 行:呼叫 #sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data) 方法,發送請求資料。

3.3 GRPCRemoteClientService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService ,實現 RemoteClientService 接口,基於 gRPC 的遠程客戶端服務實現類。

#create(host, port, channelSize, bufferSize) 實現方法,創建 GRPCRemoteClient 物件。

3.4 GRPCRemoteClient

友情提示:本小節會涉及較多 gRPC 相關的知識,建議不熟悉的胖友自己 Google ,補充下姿勢。

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClient ,實現 RemoteClient 接口,基於 gRPC 的遠程客戶端實現類。

  • client 屬性,GRPCClient 物件。相比來說,GRPCRemoteClient 偏業務的封裝,內部呼叫 GRPCClient 物件。
  • carrier 屬性,DataCarrier 物件,本地訊息佇列。GRPCRemoteClient 在被呼叫發送資料時,先提交到本地佇列,異步消費進行發送到遠程 Collector 節點。DataCarrier 在 《SkyWalking 原始碼分析 —— DataCarrier 異步處理庫》 詳細解析。
    • 第 63 行:呼叫 DataCarrier#consume(IConsumer, num) 方法,設置消費者為 RemoteMessageConsumer 物件。

#push(graphId, nodeId, data) 實現方法,異步發送訊息到遠程 Collector 。

  • 第 73 行:呼叫 RemoteDataIDGetter#getRemoteDataId(Class extends Data>) 方法,獲得資料協議編號
  • 第 76 至 80 行:創建傳輸資料( RemoteMessage.Builder ) 物件。RemoteMessage 通過 Protobuf 創建定義,如下圖所示:
  • 第 83 行:呼叫 DataCarrier#produce(data) 方法,發送資料到本地佇列。

RemoteMessageConsumer ,批量消費本地佇列的資料,逐條發送資料到遠程 Collector 節點。

  • #consume(List) 實現方法,代碼如下:
    • 第 100 行:創建 StreamObserver 物件。StreamObserver 主要是 gPRC 相關的 API 的呼叫。
    • 第 101 至 103 行:呼叫 io.grpc.stub.StreamObserver#onNext(RemoteMessage) 方法,逐條發送資料。
    • 第 106 行:呼叫 io.grpc.stub.StreamObserver#onCompleted() 方法,全部請求資料發送完成

3.5 RemoteCommonServiceHandler

org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler ,實現 org.skywalking.apm.collector.server.grpc.GRPCHandler 接口,繼承 RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 抽象類,遠程通信通用邏輯處理器。

其中,RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 在 RemoteCommonService.proto 檔案的定義如下圖:

#call(StreamObserver) 實現方法,代碼如下:

  • #onNext(RemoteMessage) 方法,處理每一條訊息,代碼如下:
    • 第 65 行:呼叫 RemoteDataInstanceCreatorGetter#getInstanceCreator(remoteDataId) 方法,獲得資料協議編號對應的 RemoteDataInstanceCreator 物件。然後,呼叫 RemoteDataInstanceCreator#createInstance(id) 方法,創建資料協議編號對應的 Data 實現類對應的物件。
    • 第 70 行:呼叫 GraphManager#findGraph(graphId) 方法,獲得 graphId 對應的 Graph 物件。然後,調動 GraphNodeFinder#findNext(nodeId) 方法,獲得 Next 物件。
    • 第 71 行:呼叫 Next#execute(Data) 方法,繼續流式處理。

3.6 GRPCRemoteSerializeService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService ,實現 RemoteSerializeService 接口,基於 gRPC 的遠程通信序列化服務實現類。

3.7 GRPCRemoteDeserializeService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService ,實現 GRPCRemoteDeserializeService 接口,基於 gRPC 的遠程通信反序列化服務實現類。

4. collector-remote-grpc-provider

collector-remote-kafka-provider :基於 Kafka 的遠程通信實現。

目前暫未完成

TODO 【4005】collector-remote-grpc-provider

赞(0)

分享創造快樂