Spark性能優(yōu)化基礎(chǔ)知識(shí)有哪些

本篇內(nèi)容主要講解“Spark性能優(yōu)化基礎(chǔ)知識(shí)有哪些”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Spark性能優(yōu)化基礎(chǔ)知識(shí)有哪些”吧!

創(chuàng)新互聯(lián)建站長期為數(shù)千家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為澤普企業(yè)提供專業(yè)的做網(wǎng)站、成都做網(wǎng)站,澤普網(wǎng)站改版等技術(shù)服務(wù)。擁有十多年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。

#開發(fā)調(diào)優(yōu)
Spark性能優(yōu)化的第一步,就是要在開發(fā)Spark作業(yè)的過程中注意和應(yīng)用一些性能優(yōu)化的基本原則。開發(fā)調(diào)優(yōu),就是要讓大家了解以下一些Spark基本開發(fā)原則,包括:RDD lineage設(shè)計(jì)、算子的合理使用、特殊操作的優(yōu)化等。在開發(fā)過程中,時(shí)時(shí)刻刻都應(yīng)該注意以上原則,并將這些原則根據(jù)具體的業(yè)務(wù)以及實(shí)際的應(yīng)用場(chǎng)景,靈活地運(yùn)用到自己的Spark作業(yè)中。
##原則一:避免創(chuàng)建重復(fù)的RDD
通常來說,我們?cè)陂_發(fā)一個(gè)Spark作業(yè)時(shí),首先是基于某個(gè)數(shù)據(jù)源(比如Hive表或HDFS文件)創(chuàng)建一個(gè)初始的RDD;接著對(duì)這個(gè)RDD執(zhí)行某個(gè)算子操作,然后得到下一個(gè)RDD;以此類推,循環(huán)往復(fù),直到計(jì)算出最終我們需要的結(jié)果。在這個(gè)過程中,多個(gè)RDD會(huì)通過不同的算子操作(比如map、reduce等)串起來,這個(gè)“RDD串”,就是RDD lineage,也就是“RDD的血緣關(guān)系鏈”。

我們?cè)陂_發(fā)過程中要注意:對(duì)于同一份數(shù)據(jù),只應(yīng)該創(chuàng)建一個(gè)RDD,不能創(chuàng)建多個(gè)RDD來代表同一份數(shù)據(jù)。

一些Spark初學(xué)者在剛開始開發(fā)Spark作業(yè)時(shí),或者是有經(jīng)驗(yàn)的工程師在開發(fā)RDD lineage極其冗長的Spark作業(yè)時(shí),可能會(huì)忘了自己之前對(duì)于某一份數(shù)據(jù)已經(jīng)創(chuàng)建過一個(gè)RDD了,從而導(dǎo)致對(duì)于同一份數(shù)據(jù),創(chuàng)建了多個(gè)RDD。這就意味著,我們的Spark作業(yè)會(huì)進(jìn)行多次重復(fù)計(jì)算來創(chuàng)建多個(gè)代表相同數(shù)據(jù)的RDD,進(jìn)而增加了作業(yè)的性能開銷。

// 需要對(duì)名為“hello.txt”的HDFS文件進(jìn)行一次map操作,再進(jìn)行一次reduce操作。也就是說,需要對(duì)一份數(shù)據(jù)執(zhí)行兩次算子操作。

// 錯(cuò)誤的做法:對(duì)于同一份數(shù)據(jù)執(zhí)行多次算子操作時(shí),創(chuàng)建多個(gè)RDD。
// 這里執(zhí)行了兩次textFile方法,針對(duì)同一個(gè)HDFS文件,創(chuàng)建了兩個(gè)RDD出來,然后分別對(duì)每個(gè)RDD都執(zhí)行了一個(gè)算子操作。
// 這種情況下,Spark需要從HDFS上兩次加載hello.txt文件的內(nèi)容,并創(chuàng)建兩個(gè)單獨(dú)的RDD;第二次加載HDFS文件以及創(chuàng)建RDD的性能開銷,很明顯是白白浪費(fèi)掉的。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd2.reduce(...)

// 正確的用法:對(duì)于一份數(shù)據(jù)執(zhí)行多次算子操作時(shí),只使用一個(gè)RDD。
// 這種寫法很明顯比上一種寫法要好多了,因?yàn)槲覀儗?duì)于同一份數(shù)據(jù)只創(chuàng)建了一個(gè)RDD,然后對(duì)這一個(gè)RDD執(zhí)行了多次算子操作。
// 但是要注意到這里為止優(yōu)化還沒有結(jié)束,由于rdd1被執(zhí)行了兩次算子操作,第二次執(zhí)行reduce操作的時(shí)候,還會(huì)再次從源頭處重新計(jì)算一次rdd1的數(shù)據(jù),因此還是會(huì)有重復(fù)計(jì)算的性能開銷。
// 要徹底解決這個(gè)問題,必須結(jié)合“原則三:對(duì)多次使用的RDD進(jìn)行持久化”,才能保證一個(gè)RDD被多次使用時(shí)只被計(jì)算一次。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
rdd1.reduce(...)

