Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to use ​ Flink SQL

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article will explain in detail how to use Flink SQL for you. The editor thinks it is very practical, so I share it with you as a reference. I hope you can get something after reading this article.

Flink SQL is one of the core modules of Flink. As a distributed SQL query engine. Flink SQL provides federated queries for various heterogeneous data sources. Developers can easily write complex analytical queries through SQL in a program. Through CBO optimizer, column storage, and code generation technology, Flink SQL has very high query efficiency. At the same time, with the good fault tolerance and expansibility of Flink runtime, Flink SQL can easily deal with massive data.

While ensuring excellent performance, ease of use is the highlight of version 1.11 of Flink SQL. The improvement in ease of use is mainly reflected in the following aspects:

Create Table Like

In production, users often have the need to adjust existing table definitions. For example, users want to add some Flink-specific definitions such as watermark to some external table definitions (such as Hive metastore). In the ETL scenario, when the data from multiple tables are merged into one table, the schema definition of the target table is actually a collection of upstream tables, which requires a convenient way to merge table definitions. Since version 1.11, Flink provides LIKE syntax, which makes it easy for users to append new definitions to existing table definitions.

For example, we can append the watermark definition to the existing table base_table using the following syntax:

CREATE [TEMPORARY] TABLE base_table (id BIGINT, name STRING, tstmp TIMESTAMP, PRIMARY KEY (id)) WITH ('connector':' kafka') CREATE [TEMPORARY] TABLE derived_table (WATERMARK FOR tstmp AS tsmp-INTERVAL'5' SECOND) LIKE base_table

Here the derived_table table definition is equivalent to the following definition:

CREATE [TEMPORARY] TABLE derived_table (id BIGINT, name STRING, tstmp TIMESTAMP, PRIMARY KEY (id), WATERMARK FOR tstmp AS tsmp-INTERVAL'5' SECOND) WITH ('connector':' kafka')

In contrast, the new syntax eliminates repeated schema definitions, and users only need to define additional attributes, which is very convenient and concise.

Some partners in the multi-attribute strategy will ask, are the properties of the original table and the new table just new or added? What should I do if I want to override or exclude certain attributes? This is a good question, and the Flink LIKE syntax provides a very flexible strategy for manipulating table properties.

The LIKE syntax supports the classification of table attributes using different keyword:

ALL: complete table definition CONSTRAINTS: primary keys, unique key and other constraints GENERATED: mainly refers to calculated columns and watermarkOPTIONS: WITH (...) Table optionsPARTITIONS defined in the statement: table partition information

Different attribute behaviors can be appended to different attribute classifications:

INCLUDING: include (default behavior) EXCLUDING: exclude OVERWRITING: override

The following table illustrates the behaviors allowed by different classification attributes:

INCLUDINGEXCLUDINGOVERWRITINGALL ✔️✔️❌ CONSTRAINTS ✔️✔️❌ PARTITIONS ✔️✔️❌ GENERATED ✔️ OPTIONS ✔️

For example, the following statement:

CREATE [TEMPORARY] TABLE base_table (id BIGINT, name STRING, tstmp TIMESTAMP, PRIMARY KEY (id)) WITH ('connector':' kafka', 'scan.startup.specific-offsets':' partition:0,offset:42 Partition:1,offset:300', 'format':' json') CREATE [TEMPORARY] TABLE derived_table (WATERMARK FOR tstmp AS tsmp-INTERVAL'5' SECOND) WITH ('connector.starting-offset':' 0') LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS)

The equivalent table properties are defined as:

CREATE [TEMPORARY] TABLE derived_table (id BIGINT, name STRING, tstmp TIMESTAMP, WATERMARK FOR tstmp AS tsmp-INTERVAL'5' SECOND) WITH ('connector':' kafka', 'scan.startup.specific-offsets':' partition:0,offset:42;partition:1,offset:300', 'format':' json')

For details, see: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

Dynamic Table Options

In production, adjusting parameters is a common requirement, which is often temporarily modified (for example, through terminal query and display), such as the following Kafka table:

Create table kafka_table (id bigint, age int, name STRING) WITH ('connector' =' kafka', 'topic' =' employees', 'scan.startup.mode' =' timestamp', 'scan.startup.timestamp-millis' =' 123456, 'format' =' csv', 'csv.ignore-parse-errors' =' false')

In previous versions, if the user had the following requirements:

The user needs to specify the consumption timestamp of the feature, that is, to modify the scan.startup.timestamp-millis attribute. If you want to ignore the parsing error, you need to change the format.ignore-parse-errors to true. You can only use statements like ALTER TABLE to modify the definition of the table. From 1.11 onwards, users can flexibly set the property parameters of the table in the form of dynamic parameters, overwrite or append the WITH (...) of the original table. The table options defined in the statement.

The basic syntax is:

Table_name / * + OPTIONS ('K1x / aa.bb.cc'='v2') * /

The key-value pair in OPTIONS overrides the table options of the original table, and users can use this syntax in various SQL contexts, such as:

CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...); CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...)

