6.sparkcore之鍵值對(duì)操作-創(chuàng)新互聯(lián)

鍵值對(duì)RDD(pair RDD)是spark中許多操作所需要的常見數(shù)據(jù)類型,通常用來進(jìn)行聚合計(jì)算。

成都創(chuàng)新互聯(lián)-云計(jì)算及IDC服務(wù)提供商,涵蓋公有云、IDC機(jī)房租用、西部信息服務(wù)器托管、等保安全、私有云建設(shè)等企業(yè)級(jí)互聯(lián)網(wǎng)基礎(chǔ)服務(wù),歡迎來電:13518219792

創(chuàng)建Pair RDD

spark有多種方式可以創(chuàng)建pair RDD。比如:很多存儲(chǔ)鍵值對(duì)的數(shù)據(jù)格式在讀取時(shí)直接返回pair RDD;通過map()算子將普通的RDD轉(zhuǎn)為pair RDD。

scala

# 使用第一個(gè)單詞作為鍵創(chuàng)建一個(gè)pair RDD
val pairs = lines.map(x => (x.split(" ")(0), x))

java

# 使用第一個(gè)單詞作為鍵創(chuàng)建一個(gè)pair RDD
# jdk1.8后也支持lambda表達(dá)式方式
PairFunction<String, String, String> keyData = new PairFunction<String, String, String>() {
  public Tuple2<String, String> call(String x) {
    return new Tuple2(x.split(" ")[0], x);
  }
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);

python

# 使用第一個(gè)單詞作為鍵創(chuàng)建一個(gè)pair RDD
pairs = lines.map(lambda x: (x.split(" ")[0], x))

從一個(gè)內(nèi)存中的數(shù)據(jù)集創(chuàng)建pair RDD時(shí),scala和python只需要對(duì)這個(gè)二元組集合調(diào)用SparkContext的parallelize()方法即可;而java需要使用SparkContext.parallelizePairs()方法。

pair RDD轉(zhuǎn)化操作

轉(zhuǎn)化操作總覽

針對(duì)單個(gè)Pair RDD的轉(zhuǎn)化操作
函數(shù)名 作用 示例
reduceByKey(func)合并具有相同鍵的值rdd.reduceByKey((x, y) => x + y)
groupByKey()對(duì)具有相同鍵的值進(jìn)行分組rdd.groupByKey()
combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner)使用不同的返回類型合并具有相同鍵的值rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
mapValues(func)對(duì)pair RDD中的每個(gè)值應(yīng)用一個(gè)函數(shù)而不改變鍵rdd.mapValues(x => x + 1)
flatMapValues(func)對(duì)pair RDD中的每個(gè)值應(yīng)用一個(gè)返回迭代器的函數(shù),生成對(duì)應(yīng)原鍵的鍵值對(duì)記錄rdd.flatMapValues(x => (x to 5))
keys()返回一個(gè)僅包含鍵的RDDrdd.keys
values()返回一個(gè)僅包含值得RDDrdd.values
sortByKey()返回一個(gè)根據(jù)鍵排序的RDDrdd.sortByKey()
針對(duì)兩個(gè)Pair RDD的轉(zhuǎn)化操作
函數(shù)名 作用 示例
subtractByKey刪除RDD中鍵與other RDD中鍵相同的元素rdd.subtractByKey(other)
join對(duì)兩個(gè)RDD進(jìn)行內(nèi)連接rdd.join(other)
leftOuterJoin對(duì)兩個(gè)RDD進(jìn)行連接操作,確保第二個(gè)RDD的鍵必須存在(左外連接)rdd.leftOuterJoin(other)
rightOuterJoin對(duì)兩個(gè)RDD進(jìn)行連接操作,確保第一個(gè)RDD的鍵必須存在(右外連接)rdd.rightOuterJoin(other)
cogroup將兩個(gè)RDD中擁有相同鍵的數(shù)據(jù)分組在一起rdd.cogroup(other)

聚合

  • 使用mapValues()和reduceByKey()計(jì)算每個(gè)鍵對(duì)應(yīng)值的均值。
scala
rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
python
rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
  • 使用flatMap()、map()和reduceByKey()計(jì)算單詞統(tǒng)計(jì)
