歡迎光臨
每天分享高質量文章

分散式鏈路追蹤 SkyWalking 原始碼分析 —— Collector Remote 遠端通訊服務

摘要: 原創出處 http://www.iocoder.cn/SkyWalking/collector-remote-module/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 SkyWalking 3.2.6 正式版

  • 1. 概述
  • 2. collector-remote-define
    • 2.1 RemoteModule
    • 2.2 RemoteSenderService
    • 2.3 RemoteClientService
    • 2.4 RemoteClient
    • 2.5 CommonRemoteDataRegisterService
    • 2.6 RemoteSerializeService
    • 2.7 RemoteSerializeService
  • 3. collector-remote-grpc-provider
    • 3.1 RemoteModuleGRPCProvider
    • 3.2 GRPCRemoteSenderService
    • 3.3 GRPCRemoteClientService
    • 3.4 GRPCRemoteClient
    • 3.5 RemoteCommonServiceHandler
    • 3.6 GRPCRemoteSerializeService
    • 3.7 GRPCRemoteDeserializeService
  • 4. collector-remote-grpc-provider

     


1. 概述

本文主要分享 SkyWalking Collector Remote 遠端通訊服務。該服務用於 Collector 叢集內部通訊。

目前叢集內部通訊的目的,跨節點的流式處理。Remote Module 應用在 SkyWalking 架構圖如下位置( 紅框 ) :

FROM https://github.com/apache/incubating-skywalking

下麵我們來看看整體的專案結構,如下圖所示 :

  • collector-remote-define :定義遠端通訊介面。
  • collector-remote-kafka-provider :基於 Kafka 的遠端通訊實現。目前暫未完成
  • collector-remote-grpc-provider :基於 Google gRPC 的遠端通訊實現。生產環境目前使用

下麵,我們從介面到實現的順序進行分享。

2. collector-remote-define

collector-remote-define :定義遠端通訊介面。專案結構如下 :

整體流程如下圖:

我們按照整個流程的處理順序,逐個解析涉及到的類與介面。

2.1 RemoteModule

org.skywalking.apm.collector.remote.RemoteModule ,實現 Module 抽象類,遠端通訊 Module 。

#name() 實現方法,傳回模組名為 "remote"

#services() 實現方法,傳回 Service 類名:RemoteSenderService 、RemoteDataRegisterService 。

2.2 RemoteSenderService

org.skywalking.apm.collector.remote.service.RemoteSenderService ,繼承 Service 介面,遠端傳送服務介面,定義了 #send(graphId, nodeId, data, selector) 介面方法,呼叫 RemoteClient ,傳送資料。

  • graphId 方法引數,Graph 編號。透過 graphId ,可以查詢到對應的 Graph 物件。
    • Graph 在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》「2. apm-collector-core/graph」 有詳細解析。
  • nodeId 方法引數,Worker 編號。透過 workerId ,可以查詢在 Graph 物件中的 Worker 物件,從而 Graph 中的流式處理。
    • Worker 在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》「3. apm-collector-stream」 有詳細解析。
  • data 方法引數,Data 資料物件。例如,流式處理的具體資料物件。
    • Data 在 《SkyWalking 原始碼分析 —— Collector Storage 儲存元件》「2. apm-collector-core」 有詳細解析。
  • selector 方法引數,org.skywalking.apm.collector.remote.service.Selector 選擇器物件。根據 Selector 物件,使用對應的負載均衡策略,選擇叢集內的 Collector 節點,傳送資料。
  • RemoteSenderService.Mode 傳回值,傳送樣式分成 RemoteLocal 兩種方式。前者,傳送資料到遠端的 Collector 節點;後者,傳送資料到本地,即本地處理,參見 RemoteWorkerRef#in(message) 方法。

2.3 RemoteClientService

org.skywalking.apm.collector.remote.service.RemoteClientService ,繼承 Service 介面,遠端客戶端服務介面,定義了 #create(host, port, channelSize, bufferSize) 介面方法,建立 RemoteClient 物件。

