Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Flink implements Exactly-Once from Kafka to Mysql

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

I. background

Recently, Flink is used in the project to consume kafka messages and store the consumed messages in mysql, which seems to be a very simple requirement. There are many examples of flink consuming kafka on the Internet, but I have not seen any article that can solve the problem of repeated consumption. So I searched the official website of flink for the handling of such scenarios and found that the official website did not implement an example of Exactly-Once from flink to mysql. But the official website has a similar example to solve the end-to-end consumption problem. This off-the-shelf example is the FlinkKafkaProducer011 class, which ensures that the message sent to kafka through FlinkKafkaProducer011 is Exactly-Once, the main implementation is to inherit the TwoPhaseCommitSinkFunction class, about the role of this class can first see the previous article https://blog.51cto.com/simplelife/2401411.

Second, the thought of realization

To put it simply, the purpose of this class is to implement the methods of this class: beginTransaction, preCommit, commit, abort, to achieve the logic of preCommit pre-submission (pre-commit is carried out when the event is handled by its own logic, and the real (commit) submission is carried out after the pre-submission is successful. If the pre-submission fails, call the abort method to roll back the event), combined with flink's checkpoint mechanism to save the offset of partition in topic.

Let me give an example to illustrate the effect achieved: for example, checkpoint is carried out every 10 seconds, and messages in kafka are consumed in real time with FlinkKafkaConsumer011. After consuming and processing the messages, the database is pre-submitted. If there is no problem with the pre-submission, the real database insertion operation is performed 10 seconds later. If the insertion is successful, a checkpoint,flink will automatically record the consumed offset, and the data saved by checkpoint can be put into the hdfs. If there is a pre-submission error, such as an error at 5s, the Flink program will enter a continuous restart, and the restart policy can be set in the configuration. Of course, the next checkpoint will not be done. Checkpoint records the offset that was successfully consumed last time. The data consumed this time is successful, but failed in the pre-submission process. Note that the data is not really inserted at this time. Because the pre-commit (preCommit) failed, the commit process will not occur. After you have finished handling the exception data, restart the Flink program, and it will automatically continue to consume data from the last successful checkpoint to achieve the Kafka to Mysql Exactly-Once.

Third, three classes of concrete implementation code

1 、 StreamDemoKafka2Mysql.java