##原則二:盡可能復(fù)用同一個(gè)RDD
除了要避免在開發(fā)過程中對(duì)一份完全相同的數(shù)據(jù)創(chuàng)建多個(gè)RDD之外,在對(duì)不同的數(shù)據(jù)執(zhí)行算子操作時(shí)還要盡可能地復(fù)用一個(gè)RDD。比如說,有一個(gè)RDD的數(shù)據(jù)格式是key-value類型的,另一個(gè)是單value類型的,這兩個(gè)RDD的value數(shù)據(jù)是完全一樣的。那么此時(shí)我們可以只使用key-value類型的那個(gè)RDD,因?yàn)槠渲幸呀?jīng)包含了另一個(gè)的數(shù)據(jù)。對(duì)于類似這種多個(gè)RDD的數(shù)據(jù)有重疊或者包含的情況,我們應(yīng)該盡量復(fù)用一個(gè)RDD,這樣可以盡可能地減少RDD的數(shù)量,從而盡可能減少算子執(zhí)行的次數(shù)。

// 錯(cuò)誤的做法。

// 有一個(gè)<Long, String>格式的RDD,即rdd1。
// 接著由于業(yè)務(wù)需要,對(duì)rdd1執(zhí)行了一個(gè)map操作,創(chuàng)建了一個(gè)rdd2,而rdd2中的數(shù)據(jù)僅僅是rdd1中的value值而已,也就是說,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)

// 分別對(duì)rdd1和rdd2執(zhí)行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)

// 正確的做法。

// 上面這個(gè)case中,其實(shí)rdd1和rdd2的區(qū)別無非就是數(shù)據(jù)格式不同而已,rdd2的數(shù)據(jù)完全就是rdd1的子集而已,卻創(chuàng)建了兩個(gè)rdd,并對(duì)兩個(gè)rdd都執(zhí)行了一次算子操作。
// 此時(shí)會(huì)因?yàn)閷?duì)rdd1執(zhí)行map算子來創(chuàng)建rdd2,而多執(zhí)行一次算子操作,進(jìn)而增加性能開銷。

// 其實(shí)在這種情況下完全可以復(fù)用同一個(gè)RDD。
// 我們可以使用rdd1,既做reduceByKey操作,也做map操作。
// 在進(jìn)行第二個(gè)map操作時(shí),只使用每個(gè)數(shù)據(jù)的tuple._2,也就是rdd1中的value值,即可。
JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)

// 第二種方式相較于第一種方式而言,很明顯減少了一次rdd2的計(jì)算開銷。
// 但是到這里為止,優(yōu)化還沒有結(jié)束,對(duì)rdd1我們還是執(zhí)行了兩次算子操作,rdd1實(shí)際上還是會(huì)被計(jì)算兩次。
// 因此還需要配合“原則三:對(duì)多次使用的RDD進(jìn)行持久化”進(jìn)行使用,才能保證一個(gè)RDD被多次使用時(shí)只被計(jì)算一次。

##原則三:對(duì)多次使用的RDD進(jìn)行持久化
當(dāng)你在Spark代碼中多次對(duì)一個(gè)RDD做了算子操作后,恭喜,你已經(jīng)實(shí)現(xiàn)Spark作業(yè)第一步的優(yōu)化了,也就是盡可能復(fù)用RDD。此時(shí)就該在這個(gè)基礎(chǔ)之上,進(jìn)行第二步優(yōu)化了,也就是要保證對(duì)一個(gè)RDD執(zhí)行多次算子操作時(shí),這個(gè)RDD本身僅僅被計(jì)算一次。

Spark中對(duì)于一個(gè)RDD執(zhí)行多次算子的默認(rèn)原理是這樣的:每次你對(duì)一個(gè)RDD執(zhí)行一個(gè)算子操作時(shí),都會(huì)重新從源頭處計(jì)算一遍,計(jì)算出那個(gè)RDD來,然后再對(duì)這個(gè)RDD執(zhí)行你的算子操作。這種方式的性能是很差的。

因此對(duì)于這種情況,我們的建議是:對(duì)多次使用的RDD進(jìn)行持久化。此時(shí)Spark就會(huì)根據(jù)你的持久化策略,將RDD中的數(shù)據(jù)保存到內(nèi)存或者磁盤中。以后每次對(duì)這個(gè)RDD進(jìn)行算子操作時(shí),都會(huì)直接從內(nèi)存或磁盤中提取持久化的RDD數(shù)據(jù),然后執(zhí)行算子,而不會(huì)從源頭處重新計(jì)算一遍這個(gè)RDD,再執(zhí)行算子操作。

// 如果要對(duì)一個(gè)RDD進(jìn)行持久化,只要對(duì)這個(gè)RDD調(diào)用cache()和persist()即可。

// 正確的做法。
// cache()方法表示:使用非序列化的方式將RDD中的數(shù)據(jù)全部嘗試持久化到內(nèi)存中。
// 此時(shí)再對(duì)rdd1執(zhí)行兩次算子操作時(shí),只有在第一次執(zhí)行map算子時(shí),才會(huì)將這個(gè)rdd1從源頭處計(jì)算一次。
// 第二次執(zhí)行reduce算子時(shí),就會(huì)直接從內(nèi)存中提取數(shù)據(jù)進(jìn)行計(jì)算,不會(huì)重復(fù)計(jì)算一個(gè)rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)

