歡迎光臨
每天分享高質量文章

Spark入門必讀:核心概念介紹及常用RDD操作

導讀:Spark是由加州大學伯克利分校AMP實驗室開源的分佈式大規模資料處理通用引擎,具有高吞吐、低延時、通用易擴展、高容錯等特點。Spark內部提供了豐富的開發庫,集成了資料分析引擎Spark SQL、圖計算框架GraphX、機器學習庫MLlib、流計算引擎Spark Streaming。

Spark在函式式編程語言Scala中實現,提供了豐富的開發API,支持Scala、Java、Python、R等多種開發語言。同時,Spark提供了多種運行樣式,既可以採用獨立部署的方式運行,也可以依托Hadoop YARN、Apache Mesos等資源管理器調度任務運行。

目前,Spark已經在金融、交通、醫療、氣象等多種領域中廣泛使用。

 

 

作者:肖冠宇

如需轉載請聯繫大資料(ID:hzdashuju)

 

 

01 Spark概述

 

1. 核心概念介紹

 

Spark架構示意圖如圖2-1所示,下麵將分別介紹各核心組件。

 

  • Client:客戶端行程,負責提交作業。

  • Driver:一個Spark作業有一個Spark Context,一個Spark Context對應一個Driver行程,作業的main函式運行在Driver中。Driver主要負責Spark作業的解析,以及通過DAGScheduler劃分Stage,將Stage轉化成TaskSet提交給TaskScheduler任務調度器,進而調度Task到Executor上執行。

  • Executor:負責執行Driver分發的Task任務。集群中一個節點可以啟動多個Executor,每一個Executor可以執行多個Task任務。

  • Catche:Spark提供了對RDD不同級別的快取策略,分別可以快取到記憶體、磁盤、外部分佈式記憶體儲存系統Tachyon等。

  • Application:提交的一個作業就是一個Application,一個Application只有一個Spark Context。

  • Job:RDD執行一次Action操作就會生成一個Job。

  • Task:Spark運行的基本單位,負責處理RDD的計算邏輯。

  • Stage:DAGScheduler將Job劃分為多個Stage,Stage的劃分界限為Shuffle的產生,Shuffle標志著上一個Stage的結束和下一個Stage的開始。

  • TaskSet:劃分的Stage會轉換成一組相關聯的任務集。

  • RDD(Resilient Distributed Dataset):彈性分佈式資料集,可以理解為一種只讀的分佈式多分割槽的陣列,Spark計算操作都是基於RDD進行的,下麵會有詳細介紹。

  • DAG(Directed Acyclic Graph):有向無環圖。Spark實現了DAG的計算模型,DAG計算模型是指將一個計算任務按照計算規則分解為若干子任務,這些子任務之間根據邏輯關係構建成有向無環圖。

 

▲圖2-1 Spark架構示意圖

 

2. RDD介紹

 

RDD從字面上理解有些困難,我們可以認為是一種分佈式多分割槽只讀的陣列,Spark計算操作都是基於RDD進行的。

RDD具有幾個特性:只讀、多分割槽、分佈式,可以將HDFS塊檔案轉換成RDD,也可以由一個或多個RDD轉換成新的RDD,失效自動重構。基於這些特性,RDD在分佈式環境下能夠被高效地並行處理。

 

(1)計算型別

 

在Spark中RDD提供Transformation和Action兩種計算型別。Transformation操作非常豐富,採用延遲執行的方式,在邏輯上定義了RDD的依賴關係和計算邏輯,但並不會真正觸發執行動作,只有等到Action操作才會觸發真正執行操作。Action操作常用於最終結果的輸出。

 

