歡迎光臨
每天分享高質量文章

鏈路追蹤 SkyWalking 原始碼分析 —— Collector Storage 儲存組件

摘要: 原創出處 http://www.iocoder.cn/SkyWalking/collector-storage-module/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 SkyWalking 3.2.6 正式版

  • 1. 概述
  • 2. apm-collector-core
  • 3. collector-storage-define
  • 4. collector-storage-h2-provider
  • 5. collector-storage-es-provider

1. 概述

本文主要分享 SkyWalking Collector Storage 儲存組件。顧名思義,負責將呼叫鏈路、應用、應用實體等等信息儲存到儲存器,例如,ES 、H2 。

友情提示:建議先閱讀 《SkyWalking 原始碼分析 —— Collector 初始化》 ,以瞭解 Collector 組件體系。

FROM https://github.com/apache/incubating-skywalking

下麵我們來看看整體的專案結構,如下圖所示 :

  • apm-collector-core 的 data 和 define  :資料的抽象。
  • collector-storage-define :定義儲存組件接口。
  • collector-storage-h2-provider :基於 H2 的 儲存組件實現。該實現是單機版,建議僅用於 SkyWalking 快速上手,生產環境不建議使用
  • collector-storage-es-provider :基於 Elasticsearch 的集群管理實現。生產環境推薦使用

下麵,我們從接口到實現的順序進行分享。

2. apm-collector-core

apm-collector-core 的 data 和 define ,如下圖所示:

我們對類進行梳理分類,如下圖:

  • Table :Data 和 TableDefine 之間的橋梁,每個 Table 定義了該表的表名欄位名們
  • TableDefine :Table 的詳細定義,包括表名欄位定義( ColumnDefine )們。在下文中,StorageInstaller 會基於 TableDefine 初始化表的相關信息。
  • Data :資料,包括一條資料的資料值們和資料欄位( Column )們。在下文中,Dao 會儲存 Data 到儲存器中。另外,在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》 中,我們也會看到對 Data 的流式處理通用封裝。

2.1 Table

org.skywalking.apm.collector.core.data.CommonTable ,通用表。

  • `TABLE_TYPE` 靜態屬性,表型別。目前只有 ES 儲存組件使用到,下文詳細解析。
  • `COLUMN_` 前綴的靜態屬性,通用的欄位名。

在 collector-storage-define 的 table 下,我們可以看到所有 Table 類,以 "Table" 結尾。每個 Table 的表名,在每個實現類里,例如 ApplicationTable 。

2.2 TableDefine

org.skywalking.apm.collector.core.data.TableDefine ,表定義抽象類

  • `name` 屬性,表名。
  • `columnDefines` 屬性,ColumnDefine陣列。
  • `#initialize()` 抽象方法,初始化表定義。例如:ApplicationEsTableDefine 。

不同的儲存組件實現,有不同的 TableDefine 實現類,如下圖:

  • ElasticSearchTableDefine :基於 Elasticsearch 的表定義抽象類,在 collector-storage-es-provider 的 define 下,我們可以看到所有 ES 的 TableDefine 類。

  • H2TableDefine :基於 H2 的表定義抽象類,在 collector-storage-h2-provider 的 `define` 下,我們可以看到所有 H2 的 TableDefine 類。

2.2.1 ColumnDefine

org.skywalking.apm.collector.core.data.ColumnDefine ,欄位定義抽象類

  • `name` 屬性,欄位名。
  • `type` 屬性,欄位型別。

在 collector-storage-xxx-provider 模塊中,H2ColumnDefine 、ElasticSearchColumnDefine 實現 ColumnDefine 。

2.2.2 Loader

涉及到的類如下圖所示:

org.skywalking.apm.collector.core.data.StorageDefineLoader ,呼叫 org.skywalking.apm.collector.core.define.DefinitionLoader ,從 org.skywalking.apm.collector.core.data.StorageDefinitionFile 中,加載 TableDefine 實現類陣列。

另外,在 collector-storage-es-provider 和 collector-storage-h2-provider 里都有 storage.define 檔案,如下圖:

  • StorageDefinitionFile 宣告了讀取該檔案。
  • 註意,DefinitionLoader 在加載時,兩個檔案都會被讀取,最終在 StorageInstaller#defineFilter(List) 方法,進行過濾。

