ApacheFlinkTime&Window深度解析-創(chuàng)新互聯(lián)

作者:邱從賢

創(chuàng)新互聯(lián)建站為企業(yè)級(jí)客戶(hù)提高一站式互聯(lián)網(wǎng)+設(shè)計(jì)服務(wù),主要包括做網(wǎng)站、網(wǎng)站設(shè)計(jì)、成都App定制開(kāi)發(fā)成都小程序開(kāi)發(fā)、宣傳片制作、LOGO設(shè)計(jì)等,幫助客戶(hù)快速提升營(yíng)銷(xiāo)能力和企業(yè)形象,創(chuàng)新互聯(lián)各部門(mén)都有經(jīng)驗(yàn)豐富的經(jīng)驗(yàn),可以確保每一個(gè)作品的質(zhì)量和創(chuàng)作周期,同時(shí)每年都有很多新員工加入,為我們帶來(lái)大量新的創(chuàng)意。 

1、 Window & Time 介紹

Apache Flink(以下簡(jiǎn)稱(chēng) Flink) 是一個(gè)天然支持無(wú)限流數(shù)據(jù)處理的分布式計(jì)算框架,在 Flink 中 Window 可以將無(wú)限流切分成有限流,是處理有限流的核心組件,現(xiàn)在 Flink 中 Window 可以是時(shí)間驅(qū)動(dòng)的(Time Window),也可以是數(shù)據(jù)驅(qū)動(dòng)的(Count Window)。

下面的代碼是在 Flink 中使用 Window 的兩個(gè)示例

2、 Window API 使用

從第一部分我們已經(jīng)知道 Window 的一些基本概念,以及相關(guān) API,下面我們以一個(gè)實(shí)際例子來(lái)看看怎么使用 Window 相關(guān)的 API。

代碼來(lái)自 flink-examples

上面的例子中我們首先會(huì)對(duì)每條數(shù)據(jù)進(jìn)行時(shí)間抽取,然后進(jìn)行 keyby,接著依次調(diào)用 window(),evictor(), trigger() 以及 maxBy()。下面我們重點(diǎn)來(lái)看 window(), evictor() 和 trigger() 這幾個(gè)方法。

2.1 WindowAssigner, Evictor 以及 Trigger

window 方法接收的輸入是一個(gè)WindowAssigner, WindowAssigner 負(fù)責(zé)將每條輸入的數(shù)據(jù)分發(fā)到正確的 window 中(一條數(shù)據(jù)可能同時(shí)分發(fā)到多個(gè) Window 中),F(xiàn)link 提供了幾種通用的 WindowAssigner:tumbling window(窗口間的元素?zé)o重復(fù)),sliding window(窗口間的元素可能重復(fù)),session window 以及 global window。如果需要自己定制數(shù)據(jù)分發(fā)策略,則可以實(shí)現(xiàn)一個(gè) class,繼承自 WindowAssigner。

Tumbling Window

Sliding Window

Session Window

Global Window

evictor 主要用于做一些數(shù)據(jù)的自定義操作,可以在執(zhí)行用戶(hù)代碼之前,也可以在執(zhí)行用戶(hù)代碼之后,更詳細(xì)的描述可以參考 org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter 兩個(gè)方法。Flink 提供了如下三種通用的 evictor:

  • CountEvictor 保留指定數(shù)量的元素

  • DeltaEvictor 通過(guò)執(zhí)行用戶(hù)給定的 DeltaFunction 以及預(yù)設(shè)的 threshold,判斷是否刪除一個(gè)元素。

  • TimeEvictor設(shè)定一個(gè)閾值 interval,刪除所有不再 max_ts - interval 范圍內(nèi)的元素,其中 max_ts 是窗口內(nèi)時(shí)間戳的大值。

evictor 是可選的方法,如果用戶(hù)不選擇,則默認(rèn)沒(méi)有。

