2025-04-06 Update From: SLTechnology News&Howtos
This article explains how to use Table & SQL with the Flink JDBC connector to read MySQL data, and how to group the result set by one or more columns with a GROUP BY statement. The material is short and self-contained; the full example follows.
Sample environment

java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
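A minimal set of Maven dependencies for this example. The versions are taken from the sample environment above; the `_2.11` suffixes assume the Scala 2.11 builds, and the MySQL driver version is an assumption chosen to match the legacy `com.mysql.jdbc.Driver` class used in the code.

```xml
<!-- Flink Table API with the Blink planner (Flink 1.11.1, Scala 2.11 builds assumed) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.11.1</version>
</dependency>
<!-- JDBC connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.1</version>
</dependency>
<!-- MySQL driver; 5.1.x matches com.mysql.jdbc.Driver (assumed version) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
```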
GroupToMysql.java
package com.flink.examples.mysql;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Uses Table & SQL with the Flink JDBC connector to read MySQL data and
 * groups the result set by one or more columns with GROUP BY.
 */
public class GroupToMysql {

    /*
     * Official reference:
     * https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
     *
     * To speed up reading data in parallel across source task instances, Flink
     * supports partitioned scanning of JDBC tables:
     *   scan.partition.column      - the column used to partition the input
     *   scan.partition.num         - the number of partitions
     *   scan.partition.lower-bound - the smallest value of the first partition
     *   scan.partition.upper-bound - the largest value of the last partition
     *
     * In flink-jdbc-1.11.1 all property names are defined in the
     * JdbcTableSourceSinkFactory factory class.
     */
    static String table_sql =
            "CREATE TABLE my_users (\n" +
            "  id BIGINT,\n" +
            "  name STRING,\n" +
            "  age INT,\n" +
            "  status INT,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector.type' = 'jdbc',\n" +
            "  'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8',\n" +
            "  'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
            "  'connector.table' = 'users',\n" +
            "  'connector.username' = 'root',\n" +
            "  'connector.password' = 'password'\n" +
            // "  'connector.read.fetch-size' = '10'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Build EnvironmentSettings and choose the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the MySQL table
        tEnv.executeSql(table_sql);

        // Table table = avg(tEnv);
        // Table table = count(tEnv);
        // Table table = min(tEnv);
        Table table = max(tEnv);
        // Print the field structure
        table.printSchema();

        // Plain (append-only) queries use toAppendStream:
        // tEnv.toAppendStream(table, Row.class).print();
        // Grouped (updating) queries use toRetractStream:
        // tEnv.toRetractStream(table, Row.class).print();

        // Convert the Table to a DataStream. The first field of each Tuple2 is a
        // flag: true marks an added (new) record, false marks a retracted (old) record.
        DataStream<Tuple2<Boolean, Row>> behaviorStream = tEnv.toRetractStream(table, Row.class);
        behaviorStream.flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, String>() {
            @Override
            public void flatMap(Tuple2<Boolean, Row> value, Collector<String> out) throws Exception {
                if (value.f0) {
                    System.out.println(value.f1.toString());
                }
            }
        }).print();
        env.execute();
    }

    /**
     * avg: the average of a numeric column within each group.
     */
    public static Table avg(StreamTableEnvironment tEnv) {
        // First way: execute SQL
        String sql = "select status, avg(age) as age1 from my_users group by status";
        // Table table = tEnv.sqlQuery(sql);
        // Second way: assemble the statement with the Table API
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"), $("age").avg().as("age1"));
        return table;
    }

    /**
     * count: the number of accumulated rows within each group.
     */
    public static Table count(StreamTableEnvironment tEnv) {
        String sql = "select status, count(age) as age1 from my_users group by status";
        // Table table = tEnv.sqlQuery(sql);
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"), $("age").count().as("age1"));
        return table;
    }

    /**
     * sum: the accumulated total within each group.
     */
    public static Table sum(StreamTableEnvironment tEnv) {
        String sql = "select status, sum(age) as age1 from my_users group by status";
        // Table table = tEnv.sqlQuery(sql);
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"), $("age").sum().as("age1"));
        return table;
    }

    /**
     * min: the minimum value within each group.
     */
    public static Table min(StreamTableEnvironment tEnv) {
        String sql = "select status, min(age) as age1 from my_users group by status";
        // Table table = tEnv.sqlQuery(sql);
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"), $("age").min().as("age1"));
        return table;
    }

    /**
     * max: the maximum value within each group, recomputed as each record arrives.
     */
    public static Table max(StreamTableEnvironment tEnv) {
        String sql = "select status, max(age) as age1 from my_users group by status";
        // Table table = tEnv.sqlQuery(sql);
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"), $("age").max().as("age1"));
        return table;
    }
}
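The retract-stream behaviour of the grouped query can be illustrated without Flink at all. The sketch below (a hypothetical standalone class `RetractStreamDemo`, not part of the article's code) replays the sample rows through a running max(age) per status and shows which (flag, row) pairs a retract stream would carry: when a group's aggregate changes, the old result is emitted with flag `false` (retract) before the new result is emitted with flag `true` (add).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of retract-stream semantics (no Flink dependency).
public class RetractStreamDemo {

    // Replays (status, age) rows through max(age) per status and returns the
    // resulting retract stream as "(flag,status,age)" strings.
    static List<String> process(int[][] rows) {
        Map<Integer, Integer> maxByStatus = new HashMap<>();
        List<String> stream = new ArrayList<>();
        for (int[] row : rows) {
            int status = row[0], age = row[1];
            Integer old = maxByStatus.get(status);
            if (old == null) {
                // First row of this group: only an add record.
                maxByStatus.put(status, age);
                stream.add("(true," + status + "," + age + ")");
            } else if (age > old) {
                // Max changed: retract the old result, then add the new one.
                maxByStatus.put(status, age);
                stream.add("(false," + status + "," + old + ")");
                stream.add("(true," + status + "," + age + ")");
            }
        }
        return stream;
    }

    public static void main(String[] args) {
        process(new int[][]{{0, 16}, {0, 18}, {1, 21}, {1, 28}, {2, 31}})
                .forEach(System.out::println);
    }
}
```

Filtering on the boolean flag, as the `flatMap` in the main example does with `value.f0`, keeps only the current results and drops the retracted ones.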
Build table SQL
CREATE TABLE `users` (
  `id` bigint(8) NOT NULL AUTO_INCREMENT,
  `name` varchar(40) DEFAULT NULL,
  `age` int(8) DEFAULT NULL,
  `status` tinyint(2) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
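Hypothetical sample rows for the `users` table (the names are invented for illustration; the age/status values are chosen to match the grouped output shown below):

```sql
INSERT INTO `users` (`name`, `age`, `status`) VALUES
  ('user1', 16, 0),
  ('user2', 18, 0),
  ('user3', 21, 1),
  ('user4', 28, 1),
  ('user5', 31, 2);
```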
Print the result
root
 |-- status: INT
 |-- age1: INT

0,16
0,18
1,21
1,28
2,31

Thank you for reading. That concludes "how to use Table SQL and the Flink JDBC connector to read MySQL data"; the details are best verified in practice before relying on them.