Hadoop之MapReduce

1 MapReduce 概述

MapReduce 是一個(gè)分布式運(yùn)算程序的編程框架,是用戶開發(fā)基于 Hadoop 的數(shù)據(jù)分析應(yīng)用的核心框架。

十載的新晃網(wǎng)站建設(shè)經(jīng)驗(yàn),針對設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。全網(wǎng)整合營銷推廣的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動調(diào)整新晃建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)建站從事“新晃網(wǎng)站設(shè)計(jì)”,“新晃網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

MapReduce 核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序,并發(fā)運(yùn)行在一個(gè)Hadoop 集群上。

1.1 MapReduce 優(yōu)缺點(diǎn)

優(yōu)點(diǎn):

  • MapReduce 易于編程

    它簡單的實(shí)現(xiàn)一些接口,就可以完成一個(gè)分布式程序,這個(gè)分布式程序可以分布到大量廉價(jià)的 PC 機(jī)器上運(yùn)行,也就是說寫一個(gè)分布式程序,跟寫一個(gè)簡單的串行程序是一模一樣的,就是因?yàn)檫@個(gè)特點(diǎn)使得 MapReduce 編程變得非常流行。

  • 良好的擴(kuò)展性

    當(dāng)計(jì)算資源不能得到滿足的時(shí)候,可以通過簡單的增加機(jī)器來擴(kuò)展它的計(jì)算能力。

  • 高容錯(cuò)性

    MapReduce 設(shè)計(jì)的初衷就是使程序能夠部署在廉價(jià)的 PC 機(jī)器上,這就要求它具有很高的容錯(cuò)性,比如其中一臺機(jī)器掛了,它可以把上面的計(jì)算任務(wù)轉(zhuǎn)移到另外一個(gè)節(jié)點(diǎn)上運(yùn)行,不至于這個(gè)任務(wù)運(yùn)行失敗,而且這個(gè)過程不需要人工參與,而完全是由 Hadoop 內(nèi)部完成的。

  • 適合 PB 級以上海量數(shù)據(jù)的離線處理

    可以實(shí)現(xiàn)上千臺服務(wù)器集群并發(fā)工作,提供數(shù)據(jù)處理能力。

缺點(diǎn):

  • 不擅長實(shí)時(shí)計(jì)算

    MapReduce 無法像 MySQL 一樣,在毫秒或者秒級內(nèi)返回結(jié)果。

  • 不擅長流式計(jì)算

    流式計(jì)算的輸入數(shù)據(jù)是動態(tài)的,而 MapReduce 的輸入數(shù)據(jù)集是靜態(tài)的,不能動態(tài)變化。這是因?yàn)?MapReduce 自身的設(shè)計(jì)特點(diǎn)決定了數(shù)據(jù)源必須是靜態(tài)的。

  • 不擅長DAG(有向圖)計(jì)算

    多個(gè)應(yīng)用程序存在依賴關(guān)系,后一個(gè)應(yīng)用程序的輸入為前一個(gè)的輸出,在這種情況下,MapReduce 并不是不能做,而是使用后,每個(gè) MapReduce 作業(yè)的輸出結(jié)果都會寫入到磁盤,會造成大量的磁盤 IO,導(dǎo)致性能非常的低下。

1.2 MapReduce 核心思想

Hadoop 之 MapReduce

分布式的運(yùn)算程序往往需要分成至少 2 個(gè)階段。

第一個(gè)階段的 MapTask 并發(fā)實(shí)例,完全并行運(yùn)行,互不相干。

第二個(gè)階段的 ReduceTask 并發(fā)實(shí)例互不相干,但是他們的數(shù)據(jù)依賴于上一個(gè)階段的所有 MapTask 并發(fā)實(shí)例的輸出。

MapReduce 編程模型只能包含一個(gè) Map 階段和一個(gè) Reduce 階段,如果用戶的業(yè)務(wù)邏輯非常復(fù)雜,那就只能多個(gè)MapReduce 程序,串行運(yùn)行。

1.3 MapReduce 進(jìn)程

一個(gè)完整的 MapReduce 程序在分布式運(yùn)行時(shí)有三類實(shí)例進(jìn)程:

MrAppMaster 負(fù)責(zé)整個(gè)程序的過程調(diào)度及狀態(tài)協(xié)調(diào)

MapTask 負(fù)責(zé) Map 階段的整個(gè)數(shù)據(jù)處理流程。

ReduceTask 負(fù)責(zé) Reduce 階段的整個(gè)數(shù)據(jù)處理流程。

1.4 常用數(shù)據(jù)序列化類型

Java 類型Hadoop Writable 類型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable

1.5 MapReduce 編程規(guī)范

用戶編寫的程序分成三個(gè)部分:

Mapper 階段

  • 用戶自定義的 Mapper 要繼承自己的父類
  • Mapper 的輸入數(shù)據(jù)是 KV 對的形式(KV的類型可自定義
  • Mapper 中的業(yè)務(wù)邏輯寫在 map() 方法中
  • Mapper的輸出數(shù)據(jù)是 KV對的形式(KV的類型可自定義)
  • map() 方法(MapTask 進(jìn)程)對每一個(gè) <k,v> 調(diào)用一次

Reduce 階段

  • 用戶自定義的 Reducer 要繼承自己的父類
  • Reducer 的輸入數(shù)據(jù)類型對應(yīng) Mapper 的輸出數(shù)據(jù)類型,也是 KV
  • Reducer 的業(yè)務(wù)邏輯寫在 reduce() 方法中
  • ReduceTask 進(jìn)程對每一組相同 k 的 <k,v> 組調(diào)用一次reduce()方法

Driver 階段

  • 相當(dāng)于 YARN 集群的客戶端,用于提交我們整個(gè)程序到 YARN 集群,提交的是封裝了 MapReduce 程序相關(guān)運(yùn)行參數(shù)的job對象

1.6 WordCount 案例實(shí)操

導(dǎo)入依賴

<dependencies>
    <dependency>
        <groupid>junit</groupid>
        <artifactid>junit</artifactid>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupid>org.apache.logging.log4j</groupid>
        <artifactid>log4j-core</artifactid>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupid>org.apache.hadoop</groupid>
        <artifactid>hadoop-common</artifactid>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupid>org.apache.hadoop</groupid>
        <artifactid>hadoop-client</artifactid>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupid>org.apache.hadoop</groupid>
        <artifactid>hadoop-hdfs</artifactid>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupid>jdk.tools</groupid>
        <artifactid>jdk.tools</artifactid>
        <version>1.8</version>
        <scope>system</scope>
        <systempath>${JAVA_HOME}/lib/tools.jar</systempath>
    </dependency>
</dependencies>

log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

WcMapper

package com.djm.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WcMapper extends Mapper<longwritable, text,text, intwritable> {

    private Text key = new Text();

    private IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            this.key.set(word);
            context.write(this.key, this.one);
        }
    }
}

WcReduce

package com.djm.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WcReduce extends Reducer<text, intwritable, text, intwritable> {

    private IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<intwritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += 1;
        }
        this.total.set(sum);
        context.write(key, this.total);
    }
}

WcDriver

package com.djm.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WcDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 獲得任務(wù)
        Job job = Job.getInstance(new Configuration());
        // 設(shè)置Classpath
        job.setJarByClass(WcDriver.class);
        // 設(shè)置Mapper
        job.setMapperClass(WcMapper.class);
        // 設(shè)置Reducer
        job.setReducerClass(WcReduce.class);
        // 設(shè)置Mapper的輸出key和value的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 設(shè)置Reducer的輸出key和value的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 設(shè)置輸入和輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

2 Hadoop 序列化

2.1 為什么不使用 Java 序列化框架進(jìn)行序列化

Serializable 是一個(gè)重量級的 Java 序列框架,一個(gè)對象被序列化后,會產(chǎn)生很多額外的信息(各種校驗(yàn)信息,Header,繼承體系等),會產(chǎn)生大量的 IO,所以不適合在網(wǎng)絡(luò)中高效的傳輸,所以,Hadoop 自己開發(fā)了一個(gè)輕量級的序列化框架(Writable)。

Hadoop序列化特點(diǎn):

1、緊湊:高效使用存儲空間。

2、快速:讀寫數(shù)據(jù)的額外開銷小

3、可擴(kuò)展:隨著通信協(xié)議的升級而可升級。

4、 互操作:支持多語言的交互。

2.2 自定義 bean 對象實(shí)現(xiàn)序列化接口

在開發(fā)過程中往往提供的基本序列化類型不能滿足要求,一般情況都需要創(chuàng)建一個(gè) Bean 實(shí)現(xiàn) Writable 接口。

具體實(shí)現(xiàn) bean 對象序列化步驟如下 7 步:

1、實(shí)現(xiàn) Writable 接口

2、反序列化時(shí),需要反射調(diào)用空參構(gòu)造函數(shù),必須提供空參構(gòu)造

3、重寫序列化方法

4、重寫反序列方法

5、反序列化和序列化的順序必須完全一致

6、要想把結(jié)果顯示在文件中,需要重寫 toString()

7、如果需要將自定義的 bean 放在 key 中傳輸,則還需要實(shí)現(xiàn) Comparable 接口,因?yàn)?MapReduce 框中的 Shuffle 過程要求對 key 必須能排序

2.3 序列化案例實(shí)操

