大數(shù)據(jù):Map終結(jié)和Spill文件合并-創(chuàng)新互聯(lián)

當(dāng)Mapper沒(méi)有數(shù)據(jù)輸入,mapper.run中的while循環(huán)會(huì)調(diào)用context.nextKeyValue就返回false,于是便返回到runNewMapper中,在這里程序會(huì)關(guān)閉輸入通道和輸出通道,這里關(guān)閉輸出通道并沒(méi)有關(guān)閉collector,必須要先f(wàn)lush一下。

創(chuàng)新互聯(lián)建站提供高防服務(wù)器租用、云服務(wù)器、香港服務(wù)器、達(dá)州托管服務(wù)器

獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114 代碼結(jié)構(gòu):

Maptask.runNewMapper->NewOutputCollector.close->MapOutputBuffer.flush

我們看flush幫我們做了什么事情,為什么要flush。

public void flush() throws IOException, ClassNotFoundException,

       InterruptedException {

  LOG.info("Starting flush of map output");

  spillLock.lock();

  try {

   while (spillInProgress) {

      reporter.progress();

      spillDone.await();

//這里查看spillInProgress狀態(tài),如果有spill就等待完成,并且報(bào)告狀態(tài)。

}

    checkSpillException();

    final int kvbend = 4 * kvend;

//kvend是元數(shù)據(jù)塊的終點(diǎn),元數(shù)據(jù)是向下伸展的。

//kvend是以整數(shù)計(jì)的數(shù)組下標(biāo),kvbend是以字節(jié)計(jì)的數(shù)組下標(biāo)

if ((kvbend + METASIZE) % kvbuffer.length !=

        equator - (equator % METASIZE)) {

//這個(gè)條件說(shuō)明緩沖區(qū)中原來(lái)有數(shù)據(jù),現(xiàn)在spill已經(jīng)完成,需要釋放空間。 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114

  // spill finished

//spill一次需要調(diào)整一些參數(shù),以釋放空間,這個(gè)工作通過(guò)resetSpill完成

  resetSpill();

private void resetSpill() {

  final int e = equator;

  bufstart = bufend = e;

  final int aligned = e - (e % METASIZE);

  // set start/end to point to first meta record

  // Cast one of the operands to long to avoid integer overflow

  kvstart = kvend = (int)

    (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;

  LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +

    (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");

}

//這里其實(shí)就是在調(diào)整各個(gè)參數(shù)的位置。比如原點(diǎn)位,kvstart等。

}

    if (kvindex != kvend) {

//再來(lái)判斷緩沖區(qū)是否為空,如果不空表示不滿足spill條件(80%),但map處理完成沒(méi)有數(shù)據(jù)輸入。

  kvend = (kvindex + NMETA) % kvmeta.capacity();

      bufend = bufmark;

      LOG.info("Spilling map output");

      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +

               "; bufvoid = " + bufvoid);

      LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +

               "); kvend = " + kvend + "(" + (kvend * 4) +

               "); length = " + (distanceTo(kvend, kvstart,

                     kvmeta.capacity()) + 1) + "/" + maxRec);

      sortAndSpill();

//調(diào)用一次sortAndSpill過(guò)程。 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114

}

  } catch (InterruptedException e) {

    throw new IOException("Interrupted while waiting for the writer", e);

  } finally {

    spillLock.unlock();

  }

//至此所有數(shù)據(jù)都已經(jīng)溢寫(xiě)出去,緩沖區(qū)已空,所有數(shù)據(jù)都spill到文件中

  assert !spillLock.isHeldByCurrentThread();

  // shut down spill thread and wait for it to exit. Since the preceding

  // ensures that it is finished with its work (and sortAndSpill did not

  // throw), we elect to use an interrupt instead of setting a flag.

  // Spilling simultaneously from this thread while the spill thread

  // finishes its work might be both a useful way to extend this and also

  // sufficient motivation for the latter approach.

  try {

    spillThread.interrupt();

//讓spill線程不在運(yùn)行

spillThread.join();

//結(jié)束spill線程

  } catch (InterruptedException e) {

    throw new IOException("Spill failed", e);

  }

  // release sort buffer before the merge

  kvbuffer = null;

  mergeParts();

//合并spill文件

  Path outputPath = mapOutputFile.getOutputFile();

  fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());

}

