生產(chǎn)常用Spark累加器剖析之一

由于最近在項(xiàng)目中需要用到Spark的累加器,同時(shí)需要自己去自定義實(shí)現(xiàn)Spark的累加器,從而滿足生產(chǎn)上的需求。對(duì)此,對(duì)Spark的累加器實(shí)現(xiàn)機(jī)制進(jìn)行了追蹤學(xué)習(xí)。

專(zhuān)注于為中小企業(yè)提供成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站、成都外貿(mào)網(wǎng)站建設(shè)公司服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)利州免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了1000+企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過(guò)網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

本系列文章,將從以下幾個(gè)方面入手,對(duì)Spark累加器進(jìn)行剖析:

  1. Spark累加器的基本概念

  2. 累加器的重點(diǎn)類(lèi)構(gòu)成

  3. 累加器的源碼解析

  4. 累加器的執(zhí)行過(guò)程

  5. 累加器使用中的坑

  6. 自定義累加器的實(shí)現(xiàn)

    Spark累加器基本概念

Spark提供的Accumulator,主要用于多個(gè)節(jié)點(diǎn)對(duì)一個(gè)變量進(jìn)行共享性的操作。Accumulator只提供了累加的功能,只能累加,不能減少累加器只能在Driver端構(gòu)建,并只能從Driver端讀取結(jié)果,在Task端只能進(jìn)行累加。

至于這里為什么只能在Task累加呢?下面的內(nèi)容將會(huì)進(jìn)行詳細(xì)的介紹,先簡(jiǎn)單介紹下:

在Task節(jié)點(diǎn),準(zhǔn)確的就是說(shuō)在executor上;
每個(gè)Task都會(huì)有一個(gè)累加器的變量,被序列化傳輸?shù)絜xecutor端運(yùn)行之后再返回過(guò)來(lái)都是獨(dú)立運(yùn)行的;
如果在Task端去獲取值的話,只能獲取到當(dāng)前Task的,Task與Task之間不會(huì)有影響

累加器不會(huì)改變Spark lazy計(jì)算的特點(diǎn),只會(huì)在Job觸發(fā)的時(shí)候進(jìn)行相關(guān)的累加操作

現(xiàn)有累加器類(lèi)型:
生產(chǎn)常用Spark累加器剖析之一

累加器的重點(diǎn)類(lèi)介紹

class Accumulator extends Accumulable

源碼(源碼中已經(jīng)對(duì)這個(gè)類(lèi)的作用做了十分詳細(xì)的解釋?zhuān)?/p>

/**
 * A simpler value of [[Accumulable]] where the result type being accumulated is the same
 * as the types of elements being merged, i.e. variables that are only "added" to through an
 * associative operation and can therefore be efficiently supported in parallel. They can be used
 * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
 * value types, and programmers can add support for new types.
 *
 * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
 * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
 * However, they cannot read its value. Only the driver program can read the accumulator's value,
 * using its value method.
 *
 * @param initialValue initial value of accumulator
 * @param param helper object defining how to add elements of type `T`
 * @tparam T result type
 */
class Accumulator[T] private[spark] (
    @transient private[spark] val initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String],
    internal: Boolean)
  extends Accumulable[T, T](initialValue, param, name, internal) {
  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
    this(initialValue, param, name, false)
  }
  def this(initialValue: T, param: AccumulatorParam[T]) = {
    this(initialValue, param, None, false)
  }
}
主要實(shí)現(xiàn)了累加器的初始化及封裝了相關(guān)的累加器操作方法
同時(shí)在類(lèi)對(duì)象構(gòu)建的時(shí)候向Accumulators注冊(cè)累加器
累加器的add操作的返回值類(lèi)型和傳入進(jìn)去的值類(lèi)型可以不一樣
所以一定要定義好兩步操作(即add方法):累加操作/合并操作

object Accumulators

該方法在Driver端管理著累加器,也包含了累加器的聚合操作

trait AccumulatorParam[T] extends AccumulableParam[T, T]

源碼:

/**
 * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
 * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
 * available when you create Accumulators of a specific type.
 *
 * @tparam T type of value to accumulate
 */
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}
AccumulatorParam的addAccumulator操作的泛型封裝
具體的實(shí)現(xiàn)還是需要在具體實(shí)現(xiàn)類(lèi)里面實(shí)現(xiàn)addInPlace方法
自定義實(shí)現(xiàn)累加器的關(guān)鍵

object AccumulatorParam

源碼:

object AccumulatorParam {
  // The following implicit objects were in SparkContext before 1.2 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, as there are duplicate codes in SparkContext for backward
  // compatibility, please update them accordingly if you modify the following implicit objects.
  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
    def zero(initialValue: Double): Double = 0.0
  }
  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }
  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }
  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
    def zero(initialValue: Float): Float = 0f
  }
  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
}
從源碼中大量的implicit關(guān)鍵詞,可以發(fā)現(xiàn)該類(lèi)主要進(jìn)行隱式類(lèi)型轉(zhuǎn)換的操作

TaskContextImpl

在Executor端管理著我們的累加器,累加器是通過(guò)該類(lèi)進(jìn)行返回的

累加器的源碼解析

Driver端

??accumulator方法

以下列這段代碼中的accumulator方法為入口點(diǎn),進(jìn)入到相應(yīng)的源碼中去

val acc = new Accumulator(initialValue, param, Some(name))

源碼:

class Accumulator[T] private[spark] (
    @transient private[spark] val initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String],
    internal: Boolean)
  extends Accumulable[T, T](initialValue, param, name, internal) {
  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
    this(initialValue, param, name, false)
  }
  def this(initialValue: T, param: AccumulatorParam[T]) = {
    this(initialValue, param, None, false)
  }
}

??繼承的Accumulable[T, T]

源碼:

class Accumulable[R, T] private[spark] (
    initialValue: R,
    param: AccumulableParam[R, T],
    val name: Option[String],
    internal: Boolean)
  extends Serializable {
…
// 這里的_value并不支持序列化
// 注:有@transient的都不會(huì)被序列化
@volatile @transient private var value_ : R = initialValue // Current value on master
  …
  // 注冊(cè)了當(dāng)前的累加器
  Accumulators.register(this)
  …,
  }

??Accumulators.register()

源碼:

// 傳入?yún)?shù),注冊(cè)累加器
def register(a: Accumulable[_, _]): Unit = synchronized {
// 構(gòu)造成WeakReference
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
}

至此,Driver端的初始化已經(jīng)完成

Executor端

Executor端的反序列化是一個(gè)得到我們的對(duì)象的過(guò)程
初始化是在反序列化的時(shí)候就完成的,同時(shí)反序列化的時(shí)候還完成了Accumulator向TaskContextImpl的注冊(cè)

??TaskRunner中的run方法

// 在計(jì)算的過(guò)程中,會(huì)將RDD和function經(jīng)過(guò)序列化之后傳給Executor端
private[spark] class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false)
  extends Logging {
...
  class TaskRunner(
      execBackend: ExecutorBackend,
      val taskId: Long,
      val attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer)
    extends Runnable {
…
override def run(): Unit = {
    …
val (value, accumUpdates) = try {
         // 調(diào)用TaskRunner中的task.run方法,觸發(fā)task的運(yùn)行
         val res = task.run(
           taskAttemptId = taskId,
           attemptNumber = attemptNumber,
           metricsSystem = env.metricsSystem)
         threwException = false
         res
       } finally {
        …
       }
…
}

??Task中的collectAccumulators()方法

private[spark] abstract class Task[T](
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem)
  : (T, AccumulatorUpdates) = {
  …
    try {
      // 返回累加器,并運(yùn)行task
      // 調(diào)用TaskContextImpl的collectAccumulators,返回值的類(lèi)型為一個(gè)Map
      (runTask(context), context.collectAccumulators())
    } finally {
  …
 }
 …
 }
)

??ResultTask中的runTask方法

override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // 反序列化是在調(diào)用ResultTask的runTask方法的時(shí)候做的
  // 會(huì)反序列化出來(lái)RDD和自己定義的function
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}

??Accumulable中的readObject方法

// 在反序列化的過(guò)程中會(huì)調(diào)用Accumulable.readObject方法
  // Called by Java when deserializing an object
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    // value的初始值為zero;該值是會(huì)被序列化的
    value_ = zero
    deserialized = true
    // Automatically register the accumulator when it is deserialized with the task closure.
    //
    // Note internal accumulators sent with task are deserialized before the TaskContext is created
    // and are registered in the TaskContext constructor. Other internal accumulators, such SQL
    // metrics, still need to register here.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      // 當(dāng)前反序列化所得到的對(duì)象會(huì)被注冊(cè)到TaskContext中
      // 這樣TaskContext就可以獲取到累加器
      // 任務(wù)運(yùn)行結(jié)束之后,就可以通過(guò)context.collectAccumulators()返回給executor
      taskContext.registerAccumulator(this)
    }
  }

??Executor.scala

// 在executor端拿到accumuUpdates值之后,會(huì)去構(gòu)造一個(gè)DirectTaskResult
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
…
// 最終由ExecutorBackend的statusUpdate方法發(fā)送至Driver端
// ExecutorBackend為一個(gè)Trait,有多種實(shí)現(xiàn)
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