trigger 用來(lái)判斷一個(gè)窗口是否需要被觸發(fā),每個(gè) WindowAssigner 都自帶一個(gè)默認(rèn)的 trigger,如果默認(rèn)的 trigger 不能滿(mǎn)足你的需求,則可以自定義一個(gè)類(lèi),繼承自 Trigger 即可,我們?cè)敿?xì)描述下 Trigger 的接口以及含義:

  • onElement() 每次往 window 增加一個(gè)元素的時(shí)候都會(huì)觸發(fā)

  • onEventTime() 當(dāng) event-time timer 被觸發(fā)的時(shí)候會(huì)調(diào)用

  • onProcessingTime() 當(dāng) processing-time timer 被觸發(fā)的時(shí)候會(huì)調(diào)用

  • onMerge() 對(duì)兩個(gè) trigger 的 state 進(jìn)行 merge 操作

  • clear() window 銷(xiāo)毀的時(shí)候被調(diào)用

上面的接口中前三個(gè)會(huì)返回一個(gè) TriggerResult,TriggerResult 有如下幾種可能的選擇:

  • CONTINUE 不做任何事情

  • FIRE 觸發(fā) window

  • PURGE 清空整個(gè) window 的元素并銷(xiāo)毀窗口

  • FIRE_AND_PURGE 觸發(fā)窗口,然后銷(xiāo)毀窗口

2.2 Time & Watermark

了解完上面的內(nèi)容后,對(duì)于時(shí)間驅(qū)動(dòng)的窗口,我們還有兩個(gè)概念需要澄清:Time 和 Watermark。

我們知道在分布式環(huán)境中 Time 是一個(gè)很重要的概念,在 Flink 中 Time 可以分為三種Event-Time,Processing-Time 以及 Ingestion-Time,三者的關(guān)系我們可以從下圖中得知:

Event Time、Ingestion Time、Processing Time

Event-Time 表示事件發(fā)生的時(shí)間,Processing-Time 則表示處理消息的時(shí)間(墻上時(shí)間),Ingestion-Time 表示進(jìn)入到系統(tǒng)的時(shí)間。

在 Flink 中我們可以通過(guò)下面的方式進(jìn)行 Time 類(lèi)型的設(shè)置

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 設(shè)置使用 ProcessingTime

了解了 Time 之后,我們還需要知道 Watermark 相關(guān)的概念。

我們可以考慮一個(gè)這樣的例子:某 App 會(huì)記錄用戶(hù)的所有點(diǎn)擊行為,并回傳日志(在網(wǎng)絡(luò)不好的情況下,先保存在本地,延后回傳)。A 用戶(hù)在 11:02 對(duì) App 進(jìn)行操作,B 用戶(hù)在 11:03 操作了 App,但是 A 用戶(hù)的網(wǎng)絡(luò)不太穩(wěn)定,回傳日志延遲了,導(dǎo)致我們?cè)诜?wù)端先接受到 B 用戶(hù) 11:03 的消息,然后再接受到 A 用戶(hù) 11:02 的消息,消息亂序了。

那我們?cè)趺幢WC基于 event-time 的窗口在銷(xiāo)毀的時(shí)候,已經(jīng)處理完了所有的數(shù)據(jù)呢?這就是 watermark 的功能所在。watermark 會(huì)攜帶一個(gè)單調(diào)遞增的時(shí)間戳 t,watermark(t) 表示所有時(shí)間戳不大于 t 的數(shù)據(jù)都已經(jīng)到來(lái)了,未來(lái)小于等于t的數(shù)據(jù)不會(huì)再來(lái),因此可以放心地觸發(fā)和銷(xiāo)毀窗口了。下圖中給了一個(gè)亂序數(shù)據(jù)流中的 watermark 例子

2.3 遲到的數(shù)據(jù)

上面的 watermark 讓我們能夠應(yīng)對(duì)亂序的數(shù)據(jù),但是真實(shí)世界中我們沒(méi)法得到一個(gè)完美的 watermark 數(shù)值 — 要么沒(méi)法獲取到,要么耗費(fèi)太大,因此實(shí)際工作中我們會(huì)使用近似 watermark — 生成 watermark(t) 之后,還有較小的概率接受到時(shí)間戳 t 之前的數(shù)據(jù),在 Flink 中將這些數(shù)據(jù)定義為 “l(fā)ate elements”, 同樣我們可以在 window 中指定是允許延遲的大時(shí)間(默認(rèn)為 0),可以使用下面的代碼進(jìn)行設(shè)置

