
How to understand the principle and use of Apache Flink CDC


Many inexperienced readers do not know how to understand the principle and use of Apache Flink CDC. This article therefore explains the principle and walks through a practical example; by the end, you should be able to put Flink CDC to use yourself.

CDC (Change Data Capture)

Flink added CDC support in version 1.11. CDC is short for Change Data Capture. The name can look a bit opaque at first, so let's approach it from the data architecture that preceded it.

Previously, MySQL binlog data was processed along a pipeline like MySQL → Canal → Kafka → Flink: a tool such as Canal listens to the binlog and writes the change events to Kafka, while Apache Flink consumes the Kafka data in real time to synchronize the MySQL data or drive other processing. Overall, the pipeline can be split into the following stages.

1. MySQL has binlog enabled.
2. Canal synchronizes the binlog data and writes it to Kafka.
3. Flink reads the binlog data from Kafka for the related business processing.

This processing link is long and requires several components. Apache Flink CDC instead obtains the binlog directly from the database for downstream business computation and analysis. Put simply, the link becomes MySQL → Flink → downstream.

In other words, the data is no longer relayed to Kafka by Canal; Flink processes the MySQL change data directly, and the Canal and Kafka stages are saved.

The mysql-cdc and postgres-cdc connectors are implemented as of Flink 1.11; that is to say, from Flink 1.11 on we can consume MySQL and PostgreSQL change data directly with Flink for business processing.
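For example, a PostgreSQL source table can be declared much like the MySQL one shown later. The following is only a sketch with placeholder connection settings, based on the option names used by the flink-connector-postgres-cdc connector of that generation:

    CREATE TABLE pg_source (
      id INT NOT NULL,
      ip STRING,
      size BIGINT
    ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = 'localhost',     -- placeholder host
      'port' = '5432',
      'username' = 'postgres',      -- placeholder credentials
      'password' = 'secret',
      'database-name' = 'test',
      'schema-name' = 'public',
      'table-name' = 't_test'
    );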

Typical use cases:

- Incremental synchronization of database data
- Materialized views on top of database tables
- Dimension table joins
- Other business processing

MySQL CDC operation practice

First, make sure that binlog is enabled on the MySQL database. On a self-built MySQL instance, binlog is not enabled by default; if it is off, refer to the MySQL documentation to enable it.
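A quick sanity check from a MySQL client (binlog_format must be ROW so that CDC can capture row-level changes):

    SHOW VARIABLES LIKE 'log_bin';        -- expect ON
    SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW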

Source table:

    DROP TABLE IF EXISTS `t_test`;
    CREATE TABLE `t_test` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `ip` varchar(255) DEFAULT NULL,
      `size` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=183 DEFAULT CHARSET=utf8mb4;

Add the mysql-cdc connector dependency (Maven):

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.1.0</version>
        <scope>compile</scope>
    </dependency>

The related code:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    object MysqlCdcDemo {

      def main(args: Array[String]): Unit = {
        val envSetting = EnvironmentSettings.newInstance()
          .useBlinkPlanner()
          .inStreamingMode()
          .build()
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = StreamTableEnvironment.create(env, envSetting)

        // source table backed directly by the MySQL binlog
        val sourceDDL =
          "CREATE TABLE test_binlog (" +
          "  id INT NOT NULL," +
          "  ip STRING," +
          "  size BIGINT" +          // BIGINT matches bigint(20) in MySQL
          ") WITH (" +
          "  'connector' = 'mysql-cdc'," +
          "  'hostname' = 'localhost'," +
          "  'port' = '3306'," +
          "  'username' = 'root'," +
          "  'password' = 'cain'," +
          "  'database-name' = 'test'," +
          "  'table-name' = 't_test'" +
          ")"

        // output target table (print connector writes results to the console)
        val sinkDDL =
          "CREATE TABLE test_sink (" +
          "  ip STRING," +
          "  countSum BIGINT," +
          "  PRIMARY KEY (ip) NOT ENFORCED" +
          ") WITH (" +
          "  'connector' = 'print'" +
          ")"

        // continuous query: count rows per ip and write into the sink
        val exeSQL =
          "INSERT INTO test_sink " +
          "SELECT ip, COUNT(1) " +
          "FROM test_binlog " +
          "GROUP BY ip"

        tableEnv.executeSql(sourceDDL)
        tableEnv.executeSql(sinkDDL)
        val result = tableEnv.executeSql(exeSQL)
        result.print()
      }
    }
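Note that the GROUP BY query produces a continuously updating result: when a new row arrives for an ip, the previous count for that ip is retracted and an updated one is emitted. The PRIMARY KEY (ip) NOT ENFORCED clause declares the key of this updating stream; NOT ENFORCED is needed because Flink does not own the underlying data and cannot enforce uniqueness itself.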

Start the Flink job, then insert some data:

    INSERT INTO `test`.`t_test` (`ip`, `size`) VALUES (UUID(), 1231231);
    INSERT INTO `test`.`t_test` (`ip`, `size`) VALUES (UUID(), 1231231);
    INSERT INTO `test`.`t_test` (`ip`, `size`) VALUES (UUID(), 1231231);
    ...

As the rows are inserted, the results of Flink's processing can be seen directly in the console.
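For reference, the same pipeline can also be expressed entirely in SQL (for example in the Flink SQL client). A sketch reusing the connection settings from the code above:

    CREATE TABLE test_binlog (
      id INT NOT NULL,
      ip STRING,
      size BIGINT
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = 'cain',
      'database-name' = 'test',
      'table-name' = 't_test'
    );

    CREATE TABLE test_sink (
      ip STRING,
      countSum BIGINT,
      PRIMARY KEY (ip) NOT ENFORCED
    ) WITH ('connector' = 'print');

    INSERT INTO test_sink
    SELECT ip, COUNT(1) FROM test_binlog GROUP BY ip;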

In this way, Apache Flink CDC replaces the previous Canal + Kafka stages, and the synchronization of MySQL data is realized directly through SQL.

After reading the above, have you mastered the principle and use of Apache Flink CDC? Thank you for reading!
