This article mainly explains how to master Flink SQL quickly. The content is simple, clear, and easy to learn and understand. Please follow the editor's train of thought to study and learn how to master Flink SQL quickly.
1. Import the required dependency packages
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.1</version>
</dependency>
flink-table-planner: the planner, the most important part of the Table API; it provides the runtime environment and the planner that generates program execution plans. flink-table-api-scala-bridge: the bridge, mainly responsible for connecting the Table API with the DataStream/DataSet API; it comes in Java and Scala variants.
These two dependencies are what you need to run in an IDE; in a production environment the planner is already available by default in Flink's lib directory, and only the bridge is needed.
Of course, if you want to use user-defined functions or connect to Kafka, you also need an SQL client, which is included in flink-table-common.
2. The differences between the two planners (old & Blink)
Batch/stream unification: the Blink planner treats batch jobs as a special case of stream processing. Therefore, it does not support conversion between Table and DataSet; batch jobs are not translated into DataSet programs but, like streaming jobs, into DataStream programs.
Because of this batch/stream unification, the Blink planner does not support BatchTableSource; it uses a bounded StreamTableSource instead.
The Blink planner only supports the new Catalog interface and does not support the deprecated ExternalCatalog.
The FilterableTableSource implementations of the old planner and the Blink planner are incompatible: the old planner pushes PlannerExpressions down to FilterableTableSource, whereas the Blink planner pushes Expressions down.
String-based key-value configuration options are only used by the Blink planner.
The implementation of PlannerConfig differs between the two planners.
The Blink planner optimizes multiple sinks into a single DAG (supported only on TableEnvironment, not on StreamTableEnvironment). The old planner always puts each sink into a new DAG, and all DAGs are independent of one another.
The old planner does not support catalog statistics, while the Blink planner does.
3. The concept of Table
A TableEnvironment can register catalogs (Catalog) and register tables based on a catalog. It maintains a map between catalogs and tables. A Table is specified by an identifier consisting of three parts: the catalog name, the database name, and the object name (table name). If no catalog or database is specified, the current default values are used.
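As a concrete illustration, here is a minimal sketch of working with these identifiers (it assumes a tableEnv and a temporary table named inputTable, as created in the test case of section 5 below; default_catalog and default_database are Flink's built-in defaults):

// Switch the current catalog and database (Flink's built-in defaults)
tableEnv.useCatalog("default_catalog")
tableEnv.useDatabase("default_database")

// A table can be referenced by its full three-part identifier...
val fullyQualified = tableEnv.sqlQuery(
  "select * from default_catalog.default_database.inputTable")

// ...or simply by the object name, which is resolved against the current defaults
val byName = tableEnv.from("inputTable")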
4. Connect to the file system (Csv format)
To connect to an external system and register a table in the Catalog, call tableEnv.connect() directly and pass in a ConnectorDescriptor, i.e. a connector descriptor. For the file-system connector, Flink already provides one internally, called FileSystem().
5. Test case (new)
Requirements: use a txt text file as an input stream, read the data, and filter out the records whose id is not equal to sensor_1. Implementation idea: first build a table environment, read the data through the connect method, then define the table schema and register the data as a table, and finally filter the data (using either SQL or the Table API).
Prepare data
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
Code implementation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

/**
 * @author Brother Big Data
 * @date 2020-12-12 21:22
 * @version V1.0
 * The first Flink SQL test case
 */
object FlinkSqlTable {
  def main(args: Array[String]): Unit = {
    // Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Build the table environment
    val tableEnv = StreamTableEnvironment.create(env)
    // Read the data through connect
    tableEnv.connect(new FileSystem().path("D:\\D12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt"))
      .withFormat(new Csv())                  // set the format
      .withSchema(new Schema()                // add meta information (schema) to the data
        .field("id", DataTypes.STRING())
        .field("time", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE()))
      .createTemporaryTable("inputTable")     // create a temporary table
    // Query the data with the Table API
    val resTable = tableEnv.from("inputTable")
      .select("*")
      .filter('id === "sensor_1")
    // Query the data with SQL
    val resSql = tableEnv.sqlQuery("select * from inputTable where id = 'sensor_1'")
    // Convert the results to append streams and print them
    resTable.toAppendStream[(String, Long, Double)].print("resTable")
    resSql.toAppendStream[(String, Long, Double)].print("resSql")
    env.execute("FlinkSqlWordCount")
  }
}
6. The functions of TableEnvironment
Register a Catalog
Register tables in the internal Catalog
Execute SQL queries
Register user-defined functions (see the sketch after this list)
Hold a reference to an ExecutionEnvironment or StreamExecutionEnvironment
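As a rough illustration of registering and using a user-defined function, here is a minimal sketch (the ToUpper class and the toUpper name are made up for this example; it assumes the tableEnv and inputTable from the test case in section 5):

import org.apache.flink.table.functions.ScalarFunction

// A hypothetical user-defined scalar function
class ToUpper extends ScalarFunction {
  def eval(s: String): String = s.toUpperCase
}

// Register the function in the TableEnvironment and call it from SQL
tableEnv.registerFunction("toUpper", new ToUpper)
val upperIds = tableEnv.sqlQuery("select toUpper(id), temperature from inputTable")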
When creating a TableEnvironment, you can pass in an extra EnvironmentSettings or TableConfig parameter, which can be used to configure certain features of the TableEnvironment.
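For example, here is a minimal sketch of adjusting the configuration of an already-created environment via its TableConfig; the mini-batch keys shown are string-based options of the Blink planner (see section 2) and are only illustrative:

// Access the TableConfig of an existing table environment
val config = tableEnv.getConfig

// String-based key/value options (used by the Blink planner only)
config.getConfiguration.setString("table.exec.mini-batch.enabled", "true")
config.getConfiguration.setString("table.exec.mini-batch.allow-latency", "5 s")
config.getConfiguration.setString("table.exec.mini-batch.size", "5000")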
7. Creating streaming and batch environments with the old planner and Blink
7.1 Streaming environment with the old planner
val settings = EnvironmentSettings.newInstance()
  .useOldPlanner()      // use the old planner
  .inStreamingMode()    // streaming mode
  .build()
val tableEnv = StreamTableEnvironment.create(env, settings)
7.2 Batch environment with the old planner
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = BatchTableEnvironment.create(batchEnv)
7.3 Streaming environment with the Blink planner
val bsSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()    // use the Blink planner
  .inStreamingMode()
  .build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
7.4 Batch environment with the Blink planner
val bbSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()    // use the Blink planner
  .inBatchMode()
  .build()
val bbTableEnv = TableEnvironment.create(bbSettings)

Thank you for reading. The above is the content of "How to master Flink SQL quickly". After studying this article, I believe you have a deeper understanding of how to master Flink SQL quickly, although the specific usage still needs to be verified in practice.