設(shè)置allowedLateness 之后,遲來(lái)的數(shù)據(jù)同樣可以觸發(fā)窗口,進(jìn)行輸出,利用 Flink 的 side output 機(jī)制,我們可以獲取到這些遲到的數(shù)據(jù),使用方式如下:

需要注意的是,設(shè)置了 allowedLateness 之后,遲到的數(shù)據(jù)也可能觸發(fā)窗口,對(duì)于 Session window 來(lái)說(shuō),可能會(huì)對(duì)窗口進(jìn)行合并,產(chǎn)生預(yù)期外的行為。

3 Window 內(nèi)部實(shí)現(xiàn)

在討論 Window 內(nèi)部實(shí)現(xiàn)的時(shí)候,我們?cè)偻ㄟ^(guò)下圖回顧一下 Window 的生命周期

每條數(shù)據(jù)過(guò)來(lái)之后,會(huì)由 WindowAssigner 分配到對(duì)應(yīng)的 Window,當(dāng) Window 被觸發(fā)之后,會(huì)交給 Evictor(如果沒(méi)有設(shè)置 Evictor 則跳過(guò)),然后處理 UserFunction。其中 WindowAssigner,Trigger,Evictor 我們都在上面討論過(guò),而 UserFunction 則是用戶(hù)編寫(xiě)的代碼。

整個(gè)流程還有一個(gè)問(wèn)題需要討論:Window 中的狀態(tài)存儲(chǔ)。我們知道 Flink 是支持 Exactly Once 處理語(yǔ)義的,那么 Window 中的狀態(tài)存儲(chǔ)和普通的狀態(tài)存儲(chǔ)又有什么不一樣的地方呢?

首先給出具體的答案:從接口上可以認(rèn)為沒(méi)有區(qū)別,但是每個(gè) Window 會(huì)屬于不同的 namespace,而非 Window 場(chǎng)景下,則都屬于 VoidNamespace ,最終由 State/Checkpoint 來(lái)保證數(shù)據(jù)的 Exactly Once 語(yǔ)義,下面我們從 org.apache.flink.streaming.runtime.operators.windowing.WindowOperator 摘取一段代碼進(jìn)行闡述

從上面我們可以知道,Window 中的的元素同樣是通過(guò) state 進(jìn)行維護(hù),然后由 Checkpoint 機(jī)制保證 Exactly Once 語(yǔ)義。

至此,Time、Window 相關(guān)的所有內(nèi)容都已經(jīng)講解完畢,主要包括為什么要有 Window; Window 中的三個(gè)核心組件:WindowAssigner、Trigger 和 Evictor;Window 中怎么處理亂序數(shù)據(jù),亂序數(shù)據(jù)是否允許延遲,以及怎么處理遲到的數(shù)據(jù);最后我們梳理了整個(gè) Window 的數(shù)據(jù)流程,以及 Window 中怎么保證 Exactly Once 語(yǔ)義。

更多資訊請(qǐng)?jiān)L問(wèn) Apache Flink 中文社區(qū)網(wǎng)站

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性?xún)r(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專(zhuān)為企業(yè)上云打造定制,能夠滿(mǎn)足用戶(hù)豐富、多元化的應(yīng)用場(chǎng)景需求。

網(wǎng)頁(yè)名稱(chēng):ApacheFlinkTime&Window深度解析-創(chuàng)新互聯(lián)
轉(zhuǎn)載來(lái)于:http://bm7419.com/article8/igdop.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)網(wǎng)站建設(shè)、建站公司、品牌網(wǎng)站制作企業(yè)建站、做網(wǎng)站網(wǎng)頁(yè)設(shè)計(jì)公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

營(yíng)銷(xiāo)型網(wǎng)站建設(shè)