2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how to connect Flink to MySQL using Flink Connectors. The content is short and straightforward; follow along to work through the example.
The example connects to a MySQL data source through the Flink DataStream connectors and performs stream input and output over JDBC.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
mysql: 5.7.x
Data stream input
DataStreamSource.java
package com.flink.examples.mysql;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Description Queries rows from a MySQL table and emits them as a DataStream
 */
public class DataStreamSource {

    /**
     * Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // query SQL
        String sql = "SELECT id,name,age,sex,address,createTimeSeries FROM t_user";

        // set the table view fields and their types
        TableSchema tableSchema = TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .field("sex", DataTypes.INT())
                .field("address", DataTypes.STRING())
                //.field("createTime", DataTypes.TIMESTAMP())
                .field("createTimeSeries", DataTypes.BIGINT())
                .build();

        // configure the JDBC data source options
        JDBCOptions jdbcOptions = JDBCOptions.builder()
                .setDriverName(MysqlConfig.DRIVER_CLASS)
                .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL)
                .setUsername(MysqlConfig.SOURCE_USER)
                .setPassword(MysqlConfig.SOURCE_PASSWORD)
                .setTableName("t_user")
                .build();
        JDBCTableSource jdbcTableSource = JDBCTableSource.builder()
                .setOptions(jdbcOptions)
                .setSchema(tableSchema)
                .build();

        // register the data source in tableEnv as the view t_user
        tEnv.registerTableSource("t_user", jdbcTableSource);
        Table table = tEnv.sqlQuery(sql);

        // convert each row to a TUser, serialize it to JSON, and print it
        DataStream<TUser> sourceStream = tEnv.toAppendStream(table, TUser.class);
        sourceStream.map((t) -> new Gson().toJson(t)).print();

        env.execute("flink mysql source");
    }
}
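The listing above converts each row to com.flink.examples.TUser, a class the article imports but does not show. Below is a minimal sketch of what it might look like, assuming its fields mirror the TableSchema (INT as Integer, STRING as String, BIGINT as Long). The layout is inferred from the schema and from the setters called in the sink example, so treat it as a hypothetical reconstruction rather than the author's class. For Flink to treat the class as a POJO it needs to be public, have a public no-argument constructor, and expose its fields through getters and setters.

```java
// Hypothetical reconstruction of TUser; the article imports it but never shows it.
// To match the imports in the listings it would be declared in package com.flink.examples.
public class TUser {
    private Integer id;
    private String name;
    private Integer age;
    private Integer sex;
    private String address;
    private Long createTimeSeries;

    // Flink's POJO rules require a public no-argument constructor.
    public TUser() {
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }

    public Integer getSex() { return sex; }
    public void setSex(Integer sex) { this.sex = sex; }

    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }

    public Long getCreateTimeSeries() { return createTimeSeries; }
    public void setCreateTimeSeries(Long createTimeSeries) { this.createTimeSeries = createTimeSeries; }
}
```

The property names have to match the column names in the query, because the conversion to TUser maps fields by name.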
Data stream output
DataStreamSink.java
package com.flink.examples.mysql;

import com.flink.examples.TUser;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Description Inserts a DataStream into a MySQL table
 */
public class DataStreamSink {

    /**
     * Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(2000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // insert SQL, one placeholder per column
        String sql = "insert into t_user (id,name,age,sex,address,createTimeSeries) values (?,?,?,?,?,?)";

        // build the record to insert
        TUser user = new TUser();
        user.setId(0);
        user.setName("zhao1");
        user.setAge(22);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(System.currentTimeMillis());
        DataStream<TUser> sourceStream = env.fromElements(user);

        // get data from the DataStream
        // Expression id = ExpressionParser.parseExpression("id");
        // Expression name = ExpressionParser.parseExpression("name");
        // Expression age = ExpressionParser.parseExpression("age");
        // Expression sex = ExpressionParser.parseExpression("sex");
        // Expression address = ExpressionParser.parseExpression("address");
        // Expression createTimeSeries = ExpressionParser.parseExpression("createTimeSeries");
        // Table table = tEnv.fromDataStream(sourceStream, id, name, age, sex, address, createTimeSeries);
        Table table = tEnv.fromDataStream(sourceStream,
                $("id"), $("name"), $("age"), $("sex"), $("address"), $("createTimeSeries"));

        // output to MySQL
        // set the table view fields and their types
        TableSchema tableSchema = TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .field("sex", DataTypes.INT())
                .field("address", DataTypes.STRING())
                //.field("createTime", DataTypes.TIMESTAMP())
                .field("createTimeSeries", DataTypes.BIGINT())
                .build();

        // configure the JDBC sink
        TableSink tableSink = JDBCAppendTableSink.builder()
                .setDrivername(MysqlConfig.DRIVER_CLASS)
                .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL)
                .setUsername(MysqlConfig.SOURCE_USER)
                .setPassword(MysqlConfig.SOURCE_PASSWORD)
                .setQuery(sql)
                .setParameterTypes(tableSchema.getFieldTypes())
                .setBatchSize(100)
                .build();

        // register the sink in tableEnv as the view result
        tEnv.registerTableSink("result", tableSchema.getFieldNames(), tableSchema.getFieldTypes(), tableSink);

        // run the insert against the registered path
        table.executeInsert("result");
    }
}
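The sink hands its query to a JDBC prepared statement, so the statement needs one ? placeholder for every type passed to setParameterTypes. One way to keep the two in step is to derive the statement from the column names. The sketch below does that with plain Java; InsertSql and buildInsert are hypothetical helper names for illustration and are not part of Flink.

```java
import java.util.Collections;

// Hypothetical helper (not part of Flink): builds a parameterized INSERT statement.
final class InsertSql {
    private InsertSql() {
    }

    /** Returns "INSERT INTO table (c1, c2) VALUES (?, ?)" with one placeholder per column. */
    static String buildInsert(String table, String... columns) {
        if (columns.length == 0) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String columnList = String.join(", ", columns);
        // nCopies yields exactly columns.length "?" entries, so the counts always agree.
        String placeholders = String.join(", ", Collections.nCopies(columns.length, "?"));
        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")";
    }
}
```

In the sink example, InsertSql.buildInsert("t_user", tableSchema.getFieldNames()) would produce a statement with six placeholders, matching the six field types of the schema.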
Data source configuration class
MysqlConfig.java
package com.flink.examples.mysql;

/**
 * @Description MySQL database connection configuration
 */
public class MysqlConfig {
    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
    public static final String SOURCE_DRIVER_URL = "jdbc:mysql://127.0.0.1:3306/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false";
    public static final String SOURCE_USER = "root";
    public static final String SOURCE_PASSWORD = "root";
}
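Hard-coded credentials like those in MysqlConfig are fine for a local demo. Outside of one, the values would more likely come from the environment. The sketch below shows one way to do that under stated assumptions: the class name MysqlEnvConfig and the variable names MYSQL_URL, MYSQL_USER, and MYSQL_PASSWORD are illustrative, and the defaults simply mirror the constants above.

```java
import java.util.Map;

// Hypothetical alternative to MysqlConfig: reads connection settings from a map
// of environment variables and falls back to the demo values from the article.
final class MysqlEnvConfig {
    final String url;
    final String user;
    final String password;

    private MysqlEnvConfig(String url, String user, String password) {
        this.url = url;
        this.user = user;
        this.password = password;
    }

    /** Builds the configuration from the given variables (assumed names, see above). */
    static MysqlEnvConfig from(Map<String, String> env) {
        return new MysqlEnvConfig(
                env.getOrDefault("MYSQL_URL",
                        "jdbc:mysql://127.0.0.1:3306/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false"),
                env.getOrDefault("MYSQL_USER", "root"),
                env.getOrDefault("MYSQL_PASSWORD", "root"));
    }

    /** Convenience entry point that reads the real process environment. */
    static MysqlEnvConfig fromSystemEnv() {
        return from(System.getenv());
    }
}
```

Taking the map as a parameter keeps the lookup testable without touching the real process environment.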
Thank you for reading. That covers how to connect Flink Connectors to MySQL; the details of any specific use still need to be verified in practice.