Spark中如何使用Shuffle內(nèi)存

本篇文章為大家展示了Spark 中如何使用Shuffle 內(nèi)存,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

沛縣ssl適用于網(wǎng)站、小程序/APP、API接口等需要進行數(shù)據(jù)傳輸應(yīng)用場景,ssl證書未來市場廣闊!成為成都創(chuàng)新互聯(lián)公司的ssl證書銷售渠道,可以享受市場價格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:13518219792(備注:SSL證書合作)期待與您的合作!

一、Spark 內(nèi)存管理和消費模型

在分析 Spark Shuffle 內(nèi)存使用之前。我們首先了解下以下問題:當一個 Spark 子任務(wù) (Task) 被分配到 Executor 上運行時,Spark 管理內(nèi)存以及消費內(nèi)存的大體模型是什么樣呢?(注:由于 OOM 主要發(fā)生在 Executor 端,所以接下來的討論主要針對 Executor 端的內(nèi)存管理和使用)。

1,在 Spark 中,使用抽象類 MemoryConsumer 來表示需要使用內(nèi)存的消費者。在這個類中定義了分配,釋放以及 Spill 內(nèi)存數(shù)據(jù)到磁盤的一些方法或者接口。具體的消費者可以繼承 MemoryConsumer 從而實現(xiàn)具體的行為。 因此,在 Spark Task 執(zhí)行過程中,會有各種類型不同,數(shù)量不一的具體消費者。如在 Spark Shuffle 中使用的 ExternalAppendOnlyMap, ExternalSorter 等等(具體后面會分析)。

2,MemoryConsumer 會將申請,釋放相關(guān)內(nèi)存的工作交由 TaskMemoryManager 來執(zhí)行。當一個 Spark Task 被分配到 Executor 上運行時,會創(chuàng)建一個 TaskMemoryManager。在 TaskMemoryManager 執(zhí)行分配內(nèi)存之前,需要首先向 MemoryManager 進行申請,然后由 TaskMemoryManager 借助 MemoryAllocator 執(zhí)行實際的內(nèi)存分配。 

3,Executor 中的 MemoryManager 會統(tǒng)一管理內(nèi)存的使用。由于每個 TaskMemoryManager 在執(zhí)行實際的內(nèi)存分配之前,會首先向 MemoryManager 提出申請。因此 MemoryManager 會對當前進程使用內(nèi)存的情況有著全局的了解。

MemoryManager,TaskMemoryManager 和 MemoryConsumer 之前的對應(yīng)關(guān)系,如下圖??傮w上,一個 MemoryManager 對應(yīng)著至少一個 TaskMemoryManager (具體由 executor-core 參數(shù)指定),而一個 TaskMemoryManager 對應(yīng)著多個 MemoryConsumer (具體由任務(wù)而定)。

Spark 中如何使用Shuffle 內(nèi)存

了解了以上內(nèi)存消費的整體過程以后,有兩個問題需要注意下:

1,當有多個 Task 同時在 Executor 上執(zhí)行時, 將會有多個 TaskMemoryManager 共享 MemoryManager 管理的內(nèi)存。那么 MemoryManager 是怎么分配的呢?答案是每個任務(wù)可以分配到的內(nèi)存范圍是 [1 / (2 * n), 1 / n],其中 n 是正在運行的 Task 個數(shù)。因此,多個并發(fā)運行的 Task 會使得每個 Task 可以獲得的內(nèi)存變小。

2,前面提到,在 MemoryConsumer 中有 Spill 方法,當 MemoryConsumer 申請不到足夠的內(nèi)存時,可以 Spill 當前內(nèi)存到磁盤,從而避免無節(jié)制的使用內(nèi)存。但是,對于堆內(nèi)內(nèi)存的申請和釋放實際是由 JVM 來管理的。因此,在統(tǒng)計堆內(nèi)內(nèi)存具體使用量時,考慮性能等各方面原因,Spark 目前采用的是抽樣統(tǒng)計的方式來計算 MemoryConsumer 已經(jīng)使用的內(nèi)存,從而造成堆內(nèi)內(nèi)存的實際使用量不是特別準確。從而有可能因為不能及時 Spill 而導致 OOM。

