四、flink--window、eventTime和wate

一、flink的window機制

1.1 window概述

? streaming流式計算是一種被設(shè)計用于處理無限數(shù)據(jù)集的數(shù)據(jù)處理引擎,而無限數(shù)據(jù)集是指一種不斷增長的本質(zhì)上無限的數(shù)據(jù)集,而window是一種切割無限數(shù)據(jù)為有限塊進行處理的手段。
? Window是無限數(shù)據(jù)流處理的核心,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。

創(chuàng)新互聯(lián)專業(yè)為企業(yè)提供白塔網(wǎng)站建設(shè)、白塔做網(wǎng)站、白塔網(wǎng)站設(shè)計、白塔網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計與制作、白塔企業(yè)網(wǎng)站模板建站服務(wù),十多年白塔做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡(luò)服務(wù)。

1.2 window的類型

window可以分為兩大類:
CountWindow:按照指定的數(shù)據(jù)條數(shù)生成一個Window,與時間無關(guān)。比較少用
TimeWindow:按照時間生成Window。非常常用,下面主要將時間窗口有哪些類型。主要有四類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)、會話窗口(Session Window)和全局窗口(global window比較少用 )。

1.2.1 滾動窗口(Tumbling Windows)

概述:將數(shù)據(jù)依據(jù)固定的窗口長度對數(shù)據(jù)進行切片。只有一個工作參數(shù),就是窗口大小
特點:時間對齊,窗口長度固定,沒有重疊。
? 滾動窗口分配器將每個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,并且不會出現(xiàn)重疊(前后時間點都是緊接著的)。例如:如果你指定了一個5分鐘大小的滾動窗口,窗口的創(chuàng)建如下圖所示:
四、flink--window、eventTime和wate
? 圖 1.2.1 滾動窗口
適用場景:適合做BI統(tǒng)計等(做每個時間段的聚合計算)。

1.2.2 滑動窗口(Sliding Windows)

概述:滑動窗口是固定窗口的更廣義的一種形式,滑動窗口工作參數(shù)由固定的窗口長度和滑動間隔組成。
特點:時間對齊,窗口長度固定,有重疊。
? 滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數(shù)來配置,另一個窗口滑動參數(shù)控制滑動窗口開始的頻率。因此,滑動窗口如果滑動參數(shù)小于窗口大小的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。
例如,你有10分鐘的窗口和5分鐘的滑動,那么每個窗口中5分鐘的窗口里包含著上個10分鐘產(chǎn)生的數(shù)據(jù),如下圖所示:
四、flink--window、eventTime和wate
? 圖 1.2.2 滑動窗口
適用場景:對最近一個時間段內(nèi)的統(tǒng)計(求某接口最近5min的失敗率來決定是否要報警)。

1.2.3 會話窗口(Session Windows)

概述:由一系列事件組合一個指定時間長度的timeout間隙組成,類似于web應(yīng)用的session,也就是一段時間沒有接收到新數(shù)據(jù)就會生成新的窗口。
特點:時間無對齊。窗口無固定長度
? session窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結(jié)束時間的情況,相反,當(dāng)它在一個固定的時間周期內(nèi)不再收到元素,即非活動間隔產(chǎn)生,那個這個窗口就會關(guān)閉。一個session窗口通過一個session間隔來配置,這個session間隔定義了非活躍周期的長度,當(dāng)這個非活躍周期產(chǎn)生,那么當(dāng)前的session將關(guān)閉并且后續(xù)的元素將被分配到新的session窗口中去。
四、flink--window、eventTime和wate
? 圖1.2.3 會話窗口

1.3 window窗口api

1.3.1 window api分類

window數(shù)據(jù)源分為兩種,一種是典型的KV類型(keyedStream),另一種是非KV類型(Non-keyedStream)。
區(qū)別:
keyedStream:
需要在使用窗口操作前,調(diào)用 keyBy對KV按照key進行分區(qū),然后才可以調(diào)用window操作的api,比如 countWindow,timeWindow等

Non-keyedstream:
如果使用窗口操作前,沒有使用keyBy算子,那么就認(rèn)為是Non-keyedstream,調(diào)用的window api就是 xxxWindowAll,比如countWindowAll,timeWindowAll,而且因為是非KV,所以無法分區(qū),也就是只有一個分區(qū),那么這個窗口并行度只能是1。這個是要注意的。

1.3.2 countWindow

