生產(chǎn)常用Spark累加器剖析之三(自定義累加器)

思路 & 需求

參考IntAccumulatorParam的實現(xiàn)思路(上述文章中有講):

網(wǎng)站建設哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁設計、網(wǎng)站建設、微信開發(fā)、小程序設計、集團企業(yè)網(wǎng)站建設等服務項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了桂林免費建站歡迎大家使用!

trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    // addInPlace有很多具體的實現(xiàn)類
    // 如果想要實現(xiàn)自定義的話,就得實現(xiàn)這個方法
    addInPlace(t1, t2)
  }
}

自定義也可以通過這個方法去實現(xiàn),從而兼容我們自定義的累加器

需求:這里實現(xiàn)一個簡單的案例,用分布式的方法去實現(xiàn)隨機數(shù)

**
  * 自定義的AccumulatorParam
  *
  * Created by lemon on 2018/7/28.
  */
object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] {
  override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = {
      // ++用于兩個集合相加
      r1++r2
    }
    override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = {
      var data: Map[Int, Int] = Map()
      data
    }
}
/**
  * 使用自定義的累加器,實現(xiàn)隨機數(shù)
  *
  * Created by lemon on 2018/7/28.
  */
object CustomAccumulator {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator)
    val distData = sc.parallelize(1 to 10)
    val mapCount = distData.map(x => {
      val randomNum = new Random().nextInt(20)
      // 構造一個k-v對
      val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum)
      uniqueKeyAccumulator += map
    })
    println(mapCount.count())
    // 獲取到累加器的值 中的key值,并進行打印
    uniqueKeyAccumulator.value.keys.foreach(println)
    sc.stop()
  }
}

運行結果如下圖:## 思路 & 需求

參考IntAccumulatorParam的實現(xiàn)思路(上述文章中有講):

trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    // addInPlace有很多具體的實現(xiàn)類
    // 如果想要實現(xiàn)自定義的話,就得實現(xiàn)這個方法
    addInPlace(t1, t2)
  }
}

自定義也可以通過這個方法去實現(xiàn),從而兼容我們自定義的累加器

需求:這里實現(xiàn)一個簡單的案例,用分布式的方法去實現(xiàn)隨機數(shù)

**
  * 自定義的AccumulatorParam
  *
  * Created by lemon on 2018/7/28.
  */
object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] {
  override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = {
      // ++用于兩個集合相加
      r1++r2
    }
    override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = {
      var data: Map[Int, Int] = Map()
      data
    }
}
/**
  * 使用自定義的累加器,實現(xiàn)隨機數(shù)
  *
  * Created by lemon on 2018/7/28.
  */
object CustomAccumulator {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator)
    val distData = sc.parallelize(1 to 10)
    val mapCount = distData.map(x => {
      val randomNum = new Random().nextInt(20)
      // 構造一個k-v對
      val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum)
      uniqueKeyAccumulator += map
    })
    println(mapCount.count())
    // 獲取到累加器的值 中的key值,并進行打印
    uniqueKeyAccumulator.value.keys.foreach(println)
    sc.stop()
  }
}

運行結果如下圖:
生產(chǎn)常用Spark累加器剖析之三(自定義累加器)

網(wǎng)頁名稱:生產(chǎn)常用Spark累加器剖析之三(自定義累加器)
分享路徑:http://bm7419.com/article8/psojip.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供微信公眾號ChatGPT、、面包屑導航、定制網(wǎng)站、移動網(wǎng)站建設

廣告

聲明:本網(wǎng)站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設