二、Spark Shuffle 過程

整體上 Spark Shuffle 具體過程如下圖,主要分為兩個階段:Shuffle Write 和 Shuffle Read。

Write 階段大體經(jīng)歷排序(最低要求是需要按照分區(qū)進行排序),可能的聚合 (combine) 和歸并(有多個文件 spill 磁盤的情況 ),最終每個寫 Task 會產(chǎn)生數(shù)據(jù)和索引兩個文件。其中,數(shù)據(jù)文件會按照分區(qū)進行存儲,即相同分區(qū)的數(shù)據(jù)在文件中是連續(xù)的,而索引文件記錄了每個分區(qū)在文件中的起始和結(jié)束位置。

而對于 Shuffle Read, 首先可能需要通過網(wǎng)絡(luò)從各個 Write 任務(wù)節(jié)點獲取給定分區(qū)的數(shù)據(jù),即數(shù)據(jù)文件中某一段連續(xù)的區(qū)域,然后經(jīng)過排序,歸并等過程,最終形成計算結(jié)果。

Spark 中如何使用Shuffle 內(nèi)存

對于 Shuffle Write,Spark 當前有三種實現(xiàn),具體分別為 BypassMergeSortShuffleWriter, UnsafeShuffleWriter 和 SortShuffleWriter (具體使用哪一個實現(xiàn)有一個判斷條件,此處不表)。而 Shuffle Read 只有一種實現(xiàn)。

2.1 Shuffle Write 階段分析

2.1.1 BypassMergeSortShuffleWriter 分析

對于 BypassMergeSortShuffleWriter 的實現(xiàn),大體實現(xiàn)過程是首先為每個分區(qū)創(chuàng)建一個臨時分區(qū)文件,數(shù)據(jù)寫入對應(yīng)的分區(qū)文件,最終所有的分區(qū)文件合并成一個數(shù)據(jù)文件,并且產(chǎn)生一個索引文件。由于這個過程不做排序,combine(如果需要 combine 不會使用這個實現(xiàn))等操作,因此對于 BypassMergeSortShuffleWriter,總體來說是不怎么耗費內(nèi)存的。

2.1.2 SortShuffleWriter 分析

SortShuffleWriter 是最一般的實現(xiàn),也是日常使用最頻繁的。SortShuffleWriter 主要委托 ExternalSorter 做數(shù)據(jù)插入,排序,歸并 (Merge),聚合 (Combine) 以及最終寫數(shù)據(jù)和索引文件的工作。ExternalSorter 實現(xiàn)了之前提到的 MemoryConsumer 接口。下面分析一下各個過程使用內(nèi)存的情況:

1,對于數(shù)據(jù)寫入,根據(jù)是否需要做 Combine,數(shù)據(jù)會被插入到 PartitionedAppendOnlyMap 這個 Map 或者 PartitionedPairBuffer 這個數(shù)組中。每隔一段時間,當向 MemoryManager 申請不到足夠的內(nèi)存時,或者數(shù)據(jù)量超過 spark.shuffle.spill.numElementsForceSpillThreshold 這個閾值時 (默認是 Long 的最大值,不起作用),就會進行 Spill 內(nèi)存數(shù)據(jù)到文件。假設(shè)可以源源不斷的申請到內(nèi)存,那么 Write 階段的所有數(shù)據(jù)將一直保存在內(nèi)存中,由此可見,PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 是比較吃內(nèi)存的。 

2,無論是 PartitionedAppendOnlyMap 還是 PartitionedPairBuffer, 使用的排序算法是 TimSort。在使用該算法是正常情況下使用的臨時額外空間是很小,但是最壞情況下是 n / 2,其中 n 表示待排序的數(shù)組長度(具體見 TimSort 實現(xiàn))。