CountWindow根據(jù)窗口中相同key元素的數(shù)量來觸發(fā)執(zhí)行,執(zhí)行時只計算元素數(shù)量達到窗口大小的key對應(yīng)的結(jié)果。

有兩個用法:
countWindow(window_size):只指定窗口大小,此時窗口是滾動窗口
countWindow(window_size, slide):指定窗口大小以及滑動間隔,此時窗口是滑動窗口

注意:CountWindow的window_size指的是相同Key的元素的個數(shù),不是輸入的所有元素的總數(shù)。

1、滾動窗口
默認(rèn)的CountWindow是一個滾動窗口,只需要指定窗口大小即可,當(dāng)元素數(shù)量達到窗口大小時,就會觸發(fā)窗口的執(zhí)行。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.readTextFile("/test.txt");
        source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String s1 : s.split(" ")) {
                    collector.collect(new Tuple2<>(s1, 1));
                }
            }
        }).keyBy(0).countWindow(5).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        }).print();

        env.execute("滾動窗口");
    }

}

2、滑動窗口
動窗口和滾動窗口的函數(shù)名是完全一致的,只是在傳參數(shù)時需要傳入兩個參數(shù),一個是window_size,一個是sliding_size。
下面代碼中的sliding_size設(shè)置為了2,也就是說,每收到兩個相同key的數(shù)據(jù)就計算一次,每一次計算的window范圍是5個元素。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.readTextFile("/test.txt");
        source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String s1 : s.split(" ")) {
                    collector.collect(new Tuple2<>(s1, 1));
                }
            }
        }).keyBy(0).countWindow(5,2).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        }).print();

        env.execute("滑動窗口");
    }

}

1.3.3 timeWindow

? TimeWindow是將指定時間范圍內(nèi)的所有數(shù)據(jù)組成一個window,一次對一個window里面的所有數(shù)據(jù)進行計算。同樣支持類似上面的滾動窗口和滑動窗口模式。有兩個工作參數(shù):window_size和slide。只指定window_size時是滾動窗口。

1、滾動窗口
? Flink默認(rèn)的時間窗口根據(jù)Processing Time 進行窗口的劃分,將Flink獲取到的數(shù)據(jù)根據(jù)進入Flink的時間劃分到不同的窗口中。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.readTextFile("/test.txt");
        source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String s1 : s.split(" ")) {
                    collector.collect(new Tuple2<>(s1, 1));
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(2)).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
            }
        }).print();

        env.execute("滾動窗口");
    }

}

2、滑動窗口
和上面類似,就是參數(shù)里面增加了slide參數(shù),也就是滑動時間間隔。時間間隔可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。

1.3.4 window reduce

也就是在窗口算子之后執(zhí)行reduce算子,用法和普通的reduce一樣,只不過reduce的單位是一個窗口。即每一個窗口返回一次reduce結(jié)果。程序在上面,不重復(fù)了。

1.3.5 window fold

也就是在窗口算子之后執(zhí)行fold算子,用法和普通的fold一樣,只不過fold的單位是一個窗口。即每一個窗口返回一次reduce結(jié)果。程序在上面,不重復(fù)了。

1.3.6 window聚合操作

指的是max、min等這些聚合算子,只不過是在window算子之后使用,以窗口為單位,每一個窗口返回一次聚合結(jié)果,而不是像普通那樣,每一次聚合結(jié)果都返回。

二、time、watermark和window

2.1 flink中 time的分類

在flink中,time有不同分類,如下:
Event Time:
是事件創(chuàng)建的時間。它通常由事件中的時間戳描述,例如采集的日志數(shù)據(jù)中,每一條日志都會記錄自己的生成時間,F(xiàn)link通過時間戳分配器訪問事件時間戳。

Ingestion Time:
是數(shù)據(jù)進入Flink的時間。

Processing Time:
是每一個執(zhí)行基于時間操作的算子的本地系統(tǒng)時間,與機器相關(guān),默認(rèn)的時間屬性就是Processing Time。也就是數(shù)據(jù)被處理時的當(dāng)前時間。

這些時間有什么不同呢?因網(wǎng)絡(luò)傳輸需要時間,所以Ingestion Time不一定和Event Time相等,很多情況下是不等的。同樣Processing Time表示數(shù)據(jù)處理時的時間,如果數(shù)據(jù)是很久之前采集的,現(xiàn)在才處理,那么很明顯,三個時間time都不會相等的。
四、flink--window、eventTime和wate
? 圖 2.1 flink--時間的概念