2.4 RemoteClient

org.skywalking.apm.collector.remote.service.RemoteClient ,繼承 java.lang.Comparable 介面,遠端客戶端介面。定義瞭如下介面方法:

  • #push(graphId, nodeId, data, selector) 介面方法,傳送資料。
  • #getAddress() 介面方法,傳回客戶端連線的遠端 Collector 地址。
  • #equals(address) 介面方法,判斷 RemoteClient 是否連線了指定的地址。

2.5 CommonRemoteDataRegisterService

在說 CommonRemoteDataRegisterService 之前,首先來說下 CommonRemoteDataRegisterService 的意圖。

在上文中,我們可以看到傳送給 Collector 是 Data 物件,而 Data 是資料的抽象類,在具體反序列化 Data 物件之前,程式是無法得知它是 Data 的哪個實現物件。這個時候,我們可以給 Data 物件的每個實現類,生成一個對應的資料協議編號

  • 在傳送資料之前,序列化 Data 物件時,增加該 Data 對應的協議編號,一起傳送。
  • 在接收資料之後,反序列化資料時,根據協議編號,建立 Data 對應的實現類物件。

org.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService ,通用遠端資料註冊服務。

  • id 屬性,資料協議自增編號。
  • dataClassMapping 屬性,資料型別( Class extends Data> )與資料協議編號的對映。
  • dataInstanceCreatorMapping 屬性,資料協議編號與資料物件建立器( RemoteDataInstanceCreator )的對映。

2.5.1 RemoteDataRegisterService

org.skywalking.apm.collector.remote.service.RemoteDataRegisterService ,繼承 Service 介面,遠端客戶端服務介面,定義了 #register(Class extends Data>, RemoteDataInstanceCreator) 介面方法,註冊資料型別對應的遠端資料建立器( RemoteDataRegisterService.RemoteDataInstanceCreator )物件。

CommonRemoteDataRegisterService 實現了 RemoteDataRegisterService 介面,#register(Class extends Data>, RemoteDataInstanceCreator) 實現方法。

另外,AgentStreamRemoteDataRegister 會呼叫 RemoteDataRegisterService#register(Class extends Data>, RemoteDataInstanceCreator) 方法,註冊每個資料型別的 RemoteDataInstanceCreator 物件。註意,例如 Application::new 是 RemoteDataInstanceCreator 的匿名實現類

2.5.2 RemoteDataIDGetter

org.skywalking.apm.collector.remote.service.RemoteDataIDGetter ,繼承 Service 介面,遠端資料協議編號獲取器介面,定義了 #getRemoteDataId(Class extends Data>) 介面方法,根據資料型別獲取資料協議編號。

CommonRemoteDataRegisterService 實現了 RemoteDataIDGetter 介面,#getRemoteDataId(Class extends Data>) 實現方法。

2.5.3 RemoteDataInstanceCreatorGetter

org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorGetter ,繼承 Service 介面,遠端資料建立器的獲取器介面,定義了 #getInstanceCreator(remoteDataId 介面方法,根據資料協議編號獲得遠端資料建立器( RemoteDataInstanceCreator )。

CommonRemoteDataRegisterService 實現了 RemoteDataInstanceCreatorGetter 介面,#getInstanceCreator(remoteDataId) 實現方法。

2.6 RemoteSerializeService

org.skywalking.apm.collector.remote.service.RemoteSerializeService ,遠端通訊序列化服務介面,定義了 #serialize(Data) 介面方法,序列化資料,生成 Builder 物件。

2.7 RemoteSerializeService

org.skywalking.apm.collector.remote.service.RemoteDeserializeService ,遠端通訊序反列化服務介面,定義了 #deserialize(RemoteData, Data) 介面方法,反序列化傳輸資料。

3. collector-remote-grpc-provider

collector-remote-grpc-provider ,基於 Google gRPC 的遠端通訊實現。