-- override table options in query sourceselect id, name from kafka_table1 / * + OPTIONS ('scan.startup.mode'='earliest-offset') * /

-- override table options in joinselect * from kafka_table1 / * + OPTIONS ('scan.startup.mode'='earliest-offset') * / T1 join kafka_table2 / * + OPTIONS (' scan.startup.mode'='earliest-offset') * / T2 on t1.id = t2.id

-- override table options for INSERT target tableinsert into kafka_table1 / * + OPTIONS ('sink.partitioner'='round-robin') * / select * from kafka_table2

There are no contextual restrictions on the use of dynamic parameters, and they can be defined as long as they are referenced by the table. Isn't it convenient that the dynamic parameters appended after the specified table are automatically appended to the original table definition:)

Because it may have an impact on the query results, the dynamic parameter feature is turned off by default. Turn it on in the following way:

/ / instantiate table environmentTableEnvironment tEnv =. / / access flink configurationConfiguration configuration = tEnv.getConfig () .getConfiguration (); / / set low-level key-value optionsconfiguration.setString ("table.dynamic-table-options.enabled", "true")

For details, see: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html

SQL API improvement

As more and more statements are supported by Flink SQL, the old API tends to cause some confusion:

The original sqlUpdate () method passes the DDL statement to be executed immediately, while the INSERT INTO statement executes the Table program only when the execute method is called. The execution entry of the Table program is not clear enough, such as TableEnvironment.execute () and StreamExecutionEnvironment.execute () can trigger the table program to execute the execute method with no return value. Statements like SHOW TABLES do not return results in a good way. In addition, more and more statements are added to the sqlUpdate method, resulting in unclear interface definition. SqlUpdate can execute SHOW TABLES is a counterexample. Blink planner has always provided the ability of multi-sink optimization execution, but it is not reflected in the API layer.

1.11 re-combs the sql-related interfaces on TableEnv to provide clearer execution semantics. Any sql statement executed at the same time now has a return value, and users can flexibly organize multiple lines of sql statements to execute together through the new API.

Clearer execution semantics the new interface TableEnvironment#executeSql uniformly returns abstract TableResult, and users can iterate through TableResult to get the execution result. The data structure that returns the result varies depending on the execution statement. For example, the SELECT statement returns the query result, while the INSERT statement asynchronously submits the job to the cluster.

Organizing multiple statements to execute the new interface TableEnvironment#createStatementSet allows users to add multiple INSERT statements and execute them together. In multi-sink scenarios, Blink planner will optimize the execution plan accordingly.

Compare the new and old API with a table to feel the changes of the new and old API:

SqlUpdate vs executeSql

Current Interface

New Interface

TEnv.sqlUpdate ("CREATE TABLE...")

TableResult result = tEnv.executeSql ("CREATE TABLE...")

TEnv.sqlUpdate ("INSERT INTO... SELECT...")

TEnv.execute ("test")

TableResult result = tEnv.executeSql ("INSERT INTO... SELECT...")

Execute vs createStatementSet

Current InterfaceNew InterfacetEnv.sqlUpdate ("insert into xx...") tEnv.sqlUpdate ("insert into yy...") tEnv.execute ("test") StatementSet ss = tEnv.createStatementSet (); ss.addInsertSql ("insert into xx..."); ss.addInsertSql ("insert into yy..."); TableResult result = ss.execute ()

TEnv.insertInto ("sink1", table1) tEnv.insertInto ("sink2", table2) tEnv.execute ("test") StatementSet ss = tEnv.createStatementSet (); ss.addInsert ("sink1", table1); ss.addInsert ("sink2", table2); TableResult result = ss.execute () for details, see: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

Hive syntax compatibility enhancement

Since 11 / 11, Flink SQL has separated the Hive parser module to be compatible with the syntax of Hive. At present, the syntax related to DB, Table, View and Function has been supported at the DDL level. Students with HiveCatalog,Hive can directly use the syntax of Hive to do related operations.

You need to set the correct Dialect before using the hive statement:

EnvironmentSettings settings = EnvironmentSettings.newInstance ()... build (); TableEnvironment tableEnv = TableEnvironment.create (settings); / / to use hive dialecttableEnv.getConfig () .setSqlDialect (SqlDialect.HIVE); / / use the hive catalogtableEnv.registerCatalog (hiveCatalog.getName (), hiveCatalog); tableEnv.useCatalog (hiveCatalog.getName ())

We can then use the syntax of Hive to perform some DDL, such as the most common table creation operations:

Create external table tbl1 (d decimal (10L0), ts timestamp) partitioned by (p string) location'% s'tblproperties ('K1s struct) v1'; create table tbl2 (s struct) stored as orc

Create table tbl3 (m map) partitioned by (p1 bigint, p2 tinyint) row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe'

Create table tbl4 (x int, y smallint) row format delimited fields terminated by'| 'lines terminated by'\ n'

Hive syntax compatibility for DQL is already planned, and version 1.12 will be compatible with more query syntax ~

For more information, please see https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html

More concise connector properties

1.11 re-standardize the attribute definition of connector, the new attribute key is more intuitive and concise, compared with the original attribute key, the main changes are as follows:

The type key,connector version information that uses connector as connector is directly put into value. For example, kafka of 0.11 removes redundant connector prefixes for kafka-0.11 and uses scan and sink prefixes to mark source and sink proprietary attributes format.type as format. At the same time, format's own attributes are prefixed with the value of format, for example, csv format's own attributes are prefixed with csv

For example, the 1.11 Kafka table is defined as follows:

CREATE TABLE kafkaTable (user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP (3)) WITH ('connector' =' kafka', 'topic' =' user_behavior', 'properties.bootstrap.servers' =' localhost:9092', 'properties.group.id' =' testGroup', 'format' =' csv', 'scan.startup.mode' =' earliest-offset')

For more information, please see https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

JDBC catalog

In previous versions, users could only create mirror tables of relational databases by displaying the creation of tables. Users need to manually track the schema of the Flink SQL table and the schema changes of the database. In 1.11, JDBC catalog SQL provides a JDBC catalog interface to a variety of external database systems, such as Postgres, MySQL, MariaDB, AWS Aurora, etc.

Currently, Flink has a built-in catalog implementation of Postgres, and configure JDBC catalog with the following code:

CREATE CATALOG mypg WITH ('type' =' jdbc', 'default-database' ='...', 'username' ='...', 'password' ='...', 'base-url' ='...')

USE CATALOG mypg

Users can also implement JDBCCatalog interface to customize the catalog of other databases.

Python UDF enhancement

Version 1.11 of py-flink provides many enhancements to python UDF, including the way DDL is defined, support for scalar vectorized python UDF, support for a full set of python UDF metrics definitions, and the definition of python UDF in SQL-CLI.

The python UDF1.10.0 version of the DDL definition introduces support for python UDF. But only the way python table api is supported. 1.11 provides a SQL DDL way to define python UDF, which can be used in Java/Scala table API and SQL-CLI scenarios.

For example, users can now define the use of python UDF by Java table API programs as follows:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment (); BatchTableEnvironment tEnv = BatchTableEnvironment.create (env)

TEnv.getConfig (). GetConfiguration (). SetString ("python.files", "/ home/my/test1.py"); tEnv.getConfig (). GetConfiguration (). SetString ("python.client.executable", "python3"); tEnv.sqlUpdate ("create temporary system function func1 as' test1.func1' language python"); Table table = tEnv.fromDataSet (env.fromElements ("1", "2", "3"). As ("str"). Select ("func1 (str)")

TEnv.toDataSet (table, String.class). Collect (); Vectorization supports vectorization Python UDF greatly improves the performance compared to ordinary functions. Users can use popular python libraries such as Pandas and Numpy to implement vectorized python UDF. Users only need to add an additional parameter udf_type= "pandas" to the decorator udf. For example, the following example shows how to define a vectorized Python scalar function and its application in python table api:

@ udf (input_types= [DataTypes.BIGINT (), DataTypes.BIGINT ()], result_type=DataTypes.BIGINT (), udf_type= "pandas") def add (I, j): return I + j

Table_env = BatchTableEnvironment.create (env)

# register the vectorized Python scalar functiontable_env.register_function ("add", add)

# use the vectorized Python scalar function in Python Table APImy_table.select ("add (bigint, bigint)")

# use the vectorized Python scalar function in SQL APItable_env.sql_query ("SELECT add (bigint, bigint) FROM MyTable")

This is the end of the article on "how to use Flink SQL". I hope the above content can be of some help to you, so that you can learn more knowledge. if you think the article is good, please share it for more people to see.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report