例子:
一條日志進入Flink的時間為2017-11-12 10:00:00.123,到達Window的系統(tǒng)時間為2017-11-12 10:00:01.234,日志的內(nèi)容如下:
2017-11-02 18:37:15.624 INFO Fail over to rm2
可以看到,三個time都不相等。而對于業(yè)務(wù)來說,要統(tǒng)計1min內(nèi)的故障日志個數(shù),哪個時間是最有意義的?—— eventTime,因為我們要根據(jù)日志的生成時間進行統(tǒng)計。但是flink默認(rèn)的窗口的時間是Processing Time,那么如何引入eventTime呢?

2.2 eventTime的引入

? 在Flink的流式處理中,絕大部分的業(yè)務(wù)都會使用eventTime,一般只在eventTime無法使用時,才會被迫使用ProcessingTime或者IngestionTime。默認(rèn)使用的是ProcessingTime。那么如何指定flink使用指定的time呢?

2.2.1 引入方式1:設(shè)置env時間類型

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(時間類型);

//三種類型的time對應(yīng)如下:
TimeCharacteristic.EventTime;  eventtime
TimeCharacteristic.IngestionTime;  到達flink的時間
TimeCharacteristic.ProcessingTime;  處理數(shù)據(jù)的時間

這種方式是整個env全局生效的,是直接將env默認(rèn)的時間設(shè)置為eventtime。后面的窗口操作默認(rèn)就會使用eventtime作為時間依據(jù)。如果想不同的窗口設(shè)置不同的時間類型,這種方式就行不通了。

2.2.2 引入方式2:單獨設(shè)置window的實際類型

stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))

.window這個api就是所有窗口總的api,其他窗口api都是通過這個api封裝出來的??梢酝ㄟ^這個總api,參數(shù)直接窗口的類型,比如上面的就是指定eventtime 的timewindow,這樣并不會影響整個env的時間類型。

同樣的,其他時間類型窗口,比如:
SlidingEventTimeWindows  滑動eventtime窗口

基本上看名字就知道是什么時間類型(三大時間類型)、以及什么類型(滑動、滾動、會話窗口)的窗口了。注意:eventtime沒有session窗口,processingTime和

2.3 watermark的原理

2.3.1 引入背景

? 我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event Time順序排列的。
四、flink--window、eventTime和wate
? 圖 2.3 數(shù)據(jù)的亂序
? 那么此時出現(xiàn)一個問題,一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間后,必須觸發(fā)window去進行計算了,這個特別的機制,就是Watermark。
解釋:
如果只按照到達的event的eventtime來觸發(fā)窗口操作,假設(shè)有event1~5。如果到達順序是亂的,比如event5最先達到,然后event1也達到了,那么flink這邊怎么知道這中間還有沒有數(shù)據(jù)呢?沒辦法的,不能確定數(shù)據(jù)是否完整到達,也不能無限制等待下去。所以需要一種機制來處理這種情況。

2.3.2 watermark機制原理

? Watermark是一種衡量Event Time進展的機制,它是數(shù)據(jù)本身的一個隱藏屬性,數(shù)據(jù)本身攜帶著對應(yīng)的Watermark。Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結(jié)合window來實現(xiàn)。
? 數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark的數(shù)據(jù),都已經(jīng)到達了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。
? Watermark可以理解成一個延遲觸發(fā)機制,我們可以設(shè)置Watermark的延時時長t,每次系統(tǒng)會校驗已經(jīng)到達的數(shù)據(jù)中最大的maxEventTime,然后認(rèn)定eventTime小于maxEventTime - t的所有數(shù)據(jù)都已經(jīng)到達,如果有窗口的停止時間等于maxEventTime – t,那么這個窗口被watermark觸發(fā)執(zhí)行。
解釋:
? watermark是一種概率性的機制。假設(shè)event1~5,如果event5已經(jīng)到達了,那么其實按照event產(chǎn)生的先后順序,正常情況下,前面的event1~4應(yīng)該也到達了。而為了保證前面的event1~4的到達(其實是更多的到達,但是不一定全部都到達),在event5到達了之后,提供一定的延遲時間t。當(dāng)event5到達,且經(jīng)過 t 時間之后,正常情況下,前面的event1~4 大概率會到達了,如果沒有到達,屬于少數(shù)情況,那么就認(rèn)為event5之前的event都到達了,無論是否真的全部到達了。如果在延遲時間之后到達了,這個舊數(shù)據(jù)直接會被丟棄。所以其實watermark就是一種保障更多event亂序到達的機制,提供了一定的延時機制,而因為只會延遲一定的時間,所以也不會導(dǎo)致flink無限期地等待下去。