scala
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
java
JavaRDD<String> input = sc.textFile("s3://...");
JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
   public Iterable<String> call(String x) {
        return Arrays.asList(x.split(" "));
   }
});
JavaPairRDD<String, Integer> result = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String x) {
    return new Tuple2(x, 1);
  }
}).reduceByKey(
    new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
)
python
rdd = sc.textFile("s3://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
  • 使用combineByKey()返回與輸入數(shù)據(jù)不同類型的返回值,求每個(gè)鍵對(duì)應(yīng)的平均值
    執(zhí)行原理
    1.combineByKey()作用于rdd的每個(gè)分區(qū)。
    2.如果訪問的元素在分區(qū)中第一次出現(xiàn),就使用createCombiner()方法創(chuàng)建那個(gè)鍵對(duì)應(yīng)累加器的初始值。
    3.如果訪問的元素在當(dāng)前分區(qū)已經(jīng)出現(xiàn)過,就使用mergeValue()方法將該鍵的累加器對(duì)應(yīng)的當(dāng)前值和新值合并。
    4.如果有兩個(gè)或多個(gè)分區(qū)都有對(duì)應(yīng)同一個(gè)鍵的累加器時(shí),就使用mergeCombiners()方法將各個(gè)分區(qū)的結(jié)果進(jìn)行合并。
scala
val result = rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).map{case (key, value) => (key, value._1 / value._2.toFloat)}
java
public static class AvgCount implements Serializable {
    public int total_;
    public int num_;
    public AvgCount(int total, int num) {
        total_ = total;
        num_ = num;
    }
    public float avg() {
        return total_/(float)num_;
    }
}

Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
    public AvgCount call(Integer x) {
        return new AvgCount(x, 1);
    }
};

Function2<AvgCount, Integer, AvgCount> addAndCount = new Function2<AvgCount, Integer, AvgCount>() {
    public AvgCount call(AvgCount a, Integer x) {
        a.total_ += x;
        a.num_ += 1;
        return a;
    }
};

Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
    public AvgCount call(AvgCount a, AvgCount b) {
        a.total_ += b.total_;
        a.num_ += b.num_;
        return a;
    }
};

AvgCount initial = new AvgCount(0, 0);
JavaPairRDD<String, AvgCount> avgCounts = input.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
    System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}
python
sumCount = input.combineByKey((lambda x: (x, 1)), (lambda x, y: (x[0] + y, x[1] + 1)), (lambda x, y: (x[0] + y[0], x[1] + y[1])))
sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()

分組

對(duì)于單個(gè)RDD數(shù)據(jù)進(jìn)行分組時(shí),使用groupByKey()。如果先使用groupByKey(),再使用reduce()或fold()時(shí),可能使用一種根據(jù)鍵進(jìn)行聚合的函數(shù)更高效。比如,rdd.reduceByKey(func)與rdd.groupByKey().mapValues(value => value.reduce(func))等價(jià),但前者更高效,因?yàn)楸苊饬藶槊總€(gè)鍵存放值列表的步驟。

對(duì)多個(gè)共享同一個(gè)鍵的RDD進(jìn)行分組時(shí),使用cogroup()。cogroup方法會(huì)得到結(jié)果RDD類型為[(K, (Iterable[V], Iterable[W]))]。

連接

將一組有鍵的數(shù)據(jù)與另一組有鍵的數(shù)據(jù)連接使用是對(duì)鍵值對(duì)數(shù)據(jù)執(zhí)行的常用操作。連接方式主要有:內(nèi)連接、左外連接、右外連接。

val storeAddress = sc.parallelize(Seq((Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"), (Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")))
val storeRating = sc.parallelize(Seq(Store("Ritual"), 4.9), (Store("Philz"), 4.8)))
# 內(nèi)連接
storeAddress.join(storeRating)
#左外連接
storeAddress.leftOuterJoin(storeRating)
#右外連接
storeAddress.rightOuterJoin(storeRating)

排序

將數(shù)據(jù)排序輸出是很常見的場景。sortByKey()函數(shù)接收一個(gè)叫做ascending的參數(shù),表示是否讓結(jié)果升序排序(默認(rèn)true)。有時(shí),也可以提供自定義比較函數(shù)。比如,以字符串順序?qū)φ麛?shù)進(jìn)行自定義排序。

scala
implicit val sortIntegersByString = new Ordering[Int] {
    override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey()
java
class IntegerComparator implements Comparator<Integer> {
    public int compare(Integer a, Integer b) {
        return String.valueOf(a).compareTo(String.valueOf(b))
    }
}
rdd.sortByKey(new IntegerComparator());
python
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))

Pair RDD行動(dòng)操作

和轉(zhuǎn)化操作一樣,所有基礎(chǔ)RDD支持的行動(dòng)操作也都在pair RDD上可用。另外,Pair RDD提供了一些額外的行動(dòng)操作。

函數(shù) 作用 示例
countByKey對(duì)每個(gè)鍵對(duì)應(yīng)的元素分別計(jì)數(shù)rdd.countByKey()
collectAsMap將結(jié)果以映射表的形式返回rdd.collectAsMap()
lookup(key)返回指定鍵對(duì)應(yīng)的所有值rdd.lookup(3)

忠于技術(shù),熱愛分享。歡迎關(guān)注公眾號(hào):java大數(shù)據(jù)編程,了解更多技術(shù)內(nèi)容。

6.spark core之鍵值對(duì)操作

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。

分享題目:6.sparkcore之鍵值對(duì)操作-創(chuàng)新互聯(lián)
URL分享:http://bm7419.com/article42/gijhc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供商城網(wǎng)站、靜態(tài)網(wǎng)站、品牌網(wǎng)站設(shè)計(jì)、定制開發(fā)、App設(shè)計(jì)、企業(yè)網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)