常用的Transformation操作及其描述:

 

  • map (func):接收一個處理函式並行處理源RDD中的每個元素,傳回與源RDD元素一一對應的新RDD

  • filter (func):並行處理源RDD中的每個元素,接收一個處理函式,並根據定義的規則對RDD中的每個元素進行過濾處理,傳回處理結果為true的元素重新組成新的RDD

  • flatMap (func):flatMap是map和flatten的組合操作,與map函式相似,不過map函式傳回的新RDD包含的元素可能是嵌套型別,flatMap接收一個處理嵌套會將嵌套型別的元素展開映射成多個元素組成新的RDD

  • mapPartitions (func):與map函式應用於RDD中的每個元素不同,mapPartitions應用於RDD中的每個分割槽。mapPartitions函式接收的引數為func函式,func接收引數為每個分割槽的迭代器,傳回值為每個分割槽元素處理之後組成的新的迭代器,func會作用於分割槽中的每一個元素。有一種典型的應用場景,比如待處理分割槽中的資料需要寫入到資料庫,如果使用map函式,每一個元素都會創建一個資料庫連接物件,非常耗時並且容易引起問題發生,如果使用mapPartitions函式只會在分割槽中創建一個資料庫連接物件,性能提高明顯

  • mapPartitionsWithIndex(func):作用與mapPartitions函式相同,只是接收的引數func函式需要傳入兩個引數,分割槽的索引作為第一個引數傳入,按照分割槽的索引對分割槽中元素進行處理

  • union (otherDataset):將兩個RDD進行合併,傳回結果為RDD中元素(不去重)

  • intersection (otherDataset):對兩個RDD進行取交集運算,傳回結果為RDD無重覆元素

  • distinct ([numTasks])):對RDD中元素去重

  • groupByKey ([numTasks]):在KV型別的RDD中按Key分組,將相同Key的元素聚集到同一個分割槽內,此函式不能接收函式作為引數,只接收一個可選引數任務數,所以不能在RDD分割槽本地進行聚合計算,如需按Key對Value聚合計算,只能對groupByKey傳回的新RDD繼續使用其他函式運算

  • reduceByKey (func, [numTasks]):對KV型別的RDD按Key分組,接收兩個引數,第一個引數為處理函式,第二個引數為可選引數設置reduce的任務數。reduceByKey函式能夠在RDD分割槽本地提前進行聚合運算,這有效減少了shuffle過程傳輸的資料量。相對於groupByKey函式更簡潔高效

  • aggregateByKey (zeroValue)(seqOp, combOp):對KV型別的RDD按Key分組進行reduce計算,可接收三個引數,第一個引數是初始化值,第二個引數是分割槽內處理函式,第三個引數是分割槽間處理函式

  • sortByKey ([ascending], [numTasks]):對KV型別的RDD內部元素按照Key進行排序,排序過程會涉及Shuffle

  • join (otherDataset, [numTasks]):對KV型別的RDD進行關聯,只能是兩個RDD之間關聯,超過兩個RDD關聯需要使用多次join函式,join函式只會關聯出具有相同Key的元素,相當於SQL陳述句中的inner join

  • cogroup (otherDataset, [numTasks]):對KV型別的RDD進行關聯,cogroup處理多個RDD關聯比join更加優雅,它可以同時傳入多個RDD作為引數進行關聯,產生的新RDD中的元素不會出現笛卡爾積的情況,使用fullOuterJoin函式會產生笛卡爾積

  • coalesce (numPartitions):對RDD重新分割槽,將RDD中的分割槽數減小到引數numPartitions個,不會產生shuffle。在較大的資料集中使用filer等過濾操作後可能會產生多個大小不等的中間結果資料檔案,重新分割槽並減小分割槽可以提高作業的執行效率,是Spark中常用的一種優化手段

  • repartition (numPartitions):對RDD重新分割槽,接收一個引數——numPartitions分割槽數,是coalesce函式設置shuffle為true的一種實現形式

  • repartitionAndSortWithinPartitions (partitioner):接收一個分割槽物件(如Spark提供的分割槽類HashPartitioner)對RDD中元素重新分割槽併在分割槽內排序

 

常用的Action操作及其描述:

 

  • reduce(func):處理RDD兩兩之間元素的聚集操作

  • collect():傳回RDD中所有資料元素

  • count():傳回RDD中元素個數

  • first():傳回RDD中的第一個元素

  • take(n):傳回RDD中的前n個元素

  • saveAsTextFile(path):將RDD寫入文本檔案,儲存至本地檔案系統或者HDFS中

  • saveAsSequenceFile(path):將KV型別的RDD寫入SequenceFile檔案,儲存至本地檔案系統或者HDFS中

  • countByKey():傳回KV型別的RDD每個Key包含的元素個數

  • foreach(func):遍歷RDD中所有元素,接收引數為func函式,常用操作是傳入println函式打印所有元素

 

從HDFS檔案生成Spark RDD,經過map、filter、join等多次Transformation操作,最終呼叫saveAsTextFile Action操作將結果集輸出到HDFS,並以檔案形式儲存。RDD的流轉過程如圖2-2所示。

 