Package com.fwmagic.flink.streaming;import com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 Import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Properties;/** * consumes kafka messages, sink (custom) to mysql, and ensures that kafka to mysql's Exactly-Once * / @ SuppressWarnings ("all") public class StreamDemoKafka2Mysql {public static void main (String [] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment () / / set parallelism. To facilitate testing, check the order of messages. Set it to 1 here, which can be changed to multiple parallelism env.setParallelism (1); / / checkpoint setting / / start a checkpoint every 10 seconds [set checkpoint cycle] env.enableCheckpointing (10000) / / set the mode to: exactly_one, semantic env.getCheckpointConfig () .setCheckpointingMode (CheckpointingMode.EXACTLY_ONCE) only once; / / ensure that there is a 1s interval between checkpoints [checkpoint minimum interval] env.getCheckpointConfig () .setMinPauseBetweenCheckpoints (1000); / / checkpoints must be completed within 10s, or be discarded [checkpoint timeout] env.getCheckpointConfig () .setCheckpointTimeout (10000) / / only one checkpoint is allowed at a time env.getCheckpointConfig (). SetMaxConcurrentCheckpoints (1); / / means that once the Flink program is cancel, the checkpoint data will be retained in order to restore to the specified checkpoint / / env.getCheckpointConfig () .enableExternalizedCheckpoints (CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) according to the actual needs; / / set statebackend to save the checkpoint on the hdfs, which is saved in memory by default. Save it to the local env.setStateBackend (new FsStateBackend ("file:///Users/temp/cp/")); / / set kafka consumption parameter Properties props = new Properties (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG," hd1:9092,hd2:9092,hd3:9092 "); props.put (ConsumerConfig.GROUP_ID_CONFIG," flink-consumer-group1 ")) / / kafka partition automatic discovery cycle props.put (FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3000"); / * SimpleStringSchema can get kafka messages, JSONKeyValueDeserializationSchema can obtain key,value,metadata:topic,partition,offset and other information of all messages * / FlinkKafkaConsumer011 kafkaConsumer011 = new FlinkKafkaConsumer011 (topic, new SimpleStringSchema (), props); FlinkKafkaConsumer011 kafkaConsumer011 = new FlinkKafkaConsumer011 ("demo123", new JSONKeyValueDeserializationSchema (true), props) / / join kafka data source DataStreamSource streamSource = env.addSource (kafkaConsumer011); / / transfer data to downstream streamSource.addSink (new MySqlTwoPhaseCommitSink ()) .name ("MySqlTwoPhaseCommitSink"); / / trigger execution of env.execute (StreamDemoKafka2Mysql.class.getName ());}}

2 、 MySqlTwoPhaseCommitSink.java

Package com.fwmagic.flink.sink;import com.fwmagic.flink.util.DBConnectUtil;import org.apache.flink.api.common.ExecutionConfig;import org.apache.flink.api.common.typeutils.base.VoidSerializer;import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.Timestamp Import java.text.SimpleDateFormat;import java.util.Date;/** * Custom kafka to mysql, inherit TwoPhaseCommitSinkFunction, and implement two-phase commit. * function: ensure kafak to mysql Exactly-Once * / public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction {public MySqlTwoPhaseCommitSink () {super (new KryoSerializer (Connection.class, new ExecutionConfig ()), VoidSerializer.INSTANCE) } / * perform data storage operations * @ param connection * @ param objectNode * @ param context * @ throws Exception * / @ Override protected void invoke (Connection connection, ObjectNode objectNode, Context context) throws Exception {System.err.println ("start invoke."); String date = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss") .format (new Date ()) System.err.println ("= = > date:" + date + "" + objectNode); String value = objectNode.get ("value"). ToString (); String sql = "insert into `ttest` (`value`, `insert_ time`) values (?)"; PreparedStatement ps = connection.prepareStatement (sql); ps.setString (1, value); ps.setTimestamp (2, new Timestamp (System.currentTimeMillis () / / execute the insert statement ps.execute (); / / manually create an exception if (Integer.parseInt (value) = = 15) System.out.println (1bank 0) } / * obtain the connection and enable manual submission of things (in the getConnection method) * @ return * @ throws Exception * / @ Override protected Connection beginTransaction () throws Exception {String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true"; Connection connection = DBConnectUtil.getConnection (url, "root", "123456") System.err.println ("start beginTransaction." + connection); return connection;} / * pre-submitted logic in the invoke method * @ param connection * @ throws Exception * / @ Override protected void preCommit (Connection connection) throws Exception {System.err.println ("start preCommit." + connection) } / * submit things if invoke executes normally * @ param connection * / @ Override protected void commit (Connection connection) {System.err.println ("start commit." + connection); DBConnectUtil.commit (connection);} @ Override protected void recoverAndCommit (Connection connection) {System.err.println ("start recoverAndCommit." + connection) } @ Override protected void recoverAndAbort (Connection connection) {System.err.println ("start abort recoverAndAbort." + connection);} / * rolls back things if invoke executes an exception, and the next checkpoint operation will not perform * @ param connection * / @ Override protected void abort (Connection connection) {System.err.println ("start abort rollback." + connection) DBConnectUtil.rollback (connection);}}

3 、 DBConnectUtil.java

Package com.fwmagic.flink.util;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;public class DBConnectUtil {/ * get a connection * * @ param url * @ param user * @ param password * @ return * @ throws SQLException * / public static Connection getConnection (String url, String user, String password) throws SQLException {Connection conn = null Try {Class.forName ("com.mysql.jdbc.Driver");} catch (ClassNotFoundException e) {e.printStackTrace ();} conn = DriverManager.getConnection (url, user, password); / / set manual submission conn.setAutoCommit (false); return conn } / * submit transaction * / public static void commit (Connection conn) {if (conn! = null) {try {conn.commit ();} catch (SQLException e) {e.printStackTrace ();} finally {close (conn) * * @ param conn * / public static void rollback (Connection conn) {if (conn! = null) {try {conn.rollback ();} catch (SQLException e) {e.printStackTrace () } finally {close (conn);}} / * * close connection * * @ param conn * / public static void close (Connection conn) {if (conn! = null) {try {conn.close () } catch (SQLException e) {e.printStackTrace ();}

Fourth, code testing

In order to facilitate sending messages, I use a scheduled task to send a number per second, 1: 16. Before sending to the number 15, I should have done checkpoint once, and it is almost time for the second checkpoint. The consumption data of the first checkpoint will be successfully inserted into the database. When consumption reaches the number 15, I manually create an exception. At this time, there should only be data from the commit after the first checkpoint in the database. The data of the second checkpoint will not be inserted into the database (because the pre-submission has failed and the actual submission will not be made), the log information of my experiment:

Start invoke.=== > date:2019-05-28 18:36:50 {"value": 1, "metadata": {"offset": 892, "topic": "gaga", "partition": 0}} start invoke.=== > date:2019-05-28 18:36:51 {"value": 2, "metadata": {"offset": 887, "topic": "gaga" "partition": 2}} start invoke.=== > date:2019-05-28 18:36:52 {"value": 3, "metadata": {"offset": 889," topic ":" gaga "," partition ": 1}} start invoke.=== > date:2019-05-28 18:36:53 {" value ": 4," metadata ": {" offset ": 893," topic ":" gaga " "partition": 0}} start invoke.=== > date:2019-05-28 18:36:54 {"value": 5, "metadata": {"offset": 888," topic ":" gaga "," partition ": 2}} start invoke.=== > date:2019-05-28 18:36:55 {" value ": 6," metadata ": {" offset ": 890," topic ":" gaga " "partition": 1}} start invoke.=== > date:2019-05-28 18:36:56 {"value": 7, "metadata": {"offset": 894, "topic": "gaga", "partition": 0}} start invoke.=== > date:2019-05-28 18:36:57 {"value": 8, "metadata": {"offset": 889, "topic": "gaga" "partition": 2}} start preCommit.start beginTransaction.start commit.com.mysql.jdbc.JDBC4Connection@3c5ad420start invoke.=== > date:2019-05-28 18:36:58 {"value": 9, "metadata": {"offset": 891, "topic": "gaga", "partition": 1}} start invoke.=== > date:2019-05-28 18:36:59 {"value": 10 "metadata": {"offset": 895, "topic": "gaga", "partition": 0}} start invoke.=== > date:2019-05-28 18:37:00 {"value": 11, "metadata": {"offset": "gaga", "partition": 2}} start invoke.=== > date:2019-05-28 18:37:01 {"value": 12, "metadata": {"offset": 892, "topic": "gaga" "partition": 1}} start invoke.=== > date:2019-05-28 18:37:02 {"value": 13, "metadata": {"offset": 896, "topic": "gaga", "partition": 0}} start invoke.=== > date:2019-05-28 18:37:03 {"value": 14, "metadata": {"offset": 891, "topic": "gaga" "partition": 2}} start invoke.=== > date:2019-05-28 18:37:04 {"value": 15, "metadata": {"offset": 893, "topic": "gaga" "partition": 1}} start abort rollback.com.mysql.jdbc.JDBC4Connection@5f2afc1bstart commit.com.mysql.jdbc.JDBC4Connection@71ed09ajava.lang.ArithmeticException: / by zero at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke (MySqlTwoPhaseCommitSink.java:36) at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke (MySqlTwoPhaseCommitSink.java:16) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke TwoPhaseCommitSinkFunction.java:228) at org.apache.flink.streaming.api.operators.StreamSink.processElement (StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:579) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:534) at org .apache.flink.streaming.api.operators.AbstractStreamOperator $CountingOutput.collect (AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect (StreamSourceContexts.java:104) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp (StreamSourceContexts.java:111) at org.apache.flink.streaming.connectors.kafka.internals .AbstractFetcher.emitRecordWithTimestamp (AbstractFetcher.java:398) at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord (Kafka010Fetcher.java:91) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop (Kafka09Fetcher.java:156) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:711) at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java : 93) at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:57) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:97) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run (Task.java:711) at java.lang.Thread.run (Thread.java:748)

You can see from the log that the first commit time is at 18:36:57 on 2019-05-28, and the data is successfully stored to 1-8. When the second consumption reaches the number 15, the submission fails, the last line of the log is rolled back, the connection is closed, and then the conmit also fails. The consumed data 9-15 will not be inserted into the database, and checkpoint will not do it at this time. Checkpoint still saves the offset data after the last successful consumption.

Database table: t_test

CREATE TABLE `tTest` (`id` bigint (20) NOT NULL AUTO_INCREMENT, `value` varchar (255) DEFAULT NULL, `insert_ time` datetime DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

Data in the table:

5. Complete code address: https://gitee.com/fang_wei/fwmagic-flink

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report