Flink中keyBy有哪些方式指定key

這篇文章主要講解了“Flink中keyBy有哪些方式指定key”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Flink中keyBy有哪些方式指定key”吧!

創(chuàng)新互聯(lián)公司服務(wù)項(xiàng)目包括清河門網(wǎng)站建設(shè)、清河門網(wǎng)站制作、清河門網(wǎng)頁(yè)制作以及清河門網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,清河門網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到清河門省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

keyBy 如何指定key

不管是stream還是batch處理,都有一個(gè)keyBy(stream)和groupBy(batch)操作。那么該如何指定key?

Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data being grouped on a key before they are applied.

 一些算子(transformations)例如join,coGroup,keyBy,groupBy往往需要定義一個(gè)key。其他的算子例如Reduce, GroupReduce, Aggregate, Windows,也允許數(shù)據(jù)按照key進(jìn)行分組。

DataSet

DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);

DataStream

DataStream<...> input = // [...]
DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);

類似于MySQL中的join操作:select a.* , b.* from a join b on a.id=b.id

這里的keyBy就是a.id=b.id

有哪幾種方式定義Key?

方式一:Tuple

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

可以傳字段的位置

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)

可以傳字段位置的組合

這對(duì)于簡(jiǎn)單的使用時(shí)沒(méi)問(wèn)題的。但是對(duì)于內(nèi)嵌的Tuple,如下所示:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

如果使用keyBy(0),那么他就會(huì)使用整個(gè)Tuple2<Integer, Float>作為key,(因?yàn)門uple2<Integer, Float>是Tuple3<Tuple2<Integer, Float>,String,Long>的0號(hào)位置)。如果想要指定key到Tuple2<Integer, Float>內(nèi)部中,可以使用下面的方式。

方式二:字段表達(dá)式

我們可以使用基于字符串字段表達(dá)式來(lái)引用內(nèi)嵌字段去定義key。

之前我們的算子寫法是這樣的:

text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

其中的new FlatMapFunction<String, Tuple2<String, Integer>>表示輸入是一個(gè)String,輸出是一個(gè)Tuple2<String, Integer>。這里我們重新定義一個(gè)內(nèi)部類:

public static class WC {
        private String word;
        private int count;

        public WC() {
        }

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }
    }

修改算子的寫法:

        text.flatMap(new FlatMapFunction<String, WC>() {
            @Override
            public void flatMap(String value, Collector<WC> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new WC(token, 1));
                    }
                }
            }
        }).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1);

將原來(lái)的輸出Tuple2<String, Integer>,修改為輸出WC類型;將原來(lái)的keyBy(0)修改為keyBy("word");將原來(lái)的sum(1)修改為sum("count")

因此,在這個(gè)例子中我們有一個(gè)POJO類,有兩個(gè)字段分別是"word"和"count",可以傳遞字段名到keyBy("")中。

語(yǔ)法:

  • 字段名一定要與POJO類中的字段名一致。一定要提供默認(rèn)的構(gòu)造函數(shù),和get與set方法。

  • 使用Tuple時(shí),0表示第一個(gè)字段

  • 可以使用嵌套方式,舉例如下:

public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3<Long, Long, String> word;
  public IntWritable hadoopCitizen;
}
  • "count",指向的是WC中的字段count

  • "complex",指向的是復(fù)雜數(shù)據(jù)類型,會(huì)遞歸選擇所有ComplexNestedClass的字段

  • "complex.word.f2",指向的是Tuple3中的最后一個(gè)字段。

  • "complex.hadoopCitizen",指向的是Hadoop IntWritable type

scala寫法:

object StreamingWCScalaApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 引入隱式轉(zhuǎn)換
    import org.apache.flink.api.scala._

    val text = env.socketTextStream("192.168.152.45", 9999)
    text.flatMap(_.split(","))
        .map(x => WC(x,1))
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .sum("count")
        .print()
        .setParallelism(1)

    env.execute("StreamingWCScalaApp");
  }
  case class WC(word: String, count: Int)
}

 方式三:key選擇器函數(shù)

.keyBy(new KeySelector<WC, Object>() {
            @Override
            public Object getKey(WC value) throws Exception {
                return value.word;
            }
        })

感謝各位的閱讀,以上就是“Flink中keyBy有哪些方式指定key”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Flink中keyBy有哪些方式指定key這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

網(wǎng)頁(yè)題目:Flink中keyBy有哪些方式指定key
網(wǎng)站鏈接:http://bm7419.com/article26/psddjg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供軟件開(kāi)發(fā)網(wǎng)站設(shè)計(jì)、網(wǎng)站導(dǎo)航、品牌網(wǎng)站制作、網(wǎng)站建設(shè)、建站公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都做網(wǎng)站