代碼比較簡單,中文註釋已加,胖友自己閱讀理解下。

2.3 Data

org.skywalking.apm.collector.core.data.Data ,資料抽象類

  • [dataXXX]() 前綴的屬性,欄位值們。
    • `dataStrings` 屬性的第一位,是 ID 屬性。參見 構造方法的【第 51 行】 或者 `#setId(id)` 方法。
  • [xxxColumns]() 後綴的屬性,欄位( Column )們。
  • 通過上述兩種屬性 + 自身類,可以確定一條資料記錄的表、欄位型別、欄位名、欄位值。
  • 繼承 `org.skywalking.apm.collector.core.data.EndOfBatchQueueMessage` ,帶是否訊息批處理的最後一條標記的訊息抽象類,`endOfBatch` 屬性,在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(二)》「3. AggregationWorker」 詳細解析。
    • 繼承 `org.skywalking.apm.collector.core.data.AbstractHashMessage` ,帶哈希碼的訊息抽象類,`hashCode` 屬性,在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(二)》「3. AggregationWorker」 詳細解析。
  • `#mergeData(Data)` 方法,合併傳入的資料到自身。該方法被 `AggregationWorker#aggregate(message)` 呼叫,在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(二)》「3. AggregationWorker」 詳細解析。

在 collector-storage-define 的 table 下,我們可以看到所有 Data 類, "Table" 結尾,例如 Application 。

2.3.1 Column

org.skywalking.apm.collector.core.data.Column ,欄位。

  • `name` 屬性,欄位名。
  • `operation` 屬性,操作( Operation )。

2.3.2 Operation

org.skywalking.apm.collector.core.data.Operation ,操作接口。用於兩個值之間的操作,例如,相加等等。目前實現類有:

  • AddOperation :值相加操作。
  • CoverOperation :值改寫操作,即以新值為傳回。
  • NonOperation :空操作,即以老值為傳回。

3. collector-storage-define

collector-cluster-define :定義儲存組件接口。專案結構如下 :

3.1 StorageModule

org.skywalking.apm.collector.storage.StorageModule ,實現 Module 抽象類,集群管理 Module 。

#name() 實現方法,傳回模塊名為 "storage" 。

#services() 實現方法,傳回 Service 類名:在 org.skywalking.apm.collector.storage.dao 下的所有類 和 IBatchDAO。

3.2 table 包

在 org.skywalking.apm.collector.storage.table 包下,定義了儲存模塊所有的 Table 和 Data 實現類。

3.3 StorageInstaller

org.skywalking.apm.collector.storage.StorageInstaller ,儲存安裝器抽象類,基於 TableDefine ,初始化儲存組件的表。

  • `#defineFilter(List)` 抽象方法,過濾 TableDefine 陣列中,非自身需要的。例如說,ElasticSearchStorageInstaller 過濾後,只保留 ElasticSearchTableDefine 物件。
  • `#isExists(Client, TableDefine)` 抽象方法,判斷表是否存在。
  • `#deleteTable(Client, TableDefine)` 抽象方法,刪除表。
  • `#createTable(Client, TableDefine)` 抽象方法,創建表。
  • `#install(Client)` 方法,基於 TableDefine ,初始化儲存組件的表。
    • 該方法會被 StorageModuleH2Provider 或 StorageModuleEsProvider 啟動時呼叫。

3.4 dao 包

在 collector-storage-define 專案結構圖,我們看到一共有個 bao 包:

  • org.skywalking.apm.collector.storage.base.dao ,系統的 DAO 接口。
  • org.skywalking.apm.collector.storage.dao ,業務的 DAO 接口。
    • 繼承系統的 DAO 接口。
    • 被 `collector-storage-xxx-provider` 的 `dao` 包實現

3.4.1 系統 DAO

org.skywalking.apm.collector.storage.base.dao.DAO ,繼承 Service 接口,DAO 接口

無任何方法。

3.4.1.1 AbstractDAO

org.skywalking.apm.collector.storage.base.dao.AbstractDAO ,實現 DAO 接口,DAO 抽象基類。

  • `client` 屬性,資料操作客戶端。例如,H2Client 、ElasticSearchClient 。

在 collector-storage-xxx-provider 模塊中,H2DAO 、EsDAO 實現 AbstractDAO 。

3.4.1.2 IPersistenceDAO