統(tǒng)計(jì)每一個(gè)手機(jī)號耗費(fèi)的總上行流量、下行流量、總流量

輸入數(shù)據(jù)格式:id 手機(jī)號碼 網(wǎng)絡(luò)ip 上行流量 下行流量 網(wǎng)絡(luò)狀態(tài)碼

輸出數(shù)據(jù)格式:手機(jī)號碼 上行流量 下行流量 總流量

FlowBean

package com.djm.mapreduce.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upFlow;

    private long downFlow;

    private long sumFlow;

    public FlowBean() {
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = this.upFlow + this.downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
}

FlowMapper

package com.djm.mapreduce.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<longwritable, text, flowbean> {

    private FlowBean flowBean = new FlowBean();
    private Text phone = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\t");
        phone.set(words[1]);
        long upFlow = Long.parseLong(words[words.length - 3]);
        long downFlow = Long.parseLong(words[words.length - 2]);
        flowBean.set(upFlow, downFlow);
        context.write(phone, flowBean);
    }
}

FlowReduce

package com.djm.mapreduce.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReduce extends Reducer<text, flowbean, text, flowbean> {

    private FlowBean totalFlow = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<flowbean> values, Context context) throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        for (FlowBean value : values) {
            long upFlow = value.getUpFlow();
            long downFlow = value.getDownFlow();
            sumUpFlow += upFlow;
            sumDownFlow += downFlow;
        }
        totalFlow.set(sumUpFlow, sumDownFlow);
        context.write(key, totalFlow);

    }
}

FlowDriver

package com.djm.mapreduce.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3 MapReduce 框架原理

3.1 InputFormat 數(shù)據(jù)輸入

3.1.1 切片與MapTask并行度決定機(jī)制

一個(gè) Job 的 Map 階段并行度由客戶端在提交 Job 時(shí)的切片數(shù)決定

每一個(gè)Split切片分配一個(gè)MapTask并行實(shí)例處理

默認(rèn)情況下,切片大小=BlockSize

切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對每一個(gè)文件單獨(dú)切片

3.1.2 FileInputFormat 切片機(jī)制

切片機(jī)制:

  • 簡單的按照文件的內(nèi)容長度進(jìn)行切片
  • 切片大小等于 Block 大小
  • 切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對每個(gè)文件單獨(dú)切片

源碼中如何計(jì)算切片大小的?

  • Math.max(minSize, Math.min(maxSize, blockSize));
  • mapreduce.input.fileinputformat.split.minsize=1 默認(rèn)值為1
  • mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認(rèn)值Long.MAXValue

如何自定義切片大?。?/p>

  • maxsize(切片最大值):參數(shù)如果調(diào)得比blockSize小,則會讓切片變小,而且就等于配置的這個(gè)參數(shù)的值。
  • minsize(切片最小值):參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blockSize還大。
3.1.3 CombineTextInputFormat 切片機(jī)制

CombineTextInputFormat 用于小文件過多的場景,它可以將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中,這樣,多個(gè)小文件就可以交給一個(gè) MapTask 處理。

Hadoop 之 MapReduce

3.1.4 FileInputFormat 的其他實(shí)現(xiàn)類

TextInputFormat:

TextInputForma 是默認(rèn)的 FileInputFormat 實(shí)現(xiàn)類,按行讀取每條記錄,鍵是存儲該行在整個(gè)文件中的起始字節(jié)偏移量,LongWritable 類型,值是這行的內(nèi)容,不包括任何行終止符(換行符和回車符),Text類型。

KeyValueTextInputFormat:

每一行均為一條記錄,被分隔符分割為 key,value,可以通過在驅(qū)動類中設(shè)置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t"); 來設(shè)定分隔符,默認(rèn)分隔符是 tab。

NLineInputFormat:

如果使用 NlineInputFormat,代表每個(gè) map 進(jìn)程處理的 InputSplit 不再按 Block 塊去劃分,而是按 NlineInputFormat 指定的行數(shù)N來劃分,即輸入文件的總行數(shù) /N = 切片數(shù),如果不整除,切片數(shù) = 商 + 1。

3.1.5 自定義 InputFormat

無論 HDFS 還是 MapReduce,在處理小文件時(shí)效率都非常低,但又難免面臨處理大量小文件的場景,此時(shí),就需要有相應(yīng)解決方案??梢宰远x InputFormat 實(shí)現(xiàn)小文件的合并。
Hadoop 之 MapReduce

程序?qū)崿F(xiàn):

WholeFileInputformat

package com.djm.mapreduce.inputformat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WholeFileInputformat extends FileInputFormat<text, byteswritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    public RecordReader<text, byteswritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new WholeRecordReader();
    }
}

WholeRecordReader

