歡迎光臨
每天分享高質量文章

鏈路追蹤 SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(二)

摘要: 原創出處 http://www.iocoder.cn/SkyWalking/collector-streaming-second/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 SkyWalking 3.2.6 正式版

  • 1. 概述
  • 2. Data
    • 2.1 Collection
    • 2.2 DataCollection
    • 2.3 Window
    • 2.4 DataCache
  • 3. AggregationWorker
  • 4. PersistenceWorker
    • 4.1 WorkerCreateListener
    • 4.2 PersistenceTimer
  • 666. 彩蛋

1. 概述

本文接 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》 ,主要分享 Collector Streaming 流式處理的第二部分。主要包含如下部分:

  • AggregationWorker :聚合處理資料,後提交 Data 到 Next 節點們處理。
  • PersistenceWorker :聚合處理資料,後儲存 Data 。

2. Data

AggregationWorker 和 PersistenceWorker ,都先聚合處理資料,在進行各自的後續處理。那麼聚合處理的資料結果,需要有容器進行快取暫存:

  • org.skywalking.apm.collector.core.cache :接口
  • org.skywalking.apm.collector.stream.worker.impl.data :實現

類圖如下:

  • Collection :資料採集,提供有讀、寫兩個狀態的資料容器。
  • Window :視窗( ?這個解釋怪怪的 ),內有兩個 Collection
    • 一個 Collection ,負責寫入資料資料
    • 一個 Collection ,負責讀出處理資料
    • 當寫的 Collection 符合處理的條件,讀寫 Collection 切換

2.1 Collection

org.skywalking.apm.collector.core.cache.Collection ,資料採集接口

  • 資料相關 :#collection() / #size() / #clear()
  • 讀相關 :#reading() / #isReading() / #finishReading()
  • 寫相關 :#writing() / #isWriting() / #finishWriting()

2.2 DataCollection

org.skywalking.apm.collector.stream.worker.impl.data.DataCollection ,實現 Collection 接口,資料採集實現類,使用 Map 作為資料容器。

2.3 Window

org.skywalking.apm.collector.core.cache.Window ,視窗抽象類

構造方法 ,代碼如下:

  • windowDataA 屬性,視窗資料A 。
  • windowDataB 屬性,視窗資料B 。
  • 通過 #collectionInstance() 抽象方法,創建視窗資料( Collection )物件。
  • pointer 屬性,資料指向 windowDataAwindowDataA
    • #getCurrent() 方法,獲得資料指向,即 pointer
    • #getLast() 方法,獲得資料指向,即 pointer
  • windowSwitch 屬性,視窗切換計數。

切換 Collection 相關,方法如下:

  • #trySwitchPointer() 方法,傳回是否可以切換 Collection 。可以切換需要滿足如下條件:
    • 只有一個呼叫方申請切換,通過 windowSwitch 屬性進行計數。
    • 資料指向不處於正在讀取狀態。如果切換,一邊讀一邊寫,可能會有併發問題。
    • 無論是否可以切換 Collection ,需要呼叫 #trySwitchPointerFinally() 方法,釋放 windowSwitch 的計數。
  • #switchPointer() 方法,切換資料指向,並標記資料指向的 Collection 正在讀取中
  • #finishReadingLast() 方法,清空資料指向的 Collection 資料,並標記資料指向的 Collection 完成讀取( 不在正在讀取中 )。

寫 Collection 相關,方法如下:

  • #getCurrentAndWriting() 方法,獲得資料指向,並標記正在寫入中。通過正在寫入標記,切換 Collection 完成後,可以判斷該 Collection 正在寫入中,若是,等待不在寫入中,開始資料讀取並處理

2.4 DataCache

org.skywalking.apm.collector.stream.worker.impl.data.DataCache ,實現 Window 抽象類,資料快取。

  • #collectionInstance() 實現方法,創建 DataCollection 物件。
  • #currentCollectionSize() 方法,獲得當前資料指向( 寫入 Collection )的資料數量。

寫 Collection 相關,方法如下:

  • #writing() 方法,呼叫 #getCurrentAndWriting() 方法,開始寫入。即,獲得資料指向,並標記正在寫入中
    • lockedDataCollection 屬性,寫入的視窗資料。
    • #put(id, data) 方法,向 lockedDataCollection 屬性,寫入 Data 。
    • #get(id) 方法,向 lockedDataCollection 屬性,根據 ID 獲得 Data 。
    • #containsKey(id) 方法,向 lockedDataCollection 屬性,根據 ID 判斷 Data 是否存在 。
  • #finishWriting() 方法,完成寫入。即,標記 lockedDataCollection 不在正在寫入中

3. AggregationWorker

org.skywalking.apm.collector.stream.worker.impl.AggregationWorker ,實現 AbstractLocalAsyncWorker 抽象類,異步聚合 Worker,負責聚合處理資料,後提交 Data 到 Next 節點們處理。

構造方法 ,代碼如下:

  • dataCache 屬性,資料快取。
  • messageNum 屬性,訊息計數。當超過一定數量( 目前是 100 ),重置計數歸零。

#onWork(message) 實現方法,聚合處理資料,當滿足條件時,提交 Data 到 Next 節點們處理。

  • 第 53 行:messageNum 計數增加。
  • 第 56 行:呼叫 #aggregate(message) 方法,聚合訊息到資料。
  • 第 59 至 62 行:messageNum >= 100 時,呼叫 #sendToNext() ,提交快取資料的讀 Collection 的資料給 Next 節點們繼續處理。
  • 第 65 至 67 行:messageNum.endOfBatch == true 時,當訊息是批處理的最後一條時,呼叫 #sendToNext() ,提交快取資料的讀 Collection 的資料給 Next 節點們繼續處理。

