?FlinkSQL怎么用

這篇文章將為大家詳細(xì)講解有關(guān)Flink SQL怎么用,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

目前成都創(chuàng)新互聯(lián)已為超過(guò)千家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)站空間、網(wǎng)站托管維護(hù)、企業(yè)網(wǎng)站設(shè)計(jì)、靜海網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。

Flink SQL 是 Flink 的核心模塊之一。作為一個(gè)分布式的 SQL 查詢引擎。Flink SQL 提供了各種異構(gòu)數(shù)據(jù)源的聯(lián)合查詢。開(kāi)發(fā)者可以很方便地在一個(gè)程序中通過(guò) SQL 編寫(xiě)復(fù)雜的分析查詢。通過(guò) CBO 優(yōu)化器、列式存儲(chǔ)、和代碼生成技術(shù),F(xiàn)link SQL 擁有非常高的查詢效率。同時(shí)借助于 Flink runtime 良好的容錯(cuò)和擴(kuò)展性,F(xiàn)link SQL 可以輕松處理海量數(shù)據(jù)。

在保證優(yōu)秀性能的同時(shí),易用性是 1.11 版本 Flink SQL 的重頭戲。易用性的提升主要體現(xiàn)在以下幾個(gè)方面:


Create Table Like

在生產(chǎn)中,用戶常常有調(diào)整現(xiàn)有表定義的需求。例如用戶想在一些外部的表定義(例如 Hive metastore)基礎(chǔ)上追加 Flink 特有的一些定義比如 watermark。在 ETL 場(chǎng)景中,將多張表的數(shù)據(jù)合并到一張表,目標(biāo)表的 schema 定義其實(shí)是上游表的合集,需要一種方便合并表定義的方式。
從 1.11 版本開(kāi)始,F(xiàn)link 提供了 LIKE 語(yǔ)法,用戶可以很方便的在已有的表定義上追加新的定義。

例如我們可以使用下面的語(yǔ)法給已有表 base_table 追加 watermark 定義:

  
    
  
  
  
CREATE [TEMPORARY] TABLE base_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    PRIMARY KEY(id)) WITH (    'connector': 'kafka') CREATE [TEMPORARY] TABLE derived_table (    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND)LIKE base_table;
           

           
這里 derived_table 表定義等價(jià)于如下定義:

  
    
  
  
  
CREATE [TEMPORARY] TABLE derived_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    PRIMARY KEY(id),    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND) WITH (    ‘connector’: ‘kafka’)
           
           
           
對(duì)比之下,新的語(yǔ)法省去了重復(fù)的 schema 定義,用戶只需要定義追加屬性,非常方便簡(jiǎn)潔。

多屬性策略
有的小伙伴會(huì)問(wèn),原表和新表的屬性只是新增或追加嗎?如果我想覆蓋或者排除某些屬性該如何操作?這是一個(gè)好問(wèn)題,F(xiàn)link LIKE 語(yǔ)法提供了非常靈活的表屬性操作策略。

LIKE 語(yǔ)法支持使用不同的 keyword 對(duì)表屬性分類:

  • ALL:完整的表定義
  • CONSTRAINTS: primary keys, unique key 等約束
  • GENERATED: 主要指計(jì)算列和 watermark
  • OPTIONS: WITH (...) 語(yǔ)句內(nèi)定義的 table options
  • PARTITIONS: 表分區(qū)信息

在不同的屬性分類上可以追加不同的屬性行為:

  • INCLUDING:包含(默認(rèn)行為)
  • EXCLUDING:排除
  • OVERWRITING:覆蓋

下面這張表格說(shuō)明了不同的分類屬性允許的行為:


INCLUDING    
EXCLUDING    
OVERWRITING    
ALL    
??    
??    
?    
CONSTRAINTS    
??    
??    
?    
PARTITIONS    
??    
??    
?    
GENERATED    
??    
??    
??    
OPTIONS    
??    
??    
??    

例如下面的語(yǔ)句:

  
    
  
  
  CREATE [TEMPORARY] TABLE base_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    PRIMARY KEY(id)) WITH (    'connector': 'kafka',    'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',    'format': 'json') CREATE [TEMPORARY] TABLE derived_table (    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND)WITH (    'connector.starting-offset': '0')LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS);

