SparkCore的RDD

(1)RDD的介紹

???Spark Core 的RDD
?RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個不可變(RDD中的數(shù)據(jù),不能增刪改),可分區(qū)、元素可并行計算的集合。
?具有數(shù)據(jù)流的模型的特點,自動容錯、位置感知性調(diào)度和可伸縮性。RDD允許用戶在執(zhí)行多個查詢時顯示的將工作集緩存在內(nèi)存中。后續(xù)的查詢能夠重用工作集,這極大地提升了查詢速度。
?RDD可以從 三方面理解:
?? - 數(shù)據(jù)集:RDD是數(shù)據(jù)集合的抽象,是復雜物理介質(zhì)上存在數(shù)據(jù)的一種邏輯視圖。從外部看RDD的確可以被看待成經(jīng)過封裝,帶擴展特性(如容錯性)的數(shù)據(jù)集合。
?? - 分布式:RDD的數(shù)據(jù)可能存儲在多個節(jié)點的磁盤上或者內(nèi)存中,也就是所謂的多級存儲。
?? - 彈性:雖然 RDD 內(nèi)部存儲的數(shù)據(jù)是只讀的,但是,我們可以去修改(例如通 過 repartition 轉(zhuǎn)換操作)并行計算計算單元的劃分結構,也就是分區(qū)的數(shù)量。
?總之:RDD就是一個大集合,將所有的數(shù)據(jù)都加載到內(nèi)存中,方便多次進行重用。它的數(shù)據(jù)可以在多個節(jié)點上,并且RDD可以保存在內(nèi)存中,當如果某個階段的RDD丟失,不需要重新計算,只需要提取上一次的RDD,在相應的計算即可。

成都創(chuàng)新互聯(lián)公司是一家集網(wǎng)站設計、成都網(wǎng)站建設、網(wǎng)站頁面設計、網(wǎng)站優(yōu)化SEO優(yōu)化為一體的專業(yè)網(wǎng)絡公司,已為成都等多地近百家企業(yè)提供網(wǎng)站建設服務。追求良好的瀏覽體驗,以探求精品塑造與理念升華,設計最適合用戶的網(wǎng)站頁面。 合作只是第一步,服務才是根本,我們始終堅持講誠信,負責任的原則,為您進行細心、貼心、認真的服務,與眾多客戶在蓬勃發(fā)展的市場環(huán)境中,互促共生。

(2)RDD的屬性

??Spark Core 的RDD

?1)A list of partitions(一組分片,數(shù)據(jù)集的基本單位)

??一個分區(qū)通常與一個任務向關聯(lián),分區(qū)的個數(shù)決定了并行的粒度。分區(qū)的個數(shù)可以在創(chuàng)建RDD的時候指定,如果不指定,那么默認的由節(jié)點的cores個數(shù)決定。最終每一個分區(qū)會被映射成為BlockManager 中的一個Block,而這個Block會被下一個task使用進行計算。

?2)A function for computing each split(算子)

??每一個RDD都會實現(xiàn)compute,用于分區(qū)進行計算

?3)A list of dependencies on other RDDs(RDD之間的依賴)

??RDD 的每次轉(zhuǎn)換都會生成一個新的 RDD,所以 RDD 之間就會形成類似于流水線一樣的前后依賴關系。在部分分區(qū)數(shù)據(jù)丟失時,Spark 可以通過這個依賴關系重新計算丟失的分區(qū)數(shù)據(jù), 而不是對 RDD 的所有分區(qū)進行重新計算。
寬依賴和窄依賴
Spark Core 的RDD
窄依賴(完全依賴):一個父分區(qū)唯一對應一個子分區(qū),例:map操作
寬依賴(部分依賴):一個父分區(qū)對應多個子分區(qū),如:reduce、group操作
區(qū)分寬依賴和窄依賴:當前這個算子的執(zhí)行過程中是否有shuffle操作。

?4)Optionally a Partitioner for key-value RDDs(分區(qū)函數(shù))