有序數(shù)據(jù)流的watermark如下:(watermark設(shè)置為0)
四、flink--window、eventTime和wate
? 圖 2.4 有序數(shù)據(jù)流的watermark
亂序數(shù)據(jù)流的watermark如下:(watermark設(shè)置為2)
四、flink--window、eventTime和wate
? 圖 2.5 亂序數(shù)據(jù)流的watermark
? 當(dāng)Flink接收到每一條數(shù)據(jù)時,都會產(chǎn)生一條Watermark,這條Watermark就等于當(dāng)前所有到達數(shù)據(jù)中的maxEventTime - 延遲時長t,也就是說,Watermark是由數(shù)據(jù)攜帶的,一旦數(shù)據(jù)攜帶的Watermark比當(dāng)前未觸發(fā)的窗口的停止時間要晚,那么就會觸發(fā)相應(yīng)窗口的執(zhí)行。由于Watermark是由數(shù)據(jù)攜帶的,因此,如果運行過程中無法獲取新的數(shù)據(jù),那么沒有被觸發(fā)的窗口將永遠都不被觸發(fā)。
? 上圖中,我們設(shè)置的允許最大延遲到達時間為2s,所以時間戳為7s的事件對應(yīng)的Watermark是5s,時間戳為12s的事件的Watermark是10s,如果我們的窗口1是1s~5s,窗口2是6s~10s,那么時間戳為7s的事件到達時的Watermarker恰好觸發(fā)窗口1,時間戳為12s的事件到達時的Watermark恰好觸發(fā)窗口2。
? Window會不斷產(chǎn)生,屬于這個Window范圍的數(shù)據(jù)會被不斷加入到Window中,所有未被觸發(fā)的Window都會等待觸發(fā),只要Window還沒觸發(fā),屬于這個Window范圍的數(shù)據(jù)就會一直被加入到Window中,直到Window被觸發(fā)才會停止數(shù)據(jù)的追加,而當(dāng)Window觸發(fā)之后才接受到的屬于被觸發(fā)Window的數(shù)據(jù)會被丟棄。如果產(chǎn)生的窗口中沒有新到的數(shù)據(jù),也就不會有watermark,那么窗口就不會被觸發(fā)計算。

2.3.3 watermark的觸發(fā)計算的條件

watermark時間(max_eventTime-t) >= window_end_time;
在[window_start_time,window_end_time)中有數(shù)據(jù)存在。

2.3.4 watermark的產(chǎn)生方式

Punctuated:不間斷產(chǎn)生
數(shù)據(jù)流中每一個遞增的EventTime都會產(chǎn)生一個Watermark。
在實際的生產(chǎn)中Punctuated方式在TPS很高的場景下會產(chǎn)生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。

Periodic:周期性產(chǎn)生
周期性的(一定時間間隔或者達到一定的記錄條數(shù))產(chǎn)生一個Watermark。
在實際的生產(chǎn)中Periodic的方式必須結(jié)合時間和積累條數(shù)兩個維度繼續(xù)周期性產(chǎn)生Watermark,否則在極端情況下會有很大的延時。

這兩種有不同的api實現(xiàn),下面會講

2.4 watermark的引入以及接口

2.4.1 watermark引入

需要先引入eventime,然后引入watermark

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStreamSource<String> source = env.readTextFile("/test.txt");

//引入的watermark的實現(xiàn)類
source.assignTimestampsAndWatermarks(xx)

watermark的實現(xiàn)有兩大類,對應(yīng)上面的兩種watermark的產(chǎn)生方式,有兩個接口:

AssignerWithPeriodicWatermarks;   周期性產(chǎn)生watermark,即Period
AssignerWithPunctuatedWatermarks;  Punctuated:不間斷產(chǎn)生

2.4.2 AssignerWithPeriodicWatermarks接口

看看AssignerWithPeriodicWatermarks這個接口的源碼,主要用于周期性產(chǎn)生watermark

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
    //獲取當(dāng)前的watermark
    @Nullable
    Watermark getCurrentWatermark();
}

//父接口===================
public interface TimestampAssigner<T> extends Function {
    //獲取當(dāng)前的時間戳
    long extractTimestamp(T var1, long var2);
}

