這篇文章主要講解了“Flink中keyBy有哪些方式指定key”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Flink中keyBy有哪些方式指定key”吧!
創(chuàng)新互聯(lián)公司服務(wù)項(xiàng)目包括清河門網(wǎng)站建設(shè)、清河門網(wǎng)站制作、清河門網(wǎng)頁(yè)制作以及清河門網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,清河門網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到清河門省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
不管是stream還是batch處理,都有一個(gè)keyBy(stream)和groupBy(batch)操作。那么該如何指定key?
Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data being grouped on a key before they are applied.
一些算子(transformations)例如join,coGroup,keyBy,groupBy往往需要定義一個(gè)key。其他的算子例如Reduce, GroupReduce, Aggregate, Windows,也允許數(shù)據(jù)按照key進(jìn)行分組。
DataSet
DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
DataStream
DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/);
類似于MySQL中的join操作:select a.* , b.* from a join b on a.id=b.id
這里的keyBy就是a.id=b.id
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
可以傳字段的位置
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
可以傳字段位置的組合
這對(duì)于簡(jiǎn)單的使用時(shí)沒(méi)問(wèn)題的。但是對(duì)于內(nèi)嵌的Tuple,如下所示:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
如果使用keyBy(0),那么他就會(huì)使用整個(gè)Tuple2<Integer, Float>作為key,(因?yàn)門uple2<Integer, Float>是Tuple3<Tuple2<Integer, Float>,String,Long>的0號(hào)位置)。如果想要指定key到Tuple2<Integer, Float>內(nèi)部中,可以使用下面的方式。
我們可以使用基于字符串字段表達(dá)式來(lái)引用內(nèi)嵌字段去定義key。
之前我們的算子寫法是這樣的:
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
其中的new FlatMapFunction<String, Tuple2<String, Integer>>表示輸入是一個(gè)String,輸出是一個(gè)Tuple2<String, Integer>。這里我們重新定義一個(gè)內(nèi)部類:
public static class WC { private String word; private int count; public WC() { } public WC(String word, int count) { this.word = word; this.count = count; } @Override public String toString() { return "WC{" + "word='" + word + '\'' + ", count=" + count + '}'; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } }
修改算子的寫法:
text.flatMap(new FlatMapFunction<String, WC>() { @Override public void flatMap(String value, Collector<WC> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for (String token : tokens) { if (token.length() > 0) { out.collect(new WC(token, 1)); } } } }).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1);
將原來(lái)的輸出Tuple2<String, Integer>,修改為輸出WC類型;將原來(lái)的keyBy(0)修改為keyBy("word");將原來(lái)的sum(1)修改為sum("count")
因此,在這個(gè)例子中我們有一個(gè)POJO類,有兩個(gè)字段分別是"word"和"count",可以傳遞字段名到keyBy("")中。
語(yǔ)法:
字段名一定要與POJO類中的字段名一致。一定要提供默認(rèn)的構(gòu)造函數(shù),和get與set方法。
使用Tuple時(shí),0表示第一個(gè)字段
可以使用嵌套方式,舉例如下:
public static class WC { public ComplexNestedClass complex; //nested POJO private int count; // getter / setter for private field (count) public int getCount() { return count; } public void setCount(int c) { this.count = c; } } public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3<Long, Long, String> word; public IntWritable hadoopCitizen; }
"count",指向的是WC中的字段count
"complex",指向的是復(fù)雜數(shù)據(jù)類型,會(huì)遞歸選擇所有ComplexNestedClass的字段
"complex.word.f2",指向的是Tuple3中的最后一個(gè)字段。
"complex.hadoopCitizen",指向的是Hadoop IntWritable
type
scala寫法:
object StreamingWCScalaApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 引入隱式轉(zhuǎn)換 import org.apache.flink.api.scala._ val text = env.socketTextStream("192.168.152.45", 9999) text.flatMap(_.split(",")) .map(x => WC(x,1)) .keyBy("word") .timeWindow(Time.seconds(5)) .sum("count") .print() .setParallelism(1) env.execute("StreamingWCScalaApp"); } case class WC(word: String, count: Int) }
.keyBy(new KeySelector<WC, Object>() { @Override public Object getKey(WC value) throws Exception { return value.word; } })
感謝各位的閱讀,以上就是“Flink中keyBy有哪些方式指定key”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Flink中keyBy有哪些方式指定key這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
網(wǎng)頁(yè)題目:Flink中keyBy有哪些方式指定key
網(wǎng)站鏈接:http://bm7419.com/article26/psddjg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供軟件開(kāi)發(fā)、網(wǎng)站設(shè)計(jì)、網(wǎng)站導(dǎo)航、品牌網(wǎng)站制作、網(wǎng)站建設(shè)、建站公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)