// persist()方法表示:手動(dòng)選擇持久化級(jí)別,并使用指定的方式進(jìn)行持久化。
// 比如說,StorageLevel.MEMORY_AND_DISK_SER表示,內(nèi)存充足時(shí)優(yōu)先持久化到內(nèi)存中,內(nèi)存不充足時(shí)持久化到磁盤文件中。
// 而且其中的_SER后綴表示,使用序列化的方式來保存RDD數(shù)據(jù),此時(shí)RDD中的每個(gè)partition都會(huì)序列化成一個(gè)大的字節(jié)數(shù)組,然后再持久化到內(nèi)存或磁盤中。
// 序列化的方式可以減少持久化的數(shù)據(jù)對(duì)內(nèi)存/磁盤的占用量,進(jìn)而避免內(nèi)存被持久化數(shù)據(jù)占用過多,從而發(fā)生頻繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

對(duì)于persist()方法而言,我們可以根據(jù)不同的業(yè)務(wù)場(chǎng)景選擇不同的持久化級(jí)別。
##原則四:盡量避免使用shuffle類算子
如果有可能的話,要盡量避免使用shuffle類算子。因?yàn)镾park作業(yè)運(yùn)行過程中,最消耗性能的地方就是shuffle過程。shuffle過程,簡(jiǎn)單來說,就是將分布在集群中多個(gè)節(jié)點(diǎn)上的同一個(gè)key,拉取到同一個(gè)節(jié)點(diǎn)上,進(jìn)行聚合或join等操作。比如reduceByKey、join等算子,都會(huì)觸發(fā)shuffle操作。

shuffle過程中,各個(gè)節(jié)點(diǎn)上的相同key都會(huì)先寫入本地磁盤文件中,然后其他節(jié)點(diǎn)需要通過網(wǎng)絡(luò)傳輸拉取各個(gè)節(jié)點(diǎn)上的磁盤文件中的相同key。而且相同key都拉取到同一個(gè)節(jié)點(diǎn)進(jìn)行聚合操作時(shí),還有可能會(huì)因?yàn)橐粋€(gè)節(jié)點(diǎn)上處理的key過多,導(dǎo)致內(nèi)存不夠存放,進(jìn)而溢寫到磁盤文件中。因此在shuffle過程中,可能會(huì)發(fā)生大量的磁盤文件讀寫的IO操作,以及數(shù)據(jù)的網(wǎng)絡(luò)傳輸操作。磁盤IO和網(wǎng)絡(luò)數(shù)據(jù)傳輸也是shuffle性能較差的主要原因。

因此在我們的開發(fā)過程中,能避免則盡可能避免使用reduceByKey、join、distinct、repartition等會(huì)進(jìn)行shuffle的算子,盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業(yè),可以大大減少性能開銷。
Broadcast與map進(jìn)行join代碼示例

// 傳統(tǒng)的join操作會(huì)導(dǎo)致shuffle操作。
// 因?yàn)閮蓚€(gè)RDD中,相同的key都需要通過網(wǎng)絡(luò)拉取到一個(gè)節(jié)點(diǎn)上,由一個(gè)task進(jìn)行join操作。
val rdd3 = rdd1.join(rdd2)

// Broadcast+map的join操作,不會(huì)導(dǎo)致shuffle操作。
// 使用Broadcast將一個(gè)數(shù)據(jù)量較小的RDD作為廣播變量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數(shù)據(jù)。
// 然后進(jìn)行遍歷,如果發(fā)現(xiàn)rdd2中某條數(shù)據(jù)的key與rdd1的當(dāng)前數(shù)據(jù)的key是相同的,那么就判定可以進(jìn)行join。
// 此時(shí)就可以根據(jù)自己需要的方式,將rdd1當(dāng)前數(shù)據(jù)與rdd2中可以連接的數(shù)據(jù),拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操作,建議僅僅在rdd2的數(shù)據(jù)量比較少(比如幾百M(fèi),或者一兩G)的情況下使用。
// 因?yàn)槊總€(gè)Executor的內(nèi)存中,都會(huì)駐留一份rdd2的全量數(shù)據(jù)。

##原則五:使用map-side預(yù)聚合的shuffle操作
  如果因?yàn)闃I(yè)務(wù)需要,一定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預(yù)聚合的算子。

  所謂的map-side預(yù)聚合,說的是在每個(gè)節(jié)點(diǎn)本地對(duì)相同的key進(jìn)行一次聚合操作,類似于MapReduce中的本地combiner。map-side預(yù)聚合之后,每個(gè)節(jié)點(diǎn)本地就只會(huì)有一條相同的key,因?yàn)槎鄺l相同的key都被聚合起來了。其他節(jié)點(diǎn)在拉取所有節(jié)點(diǎn)上的相同key時(shí),就會(huì)大大減少需要拉取的數(shù)據(jù)數(shù)量,從而也就減少了磁盤IO以及網(wǎng)絡(luò)傳輸開銷。通常來說,在可能的情況下,建議使用reduceByKey或者  aggregateByKey算子來替代掉groupByKey算子。因?yàn)閞educeByKey和aggregateByKey算子都會(huì)使用用戶自定義的函數(shù)對(duì)每個(gè)節(jié)點(diǎn)本地的相同key進(jìn)行預(yù)聚合。而groupByKey算子是不會(huì)進(jìn)行預(yù)聚合的,全量的數(shù)據(jù)會(huì)在集群的各個(gè)節(jié)點(diǎn)之間分發(fā)和傳輸,性能相對(duì)來說比較差。

  比如如下兩幅圖,就是典型的例子,分別基于reduceByKey和groupByKey進(jìn)行單詞計(jì)數(shù)。其中第一張圖是groupByKey的原理圖,可以看到,沒有進(jìn)行任何本地聚合時(shí),所有數(shù)據(jù)都會(huì)在集群節(jié)點(diǎn)之間傳輸;第二張圖是reduceByKey的原理圖,可以看到,每個(gè)節(jié)點(diǎn)本地的相同key數(shù)據(jù),都進(jìn)行了預(yù)聚合,然后才傳輸?shù)狡渌?jié)點(diǎn)上進(jìn)行全局聚合。