package com.djm.mapreduce.inputformat;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class WholeRecordReader extends RecordReader<text, byteswritable> {

    private boolean notRead  = true;

    private Text key = new Text();

    private BytesWritable value = new BytesWritable();

    private FSDataInputStream fis;

    private FileSplit fs;

    /**
     * 初始化方法,框架會在開始的時(shí)候調(diào)用一次
     * @param split
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // 轉(zhuǎn)換切換類型為文件切片
        fs = (FileSplit) split;
        // 通過切片獲取文件路徑
        Path path = fs.getPath();
        // 通過路徑獲取文件系統(tǒng)
        FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
        // 開流
        fis = fileSystem.open(path);
    }

    /**
     * 讀取下一組KV
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (notRead) {
            // 讀K
            key.set(fs.getPath().toString());
            // 讀V
            byte[] buf = new byte[(int) fs.getLength()];
            fis.read(buf);
            value.set(buf, 0, buf.length);
            notRead = false;
            return true;
        } else {
            return false;
        }
    }

    /**
     * 獲取當(dāng)前讀到的key
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    /**
     * 獲取當(dāng)前讀到的value
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    /**
     * 當(dāng)前數(shù)據(jù)讀取的進(jìn)度
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public float getProgress() throws IOException, InterruptedException {
        return notRead ? 0 : 1;
    }

    /**
     * 關(guān)閉資源
     * @throws IOException
     */
    public void close() throws IOException {
        if (fis != null) {
            fis.close();
        }

    }
}

WholeFileDriver

package com.djm.mapreduce.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

public class WholeFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(WholeFileDriver.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setInputFormatClass(WholeFileInputformat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

3.2 MapReduce工作流程

Hadoop 之 MapReduce

Hadoop 之 MapReduce

上面的流程是整個(gè) MapReduce 最全工作流程,但是 Shuffle 過程只是從第 7 步開始到第 16 步結(jié)束,具體 Shuffle 過程詳解,如下:

1)MapTask 收集我們的 map() 方法輸出的 KV 對,放到內(nèi)存緩沖區(qū)中

2)從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會溢出多個(gè)文件

3)多個(gè)溢出文件會被合并成大的溢出文件

4)在溢出過程及合并的過程中,都要調(diào)用 Partitioner 進(jìn)行分區(qū)和針對 key 進(jìn)行排序

5)ReduceTask 根據(jù)自己的分區(qū)號,去各個(gè) MapTask 機(jī)器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)

6)ReduceTask 會取到同一個(gè)分區(qū)的來自不同 MapTask 的結(jié)果文件,ReduceTask 會將這些文件再進(jìn)行合并(歸并排序)

7)合并成大文件后,Shuffle 的過程也就結(jié)束了,后面進(jìn)入 ReduceTask 的邏輯運(yùn)算過程(從文件中取出一個(gè)一個(gè)的鍵值對 Group,調(diào)用用戶自定義的 reduce() 方法)

3.3 Shuffle 機(jī)制

Hadoop 之 MapReduce

3.3.1 Partition 分區(qū)

分區(qū)可以將統(tǒng)計(jì)結(jié)果按照條件輸出到不同的文件中

默認(rèn) Partition 分區(qū):

public class HashPartitioner<k, v> extends Partitioner<k, v> {

    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

默認(rèn)分區(qū)是根據(jù) key 的 hashCode 對 ReduceTasks 個(gè)數(shù)取模決定的。

自定義 Partition 步驟:

  • 自定義類繼承 Partitioner,重寫 getPartition() 方法
public class CustomPartitioner extends Partitioner<text, flowbean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 控制分區(qū)代碼邏輯
        return partition;
    }
}
  • 在驅(qū)動類中,指定 Partitioner
  • 自定義 Partition 后,要根據(jù)自定義 Partitioner 的邏輯設(shè)置相應(yīng)數(shù)量的 ReduceTask

注意:

  • 如果 ReduceTask 的數(shù)量 > getPartition 的結(jié)果數(shù),則會多產(chǎn)生幾個(gè)空的輸出文件 part-r-000xx;
  • 如果 1< ReduceTask的數(shù)量 < getPartition 的結(jié)果數(shù),則有一部分分區(qū)數(shù)據(jù)無處安放,會 Exception;
  • 如果 ReduceTask 的數(shù)量 = 1,則不管 MapTask 端輸出多少個(gè)分區(qū)文件,最終結(jié)果都交給這一個(gè) ReduceTask,最終也就只會產(chǎn)生一個(gè)結(jié)果文件 part-r-00000;
  • 分區(qū)號必須從零開始,逐一累加。

需求分析:

Hadoop 之 MapReduce

代碼實(shí)現(xiàn):

# ProvincePartitioner
package com.djm.mapreduce.partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<flowbean, text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        switch (text.toString().substring(0, 3)) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}

