flink中如何使用sql將流式數(shù)據(jù)寫入hive

這篇文章將為大家詳細(xì)講解有關(guān)flink中如何使用sql將流式數(shù)據(jù)寫入hive,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

創(chuàng)新互聯(lián)是一家專業(yè)提供威寧企業(yè)網(wǎng)站建設(shè),專注與成都網(wǎng)站制作、成都網(wǎng)站建設(shè)、外貿(mào)營銷網(wǎng)站建設(shè)、H5技術(shù)、小程序制作等業(yè)務(wù)。10年已為威寧眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。

修改hive配置

上一篇介紹了使用sql將流式數(shù)據(jù)寫入文件系統(tǒng),這次我們來介紹下使用sql將文件寫入hive,對(duì)于如果想寫入已經(jīng)存在的hive表,則至少需要添加以下兩個(gè)屬性.  寫入hive底層還是和寫入文件系統(tǒng)一樣的,所以對(duì)于其他具體的配置參考上一篇.

alter table table_name set TBLPROPERTIES ('is_generic'='false'); 

alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore'); 


//如果想使用eventtime分區(qū)
alter table table_name set TBLPROPERTIES ('sink.partition-commit.trigger'='partition-time'); 

   

案例講解

下面我們講解一下,如何使用java程序來構(gòu)建一個(gè)flink程序來寫入hive。

 

引入相關(guān)的pom

      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
   

構(gòu)造hive catalog

  //構(gòu)造hive catalog
  String name = "myhive";
  String defaultDatabase = "default";
  String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
  String version = "3.1.2";

  HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
  tEnv.registerCatalog("myhive", hive);
  tEnv.useCatalog("myhive");
  tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
  tEnv.useDatabase("db1");
   

創(chuàng)建hive表

如果目前系統(tǒng)中沒有存在相應(yīng)的hive表,可以通過在程序中執(zhí)行相應(yīng)的DDL建表語句來建表,如果已經(jīng)存在了,就把這段代碼省略,使用上面的hive命令修改現(xiàn)有表,添加相應(yīng)的屬性。

CREATE EXTERNAL TABLE `fs_table`(
  `user_id` string, 
  `order_amount` double)
PARTITIONED BY ( 
  `dt` string, 
  `h` string, 
  `m` string)
stored as ORC 
TBLPROPERTIES (
  'sink.partition-commit.policy.kind'='metastore',
  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)
   

將流數(shù)據(jù)插入hive,

 String insertSql = "insert into  fs_table SELECT userId, amount, " +
                     " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
  tEnv.executeSql(insertSql);
 

完整的代碼請(qǐng)參考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/sql/StreamingWriteHive.java

 

遇到的坑

 

問題詳解

對(duì)于如上的程序和sql,如果配置了是使用eventtime,在此程序中配置了'sink.partition-commit.trigger'='partition-time',最后發(fā)現(xiàn)程序沒法提交分區(qū)。

分析了一下源碼,問題是出在了這個(gè)方法,org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions。先貼上代碼:


 @Override
 public List<String> committablePartitions(long checkpointId) {
  if (!watermarks.containsKey(checkpointId)) {
   throw new IllegalArgumentException(String.format(
     "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
     checkpointId, watermarks));
  }

  long watermark = watermarks.get(checkpointId);
  watermarks.headMap(checkpointId, true).clear();

  List<String> needCommit = new ArrayList<>();
  Iterator<String> iter = pendingPartitions.iterator();
  while (iter.hasNext()) {
   String partition = iter.next();
   //通過分區(qū)的值抽取分區(qū)的時(shí)間.
   LocalDateTime partTime = extractor.extract(
     partitionKeys, extractPartitionValues(new Path(partition)));
   //判斷水印是否大于分區(qū)創(chuàng)建時(shí)間+延遲時(shí)間
   if (watermark > toMills(partTime) + commitDelay) {
    needCommit.add(partition);
    iter.remove();
   }
  }
  return needCommit;
 }

 