Spark性能優(yōu)化基礎(chǔ)知識(shí)有哪些
Spark性能優(yōu)化基礎(chǔ)知識(shí)有哪些
##原則六:使用高性能的算子
除了shuffle相關(guān)的算子有優(yōu)化原則之外,其他的算子也都有著相應(yīng)的優(yōu)化原則。
使用reduceByKey/aggregateByKey替代groupByKey
詳情見“原則五:使用map-side預(yù)聚合的shuffle操作”。 使用mapPartitions替代普通map
mapPartitions類的算子,一次函數(shù)調(diào)用會(huì)處理一個(gè)partition所有的數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條,性能相對(duì)來說會(huì)高一些。但是有的時(shí)候,使用mapPartitions會(huì)出現(xiàn)OOM(內(nèi)存溢出)的問題。因?yàn)閱未魏瘮?shù)調(diào)用就要處理掉一個(gè)partition所有的數(shù)據(jù),如果內(nèi)存不夠,垃圾回收時(shí)是無法回收掉太多對(duì)象的,很可能出現(xiàn)OOM異常。所以使用這類操作時(shí)要慎重!
使用foreachPartitions替代foreach
原理類似于“使用mapPartitions替代map”,也是一次函數(shù)調(diào)用處理一個(gè)partition的所有數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條數(shù)據(jù)。在實(shí)踐中發(fā)現(xiàn),foreachPartitions類的算子,對(duì)性能的提升還是很有幫助的。比如在foreach函數(shù)中,將RDD中所有數(shù)據(jù)寫MySQL,那么如果是普通的foreach算子,就會(huì)一條數(shù)據(jù)一條數(shù)據(jù)地寫,每次函數(shù)調(diào)用可能就會(huì)創(chuàng)建一個(gè)數(shù)據(jù)庫連接,此時(shí)就勢(shì)必會(huì)頻繁地創(chuàng)建和銷毀數(shù)據(jù)庫連接,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個(gè)partition的數(shù)據(jù),那么對(duì)于每個(gè)partition,只要?jiǎng)?chuàng)建一個(gè)數(shù)據(jù)庫連接即可,然后執(zhí)行批量插入操作,此時(shí)性能是比較高的。實(shí)踐中發(fā)現(xiàn),對(duì)于1萬條左右的數(shù)據(jù)量寫MySQL,性能可以提升30%以上。
使用filter之后進(jìn)行coalesce操作
通常對(duì)一個(gè)RDD執(zhí)行filter算子過濾掉RDD中較多數(shù)據(jù)后(比如30%以上的數(shù)據(jù)),建議使用coalesce算子,手動(dòng)減少RDD的partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition中去。因?yàn)閒ilter之后,RDD的每個(gè)partition中都會(huì)有很多數(shù)據(jù)被過濾掉,此時(shí)如果照常進(jìn)行后續(xù)的計(jì)算,其實(shí)每個(gè)task處理的partition中的數(shù)據(jù)量并不是很多,有一點(diǎn)資源浪費(fèi),而且此時(shí)處理的task越多,可能速度反而越慢。因此用coalesce減少partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition之后,只要使用更少的task即可處理完所有的partition。在某些場(chǎng)景下,對(duì)于性能的提升會(huì)有一定的幫助。
使用repartitionAndSortWithinPartitions替代repartition與sort類操作repartitionAndSortWithinPartitions是Spark官網(wǎng)推薦的一個(gè)算子,官方建議,如果需要在repartition重分區(qū)之后,還要進(jìn)行排序,建議直接使用repartitionAndSortWithinPartitions算子。因?yàn)樵撍阕涌梢砸贿呥M(jìn)行重分區(qū)的shuffle操作,一邊進(jìn)行排序。shuffle與sort兩個(gè)操作同時(shí)進(jìn)行,比先shuffle再sort來說,性能可能是要高的。
##原則七:廣播大變量   有時(shí)在開發(fā)過程中,會(huì)遇到需要在算子函數(shù)中使用外部變量的場(chǎng)景(尤其是大變量,比如100M以上的大集合),那么此時(shí)就應(yīng)該使用Spark的廣播(Broadcast)功能來提升性能。

  在算子函數(shù)中使用到外部變量時(shí),默認(rèn)情況下,Spark會(huì)將該變量復(fù)制多個(gè)副本,通過網(wǎng)絡(luò)傳輸?shù)絫ask中,此時(shí)每個(gè)task都有一個(gè)變量副本。如果變量本身比較大的話(比如100M,甚至1G),那么大量的變量副本在網(wǎng)絡(luò)中傳輸?shù)男阅荛_銷,以及在各個(gè)節(jié)點(diǎn)的Executor中占用過多內(nèi)存導(dǎo)致的頻繁GC,都會(huì)極大地影響性能。

  因此對(duì)于上述情況,如果使用的外部變量比較大,建議使用Spark的廣播功能,對(duì)該變量進(jìn)行廣播。廣播后的變量,會(huì)保證每個(gè)Executor的內(nèi)存中,只駐留一份變量副本,而Executor中的task執(zhí)行時(shí)共享該Executor中的那份變量副本。這樣的話,可以大大減少變量副本的數(shù)量,從而減少網(wǎng)絡(luò)傳輸?shù)男阅荛_銷,并減少對(duì)Executor內(nèi)存的占用開銷,降低GC的頻率。