等價(jià)的表屬性定義為:

  
    
  
  
  
CREATE [TEMPORARY] TABLE derived_table (    id BIGINT,    name STRING,    tstmp TIMESTAMP,    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND) WITH (    'connector': 'kafka',    'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',    'format': 'json')
           
           
           
細(xì)節(jié)參見(jiàn):  https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

Dynamic Table Options

在生產(chǎn)中,調(diào)整參數(shù)是一個(gè)常見(jiàn)需求,很多的時(shí)候是臨時(shí)修改(比如通過(guò)終端查詢和展示),比如下面這張 Kafka 表:


 
 
 
 
   

 
 
 
create table kafka_table (  id bigint,  age int,  name STRING) WITH (  'connector' = 'kafka',  'topic' = 'employees',  'scan.startup.mode' = 'timestamp',  'scan.startup.timestamp-millis' = '123456',  'format' = 'csv',  'csv.ignore-parse-errors' = 'false')

在之前的版本,如果用戶有如下需求:

  • 用戶需要指定特性的消費(fèi)時(shí)間戳,即修改 scan.startup.timestamp-millis 屬性
  • 用戶想忽略掉解析錯(cuò)誤,需要將 format.ignore-parse-errors 改為 true
只能使用 ALTER TABLE 這樣的語(yǔ)句修改表的定義,從 1.11 開(kāi)始,用戶可以通過(guò)動(dòng)態(tài)參數(shù)的形式靈活地設(shè)置表的屬性參數(shù),覆蓋或者追加原表的 WITH (...) 語(yǔ)句內(nèi)定義的 table options。

基本語(yǔ)法為:

  
    
  
  
  
table_name /*+ OPTIONS('k1'='v1', 'aa.bb.cc'='v2') */
           
           
           
OPTIONS 內(nèi)的鍵值對(duì)會(huì)覆蓋原表的 table options,用戶可以在各種 SQL 語(yǔ)境中使用這樣的語(yǔ)法,例如:

  
    
  
  
  
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);
-- override table options in query sourceselect id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
-- override table options in joinselect * from    kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1    join    kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2    on t1.id = t2.id;
-- override table options for INSERT target tableinsert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;
           

           
動(dòng)態(tài)參數(shù)的使用沒(méi)有語(yǔ)境限制,只要是引用表的地方都可以追加定義。在指定的表后面追加的動(dòng)態(tài)參數(shù)會(huì)自動(dòng)追加到原表定義中,是不是很方便呢 :)

由于可能對(duì)查詢結(jié)果有影響,動(dòng)態(tài)參數(shù)功能默認(rèn)是關(guān)閉的, 使用下面的方式開(kāi)啟該功能:

  
    
  
  
  
// instantiate table environmentTableEnvironment tEnv = ...// access flink configurationConfiguration configuration = tEnv.getConfig().getConfiguration();// set low-level key-value optionsconfiguration.setString("table.dynamic-table-options.enabled", "true");
           

           
細(xì)節(jié)參見(jiàn):  https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html

SQL API 改進(jìn)

隨著 Flink SQL 支持的語(yǔ)句越來(lái)越豐富,老的 API 容易引起一些困惑:

  • 原先的 sqlUpdate() 方法傳遞 DDL 語(yǔ)句會(huì)立即執(zhí)行,而 INSERT INTO 語(yǔ)句在調(diào)用 execute 方法時(shí)才會(huì)執(zhí)行
  • Table 程序的執(zhí)行入口不夠清晰,像 TableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 都可以觸發(fā) table 程序執(zhí)行
  • execute 方法沒(méi)有返回值。像 SHOW TABLES 這樣的語(yǔ)句沒(méi)有很好地方式返回結(jié)果。另外,sqlUpdate 方法加入了越來(lái)越多的語(yǔ)句導(dǎo)致接口定義不清晰,sqlUpdate 可以執(zhí)行 SHOW TABLES 就是一個(gè)反例
  • 在 Blink planner 一直提供多 sink 優(yōu)化執(zhí)行的能力,但是在 API 層沒(méi)有體現(xiàn)出來(lái)