3,當插入數(shù)據(jù)因為申請不到足夠的內(nèi)存將會 Spill 數(shù)據(jù)到磁盤,在將最終排序結(jié)果寫入到數(shù)據(jù)文件之前,需要將內(nèi)存中的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 和已經(jīng) spill 到磁盤的 SpillFiles 進行合并。Merge 的大體過程如下圖。

Spark 中如何使用Shuffle 內(nèi)存

從上圖可見,大體差不多就是歸并排序的過程,由此可見這個過程是沒有太多額外的內(nèi)存消耗。歸并過程中的聚合計算大體也是差不多的過程,唯一需要注意的是鍵值碰撞的情況,即當前輸入的各個有序隊列的鍵值的哈希值相同,但是實際的鍵值不等的情況。這種情況下,需要額外的空間保存所有鍵值不同,但哈希值相同值的中間結(jié)果。但是總體上來說,發(fā)生這種情況的概率并不是特別大。

4,寫數(shù)據(jù)文件的過程涉及到不同數(shù)據(jù)流之間的轉(zhuǎn)化,而在流的寫入過程中,一般都有緩存,主要由參數(shù) spark.shuffle.file.buffer 和 spark.shuffle.spill.batchSize 控制,總體上這部分開銷也不大。

以上分析了 SortShuffleWriter write 階段的主要過程,從中可以看出主要的內(nèi)存消耗在寫入 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 這個階段。

2.1.3 UnsafeShuffleWriter

UnsafeShuffleWriter 是對 SortShuffleWriter 的優(yōu)化,大體上也和 SortShuffleWriter 差不多,在此不再贅述。從內(nèi)存使用角度看,主要差異在以下兩點:

一方面,在 SortShuffleWriter 的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 中,存儲的是鍵值或者值的具體類型,也就是 Java 對象,是反序列化過后的數(shù)據(jù)。而在 UnsafeShuffleWriter 的 ShuffleExternalSorter 中數(shù)據(jù)是序列化以后存儲到實際的 Page 中,而且在寫入數(shù)據(jù)過程中會額外寫入長度信息。總體而言,序列化以后數(shù)據(jù)大小是遠遠小于序列化之前的數(shù)據(jù)。

另一方面,UnsafeShuffleWriter 中需要額外的存儲記錄(LongArray),它保存著分區(qū)信息和實際指向序列化后數(shù)據(jù)的指針(經(jīng)過編碼的Page num 以及 Offset)。相對于 SortShuffleWriter, UnsafeShuffleWriter 中這部分存儲的開銷是額外的。

2.2 Shuffle Read 階段分析

Spark Shuffle Read 主要經(jīng)歷從獲取數(shù)據(jù),序列化流,添加指標統(tǒng)計,可能的聚合 (Aggregation) 計算以及排序等過程。大體流程如下圖。

Spark 中如何使用Shuffle 內(nèi)存

以上計算主要都是迭代進行。在以上步驟中,比較復雜的操作是從遠程獲取數(shù)據(jù),聚合和排序操作。接下來,依次分析這三個步驟內(nèi)存的使用情況。

1,數(shù)據(jù)獲取分為遠程獲取和本地獲取。本地獲取將直接從本地的 BlockManager 取數(shù)據(jù), 而對于遠程數(shù)據(jù),需要走網(wǎng)絡(luò)。在遠程獲取過程中,有相關(guān)參數(shù)可以控制從遠程并發(fā)獲取數(shù)據(jù)的大小,正在獲取數(shù)據(jù)的請求數(shù),以及單次數(shù)據(jù)塊請求是否放到內(nèi)存等參數(shù)。具體參數(shù)包括 spark.reducer.maxSizeInFlight (默認 48M),spark.reducer.maxReqsInFlight, spark.reducer.maxBlocksInFlightPerAddress 和 spark.maxRemoteBlockSizeFetchToMem。