flush的目的,首先讓緩沖區(qū)的所有KV對(duì)數(shù)據(jù)都進(jìn)入spill文件,因?yàn)槊看蝧pill都會(huì)產(chǎn)生一個(gè)spill文件,所有spill文件可能不止一個(gè),所以要把spill文件合并到單個(gè)文件中,分發(fā)給reduce。

所以如果有spill正在進(jìn)行必須等待其完成,也可能沒(méi)有spill但是緩沖區(qū)非空,需要再一次sortAndSpill,總之要把緩沖區(qū)清空為止。所有數(shù)據(jù)都spill完成后就可以進(jìn)行mergeParts了

代碼結(jié)構(gòu):

Maptask.runNewMapper--->NewOutputCollector.close--->MapOutputBuffer.flush--->MapOutputBuffer.mergeParts

源代碼如下:

private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException {

  // get the approximate size of the final output/index files

  long finalOutFileSize = 0;

  long finalIndexFileSize = 0;

  final Path[] filename = new Path[numSpills];

//每次溢寫(xiě)都會(huì)有一個(gè)文件,所以數(shù)組的大小是numSpills。 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114

  final TaskAttemptID mapId = getTaskID();

  for(int i = 0; i < numSpills; i++) {

//統(tǒng)計(jì)所有這些文件合并之后的大小

filename[i] = mapOutputFile.getSpillFile(i);

//通過(guò)spill文件的編號(hào)獲取到指定的spill文件路徑

finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();//獲取文件大小

  }

  if (numSpills == 1) {

//合并輸出有倆文件一個(gè)是output/file.out,一個(gè)是output/file.out.index

sameVolRename(filename[0],

mapOutputFile.getOutputFileForWriteInVolume(filename[0]));

//換個(gè)文件名,在原文件名上加個(gè)file.out

if (indexCacheList.size() == 0) {

//索引塊緩存indexCacheList已空

  sameVolRename(mapOutputFile.getSpillIndexFile(0),            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));//spillIndexFile改名。

    } else {

//索引塊緩存indexCacheList中還有索引記錄,要寫(xiě)到索引文件

  indexCacheList.get(0).writeToFile(

//寫(xiě)入文件

mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);

}

    sortPhase.complete();

    return;

//如果只有一個(gè)spill合并已經(jīng)完成。 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114

  }

  // read in paged indices

  for (int i = indexCacheList.size(); i < numSpills; ++i) {

//如果spill文件不止一個(gè),需要合并

Path indexFileName = mapOutputFile.getSpillIndexFile(i);

    indexCacheList.add(new SpillRecord(indexFileName, job));

//先把所有的SpillIndexFile收集在一起。

  }

  //make correction in the length to include the sequence file header

  //lengths for each partition

  finalOutFileSize += partitions * APPROX_HEADER_LENGTH;

//每個(gè)partition都有header

  finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;

//IndexFile,每個(gè)partition一個(gè)記錄。

  Path finalOutputFile =

     mapOutputFile.getOutputFileForWrite(finalOutFileSize);

  Path finalIndexFile =

      mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);

  //The output stream for the final single output file

  FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

//創(chuàng)建合并,最終輸出。

  if (numSpills == 0) {

//要是沒(méi)有SipillFile生成,也創(chuàng)建一個(gè)空文件

//create dummy files

    IndexRecord rec = new IndexRecord();

//創(chuàng)建索引記錄

SpillRecord sr = new SpillRecord(partitions);

//創(chuàng)建spill記錄

try {

      for (int i = 0; i < partitions; i++) {

        long segmentStart = finalOut.getPos();

        FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);

       writer<K, V> writer =

          new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);

       writer.close();

//創(chuàng)建后馬上關(guān)閉,形成空文件。

 rec.startOffset = segmentStart;

        rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);

        rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);

        sr.putIndex(rec, i);

      }

      sr.writeToFile(finalIndexFile, job);

