怎么分析Flink與SparkStreaming的對(duì)比

本篇文章為大家展示了怎么分析Flink與Spark Streaming的對(duì)比,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

創(chuàng)新互聯(lián)建站專注于云安網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供云安營(yíng)銷型網(wǎng)站建設(shè),云安網(wǎng)站制作、云安網(wǎng)頁(yè)設(shè)計(jì)、云安網(wǎng)站官網(wǎng)定制、小程序定制開(kāi)發(fā)服務(wù),打造云安網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供云安網(wǎng)站排名全網(wǎng)營(yíng)銷落地服務(wù)。

 前言

流數(shù)據(jù)(或數(shù)據(jù)流)是指在時(shí)間分布和數(shù)量上無(wú)限的一系列動(dòng)態(tài)數(shù)據(jù)集合體,數(shù)據(jù)的價(jià)值隨著時(shí)間的流逝而降低,因此必須實(shí)時(shí)計(jì)算給出秒級(jí)響應(yīng)。流式計(jì)算,顧名思義,就是對(duì)數(shù)據(jù)流進(jìn)行處理,是實(shí)時(shí)計(jì)算。

架構(gòu)對(duì)比

生態(tài)

怎么分析Flink與Spark Streaming的對(duì)比
怎么分析Flink與Spark Streaming的對(duì)比

運(yùn)行模型

Spark Streaming 是微批處理,運(yùn)行的時(shí)候需要指定批處理的時(shí)間,每次運(yùn)行 job 時(shí)處理一個(gè)批次的數(shù)據(jù)

Flink  是基于事件驅(qū)動(dòng)的,事件可以理解為消息。事件驅(qū)動(dòng)的應(yīng)用程序是一種狀態(tài)應(yīng)用程序,它會(huì)從一個(gè)或者多個(gè)流中注入事件,通過(guò)觸發(fā)計(jì)算更新?tīng)顟B(tài),或外部動(dòng)作對(duì)注入的事件作出反應(yīng)。

運(yùn)行角色

Spark Streaming 運(yùn)行時(shí)的角色(standalone 模式)主要有:

Master:主要負(fù)責(zé)整體集群資源的管理和應(yīng)用程序調(diào)度;

Worker:負(fù)責(zé)單個(gè)節(jié)點(diǎn)的資源管理,driver 和 executor 的啟動(dòng)等;

Driver:用戶入口程序執(zhí)行的地方,即 SparkContext 執(zhí)行的地方,主要是 DAG 生成、stage 劃分、task 生成及調(diào)度;

Executor:負(fù)責(zé)執(zhí)行 task,反饋執(zhí)行狀態(tài)和執(zhí)行結(jié)果。

Flink 運(yùn)行時(shí)的角色(standalone 模式)主要有:

Jobmanager: 協(xié)調(diào)分布式執(zhí)行,他們調(diào)度任務(wù)、協(xié)調(diào) checkpoints、協(xié)調(diào)故障恢復(fù)等。至少有一個(gè)  JobManager。高可用情況下可以啟動(dòng)多個(gè) JobManager,其中一個(gè)選舉為 leader,其余為 standby;

Taskmanager: 負(fù)責(zé)執(zhí)行具體的 tasks、緩存、交換數(shù)據(jù)流,至少有一個(gè) TaskManager;

Slot: 每個(gè) task slot 代表 TaskManager 的一個(gè)固定部分資源,Slot 的個(gè)數(shù)代表著 taskmanager 可并行執(zhí)行的  task 數(shù)。

編程模型對(duì)比

編程模型對(duì)比,主要是對(duì)比 flink 和 Spark Streaming 兩者在代碼編寫上的區(qū)別。

Spark Streaming

Spark Streaming 與 kafka 的結(jié)合主要是兩種模型:

  • 基于 receiver dstream;

  • 基于 direct dstream。

以上兩種模型編程機(jī)構(gòu)近似,只是在 api 和內(nèi)部數(shù)據(jù)獲取有些區(qū)別,新版本的已經(jīng)取消了基于 receiver 這種模式,企業(yè)中通常采用基于 direct  Dstream 的模式。