??當前 Spark 中實現(xiàn)了兩種類型的分片函數(shù),一個是基于哈希的 HashPartitioner,另外一個是基于范圍的 RangePartitioner。只有對于 key-value 的 RDD,才會有 Partitioner,非 key-value的 RDD 的 Parititioner 的值是 None。Partitioner 函數(shù)不但決定了 RDD 本身的分片數(shù)量,也決 定了 parent RDD Shuffle 輸出時的分片數(shù)量。

?5)Optionally a list of preferred locations to compute each split on

??一個列表,存儲存取每個 Partition 的優(yōu)先位置(preferred location)。按照”移動數(shù)據(jù)不如移動計算”的理念,Spark 在進行任務調(diào)度的時候,會盡可能地將計算任務分配到其所要處理數(shù)據(jù)塊的存儲位置。而這個列表中就存放著每個分區(qū)的優(yōu)先位置。

(3)RDD的API(相關算子)

??RDD編程中有兩種中形式:Transformation(轉(zhuǎn)換)和Action(行動)。
?Transformation:表示把一個RDD ---->RDD。
?Action:表示把RDD----?集合或者scala對象。

?1)RDD的創(chuàng)建:

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext()
        //由一個已經(jīng)存在的 Scala 數(shù)據(jù)集合創(chuàng)建
        val arr=Array(1,2,3,4)
        val arr1RDD: RDD[Int] = sc.parallelize(arr)
        val arr2RDD: RDD[Int] = sc.makeRDD(arr)

        //由外部存儲系統(tǒng)的數(shù)據(jù)創(chuàng)建(HDFS、HBase...)
        val HDFSRDD: RDD[String] = sc.textFile("/data/input")
    }
}

?2)Transformation:

??官網(wǎng):http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
?注意:RDD中所有的轉(zhuǎn)換(Transformation)都是延遲加載,也就是說,他們并不是直接計算結果,相反的,他們只是記住這些應用到基礎數(shù)據(jù)集,上的一個轉(zhuǎn)換動作,只有當發(fā)生一個要求返回一個Driver動作的時候,這些轉(zhuǎn)換才真正運行。

map()算子

        val HDFSRDD: RDD[String] = sc.textFile("/data/input")
        /**
          * map 算子,返回一個新的RDD,該RDD由每一個輸入元素經(jīng)過function函數(shù)轉(zhuǎn)換后組成
          */
        val mapRDD: RDD[(String, Int)] = HDFSRDD.map(ele=>(ele,1))

flatMap()算子:

val arr=Array("hive hbase hadoop","spark hadoop","yarn hdfs")
        val lineRDD: RDD[String] = sc.parallelize(arr)
        /**
          * flagMap:類似于map,但是每一個元素輸入的元素可以被
          * 映射成為0個或者多個輸出的元素(返回的是一個序列,而不是單一的元素)
          */
        //返回一個集合hive hbase hadoop spark hadoop yarn hdfs
        val wordRDD: RDD[String] = lineRDD.flatMap(line=>line.split("\\s+"))

filter()算子:

        val arr=Array(1,2,3,4,5)
        val arrRDD: RDD[Int] = sc.parallelize(arr)
        /**
          * filter過濾:返回一個新的RDD,該RDD由經(jīng)過func函數(shù)計算后返回
          * 值為true的輸入元素組成
          */
        val filterRDD: RDD[Int] = arrRDD.filter(num=>num%2==0)

mapPartitions()算子:

        val hdfsRDD: RDD[String] = sc.textFile("/data/input")
        /**
          * mapPartitions與map的唯一區(qū)別就是,mapPartitions迭代的是一個分區(qū),
          * 而map遍歷的每一個元素,mapPartitions參數(shù)是一個迭代對象,返回的也是一個迭代對象
          */
        val partitionRDD: RDD[String] = hdfsRDD.mapPartitions((x: Iterator[String]) => {
            val temp = x.toList.map(line => line + "!")
            temp.toIterator
        })