// 以下代碼在算子函數(shù)中,使用了外部的變量。
// 此時(shí)沒有做任何特殊操作,每個(gè)task都會(huì)有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下代碼將list1封裝成了Broadcast類型的廣播變量。
// 在算子函數(shù)中,使用廣播變量時(shí),首先會(huì)判斷當(dāng)前task所在Executor內(nèi)存中,是否有變量副本。
// 如果有則直接使用;如果沒有則從Driver或者其他Executor節(jié)點(diǎn)上遠(yuǎn)程拉取一份放到本地Executor內(nèi)存中。
// 每個(gè)Executor內(nèi)存中,就只會(huì)駐留一份廣播變量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

##原則八:使用Kryo優(yōu)化序列化性能
在Spark中,主要有三個(gè)地方涉及到了序列化:

  1. 在算子函數(shù)中使用到外部變量時(shí),該變量會(huì)被序列化后進(jìn)行網(wǎng)絡(luò)傳輸(見“原則七:廣播大變量”中的講解)

  2. 將自定義的類型作為RDD的泛型類型時(shí)(比如JavaRDD,Student是自定義類型),所有自定義類型對(duì)象,都會(huì)進(jìn)行序列化。因此這種情況下,也要求自定義的類必須實(shí)現(xiàn)Serializable接口

  3. 使用可序列化的持久化策略時(shí)(比如MEMORY_ONLY_SER),Spark會(huì)將RDD中的每個(gè)partition都序列化成一個(gè)大的字節(jié)數(shù)組。

  對(duì)于這三種出現(xiàn)序列化的地方,我們都可以通過使用Kryo序列化類庫,來優(yōu)化序列化和反序列化的性能。Spark默認(rèn)使用的是Java的序列化機(jī)制,也就是ObjectOutputStream/ObjectInputStream API來進(jìn)行序列化和反序列化。但是Spark同時(shí)支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。官方介紹,Kryo序列化機(jī)制比Java序列化機(jī)制,性能高10倍左右。Spark之所以默認(rèn)沒有使用Kryo作為序列化類庫,是因?yàn)镵ryo要求最好要注冊(cè)所有需要進(jìn)行序列化的自定義類型,因此對(duì)于開發(fā)者來說,這種方式比較麻煩.

  以下是使用Kryo的代碼示例,我們只要設(shè)置序列化類,再注冊(cè)要序列化的自定義類型即可(比如算子函數(shù)中使用到的外部變量類型、作為RDD泛型類型的自定義類型等):

// 創(chuàng)建SparkConf對(duì)象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設(shè)置序列化器為KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊(cè)要序列化的自定義類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

##原則九:優(yōu)化數(shù)據(jù)結(jié)構(gòu)
Java中,有三種類型比較耗費(fèi)內(nèi)存:

  1. 對(duì)象,每個(gè)Java對(duì)象都有對(duì)象頭、引用等額外的信息,因此比較占用內(nèi)存空間

  2. 字符串,每個(gè)字符串內(nèi)部都有一個(gè)字符數(shù)組以及長度等額外信息

  3. 集合類型,比如HashMap、LinkedList等,因?yàn)榧项愋蛢?nèi)部通常會(huì)使用一些內(nèi)部類來封裝集合元素,比如Map.Entry

  因此Spark官方建議,在Spark編碼實(shí)現(xiàn)中,特別是對(duì)于算子函數(shù)中的代碼,盡量不要使用上述三種數(shù)據(jù)結(jié)構(gòu),盡量使用字符串替代對(duì)象,使用原始類型(比如Int、Long)替代字符串,使用數(shù)組替代集合類型,這樣盡可能地減少內(nèi)存占用,從而降低GC頻率,提升性能。

  但是在筆者的編碼實(shí)踐中發(fā)現(xiàn),要做到該原則其實(shí)并不容易。因?yàn)槲覀兺瑫r(shí)要考慮到代碼的可維護(hù)性,如果一個(gè)代碼中,完全沒有任何對(duì)象抽象,全部是字符串拼接的方式,那么對(duì)于后續(xù)的代碼維護(hù)和修改,無疑是一場(chǎng)巨大的災(zāi)難。同理,如果所有操作都基于數(shù)組實(shí)現(xiàn),而不使用HashMap、LinkedList等集合類型,那么對(duì)于我們的編碼難度以及代碼可維護(hù)性,也是一個(gè)極大的挑戰(zhàn)。因此筆者建議,在可能以及合適的情況下,使用占用內(nèi)存較少的數(shù)據(jù)結(jié)構(gòu),但是前提是要保證代碼的可維護(hù)性。 ##資源調(diào)優(yōu)