# PartitionerFlowDriver
package com.djm.mapreduce.partitioner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionerFlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(PartitionerFlowDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReduce.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(5);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
3.3.2 WritableComparable 排序

排序是 MapReduce 框架中最重要的操作之一,MapTask 和 ReduceTask 均會對數(shù)據(jù)按照 key 進(jìn)行排序,該操作屬于Hadoop 的默認(rèn)行為,任何應(yīng)用程序中的數(shù)據(jù)均會被排序,而不管邏輯上是否需要。

默認(rèn)排序是按照字典順序排序,且實(shí)現(xiàn)該排序的方法是快速排序:

對于 MapTask,它會將處理的結(jié)果暫時(shí)放到環(huán)形緩沖區(qū)中,當(dāng)環(huán)形緩沖區(qū)使用率達(dá)到一定閾值后,再對緩沖區(qū)中的數(shù)據(jù)進(jìn)行一次快速排序,并將這些有序數(shù)據(jù)溢寫到磁盤上,而當(dāng)數(shù)據(jù)處理完畢后,它會對磁盤上所有文件進(jìn)行歸并排序。

對于 ReduceTask,它從每個(gè) MapTask 上遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件,如果文件大小超過一定閾值,則溢寫磁盤上,否則存儲在內(nèi)存中,如果磁盤上文件數(shù)目達(dá)到一定閾值,則進(jìn)行一次歸并排序以生成一個(gè)更大文件,如果內(nèi)存中文件大小或者數(shù)目超過一定閾值,則進(jìn)行一次合并后將數(shù)據(jù)溢寫到磁盤上,當(dāng)所有數(shù)據(jù)拷貝完畢后,ReduceTask 統(tǒng)一對內(nèi)存和磁盤上的所有數(shù)據(jù)進(jìn)行一次歸并排序。

排序分類:

Hadoop 之 MapReduce

需求分析:
Hadoop 之 MapReduce

代碼實(shí)現(xiàn):

package com.djm.mapreduce.partitioner;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
public class FlowBean implements WritableComparable<flowbean> {

    private long upFlow;

    private long downFlow;

    private long sumFlow;

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = this.upFlow + this.downFlow;
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {
        return this.sumFlow > o.sumFlow ? -1:1;
    }
}
3.3.3 GroupingComparator 分組

對 Reduce 階段的數(shù)據(jù)根據(jù)某一個(gè)或幾個(gè)字段進(jìn)行分組。

分組排序步驟:

  • 自定義類繼承WritableComparator

  • 重寫compare()方法

  • 創(chuàng)建一個(gè)構(gòu)造將比較對象的類傳給父類

    protected OrderGroupingComparator() {
      super(OrderBean.class, true);
    }

需求分析:

Hadoop 之 MapReduce

代碼實(shí)現(xiàn):

# OrderBean
package com.djm.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
public class OrderBean implements WritableComparable<orderbean> {

    private String orderId;

    private String productId;

    private double price;

    @Override
    public int compareTo(OrderBean o) {
        int compare = this.orderId.compareTo(o.orderId);

        if (compare == 0) {
            return Double.compare(o.price, this.price);
        } else {
            return compare;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(productId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.productId = in.readUTF();
        this.price = in.readDouble();
    }
}

# OrderSortGroupingComparator
package com.djm.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderSortGroupingComparator extends WritableComparator {

    public OrderSortGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;

        return oa.getOrderId().compareTo(ob.getOrderId());
    }
}

# OrderSortDriver
package com.djm.mapreduce.order;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OrderSortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(OrderSortDriver.class);
        job.setMapperClass(OrderSortMapper.class);
        job.setReducerClass(OrderSortReduce.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setGroupingComparatorClass(OrderSortGroupingComparator.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.4 MapTask 工作機(jī)制

Hadoop 之 MapReduce

1)Read 階段:MapTask 通過用戶編寫的 RecordReader,從輸入 InputSplit 中解析出一個(gè)個(gè) key/value。

2)Map階段:該節(jié)點(diǎn)主要是將解析出的 key/value 交給用戶編寫 map() 函數(shù)處理,并產(chǎn)生一系列新的 key/value。

3)Collect 收集階段:在用戶編寫 map() 函數(shù)中,當(dāng)數(shù)據(jù)處理完成后,一般會調(diào)用 OutputCollector.collect() 輸出結(jié)果,在該函數(shù)內(nèi)部,它會將生成的 key/value 分區(qū)(調(diào)用Partitioner),并寫入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中。

4)Spill 階段:即溢寫,當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce 會將數(shù)據(jù)寫到本地磁盤上,生成一個(gè)臨時(shí)文件,需要注意的是,將數(shù)據(jù)寫入本地磁盤之前,先要對數(shù)據(jù)進(jìn)行一次本地排序,并在必要時(shí)對數(shù)據(jù)進(jìn)行合并、壓縮等操作。

  • 利用快速排序算法對緩存區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序,排序方式是,先按照分區(qū)編號 Partition 進(jìn)行排序,然后按照 key 進(jìn)行排序,這樣,經(jīng)過排序后,數(shù)據(jù)以分區(qū)為單位聚集在一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照 key 有序。
  • 按照分區(qū)編號由小到大依次將每個(gè)分區(qū)中的數(shù)據(jù)寫入任務(wù)工作目錄下的臨時(shí)文件 output/spillN.out(N表示當(dāng)前溢寫次數(shù))中,如果用戶設(shè)置了 Combiner,則寫入文件之前,對每個(gè)分區(qū)中的數(shù)據(jù)進(jìn)行一次聚集操作。
  • 將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu) SpillRecord 中,其中每個(gè)分區(qū)的元信息包括在臨時(shí)文件中的偏移量、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小,如果當(dāng)前內(nèi)存索引大小超過 1MB,則將內(nèi)存索引寫到文件 output/spillN.out.index中。