mapPartitionsWithIndex()算子:

        val hdfsRDD: RDD[String] = sc.textFile("/data/input")
        /**
          * 第一個參數(shù)是分區(qū)編號:分區(qū)編號是從0開始的不間斷的連續(xù)編號
          * 第二個參數(shù)和mapPartitions相同
          */
        val partitionRDD: RDD[String] = hdfsRDD.mapPartitionsWithIndex((parnum:Int,x: Iterator[String]) => {
            println(parnum) //分區(qū)編號
            val temp = x.toList.map(line => line + "!")
            temp.toIterator
        })

sample()算子:

        val list=1 to 5000
        /**
          * sample方法有三個參數(shù):
          * withReplacement:代表是否有放回的抽?。╢alse 不放回,true:放回)
          * fraction:抽取樣本空間占總體的比例,(以分數(shù)的形式) 0<=fraction <=1
          * seed:隨機數(shù)生成器,new Random().nextInt(10),不傳表示使用系統(tǒng)的
          * 注意:我們使用的sample算子,不能保證提供集合大小就恰巧是rdd.size()*fraction,結果大小會上下浮動
          * sample在做抽樣調(diào)查的時候,特別受用
          */
        val listRDD: RDD[Int] = sc.parallelize(list)
        val sampleRDD: RDD[Int] = listRDD.sample(false,0.2)
        println(sampleRDD.count())  //大概是5000*0.2 上下浮動

groupByKey()算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        /**
          * groupByKey,分組
          * 建議groupByKey在實踐中,能不用就不用,主要因為groupByKey的效率低,
          * 因為有大量的數(shù)據(jù)在網(wǎng)絡中傳輸,而且還沒有進行本地的預處理
          * 可以使用reduceByKey或者aggregateByKey或者combineByKey代替這個groupByKey
          */

        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
        //分組
        val groupRDD: RDD[(String, Iterable[Int])] = stuRDD.groupByKey()
        //求平均值
        val result: RDD[(String, Double)] = groupRDD.map { case (name, score) => {
            val avg = score.sum.toDouble / (score.size)
            (name, avg)
        }
        }

reduceByKey算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        /**
          * reduceByKey:在一個(K,V)對的數(shù)據(jù)集上使用,返回一個(K,V)對的數(shù)據(jù)
          * 集,key 相同的值,都被使用指定的 reduce 函數(shù)聚合
          * 到一起。和 groupByKey 類似,任務的個數(shù)是可以通過
          * 第二個可選參數(shù)來配置的。
          */
        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
        //分組,求總分
        val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
        sumRDD.foreach(println) //打?。海╤base,36)(math,18)(hbase,18)

sortByKey()算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        /**
          * sortByKey:在一個(K,V)的 RDD 上調(diào)用,K 必須實現(xiàn) Ordered 接口,
          * 返回一個按照 key 進行排序的(K,V)的 RDD
          */

        //分組,求總分,排序
        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
        val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
        sumRDD.foreach(println) //打?。海╤base,36)(math,18)(hbase,18)
        val sortRDD: RDD[(String, Int)] = sumRDD.map(kv=>(kv._2,kv._1)).sortByKey().map(kv=>(kv._2,kv._1))
        sortRDD.foreach(println)

sortBy算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        /**
          * sortBy(func,[ascending], [numTasks])
          * 與 sortByKey 類似,但是更靈活
          * 第一個參數(shù)是根據(jù)什么排序
          * 第二個是怎么排序,true 正序,false 倒序
          * 第三個排序后分區(qū)數(shù),默認與原 RDD 一樣
          */
        //分組,求總分,排序
        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
        val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
        sumRDD.foreach(println) //打?。海╤base,36)(math,18)(hbase,18)
        val sortRDD: RDD[(String, Int)] = sumRDD.sortBy(kv=>kv._2,false,2)