在開發(fā)完Spark作業(yè)之后,就該為作業(yè)配置合適的資源了。Spark的資源參數(shù),基本都可以在spark-submit命令中作為參數(shù)設(shè)置。很多Spark初學(xué)者,通常不知道該設(shè)置哪些必要的參數(shù),以及如何設(shè)置這些參數(shù),最后就只能胡亂設(shè)置,甚至壓根兒不設(shè)置。資源參數(shù)設(shè)置的不合理,可能會(huì)導(dǎo)致沒有充分利用集群資源,作業(yè)運(yùn)行會(huì)極其緩慢;或者設(shè)置的資源過大,隊(duì)列沒有足夠的資源來提供,進(jìn)而導(dǎo)致各種異常??傊瑹o論是哪種情況,都會(huì)導(dǎo)致Spark作業(yè)的運(yùn)行效率低下,甚至根本無法運(yùn)行。因此我們必須對(duì)Spark作業(yè)的資源使用原理有一個(gè)清晰的認(rèn)識(shí),并知道在Spark作業(yè)運(yùn)行過程中,有哪些資源參數(shù)是可以設(shè)置的,以及如何設(shè)置合適的參數(shù)值。
** Spark作業(yè)基本運(yùn)行原理**
Spark性能優(yōu)化基礎(chǔ)知識(shí)有哪些
詳細(xì)原理見上圖。我們使用spark-submit提交一個(gè)Spark作業(yè)之后,這個(gè)作業(yè)就會(huì)啟動(dòng)一個(gè)對(duì)應(yīng)的Driver進(jìn)程。根據(jù)你使用的部署模式(deploy-mode)不同,Driver進(jìn)程可能在本地啟動(dòng),也可能在集群中某個(gè)工作節(jié)點(diǎn)上啟動(dòng)。Driver進(jìn)程本身會(huì)根據(jù)我們?cè)O(shè)置的參數(shù),占有一定數(shù)量的內(nèi)存和CPU core。而Driver進(jìn)程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的資源管理集群,美團(tuán)?大眾點(diǎn)評(píng)使用的是YARN作為資源管理集群)申請(qǐng)運(yùn)行Spark作業(yè)需要使用的資源,這里的資源指的就是Executor進(jìn)程。YARN集群管理器會(huì)根據(jù)我們?yōu)镾park作業(yè)設(shè)置的資源參數(shù),在各個(gè)工作節(jié)點(diǎn)上,啟動(dòng)一定數(shù)量的Executor進(jìn)程,每個(gè)Executor進(jìn)程都占有一定數(shù)量的內(nèi)存和CPU core。

在申請(qǐng)到了作業(yè)執(zhí)行所需的資源之后,Driver進(jìn)程就會(huì)開始調(diào)度和執(zhí)行我們編寫的作業(yè)代碼了。Driver進(jìn)程會(huì)將我們編寫的Spark作業(yè)代碼分拆為多個(gè)stage,每個(gè)stage執(zhí)行一部分代碼片段,并為每個(gè)stage創(chuàng)建一批task,然后將這些task分配到各個(gè)Executor進(jìn)程中執(zhí)行。task是最小的計(jì)算單元,負(fù)責(zé)執(zhí)行一模一樣的計(jì)算邏輯(也就是我們自己編寫的某個(gè)代碼片段),只是每個(gè)task處理的數(shù)據(jù)不同而已。一個(gè)stage的所有task都執(zhí)行完畢之后,會(huì)在各個(gè)節(jié)點(diǎn)本地的磁盤文件中寫入計(jì)算中間結(jié)果,然后Driver就會(huì)調(diào)度運(yùn)行下一個(gè)stage。下一個(gè)stage的task的輸入數(shù)據(jù)就是上一個(gè)stage輸出的中間結(jié)果。如此循環(huán)往復(fù),直到將我們自己編寫的代碼邏輯全部執(zhí)行完,并且計(jì)算完所有的數(shù)據(jù),得到我們想要的結(jié)果為止。

Spark是根據(jù)shuffle類算子來進(jìn)行stage的劃分。如果我們的代碼中執(zhí)行了某個(gè)shuffle類算子(比如reduceByKey、join等),那么就會(huì)在該算子處,劃分出一個(gè)stage界限來。可以大致理解為,shuffle算子執(zhí)行之前的代碼會(huì)被劃分為一個(gè)stage,shuffle算子執(zhí)行以及之后的代碼會(huì)被劃分為下一個(gè)stage。因此一個(gè)stage剛開始執(zhí)行的時(shí)候,它的每個(gè)task可能都會(huì)從上一個(gè)stage的task所在的節(jié)點(diǎn),去通過網(wǎng)絡(luò)傳輸拉取需要自己處理的所有key,然后對(duì)拉取到的所有相同的key使用我們自己編寫的算子函數(shù)執(zhí)行聚合操作(比如reduceByKey()算子接收的函數(shù))。這個(gè)過程就是shuffle。

當(dāng)我們?cè)诖a中執(zhí)行了cache/persist等持久化操作時(shí),根據(jù)我們選擇的持久化級(jí)別的不同,每個(gè)task計(jì)算出來的數(shù)據(jù)也會(huì)保存到Executor進(jìn)程的內(nèi)存或者所在節(jié)點(diǎn)的磁盤文件中。