5)Combine 階段:當(dāng)所有數(shù)據(jù)處理完成后,MapTask 對所有臨時(shí)文件進(jìn)行一次合并,以確保最終只會生成一個(gè)數(shù)據(jù)文件。

6)當(dāng)所有數(shù)據(jù)處理完后,MapTask 會將所有臨時(shí)文件合并成一個(gè)大文件,并保存到文件 output/file.out 中,同時(shí)生成相應(yīng)的索引文件 output/file.out.index。

7)在進(jìn)行文件合并過程中,MapTask 以分區(qū)為單位進(jìn)行合并,對于某個(gè)分區(qū),它將采用多輪遞歸合并的方式,每輪合并io.sort.factor(默認(rèn)10)個(gè)文件,并將產(chǎn)生的文件重新加入待合并列表中,對文件排序后,重復(fù)以上過程,直到最終得到一個(gè)大文件。

8)讓每個(gè) MapTask 最終只生成一個(gè)數(shù)據(jù)文件,可避免同時(shí)打開大量文件和同時(shí)讀取大量小文件產(chǎn)生的隨機(jī)讀取帶來的開銷。

3.5 ReduceTask 工作機(jī)制

Hadoop 之 MapReduce

1)Copy 階段:ReduceTask 從各個(gè) MapTask 上遠(yuǎn)程拷貝一片數(shù)據(jù),并針對某一片數(shù)據(jù),如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內(nèi)存中。

2)Merge 階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí),ReduceTask 啟動了兩個(gè)后臺線程對內(nèi)存和磁盤上的文件進(jìn)行合并,以防止內(nèi)存使用過多或磁盤上文件過多。

3)Sort 階段:按照 MapReduce 語義,用戶編寫 reduce() 函數(shù)輸入數(shù)據(jù)是按 key 進(jìn)行聚集的一組數(shù)據(jù),為了將 key 相同的數(shù)據(jù)聚在一起,Hadoop 采用了基于排序的策略,由于各個(gè) MapTask 已經(jīng)實(shí)現(xiàn)對自己的處理結(jié)果進(jìn)行了局部排序,因此,ReduceTask 只需對所有數(shù)據(jù)進(jìn)行一次歸并排序即可。

4)Reduce 階段:reduce() 函數(shù)將計(jì)算結(jié)果寫到 HDFS 上。

ReduceTask 的并行度同樣影響整個(gè) Job 的執(zhí)行并發(fā)度和執(zhí)行效率,但與 MapTask 的并發(fā)數(shù)由切片數(shù)決定不同,ReduceTask 數(shù)量的決定是可以直接手動設(shè)置:

job.setNumReduceTasks(4);

注意事項(xiàng):

  • ReduceTask=0,表示沒有 Reduce 階段,輸出文件個(gè)數(shù)和 Map 個(gè)數(shù)一致
  • ReduceTask 默認(rèn)值就是 1,所以輸出文件個(gè)數(shù)為一個(gè)
  • 如果數(shù)據(jù)分布不均勻,就有可能在 Reduce 階段產(chǎn)生數(shù)據(jù)傾斜
  • ReduceTask 數(shù)量并不是任意設(shè)置,還要考慮業(yè)務(wù)邏輯需求,有些情況下,需要計(jì)算全局匯總結(jié)果,就只能有 1 個(gè)ReduceTask
  • 具體多少個(gè) ReduceTask,需要根據(jù)集群性能而定
  • 如果分區(qū)數(shù)不是 1,但是 ReduceTask 為 1,不會執(zhí)行分區(qū)過程