主要就是有兩個方法需要覆蓋,getCurrentWatermark()用于生成watermark,extractTimestamp用于獲取每個event的timestamp。
由于這是一個周期性產(chǎn)生watermark的接口,所以需要指定這個生成周期有多長,需要env的配置中指定,如:

env.getConfig().setAutoWatermarkInterval(n ms);
記住間隔時間單位是毫秒

例子:

/*根據(jù)eventTime 創(chuàng)建處理watermark
*/
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    //watermark延遲時間 t,單位是毫秒
    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    //保存當(dāng)前最大的時間戳
    private long currentMaxTimestamp;

    //根據(jù)傳遞進來的event,獲取time,然后如果比當(dāng)前最大的time還大,就替換,否則保持。因為數(shù)據(jù)亂序到達是無法保證時間是遞增的
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    //返回watermark
    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

再加上設(shè)置的setAutoWatermarkInterval(n ms),就可以周期性生成watermark。

2.4.3 AssignerWithPunctuatedWatermarks接口

看看AssignerWithPunctuatedWatermarks這個接口的源碼,主要用于實時產(chǎn)生watermark

public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
    //獲取最新的watermark
    @Nullable
    Watermark checkAndGetNextWatermark(T var1, long var2);
}

//父接口
public interface TimestampAssigner<T> extends Function {
    //從event中獲取timestamp
    long extractTimestamp(T var1, long var2);
}

寫法其實和上面的類似,只是這里不會設(shè)置生成watermark的時間間隔

2.4.4 flink自帶的watermark實現(xiàn)類

1、BoundedOutOfOrdernessTimestampExtractor
繼承了AssignerWithPeriodicWatermarks接口的一個類,看看它的源碼

package org.apache.flink.streaming.api.functions.timestamps;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = -9223372036854775808L;
    private final long maxOutOfOrderness;

    //構(gòu)造方法中接收一個參數(shù),就是延遲時間 t
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0L) {
            throw new RuntimeException("Tried to set the maximum allowed lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        } else {
            this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
            this.currentMaxTimestamp = -9223372036854775808L + this.maxOutOfOrderness;
        }
    }

    public long getMaxOutOfOrdernessInMillis() {
        return this.maxOutOfOrderness;
    }

    //需要重寫的方法,用于獲取timestamp
    public abstract long extractTimestamp(T var1);

    //獲取watermark的方法已經(jīng)寫好了,用傳遞進來的延遲時間t來計算得出watermark
    public final Watermark getCurrentWatermark() {
        long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness;
        if (potentialWM >= this.lastEmittedWatermark) {
            this.lastEmittedWatermark = potentialWM;
        }

        return new Watermark(this.lastEmittedWatermark);
    }

    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = this.extractTimestamp(element);
        if (timestamp > this.currentMaxTimestamp) {
            this.currentMaxTimestamp = timestamp;
        }

        return timestamp;
    }
}

這個類就是實現(xiàn)了用戶可以自定義設(shè)定延遲時間t 的一個watermark。

2、AscendingTimestampExtractor
也是繼承了AssignerWithPeriodicWatermarks接口的一個類。具有穩(wěn)定的遞增時間戳的數(shù)據(jù)源,比如kafka的分區(qū)數(shù)據(jù),每一條信息都是遞增+1的,適用于這個類。只需要重寫
extractAscendingTimestamp方法。

2.5 eventTime、window和watermark結(jié)合使用例子

package flinktest;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class EventTimeTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);

        DataStreamSource<String> source = env.readTextFile("/tmp/test.txt");

        source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(3000)) {
            @Override
            public long extractTimestamp(String s) {
                return Integer.valueOf(s.split(" ")[0]);
            }
        }).flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Tuple2<String, Integer> tmpTuple = new Tuple2<>();
                for (String s1 : s.split(" ")) {
                    tmpTuple.setFields(s1, 1);
                    collector.collect(tmpTuple);
                }
            }
        }).keyBy(0)
                .timeWindow(Time.seconds(10))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                    }
                })
                .print();
         try {
            env.execute("eventtime test");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

window api的類繼承結(jié)構(gòu)

網(wǎng)站欄目:四、flink--window、eventTime和wate
轉(zhuǎn)載來源:http://bm7419.com/article10/igoogo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站導(dǎo)航、Google響應(yīng)式網(wǎng)站、面包屑導(dǎo)航、網(wǎng)站建設(shè)網(wǎng)站維護

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站優(yōu)化排名