考慮到數(shù)據(jù)傾斜的場景,如果 Map 階段有一個 Block 數(shù)據(jù)特別的大,默認情況由于 spark.maxRemoteBlockSizeFetchToMem 沒有做限制,所以在這個階段需要將需要獲取的整個 Block 數(shù)據(jù)放到 Reduce 端的內(nèi)存中,這個時候是非常的耗內(nèi)存的??梢栽O(shè)置 spark.maxRemoteBlockSizeFetchToMem 值,如果超過該閾值,可以落盤,避免這種情況的 OOM。 另外,在獲取到數(shù)據(jù)以后,默認情況下會對獲取的數(shù)據(jù)進行校驗(參數(shù) spark.shuffle.detectCorrupt 控制),這個過程也增加了一定的內(nèi)存消耗。

2,對于需要聚合和排序的情況,這個過程是借助 ExternalAppendOnlyMap 來實現(xiàn)的。整個插入,Spill 以及 Merge 的過程和 Write 階段差不多。總體上,這塊也是比較消耗內(nèi)存的,但是因為有 Spill 操作,當內(nèi)存不足時,可以將內(nèi)存數(shù)據(jù)刷到磁盤,從而釋放內(nèi)存空間。

三、Spark Shuffle OOM 可能性分析

圍繞內(nèi)存使用,前面比較詳細的分析了 Spark 內(nèi)存管理以及在 Shuffle 過程可能使用較多內(nèi)存的地方。接下來總結(jié)的要點如下:

1,首先需要注意 Executor 端的任務(wù)并發(fā)度,多個同時運行的 Task 會共享 Executor 端的內(nèi)存,使得單個 Task 可使用的內(nèi)存減少。

2,無論是在 Map 還是在 Reduce 端,插入數(shù)據(jù)到內(nèi)存,排序,歸并都是比較都是比較占用內(nèi)存的。因為有 Spill,理論上不會因為數(shù)據(jù)傾斜造成 OOM。 但是,由于對堆內(nèi)對象的分配和釋放是由 JVM 管理的,而 Spark 是通過采樣獲取已經(jīng)使用的內(nèi)存情況,有可能因為采樣不準確而不能及時 Spill,導致OOM。

3,在 Reduce 獲取數(shù)據(jù)時,由于數(shù)據(jù)傾斜,有可能造成單個 Block 的數(shù)據(jù)非常的大,默認情況下是需要有足夠的內(nèi)存來保存單個 Block 的數(shù)據(jù)。因此,此時極有可能因為數(shù)據(jù)傾斜造成 OOM。 可以設(shè)置 spark.maxRemoteBlockSizeFetchToMem 參數(shù),設(shè)置這個參數(shù)以后,超過一定的閾值,會自動將數(shù)據(jù) Spill 到磁盤,此時便可以避免因為數(shù)據(jù)傾斜造成 OOM 的情況。在我們的生產(chǎn)環(huán)境中也驗證了這點,在設(shè)置這個參數(shù)到合理的閾值后,生產(chǎn)環(huán)境任務(wù) OOM 的情況大大減少了。

4,在 Reduce 獲取數(shù)據(jù)后,默認情況會對數(shù)據(jù)流進行解壓校驗(參數(shù) spark.shuffle.detectCorrupt)。正如在代碼注釋中提到,由于這部分沒有 Spill 到磁盤操作,也有很大的可性能會導致 OOM。在我們的生產(chǎn)環(huán)境中也有碰到因為檢驗導致 OOM 的情況。

上述內(nèi)容就是Spark 中如何使用Shuffle 內(nèi)存,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

網(wǎng)站題目:Spark中如何使用Shuffle內(nèi)存
文章URL:http://bm7419.com/article14/jcssde.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供、靜態(tài)網(wǎng)站云服務(wù)器、服務(wù)器托管、響應(yīng)式網(wǎng)站、企業(yè)網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化