aggregateByKey()算子:

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext()
        /**
          * aggregateByKey(zeroValue)(seqOp,combOp, [numTasks])
          * 先按分區(qū)聚合再總的聚合,每次要跟初始值交流
          * zeroValue:初始值
          * seqOp:迭代操作,拿RDD中的每一個元素跟初始值進行合并
          * combOp:分區(qū)結果的最終合并
          * numTasks:分區(qū)個數(shù)
          * aggregate+groupByKey=aggregateByKey
          * aggregate對單個值進行RDD,aggregateByKey對(K,V)值進行RDD
          */
        //aggregate
        val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val listRDD: RDD[Int] = sc.parallelize(list)
        //求平均值
        /**
          * seqOp: (U, T) => U
          * combOp: (U, U) => U
          * u:(Int,Int)   總和,總次數(shù)
          */
        val result: (Int, Int) = listRDD.aggregate(0, 0)((u: (Int, Int), x: Int) => {
            (u._1 + x, u._2 + 1)
        }
            , (u1: (Int, Int), u2: (Int, Int)) => {
                (u1._1 + u2._1, u1._2 + u2._2)
            })
        println(result._1 / result._2)

        //aggregateByKey已經(jīng)根據(jù)(k,v)k 進行分組,以下的操作,是對v進行操作
        //以下操作時求平均值
        val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
        val stuRDD: RDD[(String, Int)] = sc.parallelize(list1)
        val reslutRDD2: RDD[(String, (Int, Int))] = stuRDD.aggregateByKey((0, 0))((x: (Int, Int), y: Int) => {
            (x._1 + y, x._2 + 1)
        }, (x: (Int, Int), y: (Int, Int)) => {
            (x._1 + y._1, x._2 + y._2)
        })
        reslutRDD2.foreach(kv=>{
            val name=kv._1
            val avg=kv._2._1.toDouble/kv._2._2
        })
    }
}

foldLeft()算子:(不是spark的算子,是scala的高級操作)

        /**
          *  foldLeft
          * (zeroValue: T)  初值值
          * (B, A) => B  B是一個元組,B._1 表示累加元素,B._2 表示個數(shù), A 表示下一個元素
          */

        //aggregate
        val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val result: (Int, Int) = list.foldLeft((0,0))((x, y)=>{(x._1+y,x._2+1)})
        println(result._1.toDouble/result._2)

combineByKey()算子:

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        /**
          * combineByKey:
          * 合并相同的 key 的值 rdd1.combineByKey(x => x, (a: Int,
          * b: Int) => a + b, (m: Int, n: Int) => m + n)
          */
        //求平均值
        val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list1)
        /**
          * createCombiner: V => C,
          * mergeValue: (C, V) => C,
          * mergeCombiners: (C, C) => C): RDD[(K, C)]
          */
        val resultRDD: RDD[(String, (Int, Int))] = listRDD.combineByKey(x => {
            (x, 1)
        },
            (x: (Int, Int), y: Int) => {
                (x._1 + y, x._2 + 1)
            },
            (x: (Int, Int), y: (Int, Int)) => {
                (x._1 + y._1, x._2 + y._2)
            })
        resultRDD.foreach{case (name,(sum,count))=>{
            val avg=sum.toDouble/count
            println(s"${name}:${avg}")
        }}
    }
}

連接操作

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val arr1 = Array(1, 2, 4, 5)
        val arr1RDD = sc.parallelize(arr1)
        val arr2 = Array(4, 5, 6, 7)
        val arr2RDD = sc.parallelize(arr2)
        //cartesian  笛卡爾積
        val cartesianRDD: RDD[(Int, Int)] = arr1RDD.cartesian(arr2RDD)
        //union : 連接
        val unionRDD: RDD[Int] = arr1RDD.union(arr2RDD)
        //subtract,求,差集
        val sbutractRDD: RDD[Int] = arr1RDD.subtract(arr2RDD)

        //join
        val list1 = List(("a", 1), ("b", 2), ("c", 3))
        val list1RDD = sc.parallelize(list1)
        val list2 = List(("a", "zs"), ("b", "sl"))
        val list2RDD = sc.parallelize(list2)
        /**
          * 根據(jù)元組中的key進行join 操作,相同的key向連接
          * 返回的是RDD[(String, (Int, String))] (key,連接結果)
          */
        val joinRDD: RDD[(String, (Int, String))] = list1RDD.join(list2RDD)

        //cogroup
        /**
          * (String key   ,
          * (Iterable[Int] arr1中的相應的key所有value的集合
          * , Iterable[String]))  arr2中的相應的key所有value的集合
          */
        val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = list1RDD.cogroup(list2RDD)
    }
}

