本篇內(nèi)容主要講解“flink的Transformation數(shù)據(jù)處理方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“flink的Transformation數(shù)據(jù)處理方法是什么”吧!
為商丘等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計(jì)制作服務(wù),及商丘網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為成都網(wǎng)站制作、成都網(wǎng)站建設(shè)、商丘網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!
將一個(gè)或多個(gè)DataStream生成新的DataStream的過程被稱為Transformation。轉(zhuǎn)換過程中,每種操作類型被定義為不同的Operator,F(xiàn)link能將多個(gè)Transformation組合為一個(gè)DataFlow的拓?fù)洹?/p>
所以DataStream的轉(zhuǎn)換操作可以分為SingleDataStream、MultiDataStream、物理分區(qū)三個(gè)類型。
SingleDataStream:單個(gè)DataStream的處理邏輯。
MultiDataStream:多個(gè)DataStream的處理邏輯。
物理分區(qū):對數(shù)據(jù)集中的并行度和數(shù)據(jù)分區(qū)調(diào)整轉(zhuǎn)換的處理邏輯。
常用作對數(shù)據(jù)集內(nèi)數(shù)據(jù)的清晰和轉(zhuǎn)換。如將輸入數(shù)據(jù)的每個(gè)數(shù)值全部加1,并將數(shù)據(jù)輸出到下游。
val dataStream = evn.formElements(("a",3),("d",4),("c",4),("c",5),("a",5)) //方法一 val mapStream:DataStream[(String,Int)] = dataStream.map(t => (t._1,t._2+1)) //方法二 val mapStream:DataStream[(String,Int)] = dataStream.map( new MapFunction[(String,Int),(String, Int)]{ override def map(t: (String,Int)): (String,Int) ={ (t._1, t._2+1) } })
主要應(yīng)用于處理輸入一個(gè)元素轉(zhuǎn)換為多個(gè)元素場景,如WordCount,將沒行文本數(shù)據(jù)分割,生成單詞序列。
val dataStream:DataStream[String] = environment.fromCollections() val resultStream[String] =dataStream.flatMap{str => str.split(" ")}
按條件對輸入數(shù)據(jù)集進(jìn)行篩選,輸出符合條件的數(shù)據(jù)。
//通配符 val filter:DataStream[Int] = dataStream.filter{ _ %2 == 0} //運(yùn)算表達(dá)式 val filter:DataStream[Int] = dataStream.filter { x => x % 2 ==0}
根據(jù)指定的key對輸入的數(shù)據(jù)集執(zhí)行Partition操作,將相同的key值的數(shù)據(jù)放置到相同的區(qū)域中。
將下標(biāo)為1相同的數(shù)據(jù)放到一個(gè)分區(qū)
val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3)) //指定第一個(gè)字段為分區(qū)key val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0)
與MapReduce中reduce原理基本一致,將輸入的KeyedStream通過傳入用戶自定義的ReduceFunction滾動(dòng)進(jìn)行數(shù)據(jù)聚合處理,定義的ReduceFunction必須滿足運(yùn)算結(jié)合律和交換律。
val dataStream = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5)) //指定第一個(gè)字段為分區(qū)key val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0) //實(shí)現(xiàn)一:滾動(dòng)第二個(gè)字段進(jìn)行reduce相加求和 val reduceStream = keyedStream.reduce{(t1,t2) => (t1._1, t1._2+t2._2)} //實(shí)現(xiàn)二:實(shí)現(xiàn)ReduceFunction val reduceStream1 = keyedStream.reduce(new ReduceFunction[(String, Int)] { override def reduce(t1: (String,Int), t2:(String,Int)):(String, int) = { (t1._1, t1._2+ t2._2) } })
運(yùn)行結(jié)果為:(c,2)(c,7)(a,3)(d,4)(a,8),結(jié)果不是最后求和的值,是將每條記錄累加后的結(jié)果輸出。
DataStream提供的聚合算子,根據(jù)指定的字段進(jìn)行聚合操作,滾動(dòng)產(chǎn)生一系列數(shù)據(jù)聚合結(jié)果。實(shí)際是將Reduce算子中函數(shù)進(jìn)行封裝,封裝的聚合操作有sum、min、minBy、max、maxBy等。這樣就不需要用戶自己定義Reduce函數(shù)。
val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3)) //指定第一個(gè)字段為分區(qū)key val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0) //對第二個(gè)字段進(jìn)行sum統(tǒng)計(jì) val sumStream: DataStream[(Int,Int)] = keyedStream.sum(1) //輸出統(tǒng)計(jì)結(jié)果 sumStream.print()
聚合函數(shù)中傳入?yún)?shù)必須是數(shù)值型,否則會(huì)拋出異常。
//統(tǒng)計(jì)計(jì)算指定key最小值 val minStream: DataStream[(Int,Int)] = keyedStream.min(1) //統(tǒng)計(jì)計(jì)算指定key最大值 val maxStream: DataStream[(Int,Int)] = keyedStream.max(1) //統(tǒng)計(jì)計(jì)算指定key最小值,返回最小值對應(yīng)元素 val minByStream: DataStream[(Int,Int)] = keyedStream.minBy(1) //統(tǒng)計(jì)計(jì)算指定key最大值,返回最大值對應(yīng)元素 val maxByStream: DataStream[(Int,Int)] = keyedStream.maxBy(1)
將兩個(gè)或多個(gè)輸入的數(shù)據(jù)集合并為一個(gè)數(shù)據(jù)集,需要保證輸入待合并數(shù)據(jù)集和輸出數(shù)據(jù)集格式一致。
//創(chuàng)建不同數(shù)據(jù)集 val dataStream1: DataStream [(String ,Int)]= env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5)) val dataStream2: DataStream [(String ,Int)]= env.fromElements(("d",1),("s",2),("a",4),("e",5),("a",6)) val dataStream3: DataStream [(String ,Int)]= env.fromElements(("a",2),("d",1),("s",2),("c",3),("b",1)) //合并兩個(gè)數(shù)據(jù)集 val unionStream = dataStream1.union(dataStream2) //合并多個(gè)數(shù)據(jù)集 val allUnionStream = dataStream1.union(dataStream2,dataStream3)
該算子為了合并兩種或多種不同類型的數(shù)據(jù)集,合并后會(huì)保留原始數(shù)據(jù)集的數(shù)類型。連接操作允許共享狀態(tài)數(shù)據(jù),也就是說在多個(gè)數(shù)據(jù)集之間可以操作和查看對方數(shù)據(jù)集的狀態(tài)。
實(shí)例:dataStream1數(shù)據(jù)集為(String,Int)元祖類型,dataStream2數(shù)據(jù)集為Int類型,通過connect連接將兩種類型數(shù)據(jù)結(jié)合在一起,形成格式為ConnectedStream是的數(shù)據(jù)集,其內(nèi)部數(shù)據(jù)為[(String,Int),Int]的混合數(shù)據(jù)類型,保留兩個(gè)數(shù)據(jù)集的數(shù)據(jù)類型。
val dataStream1: DataStream [(String ,Int)]= env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5)) val dataStream2: DataStream [Int]= env.fromElements(1,2,4,5,6) //連接兩個(gè)數(shù)據(jù)集 val connectedStream :ConnectedStreams[(String, Int), Int] = dataStream1.connect(dataStream2)
注意:ConnectedStreams類型的數(shù)據(jù)集不能進(jìn)行類似Print()操作,需轉(zhuǎn)換為DataStream類型數(shù)據(jù)集。
ConnectedStreams提供map()和flatMap()需要定義CoMapFunction或CoFlatMapFunction分別處理輸入的DataStream數(shù)據(jù)集,或直接傳入MapFunction來分別處理兩個(gè)數(shù)據(jù)集。
map()實(shí)例如下:
val resultStream = connectedStream.map(new CoMapFunction[(String,Int),Int,(Int, String)]{ //定義第一個(gè)數(shù)據(jù)集函數(shù)處理邏輯,輸入值為第一個(gè)DataStream override def map1(in1: (String,Int)): (Int ,String) = { (int1._2 , in1._1) } //定義第二個(gè)數(shù)據(jù)集函數(shù)處理邏輯 override def amp2(in2: Int):(Int,String) = { (int2,"default") } })
以上實(shí)例中,兩個(gè)函數(shù)會(huì)多線程交替執(zhí)行產(chǎn)生結(jié)果,最后根據(jù)定義生成目標(biāo)數(shù)據(jù)集。
flatMap()方法中指定CoFlatMapFunction。兩個(gè)函數(shù)共享number變量,代碼如下:
val resultStream2 = connectedStream.flatMap(new CoFlatMapFunction[(String,Int), Int ,(String ,Int , Int)]{ //定義共享變量 var number=0 //定義第一個(gè)數(shù)據(jù)集處理函數(shù) override def flatMap1(in1:(String ,Int ), collector : Collector[(String,Int ,Int)]): Unit = { collector.collect((in1._1,in1._2,number)) } //定義第二個(gè)數(shù)據(jù)集處理函數(shù) override def flatMap2(in2: Int, collector : Collector[(String , Int ,Int)]):Unit = { number=in2 } })
如果想通過指定的條件對兩個(gè)數(shù)據(jù)集進(jìn)行關(guān)聯(lián),可以借助keyBy韓碩或broadcast廣播變量實(shí)現(xiàn)。keyBy會(huì)將相同key的數(shù)據(jù)路由在同一個(gè)Operator中。broadcast會(huì)在執(zhí)行計(jì)算邏輯前,將DataStream2數(shù)據(jù)集廣播到所有并行計(jì)算的Operator中,再根據(jù)條件對數(shù)據(jù)集進(jìn)行關(guān)聯(lián)。這兩種方式本質(zhì)是分布式j(luò)oin算子的基本實(shí)現(xiàn)方式。
//通過keyby函數(shù)根據(jù)指定的key連接兩個(gè)數(shù)據(jù)集 val keyedConnect: ConnectedStreams[(String ,Int ), Int] = dataStream1.connect(dataStream2).keyBy(1,0) //通過broadcast關(guān)聯(lián)兩個(gè)數(shù)據(jù)集 val broadcastConnect: BroadcastConnectedStream [(String, Int), Int] = dataStream1.connect(dataStream2.broadcast())
將一個(gè)DataStream數(shù)據(jù)集按條件進(jìn)行拆分,形成兩個(gè)數(shù)據(jù)集的過程,union的逆向操作。實(shí)例:如調(diào)用split函數(shù),指定條件判斷,根據(jù)第二個(gè)字段的奇偶性將數(shù)據(jù)集標(biāo)記出來,偶數(shù)標(biāo)記為event,奇數(shù)標(biāo)記為odd,再通過集合將標(biāo)記返回,最終生成SplitStream數(shù)據(jù)集。
//創(chuàng)建數(shù)據(jù)集 val DataStream1: DataStream[(String, Int)] = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5)) //合并連個(gè)DataStream數(shù)據(jù)集 val splitedStream : SplitStream[(String,Int)] = dataStream1.split(t => if(t._2 % 2 ==0 ) Seq("even") else Seq("odd"))
split函數(shù)只是標(biāo)記數(shù)據(jù),沒有拆分?jǐn)?shù)據(jù),因此需要select函數(shù)根據(jù)標(biāo)記將數(shù)據(jù)切分為不同數(shù)據(jù)集。
//篩選出偶數(shù)數(shù)據(jù)集 val evenStream: DataStream[(String,Int)] = splitedStream.select("even") //篩選出奇數(shù)數(shù)據(jù)集 val oddStream: DataStream[(String,Int)] = splitedStream.select("odd") //篩選出偶數(shù)和奇數(shù)數(shù)據(jù)集 val allStream: DataStream[(String,Int)] = splitedStream.select("even","odd")
Iterate適合于迭代計(jì)算,通過每一次的迭代計(jì)算,并將計(jì)算結(jié)果反饋到下一次迭代計(jì)算中。
//創(chuàng)建數(shù)據(jù)集,map處理為對數(shù)據(jù)分區(qū)根據(jù)默認(rèn)并行度進(jìn)行平衡 val DataStream = env.fromElements(3,1,2,1,5).map{ t:Int => t} val iterated = dataStream.iterate((input: ConnectedStreams[Int , String]) => { //定義兩個(gè)map處理數(shù)據(jù)集,第一個(gè)map反饋操作,第二個(gè)map將數(shù)據(jù)輸出到下游 val head= input.map(i => (i+1).toString, s => s) (head.filter( _ == "2"), head.filter (_ != "2")) },1000) //超過1000ms沒有數(shù)據(jù)接入終止迭代
根據(jù)指定的分區(qū)策略將數(shù)據(jù)重新分發(fā)到不同節(jié)點(diǎn)的Task實(shí)例上執(zhí)行,以此優(yōu)化DataStream自身API對數(shù)據(jù)的分區(qū)控制。
隨機(jī)將數(shù)據(jù)集中數(shù)據(jù)分配到下游算子的每個(gè)分區(qū)中,優(yōu)點(diǎn)數(shù)據(jù)相對均衡,缺點(diǎn)失去原有數(shù)據(jù)的分區(qū)結(jié)構(gòu)
val shuffleStream=dataStream.shuffle
循環(huán)將數(shù)據(jù)集中數(shù)據(jù)進(jìn)行重分區(qū),能盡可能保證每個(gè)分區(qū)的數(shù)據(jù)平衡,可有效解決數(shù)據(jù)集的傾斜問題。
val shuffleStream= dataStream.rebalance();
一種通過循環(huán)方式進(jìn)行數(shù)據(jù)重平衡的分區(qū)策略,與Roundrobin Partitioning不同,它僅會(huì)對上下游繼承的算子數(shù)據(jù)進(jìn)行重新平衡,具體主要根據(jù)上下游算子的并行度決定。如上游算子的并發(fā)度為2,下游算子的并發(fā)度為4,上游算子中第一個(gè)分區(qū)數(shù)據(jù)按照同等比例將數(shù)據(jù)路由在下游的固定兩個(gè)分區(qū)中,另一個(gè)分區(qū)也是一樣。
//通過調(diào)用DataStream API中rescale()方法實(shí)現(xiàn)Rescaling Partitioning操作 val shuffleStream = dataStream.rescale();
將輸入的數(shù)據(jù)集復(fù)制到下游算子的并行的Tasks實(shí)例中,下游算子Tasks可直接從本地內(nèi)存中獲取廣播數(shù)據(jù)集,不再依賴網(wǎng)絡(luò)傳輸。
這種分區(qū)策略適合于小集群,如大數(shù)據(jù)集關(guān)聯(lián)小數(shù)據(jù)集時(shí),可通過廣播方式將小數(shù)據(jù)分發(fā)到算子的分區(qū)中。
//通過DataStream API的broadcast() 方法實(shí)現(xiàn)廣播分區(qū) val shuffleStream= dataStream.broadcast()
實(shí)現(xiàn)自定義分區(qū)器,調(diào)用DataStream API上的partitionCustom()方法將創(chuàng)建的分區(qū)器應(yīng)用到數(shù)據(jù)集上。
如下,自定義分區(qū)器實(shí)現(xiàn)將字段中包含flink關(guān)鍵字的數(shù)據(jù)放在partition為0的分區(qū)中,其余數(shù)據(jù)執(zhí)行隨機(jī)分區(qū)策略,其中num Partitions是從系統(tǒng)中獲取的并行度參數(shù)。
Object customPartitioner extends Partitioner[String]{ //獲取隨機(jī)數(shù)生成器 val r=scala.util.Random override def partition(key: String, numPartitions: Int): Int ={ //定義分區(qū)策略,key中如果包含a則放入0分區(qū)中,其他情況則根據(jù)Partitions num隨機(jī)分區(qū) if(key.contains("flink")) 0 else r.nextInt(numPartitions) } }
完成自定義分區(qū)器,調(diào)用DataStream API的partitionCustom應(yīng)用分區(qū)器,第二個(gè)參數(shù)指定分區(qū)器使用到的字段,對于Tuple類型數(shù)據(jù),分區(qū)字段可以通過字段名稱指定,其他類型數(shù)據(jù)集則通過位置索引指定。
//通過數(shù)據(jù)集字段名稱指定分區(qū)字段 dataStream.partitionCustom(customPartitioner,"filed_name"); //通過數(shù)據(jù)集字段索引指定分區(qū)字段 dataStream.partitionCustom(customPartitioner,0)
到此,相信大家對“flink的Transformation數(shù)據(jù)處理方法是什么”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
新聞標(biāo)題:flink的Transformation數(shù)據(jù)處理方法是什么
本文網(wǎng)址:http://bm7419.com/article28/jciccp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站排名、手機(jī)網(wǎng)站建設(shè)、網(wǎng)站建設(shè)、搜索引擎優(yōu)化、網(wǎng)頁設(shè)計(jì)公司、微信公眾號
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)