3.6 OutputFormat 數(shù)據(jù)輸出

3.6.1 OutputFormat 接口實(shí)現(xiàn)類

Hadoop 之 MapReduce

3.6.2 自定義 OutputFormat

Hadoop 之 MapReduce

需求分析:

Hadoop 之 MapReduce

代碼實(shí)現(xiàn):

# FilterOutputFormat
package com.djm.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterOutputFormat extends FileOutputFormat<text, nullwritable> {
    @Override
    public RecordWriter<text, nullwritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new FilterRecordWriter(job);
    }
}

# FilterRecordWriter
package com.djm.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class FilterRecordWriter extends RecordWriter<text, nullwritable> {

    private FSDataOutputStream atguiguOut = null;
    private FSDataOutputStream otherOut = null;

    public FilterRecordWriter() {
    }

    public FilterRecordWriter(TaskAttemptContext job) {
        FileSystem fs;

        try {
            fs = FileSystem.get(job.getConfiguration());
            Path atguigu = new Path("C:\\Application\\Apache\\hadoop-2.7.2\\djm.log");
            Path other = new Path("C:\\Application\\Apache\\hadoop-2.7.2\\other.log");
            atguiguOut = fs.create(atguigu);
            otherOut = fs.create(other);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        if (key.toString().contains("atguigu")) {
            atguiguOut.write(key.toString().getBytes());
        } else {
            otherOut.write(key.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    }
}

# FilterDriver
package com.djm.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FilterDriver.class);
        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(FilterOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.7 Join

3.7.1 Reduce Join

工作原理:

  • Map 端

    為來自不同表或文件的 key/value 對,打標(biāo)簽以區(qū)別不同來源的記錄,然后用連接字段作為 key,其余部分和新加的標(biāo)志作為 value,最后進(jìn)行輸出。

  • Reduce端

    在 Reduce 端以連接字段作為 key 的分組已經(jīng)完成,我們只需要在每一個(gè)分組當(dāng)中將那些來源于不同文件的記錄分開,最后進(jìn)行合并。

需求分析:

Hadoop 之 MapReduce

代碼實(shí)現(xiàn):

# TableBean
package com.djm.mapreduce.table;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
public class TableBean implements Writable {

    private String orderId;

    private String productId;

    private int amount;

    private String pname;

    private String flag;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(productId);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.productId = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();

    }
}

# TableMapper
package com.djm.mapreduce.table;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TableMapper extends Mapper<longwritable, text, tablebean>{

    String name;
    TableBean bean = new TableBean();
    Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        name = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (name.startsWith("order")) {// 訂單表處理
            String[] fields = line.split("\t");
            bean.setOrder_id(fields[0]);
            bean.setP_id(fields[1]);
            bean.setAmount(Integer.parseInt(fields[2]));
            bean.setPname("");
            bean.setFlag("order");
            k.set(fields[1]);
        }else {
            String[] fields = line.split("\t");
            bean.setP_id(fields[0]);
            bean.setPname(fields[1]);
            bean.setFlag("pd");
            bean.setAmount(0);
            bean.setOrder_id("");
            k.set(fields[0]);
        }
        context.write(k, bean);
    }
}

# TableReducer
package com.djm.mapreduce.table;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<text, tablebean, nullwritable> {
    @Override
    protected void reduce(Text key, Iterable<tablebean> values, Context context) throws IOException, InterruptedException {
        ArrayList<tablebean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();
        for (TableBean bean : values) {
            if ("order".equals(bean.getFlag())) {
                TableBean orderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(orderBean);
            }
            else {
                try {
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }

        }
        for (TableBean bean :orderBeans) {
            bean.setPname (pdBean.getPname());
            context.write(bean, NullWritable.get());
        }
    }
}

package com.djm.mapreduce.table;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.addCacheFile(new URI("file:///C:/Application/Apache/hadoop-2.7.2/input/pd.txt"));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
3.7.2 Map Join

Map Join 適用于一張表十分小、一張表很大的場景。

優(yōu)點(diǎn):

在 Map 端緩存多張表,提前處理業(yè)務(wù)邏輯,這樣增加 Map 端業(yè)務(wù),減少 Reduce 端數(shù)據(jù)的壓力,就可以盡可能的減少數(shù)據(jù)傾斜。

需求分析:

Hadoop 之 MapReduce

代碼實(shí)現(xiàn):

# TableMapper
package com.djm.mapreduce.table;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TableMapper extends Mapper<longwritable, text, nullwritable> {

    private Text k = new Text();

    private Map<string, string> pdMap = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        String path = cacheFiles[0].getPath();

        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));
        String line;
        while(StringUtils.isNotEmpty(line = reader.readLine())){
            String[] fields = line.split("\t");
            pdMap.put(fields[0], fields[1]);
        }
        reader.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String[] fields = value.toString().split("\t");
        String pId = fields[1];
        String pdName = pdMap.get(pId);
        k.set(fields[0] + "\t"+ pdName + "\t" + fields[2]);
        context.write(k, NullWritable.get());
    }
}

