Reading and writing MySQL with Flink batch processing

2025-03-29 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/03 Report--

1. Add Maven coordinates

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.48</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.12</artifactId>
        <version>1.8.0</version>
    </dependency>

2. Create the table

    CREATE TABLE `temp` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) DEFAULT NULL,
      `time` varchar(255) DEFAULT NULL,
      `type` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

3. Code

    package com.fwmagic.flink.batch;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
    import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.types.Row;

    import java.util.concurrent.TimeUnit;

    public class BatchDemoOperatorMysql {

        public static void main(String[] args) throws Exception {
            String driverClass = "com.mysql.jdbc.Driver";
            String dbUrl = "jdbc:mysql://localhost:3306/test";
            String userName = "root";
            String passWord = "123456";
            // One placeholder per field of the Row that is written
            String sql = "insert into test.temp (name,time,type) values (?,?,?)";

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            /*
             * Contents of the file:
             * Guan Yu,2019-10-14 00:00:01,1
             * Zhang Fei,2019-10-14 00:00:02,2
             * Zhao Yun,2019-10-14 00:00:03,3
             */
            String filePath = "/Users/temp/data.csv";

            // Read the csv file and convert each record to a Row object
            DataSet<Row> outputData = env.readCsvFile(filePath)
                    .fieldDelimiter(",")
                    .types(String.class, String.class, Long.class)
                    .map(new MapFunction<Tuple3<String, String, Long>, Row>() {
                        @Override
                        public Row map(Tuple3<String, String, Long> t) throws Exception {
                            Row row = new Row(3);
                            row.setField(0, t.f0.getBytes("UTF-8"));
                            row.setField(1, t.f1.getBytes("UTF-8"));
                            row.setField(2, t.f2.longValue());
                            return row;
                        }
                    });

            // Write the Row objects to mysql
            outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername(driverClass)
                    .setDBUrl(dbUrl)
                    .setUsername(userName)
                    .setPassword(passWord)
                    .setQuery(sql)
                    .finish());

            // Trigger execution
            env.execute("insert data to mysql");
            System.out.println("mysql write successful!");

            TimeUnit.SECONDS.sleep(6);

            // Read from mysql. The selected columns must line up with the
            // RowTypeInfo below, so name, time and type are listed explicitly
            // instead of using "select *" (which would also return id).
            DataSource<Row> dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                    .setDrivername(driverClass)
                    .setDBUrl(dbUrl)
                    .setUsername(userName)
                    .setPassword(passWord)
                    .setQuery("select name, time, type from temp")
                    .setRowTypeInfo(new RowTypeInfo(
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.LONG_TYPE_INFO))
                    .finish());

            // Get the data and print it
            dataSource.map(new MapFunction<Row, String>() {
                @Override
                public String map(Row value) throws Exception {
                    System.out.println(value);
                    return value.toString();
                }
            }).print();
        }
    }

4. Notes

When writing to mysql, the generic type of the DataSet must be Row, so the input data has to be converted first.

The result of reading is also of type Row. It cannot be printed directly and has to be converted first (here, to String).

env.execute() must be called after the write is defined; otherwise the job is never triggered.

If the data contains Chinese text, convert it to UTF-8 before writing; otherwise garbled characters will appear in the database.
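
The note about converting Chinese text to UTF-8 can be illustrated without Flink or MySQL. The following is a minimal sketch using only the Java standard library; the class name Utf8RoundTrip and its helper methods are hypothetical and not part of the original code. It assumes the garbling comes from bytes being decoded with a different charset than the one used to encode them, which may not be the exact mechanism in every database setup.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, for illustration only.
class Utf8RoundTrip {

    // Encode text as UTF-8 bytes, like t.f0.getBytes("UTF-8") in the article.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode bytes with the given charset.
    static String decode(byte[] bytes, Charset charset) {
        return new String(bytes, charset);
    }

    public static void main(String[] args) {
        // "Guan Yu" in Chinese, written with Unicode escapes so the
        // source file itself stays ASCII-only.
        String name = "\u5173\u7fbd";
        byte[] bytes = encode(name);

        // Decoding with the same charset restores the original text.
        String sameCharset = decode(bytes, StandardCharsets.UTF_8);
        // Decoding with a different charset yields garbled text.
        String otherCharset = decode(bytes, StandardCharsets.ISO_8859_1);

        System.out.println("round trip ok: " + sameCharset.equals(name));
        System.out.println("mismatch garbled: " + !otherCharset.equals(name));
    }
}
```

The same reasoning applies to the whole path from the csv file to the table: the file encoding, the bytes handed to the JDBC driver, and the table's character set all need to agree.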