org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO ,實現 DAO 接口,持久化 DAO 接口,定義了 Data 的增刪改查操作。

  • `#get(id)` 接口方法,根據 ID 查詢一條 Data 。
  • `#deleteHistory(startTimestamp, endTimestamp)` 接口方法,刪除時間範圍內的 Data 們。
  • `#prepareBatchInsert(data)` 接口方法,準備批量插入操作物件。例如:`CpuMetricEsPersistenceDAO#prepareBatchInsert(CpuMetric)` 方法,傳回的是 org.elasticsearch.action.index.IndexRequestBuilder 物件。註意:
    • 該方法不會發起具體的 DAO 操作,僅僅是創建插入操作物件,最終的執行在 `IBatchDAO#batchPersistence(List)`。
    • 該方法創建的是批量插入操作物件們中的一個。
  • `#prepareBatchUpdate(data)` 接口方法,準備批量更新操作物件。類似 #prepareBatchInsert(data)方法。

3.4.1.3 IBatchDAO

org.skywalking.apm.collector.storage.base.dao.IBatchDAO ,實現 DAO 接口,批量操作 DAO 接口

  • `#batchPersistence(List batchCollection)` 接口方法,通過執行批量操作物件陣列,實現批量持久化資料。
    • `batchCollection` 方法引數,通過 `IPersistenceDAO#prepareBatchInsert` 或 `IPersistenceDAO#prepareBatchUpdate` 方法,生成每個運算元組元素。
    • 該方法會被 `PersistenceTimer#extractDataAndSave(…)` 或 `PersistenceWorker#onWork(…)` 方法呼叫,在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(二)》「4. PersistenceWorker」 詳細解析。

在 collector-storage-xxx-provider 模塊中,BatchH2DAO 、BatchEsDAO 實現 IBatchDAO 。

3.4.2 業務 DAO

在 StorageModule#services() 方法里,我們可以看到,業務 DAO 按照用途可以拆分成四種

  • Cache :快取應用、應用實體、服務名
  • Register :註冊應用、應用實體、服務名
  • Persistence :持久化,實際可以理解成批量持久化
  • UI :SkyWaling UI 查詢使用。

那麼整理如下:

Package Data Cache / Register Persistence UI 關聯文章
register Application
register Instance
register ServiceName
jvm CpuMetric
jvm CMetric
jvm MemoryMetric
jvm MemoryPoolMetric
global GlobalTrace
instance InstPerformance
node NodeComponent
node NodeMapping
noderef NodeReference
segment SegmentCost
segment Segment
service ServiceEntry
serviceref ServiceReference

4. collector-storage-h2-provider

collector-storage-h2-provider ,基於 H2 的儲存組件實現。專案結構如下 :

該實現是單機版,建議僅用於 SkyWalking 快速上手,生產環境不建議使用

由於生產環境主要使用 ES 的儲存組件實現,所以本文暫不解析相關實現,感興趣的胖友自己嗨起來。

5. collector-storage-es-provider

collector-storage-es-provider ,基於 ES 的儲存組件實現。專案結構如下 :

實際使用時,通過 application.yml 配置如下:

JSON storage: elasticsearch: cluster_name: elasticsearch cluster_transport_sniffer: true cluster_nodes: 127.0.0.1:9300 index_shards_number: 2 index_replicas_number: 0 ttl: 7

  • 生產環境下,推薦 Elasticsearch 配置成集群。
  • cluster_name 、cluster_transport_sniffer 、cluster_nodes 、index_shards_number 、index_replicas_number 引數,Elasticsearch 相關引數。
  • ttl :保留 N 天內的資料。超過 N 天的資料,將被自動滾動刪除。
    • 該功能目前版本暫未發佈,需要等到 5.0 版本後。
  • 《部署集群collector》

5.1 StorageModuleEsProvider

org.skywalking.apm.collector.storage.es.StorageModuleEsProvider ,實現 ModuleProvider抽象類,基於 ES 的儲存組件服務提供者。

#name() 實現方法,傳回組件服務提供者名為 "elasticsearch" 。

module() 實現方法,傳回組件類為 StorageModule 。

#requiredModules() 實現方法,傳回依賴組件為 "cluster" 。


