這篇文章將為大家詳細(xì)講解有關(guān)Flink SQL怎么用,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
目前成都創(chuàng)新互聯(lián)已為超過(guò)千家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)站空間、網(wǎng)站托管維護(hù)、企業(yè)網(wǎng)站設(shè)計(jì)、靜海網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。
Create Table Like
CREATE [TEMPORARY] TABLE base_table ( id BIGINT, name STRING, tstmp TIMESTAMP, PRIMARY KEY(id)) WITH ( 'connector': 'kafka') CREATE [TEMPORARY] TABLE derived_table ( WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND)LIKE base_table;
CREATE [TEMPORARY] TABLE derived_table ( id BIGINT, name STRING, tstmp TIMESTAMP, PRIMARY KEY(id), WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND) WITH ( ‘connector’: ‘kafka’)
CREATE [TEMPORARY] TABLE base_table ( id BIGINT, name STRING, tstmp TIMESTAMP, PRIMARY KEY(id)) WITH ( 'connector': 'kafka', 'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300', 'format': 'json') CREATE [TEMPORARY] TABLE derived_table ( WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND)WITH ( 'connector.starting-offset': '0')LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS);
CREATE [TEMPORARY] TABLE derived_table ( id BIGINT, name STRING, tstmp TIMESTAMP, WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND) WITH ( 'connector': 'kafka', 'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300', 'format': 'json')
Dynamic Table Options
create table kafka_table ( id bigint, age int, name STRING) WITH ( 'connector' = 'kafka', 'topic' = 'employees', 'scan.startup.mode' = 'timestamp', 'scan.startup.timestamp-millis' = '123456', 'format' = 'csv', 'csv.ignore-parse-errors' = 'false')
在之前的版本,如果用戶有如下需求:
table_name /*+ OPTIONS('k1'='v1', 'aa.bb.cc'='v2') */
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);
-- override table options in query source
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
-- override table options in join
select * from
kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
join
kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
on t1.id = t2.id;
-- override table options for INSERT target table
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;
// instantiate table environmentTableEnvironment tEnv = ...// access flink configurationConfiguration configuration = tEnv.getConfig().getConfiguration();// set low-level key-value optionsconfiguration.setString("table.dynamic-table-options.enabled", "true");
SQL API 改進(jìn)
Current Interface | New Interface |
tEnv.sqlUpdate("CREATE TABLE ..."); | TableResult result = tEnv.executeSql("CREATE TABLE ..."); |
tEnv.sqlUpdate("INSERT INTO ... SELECT ..."); tEnv.execute("test"); | TableResult result = tEnv.executeSql("INSERT INTO ... SELECT ..."); |
execute vs createStatementSet
Hive 語(yǔ)法兼容加強(qiáng)
EnvironmentSettings settings = EnvironmentSettings.newInstance()...build();TableEnvironment tableEnv = TableEnvironment.create(settings);// to use hive dialecttableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);// use the hive catalogtableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);tableEnv.useCatalog(hiveCatalog.getName());
create external table tbl1 (
d decimal(10,0),
ts timestamp)
partitioned by (p string)
location '%s'
tblproperties('k1'='v1');
create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc;
create table tbl3 (
m map<timestamp,binary>
)
partitioned by (p1 bigint, p2 tinyint)
row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe';
create table tbl4 (
x int,
y smallint)
row format delimited fields terminated by '|' lines terminated by '\n';
更簡(jiǎn)潔的 connector 屬性
CREATE TABLE kafkaTable ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3)) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset')
JDBC catalog
CREATE CATALOG mypg WITH(
'type' = 'jdbc',
'default-database' = '...',
'username' = '...',
'password' = '...',
'base-url' = '...'
);
USE CATALOG mypg;
Python UDF 增強(qiáng)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="pandas")
def add(i, j):
return i + j
table_env = BatchTableEnvironment.create(env)
# register the vectorized Python scalar function
table_env.register_function("add", add)
# use the vectorized Python scalar function in Python Table API
my_table.select("add(bigint, bigint)")
# use the vectorized Python scalar function in SQL API
table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")
關(guān)于“Flink SQL怎么用”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。
名稱欄目:?FlinkSQL怎么用
文章鏈接:http://bm7419.com/article26/psdojg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站收錄、靜態(tài)網(wǎng)站、小程序開(kāi)發(fā)、App設(shè)計(jì)、網(wǎng)站營(yíng)銷(xiāo)、自適應(yīng)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)