3、MapReduce詳解與源碼剖析

@[TOC]
3、MapReduce詳解與源碼剖析

創(chuàng)新互聯(lián)服務(wù)項(xiàng)目包括伊犁網(wǎng)站建設(shè)、伊犁網(wǎng)站制作、伊犁網(wǎng)頁(yè)制作以及伊犁網(wǎng)絡(luò)營(yíng)銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,伊犁網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到伊犁省份的部分城市,未來相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

1 Split階段

?????首先,接到hdf文件輸入,在mapreduce中的map task開始之前,將文件按照指定的大小切割成若干個(gè)部分,每一部分稱為一個(gè)split,默認(rèn)是split的大小與block的大小相等,均為128MB。split大小由minSize、maxSize、blocksize決定,以wordcount代碼為例,以下是main()方法
3、MapReduce詳解與源碼剖析
進(jìn)入waitForCompletion(true)方法,進(jìn)入submit()方法
3、MapReduce詳解與源碼剖析
找到 return submitter .submitJobInternal(Job.this, cluster);
進(jìn)入,找到 int maps = writeSplits(job, submitJobDir);
3、MapReduce詳解與源碼剖析
3、MapReduce詳解與源碼剖析
進(jìn)入writeNewSplits()方法
3、MapReduce詳解與源碼剖析
?????進(jìn)入writeNewSplits()方法,可以看出該方法首先獲取splits數(shù)組信息后,排序,將會(huì)優(yōu)先處理大文件。最終返回mapper數(shù)量。這其中又分為兩部分:確定切片數(shù)量 和 寫入切片信息。確定切片數(shù)量的任務(wù)交由FileInputFormat的getSplits(job)完成,寫入切片信息的任務(wù)交由JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array)方法,該方法會(huì)將切片信息和SplitMetaInfo都寫入HDFS中,return array.length;返回的是map任務(wù)數(shù),默認(rèn)map的數(shù)量是: default_num = total_size / block_size;
?????實(shí)際的mapper數(shù)量就是輸入切片的數(shù)量,而切片的數(shù)量又由使用的輸入格式?jīng)Q定,默認(rèn)為TextInputFormat,該類為FileInputFormat的子類。確定切片數(shù)量的任務(wù)交由FileInputFormat的getSplits(job)完成。FileInputFormat繼承自抽象類InputFormat,該類定義了MapReduce作業(yè)的輸入規(guī)范,其中的抽象方法List<InputSplit> getSplits(JobContext context)定義了如何將輸入分割為InputSplit,不同的輸入有不同的分隔邏輯,而分隔得到的每個(gè)InputSplit交由不同的mapper處理,因此該方法的返回值確定了mapper的數(shù)量。

2 Map階段

?????每個(gè)map task都有一個(gè)內(nèi)存緩沖區(qū), map的輸出結(jié)果先寫到內(nèi)存中的環(huán)形緩沖區(qū),緩沖區(qū)為100M,不斷的向緩沖區(qū)力寫數(shù)據(jù),當(dāng)達(dá)到80M時(shí),需要將緩沖區(qū)中的數(shù)據(jù)以一個(gè)臨時(shí)文件的方式存到磁盤,當(dāng)整個(gè)map task結(jié)束后再對(duì)磁盤中這個(gè)map task所產(chǎn)生的所有臨時(shí)文件做合并,生成最終的輸出文件。最后,等待reduce task來拉取數(shù)據(jù)。當(dāng)然,如果map task的結(jié)果不大,能夠完全存儲(chǔ)到內(nèi)存緩沖區(qū),且未達(dá)到內(nèi)存緩沖區(qū)的閥值,那么就不會(huì)有寫臨時(shí)文件到磁盤的操作,也不會(huì)有后面的合并。在寫入的過程中會(huì)進(jìn)行分區(qū)、排序、combine操作。
?????環(huán)形緩沖區(qū):是使用指針機(jī)制把內(nèi)存中的地址首尾相接形成一個(gè)存儲(chǔ)中間數(shù)據(jù)的緩存區(qū)域,默認(rèn)100MB;80M閾值,20M緩沖區(qū),是為了解決寫入環(huán)形緩沖區(qū)數(shù)據(jù)的速度大于寫出到spill文件的速度是數(shù)據(jù)的不丟失;Spill文件:spill文件是環(huán)形緩沖區(qū)到達(dá)閾值后寫入到磁盤的單個(gè)文件.這些文件在map階段計(jì)算結(jié)束時(shí),會(huì)合成分好區(qū)的一個(gè)merge文件供給給reduce任務(wù)抓取;spill文件過小的時(shí)候,就不會(huì)浪費(fèi)io資源合并merge;默認(rèn)情況下3個(gè)以下spill文件不合并;對(duì)于在環(huán)形緩沖區(qū)中的數(shù)據(jù),最終達(dá)不到80m但是數(shù)據(jù)已經(jīng)計(jì)算完畢的情況,map任務(wù)將會(huì)調(diào)用flush將緩沖區(qū)中的數(shù)據(jù)強(qiáng)行寫出spill文件。