#prepare(Properties) 實現方法,執行準備階段邏輯。

  • 第 71 至 75 行 :創建 `org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient` 物件。
  • 第 77 至 82 行 :創建 DAO 物件們,並呼叫 #registerServiceImplementation() 父類方法,註冊到 services 。

#start() 實現方法,執行啟動階段邏輯。

  • 第 90 行 :呼叫 ElasticSearchClient#initialize() 方法,初始化 ZookeeperClient 。
  • 第 93 至 94 行 :創建 ElasticSearchStorageInstaller 物件,初始化儲存組件的表。在 「5.2.4 ElasticSearchStorageInstaller」 詳細解析。
  • 第 100 至 102 行 :創建 `org.skywalking.apm.collector.storage.es.StorageModuleEsRegistration` 物件,並註冊信息到集群管理。在 《SkyWalking 原始碼分析 —— Collector Cluster 集群管理》 有詳細解析。
  • 第 105 至 107 行 :創建 `org.skywalking.apm.collector.storage.es.StorageModuleEsNamingListener`物件,並註冊信息到集群管理。在 《SkyWalking 原始碼分析 —— Collector Cluster 集群管理》 有詳細解析。
  • 第 110 至 111 行 :創建 DataTTLKeeperTimer 物件。在 「5.4 DataTTLKeeperTimer」 詳細解析。

#notifyAfterCompleted() 實現方法,執行啟動完成邏輯。

  • 第 115 行 :呼叫 DataTTLKeeperTimer#start() 方法,啟動 DataTTLKeeperTimer 。在本文 「5.4 DataTTLKeeperTimer」 詳細解析。

5.2 define 包

在 collector-storage-es-provider 專案結構圖,我們看到一共有個 define 包:

  • org.skywalking.apm.collector.storage.es.base.define ,系統的 TableDefine 抽象類。
  • org.skywalking.apm.collector.storage.es.define ,業務的 TableDefine 實現類。
    • 繼承系統的 TableDefine 抽象類。

5.2.1 ElasticSearchTableDefine

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine ,實現 TableDefine 接口,基於 Elasticsearch 的表定義抽象類

  • `#type()` 方法,文件元資料 _type 欄位,參見 《Elasticsearch學習筆記》「_type」 。
  • `#refreshInterval()` 抽象方法,文件索引掃清頻率,參見 《Elasticsearch: 權威指南 » 基礎入門 » 分片內部原理 » 近實時搜索》「refresh API」。

5.2.2 ElasticSearchColumnDefine

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine ,實現 ColumnDefine 抽象類,基於 ES 的欄位定義。

  • Type 列舉類:列舉 ES 欄位型別。

5.2.3 業務 TableDefine 實現類

在 org.apache.skywalking.apm.collector.storage.es.define 里,我們可以看到,所有基於 ES 的業務 TableDefine 實現類。例如:ApplicationEsTableDefine 。

整體 #refreshInterval() 方法傳回的結果如下:

  • 1 s
    • CpuMetricEsTableDefine
    • GCMetricEsTableDefine
    • MemoryMetricEsTableDefine
    • MemoryPoolMetricEsTableDefine
  • 2 s
    • InstPerformanceEsTableDefine
    • NodeComponentEsTableDefine
    • NodeMappingEsTableDefine
    • NodeReferenceEsTableDefine
    • ServiceEntryEsTableDefine
    • ServiceReferenceEsTableDefine
  • 2 s && WriteRequest.RefreshPolicy.IMMEDIATE
    • 【WriteRequest.RefreshPolicy.IMMEDIATE】參見 `ApplicationEsRegisterDAO#save(Application)` 方法
    • ApplicationEsTableDefine
    • InstanceEsTableDefine
    • ServiceNameEsTableDefine
  • 5 s
    • GlobalTraceEsTableDefine
    • SegmentCostEsTableDefine
  • 10 s
    • SegmentEsTableDefine

5.2.4 ElasticSearchStorageInstaller