專案結構如下 :

預設配置,在 application-default.yml 已經配置如下:

remote:
  gRPC:
    host: localhost
    port: 11800

3.1 RemoteModuleGRPCProvider

org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider ,實現 ModuleProvider 抽象類,基於 gRPC 的元件服務提供者實現類。

#name() 實現方法,傳回元件服務提供者名為 "gRPC"

module() 實現方法,傳回元件類為 RemoteModule 。

#requiredModules() 實現方法,傳回依賴元件為 clustergRPC_manager


#prepare(Properties) 實現方法,執行準備階段邏輯。

  • 第 53 至 56 行 :建立 CommonRemoteDataRegisterService 、GRPCRemoteSenderService 物件,並呼叫 #registerServiceImplementation() 父類方法,註冊到 services

#start() 實現方法,執行啟動階段邏輯。

  • Server 相關
    • 第 65 行:建立 gRPC Server 物件。
    • 第 67 行:註冊 RemoteCommonServiceHandler 物件到 gRPC Server 上,用於接收 gRPC 請求後的處理。
    • 《SkyWalking 原始碼分析 —— Collector Server Component 伺服器元件》「3. gRPC 實現」
    • 《SkyWalking 原始碼分析 —— Collector gRPC Server Manager》
  • 註冊發現相關
    • 第 70 至 71 行:建立 org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCRegistration 物件,將自己註冊到叢集管理。這樣,自己可以被 Collector 叢集節點發現,從而被呼叫。
    • 第 73 至 74 行:註冊 GRPCRemoteSenderService 物件到叢集管理。這樣,自己可以監聽到 Collector 叢集節點的加入或離開,從而呼叫。
    • 《SkyWalking 原始碼分析 —— Collector Cluster 叢集管理》

#notifyAfterCompleted() 實現方法,方法為空。

3.2 GRPCRemoteSenderService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService ,繼承 ClusterModuleListener 抽象類,實現 RemoteSenderService 介面,基於 gPRC 的遠端傳送服務實現類。

3.2.1 註冊發現

透過繼承 ClusterModuleListener 抽象類,實現了監聽 Collector 叢集節點的加入或離開。

  • remoteClients 屬性,連線 Collector 叢集節點的客戶端陣列。每個 Collector 叢集節點,對應一個客戶端。
  • #path() 實現方法,傳回監聽的目錄 "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME 。Collector 叢集中,每個節點的 Remote Server 都會註冊到該目錄下。
  • #serverJoinNotify(serverAddress) 實現方法,當新的節點加入,建立新的客戶端連線。
  • #serverQuitNotify(serverAddress) 實現方法,當老的節點離開,移除對應的客戶端連線。

3.2.2 負載均衡

RemoteModuleGRPCProvider 基於不同的選擇器 ( Selector ) ,提供不同的客戶端選擇( org.skywalking.apm.collector.remote.grpc.service.selector.RemoteClientSelector )實現 :

  • hashCodeSelector 屬性,HashCodeSelector ,基於資料的雜湊碼。
  • foreverFirstSelector 屬性,ForeverFirstSelector ,基於客戶端陣列的順序,選擇第一個。
  • rollingSelector 屬性,RollingSelector ,基於客戶端陣列的順序,順序向下選擇。
  • #send(graphId, nodeId, data, selector) 方法,程式碼如下:
    • 第 76 至 77 行:當選擇的客戶端連線的是本地時,不傳送資料,交給本地處理,參見 RemoteWorkerRef#in(message) 方法。
    • 第 78 至 81 行:當選擇的客戶端連線的是遠端時,呼叫 RemoteClient#push(graphId, nodeId, data) 方法,傳送資料。
    • 第 63 、66 、69 行:根據選擇器,呼叫 RemoteClientSelector#select(clients, data) 方法,選擇客戶端。
    • 第 64 、67 、70 行:呼叫 #sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data) 方法,傳送請求資料。