?????經(jīng)過map類處理后,輸出到內(nèi)存緩沖區(qū)(默認(rèn)大小100M),超過一定大小后,文件溢寫到磁盤上,按照key分類
3、MapReduce詳解與源碼剖析
按照key合并成大文件,減少網(wǎng)絡(luò)開銷
3、MapReduce詳解與源碼剖析

2.1分區(qū)

看一下MapReduce自帶的分區(qū)器HashPartitioner
3、MapReduce詳解與源碼剖析
假設(shè)有聽個(gè)reduce任務(wù),則分區(qū)的計(jì)算如下:
3、MapReduce詳解與源碼剖析

2.2排序

在對(duì)map結(jié)果進(jìn)行分區(qū)之后,對(duì)于落在相同的分區(qū)中的鍵值對(duì),要進(jìn)行排序。

3 Shuffle階段

?????Shuffle過程是MapReduce的核心,描述著數(shù)據(jù)從map task輸出到reduce task輸入的這段過程。reducetask根據(jù)自己的分區(qū)號(hào),去各個(gè)maptask分區(qū)機(jī)器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù),reducetask會(huì)將這些文件再進(jìn)行合并(歸并排序)。
?????所有相同key的數(shù)據(jù)匯集到一個(gè)partition
3、MapReduce詳解與源碼剖析
?????將相同的key value匯聚到一起, 但不計(jì)算
3、MapReduce詳解與源碼剖析

4 Reduce階段

reduce階段分三個(gè)步驟:
抓取,合并,排序
?????1 reduce 任務(wù)會(huì)創(chuàng)建并行的抓取線程(fetcher)負(fù)責(zé)從完成的map任務(wù)中獲取結(jié)果文件,是否完成是通過rpc心跳監(jiān)聽,通過http協(xié)議抓?。荒J(rèn)是5個(gè)抓取線程,可調(diào),為了是整體并行,在map任務(wù)量大,分區(qū)多的時(shí)候,抓取線程調(diào)大;
?????2 抓取過來的數(shù)據(jù)會(huì)先保存在內(nèi)存中,如果內(nèi)存過大也溢出,不可見,不可調(diào),但是單位是每個(gè)merge文件,不會(huì)切分?jǐn)?shù)據(jù);每個(gè)merge文件都會(huì)被封裝成一個(gè)segment的對(duì)象,這個(gè)對(duì)象控制著這個(gè)merge文件的讀取記錄操作,有兩種情況出現(xiàn):在內(nèi)存中有merge數(shù)據(jù) ?在溢寫之后存到磁盤上的數(shù)據(jù) ?通過構(gòu)造函數(shù)的區(qū)分,來分別創(chuàng)建對(duì)應(yīng)的segment對(duì)象
?????3 這種segment對(duì)象會(huì)放到一個(gè)內(nèi)存隊(duì)列中MergerQueue,對(duì)內(nèi)存和磁盤上的數(shù)據(jù)分別進(jìn)行合并,內(nèi)存中的merge對(duì)應(yīng)的segment直接合并,磁盤中的合并與一個(gè)叫做合并因子的factor有關(guān)(默認(rèn)是10)
?????4 排序問題,MergerQueue繼承輪換排序的接口,每一個(gè)segment 是排好序的,而且按照key的值大小邏輯(和真的大小沒關(guān)系);每一個(gè)segment的第一個(gè)key都是邏輯最小,而所有的segment的排序是按照第一個(gè)key大小排序的,最小的在前面,這種邏輯總能保證第一個(gè)segment的第一個(gè)key值是所有key的邏輯最小文件合并之后,最終交給reduce函數(shù)計(jì)算的,是MergeQueue隊(duì)列,每次計(jì)算的提取數(shù)據(jù)邏輯都是提取第一個(gè)segment的第一個(gè)key和value數(shù)據(jù),一旦segment被調(diào)用了提取key的方法,MergeQueue隊(duì)列將會(huì)整體重新按照最小key對(duì)segment排序,最終形成整體有序的計(jì)算結(jié)果;
3、MapReduce詳解與源碼剖析
3、MapReduce詳解與源碼剖析
partition 、Reduce、輸出文件數(shù)量相等
3、MapReduce詳解與源碼剖析
Reduce任務(wù)數(shù)量
在大數(shù)據(jù)量的情況下,如果只設(shè)置1個(gè)Reduce任務(wù),其他節(jié)點(diǎn)將被閑置,效率底下 所以將Reduce設(shè)置成一個(gè)較大的值(max:72).調(diào)節(jié)Reduce任務(wù)數(shù)量的方法 一個(gè)節(jié)點(diǎn)的Reduce任務(wù)數(shù)并不像Map任務(wù)數(shù)那樣受多個(gè)因素制約

通過參數(shù)調(diào)節(jié)mapred.reduce.tasks(在配置文件中)
在代碼中調(diào)用job.setNumReduceTasks(int n)方法(在code中)

當(dāng)前文章:3、MapReduce詳解與源碼剖析
網(wǎng)頁(yè)地址:http://bm7419.com/article46/pciohg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站制作、網(wǎng)站設(shè)計(jì)公司建站公司、域名注冊(cè)、ChatGPT、商城網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

微信小程序開發(fā)