友情提示:ElasticSearchStorageInstaller 主要是對 Elasticsearch Java API 的使用,所以不熟悉的胖友,可以 Google 下。

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller,實現 StorageInstaller 抽象類, 基於 ES 儲存安裝器實現類。

  • `#defineFilter(List)` 實現方法,過濾陣列中,非 ElasticSearchTableDefine 的元素。
  • `#createTable(Client, TableDefine)` 實現方法,創建 Elasticsearch 索引。
    • SkyWalking 彭勇升 :`_index`和 `_type` 是 ES 特有的,考慮其他資料庫接入,所以沒有用他這個特性。
    • SkyWalking QQ交流群( 392443393 ) ,小心 群友 :`_type` 本來就沒做物理隔離,Lucene 層面也不存在,ES 6.x 已經廢棄了。
    • 《Elasticsearch 6.0 將移除 Type》
    • `_id` :資料編號,String 型別。
    • `_type` :`”type”` 。
    • `_index` :TableDefine 定義的表名
    • `source` :Data 資料。
    • 文件資料結構如下:
    • 瞭解 Elasticsearch 的胖友可能有和筆者一樣的疑惑,網絡上很多文章把 `_index` 類比成關係資料庫的 DB ,`_type` 類比成關係資料庫的 Table ,和 SkyWalking 目前使用的方式不一致
  • `#deleteTable(Client, TableDefine)` 實現方法,刪除 Elasticsearch 索引。
  • `#isExists(Client, TableDefine)` 實現方法,判斷 Elasticsearch 索引是否存在。
  • 在方法里,筆者添加了一些 API 的說明,不熟悉的胖友,可以仔細閱讀理解。

5.3 dao 包

在 collector-storage-es-provider 專案結構圖,我們看到一共有個 dao 包:

  • org.skywalking.apm.collector.storage.es.base.dao ,系統的 DAO 抽象類。
  • org.skywalking.apm.collector.storage.es.dao ,業務的 DAO 實現類。
    • 繼承系統的 DAO 抽象類。

5.3.1 EsDAO

org.skywalking.apm.collector.storage.es.base.dao.EsDAO ,實現 AbstractDAO 抽象類,基於 ES 的 DAO 抽象類

  • `#getMaxId(indexName, columnName)` 方法,獲得索引名的指定欄位的最大值
  • `#getMinId(indexName, columnName)` 方法,獲得索引名的指定欄位的最小值

5.3.2 BatchEsDAO

org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO ,實現 IBatchDAO 接口,繼承 EsDAO 抽象類,基於 ES 批量操作 DAO 實現類。

  • `#batchPersistence(List)` 實現方法,將 org.elasticsearch.action.index.IndexRequestBuilder 和 org.elasticsearch.action.index.UpdateRequestBuilder 陣列,創建成 org.elasticsearch.action.bulk.BulkRequestBuilder 物件,批量持久化。
    • IndexRequestBuilder 和 UpdateRequestBuilder 的創建,在 「5.3.3 業務 DAO 實現類」 會看到。

5.3.3 業務 DAO 實現類

在 org.apache.skywalking.apm.collector.storage.es.dao 里,我們可以看到,所有基於 ES 的業務 DAO 實現類。

實現代碼易懂,胖友可以自己閱讀。良心如我們,按照 DAO 的業務用途,推薦例子如下:

  • Cache :ApplicationEsCacheDAO
  • Register :ApplicationEsRegisterDAO
  • Persistence :SegmentEsPersistenceDAO
    • 此處可見 IndexRequestBuilder 和 UpdateRequestBuilder 的創建。
  • UI :SegmentEsUIDAO

5.4 DataTTLKeeperTimer

org.skywalking.apm.collector.storage.es.DataTTLKeeperTimer ,過期資料刪除定時器。通過該定時器,只保留 N 天內的資料。

  • `#start()` 方法,啟動定時任務。
    • 第 49 行:創建延遲 1 小時,每 8 小時執行一次 `#delete()` 方法的定時任務。目前該行代碼被註釋,胖友可以等待 SkyWallking 5.0 版本的發佈。
  • `#delete()` 方法,刪除過期資料。
    • 第 54 至 66 行:計算刪除的開始與結束時間,即指定時間的前一天。例如,2017-12-23 執行時,刪除 2017-12-16 那天的資料。
    • 第 69 行:呼叫 `#deleteJVMRelatedData(startTimestamp, endTimestamp)` 方法,刪除 JVM 相關的資料。
    • 第 70 行:呼叫 `#deleteTraceRelatedData(startTimestamp, endTimestamp)` 方法,刪除 Trace 相關的資料。

如下是不會刪除的資料的表:

  • Application
  • Instance
  • ServiceName
  • ServiceEntry
已同步到看一看
赞(0)

分享創造快樂