圖2-2 RDD的流轉過程示意圖

 

(2)快取

 

在Spark中RDD可以快取到記憶體或者磁盤上,提供快取的主要目的是減少同一資料集被多次使用的網絡傳輸次數,提高Spark的計算性能。Spark提供對RDD的多種快取級別,可以滿足不同場景對RDD的使用需求。RDD的快取具有容錯性,如果有分割槽丟失,可以通過系統自動重新計算。

 

在代碼中可以使用persist()方法或cache()方法快取RDD。cache()方法預設將RDD快取到記憶體中,cache()方法和persist()方法都可以用unpersist()方法來取消RDD快取。示例如下:

 

val fileDataRdd = sc.textFile("hdfs://data/hadoop/test.text")
fileDataRdd.cache()        // 快取RDD到記憶體

 

或者

 

fileDataRdd.persist(StorageLevel.MEMORY_ONLY)
fileDataRdd..unpersist()        // 取消快取

 

Spark的所有快取級別定義在org.apache.spark.storage.StorageLevel物件中,如下所示。

 

object storageLevel extends scala.AnyRef with scala.Serializable {
    val NONE : org.apache.spark.storage.StorageLevel
    val DISK_ONLY : org.apache.spark.storage.StorageLevel
    val DISK_ONLY_2 : org.apache.spark.storage.StorageLevel
    val MEMORY_ONLY : org.apache.spark.storage.StorageLevel
    val MEMORY_ONLY_2 : org.apache.spark.storage.StorageLevel
    val MEMORY_ONLY_SER : org.apache.spark.storage.StorageLevel
    val MEMORY_ONLY_SER_2 : org.apache.spark.storage.StorageLevel
    val MEMORY_AND_DISK : org.apache.spark.storage.StorageLevel
    val MEMORY_AND_DISK_2 : org.apache.spark.storage.StorageLevel
    val MEMORY_AND_DISK_SER : org.apache.spark.storage.StorageLevel
    val MEMORY_AND_DISK_SER_2 : org.apache.spark.storage.StorageLevel
    val OFF_HEAP : org.apache.spark.storage.StorageLevel

 

Spark各快取級別及其描述:

 

  • MEMORY_ONLY:RDD僅快取一份到記憶體,此為預設級別

  • MEMORY_ONLY_2:將RDD分別快取在集群的兩個節點上,RDD在集群記憶體中儲存兩份

  • MEMORY_ONLY_SER:將RDD以Java序列化物件的方式快取到記憶體中,有效減少了RDD在記憶體中占用的空間,不過讀取時會消耗更多的CPU資源

  • DISK_ONLY:RDD僅快取一份到磁盤

  • MEMORY_AND_DISK:RDD僅快取一份到記憶體,當記憶體中空間不足時會將部分RDD分割槽快取到磁盤

  • MEMORY_AND_DISK_2:將RDD分別快取在集群的兩個節點上,當記憶體中空間不足時會將部分RDD分割槽快取到磁盤,RDD在集群記憶體中儲存兩份

  • MEMORY_AND_DISK_SER:將RDD以Java序列化物件的方式快取到記憶體中,當記憶體中空間不足時會將部分RDD分割槽快取到磁盤,有效減少了RDD在記憶體中占用的空間,不過讀取時會消耗更多的CPU資源

  • OFF_HEAP:將RDD以序列化的方式快取到JVM之外的儲存空間Tachyon中,與其他快取樣式相比,減少了JVM垃圾回收開銷。Spark執行程式失敗不會導致資料丟失,Spark與Tachyon已經能較好地兼容,使用起來方便穩定

 

(3)依賴關係

 

窄依賴(Narrow Dependency):父RDD的分割槽只對應一個子RDD的分割槽,如圖2-3所示,如果子RDD只有部分分割槽資料損壞或者丟失,只需要從對應的父RDD重新計算恢復。

 

圖2-3 窄依賴示意圖

 

寬依賴(Shuffle Dependency):子RDD分割槽依賴父RDD的所有分割槽,如圖2-4所示。如果子RDD部分分割槽甚至全部分割槽資料損壞或丟失,需要從所有父RDD重新計算,相對窄依賴而言付出的代價更高,所以應儘量避免寬依賴的使用。

 

圖2-4 寬依賴示意圖

 

Lineage:每個RDD都會記錄自己依賴的父RDD信息,一旦出現資料損壞或者丟失將從父RDD迅速重新恢復。

 

3. 運行樣式

 

Spark運行樣式主要有以下幾種:

 