val Array(brokers, topics) = args// 創(chuàng)建一個(gè)批處理時(shí)間是2s的context   val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")   val ssc = new StreamingContext(sparkConf, Seconds(2))   // 使用broker和topic創(chuàng)建DirectStream   val topicsSet = topics.split(",").toSet   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)   val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))   // Get the lines, split them into words, count the words and print   val lines = messages.map(_.value)   val words = lines.flatMap(_.split(" "))   val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)   wordCounts.print() // 啟動(dòng)流   ssc.start()   ssc.awaitTermination()

通過(guò)以上代碼我們可以 get 到:

  • 設(shè)置批處理時(shí)間

  • 創(chuàng)建數(shù)據(jù)流

  • 編寫transform

  • 編寫action

  • 啟動(dòng)執(zhí)行

Flink

接下來(lái)看 flink 與 kafka 結(jié)合是如何編寫代碼的。Flink 與 kafka 結(jié)合是事件驅(qū)動(dòng),大家可能對(duì)此會(huì)有疑問(wèn),消費(fèi) kafka 的數(shù)據(jù)調(diào)用  poll 的時(shí)候是批量獲取數(shù)據(jù)的(可以設(shè)置批處理大小和超時(shí)時(shí)間),這就不能叫做事件觸發(fā)了。而實(shí)際上,flink 內(nèi)部對(duì) poll 出來(lái)的數(shù)據(jù)進(jìn)行了整理,然后逐條  emit,形成了事件觸發(fā)的機(jī)制。 下面的代碼是 flink 整合 kafka 作為 data source 和 data sink:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  env.getConfig().disableSysoutLogging();  env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));  env.enableCheckpointing(5000); // create a checkpoint every 5 seconds  env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);   // ExecutionConfig.GlobalJobParameters  env.getConfig().setGlobalJobParameters(null); DataStream<KafkaEvent> input = env  .addSource( new FlinkKafkaConsumer010<>(  parameterTool.getRequired("input-topic"), new KafkaEventSchema(),  parameterTool.getProperties())  .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).setParallelism(1).rebalance()  .keyBy("word")  .map(new RollingAdditionMapper()).setParallelism(0);    input.addSink( new FlinkKafkaProducer010<>(  parameterTool.getRequired("output-topic"), new KafkaEventSchema(),  parameterTool.getProperties()));    env.execute("Kafka 0.10 Example");

從 Flink 與 kafka 結(jié)合的代碼可以 get 到:

  • 注冊(cè)數(shù)據(jù) source

  • 編寫運(yùn)行邏輯

  • 注冊(cè)數(shù)據(jù) sink

調(diào)用 env.execute 相比于 Spark Streaming 少了設(shè)置批處理時(shí)間,還有一個(gè)顯著的區(qū)別是 flink 的所有算子都是 lazy  形式的,調(diào)用 env.execute 會(huì)構(gòu)建 jobgraph。client 端負(fù)責(zé) Jobgraph 生成并提交它到集群運(yùn)行;而 Spark  Streaming的操作算子分 action 和 transform,其中僅有 transform 是 lazy 形式,而且 DAG 生成、stage  劃分、任務(wù)調(diào)度是在 driver 端進(jìn)行的,在 client 模式下 driver 運(yùn)行于客戶端處。

任務(wù)調(diào)度原理

Spark 任務(wù)調(diào)度

Spark Streaming 任務(wù)如上文提到的是基于微批處理的,實(shí)際上每個(gè)批次都是一個(gè) Spark Core 的任務(wù)。對(duì)于編碼完成的 Spark  Core 任務(wù)在生成到最終執(zhí)行結(jié)束主要包括以下幾個(gè)部分:

  • 構(gòu)建 DAG 圖;

  • 劃分 stage;

  • 生成 taskset;

  • 調(diào)度 task。

具體可參考圖 5:

怎么分析Flink與Spark Streaming的對(duì)比