//所以記錄寫(xiě)入索引文件

} finally {

      finalOut.close();

    }

    sortPhase.complete();

    return;

  }

  {

    sortPhase.addPhases(partitions); // Divide sort phase into sub-phases

    IndexRecord rec = new IndexRecord();

    final SpillRecord spillRec = new SpillRecord(partitions);

    for (int parts = 0; parts < partitions; parts++) {

//finalOut最終輸出文件。循環(huán)分區(qū)獲得所有spill文件的該分區(qū)數(shù)據(jù),合并寫(xiě)入finalOut

  //create the segments to be merged

      List<Segment<K,V>> segmentList =

        new ArrayList<Segment<K, V>>(numSpills);

//創(chuàng)建Segment,數(shù)據(jù)段

  for(int i = 0; i < numSpills; i++) {

//準(zhǔn)備合并所有的Spill文件

 IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

        Segment<K,V> s =

          new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,

                           indexRecord.partLength, codec, true);

        segmentList.add(i, s);

//把每個(gè)Spill文件中相同partition的區(qū)段位置收集起來(lái)。 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114

 if (LOG.isDebugEnabled()) {

          LOG.debug("MapId=" + mapId + " Reducer=" + parts +

              "Spill =" + i + "(" + indexRecord.startOffset + "," +

              indexRecord.rawLength + ", " + indexRecord.partLength + ")");

        }

      }

      int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);

//做merge操作時(shí)同時(shí)操作的stream數(shù)上限

  boolean sortSegments = segmentList.size() > mergeFactor;

      //對(duì)segment進(jìn)行排序

      @SuppressWarnings("unchecked")

      RawKeyValueIterator kvIter = Merger.merge(job, rfs,

                     keyClass, valClass, codec,

                     segmentList, mergeFactor,

                     new Path(mapId.toString()),

                     job.getOutputKeyComparator(), reporter, sortSegments,

                     null, spilledRecordsCounter, sortPhase.phase(),

                     TaskType.MAP);

//合并同一partition在所有spill文件中的內(nèi)容,可能還需要sort,合并后的結(jié)構(gòu)是一個(gè)序列。

  //write merged output to disk

      long segmentStart = finalOut.getPos();

      FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);

     writer<K, V> writer =

          new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,

                           spilledRecordsCounter);

      if (combinerRunner == null || numSpills < minSpillsForCombine) { // minSpillsForCombine在MapOutputBuffer構(gòu)造函數(shù)內(nèi)被初始化,numSpills 為mapTask已經(jīng)溢寫(xiě)到磁盤(pán)spill文件數(shù)量

        Merger.writeFile(kvIter, writer, reporter, job);

//將合并后的結(jié)果直接寫(xiě)入文件。下面看一下writeFile的源代碼;

public static <K extends Object, V extends Object>

void writeFile(RawKeyValueIterator records, Writer<K, V> writer,

 Progressable progressable, Configuration conf)

throws IOException {

long progressBar = conf.getLong(JobContext.RECORDS_BEFORE_PROGRESS,

    10000);

long recordCtr = 0;

while(records.next()) {

 writer.append(records.getKey(), records.getValue());

//追加的方式輸出到writer中

  if (((recordCtr++) % progressBar) == 0) {

    progressable.progress();

  }

}

回到主代碼:

  } else {

//有combiner

 combineCollector.setWriter(writer);

//就插入combiner環(huán)節(jié)

 combinerRunner.combine(kvIter, combineCollector);

//將合并的結(jié)果經(jīng)過(guò)combiner后寫(xiě)入文件

  }

      //close

     writer.close();//關(guān)閉writer通道

      sortPhase.startNextPhase();

      // record offsets

      rec.startOffset = segmentStart;

//從當(dāng)前段的起點(diǎn)開(kāi)始

  rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);

      rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);

      spillRec.putIndex(rec, parts);

    }

    spillRec.writeToFile(finalIndexFile, job);

//把spillFile寫(xiě)入合并的indexFle

finalOut.close();

//關(guān)閉最終輸出流

for(int i = 0; i < numSpills; i++) {

      rfs.delete(filename[i],true);

//刪除所有spill文件

}

  }

}