??CoarseGrainedExecutorBackend中的statusUpdate方法

// 通過(guò)ExecutorBackend的一個(gè)實(shí)現(xiàn)類(lèi):CoarseGrainedExecutorBackend 中的statusUpdate方法
// 將數(shù)據(jù)發(fā)送至Driver端
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }

??CoarseGrainedSchedulerBackend中的receive方法

// Driver端在接收到消息之后,會(huì)調(diào)用CoarseGrainedSchedulerBackend中的receive方法
override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data) =>
        // 會(huì)在DAGScheduler的handleTaskCompletion方法中將結(jié)果返回
        scheduler.statusUpdate(taskId, state, data.value)
    …
}

??TaskSchedulerImpl的statusUpdate方法

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  …
            if (state == TaskState.FINISHED) {
              taskSet.removeRunningTask(tid)
              // 將成功的Task入隊(duì)
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
  …
}

??TaskResultGetter的enqueueSuccessfulTask方法

def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
…
          result.metrics.setResultSize(size)
          scheduler.handleSuccessfulTask(taskSetManager, tid, result)
…

??TaskSchedulerImpl的handleSuccessfulTask方法

def handleSuccessfulTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      taskResult: DirectTaskResult[_]): Unit = synchronized {
    taskSetManager.handleSuccessfulTask(tid, taskResult)
  }

??DAGScheduler的taskEnded方法

def taskEnded(
     task: Task[_],
     reason: TaskEndReason,
     result: Any,
     accumUpdates: Map[Long, Any],
     taskInfo: TaskInfo,
     taskMetrics: TaskMetrics): Unit = {
 eventProcessLoop.post(
     // 給自身的消息循環(huán)體發(fā)了個(gè)CompletionEvent
     // 這個(gè)CompletionEvent會(huì)被handleTaskCompletion方法所接收到
     CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
 }

??DAGScheduler的handleTaskCompletion方法

// 與上述CoarseGrainedSchedulerBackend中的receive方法章節(jié)對(duì)應(yīng)
// 在handleTaskCompletion方法中,接收CompletionEvent
// 不論是ResultTask還是ShuffleMapTask都會(huì)去調(diào)用updateAccumulators方法,更新累加器的值
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
    …
    event.reason match {
      case Success =>
        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
          event.reason, event.taskInfo, event.taskMetrics))
        stage.pendingPartitions -= task.partitionId
        task match {
          case rt: ResultTask[_, _] =>
            // Cast to ResultStage here because it's part of the ResultTask
            // TODO Refactor this out to a function that accepts a ResultStage
            val resultStage = stage.asInstanceOf[ResultStage]
            resultStage.activeJob match {
              case Some(job) =>
                if (!job.finished(rt.outputId)) {
                  updateAccumulators(event)
          case smt: ShuffleMapTask =>
            val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
            updateAccumulators(event)
}
…
}

??DAGScheduler的updateAccumulators方法

private def updateAccumulators(event: CompletionEvent): Unit = {
   val task = event.task
   val stage = stageIdToStage(task.stageId)
   if (event.accumUpdates != null) {
     try {
       // 調(diào)用了累加器的add方法
       Accumulators.add(event.accumUpdates)

??Accumulators的add方法

def add(values: Map[Long, Any]): Unit = synchronized {
    // 遍歷傳進(jìn)來(lái)的值
    for ((id, value) <- values) {
      if (originals.contains(id)) {
        // Since we are now storing weak references, we must check whether the underlying data
        // is valid.
        // 根據(jù)id從注冊(cè)的Map中取出對(duì)應(yīng)的累加器
        originals(id).get match {
          // 將值給累加起來(lái),最終將結(jié)果加到value里面
          // ++=是被重載了
          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
          case None =>
            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
        }
      } else {
        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
      }
    }
  }

??Accumulators的++=方法

def ++= (term: R) { value_ = param.addInPlace(value_, term)}

??Accumulators的value方法

def value: R = {
   if (!deserialized) {
     value_
   } else {
     throw new UnsupportedOperationException("Can't read accumulator value in task")
   }
 }

此時(shí)我們的應(yīng)用程序就可以通過(guò) .value 的方式去獲取計(jì)數(shù)器的值了

分享名稱(chēng):生產(chǎn)常用Spark累加器剖析之一
文章鏈接:http://bm7419.com/article32/igsspc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號(hào)App開(kāi)發(fā)、域名注冊(cè)全網(wǎng)營(yíng)銷(xiāo)推廣、網(wǎng)站維護(hù)建站公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作