Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How does Flink Connectors connect to MySql

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article explains how Flink Connectors connect to MySQL. The content is simple, clear, and easy to learn and understand; please follow the editor's train of thought step by step to study and learn "how Flink Connectors connect to MySQL" together.

This example connects to a MySQL data source by using the Flink DataStream connectors and provides JDBC-based data stream input and output operations.

Sample environment

java.version: 1.8.x
flink.version: 1.11.1
mysql: 5.7.x

Data stream input

DataStreamSource.java

Package com.flink.examples.mysql;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.flink.api.java.io.jdbc.JDBCOptions;import org.apache.flink.api.java.io.jdbc.JDBCTableSource;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema Import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @ Description outputs data query from mysql table to DataStream stream * / public class DataStreamSource {/ * official document: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html * / public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment () StreamTableEnvironment tEnv = StreamTableEnvironment.create (env); / / query sql String sql = "SELECT id,name,age,sex,address,createTimeSeries FROM t_user" / / set table view field and type TableSchema tableSchema = TableSchema.builder () .field ("id", DataTypes.INT ()) .field ("name", DataTypes.STRING ()) .field ("age", DataTypes.INT ()) .field ("sex", DataTypes.INT ()) .field ("address") DataTypes.STRING () / / .field ("createTime", DataTypes.TIMESTAMP ()) .field ("createTimeSeries", DataTypes.BIGINT ()) .build () / / configure the jdbc data source option JDBCOptions jdbcOptions = JDBCOptions.builder () .setDriverName (MysqlConfig.DRIVER_CLASS) .setDBUrl (MysqlConfig.SOURCE_DRIVER_URL) .setUsername (MysqlConfig.SOURCE_USER) .setPassword (MysqlConfig.SOURCE_PASSWORD) .setTableName ("t_user") .build () JDBCTableSource jdbcTableSource = JDBCTableSource.builder (). SetOptions (jdbcOptions) .setSchema (tableSchema). 
Build (); / / register the data source in the tableEnv view student tEnv.registerTableSource ("t_user", jdbcTableSource); Table table = tEnv.sqlQuery (sql); DataStream sourceStream = tEnv.toAppendStream (table, TUser.class); sourceStream.map ((t)-> new Gson (). ToJson (t)). Print () Env.execute ("flink mysql source");}}

Data stream output

DataStreamSink.java

Package com.flink.examples.mysql;import com.flink.examples.TUser;import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment Import org.apache.flink.table.sinks.TableSink;import static org.apache.flink.table.api.Expressions.$ / * * @ Description inserts the DataStream data stream into the mysql table * / public class DataStreamSink {/ * official document: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html * / public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); env.setParallelism (1); env.enableCheckpointing (2000) StreamTableEnvironment tEnv = StreamTableEnvironment.create (env); / / query sql String sql = "insert into t_user (id,name,age,sex,address,createTimeSeries) values"; / / encapsulated data TUser user = new TUser (); user.setId (0); user.setName ("zhao1"); user.setAge (22); user.setSex (1) User.setAddress ("CN"); user.setCreateTimeSeries (System.currentTimeMillis ()); DataStream sourceStream = env.fromElements (user); / / get data from DataStream / / Expression id = ExpressionParser.parse_Expression ("id"); / / Expression name = ExpressionParser.parse_Expression ("name"); / / Expression age = ExpressionParser.parse_Expression ("age") / Expression sex = ExpressionParser.parse_Expression ("sex"); / / Expression address = ExpressionParser.parse_Expression ("address"); / / Expression createTimeSeries = ExpressionParser.parse_Expression ("createTimeSeries"); / / Table table = tEnv.fromDataStream (sourceStream, id, name, age, sex, address, createTimeSeries) Table table = tEnv.fromDataStream 
(sourceStream,$ ("id"), $("name"), $("age"), $("sex"), $("address"), $("createTimeSeries")) / / output to mysql / / set table view field and type TableSchema tableSchema = TableSchema.builder () .field ("id", DataTypes.INT ()) .field ("name", DataTypes.STRING ()) .field ("age", DataTypes.INT ()) .field ("sex") DataTypes.INT () .field ("address", DataTypes.STRING ()) / / .field ("createTime", DataTypes.TIMESTAMP ()) .field ("createTimeSeries", DataTypes.BIGINT ()) .build () / / set sink output jdbc TableSink tableSink = JDBCAppendTableSink.builder () .setDrivername (MysqlConfig.DRIVER_CLASS) .setDBUrl (MysqlConfig.SOURCE_DRIVER_URL) .setUsername (MysqlConfig.SOURCE_USER) .setPassword (MysqlConfig.SOURCE_PASSWORD) .setQuery (sql) .setParameterTypes (tableSchema.getFieldTypes) () .setBatchSize (100) .build () / / Register the data source to the tableEnv view result tEnv.registerTableSink ("result", tableSchema.getFieldNames (), tableSchema.getFieldTypes (), tableSink); / / register under the specified path, and then perform the insert operation table.executeInsert ("result");}}

Data source configuration class

MysqlConfig.java

package com.flink.examples.mysql;

/**
 * MySQL database connection configuration shared by the example jobs
 * ({@code DataStreamSource} and {@code DataStreamSink}).
 */
public class MysqlConfig {

    /** Fully qualified class name of the JDBC driver. */
    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";

    /** JDBC URL of the {@code flink} database on 127.0.0.1:3306 (UTF-8, SSL disabled). */
    public static final String SOURCE_DRIVER_URL = "jdbc:mysql://127.0.0.1:3306/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false";

    /** Database user name. */
    public static final String SOURCE_USER = "root";

    /** Database password. NOTE(review): hard-coded sample credential; replace before real use. */
    public static final String SOURCE_PASSWORD = "root";
}

Data presentation

Thank you for reading. That concludes "how to connect Flink Connectors to MySql". After studying this article, you should have a deeper understanding of how to connect Flink Connectors to MySql; the specific usage still needs to be verified in practice. The editor will continue to publish more articles on related topics, and you are welcome to follow!

Welcome to subscribe to "Shulou Technology Information" to get the latest news, interesting stories, and hot topics in the IT industry, and to keep up with the hottest and latest Internet news, technology news, and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report