該方法會(huì)將所有臨時(shí)文件合并成一個(gè)大文件保存到output/file.out中,同時(shí)生成相應(yīng)的索引文件output/file.out.index。 在進(jìn)行文件合并的過(guò)程中,Map Task以分區(qū)為單位進(jìn)行合并。對(duì)于某個(gè)分區(qū),它將采用多輪遞歸合并的方式:每輪合并io.sort.factor,默認(rèn)是100,個(gè)文件,并將產(chǎn)生的文 件重新加入待合并列表中,對(duì)文件排序后,重復(fù)上述過(guò)程,直到只有一個(gè)文件。只生產(chǎn)一個(gè)文件可以避免同時(shí)打開(kāi)大量的文件和同時(shí)讀取大量的小文件產(chǎn)生的隨機(jī)讀 取帶來(lái)的開(kāi)銷。最后會(huì)刪除所有的spill文件。

另外需要注意的是,mergeParts()中也有combiner的操作,但是需要滿足一定的條件:1、用戶設(shè)置了combiner;2、spill文件的數(shù)量超過(guò)了minSpillsForCombine的值,對(duì)應(yīng)配置項(xiàng)"min.num.spills.for.combine",可自行設(shè)置,默認(rèn)是3。這倆必須同時(shí)具備才會(huì)在此啟動(dòng)combiner的本地聚集操作。所以在Map階段有可能combiner會(huì)執(zhí)行兩次,所以有可能你的combiner執(zhí)行兩次之后輸出數(shù)據(jù)不符合預(yù)期了。

這樣Map階段的任務(wù)就算完成了。主要是讀取數(shù)據(jù)然后寫(xiě)入內(nèi)存緩沖區(qū),緩存區(qū)滿足條件就會(huì)快排后并設(shè)置partition后,spill到本地文件和索引文件;如果有combiner,spill之前也會(huì)做一次聚集操作,待數(shù)據(jù)跑完會(huì)通過(guò)歸并合并所有spill文件和索引文件,如果有combiner,合并之前在滿足條件后會(huì)做一次綜合的聚集操作。map階段的結(jié)果都會(huì)存儲(chǔ)在本地中(如果有reducer的話),非HDFS。

Mapper完成對(duì)所有輸入文件的處理,并將緩沖區(qū)的數(shù)據(jù)寫(xiě)出到spill文件之后,spill文件的存在只有三種可能:沒(méi)有spill,一個(gè)spill,多個(gè)spill。針對(duì)這三種都需要一個(gè)最終的輸出文件,不管內(nèi)容有沒(méi)有,內(nèi)容多少。這個(gè)最終文件是和單個(gè)spill文件是一樣的,按照partition分成若干段,然后是排好序的KV數(shù)據(jù),這個(gè)merge操作結(jié)合之前的spill文件進(jìn)行sort。就構(gòu)成了一次mergeSort,這個(gè)mergeSort只針對(duì)同一個(gè)Mapper的多個(gè)spill文件,以后在Reducer那里還會(huì)有Merge針對(duì)不同的Mapper文件。

當(dāng)Maptask完成后,從runNewMapper返回,下一個(gè)操作就是done。也就是MapTask的收尾工作。MapTask的收尾涉及到怎么把生成的數(shù)據(jù)輸出交給ReduceTask。MapTask和ReduceTask都是擴(kuò)展自Task。但是他們都沒(méi)有自己定義done函數(shù),所以他們都調(diào)用了Task的done。

程序在這里跳出runNewMapper 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114

if (useNewApi) {

  runNewMapper(job, splitMetaInfo, umbilical, reporter);

} else {

  runOldMapper(job, splitMetaInfo, umbilical, reporter);

}

done(umbilical, reporter);

這個(gè)done我們點(diǎn)進(jìn)去后發(fā)現(xiàn)是Task.done,源碼如下;