#sendToNext() 方法,提交快取資料的讀 Collection 的資料給 Next 節點們繼續處理。

  • 第 72 行:直接呼叫 Window#switchPointer() 方法,切換資料指標,並標記指向正在讀取中。這裡並未先呼叫 Window#trySwitchPointer() 方法,是否會有併發問題?目前這裡是異步單執行緒,所以不會有問題,參見 《SkyWalking 原始碼分析 —— Collector Queue 佇列組件》 。另外,在 「4. PersistenceWorker」 會看到併發的情況處理。
  • 第 74 至 80 行:等待指向不在讀取中。
  • 第 82 至 85 行:提交資料給 Next 節點們繼續處理。
  • 第 87 行:標記指向完成讀取。

4. PersistenceWorker

org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker ,實現 AbstractLocalAsyncWorker 抽象類,異步批量儲存 Worker,負責聚合處理資料,後儲存 Data 。

考慮到需要保證儲存的時效性,PersistenceWorker 使用 PersistenceTimer ,定時儲存 Data ,在 「4.2 PersistenceWorker」 詳細解析。


構造方法 ,代碼如下:

  • dataCache 屬性,資料快取。
  • batchDAO 屬性,批量操作 DAO ,在 《SkyWalking 原始碼分析 —— Collector Storage 儲存組件》 有詳細解析。

#needMergeDBData() 抽象方法,儲存時,是否需要合併資料。一些 Data 只有新增操作,沒有更新操作。

#persistenceDAO() 抽象方法,獲得 Data 對應的持久化 DAO 接口的實現類物件。

上述兩個抽象方法,用於 #prepareBatch(dataMap) 方法,生成批量操作物件陣列,最終呼叫 IBatchDAO#batchPersistence(List >) 方法,通過執行批量操作物件陣列,實現批量持久化資料,在 《SkyWalking 原始碼分析 —— Collector Storage 儲存組件》 有詳細解析。


#onWork(message) 實現方法,當滿足條件時儲存 Data ,而後聚合資料。這點和 AggregationWorker 相反的,因為要考慮併發問題。代碼如下:

  • 第 72 行:呼叫 DataCache#currentCollectionSize() 方法,獲得當前寫入 Collection 的資料數量,判斷是否超過 5000 。
    • 第 75 行:呼叫 DataCache#trySwitchPointer() 方法,判斷是否可以切換 Collection 。通過該判斷,保證和 PersistenceTimer 一起時,不會出現併發問題
    • 第 77 行:呼叫 Window#switchPointer() 方法,切換資料指標,並標記指向正在讀取中。
    • 第 80 行:呼叫 #buildBatchCollection() 方法,創建批量操作物件陣列。該方法和 AggregationWorker#sendToNext() 方法基本類似
    • 第 83 行:呼叫 IBatchDAO#batchPersistence(List >) 方法,通過執行批量操作物件陣列,實現批量持久化資料。
    • 第 86 行:呼叫 DataCache#trySwitchPointerFinally() 方法,釋放 DataCache.windowSwitch 的計數。
  • 第 91 行:呼叫 #aggregate(message) 方法,聚合資料。該方法和 AggregationWorker#aggregate(message) 方法基本相似

4.1 WorkerCreateListener

org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener ,Worker 創建監聽器。

Worker 在創建時,會呼叫 WorkerCreateListener#addWorker 方法,記錄所有的 PersistenceWorker 物件。

記錄下來有什麼用呢?在 AgentStreamBootStartup 啟動時,創建 PersistenceTimer 物件,並將 WorkerCreateListener 記錄的 PersistenceWorker 物件集合傳遞給 PersistenceTimer 物件。這樣,PersistenceTimer 能夠”訪問“到 PersistenceWorker 物件們的 DataCache ,定時儲存資料。

4.2 PersistenceTimer

org.skywalking.apm.collector.stream.timer.PersistenceTimer ,持久化定時任務,負責定時批量儲存 PersistenceWorker 快取的資料。

#start(IBatchDAO, List) 方法,創建延遲 1 秒,每 1 秒執行一次 #extractDataAndSave() 方法的定時任務,用於定時批量儲存 PersistenceWorker 快取的資料。

#extractDataAndSave(IBatchDAO, List) 方法,代碼如下:

  • 第 55 至 68 行:獲得所有 PersistenceWorker 讀 Collection 快取的資料。

    • 第 60 行:呼叫 PersistenceWorker#flushAndSwitch() 切換資料指標,即切換讀寫 Collection 。
    • 第 62 行:呼叫 PersistenceWorker#buildBatchCollection() 方法,創建批量操作物件陣列。
    • 怎麼保證併發安全?通過 Window#trySwitchPointer() 方法,保證讀 Collection 正在被讀取中時,PersistenceWorker 和 PersistenceTimer 有且僅有一個切換佇列,讀取資料。當讀取完成後,呼叫 Window#finishReadingLast() 方法,清空原資料指向,並標記原資料指向完成正在讀取中。
  • 第 71 行:呼叫 IBatchDAO#batchPersistence(List >) 方法,執行批量操作,進行儲存。

赞(0)

分享創造快樂