In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces "how to call FlinkSQL API". In daily operation, I believe many people have doubts about how to call FlinkSQL API. The editor consulted all kinds of data and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts about "how to call FlinkSQL API". Next, please follow the editor to study!
The background of FlinkSQL
Flink SQL is a set of development language designed by Flink real-time computing to simplify the computing model and reduce the threshold for users to use real-time computing. It conforms to the standard SQL semantics.
Flink SQL is a user-oriented API layer. In our traditional streaming computing field, such as Storm and Spark Streaming, some Function or Datastream API are provided. Users write business logic through Java or Scala. Although this approach is flexible, it has some shortcomings, such as having a certain threshold and difficult to tune. With the continuous update of the version, there are many incompatibilities in API.
In this context, there is no doubt that SQL has become our best choice. We chose SQL as the core API because it has several very important features:
SQL is a set language. Users only need to express their needs clearly and do not need to know the specific methods.
SQL can be optimized with a variety of query optimizers that can translate the optimal execution plan for SQL.
SQL is easy to understand, people in different industries and fields understand it, and the cost of learning is low.
SQL is very stable, and SQL itself has changed little in the more than 30 years of database history.
Stream and batch unity, Flink underlying Runtime itself is a stream and batch unified engine, and SQL can achieve API layer stream and batch unity.
Overall introduction 1 what are Table API and Flink SQL?
Flink itself is a unified processing framework for batch flow, so Table API and SQL are the upper processing API for batch flow unification. At present, the function is not perfect and is in the active development stage.
Table API is a set of query API embedded in Java and Scala languages that allows us to combine queries from relational operators (such as select, filter, and join) in a very intuitive way. For Flink SQL, you can write SQL directly in the code to achieve some Query operations. Flink's SQL support is based on Apache Calcite (Apache open source SQL parsing tool), which implements the SQL standard.
Regardless of whether the input is batch input or streaming input, in both sets of API, the specified query has the same semantics and gets the same result.
2 dependencies that need to be introduced
Table API and SQL need to introduce two dependencies: planner and bridge
Org.apache.flinkflink-table-planner_2.111.10.0org.apache.flinkflink-table-api-scala-bridge_2.111.10.0
Where:
The flink-table-planner:planner planner, the most important part of table API, provides the runtime environment and the planner that generates the program execution plan
Flink-table-api-scala-bridge:bridge bridge, mainly responsible for table API and DataStream/DataSet API connection support, divided into java and scala according to language
The two dependencies here need to be added to run in the IDE environment; in the case of a production environment, planner is already available by default in the lib directory, and only bridge is needed.
Of course, if you want to use user-defined functions or connect to kafka, you need to have a SQL client, which is included in the flink-table-common.
3 the difference between two kinds of planner (old & blink)
1. Batch stream unification: Blink regards batch processing as a special case of streaming processing. Therefore, blink does not support conversion between tables and DataSet, and batch jobs will not be converted to DataSet applications, but will be converted to DataStream programs like stream processing.
2. Because the batch stream is unified, Blink planner does not support BatchTableSource, but uses bounded StreamTableSource instead.
3. Blink planner only supports new directories, not deprecated ExternalCatalog.
4. The FilterableTableSource implementation of the old planner and Blink planner is not compatible. The old planner pushed PlannerExpressions down to filterableTableSource, while blink planner pushed Expressions down.
5. The string-based key configuration option is only applicable to Blink planner.
6. The implementation of PlannerConfig in the two planner is different.
7. Blink planner optimizes multiple sink in a single DAG (supported only on TableEnvironment, but not on StreamTableEnvironment). The optimization of the old planner always places each sink in a new DAG, where all DAG are independent of each other.
8. The old planner does not support directory statistics, but Blink planner does.
API call 1 basic program structure
The program structure of Table API and SQL is similar to that of streaming; it can also be thought of as a few steps: first create an execution environment, and then define source, transform, and sink.
The specific operation procedure is as follows:
Val tableEnv =. / / create the execution environment of the table / / create a table to read the data tableEnv.connect (.). CreateTemporaryTable ("inputTable") / / register a table to output the calculation results to tableEnv.connect (.). CreateTemporaryTable ("outputTable") / / through the Table API query operator to get a result table val result = tableEnv.from ("inputTable"). Select (...) / / query statements through SQL Get a result table val sqlResult = tableEnv.sqlQuery ("SELECT... FROM inputTable... ") / / write the result table to the output table result.insertInto (" outputTable ") 2 create the table environment
The easiest way to create a table environment is to call the create method to create it directly based on the flow processing execution environment:
Val tableEnv = StreamTableEnvironment.create (env)
Table environment (TableEnvironment) is the core concept of integrating Table API & SQL in flink. It is responsible for:
Register for catalog
Register the table in the internal catalog
Execute SQL query
Register user-defined functions
Convert DataStream or DataSet to a table
Save a reference to ExecutionEnvironment or StreamExecutionEnvironment
When you create a TableEnv, you can pass in an extra EnvironmentSettings or TableConfig parameter, which can be used to configure some of the features of TableEnvironment.
For example, configure an older version of streaming query (Flink-Streaming-Query):
Val settings = EnvironmentSettings.newInstance () .useOldPlanner () / use the old version of planner .inStreamingMode () / / StreamingMode. Build () val tableEnv = StreamTableEnvironment.create (env, settings)
Based on older versions of the batch environment (Flink-Batch-Query):
Val batchEnv = ExecutionEnvironment.getExecutionEnvironmentval batchTableEnv = BatchTableEnvironment.create (batchEnv)
Stream processing environment (Blink-Streaming-Query) based on blink version:
Val bsSettings = EnvironmentSettings.newInstance (). UseBlinkPlanner (). InStreamingMode (). Build () val bsTableEnv = StreamTableEnvironment.create (env, bsSettings)
Batch environment based on blink version (Blink-Batch-Query):
Val bbSettings = EnvironmentSettings.newInstance (). UseBlinkPlanner (). InBatchMode (). Build () val bbTableEnv = TableEnvironment.create (bbSettings) 3 the concept of registry table 3.1 (Table) in Catalog
TableEnvironment can register the directory Catalog and can be based on the Catalog registry. It maintains a map between Catalog-Table tables.
The Table is specified by an "identifier" and consists of three parts: the Catalog name, the database (database) name, and the object name (table name). If no directory or database is specified, the current default value is used.
Tables can be regular (Table, table) or virtual (View, view). Regular tables (Table) can generally be used to describe external data, such as files, database tables, or message queues, or can be converted directly from DataStream. A view can be created from an existing table, usually as a result of a table API or SQL query.
3.2 Connect to the file system (Csv format)
Connect the external system to the registry in Catalog, call tableEnv.connect () directly, and pass in a ConnectorDescriptor, that is, the connector descriptor. For connector of the file system, flink is already provided internally, which is called FileSystem ().
The code is as follows:
TableEnv.connect (new FileSystem (). Path ("sensor.txt")) / / define the table data source External connection .withFormat (new OldCsv ()) / defines the formatting method after reading data from an external system. WithSchema (new Schema (). Field ("id", DataTypes.STRING ()) .field ("timestamp", DataTypes.BIGINT ()) .field ("temperature", DataTypes.DOUBLE ()) / / defines the table structure .createTemporaryTable ("inputTable") / / create a temporary table
This is an older version of the csv format descriptor. Because it is non-standard and not universal for interfacing with external systems, it will be deprecated and will be replaced by a new format descriptor that conforms to the RFC-4180 standard. The new descriptor is called Csv (), but flink does not provide it directly, so you need to introduce dependency flink-csv:
Org.apache.flinkflink-csv1.10.0
The code is very similar, just change the OldCsv in withFormat to Csv.
3.3 Connect to Kafka
Table API support is already available in version 1. 10 of kafka's connector flink-kafka-connector. We can pass a class called Kafka directly into the connect method, which is the descriptor ConnectorDescriptor of the kafka connector.
TableEnv.connect (new Kafka (). Version ("0.11") / / defines the version of kafka. Topic ("sensor") / / defines the topic.property ("zookeeper.connect", "localhost:2181") .property ("bootstrap.servers", "localhost:9092") .withFormat (new Csv ()) .withSchema (new Schema () .field ("id", DataTypes.STRING ()) .field ("timestamp", DataTypes.BIGINT ()) .field ("temperature"). DataTypes.DOUBLE ()) .createTemporaryTable ("kafkaInputTable")
Of course, you can also connect to external systems such as ElasticSearch, MySql, HBase, Hive and so on, and the implementation is basically similar. Interested partners can study it on their own, so I won't go into details here.
4 query of table
Through the above study, we have used the external system connector connector, we can read and write data, and register the table in the environment's Catalog. Then you can do query transformation on the table.
Flink provides us with two query methods: Table API and SQL.
4.1 invocation of Table API
Table API is a query API integrated into Scala and Java languages. Unlike SQL, Table API's queries are not represented as strings, but are invoked step by step in the host language.
Table API is based on the Table class that represents a "table" and provides a complete set of methods for handling operations, API. These methods return a new Table object that represents the result of applying the transformation operation to the input table. Some relational transformation operations can be composed of multiple method calls to form a chain call structure. For example, table.select (…) .filter (...) , where select (… ) indicates that the field specified in the table is selected, filter (…) Represents the filter criteria.
The implementation in the code is as follows:
Val sensorTable: Table = tableEnv.from ("inputTable") val resultTable: Table = senorTable.select ("id, temperature") .filter ("id = 'sensor_1'") 4.2 SQL query
Flink's SQL integration, based on ApacheCalcite, implements the SQL standard. In Flink, SQL query statements are defined with regular strings. The result of the SQL query is a new Table.
The code is implemented as follows:
Val resultSqlTable: Table = tableEnv.sqlQuery ("select id, temperature from inputTable where id = 'sensor_1'")
Or:
Val resultSqlTable: Table = tableEnv.sqlQuery ("" | select id, temperature | from inputTable | where id = 'sensor_1' ".stripMargin)
Of course, you can also add aggregation operations. For example, we can count the number of sensor temperature data and do count statistics:
Val aggResultTable = sensorTable.groupBy ('id) .select ('id, 'id.count as' count)
Implementation of SQL:
Val aggResultSqlTable = tableEnv.sqlQuery ("select id, count (id) as cnt from inputTable group by id")
Here the fields specified in the Table API are preceded by a single quotation mark', which is the writing of the Expression type defined in Table API, and can easily represent the fields in a table.
Fields can be enclosed directly in double quotes, or in half single quotation marks + field names. The latter form is generally used in future code.
4.5 convert DataStream to a table
Flink allows us to convert Table to DataStream: based on a DataStream, we can read the data source first, then map it into a sample class, and then convert it to Table. The column field (column fields) of Table is the field in the sample class, so you don't have to bother to define schema.
4.5.1 Code representation
The implementation in the code is very simple, just use tableEnv.fromDataStream () directly. By default, the field definitions in the converted Table schema and DataStream correspond to each other, or you can specify them separately.
This allows us to change the order of the fields, rename them, or just select certain fields, which is equivalent to doing a map operation (or Table API's select operation).
The code is as follows:
Val inputStream: DataStream [String] = env.readTextFile ("sensor.txt") val dataStream: DataStream [SensorReading] = inputStream .map (data = > {val dataArray = data.split (",") SensorReading (dataArray (0), dataArray (1). ToLong, dataArray (2) .toDouble) val sensorTable: Table = tableEnv.fromDataStreama (datStream) val sensorTable2 = tableEnv.fromDataStream (dataStream,'id, 'timestamp as' ts) 4.5.2 correspondence between data type and Table schema
In the example in the previous section, the correspondence between the data type in DataStream and the Schema of the table corresponds to the field name in the sample class (name-based mapping), so you can also rename it with as.
Another way to position-based mapping is to position-based mapping directly according to the position of the field, and in the corresponding process, you can directly specify the new field name.
Name-based correspondence:
Val sensorTable = tableEnv.fromDataStream (dataStream, 'timestamp as' ts,'id as' myId, 'temperature)
Location-based correspondence:
Val sensorTable = tableEnv.fromDataStream (dataStream, 'myId,' ts)
Flink's DataStream and DataSet API support multiple types.
Combined types, such as tuples (built-in Scala and Java tuples), POJO, Scala case classes, and Flink's Row types, allow for nested data structures with multiple fields that can be accessed in Table expressions. Other types are treated as atomic types.
For tuple types and atomic types, it is generally better to use position correspondence; if you do not want to use a name, it is also possible: tuple type, the default name is "_ 1", "_ 2", while the atomic type, the default name is "f0".
4.6 create a temporary view (Temporary View)
The first way to create temporary views is to convert them directly from DataStream. Similarly, you can convert the field directly, or you can specify the corresponding field at the time of the transformation.
The code is as follows:
TableEnv.createTemporaryView ("sensorView", dataStream) tableEnv.createTemporaryView ("sensorView", dataStream,'id, 'temperature,' timestamp as'ts)
In addition, of course, you can create views based on Table:
TableEnv.createTemporaryView ("sensorView", sensorTable)
The Schema for View and Table is exactly the same. In fact, in Table API, View and Table can be considered equivalent.
4.7 output table
The output of the table is achieved by writing data to TableSink. TableSink is a common interface that supports different file formats, storage databases, and message queues.
The most direct way to implement the output table is to write a Table to the registered TableSink through the Table.insertInto () method.
4.7.1 Export to Fil
The code is as follows:
/ / register the output table tableEnv.connect (new FileSystem (). Path ("... \\ resources\\ out.txt ") / / defines the connection to the file system. WithFormat (new Csv ()) / / defines the formatting method Csv format .withSchema (new Schema () .field ("id", DataTypes.STRING ()) .field ("temp", DataTypes.DOUBLE () / defines the table structure .createTemporaryTable ("outputTable") / / create temporary table resultSqlTable.insertInto ("outputTable") 4.7.2 update schema (Update Mode)
In the process of flow processing, the processing of tables is not as simple as traditionally defined.
For streaming queries (Streaming Queries), you need to declare how to perform the transformation between (dynamic) tables and external connectors. The type of message exchanged with an external system, specified by the update mode (update mode).
There are three update modes in Flink Table API:
Append mode (Append Mode)
In append mode, tables (dynamic tables) and external connectors exchange only Insert messages.
Recall mode (Retract Mode)
In recall mode, tables and external connectors exchange Add and Retract messages.
Where:
The Insert is encoded as adding a message
Delete (Delete) is encoded to recall the message.
The Update is encoded as a recall message for the updated line (previous line) and an add message for the updated line (new line).
In this mode, key cannot be defined, which is completely different from the upsert schema.
Upsert (update insert) mode
In Upsert mode, dynamic tables and external connectors exchange Upsert and Delete messages.
This pattern requires a unique key through which update messages can be delivered. In order to apply messages correctly, the external connector needs to know the properties of this unique key.
Both Insert and Update are encoded as Upsert messages
Delete (Delete) encode as Delete information
The main difference between this pattern and the Retract pattern is that Update operations are encoded with a single message, so they are more efficient.
4.7.3 output to Kafka
In addition to exporting to a file, you can also export to Kafka. We can combine the previous Kafka as input data to build a data pipeline, kafka in, kafka out.
The code is as follows:
/ output to kafkatableEnv.connect (new Kafka (). Version ("0.11"). Topic ("sinkTest"). Property ("zookeeper.connect", "localhost:2181"). Property ("bootstrap.servers", "localhost:9092") .withFormat (new Csv ()) .withSchema (new Schema (). Field ("id", DataTypes.STRING ()) .field ("temp") DataTypes.DOUBLE ()) .createTemporaryTable ("kafkaOutputTable") resultTable.insertInto ("kafkaOutputTable") 4.7.4 output to ElasticSearch
ElasticSearch's connector can be operated in upsert (update+insert, update insert) mode, so that UPSERT/DELETE messages can be exchanged with external systems using the key (key) defined by Query.
In addition, for append-only-only queries, connector can also operate in append mode, so that only insert messages can be exchanged with external systems.
Currently, only Json is the data format supported by es, and flink itself does not have corresponding support, so you need to introduce dependencies:
Org.apache.flinkflink-json1.10.0
The code is implemented as follows:
/ output to estableEnv.connect (new Elasticsearch (). Version ("6"). Host ("localhost", 9200, "http"). Index ("sensor"). DocumentType ("temp") .inUpsertMode () / specifies the Upsert mode .withFormat (new Json ()) .withSchema (new Schema (). Field ("id", DataTypes.STRING ()) .field ("count"). DataTypes.BIGINT ()) .createTemporaryTable ("esOutputTable") aggResultTable.insertInto ("esOutputTable") 4.7.5 output to MySql
Flink provides flink-jdbc connectors specifically for jdbc connections of Table API. We need to introduce dependencies first:
Org.apache.flinkflink-jdbc_2.111.10.0
The code implementation of the jdbc connection is special because there is no corresponding java/scala class to implement ConnectorDescriptor, so you cannot directly tableEnv.connect (). However, Flink SQL leaves an interface to execute DDL: tableEnv.sqlUpdate ()
For the creation table operation of jdbc, it is naturally suitable to write DDL directly to implement it, so our code can be written as follows:
/ / output to Mysqlval sinkDDL: String = "" | create table jdbcOutputTable (| id varchar (20) not null, | cnt bigint not null |) with (| 'connector.type' =' jdbc', | 'connector.url' =' jdbc:mysql://localhost:3306/test', | 'connector.table' =' sensor_count', | 'connector.driver' =' com.mysql.jdbc.Driver', | 'connector.username' =' root' | | 'connector.password' =' 123456' |) "" .stripMargintableEnv.sqlUpdate (sinkDDL) aggResultSqlTable.insertInto ("jdbcOutputTable") 4.7.6 convert the table to DataStream |
Tables can be converted to DataStream or DataSet. In this way, the custom stream or batch program can continue to run on the results of the Table API or SQL query.
When you convert a table to DataStream or DataSet, you need to specify the generated data type, that is, the data type to convert each row of the table. In general, the most convenient type of conversion is Row. Of course, because all the field types of the results are explicit, we often use tuple types to express them.
As the result of a streaming query, the table is updated dynamically. Therefore, to convert this dynamic query into a data stream, we also need to encode the update operation of the table, and then have different conversion patterns.
There are two modes from table to DataStream in Table API:
Append mode (Append Mode)
For scenarios where the table will only be changed by the Insert operation
Recall mode (Retract Mode)
For any scenario. Somewhat similar to the Retract schema in the update mode, there are only two types of operations: Insert and Delete.
The resulting data adds an identification bit of type Boolean (the first field returned) to indicate whether it is the new data (Insert) or the deleted data (old data, Delete).
The code is implemented as follows:
Val resultStream: DataStream [Row] = tableEnv.toAppendStream [Row] (resultTable) val aggResultStream: DataStream [(Boolean, (String, Long))] = tableEnv.toRetractStream [(String, Long)] (aggResultTable) resultStream.print ("result") aggResultStream.print ("aggResult")
Therefore, without an aggregation operation such as groupby, you can directly use toAppendStream to transform, while if there is an update operation after aggregation, you generally have to use toRetractDstream.
4.7.7 interpretation and implementation of Query
Table API provides a mechanism to Explain the logic of computed tables and optimize query plans. This is done through the TableEnvironment.explain (table) method or the TableEnvironment.explain () method.
The explain method returns a string describing the three plans:
Unoptimized logical query plan
Optimized logical query plan
Actual implementation of the plan
We can view the execution plan in the code:
Val explaination: String = tableEnv.explain (resultTable) println (explaination)
The process of interpretation and implementation of Query is generally the same as that of blink planner, but different. On the whole, the Query is represented as a logical query plan, and then explained in two steps:
Optimize query plan
Interpreted as a DataStream or DataSet program
The Blink version is batch uniform, so all Query will only be interpreted as DataStream programs; in addition, in the batch environment TableEnvironment, the Blink version will not be interpreted until tableEnv.execute () executes the call.
At this point, the study on "how to call FlinkSQL API" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.