public void done(TaskUmbilicalProtocol umbilical,

   TaskReporter reporter

               ) throws IOException, InterruptedException {

LOG.info("Task:" + taskId + " is done."

         + " And is in the process of committing");

updateCounters();

//更新容器

boolean commitRequired = isCommitRequired();

if (commitRequired) {

  int retries = MAX_RETRIES;

  setState(TaskStatus.State.COMMIT_PENDING);

  // say the task tracker that task is commit pending

 while (true) {

    try {

      umbilical.commitPending(taskId, taskStatus);

      break;

//如果commitPending沒(méi)有發(fā)生異常,就退出,否則重試。

} catch (InterruptedException ie) {

      // ignore

    } catch (IOException ie) {

      LOG.warn("Failure sending commit pending: " +

                StringUtils.stringifyException(ie));

      if (--retries == 0) {

        System.exit(67);

      }

    }

  }

  //wait for commit approval and commit

  commit(umbilical, reporter, committer);

}

taskDone.set(true);

reporter.stopCommunicationThread();

// Make sure we send at least one set of counter increments. It's

// ok to call updateCounters() in this thread after comm thread stopped.

updateCounters();

sendLastUpdate(umbilical);

//signal the tasktracker that we are done

sendDone(umbilical);

實(shí)現(xiàn)sendDone的源代碼:

private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {

int retries = MAX_RETRIES;

while (true) {

  try {

    umbilical.done(getTaskID());

//實(shí)際上這里向MRAppMaster上的TaskAttemptImpl發(fā)送TA_DONE事件

LOG.info("Task '" + taskId + "' done.");

    return;

  } catch (IOException ie) {

    LOG.warn("Failure signalling completion: " +

             StringUtils.stringifyException(ie));

    if (--retries == 0) {

      throw ie;

    }

  }

}

}

umbilical.done(getTaskID()); 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114

//實(shí)際上這里向MRAppMaster上的TaskAttemptImpl發(fā)送TA_DONE事件,在TA_DONE事件的驅(qū)動(dòng)下,相應(yīng)的TaskAttemptImpl對(duì)象的狀態(tài)機(jī)執(zhí)行CleanupContainerTransition.transition,然后轉(zhuǎn)入SUCCESS_CONTAINER_CLEANUP狀態(tài)。注意這里有一個(gè)TaskAttemptEventType.TA_DONE事件是由具體的MapTask所在節(jié)點(diǎn)上發(fā)出的,但不是引起的狀態(tài)機(jī)的跳變是在MRAppMaster節(jié)點(diǎn)上。對(duì)于Maptask,會(huì)有一個(gè)umbilical,就代表著MRAppMaster。

MPAppmaster接到CONTAINER_REMOTE_CLEANUP事件,ContainerLauncher通過(guò)RPC機(jī)制調(diào)用Maptask所在節(jié)點(diǎn)的ContainerManagerImpl.stopContainers.使這個(gè)MapTask的容器進(jìn)入KILLED_BY_APPMASTER狀態(tài)從而不在活躍。操作成功后向相應(yīng)的TaskAttemptImpl發(fā)送TO_CONTAINER_CLEANED事件。如果一次TaskAttempt成功了,就意味著嘗試的任務(wù)也成功了,所以TaskAttempt的狀態(tài)關(guān)系到TaskImpl對(duì)象,taskImpl的掃描和善后,包括向上層的JobImpl對(duì)象發(fā)送TaskState.SUCCESSED事件。向自身TaskImpl發(fā)送的SUCCESSED事件會(huì)導(dǎo)致TaskImpl.handleTaskAttemptCompletion操作。

Mapper節(jié)點(diǎn)上產(chǎn)生一個(gè)過(guò)程setMapOutputServerAdress函數(shù),把本節(jié)點(diǎn)的MapOutputServer地址設(shè)置成一個(gè)Web地址,意味著MapTask留下的數(shù)據(jù)輸出(合并后的spill文件)可以通過(guò)HTTP連接獲取。至此Mapper的所有過(guò)程完成。 獲取更多大數(shù)據(jù)視頻資料請(qǐng)加QQ群:947967114

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。

網(wǎng)頁(yè)題目:大數(shù)據(jù):Map終結(jié)和Spill文件合并-創(chuàng)新互聯(lián)
網(wǎng)址分享:http://bm7419.com/article0/geeoo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、網(wǎng)站改版微信小程序、云服務(wù)器、靜態(tài)網(wǎng)站、商城網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)