摘要: 原創出處 http://www.iocoder.cn/SkyWalking/collector-remote-module/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!
本文主要基於 SkyWalking 3.2.6 正式版
- 1. 概述
- 2. collector-remote-define
- 2.1 RemoteModule
- 2.2 RemoteSenderService
- 2.3 RemoteClientService
- 2.4 RemoteClient
- 2.5 CommonRemoteDataRegisterService
- 2.6 RemoteSerializeService
- 2.7 RemoteSerializeService
- 3. collector-remote-grpc-provider
- 3.1 RemoteModuleGRPCProvider
- 3.2 GRPCRemoteSenderService
- 3.3 GRPCRemoteClientService
- 3.4 GRPCRemoteClient
- 3.5 RemoteCommonServiceHandler
- 3.6 GRPCRemoteSerializeService
- 3.7 GRPCRemoteDeserializeService
- 4. collector-remote-grpc-provider
1. 概述
本文主要分享 SkyWalking Collector Remote 遠程通信服務。該服務用於 Collector 集群內部通信。
目前集群內部通信的目的,跨節點的流式處理。Remote Module 應用在 SkyWalking 架構圖如下位置( 紅框 ) :
FROM https://github.com/apache/incubating-skywalking
下麵我們來看看整體的專案結構,如下圖所示 :
collector-remote-define
:定義遠程通信接口。collector-remote-kafka-provider
:基於 Kafka 的遠程通信實現。目前暫未完成。collector-remote-grpc-provider
:基於 Google gRPC 的遠程通信實現。生產環境目前使用
下麵,我們從接口到實現的順序進行分享。
2. collector-remote-define
collector-remote-define
:定義遠程通信接口。專案結構如下 :

整體流程如下圖:

我們按照整個流程的處理順序,逐個解析涉及到的類與接口。
2.1 RemoteModule
org.skywalking.apm.collector.remote.RemoteModule
,實現 Module 抽象類,遠程通信 Module 。
#name()
實現方法,傳回模塊名為 "remote"
。
#services()
實現方法,傳回 Service 類名:RemoteSenderService 、RemoteDataRegisterService 。
2.2 RemoteSenderService
org.skywalking.apm.collector.remote.service.RemoteSenderService
,繼承 Service 接口,遠程發送服務接口,定義了 #send(graphId, nodeId, data, selector)
接口方法,呼叫 RemoteClient ,發送資料。
graphId
方法引數,Graph 編號。通過graphId
,可以查找到對應的 Graph 物件。- Graph 在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》「2. apm-collector-core/graph」 有詳細解析。
nodeId
方法引數,Worker 編號。通過workerId
,可以查找在 Graph 物件中的 Worker 物件,從而 Graph 中的流式處理。- Worker 在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》「3. apm-collector-stream」 有詳細解析。
data
方法引數,Data 資料物件。例如,流式處理的具體資料物件。- Data 在 《SkyWalking 原始碼分析 —— Collector Storage 儲存組件》「2. apm-collector-core」 有詳細解析。
selector
方法引數,org.skywalking.apm.collector.remote.service.Selector
選擇器物件。根據 Selector 物件,使用對應的負載均衡策略,選擇集群內的 Collector 節點,發送資料。- RemoteSenderService.Mode 傳回值,發送樣式分成
Remote
和Local
兩種方式。前者,發送資料到遠程的 Collector 節點;後者,發送資料到本地,即本地處理,參見RemoteWorkerRef#in(message)
方法。
2.3 RemoteClientService
org.skywalking.apm.collector.remote.service.RemoteClientService
,繼承 Service 接口,遠程客戶端服務接口,定義了 #create(host, port, channelSize, bufferSize)
接口方法,創建 RemoteClient 物件。
2.4 RemoteClient
org.skywalking.apm.collector.remote.service.RemoteClient
,繼承 java.lang.Comparable
接口,遠程客戶端接口。定義瞭如下接口方法:
#push(graphId, nodeId, data, selector)
接口方法,發送資料。#getAddress()
接口方法,傳回客戶端連接的遠程 Collector 地址。#equals(address)
接口方法,判斷 RemoteClient 是否連接了指定的地址。
2.5 CommonRemoteDataRegisterService
在說 CommonRemoteDataRegisterService 之前,首先來說下 CommonRemoteDataRegisterService 的意圖。
在上文中,我們可以看到發送給 Collector 是 Data 物件,而 Data 是資料的抽象類,在具體反序列化 Data 物件之前,程式是無法得知它是 Data 的哪個實現物件。這個時候,我們可以給 Data 物件的每個實現類,生成一個對應的資料協議編號。
- 在發送資料之前,序列化 Data 物件時,增加該 Data 對應的協議編號,一起發送。
- 在接收資料之後,反序列化資料時,根據協議編號,創建 Data 對應的實現類物件。
org.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService
,通用遠程資料註冊服務。
id
屬性,資料協議自增編號。dataClassMapping
屬性,資料型別( Class extends Data> )與資料協議編號的映射。dataInstanceCreatorMapping
屬性,資料協議編號與資料物件創建器( RemoteDataInstanceCreator )的映射。
2.5.1 RemoteDataRegisterService
org.skywalking.apm.collector.remote.service.RemoteDataRegisterService
,繼承 Service 接口,遠程客戶端服務接口,定義了 #register(Class extends Data>, RemoteDataInstanceCreator)
接口方法,註冊資料型別對應的遠程資料創建器( RemoteDataRegisterService.RemoteDataInstanceCreator
)物件。
CommonRemoteDataRegisterService 實現了 RemoteDataRegisterService 接口,#register(Class extends Data>, RemoteDataInstanceCreator)
實現方法。
另外,AgentStreamRemoteDataRegister 會呼叫 RemoteDataRegisterService#register(Class extends Data>, RemoteDataInstanceCreator)
方法,註冊每個資料型別的 RemoteDataInstanceCreator 物件。註意,例如 Application::new
是 RemoteDataInstanceCreator 的匿名實現類。
2.5.2 RemoteDataIDGetter
org.skywalking.apm.collector.remote.service.RemoteDataIDGetter
,繼承 Service 接口,遠程資料協議編號獲取器接口,定義了 #getRemoteDataId(Class extends Data>)
接口方法,根據資料型別獲取資料協議編號。
CommonRemoteDataRegisterService 實現了 RemoteDataIDGetter 接口,#getRemoteDataId(Class extends Data>)
實現方法。
2.5.3 RemoteDataInstanceCreatorGetter
org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorGetter
,繼承 Service 接口,遠程資料創建器的獲取器接口,定義了 #getInstanceCreator(remoteDataId
接口方法,根據資料協議編號獲得遠程資料創建器( RemoteDataInstanceCreator )。
CommonRemoteDataRegisterService 實現了 RemoteDataInstanceCreatorGetter 接口,#getInstanceCreator(remoteDataId)
實現方法。
2.6 RemoteSerializeService
org.skywalking.apm.collector.remote.service.RemoteSerializeService
,遠程通信序列化服務接口,定義了 #serialize(Data)
接口方法,序列化資料,生成 Builder 物件。
2.7 RemoteSerializeService
org.skywalking.apm.collector.remote.service.RemoteDeserializeService
,遠程通信序反列化服務接口,定義了 #deserialize(RemoteData, Data)
接口方法,反序列化傳輸資料。
3. collector-remote-grpc-provider
collector-remote-grpc-provider
,基於 Google gRPC 的遠程通信實現。
專案結構如下 :
預設配置,在 application-default.yml
已經配置如下:
remote:
gRPC:
host: localhost
port: 11800
3.1 RemoteModuleGRPCProvider
org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider
,實現 ModuleProvider 抽象類,基於 gRPC 的組件服務提供者實現類。
#name()
實現方法,傳回組件服務提供者名為 "gRPC"
。
module()
實現方法,傳回組件類為 RemoteModule 。
#requiredModules()
實現方法,傳回依賴組件為 cluster
、gRPC_manager
。
#prepare(Properties)
實現方法,執行準備階段邏輯。
- 第 53 至 56 行 :創建 CommonRemoteDataRegisterService 、GRPCRemoteSenderService 物件,並呼叫
#registerServiceImplementation()
父類方法,註冊到services
。
#start()
實現方法,執行啟動階段邏輯。
- Server 相關
- 第 65 行:創建 gRPC Server 物件。
- 第 67 行:註冊 RemoteCommonServiceHandler 物件到 gRPC Server 上,用於接收 gRPC 請求後的處理。
- 《SkyWalking 原始碼分析 —— Collector Server Component 服務器組件》「3. gRPC 實現」
- 《SkyWalking 原始碼分析 —— Collector gRPC Server Manager》
- 註冊發現相關
- 第 70 至 71 行:創建
org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCRegistration
物件,將自己註冊到集群管理。這樣,自己可以被 Collector 集群節點發現,從而被呼叫。 - 第 73 至 74 行:註冊 GRPCRemoteSenderService 物件到集群管理。這樣,自己可以監聽到 Collector 集群節點的加入或離開,從而呼叫。
- 《SkyWalking 原始碼分析 —— Collector Cluster 集群管理》
- 第 70 至 71 行:創建
#notifyAfterCompleted()
實現方法,方法為空。
3.2 GRPCRemoteSenderService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService
,繼承 ClusterModuleListener 抽象類,實現 RemoteSenderService 接口,基於 gPRC 的遠程發送服務實現類。
3.2.1 註冊發現
通過繼承 ClusterModuleListener 抽象類,實現了監聽 Collector 集群節點的加入或離開。
remoteClients
屬性,連接 Collector 集群節點的客戶端陣列。每個 Collector 集群節點,對應一個客戶端。#path()
實現方法,傳回監聽的目錄"/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME
。Collector 集群中,每個節點的 Remote Server 都會註冊到該目錄下。#serverJoinNotify(serverAddress)
實現方法,當新的節點加入,創建新的客戶端連接。#serverQuitNotify(serverAddress)
實現方法,當老的節點離開,移除對應的客戶端連接。
3.2.2 負載均衡
RemoteModuleGRPCProvider 基於不同的選擇器 ( Selector ) ,提供不同的客戶端選擇( org.skywalking.apm.collector.remote.grpc.service.selector.RemoteClientSelector
)實現 :
hashCodeSelector
屬性,HashCodeSelector ,基於資料的哈希碼。foreverFirstSelector
屬性,ForeverFirstSelector ,基於客戶端陣列的順序,選擇第一個。rollingSelector
屬性,RollingSelector ,基於客戶端陣列的順序,順序向下選擇。#send(graphId, nodeId, data, selector)
方法,代碼如下:- 第 76 至 77 行:當選擇的客戶端連接的是本地時,不發送資料,交給本地處理,參見
RemoteWorkerRef#in(message)
方法。 - 第 78 至 81 行:當選擇的客戶端連接的是遠程時,呼叫
RemoteClient#push(graphId, nodeId, data)
方法,發送資料。 - 第 63 、66 、69 行:根據選擇器,呼叫
RemoteClientSelector#select(clients, data)
方法,選擇客戶端。 - 第 64 、67 、70 行:呼叫
#sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data)
方法,發送請求資料。
- 第 76 至 77 行:當選擇的客戶端連接的是本地時,不發送資料,交給本地處理,參見
3.3 GRPCRemoteClientService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService
,實現 RemoteClientService 接口,基於 gRPC 的遠程客戶端服務實現類。
#create(host, port, channelSize, bufferSize)
實現方法,創建 GRPCRemoteClient 物件。
3.4 GRPCRemoteClient
友情提示:本小節會涉及較多 gRPC 相關的知識,建議不熟悉的胖友自己 Google ,補充下姿勢。
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClient
,實現 RemoteClient 接口,基於 gRPC 的遠程客戶端實現類。
client
屬性,GRPCClient 物件。相比來說,GRPCRemoteClient 偏業務的封裝,內部呼叫 GRPCClient 物件。carrier
屬性,DataCarrier 物件,本地訊息佇列。GRPCRemoteClient 在被呼叫發送資料時,先提交到本地佇列,異步消費進行發送到遠程 Collector 節點。DataCarrier 在 《SkyWalking 原始碼分析 —— DataCarrier 異步處理庫》 詳細解析。- 第 63 行:呼叫
DataCarrier#consume(IConsumer, num)
方法,設置消費者為 RemoteMessageConsumer 物件。
- 第 63 行:呼叫
#push(graphId, nodeId, data)
實現方法,異步發送訊息到遠程 Collector 。
- 第 73 行:呼叫
RemoteDataIDGetter#getRemoteDataId(Class extends Data>)
方法,獲得資料協議編號。 - 第 76 至 80 行:創建傳輸資料( RemoteMessage.Builder ) 物件。RemoteMessage 通過 Protobuf 創建定義,如下圖所示:
- 第 83 行:呼叫
DataCarrier#produce(data)
方法,發送資料到本地佇列。
RemoteMessageConsumer ,批量消費本地佇列的資料,逐條發送資料到遠程 Collector 節點。
#consume(List)
實現方法,代碼如下:- 第 100 行:創建 StreamObserver 物件。StreamObserver 主要是 gPRC 相關的 API 的呼叫。
- 第 101 至 103 行:呼叫
io.grpc.stub.StreamObserver#onNext(RemoteMessage)
方法,逐條發送資料。 - 第 106 行:呼叫
io.grpc.stub.StreamObserver#onCompleted()
方法,全部請求資料發送完成。
3.5 RemoteCommonServiceHandler
org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler
,實現 org.skywalking.apm.collector.server.grpc.GRPCHandler
接口,繼承 RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 抽象類,遠程通信通用邏輯處理器。
其中,RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 在 RemoteCommonService.proto
檔案的定義如下圖:
#call(StreamObserver)
實現方法,代碼如下:
#onNext(RemoteMessage)
方法,處理每一條訊息,代碼如下:- 第 65 行:呼叫
RemoteDataInstanceCreatorGetter#getInstanceCreator(remoteDataId)
方法,獲得資料協議編號對應的 RemoteDataInstanceCreator 物件。然後,呼叫RemoteDataInstanceCreator#createInstance(id)
方法,創建資料協議編號對應的 Data 實現類對應的物件。 - 第 70 行:呼叫
GraphManager#findGraph(graphId)
方法,獲得graphId
對應的 Graph 物件。然後,調動GraphNodeFinder#findNext(nodeId)
方法,獲得 Next 物件。 - 第 71 行:呼叫
Next#execute(Data)
方法,繼續流式處理。
- 第 65 行:呼叫
3.6 GRPCRemoteSerializeService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService
,實現 RemoteSerializeService 接口,基於 gRPC 的遠程通信序列化服務實現類。
3.7 GRPCRemoteDeserializeService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService
,實現 GRPCRemoteDeserializeService 接口,基於 gRPC 的遠程通信反序列化服務實現類。
4. collector-remote-grpc-provider
collector-remote-kafka-provider
:基於 Kafka 的遠程通信實現。
目前暫未完成。
TODO 【4005】collector-remote-grpc-provider
朋友會在“發現-看一看”看到你“在看”的內容