
How to extract data with the Flink stream processing engine


This article explains in detail how to extract data with the Flink stream processing engine using Flink CDC. The content is laid out step by step, with the details handled carefully; I hope it helps you resolve your doubts about the topic.

1. CDC

CDC (Change Data Capture): in the broad sense, anything that can capture data changes can be called CDC. In practice, however, CDC usually refers to capturing changes in databases (MySQL, Oracle, MongoDB, and so on); it is a technique for capturing data changes in a database.

2. Comparison of common CDC tools

Common options include Flink CDC, DataX, Canal, Sqoop, Kettle, Oracle GoldenGate, Debezium, and so on.

DataX, Sqoop, and Kettle implement CDC mainly on a query basis: scheduled offline jobs issue batch queries against the source. This mode of operation cannot guarantee data consistency, and its real-time performance is poor (see the sketch below).
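The following is an illustrative sketch only: a minimal Java/JDBC polling extractor, assuming a hypothetical update_time column on the wz.user_info table used later in this article and the same connection settings. It shows why deletes and intermediate updates between two polls get lost.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.Instant;

public class PollingExtractSketch {
    public static void main(String[] args) throws Exception {
        // Watermark of the last extracted change; starts at the epoch for a full first load
        Timestamp lastWatermark = Timestamp.from(Instant.EPOCH);
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/wz", "root", "123456")) {
            while (true) {
                // Only rows whose update_time moved past the watermark are visible;
                // deletes and intermediate updates between two polls are lost.
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, user_name, update_time FROM user_info "
                                + "WHERE update_time > ? ORDER BY update_time")) {
                    ps.setTimestamp(1, lastWatermark);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            lastWatermark = rs.getTimestamp("update_time");
                            System.out.println(rs.getInt("id") + " " + rs.getString("user_name"));
                        }
                    }
                }
                Thread.sleep(60_000L); // the "scheduled batch job" interval
            }
        }
    }
}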

Flink CDC, Canal, Debezium, and Oracle GoldenGate are log-based CDC technologies. They process the database's log data in real time in a streaming fashion, which guarantees data consistency and provides real-time data to downstream services.

3. Flink CDC

Flink CDC was announced for the first time at the Flink Forward conference in 2020, presented by Jark Wu and Qingsheng Ren.

A Flink CDC connector can capture all changes that occur in one or more tables; each change usually carries a before record and an after record. The connector can be used directly in Flink in unbounded (streaming) mode, without middleware such as Kafka to relay the data.
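For intuition, a heavily simplified update event in the underlying Debezium change format looks roughly like this (field values are invented and most fields are omitted):

{
  "before": { "id": 1, "user_name": "old_name" },
  "after":  { "id": 1, "user_name": "new_name" },
  "source": { "db": "wz", "table": "user_info" },
  "op": "u",
  "ts_ms": 1650000000000
}

Here op is c for create, u for update, and d for delete; for a delete, after is null and before carries the last state of the row.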

4. Databases supported by Flink CDC

PS:

Flink CDC 2.2 has just added four new data sources: OceanBase, PolarDB-X, SQL Server, and TiDB, all of which support integrated full and incremental synchronization.

Up to now, Flink CDC supports 12+ data sources.

5. Flink CDC example implemented by Alibaba

Dependencies to introduce (Maven coordinates, groupId : artifactId : version):

org.apache.flink : flink-table-api-java : ${flink.version}
org.apache.flink : flink-table-api-java-bridge_${scala.binary.version} : ${flink.version}
org.apache.flink : flink-table-planner-blink_${scala.binary.version} : ${flink.version}
com.alibaba.ververica : flink-connector-mysql-cdc : 1.4.0
mysql : mysql-connector-java : 8.0.28
com.alibaba : fastjson : 1.2.80
com.fasterxml.jackson.core : jackson-core : ${jackson.version}
com.fasterxml.jackson.core : jackson-databind : ${jackson.version}
com.fasterxml.jackson.module : jackson-module-parameter-names : ${jackson.version}

Based on the DataStream API:

package spendreport.cdc;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import java.util.List;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * @author zhengwen
 */
public class TestMySqlFlinkCDC {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Flink CDC stores the binlog reading position in the checkpoint as state;
        //    to resume from where it left off, start the program from a Checkpoint or Savepoint.
        // 2.1 Checkpoint every 5 seconds
        env.enableCheckpointing(5000L);
        // 2.2 Specify the consistency semantics of the checkpoint
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Specify the restart strategy used when recovering from a checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));

        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("127.0.0.1")
                .serverTimeZone("GMT+8") // add this setting if you hit a time-zone error
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("wz")
                // Note: table names must be qualified with the database name;
                // multiple tables are separated by commas
                .tableList("wz.user_info")
                .startupOptions(StartupOptions.initial())
                // Custom deserialization to a JSON-format string
                .deserializer(new MyJsonDebeziumDeserializationSchema())
                // Built-in string-format deserialization
                // .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        // TODO You could keyBy here (e.g. by table or operation type) and then apply window processing

        // 3. Print the data
        streamSource.print();
        // streamSource.addSink(); // output to an external sink

        // 4. Execute the task
        env.execute("flinkTableCDC");
    }

    private static class MyJsonDebeziumDeserializationSchema
            implements com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema<String> {

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            Struct value = (Struct) sourceRecord.value();
            Struct source = value.getStruct("source");
            // Get the database and table names
            String db = source.getString("db");
            String table = source.getString("table");
            // Get the operation type
            String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
            if (type.equals("create")) {
                type = "insert";
            }
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("database", db);
            jsonObject.put("table", table);
            jsonObject.put("type", type);
            // Get the row data after the change
            Struct after = value.getStruct("after");
            JSONObject dataJson = new JSONObject();
            List<Field> fields = after.schema().fields();
            for (Field field : fields) {
                String fieldName = field.name();
                Object fieldValue = after.get(field);
                dataJson.put(fieldName, fieldValue);
            }
            jsonObject.put("data", dataJson);
            collector.collect(JSONObject.toJSONString(jsonObject));
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
}

Running effect

PS:

Inserts, updates, and deletes against the database table trigger output immediately.

A custom deserialization schema is used here to produce a JSON-format string; the built-in string deserialization also works (run it yourself and look at what gets printed). An example of the emitted JSON is shown below.
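With the custom schema above, every change arrives as one flat JSON string shaped like the following (illustrative values only, here for an insert on wz.user_info):

{"database":"wz","table":"user_info","type":"insert","data":{"id":1,"mobile":"13800000000","user_name":"test","real_name":"Test User","org_name":"demo","user_stars":5,"is_deleted":0}}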

Based on SQL:

package spendreport.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author zhengwen
 */
public class TestMySqlFlinkCDC2 {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the Flink-MySQL-CDC source
        String connectorName = "mysql-cdc";
        String dbHostName = "127.0.0.1";
        String dbPort = "3306";
        String dbUsername = "root";
        String dbPassword = "123456";
        String dbDatabaseName = "wz";
        String dbTableName = "user_info";

        String tableSql = "CREATE TABLE t_user_info ("
                + "id int,"
                + "mobile varchar(20),"
                + "user_name varchar(30),"
                + "real_name varchar(60),"
                + "id_card varchar(20),"
                + "org_name varchar(100),"
                + "user_stars int,"
                + "create_by int,"
                // + "create_time datetime,"
                + "update_by int,"
                // + "update_time datetime,"
                + "is_deleted int"
                + ") WITH ("
                + " 'connector' = '" + connectorName + "',"
                + " 'hostname' = '" + dbHostName + "',"
                + " 'port' = '" + dbPort + "',"
                + " 'username' = '" + dbUsername + "',"
                + " 'password' = '" + dbPassword + "',"
                + " 'database-name' = '" + dbDatabaseName + "',"
                + " 'table-name' = '" + dbTableName + "'"
                + ")";

        tableEnv.executeSql(tableSql);
        tableEnv.executeSql("select * from t_user_info").print();
        env.execute();
    }
}

Running effect:
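With a few changes applied to wz.user_info, the output of print() looks roughly like the following (columns truncated and values invented; the op column shows +I for inserts, -U/+U for the retract/update pair of an update, and -D for deletes):

+----+-------------+----------------+--------------+-----+
| op |          id |         mobile |    user_name | ... |
+----+-------------+----------------+--------------+-----+
| +I |           1 |    13800000000 |         test | ... |
| -U |           1 |    13800000000 |         test | ... |
| +U |           1 |    13900000000 |         test | ... |
+----+-------------+----------------+--------------+-----+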

Summary

Since this approach is log-based, the database must have its log enabled in its configuration file. For MySQL, the following needs to be set (typically in my.cnf under [mysqld]):

server-id=1
log_bin=mysql-bin
binlog_format=ROW        # only the ROW format is currently supported
expire_logs_days=30
binlog_do_db=wz          # for multiple databases, add one binlog_do_db line per database; never separate them with commas

The real-time performance really is high; the experience is far better than with those scheduled batch jobs.

The stream really is smooth.

That is all for this article on how to extract data with the Flink stream processing engine. To truly master these points, you still need to practice and use them yourself. If you want to read more related articles, welcome to follow the industry information channel.