對(duì)于 job 的調(diào)度執(zhí)行有 fifo 和 fair 兩種模式,Task 是根據(jù)數(shù)據(jù)本地性調(diào)度執(zhí)行的。 假設(shè)每個(gè) Spark Streaming 任務(wù)消費(fèi)的  kafka topic 有四個(gè)分區(qū),中間有一個(gè) transform操作(如 map)和一個(gè) reduce 操作,如圖 6 所示:

怎么分析Flink與Spark Streaming的對(duì)比

假設(shè)有兩個(gè) executor,其中每個(gè) executor 三個(gè)核,那么每個(gè)批次相應(yīng)的 task 運(yùn)行位置是固定的嗎?是否能預(yù)測(cè)?  由于數(shù)據(jù)本地性和調(diào)度不確定性,每個(gè)批次對(duì)應(yīng) kafka 分區(qū)生成的 task 運(yùn)行位置并不是固定的。

Flink 任務(wù)調(diào)度

對(duì)于 flink 的流任務(wù)客戶端首先會(huì)生成 StreamGraph,接著生成 JobGraph,然后將 jobGraph 提交給 Jobmanager  由它完成 jobGraph 到 ExecutionGraph 的轉(zhuǎn)變,最后由 jobManager 調(diào)度執(zhí)行。

怎么分析Flink與Spark Streaming的對(duì)比

如圖 7 所示有一個(gè)由 data source、MapFunction和 ReduceFunction 組成的程序,data source 和  MapFunction 的并發(fā)度都為 4,而 ReduceFunction 的并發(fā)度為 3。一個(gè)數(shù)據(jù)流由 Source-Map-Reduce 的順序組成,在具有  2 個(gè)TaskManager、每個(gè) TaskManager 都有 3 個(gè) Task Slot 的集群上運(yùn)行。

可以看出 flink 的拓?fù)渖商峤粓?zhí)行之后,除非故障,否則拓?fù)洳考?zhí)行位置不變,并行度由每一個(gè)算子并行度決定,類似于 storm。而 spark  Streaming 是每個(gè)批次都會(huì)根據(jù)數(shù)據(jù)本地性和資源情況進(jìn)行調(diào)度,無(wú)固定的執(zhí)行拓?fù)浣Y(jié)構(gòu)。 flink 是數(shù)據(jù)在拓?fù)浣Y(jié)構(gòu)里流動(dòng)執(zhí)行,而 Spark  Streaming 則是對(duì)數(shù)據(jù)緩存批次并行處理。

時(shí)間機(jī)制對(duì)比

流處理的時(shí)間

流處理程序在時(shí)間概念上總共有三個(gè)時(shí)間概念:

  • 處理時(shí)間

處理時(shí)間是指每臺(tái)機(jī)器的系統(tǒng)時(shí)間,當(dāng)流程序采用處理時(shí)間時(shí)將使用運(yùn)行各個(gè)運(yùn)算符實(shí)例的機(jī)器時(shí)間。處理時(shí)間是最簡(jiǎn)單的時(shí)間概念,不需要流和機(jī)器之間的協(xié)調(diào),它能提供最好的性能和最低延遲。然而在分布式和異步環(huán)境中,處理時(shí)間不能提供消息事件的時(shí)序性保證,因?yàn)樗艿较鬏斞舆t,消息在算子之間流動(dòng)的速度等方面制約。

  • 事件時(shí)間

事件時(shí)間是指事件在其設(shè)備上發(fā)生的時(shí)間,這個(gè)時(shí)間在事件進(jìn)入 flink 之前已經(jīng)嵌入事件,然后 flink  可以提取該時(shí)間?;谑录r(shí)間進(jìn)行處理的流程序可以保證事件在處理的時(shí)候的順序性,但是基于事件時(shí)間的應(yīng)用程序必須要結(jié)合 watermark  機(jī)制?;谑录r(shí)間的處理往往有一定的滯后性,因?yàn)樗枰却罄m(xù)事件和處理無(wú)序事件,對(duì)于時(shí)間敏感的應(yīng)用使用的時(shí)候要慎重考慮。

  • 注入時(shí)間