因此Executor的內(nèi)存主要分為三塊:第一塊是讓task執(zhí)行我們自己編寫的代碼時(shí)使用,默認(rèn)是占Executor總內(nèi)存的20%;第二塊是讓task通過shuffle過程拉取了上一個(gè)stage的task的輸出后,進(jìn)行聚合等操作時(shí)使用,默認(rèn)也是占Executor總內(nèi)存的20%;第三塊是讓RDD持久化時(shí)使用,默認(rèn)占Executor總內(nèi)存的60%。

task的執(zhí)行速度是跟每個(gè)Executor進(jìn)程的CPU core數(shù)量有直接關(guān)系的。一個(gè)CPU core同一時(shí)間只能執(zhí)行一個(gè)線程。而每個(gè)Executor進(jìn)程上分配到的多個(gè)task,都是以每個(gè)task一條線程的方式,多線程并發(fā)運(yùn)行的。如果CPU core數(shù)量比較充足,而且分配到的task數(shù)量比較合理,那么通常來說,可以比較快速和高效地執(zhí)行完這些task線程。

以上就是Spark作業(yè)的基本運(yùn)行原理的說明,大家可以結(jié)合上圖來理解。理解作業(yè)基本原理,是我們進(jìn)行資源參數(shù)調(diào)優(yōu)的基本前提。
##原則:資源參數(shù)調(diào)優(yōu)
了解完了Spark作業(yè)運(yùn)行的基本原理之后,對(duì)資源相關(guān)的參數(shù)就容易理解了。所謂的Spark資源參數(shù)調(diào)優(yōu),其實(shí)主要就是對(duì)Spark運(yùn)行過程中各個(gè)使用資源的地方,通過調(diào)節(jié)各種參數(shù),來優(yōu)化資源使用的效率,從而提升Spark作業(yè)的執(zhí)行性能。以下參數(shù)就是Spark中主要的資源參數(shù),每個(gè)參數(shù)都對(duì)應(yīng)著作業(yè)運(yùn)行原理中的某個(gè)部分,我們同時(shí)也給出了一個(gè)調(diào)優(yōu)的參考值。

num-executors

  • 參數(shù)說明:該參數(shù)用于設(shè)置Spark作業(yè)總共要用多少個(gè)Executor進(jìn)程來執(zhí)行。Driver在向YARN集群管理器申請(qǐng)資源時(shí),YARN集群管理器會(huì)盡可能按照你的設(shè)置來在集群的各個(gè)工作節(jié)點(diǎn)上,啟動(dòng)相應(yīng)數(shù)量的Executor進(jìn)程。這個(gè)參數(shù)非常之重要,如果不設(shè)置的話,默認(rèn)只會(huì)給你啟動(dòng)少量的Executor進(jìn)程,此時(shí)你的Spark作業(yè)的運(yùn)行速度是非常慢的。

  • 參數(shù)調(diào)優(yōu)建議:每個(gè)Spark作業(yè)的運(yùn)行一般設(shè)置50~100個(gè)左右的Executor進(jìn)程比較合適,設(shè)置太少或太多的Executor進(jìn)程都不好。設(shè)置的太少,無法充分利用集群資源;設(shè)置的太多的話,大部分隊(duì)列可能無法給予充分的資源。

executor-memory

  • 參數(shù)說明:該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的內(nèi)存。Executor內(nèi)存的大小,很多時(shí)候直接決定了Spark作業(yè)的性能,而且跟常見的JVM OOM異常,也有直接的關(guān)聯(lián)。

  • 參數(shù)調(diào)優(yōu)建議:每個(gè)Executor進(jìn)程的內(nèi)存設(shè)置4G~8G較為合適。但是這只是一個(gè)參考值,具體的設(shè)置還是得根據(jù)不同部門的資源隊(duì)列來定。可以看看自己團(tuán)隊(duì)的資源隊(duì)列的最大內(nèi)存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作業(yè)申請(qǐng)到的總內(nèi)存量(也就是所有Executor進(jìn)程的內(nèi)存總和),這個(gè)量是不能超過隊(duì)列的最大內(nèi)存量的。此外,如果你是跟團(tuán)隊(duì)里其他人共享這個(gè)資源隊(duì)列,那么申請(qǐng)的總內(nèi)存量最好不要超過資源隊(duì)列最大總內(nèi)存的1/3~1/2,避免你自己的Spark作業(yè)占用了隊(duì)列所有的資源,導(dǎo)致別的同學(xué)的作業(yè)無法運(yùn)行。

executor-cores

  • 參數(shù)說明:該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的CPU core數(shù)量。這個(gè)參數(shù)決定了每個(gè)Executor進(jìn)程并行執(zhí)行task線程的能力。因?yàn)槊總€(gè)CPU core同一時(shí)間只能執(zhí)行一個(gè)task線程,因此每個(gè)Executor進(jìn)程的CPU core數(shù)量越多,越能夠快速地執(zhí)行完分配給自己的所有task線程。

  • 參數(shù)調(diào)優(yōu)建議:Executor的CPU core數(shù)量設(shè)置為2~4個(gè)較為合適。同樣得根據(jù)不同部門的資源隊(duì)列來定,可以看看自己的資源隊(duì)列的最大CPU core限制是多少,再依據(jù)設(shè)置的Executor數(shù)量,來決定每個(gè)Executor進(jìn)程可以分配到幾個(gè)CPU core。同樣建議,如果是跟他人共享這個(gè)隊(duì)列,那么num-executors * executor-cores不要超過隊(duì)列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學(xué)的作業(yè)運(yùn)行。