# TableDriver
package com.djm.mapreduce.table;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.addCacheFile(new URI("file:///C:/Application/Apache/hadoop-2.7.2/input/pd.txt"));
        job.setNumReduceTasks(0);
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.8 ETL

在運(yùn)行核心業(yè)務(wù) MapReduce 程序之前,往往要先對數(shù)據(jù)進(jìn)行清洗,清理掉不符合用戶要求的數(shù)據(jù)。清理的過程往往只需要運(yùn)行 Mapper 程序,不需要運(yùn)行 Reduce 程序。

需求分析:

需要在 Map 階段對輸入的數(shù)據(jù)根據(jù)規(guī)則進(jìn)行過濾清洗。

代碼實(shí)現(xiàn):

# LogBean
package com.djm.mapreduce.etl;

@Data
public class LogBean {

    private String remoteAddr;

    private String remoteUser;

    private String timeLocal;

    private String request;

    private String status;

    private String bodyBytesSent;

    private String httpReferer;

    private String httpUserAgent;

    private boolean valid = true;
}
# LogMapper
package com.djm.mapreduce.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<longwritable, text, nullwritable> {

    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        LogBean bean = parseLog(line);
        if (!bean.isValid()) {
            return;
        }
        k.set(bean.toString());
        context.write(k, NullWritable.get());
    }

    private LogBean parseLog(String line) {
        LogBean logBean = new LogBean();
        String[] fields = line.split(" ");
        if (fields.length > 11) {
            logBean.setRemoteAddr(fields[0]);
            logBean.setRemoteUser(fields[1]);
            logBean.setTimeLocal(fields[3].substring(1));
            logBean.setRequest(fields[6]);
            logBean.setStatus(fields[8]);
            logBean.setBodyBytesSent(fields[9]);
            logBean.setHttpReferer(fields[10]);
            if (fields.length > 12) {
                logBean.setHttpUserAgent(fields[11] + " " + fields[12]);
            } else {
                logBean.setHttpUserAgent(fields[11]);
            }
            if (Integer.parseInt(logBean.getStatus()) >= 400) {
                logBean.setValid(false);
            }
        } else {
            logBean.setValid(false);
        }
        return logBean;
    }
}

# LogDriver
package com.djm.mapreduce.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

3.9 MapReduce 開發(fā)總結(jié)

在編寫 MapReduce 程序時(shí),需要考慮如下幾個(gè)方面:

Mapper

  • 用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中三個(gè)方法:map() setup() cleanup ()

Partitioner分區(qū)

  • 有默認(rèn)實(shí)現(xiàn) HashPartitioner,邏輯是根據(jù) key 的哈希值和 numReduces 來返回一個(gè)分區(qū)號

    key.hashCode()&Integer.MAXVALUE % numReduces
  • 如果業(yè)務(wù)上有特別的需求,可以自定義分區(qū)

Comparable

  • 當(dāng)我們用自定義的對象作為 key 來輸出時(shí),就必須要實(shí)現(xiàn) WritableComparable 接口,重寫其中的 compareTo() 方法
  • 部分排序:對最終輸出的每一個(gè)文件進(jìn)行內(nèi)部排序
  • 全排序:對所有數(shù)據(jù)進(jìn)行排序,通常只有一個(gè) Reduce
  • 二次排序:排序的條件有兩個(gè)

Combiner

  • Combiner 合并可以提高程序執(zhí)行效率,減少 IO 傳輸,但是使用時(shí)必須不能影響原有的業(yè)務(wù)處理結(jié)果

GroupingComparator

  • 在 Reduce 端對 key 進(jìn)行分組

Reducer

  • 用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中三個(gè)方法:reduce() setup() cleanup ()

OutputFormat

  • 默認(rèn)實(shí)現(xiàn)類是 TextOutputFormat,功能邏輯是:將每一個(gè) KV 對,向目標(biāo)文本文件輸出一行
  • 將 SequenceFileOutputFormat 輸出作為后續(xù) MapReduce任務(wù)的輸入,這便是一種好的輸出格式,因?yàn)樗母袷骄o湊,很容易被壓縮
  • 用戶還可以自定義 OutputFormat

網(wǎng)站標(biāo)題:Hadoop之MapReduce
URL分享:http://bm7419.com/article16/igopdg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供服務(wù)器托管、虛擬主機(jī)、商城網(wǎng)站網(wǎng)站營銷、網(wǎng)站策劃、網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設(shè)