注入時(shí)間是事件注入到 flink 的時(shí)間。事件在 source 算子處獲取 source  的當(dāng)前時(shí)間作為事件注入時(shí)間,后續(xù)的基于時(shí)間的處理算子會(huì)使用該時(shí)間處理數(shù)據(jù)。

相比于事件時(shí)間,注入時(shí)間不能夠處理無(wú)序事件或者滯后事件,但是應(yīng)用程序無(wú)序指定如何生成  watermark。在內(nèi)部注入時(shí)間程序的處理和事件時(shí)間類似,但是時(shí)間戳分配和 watermark 生成都是自動(dòng)的。

圖 8 可以清晰地看出三種時(shí)間的區(qū)別:

怎么分析Flink與Spark Streaming的對(duì)比

Spark 時(shí)間機(jī)制

Spark Streaming 只支持處理時(shí)間,Structured streaming 支持處理時(shí)間和事件時(shí)間,同時(shí)支持 watermark  機(jī)制處理滯后數(shù)據(jù)。

Flink 時(shí)間機(jī)制

flink 支持三種時(shí)間機(jī)制:事件時(shí)間,注入時(shí)間,處理時(shí)間,同時(shí)支持 watermark 機(jī)制處理滯后數(shù)據(jù)。

kafka 動(dòng)態(tài)分區(qū)檢測(cè)

Spark Streaming

對(duì)于有實(shí)時(shí)處理業(yè)務(wù)需求的企業(yè),隨著業(yè)務(wù)增長(zhǎng)數(shù)據(jù)量也會(huì)同步增長(zhǎng),將導(dǎo)致原有的 kafka 分區(qū)數(shù)不滿足數(shù)據(jù)寫入所需的并發(fā)度,需要擴(kuò)展 kafka  的分區(qū)或者增加 kafka 的 topic,這時(shí)就要求實(shí)時(shí)處理程序,如 SparkStreaming、flink 能檢測(cè)到 kafka 新增的 topic  、分區(qū)及消費(fèi)新增分區(qū)的數(shù)據(jù)。

接下來(lái)結(jié)合源碼分析,Spark Streaming 和 flink 在 kafka 新增 topic 或 partition  時(shí)能否動(dòng)態(tài)發(fā)現(xiàn)新增分區(qū)并消費(fèi)處理新增分區(qū)的數(shù)據(jù)。 Spark Streaming 與 kafka 結(jié)合有兩個(gè)區(qū)別比較大的版本,如圖 9  所示是官網(wǎng)給出的對(duì)比數(shù)據(jù):

怎么分析Flink與Spark Streaming的對(duì)比

其中確認(rèn)的是 Spark Streaming 與 kafka 0.8 版本結(jié)合不支持動(dòng)態(tài)分區(qū)檢測(cè),與 0.10 版本結(jié)合支持,接著通過(guò)源碼分析。

Spark Streaming 與 kafka 0.8 版本結(jié)合

*源碼分析只針對(duì)分區(qū)檢測(cè)