1.11 重新梳理了 TableEnv 上的 sql 相關(guān)接口,提供了更清晰的執(zhí)行語(yǔ)義,同時(shí)執(zhí)行任意 sql 語(yǔ)句現(xiàn)在都有返回值,用戶可以通過(guò)新的 API 靈活的組織多行 sql 語(yǔ)句一起執(zhí)行。

更清晰的執(zhí)行語(yǔ)義
新的接口 TableEnvironment#executeSql 統(tǒng)一返回抽象 TableResult,用戶可以迭代 TableResult 拿到執(zhí)行結(jié)果。根據(jù)執(zhí)行語(yǔ)句的不同,返回結(jié)果的數(shù)據(jù)結(jié)構(gòu)也有變化,比如 SELECT 語(yǔ)句會(huì)返回查詢結(jié)果,而 INSERT 語(yǔ)句會(huì)異步提交作業(yè)到集群。

組織多條語(yǔ)句一起執(zhí)行
新的接口 TableEnvironment#createStatementSet 允許用戶添加多條 INSERT 語(yǔ)句并一起執(zhí)行,在多 sink 場(chǎng)景,Blink planner 會(huì)針對(duì)性地對(duì)執(zhí)行計(jì)劃做優(yōu)化。

新舊 API 對(duì)比
一張表格感受新老 API 的變化:

sqlUpdate vs executeSql  

Current Interface

New Interface

tEnv.sqlUpdate("CREATE TABLE ...");

TableResult result = tEnv.executeSql("CREATE TABLE ...");

tEnv.sqlUpdate("INSERT INTO ... SELECT ...");

tEnv.execute("test");

TableResult result = tEnv.executeSql("INSERT INTO ... SELECT ...");

execute vs createStatementSet

Current Interface    
New Interface    
tEnv.sqlUpdate("insert into xx ...")
tEnv.sqlUpdate("insert into yy ...")
tEnv.execute("test")
StatementSet ss = tEnv.createStatementSet();
ss.addInsertSql("insert into xx ...");
ss.addInsertSql("insert into yy ...");
TableResult result = ss.execute();

   
tEnv.insertInto("sink1", table1)
tEnv.insertInto("sink2", table2)
tEnv.execute("test")
StatementSet ss = tEnv.createStatementSet();
ss.addInsert("sink1", table1);
ss.addInsert("sink2", table2);
TableResult result = ss.execute()
詳情參見(jiàn):  https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

Hive 語(yǔ)法兼容加強(qiáng)

從 1.11 開(kāi)始,F(xiàn)link  SQL 將 Hive parser 模塊獨(dú)立出來(lái),用以兼容 Hive 的語(yǔ)法,目前 DDL 層面,DB、Table、View、Function 相關(guān)的語(yǔ)法均已支持。搭配 HiveCatalog,Hive 的同學(xué)可以直接使用 Hive 的語(yǔ)法來(lái)進(jìn)行相關(guān)的操作。

在使用 hive 語(yǔ)句之前需要設(shè)置正確的 Dialect:

  
    
  
  
  
EnvironmentSettings settings = EnvironmentSettings.newInstance()...build();TableEnvironment tableEnv = TableEnvironment.create(settings);// to use hive dialecttableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);// use the hive catalogtableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);tableEnv.useCatalog(hiveCatalog.getName());
           

           
之后我們便可以使用 Hive 的語(yǔ)法來(lái)執(zhí)行一些 DDL,例如最常見(jiàn)的建表操作:

  
    
  
  
  
create external table tbl1 (  d decimal(10,0),  ts timestamp)partitioned by (p string)location '%s'tblproperties('k1'='v1');  create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc;
create table tbl3 (  m map<timestamp,binary>)partitioned by (p1 bigint, p2 tinyint)row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe';
create table tbl4 (  x int,  y smallint)row format delimited fields terminated by '|' lines terminated by '\n';
           

           
對(duì)于 DQL 的 Hive 語(yǔ)法兼容已經(jīng)在規(guī)劃中,1.12 版本會(huì)兼容更多 query 語(yǔ)法 ~

