DataJoin類實(shí)現(xiàn)不同格式數(shù)據(jù)reduce側(cè)連接

                      實(shí)驗(yàn)名稱:Datajoin數(shù)據(jù)連接

創(chuàng)新互聯(lián)公司堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站建設(shè)、做網(wǎng)站、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的偃師網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!

實(shí)驗(yàn)?zāi)康模?/p>

 1、記錄我的Hadoop 實(shí)驗(yàn)過(guò)程,我是NCU HANG TIAN BAN 的學(xué)生。將會(huì)附上完整可運(yùn)行的代碼。程序中框架是一套模板百度的、書上也有但是重要算法是我自己寫的將會(huì)標(biāo)注。 http://blog.csdn.net/wawmg/article/details/8759076 這是我參考的框架模板。

    2、提示大致瀏覽可看加粗部分【1、2、3、4】

實(shí)驗(yàn)要求:

 任務(wù)1、多個(gè)數(shù)據(jù)源的內(nèi)連接

【數(shù)據(jù)樣例】

輸入:

factory:

factoryname addressID

Beijing Red Star 1

Shenzhen Thunder 3

Guangzhou Honda 2

Beijing Rising 1

Guangzhou Development Bank 2

Tencent 3

Bank of Beijing 1

Nanchang Univ 5

address:

addressID addressname

1 Beijing

2 Guangzhou

3 Shenzhen

4 Xian

輸出:

factorynameaddressIDaddressname

Bank of Beijing1Beijing

Beijing Red Star1Beijing 

Beijing Rising1eijing 

Guangzhou Development Bank2 Guangzhou 

Guangzhou Honda2 Guangzhou

Shenzhen Thunder3 Shenzhen 

Tencent3 Shenzhen

[代碼開始了]【1】

// 先是TaggedWritable類 抄的不作改動(dòng)
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
/*TaggedMapOutput是一個(gè)抽象數(shù)據(jù)類型,封裝了標(biāo)簽與記錄內(nèi)容
 此處作為DataJoinMapperBase的輸出值類型,需要實(shí)現(xiàn)Writable接口,所以要實(shí)現(xiàn)兩個(gè)序列化方法
 自定義輸入類型*/
public class TaggedWritable extends TaggedMapOutput {
private Writable data;
public TaggedWritable() {
this.tag = new Text();
}

public TaggedWritable(Writable data) // 構(gòu)造函數(shù)
{
this.tag = new Text(); // tag可以通過(guò)setTag()方法進(jìn)行設(shè)置
this.data = data;
}

@Override
public void readFields(DataInput in) throws IOException {
tag.readFields(in);
String dataClz = in.readUTF();
if (this.data == null
|| !this.data.getClass().getName().equals(dataClz)) {
try {
this.data = (Writable) ReflectionUtils.newInstance(
Class.forName(dataClz), null);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
data.readFields(in);
}


@Override
public void write(DataOutput out) throws IOException {
tag.write(out);
out.writeUTF(this.data.getClass().getName());
data.write(out);
}


@Override
public Writable getData() {
return data;
}
}
// http://blog.csdn.net/wawmg/article/details/8759076

【2】Map階段 算法自己寫的

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class JoinMapper extends DataJoinMapperBase {
    // 這個(gè)在任務(wù)開始時(shí)調(diào)用,用于產(chǎn)生標(biāo)簽
    // 此處就直接以文件名作為標(biāo)簽
    @Override
    protected Text generateInputTag(String inputFile) {
        System.out.println("inputFile = " + inputFile);
        return new Text(inputFile);
    }
    // 這里我們已經(jīng)確定分割符為',',更普遍的,用戶應(yīng)能自己指定分割符和組鍵。
    // 設(shè)置組鍵
    @Override
        protected Text generateGroupKey(TaggedMapOutput record) {
        String tag = ((Text) record.getTag()).toString();
    
        if(tag.indexOf("factory") != -1){
        String line = ((Text) record.getData()).toString();
        String[] tokens = line.split(" ");
        int len = tokens.length - 1;
        return new Text(tokens[len]);
        }else{
            String line = ((Text) record.getData()).toString();
            String[] tokens = line.split(" ");
            return new Text(tokens[0]);
        }
    }

    // 返回一個(gè)任何帶任何我們想要的Text標(biāo)簽的TaggedWritable
    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
        TaggedWritable retv = new TaggedWritable((Text) value);
        retv.setTag(this.inputTag); // 不要忘記設(shè)定當(dāng)前鍵值的標(biāo)簽
        return retv;
    }
}

【3】reduce階段 算法也是自己寫的

import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class JoinReducer extends DataJoinReducerBase {
    // 兩個(gè)參數(shù)數(shù)組大小一定相同,并且最多等于數(shù)據(jù)源個(gè)數(shù)
    @Override
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        if (tags.length < 2) return null; // 這一步,實(shí)現(xiàn)內(nèi)聯(lián)結(jié)
        String joinedStr = "";
        String dd = "  ";
        for (int i = 0; i < values.length; i++) {
             // 以逗號(hào)作為原兩個(gè)數(shù)據(jù)源記錄鏈接的分割符
            TaggedWritable tw = (TaggedWritable) values[i];
            String line = ((Text) tw.getData()).toString();
            // 將一條記錄劃分兩組,去掉第一組的組鍵名。
            if( i == 0){
                String[] tokens = line.split(" ");
                dd += tokens[1];
            }
            if(i == 1){
                joinedStr += line;
                System.out.println(joinedStr);
            }
        }
        joinedStr += dd;
        TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
        retv.setTag((Text) tags[1]); // 這只retv的組鍵,作為最終輸出鍵。
        return retv;
    }
}

【4】Driver 驅(qū)動(dòng)類 抄的不作改動(dòng)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoinDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Path in = new Path("hdfs://localhost:9000/user/c/input/*.txt");
        Path out = new Path("hdfs://localhost:9000/user/c/output2");
        JobConf job = new JobConf(conf, DataJoinDriver.class);
        job.setJobName("DataJoin");
        FileSystem hdfs = FileSystem.get(conf);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {

        int res = ToolRunner.run(new Configuration(), new DataJoinDriver(),
                args);
        System.exit(res);
    }

}

 最后:輸出有點(diǎn)小問(wèn)題,就是沒有做排序。

分享名稱:DataJoin類實(shí)現(xiàn)不同格式數(shù)據(jù)reduce側(cè)連接
地址分享:http://bm7419.com/article14/gipege.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作、用戶體驗(yàn)定制網(wǎng)站、網(wǎng)站排名手機(jī)網(wǎng)站建設(shè)、虛擬主機(jī)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司