3.3 GRPCRemoteClientService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService ,實現 RemoteClientService 介面,基於 gRPC 的遠端客戶端服務實現類。

#create(host, port, channelSize, bufferSize) 實現方法,建立 GRPCRemoteClient 物件。

3.4 GRPCRemoteClient

友情提示:本小節會涉及較多 gRPC 相關的知識,建議不熟悉的胖友自己 Google ,補充下姿勢。

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClient ,實現 RemoteClient 介面,基於 gRPC 的遠端客戶端實現類。

  • client 屬性,GRPCClient 物件。相比來說,GRPCRemoteClient 偏業務的封裝,內部呼叫 GRPCClient 物件。
  • carrier 屬性,DataCarrier 物件,本地訊息佇列。GRPCRemoteClient 在被呼叫傳送資料時,先提交到本地佇列,非同步消費進行傳送到遠端 Collector 節點。DataCarrier 在 《SkyWalking 原始碼分析 —— DataCarrier 非同步處理庫》 詳細解析。
    • 第 63 行:呼叫 DataCarrier#consume(IConsumer, num) 方法,設定消費者為 RemoteMessageConsumer 物件。

#push(graphId, nodeId, data) 實現方法,非同步傳送訊息到遠端 Collector 。

  • 第 73 行:呼叫 RemoteDataIDGetter#getRemoteDataId(Class extends Data>) 方法,獲得資料協議編號
  • 第 76 至 80 行:建立傳輸資料( RemoteMessage.Builder ) 物件。RemoteMessage 透過 Protobuf 建立定義,如下圖所示:
  • 第 83 行:呼叫 DataCarrier#produce(data) 方法,傳送資料到本地佇列。

RemoteMessageConsumer ,批次消費本地佇列的資料,逐條傳送資料到遠端 Collector 節點。

  • #consume(List) 實現方法,程式碼如下:
    • 第 100 行:建立 StreamObserver 物件。StreamObserver 主要是 gPRC 相關的 API 的呼叫。
    • 第 101 至 103 行:呼叫 io.grpc.stub.StreamObserver#onNext(RemoteMessage) 方法,逐條傳送資料。
    • 第 106 行:呼叫 io.grpc.stub.StreamObserver#onCompleted() 方法,全部請求資料傳送完成

3.5 RemoteCommonServiceHandler

org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler ,實現 org.skywalking.apm.collector.server.grpc.GRPCHandler 介面,繼承 RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 抽象類,遠端通訊通用邏輯處理器。

其中,RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 在 RemoteCommonService.proto 檔案的定義如下圖:

#call(StreamObserver) 實現方法,程式碼如下:

  • #onNext(RemoteMessage) 方法,處理每一條訊息,程式碼如下:
    • 第 65 行:呼叫 RemoteDataInstanceCreatorGetter#getInstanceCreator(remoteDataId) 方法,獲得資料協議編號對應的 RemoteDataInstanceCreator 物件。然後,呼叫 RemoteDataInstanceCreator#createInstance(id) 方法,建立資料協議編號對應的 Data 實現類對應的物件。
    • 第 70 行:呼叫 GraphManager#findGraph(graphId) 方法,獲得 graphId 對應的 Graph 物件。然後,調動 GraphNodeFinder#findNext(nodeId) 方法,獲得 Next 物件。
    • 第 71 行:呼叫 Next#execute(Data) 方法,繼續流式處理。

3.6 GRPCRemoteSerializeService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService ,實現 RemoteSerializeService 介面,基於 gRPC 的遠端通訊序列化服務實現類。

3.7 GRPCRemoteDeserializeService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService ,實現 GRPCRemoteDeserializeService 介面,基於 gRPC 的遠端通訊反序列化服務實現類。

4. collector-remote-grpc-provider

collector-remote-kafka-provider :基於 Kafka 的遠端通訊實現。

目前暫未完成

TODO 【4005】collector-remote-grpc-provider

贊(0)

分享創造快樂