In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
How to use Apache Flink to achieve custom Sink, I believe that many inexperienced people do not know what to do, so this paper summarizes the causes of the problem and solutions, through this article I hope you can solve this problem.
The data sent by socket converts the String type into an object, and then saves the Java object to the Mysql database.
Create databases and tables
Create database imooc_flink;create table student (id int (11) NOT NULL AUTO_INCREMENT,name varchar (25), age int (10), primary key (id)
Import mysql dependencies:
Mysql mysql-connector-java 8.0.15
Create POJO Student
Package com.vincent.course05;public class Student {private int id; private String name; private int age; @ Override public String toString () {return "Student {" + "id=" + id + ", name='" + name +'\'+ ", age=" + age +'}';} public int getId () {return id } public void setId (int id) {this.id = id;} public String getName () {return name;} public void setName (String name) {this.name = name;} public int getAge () {return age;} public void setAge (int age) {this.age = age;}}
Then create a connection, SinkToMySQL.java
Package com.vincent.course05;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;public class SinkToMySQL extends RichSinkFunction {PreparedStatement ps; private Connection connection Establish a connection in the / * open () method, so that you don't have to establish a connection and release it every time you invoke * * @ param parameters * @ throws Exception * / @ Override public void open (Configuration parameters) throws Exception {super.open (parameters); connection = getConnection (); String sql = "insert into student (id, name, age) values (?,?);" Ps = this.connection.prepareStatement (sql);} @ Override public void close () throws Exception {super.close (); / / close the connection and release the resource if (connection! = null) {connection.close ();} if (ps! = null) {ps.close () The invoke () method * * @ param value * @ param context * @ throws Exception * / @ Override public void invoke (Student value, Context context) throws Exception {/ / assemble the data is called once for each data insertion, and the insert operation ps.setInt (1, value.getId ()) is performed. Ps.setString (2, value.getName ()); ps.setInt (3, value.getAge ()); ps.executeUpdate ();} private static Connection getConnection () {Connection con = null; try {Class.forName ("com.mysql.cj.jdbc.Driver") Con = DriverManager.getConnection ("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456");} catch (Exception e) {e.printStackTrace (); System.out.println ("- mysql get connection has exception, msg =" + e.getMessage ());} return con;}}
Main method:
Public static void main (String [] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment (); DataStreamSource source = environment.socketTextStream ("192.168.152.45", 9999); SingleOutputStreamOperator studentStream = source.map (new MapFunction () {@ Override public Student map (String value) throws Exception {String [] splits = value.split (","); Student student = new Student () Student.setId (Integer.parseInt (splits [0])); student.setName (splits [1]); student.setAge (Integer.parseInt (splits [2])); return student;}}); studentStream.addSink (new SinkToMySQL ()); environment.execute ("JavaCustomSinkToMysql");}
To get data from socket, the data format is separated by commas, and enter:
Nc-lk 99991
Check the database. There is an extra piece of data in the database.
Mysql > select * from student;+----+ | id | name | age | +-+ | 1 | tom | 23 | +-+ 1 row in set (0.00 sec)
This makes it easy to use custom sink and write it to MySQL.
Summary:
Step 1: inheriting RichSinkFunction T is the type of object you want to write
Step 2: rewrite the method open/close lifecycle method, which is executed once per record in invoke
By default, the parallelism of the open method is not 1, which is related to the specific computer.
After reading the above, have you mastered how to use Apache Flink to implement custom Sink? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.