這篇文章將為大家詳細(xì)講解有關(guān)Flink 1.10中SQL、HiveCatalog與事件時(shí)間整合的示例分析,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
渦陽(yáng)網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián),渦陽(yáng)網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為渦陽(yáng)超過(guò)千家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站建設(shè)要多少錢,請(qǐng)找那個(gè)售后服務(wù)好的渦陽(yáng)做網(wǎng)站的公司定做!
添加依賴項(xiàng)
Maven 下載:
https://maven.aliyun.com/mvn/search
<properties>
<scala.bin.version>2.11</scala.bin.version>
<flink.version>1.10.0</flink.version>
<hive.version>1.1.0</hive.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka-0.11_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
</dependencies>
最后,找到 Hive 的配置文件 hive-site.xml,準(zhǔn)備工作就完成了。
注冊(cè) HiveCatalog、創(chuàng)建數(shù)據(jù)庫(kù)
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(5)
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
val catalog = new HiveCatalog(
"rtdw", // catalog name
"default", // default database
"/Users/lmagic/develop", // Hive config (hive-site.xml) directory
"1.1.0" // Hive version
)
tableEnv.registerCatalog("rtdw", catalog)
tableEnv.useCatalog("rtdw")
val createDbSql = "CREATE DATABASE IF NOT EXISTS rtdw.ods"
tableEnv.sqlUpdate(createDbSql)
創(chuàng)建 Kafka 流表并指定事件時(shí)間
"eventType": "clickBuyNow", "userId": "97470180", "shareUserId": "", "platform": "xyz", "columnType": "merchDetail", "merchandiseId": "12727495", "fromType": "wxapp", "siteId": "20392", "categoryId": "", "ts": 1585136092541
CREATE TABLE rtdw.ods.streaming_user_active_log ( eventType STRING COMMENT '...', userId STRING, shareUserId STRING, platform STRING, columnType STRING, merchandiseId STRING, fromType STRING, siteId STRING, categoryId STRING, ts BIGINT, procTime AS PROCTIME(), -- 處理時(shí)間 eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), -- 事件時(shí)間 WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- 水印) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'ng_log_par_extracted', 'connector.startup-mode' = 'latest-offset', -- 指定起始o(jì)ffset位置 'connector.properties.zookeeper.connect' = 'zk109:2181,zk110:2181,zk111:2181', 'connector.properties.bootstrap.servers' = 'kafka112:9092,kafka113:9092,kafka114:9092', 'connector.properties.group.id' = 'rtdw_group_test_1', 'format.type' = 'json', 'format.derive-schema' = 'true', -- 由表schema自動(dòng)推導(dǎo)解析JSON 'update-mode' = 'append')
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'n' TIME_UNIT
https://www.jianshu.com/p/c612e95a5028
val createTableSql = """ |上文的SQL語(yǔ)句 |...... """.stripMargin tableEnv.sqlUpdate(createTableSql)
開(kāi)窗計(jì)算 PV、UV
SELECT eventType,TUMBLE_START(eventTime, INTERVAL '30' SECOND) AS windowStart,TUMBLE_END(eventTime, INTERVAL '30' SECOND) AS windowEnd,COUNT(userId) AS pv,COUNT(DISTINCT userId) AS uvFROM rtdw.ods.streaming_user_active_logWHERE platform = 'xyz'GROUP BY eventType, TUMBLE(eventTime, INTERVAL '30' SECOND)
SQL 文檔:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows
val queryActiveSql =
"""
|......
|......
""".stripMargin
val result = tableEnv.sqlQuery(queryActiveSql)
result
.toAppendStream[Row]
.print()
.setParallelism(1)
關(guān)于“Flink 1.10中SQL、HiveCatalog與事件時(shí)間整合的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。
網(wǎng)頁(yè)題目:Flink1.10中SQL、HiveCatalog與事件時(shí)間整合的示例分析
分享URL:http://bm7419.com/article6/igosog.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站建設(shè)、建站公司、云服務(wù)器、手機(jī)網(wǎng)站建設(shè)、營(yíng)銷型網(wǎng)站建設(shè)、品牌網(wǎng)站設(shè)計(jì)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)