

How to use Table SQL and the Flink JDBC Connector to read MySQL data

2025-04-06 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

This article explains how to use Table SQL and the Flink JDBC connector to read MySQL data. The content is straightforward and easy to follow; work through it step by step.

We use Table & SQL and the Flink JDBC connector to read MySQL data, and use a GROUP BY clause to group the result set by one or more columns.
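Before diving into the Flink code, the aggregation itself can be illustrated in plain Java, with no Flink dependency. This sketch groups hypothetical (status, age) rows by `status` and keeps the maximum `age` per group, which is exactly what `select status, max(age) from my_users group by status` computes (the sample values are made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GroupBySketch {
    public static void main(String[] args) {
        // (status, age) pairs; hypothetical sample rows
        int[][] rows = {{0, 16}, {0, 18}, {1, 21}, {1, 28}, {2, 31}};
        // Equivalent of: select status, max(age) as age1 from my_users group by status
        Map<Integer, Integer> maxByStatus = new LinkedHashMap<>();
        for (int[] r : rows) {
            // merge keeps the larger age seen so far for each status
            maxByStatus.merge(r[0], r[1], Math::max);
        }
        System.out.println(maxByStatus); // {0=18, 1=28, 2=31}
    }
}
```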

Sample environment

java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
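The environment above implies Scala 2.11 builds of Flink 1.11.1. A minimal Maven dependency set for this example might look like the following; artifact names and versions are assumptions inferred from the stated environment, so verify them against your own build:

```xml
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.11.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.11.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.1</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
  </dependency>
</dependencies>
```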

GroupToMysql.java

package com.flink.examples.mysql;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Description Uses Table & SQL and the Flink JDBC connector to read MySQL data,
 * and groups the result set by one or more columns with GROUP BY.
 */
public class GroupToMysql {

    /**
     * Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
     * Partitioned scanning: to speed up reading data in parallel source task instances,
     * Flink provides partitioned scanning for JDBC tables.
     *   scan.partition.column:      the column name used to partition the input.
     *   scan.partition.num:         the number of partitions.
     *   scan.partition.lower-bound: the minimum value of the first partition.
     *   scan.partition.upper-bound: the maximum value of the last partition.
     *
     * flink-jdbc-1.11.1: all property names are defined in the JdbcTableSourceSinkFactory factory class.
     */
    static String table_sql =
            "CREATE TABLE my_users (\n" +
            "  id BIGINT,\n" +
            "  name STRING,\n" +
            "  age INT,\n" +
            "  status INT,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector.type' = 'jdbc',\n" +
            "  'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8',\n" +
            "  'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
            "  'connector.table' = 'users',\n" +
            "  'connector.username' = 'root',\n" +
            "  'connector.password' = 'password'\n" +
            // "  'connector.read.fetch-size' = '10'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Build EnvironmentSettings and specify the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        // Register the MySQL table
        tEnv.executeSql(table_sql);

        // Table table = avg(tEnv);
        // Table table = count(tEnv);
        // Table table = min(tEnv);
        Table table = max(tEnv);

        // Print the field structure
        table.printSchema();

        // Plain queries use toAppendStream:
        // tEnv.toAppendStream(table, Row.class).print();
        // Grouping queries use toRetractStream:
        // tEnv.toRetractStream(table, Row.class).print();

        // Convert the table to a DataStream. The first field (flag) of each Tuple2 is
        // true for an add (new record) and false for a retract (old record).
        DataStream<Tuple2<Boolean, Row>> behaviorStream = tEnv.toRetractStream(table, Row.class);
        behaviorStream.flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Object>() {
            @Override
            public void flatMap(Tuple2<Boolean, Row> value, Collector<Object> out) throws Exception {
                if (value.f0) {
                    System.out.println(value.f1.toString());
                }
            }
        }).print();
        env.execute();
    }

    /**
     * avg: the average of a numeric column per group.
     * @param tEnv
     * @return
     */
    public static Table avg(StreamTableEnvironment tEnv) {
        // Option 1: execute SQL
        String sql = "select status, avg(age) as age1 from my_users group by status";
        // Table table = tEnv.sqlQuery(sql);
        // Option 2: assemble the query with the Table API
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"), $("age").avg().as("age1"));
        return table;
    }

    /**
     * count: the number of rows per group.
     * @param tEnv
     * @return
     */
    public static Table count(StreamTableEnvironment tEnv) {
        String sql = "select status, count(age) as age1 from my_users group by status";
        // Table table = tEnv.sqlQuery(sql);
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"), $("age").count().as("age1"));
        return table;
    }

    /**
     * sum: the sum of a numeric column per group.
     * @param tEnv
     * @return
     */
    public static Table sum(StreamTableEnvironment tEnv) {
        String sql = "select status, sum(age) as age1 from my_users group by status";
        // Table table = tEnv.sqlQuery(sql);
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"), $("age").sum().as("age1"));
        return table;
    }

    /**
     * min: the minimum value per group.
     * @param tEnv
     * @return
     */
    public static Table min(StreamTableEnvironment tEnv) {
        String sql = "select status, min(age) as age1 from my_users group by status";
        // Table table = tEnv.sqlQuery(sql);
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"), $("age").min().as("age1"));
        return table;
    }

    /**
     * max: the maximum value per group, recomputed each time a record arrives.
     * @param tEnv
     * @return
     */
    public static Table max(StreamTableEnvironment tEnv) {
        String sql = "select status, max(age) as age1 from my_users group by status";
        // Table table = tEnv.sqlQuery(sql);
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"), $("age").max().as("age1"));
        return table;
    }
}
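The retract-stream behavior, where each input record that changes a group's maximum emits an update, can be simulated in plain Java without Flink. This sketch uses hypothetical (status, age) input rows; like the flatMap above, it keeps only the add records (flag = true) and ignores retracts:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractSketch {
    public static void main(String[] args) {
        // (status, age) input records in arrival order; hypothetical sample data
        int[][] input = {{0, 16}, {0, 18}, {1, 21}, {1, 28}, {2, 31}};
        Map<Integer, Integer> maxByStatus = new HashMap<>();
        List<String> adds = new ArrayList<>();
        for (int[] r : input) {
            Integer old = maxByStatus.get(r[0]);
            if (old == null || r[1] > old) {
                // A changed group maximum produces a retract (flag=false) for the
                // old row and an add (flag=true) for the new one; we record adds only.
                maxByStatus.put(r[0], r[1]);
                adds.add(r[0] + "," + r[1]);
            }
        }
        adds.forEach(System.out::println);
    }
}
```

Note that each group can appear several times in the output: every arriving record that raises the group's maximum triggers a new add.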

Table creation SQL

CREATE TABLE `users` (
  `id` bigint(8) NOT NULL AUTO_INCREMENT,
  `name` varchar(40) DEFAULT NULL,
  `age` int(8) DEFAULT NULL,
  `status` tinyint(2) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
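For reference, hypothetical sample rows consistent with the printed result below (the names are invented; `status` is the grouping column):

```sql
INSERT INTO `users` (`name`, `age`, `status`) VALUES
  ('user1', 16, 0),
  ('user2', 18, 0),
  ('user3', 21, 1),
  ('user4', 28, 1),
  ('user5', 31, 2);
```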

Print the result

root
 |-- status: INT
 |-- age1: INT

0,16
0,18
1,21
1,28
2,31

Thank you for reading. The above covers how to use Table SQL and the Flink JDBC Connector to read MySQL data. After studying this article you should have a deeper understanding of the topic; the specifics still need to be verified in practice.