driver-memory

  • 參數(shù)說明:該參數(shù)用于設(shè)置Driver進(jìn)程的內(nèi)存。

  • 參數(shù)調(diào)優(yōu)建議:Driver的內(nèi)存通常來說不設(shè)置,或者設(shè)置1G左右應(yīng)該就夠了。唯一需要注意的一點(diǎn)是,如果需要使用collect算子將RDD的數(shù)據(jù)全部拉取到Driver上進(jìn)行處理,那么必須確保Driver的內(nèi)存足夠大,否則會(huì)出現(xiàn)OOM內(nèi)存溢出的問題。

spark.default.parallelism

  • 參數(shù)說明:該參數(shù)用于設(shè)置每個(gè)stage的默認(rèn)task數(shù)量。這個(gè)參數(shù)極為重要,如果不設(shè)置可能會(huì)直接影響你的Spark作業(yè)性能。

  • 參數(shù)調(diào)優(yōu)建議:Spark作業(yè)的默認(rèn)task數(shù)量為500~1000個(gè)較為合適。很多同學(xué)常犯的一個(gè)錯(cuò)誤就是不去設(shè)置這個(gè)參數(shù),那么此時(shí)就會(huì)導(dǎo)致Spark自己根據(jù)底層HDFS的block數(shù)量來設(shè)置task的數(shù)量,默認(rèn)是一個(gè)HDFS block對(duì)應(yīng)一個(gè)task。通常來說,Spark默認(rèn)設(shè)置的數(shù)量是偏少的(比如就幾十個(gè)task),如果task數(shù)量偏少的話,就會(huì)導(dǎo)致你前面設(shè)置好的Executor的參數(shù)都前功盡棄。試想一下,無論你的Executor進(jìn)程有多少個(gè),內(nèi)存和CPU有多大,但是task只有1個(gè)或者10個(gè),那么90%的Executor進(jìn)程可能根本就沒有task執(zhí)行,也就是白白浪費(fèi)了資源!因此Spark官網(wǎng)建議的設(shè)置原則是,設(shè)置該參數(shù)為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數(shù)量為300個(gè),那么設(shè)置1000個(gè)task是可以的,此時(shí)可以充分地利用Spark集群的資源。

** spark.storage.memoryFraction**

  • 參數(shù)說明:該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例,默認(rèn)是0.6。也就是說,默認(rèn)Executor 60%的內(nèi)存,可以用來保存持久化的RDD數(shù)據(jù)。根據(jù)你選擇的不同的持久化策略,如果內(nèi)存不夠時(shí),可能數(shù)據(jù)就不會(huì)持久化,或者數(shù)據(jù)會(huì)寫入磁盤。

  • 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中,有較多的RDD持久化操作,該參數(shù)的值可以適當(dāng)提高一些,保證持久化的數(shù)據(jù)能夠容納在內(nèi)存中。避免內(nèi)存不夠緩存所有的數(shù)據(jù),導(dǎo)致數(shù)據(jù)只能寫入磁盤中,降低了性能。但是如果Spark作業(yè)中的shuffle類操作比較多,而持久化操作比較少,那么這個(gè)參數(shù)的值適當(dāng)降低一些比較合適。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢(通過spark web ui可以觀察到作業(yè)的gc耗時(shí)),意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個(gè)參數(shù)的值。

spark.shuffle.memoryFraction

  • 參數(shù)說明:該參數(shù)用于設(shè)置shuffle過程中一個(gè)task拉取到上個(gè)stage的task的輸出后,進(jìn)行聚合操作時(shí)能夠使用的Executor內(nèi)存的比例,默認(rèn)是0.2。也就是說,Executor默認(rèn)只有20%的內(nèi)存用來進(jìn)行該操作。shuffle操作在進(jìn)行聚合時(shí),如果發(fā)現(xiàn)使用的內(nèi)存超出了這個(gè)20%的限制,那么多余的數(shù)據(jù)就會(huì)溢寫到磁盤文件中去,此時(shí)就會(huì)極大地降低性能。

  • 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中的RDD持久化操作較少,shuffle操作較多時(shí),建議降低持久化操作的內(nèi)存占比,提高shuffle操作的內(nèi)存占比比例,避免shuffle過程中數(shù)據(jù)過多時(shí)內(nèi)存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢,意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個(gè)參數(shù)的值。

資源參數(shù)的調(diào)優(yōu),沒有一個(gè)固定的值,需要同學(xué)們根據(jù)自己的實(shí)際情況(包括Spark作業(yè)中的shuffle操作數(shù)量、RDD持久化操作數(shù)量以及spark web ui中顯示的作業(yè)gc情況),同時(shí)參考本篇文章中給出的原理以及調(diào)優(yōu)建議,合理地設(shè)置上述參數(shù)。

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

到此,相信大家對(duì)“Spark性能優(yōu)化基礎(chǔ)知識(shí)有哪些”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

當(dāng)前標(biāo)題:Spark性能優(yōu)化基礎(chǔ)知識(shí)有哪些
文章源于:http://bm7419.com/article44/gijhhe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供營銷型網(wǎng)站建設(shè)網(wǎng)站改版、企業(yè)建站自適應(yīng)網(wǎng)站、定制網(wǎng)站、云服務(wù)器

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

手機(jī)網(wǎng)站建設(shè)