  1. Local樣式:本地採用多執行緒的方式執行,主要用於開發測試。

  2. On Yarn樣式:Spark On Yarn有兩種樣式,分別為yarn-client和yarn-cluster樣式。yarn-client樣式中,Driver運行在客戶端,其作業運行日誌在客戶端查看,適合傳回小資料量結果集交互式場景使用。yarn-cluster樣式中,Driver運行在集群中的某個節點,節點的選擇由YARN調度,作業日誌通過yarn管理名稱查看:yarn logs -applicationId ,也可以在YARN的Web UI中查看,適合大資料量非交互式場景使用。

 

提交作業命令:

 

./bin/spark-submit --class package.MainClass \    # 作業執行主類,需要完成的包路徑
    --master spark://host:port, mesos://host:port, yarn, or local\Maste
                      # 運行方式
    ---deploy-mode client,cluster\ # 部署樣式,如果Master採用YARN樣式則可以選擇使用clent樣式或者cluster樣式,預設client樣式
    --driver-memory 1g \          # Driver運行記憶體,預設1G
    ---driver-cores 1 \          # Driver分配的CPU核個數
    --executor-memory 4g \       # Executor記憶體大小
    --executor-cores 1 \           # Executor分配的CPU核個數
    ---num-executors \           # 作業執行需要啟動的Executor數
    ---jars \               # 作業程式依賴的外部jar包,這些jar包會從本地上傳到Driver然後分發到各Executor classpath中。
    lib/spark-examples*.jar \      # 作業執行JAR包
[other application arguments ]       # 程式運行需要傳入的引數

 

作業在yarn-cluster樣式下的執行過程如圖2-5所示。

 

圖2-5 作業在yarn-cluster樣式下的執行過程

 

  1. Client在任何一臺能與Yarn通信的入口機向Yarn提交作業,提交的配置中可以設置申請的資源情況,如果沒有配置則將採用預設配置。

  2. ResourceManager接收到Client的作業請求後,首先檢查程式啟動的ApplicationMaster需要的資源情況,然後向資源調度器申請選取一個能夠滿足資源要求的NodeManager節點用於啟動ApplicationMaster行程,ApplicationMaster啟動成功之後立即在該節點啟動Driver行程。

  3. ApplicationMaster根據提交作業時設置的Executor相關配置引數或者預設配置引數與ResourceManager通信領取Executor資源信息,並與相關NodeManager通信啟動Executor行程。

  4. Executor啟動成功之後與Driver通信領取Driver分發的任務。

  5. Task執行,運行成功輸出結果。

 

 

02 Shuffle詳解

 

Shuffle最早出現於MapReduce框架中,負責連接Map階段的輸出與Reduce階段的輸入。Shuffle階段涉及磁盤IO、網絡傳輸、記憶體使用等多種資源的呼叫,所以Shuffle階段的執行效率影響整個作業的執行效率,大部分優化也都是針對Shuffle階段進行的。

Spark是實現了MapReduce原語的一種通用實時計算框架。Spark作業中Map階段的Shuffle稱為Shuffle Write,Reduce階段的Shuffle稱為Shuffle Read。

Shuffle Write階段會將Map Task中間結果資料寫入到本地磁盤,而在Shuffle Read階段中,Reduce Task從Shuffle Write階段拉取資料到記憶體中並行計算。Spark Shuffle階段的劃分方式如圖2-6所示。

 

圖2-6 Spark Shuffle階段的劃分方式

 

1. Shuffle Write實現方式

 

(1)基於Hash的實現(hash-based)

 

每個Map Task都會生成與Reduce Task資料相同的檔案數,對Key取Hash值分別寫入對應的檔案中,如圖2-7所示。

生成的檔案數FileNum=MapTaskNum×ReduceTaskNum,如果Map Task和Reduce Task數都比較多就會生成大量的小檔案,寫檔案過程中,每個檔案都要占用一部分緩衝區,總占用緩衝區大小TotalBufferSize=CoreNum×ReduceTaskNum×FileBufferSize,大量的小檔案就會占用更多的緩衝區,造成不必要的記憶體開銷,同時,大量的隨機寫操作會大大降低磁盤IO的性能。

 

圖2-7 基於Hash的實現方式

 

由於簡單的基於Hash的實現方式擴展性較差,記憶體資源利用率低,過多的小檔案在檔案拉取過程中增加了磁盤IO和網絡開銷,所以需要對基於Hash的實現方式進行進一步優化,為此引入了Consolidate(合併)機制。

如圖2-8所示,將同一個Core中執行的Task輸出結果寫入到相同的檔案中,生成的檔案數FileNum=CoreNum×ReduceTaskNum,這種優化方式減少了生成的檔案數目,提高了磁盤IO的吞吐量,但是檔案快取占用的空間並沒有減少,性能沒有得到明顯有效的提高。

 

圖2-8 優化後的基於Hash的實現方式

 

設置方式:

 

