FlinkCDC怎么監(jiān)聽MySQL表

這篇文章主要講解了“Flink CDC怎么監(jiān)聽MySQL表”,文中的講解內(nèi)容簡單清晰,易于學(xué)習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習“Flink CDC怎么監(jiān)聽MySQL表”吧!

創(chuàng)新互聯(lián)是一家專注于成都網(wǎng)站設(shè)計、網(wǎng)站制作與策劃設(shè)計,溫江網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設(shè)十年,網(wǎng)設(shè)計領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:溫江等地區(qū)。溫江做網(wǎng)站價格咨詢:18982081108

// 前景提要:開啟mysql binlog監(jiān)控。(目錄:C:\ProgramData\MySQL\MySQL Server 5.6\my.ini)ProgramData 為隱藏目錄。注意:binlog_format=ROW
// 創(chuàng)建Blink Streaming的TableEnvironmentEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);// 創(chuàng)建表,connector使用mysql-cdcbsTableEnv.executeSql("CREATE TABLE mysql_binlog " +"(id STRING, " +"times STRING, " +"temp STRING) " +"WITH " +"('connector' = 'mysql-cdc', " +" 'hostname' = '127.0.0.1', " +" 'port' = '3306', " +" 'username' = 'root', " +" 'password' = '123456', " +" 'database-name' = 'test', " +" 'table-name' = 'sersor_temp'" +")");// 打印控制臺bsTableEnv.executeSql("CREATE TABLE sink_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE) " +"WITH " +"('connector' = 'print'" +")");// 將CDC數(shù)據(jù)源和下游數(shù)據(jù)表對接起來bsTableEnv.executeSql("INSERT INTO sink_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE sink_kafka_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE " +") WITH (" +" 'connector' = 'kafka'," +" 'topic' = 'test_mysql_binlog'," +" 'scan.startup.mode' = 'earliest-offset'," +" 'properties.group.id' = 'testGroup'," +" 'properties.bootstrap.servers' = 'node2:9092', " +" 'format' = 'canal-json' " +")");// 將CDC數(shù)據(jù)與 kafka表對接起來bsTableEnv.executeSql("INSERT INTO sink_kafka_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE hTable (" +" id STRING," +" f ROW<times STRING, temp STRING>," +" PRIMARY KEY (id) NOT ENFORCED" +") WITH (" +" 'connector' = 'hbase-2.2'," +" 'table-name' = 'regional:binlog'," +" 'zookeeper.quorum' = 'node2:2181'" +")");// 將CDC數(shù)據(jù)存儲到 Hbase中bsTableEnv.executeSql("INSERT INTO hTable SELECT id, ROW(times, temp) FROM mysql_binlog");

-- ----------------------------
-- Table structure for sersor_temp
-- ----------------------------
DROP TABLE IF EXISTS `sersor_temp`;
CREATE TABLE `sersor_temp`  (
  `id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `temp` decimal(10, 2) NOT NULL,
  `times` varchar(10) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of sersor_temp
-- ----------------------------
INSERT INTO `sersor_temp` VALUES ('sensor_1', 22.20, '1547718527');
INSERT INTO `sersor_temp` VALUES ('sensor_2', 25.20, '1547718214');
INSERT INTO `sersor_temp` VALUES ('sensor_3', 46.40, '1547718520');
INSERT INTO `sersor_temp` VALUES ('sensor_5', 32.62, '1547718325');
 

注意:此處 表中 temp 字段為 decimal 類型,在SQL中使用  DECIMAL 、DOUBLE 類型 存儲到hbase中都會出現(xiàn)亂碼問題,遂 都換成 STRING

感謝各位的閱讀,以上就是“Flink CDC怎么監(jiān)聽MySQL表”的內(nèi)容了,經(jīng)過本文的學(xué)習后,相信大家對Flink CDC怎么監(jiān)聽MySQL表這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

網(wǎng)站名稱:FlinkCDC怎么監(jiān)聽MySQL表
文章地址:http://bm7419.com/article4/pscpie.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作、面包屑導(dǎo)航、域名注冊、小程序開發(fā)微信小程序、網(wǎng)頁設(shè)計公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站優(yōu)化排名