入口是 DirectKafkaInputDStream 的 compute:

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {// 改行代碼會(huì)計(jì)算這個(gè)job,要消費(fèi)的每個(gè)kafka分區(qū)的最大偏移  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))// 構(gòu)建KafkaRDD,用指定的分區(qū)數(shù)和要消費(fèi)的offset范圍  val rdd = KafkaRDD[K, V, U, T, R](  context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) // Report the record number and metadata of this batch interval to InputInfoTracker.  val offsetRanges = currentOffsets.map { case (tp, fo) =>  val uo = untilOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo.offset)  } val description = offsetRanges.filter { offsetRange =>  // Don't display empty ranges.  offsetRange.fromOffset != offsetRange.untilOffset  }.map { offsetRange =>  s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +  s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"  }.mkString("\n") // Copy offsetRanges to immutable.List to prevent from being modified by the user  val metadata = Map( "offsets" -> offsetRanges.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> description) val inputInfo = StreamInputInfo(id, rdd.count, metadata)  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd)  }

第一行就是計(jì)算得到該批次生成 KafkaRDD 每個(gè)分區(qū)要消費(fèi)的最大 offset。 接著看  latestLeaderOffsets(maxRetries)

@tailrec protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {// 可以看到的是用來(lái)指定獲取最大偏移分區(qū)的列表還是只有currentOffsets,沒(méi)有發(fā)現(xiàn)關(guān)于新增的分區(qū)的內(nèi)容。  val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) // Either.fold would confuse @tailrec, do it manually  if (o.isLeft) { val err = o.left.get.toString if (retries <= 0) { throw new SparkException(err)  } else {  logError(err) Thread.sleep(kc.config.refreshLeaderBackoffMs)  latestLeaderOffsets(retries - 1)  }  } else {  o.right.get  }  }

其中 protected var currentOffsets = fromOffsets,這個(gè)僅僅是在構(gòu)建  DirectKafkaInputDStream 的時(shí)候初始化,并在 compute 里面更新:

currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)

中間沒(méi)有檢測(cè) kafka 新增 topic 或者分區(qū)的代碼,所以可以確認(rèn) Spark Streaming 與 kafka 0.8  的版本結(jié)合不支持動(dòng)態(tài)分區(qū)檢測(cè)。

Spark Streaming 與 kafka 0.10 版本結(jié)合

入口同樣是 DirectKafkaInputDStream 的 compute 方法,撿主要的部分說(shuō),Compute 里第一行也是計(jì)算當(dāng)前 job 生成  kafkardd 要消費(fèi)的每個(gè)分區(qū)的最大 offset:

// 獲取當(dāng)前生成job,要用到的KafkaRDD每個(gè)分區(qū)最大消費(fèi)偏移值 val untilOffsets =  clamp(latestOffsets())

具體檢測(cè) kafka 新增 topic 或者分區(qū)的代碼在 latestOffsets()

/**  * Returns the latest (highest) available offsets, taking new partitions into account. */  protected def latestOffsets(): Map[TopicPartition, Long] = { val c = consumer  paranoidPoll(c) // 獲取所有的分區(qū)信息  val parts = c.assignment().asScala // make sure new partitions are reflected in currentOffsets  // 做差獲取新增的分區(qū)信息  val newPartitions = parts.diff(currentOffsets.keySet) // position for new partitions determined by auto.offset.reset if no commit  // 新分區(qū)消費(fèi)位置,沒(méi)有記錄的化是由auto.offset.reset決定  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap // don't want to consume messages, so pause  c.pause(newPartitions.asJava) // find latest available offsets  c.seekToEnd(currentOffsets.keySet.asJava)  parts.map(tp => tp -> c.position(tp)).toMap  }

該方法內(nèi)有獲取 kafka 新增分區(qū),并將其更新到 currentOffsets 的過(guò)程,所以可以驗(yàn)證 Spark Streaming 與 kafka  0.10 版本結(jié)合支持動(dòng)態(tài)分區(qū)檢測(cè)。

Flink

入口類是 FlinkKafkaConsumerBase,該類是所有 flink 的 kafka 消費(fèi)者的父類。

怎么分析Flink與Spark Streaming的對(duì)比

在 FlinkKafkaConsumerBase 的 run 方法中,創(chuàng)建了 kafkaFetcher,實(shí)際上就是消費(fèi)者:

this.kafkaFetcher = createFetcher(  sourceContext,  subscribedPartitionsToStartOffsets,  periodicWatermarkAssigner,  punctuatedWatermarkAssigner,  (StreamingRuntimeContext) getRuntimeContext(),  offsetCommitMode,  getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),  useMetrics);

接是創(chuàng)建了一個(gè)線程,該線程會(huì)定期檢測(cè) kafka 新增分區(qū),然后將其添加到 kafkaFetcher 里。

if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) { final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>(); this.discoveryLoopThread = new Thread(new Runnable() { @Override  public void run() { try { // --------------------- partition discovery loop ---------------------   List<KafkaTopicPartition> discoveredPartitions; // throughout the loop, we always eagerly check if we are still running before  // performing the next operation, so that we can escape the loop as soon as possible   while (running) { if (LOG.isDebugEnabled()) { LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());  } try {  discoveredPartitions = partitionDiscoverer.discoverPartitions();  } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) { // the partition discoverer may have been closed or woken up before or during the discovery;  // this would only happen if the consumer was canceled; simply escape the loop  break;  } // no need to add the discovered partitions if we were closed during the meantime  if (running && !discoveredPartitions.isEmpty()) {  kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);  } // do not waste any time sleeping if we're not running anymore  if (running && discoveryIntervalMillis != 0) { try { Thread.sleep(discoveryIntervalMillis);  } catch (InterruptedException iex) { // may be interrupted if the consumer was canceled midway; simply escape the loop  break;  }  }  }  } catch (Exception e) {  discoveryLoopErrorRef.set(e);  } finally { // calling cancel will also let the fetcher loop escape  // (if not running, cancel() was already called)  if (running) {  cancel();  }  }  }  }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());   discoveryLoopThread.start();  kafkaFetcher.runFetchLoop();

上面,就是 flink 動(dòng)態(tài)發(fā)現(xiàn) kafka 新增分區(qū)的過(guò)程。不過(guò)與 Spark 無(wú)需做任何配置不同的是,flink 動(dòng)態(tài)發(fā)現(xiàn) kafka  新增分區(qū),這個(gè)功能需要被使能的。也很簡(jiǎn)單,需要將 flink.partition-discovery.interval-millis 該屬性設(shè)置為大于 0  即可。

容錯(cuò)機(jī)制及處理語(yǔ)義

本節(jié)內(nèi)容主要是想對(duì)比兩者在故障恢復(fù)及如何保證僅一次的處理語(yǔ)義。這個(gè)時(shí)候適合拋出一個(gè)問(wèn)題:實(shí)時(shí)處理的時(shí)候,如何保證數(shù)據(jù)僅一次處理語(yǔ)義?

Spark Streaming 保證僅一次處理

對(duì)于 Spark Streaming 任務(wù),我們可以設(shè)置 checkpoint,然后假如發(fā)生故障并重啟,我們可以從上次 checkpoint  之處恢復(fù),但是這個(gè)行為只能使得數(shù)據(jù)不丟失,可能會(huì)重復(fù)處理,不能做到恰一次處理語(yǔ)義。

對(duì)于 Spark Streaming 與 kafka 結(jié)合的 direct Stream 可以自己維護(hù) offset 到 zookeeper、kafka  或任何其它外部系統(tǒng),每次提交完結(jié)果之后再提交 offset,這樣故障恢復(fù)重啟可以利用上次提交的 offset  恢復(fù),保證數(shù)據(jù)不丟失。但是假如故障發(fā)生在提交結(jié)果之后、提交 offset 之前會(huì)導(dǎo)致數(shù)據(jù)多次處理,這個(gè)時(shí)候我們需要保證處理結(jié)果多次輸出不影響正常的業(yè)務(wù)。

由此可以分析,假設(shè)要保證數(shù)據(jù)恰一次處理語(yǔ)義,那么結(jié)果輸出和 offset 提交必須在一個(gè)事務(wù)內(nèi)完成。在這里有以下兩種做法:

  • repartition(1) Spark Streaming 輸出的 action 變成僅一個(gè) partition,這樣可以利用事務(wù)去做:

Dstream.foreachRDD(rdd=>{  rdd.repartition(1).foreachPartition(partition=>{ // 開(kāi)啟事務(wù)  partition.foreach(each=>{// 提交數(shù)據(jù)  }) // 提交事務(wù)  })  })
  • 將結(jié)果和 offset 一起提交

也就是結(jié)果數(shù)據(jù)包含 offset。這樣提交結(jié)果和提交 offset 就是一個(gè)操作完成,不會(huì)數(shù)據(jù)丟失,也不會(huì)重復(fù)處理。故障恢復(fù)的時(shí)候可以利用上次提交結(jié)果帶的  offset。

Flink 與 kafka 0.11 保證僅一次處理

若要 sink 支持僅一次語(yǔ)義,必須以事務(wù)的方式寫數(shù)據(jù)到 Kafka,這樣當(dāng)提交事務(wù)時(shí)兩次 checkpoint  間的所有寫入操作作為一個(gè)事務(wù)被提交。這確保了出現(xiàn)故障或崩潰時(shí)這些寫入操作能夠被回滾。

在一個(gè)分布式且含有多個(gè)并發(fā)執(zhí)行 sink  的應(yīng)用中,僅僅執(zhí)行單次提交或回滾是不夠的,因?yàn)樗薪M件都必須對(duì)這些提交或回滾達(dá)成共識(shí),這樣才能保證得到一致性的結(jié)果。Flink  使用兩階段提交協(xié)議以及預(yù)提交(pre-commit)階段來(lái)解決這個(gè)問(wèn)題。

本例中的 Flink 應(yīng)用如圖 11 所示包含以下組件:

  • 一個(gè)source,從Kafka中讀取數(shù)據(jù)(即KafkaConsumer)

  • 一個(gè)時(shí)間窗口化的聚會(huì)操作

  • 一個(gè)sink,將結(jié)果寫回到Kafka(即KafkaProducer)

怎么分析Flink與Spark Streaming的對(duì)比

下面詳細(xì)講解 flink 的兩段提交思路:

怎么分析Flink與Spark Streaming的對(duì)比

如圖 12 所示,F(xiàn)link checkpointing 開(kāi)始時(shí)便進(jìn)入到 pre-commit 階段。具體來(lái)說(shuō),一旦 checkpoint  開(kāi)始,F(xiàn)link 的 JobManager 向輸入流中寫入一個(gè) checkpoint barrier ,將流中所有消息分割成屬于本次 checkpoint  的消息以及屬于下次 checkpoint 的,barrier 也會(huì)在操作算子間流轉(zhuǎn)。對(duì)于每個(gè) operator 來(lái)說(shuō),該 barrier 會(huì)觸發(fā)  operator 狀態(tài)后端為該 operator 狀態(tài)打快照。data source 保存了 Kafka 的 offset,之后把 checkpoint  barrier 傳遞到后續(xù)的 operator。

這種方式僅適用于 operator 僅有它的內(nèi)部狀態(tài)。內(nèi)部狀態(tài)是指 Flink state backends 保存和管理的內(nèi)容(如第二個(gè) operator  中 window 聚合算出來(lái)的 sum)。

當(dāng)一個(gè)進(jìn)程僅有它的內(nèi)部狀態(tài)的時(shí)候,除了在 checkpoint 之前將需要將數(shù)據(jù)更改寫入到 state backend,不需要在預(yù)提交階段做其他的動(dòng)作。在  checkpoint 成功的時(shí)候,F(xiàn)link 會(huì)正確的提交這些寫入,在 checkpoint 失敗的時(shí)候會(huì)終止提交,過(guò)程可見(jiàn)圖 13。

怎么分析Flink與Spark Streaming的對(duì)比

當(dāng)結(jié)合外部系統(tǒng)的時(shí)候,外部系統(tǒng)必須要支持可與兩階段提交協(xié)議捆綁使用的事務(wù)。顯然本例中的 sink 由于引入了 kafka sink,因此在預(yù)提交階段  data sink 必須預(yù)提交外部事務(wù)。如下圖:

怎么分析Flink與Spark Streaming的對(duì)比

當(dāng) barrier 在所有的算子中傳遞一遍,并且觸發(fā)的快照寫入完成,預(yù)提交階段完成。所有的觸發(fā)狀態(tài)快照都被視為 checkpoint 的一部分,也可以說(shuō)  checkpoint 是整個(gè)應(yīng)用程序的狀態(tài)快照,包括預(yù)提交外部狀態(tài)。出現(xiàn)故障可以從 checkpoint 恢復(fù)。下一步就是通知所有的操作算子  checkpoint 成功。該階段 jobmanager 會(huì)為每個(gè) operator 發(fā)起 checkpoint 已完成的回調(diào)邏輯。

本例中 data source 和窗口操作無(wú)外部狀態(tài),因此該階段,這兩個(gè)算子無(wú)需執(zhí)行任何邏輯,但是 data sink  是有外部狀態(tài)的,因此,此時(shí)我們必須提交外部事務(wù),如下圖:

怎么分析Flink與Spark Streaming的對(duì)比

以上就是 flink 實(shí)現(xiàn)恰一次處理的基本邏輯。

Back pressure

消費(fèi)者消費(fèi)的速度低于生產(chǎn)者生產(chǎn)的速度,為了使應(yīng)用正常,消費(fèi)者會(huì)反饋給生產(chǎn)者來(lái)調(diào)節(jié)生產(chǎn)者生產(chǎn)的速度,以使得消費(fèi)者需要多少,生產(chǎn)者生產(chǎn)多少。

*back pressure 后面一律稱為背壓。

Spark Streaming 的背壓

Spark Streaming 跟 kafka 結(jié)合是存在背壓機(jī)制的,目標(biāo)是根據(jù)當(dāng)前 job 的處理情況來(lái)調(diào)節(jié)后續(xù)批次的獲取 kafka  消息的條數(shù)。為了達(dá)到這個(gè)目的,Spark Streaming 在原有的架構(gòu)上加入了一個(gè) RateController,利用的算法是  PID,需要的反饋數(shù)據(jù)是任務(wù)處理的結(jié)束時(shí)間、調(diào)度時(shí)間、處理時(shí)間、消息條數(shù),這些數(shù)據(jù)是通過(guò) SparkListener 體系獲得,然后通過(guò)  PIDRateEsimator 的 compute 計(jì)算得到一個(gè)速率,進(jìn)而可以計(jì)算得到一個(gè)  offset,然后跟限速設(shè)置最大消費(fèi)條數(shù)比較得到一個(gè)最終要消費(fèi)的消息最大 offset。

PIDRateEsimator 的 compute 方法如下:

def compute( time: Long, // in milliseconds  numElements: Long, processingDelay: Long, // in milliseconds  schedulingDelay: Long // in milliseconds  ): Option[Double] = {  logTrace(s"\ntime = $time, # records = $numElements, " +  s"processing time = $processingDelay, scheduling delay = $schedulingDelay") this.synchronized { if (time > latestTime && numElements > 0 && processingDelay > 0) { val delaySinceUpdate = (time - latestTime).toDouble / 1000   val processingRate = numElements.toDouble / processingDelay * 1000   val error = latestRate - processingRate val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2)  val dError = (error - latestError) / delaySinceUpdate val newRate = (latestRate - proportional * error -  integral * historicalError -  derivative * dError).max(minRate)  logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin)   latestTime = time if (firstRun) {  latestRate = processingRate  latestError = 0D  firstRun = false  logTrace("First run, rate estimation skipped") None  } else {  latestRate = newRate  latestError = error  logTrace(s"New rate = $newRate") Some(newRate)  }  } else {  logTrace("Rate estimation skipped") None  }  }  }

Flink 的背壓

與 Spark Streaming 的背壓不同的是,F(xiàn)link 背壓是 jobmanager 針對(duì)每一個(gè) task 每 50ms 觸發(fā) 100 次  Thread.getStackTrace() 調(diào)用,求出阻塞的占比。過(guò)程如圖 16 所示:

怎么分析Flink與Spark Streaming的對(duì)比

阻塞占比在 web 上劃分了三個(gè)等級(jí):

  • OK: 0 <= Ratio <= 0.10,表示狀態(tài)良好;

  • LOW: 0.10 < Ratio <= 0.5,表示有待觀察;

  • HIGH: 0.5 < Ratio <= 1,表示要處理了。

上述內(nèi)容就是怎么分析Flink與Spark Streaming的對(duì)比,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

本文題目:怎么分析Flink與SparkStreaming的對(duì)比
URL地址:http://bm7419.com/article44/goseee.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供營(yíng)銷型網(wǎng)站建設(shè)、網(wǎng)站導(dǎo)航搜索引擎優(yōu)化、微信小程序、云服務(wù)器網(wǎng)站設(shè)計(jì)公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都做網(wǎng)站