系統(tǒng)通過分區(qū)值來抽取相應(yīng)的分區(qū)創(chuàng)建時(shí)間,然后進(jìn)行比對(duì),比如我們?cè)O(shè)置的pattern是      h:$m:00 , 某一時(shí)刻我們正在往 /2020-07-06/18/20/ 這個(gè)分區(qū)下寫數(shù)據(jù),那么程序根據(jù)分區(qū)值,得到的pattern將會(huì)是2020-07-06 18:20:00,這個(gè)值在sql中是根據(jù)DATA_FORMAT函數(shù)獲取的。

這個(gè)值是帶有時(shí)區(qū)的, 也是我想要的, 比如我們的時(shí)區(qū)設(shè)置為東八區(qū),2020-07-06 18:20:00這個(gè)時(shí)間是東八區(qū)的時(shí)間,換成標(biāo)準(zhǔn)UTC時(shí)間是減去八個(gè)小時(shí),也就是2020-07-06 10:20:00,而源碼中的toMills函數(shù)在處理這個(gè)東八區(qū)的時(shí)間時(shí),并沒有任何加入任何時(shí)區(qū)的處理,把這個(gè)其實(shí)應(yīng)該是東八區(qū)的時(shí)間當(dāng)做了UTC時(shí)間來處理,這樣計(jì)算出來的值就比實(shí)際值大8小時(shí),導(dǎo)致一直沒有觸發(fā)分區(qū)的提交。

如果我們?cè)跀?shù)據(jù)源構(gòu)造的分區(qū)是UTC時(shí)間,也就是不帶分區(qū)的時(shí)間,那么這個(gè)邏輯就是沒有問題的,但是這樣又不符合我們的實(shí)際情況,比如對(duì)于分區(qū)2020-07-06 18:20:00,我希望我的分區(qū)肯定是東八區(qū)的時(shí)間,而不是比東八區(qū)小8個(gè)小時(shí)的UTC時(shí)間2020-07-06 10:20:00。

所以針對(duì)上述情況,有兩種解決方案,一種是自定義一個(gè)分區(qū)抽取類,第二,就是修改源碼,改一下現(xiàn)在的缺省的時(shí)間分區(qū)抽取類。我個(gè)人認(rèn)為修改一下缺省類更好理解,因?yàn)槟壳皩懭胛募蚳ive這塊配置和概念有點(diǎn)多,我不想太增加過多的配置來增加用戶的難度,應(yīng)該盡可能的用缺省值就能使程序很好的運(yùn)行。

我們看下flink中的StreamingFileSink類,構(gòu)造分區(qū)桶的時(shí)候默認(rèn)是使用的DateTimeBucketAssigner,其構(gòu)造分區(qū)路徑就是帶有時(shí)區(qū)概念的,默認(rèn)就用的是本地時(shí)區(qū)。

public DateTimeBucketAssigner(String formatString) {
  this(formatString, ZoneId.systemDefault());
 }
 
   

修改方案

這個(gè)問題,也不知道算不算一個(gè)bug,我給官方提交了一個(gè)ISSUE,但是官方?jīng)]有采納,不過我覺得不符合我的習(xí)慣,所以我對(duì)這個(gè)功能進(jìn)行了修改,讓partition.time-extractor.timestamp-pattern提取的partiiton是帶有時(shí)區(qū)的,默認(rèn)情況下是本地時(shí)區(qū)。如果是非本地時(shí)區(qū),可以指定時(shí)區(qū),通過參數(shù)partition.time-extractor.time-zone來指定,我們可以通下面的代碼獲取有效的時(shí)區(qū)。

 Set<String> zoneIds = ZoneId.getAvailableZoneIds();
 zoneIds.stream().forEach(System.out::println);
 

比如我們東八區(qū)默認(rèn)使用 Asia/Shanghai。

我基于社區(qū)的flink的tag release-1.11.0-rc4,我改了一下代碼 將代碼放到了github上。

關(guān)于flink中如何使用sql將流式數(shù)據(jù)寫入hive就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

當(dāng)前題目:flink中如何使用sql將流式數(shù)據(jù)寫入hive
本文地址:http://bm7419.com/article16/pcojdg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站網(wǎng)站改版、外貿(mào)網(wǎng)站建設(shè)、定制開發(fā)、定制網(wǎng)站、虛擬主機(jī)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

手機(jī)網(wǎng)站建設(shè)