  • 代碼中設置:conf.get(“spark.shuffle.manager”, “hash”)

  • 配置檔案中設置:在conf/spark-default.conf配置檔案中添加spark.shuffle.managerhash

 

基於Hash的實現方式的優缺點:

 

  • 優點:實現簡單,小數量級資料處理操作方便。

  • 缺點:產生小檔案過多,記憶體利用率低,大量的隨機讀寫造成磁盤IO性能下降。

 

(2)基於Sort的實現方式(sort-based)

 

為瞭解決基於Hash的實現方式的諸多問題,Spark Shuffle引入了基於Sort的實現方式,如圖2-9所示。該方式中每個Map Task任務生成兩個檔案,一個是資料檔案,一個是索引檔案,生成的檔案數FileNum=MapTaskNum×2。

資料檔案中的資料按照Key分割槽在不同分割槽之間排序,同一分割槽中的資料不排序,索引檔案記錄了檔案中每個分割槽的偏移量和範圍。當Reduce Task讀取資料時,先讀取索引檔案找到對應的分割槽資料偏移量和範圍,然後從資料檔案讀取指定的資料。

 

設置方式:

 

  • 代碼中設置:conf.get(“spark.shuffle.manager”, “sort”)

  • 配置檔案中設置:在conf/spark-default.conf配置檔案中添加spark.shuffle.manager sort

 

圖2-9 基於Sort的實現方式

 

基於Sort的實現方式的優缺點:

 

  • 優點:順序讀寫能夠大幅提高磁盤IO性能,不會產生過多小檔案,降低檔案快取占用記憶體空間大小,提高記憶體使用率。

  • 缺點:多了一次粗粒度的排序。

 

2. Shuffle Read實現方式

 

Shuffle Read階段中Task通過直接讀取本地Shuffle Write階段產生的中間結果資料或者通過HTTP的方式從遠程Shuffle Write階段拉取中間結果資料進行處理。Shuffle Write階段基於Hash和基於Sort兩種實現方式產生的中間結果資料在Shuffle Read階段採用同一種實現方式。

 

  1. 獲取需要拉取的資料信息,根據資料本地性原則判斷採用哪種級別的拉取方式。

  2. 判斷是否需要在Map端聚合(reduceByKey會在Map端預聚合)。

  3. Shuffle Read階段Task拉取過來的資料如果涉及聚合或者排序,則會使用HashMap結構在記憶體中儲存,如果拉取過來的資料集在HashMap中已經存在相同的鍵則將資料聚合在一起。此時涉及一個比較重要的引數——spark.shuffle.spill,決定在記憶體被寫滿後是否將資料以檔案的形式寫入到磁盤,預設值為true,如果設置為false,則有可能會發生OOM記憶體上限溢位的風險,建議開啟。

  4. 排序聚合之後的資料以檔案形式寫入磁盤將產生大量的檔案內資料有序的小檔案,將這些小檔案重新加載到記憶體中,隨後採用歸併排序的方式合併為一個大的資料檔案。

 

關於作者:資深大資料研發工程師,有多年的大資料工作經驗,對高性能分佈式系統架構、大資料技術、資料分析等有深入的研究。

本文摘編自《企業大資料處理:Spark、Druid、Flume與Kafka應用實踐》,經出版方授權發佈。

延伸閱讀《企業大資料處理

點擊上圖瞭解及購買

轉載請聯繫微信:DoctorData

推薦語:資深大資料工程師,立足於企業真實場景,系統梳理和詳盡講解全棧大資料核心技術;為企業大資料技術選型和大資料平臺構建提供成熟的解決方案,包含大量實用案例。

已同步到看一看
赞(0)

分享創造快樂