詳情參見(jiàn):  https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html

更簡(jiǎn)潔的 connector 屬性

1.11 重新規(guī)范了 connector 的屬性定義,新的屬性 key 更加直觀簡(jiǎn)潔,和原有的屬性 key 相比主要做了如下改動(dòng):

  • 使用 connector 作為 connector 的類型 key,connector 版本信息直接放到 value 中,比如 0.11 的 kafka 為 kafka-0.11 
  • 去掉了其余屬性中多余的 connector 前綴
  • 使用 scan 和 sink 前綴標(biāo)記 source 和 sink 專有屬性
  • format.type 精簡(jiǎn)為 format ,同時(shí) format 自身屬性使用 format 的值作為前綴,比如 csv format 的自身屬性使用 csv 統(tǒng)一作前綴

例如,1.11 Kafka 表的定義如下:

  
    
  
  
  CREATE TABLE kafkaTable ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3)) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset')

詳情參見(jiàn):  https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

JDBC catalog

在之前的版本中,用戶只能通過(guò)顯示建表的方式創(chuàng)建關(guān)系型數(shù)據(jù)庫(kù)的鏡像表。用戶需要手動(dòng)追蹤 Flink SQL 的表 schema 和數(shù)據(jù)庫(kù)的 schema 變更。在 1.11,F(xiàn)link SQL 提供了一個(gè) JDBC catalog 接口對(duì)接各種外部的數(shù)據(jù)庫(kù)系統(tǒng),例如 Postgres、MySQL、MariaDB、AWS Aurora、etc。

當(dāng)前 Flink 內(nèi)置了 Postgres 的 catalog 實(shí)現(xiàn),使用下面的代碼配置 JDBC catalog:

CREATE CATALOG mypg WITH(    'type' = 'jdbc',    'default-database' = '...',    'username' = '...',    'password' = '...',    'base-url' = '...');
USE CATALOG mypg;

用戶也可以實(shí)現(xiàn) JDBCCatalog 接口定制其他數(shù)據(jù)庫(kù)的 catalog ~


Python UDF 增強(qiáng)

1.11 版本的 py-flink 在 python UDF 方面提供了很多增強(qiáng),包括 DDL 的定義方式、支持了標(biāo)量的向量化 python UDF,支持全套的 python UDF metrics 定義,以及在 SQL-CLI 中定義 python UDF。

DDL 定義 python UDF
1.10.0 版本引入了對(duì) python UDF 的支持。但是僅僅支持 python table api 的方式。1.11 提供了 SQL DDL 的方式定義 python UDF, 用戶可以在 Java/Scala table API 以及 SQL-CLI 場(chǎng)景下使用。

例如,現(xiàn)在用戶可以使用如下方式定義 Java table API 程序使用 python UDF:

  
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();
   
向量化支持
向量化 Python  UDF 相較于普通函數(shù)大大提升了性能。用戶可以使用流行的 python 庫(kù)例如 Pandas、Numpy 來(lái)實(shí)現(xiàn)向量化的 python UDF。用戶只需在裝飾器 udf 中添加額外的參數(shù) udf_type="pandas" 即可。
例如,下面的樣例展示了如何定義向量化的 Python 標(biāo)量函數(shù)以及在 python table api 中的應(yīng)用:

  
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="pandas")def add(i, j):  return i + j
table_env = BatchTableEnvironment.create(env)
# register the vectorized Python scalar functiontable_env.register_function("add", add)
# use the vectorized Python scalar function in Python Table APImy_table.select("add(bigint, bigint)")
# use the vectorized Python scalar function in SQL APItable_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")

關(guān)于“Flink SQL怎么用”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

名稱欄目:?FlinkSQL怎么用
文章鏈接:http://bm7419.com/article26/psdojg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站收錄、靜態(tài)網(wǎng)站、小程序開(kāi)發(fā)、App設(shè)計(jì)、網(wǎng)站營(yíng)銷(xiāo)、自適應(yīng)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化