分區(qū)操作

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")
        /**
          * 表示在執(zhí)行了filter操作之后,由于大量的數(shù)據(jù)被過濾,導致之前設定的分區(qū)task個數(shù),
          * 處理剩下的數(shù)據(jù)導致資源浪費,為了合理高效的利用資源,
          * 可以對task重新定義,在coalesce方法中的分區(qū)個數(shù)一定要小于之前設置的分區(qū)個數(shù)。
          */
        hdfsRDD.coalesce(2)
        //打亂數(shù)據(jù),重新分區(qū),分區(qū)規(guī)則為隨機分區(qū)
        hdfsRDD.repartition(3)

        //自定義分區(qū)規(guī)則(注意,只在有key-value的RDD中可以使用)
        var arr1 = Array(("a", 1), ("a", 2), ("c", 1), ("b", 2), ("d", 2)
            ("b", 2), ("e", 2)
            , ("b", 2)
            , ("f", 2), ("g", 2), ("h", 2))
        val arrRDD: RDD[(String, Int)] = sc.parallelize(arr1,4)
        arrRDD.partitionBy(new MyPartitioner(3))

    }
}
class MyPartitioner(val numPTN:Int) extends Partitioner{
    //分區(qū)個數(shù)
    override def numPartitions: Int = numPTN
    //分區(qū)規(guī)則
    override def getPartition(key: Any): Int = {
        val num=key.hashCode()&Integer.MAX_VALUE%numPTN
        return num
    }
}

總結
   - Transformation返回的仍然是一個RDD
   - 它使用了鏈式調(diào)用的設計模式,對一個 RDD 進行計 算后,變換成另外一個 RDD,然后這個 RDD 又可以進行另外一次轉(zhuǎn)換。這個過程是分布式的。

?3)Action:

常見操作

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        //action  rdd ---map
        listRDD.reduceByKeyLocally((x,y)=>x+y)

        //調(diào)用collect的目的是:觸發(fā)所有的計算,最終收集當前這個調(diào)用者RDD的所有數(shù)據(jù),返回到客戶端,如果數(shù)據(jù)量比較大,謹慎使用
        listRDD.collect()

        //統(tǒng)計RDD中有多少記錄
        listRDD.count()
        //取出RDD中的第一條記錄
        listRDD.first()
        //取出RDD前幾條記錄
        listRDD.take(5)
        //隨機采樣
        listRDD.takeSample(false,20)
        //按照某種格式,排序后的前幾條
        listRDD.top(50)
        //按照升序或者降序,取相應的條數(shù)的記錄(其中的元素必須繼承Ordered)
        listRDD.takeOrdered(3)
        //統(tǒng)計每一個key中的value有多少個
        listRDD.countByKey()
        //統(tǒng)計有多少個元素
        listRDD.countByValue()
        //遍歷RDD中每一個元素
        listRDD.foreach(kv=>{})
        //分區(qū)遍歷RDD中的元素
        listRDD.foreachPartition(kv=>{})
        //將RDD的結果,保存到相應的文件系統(tǒng)中(注意這個目錄一定是不存在的目錄)
        listRDD.saveAsTextFile("/data/output")
    }
}

總結:Action返回值不是一個RDD。它要么是一個scala的集合,要么是一個值,要么是空。最終返回到Driver程序,或者把RDD寫入到文件系統(tǒng)中。

網(wǎng)站名稱:SparkCore的RDD
網(wǎng)頁路徑:http://bm7419.com/article32/pcgcsc.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站收錄、網(wǎng)站導航網(wǎng)站內(nèi)鏈、虛擬主機網(wǎng)站設計公司、網(wǎng)頁設計公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁設計