How Flink CDC Monitors a MySQL Table
Shulou (shulou.com), SLTechnology News&Howtos — 06/01 report, updated 2025-02-24
This article explains how Flink CDC can monitor a MySQL table. The content is simple and straightforward, so let's work through it step by step.
// Prerequisite: enable MySQL binlog monitoring.
// (Config file: C:\ProgramData\MySQL\MySQL Server 5.6\my.ini — ProgramData is a hidden directory.)
// Note: binlog_format=ROW

// Create a TableEnvironment for Blink streaming
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// Create a source table using the mysql-cdc connector
bsTableEnv.executeSql("CREATE TABLE mysql_binlog (" +
        "  id STRING," +
        "  times STRING," +
        "  temp STRING" +
        ") WITH (" +
        "  'connector' = 'mysql-cdc'," +
        "  'hostname' = '127.0.0.1'," +
        "  'port' = '3306'," +
        "  'username' = 'root'," +
        "  'password' = '123456'," +
        "  'database-name' = 'test'," +
        "  'table-name' = 'sersor_temp'" +
        ")");

// Create a sink table that prints to the console
bsTableEnv.executeSql("CREATE TABLE sink_table (" +
        "  id STRING," +
        "  times STRING," +
        "  temp DOUBLE" +
        ") WITH (" +
        "  'connector' = 'print'" +
        ")");

// Connect the CDC source to the downstream print table
bsTableEnv.executeSql("INSERT INTO sink_table SELECT id, times, temp FROM mysql_binlog");

// Create a Kafka sink table (canal-json format)
bsTableEnv.executeSql("CREATE TABLE sink_kafka_table (" +
        "  id STRING," +
        "  times STRING," +
        "  temp DOUBLE" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'test_mysql_binlog'," +
        "  'scan.startup.mode' = 'earliest-offset'," +
        "  'properties.group.id' = 'testGroup'," +
        "  'properties.bootstrap.servers' = 'node2:9092'," +
        "  'format' = 'canal-json'" +
        ")");

// Connect the CDC source to the Kafka table
bsTableEnv.executeSql("INSERT INTO sink_kafka_table SELECT id, times, temp FROM mysql_binlog");

// Create an HBase sink table
bsTableEnv.executeSql("CREATE TABLE hTable (" +
        "  id STRING," +
        "  f ROW<times STRING, temp STRING>," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'hbase-2.2'," +
        "  'table-name' = 'regional:binlog'," +
        "  'zookeeper.quorum' = 'node2:2181'" +
        ")");

// Store CDC data in HBase
bsTableEnv.executeSql("INSERT INTO hTable SELECT id, ROW(times, temp) FROM mysql_binlog");
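As noted above, row-based binlog must be enabled on the MySQL server before mysql-cdc can read changes. A minimal my.ini fragment might look like the following; the server-id and log-bin values are illustrative (not from this article), while binlog_format=ROW is the setting the connector requires:

```ini
; my.ini (C:\ProgramData\MySQL\MySQL Server 5.6\my.ini)
[mysqld]
; server-id and log-bin names below are example values
server-id=1
log-bin=mysql-bin
; required by the mysql-cdc connector
binlog_format=ROW
```

Restart the MySQL service after editing, then verify with SHOW VARIABLES LIKE 'binlog_format';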
-- Table structure for sersor_temp
DROP TABLE IF EXISTS `sersor_temp`;
CREATE TABLE `sersor_temp` (
  `id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `temp` decimal(10,2) NOT NULL,
  `times` varchar(10) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Compact;

-- Records of sersor_temp
INSERT INTO `sersor_temp` VALUES ('sensor_1', 22.20, '1547718527');
INSERT INTO `sersor_temp` VALUES ('sensor_2', 25.20, '1547718214');
INSERT INTO `sersor_temp` VALUES ('sensor_3', 46.40, '1547718520');
INSERT INTO `sersor_temp` VALUES ('sensor_5', 32.62, '1547718325');
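The times values above look like Unix epoch seconds (an assumption on my part; the article stores them only as varchar(10)). If you want human-readable timestamps downstream, a small Java helper can convert them; the class and method names here are illustrative:

```java
import java.time.Instant;

// Hypothetical helper: interpret the varchar 'times' column as Unix epoch seconds.
public class EpochTimes {
    // Parse an epoch-seconds string and render it as an ISO-8601 UTC instant.
    public static String toIso(String epochSeconds) {
        return Instant.ofEpochSecond(Long.parseLong(epochSeconds.trim())).toString();
    }

    public static void main(String[] args) {
        // 1547718527 seconds after the epoch is 2019-01-17T09:48:47Z
        System.out.println(toIso("1547718527")); // 2019-01-17T09:48:47Z
    }
}
```

This is purely a readability aid; the Flink pipeline above passes times through unchanged as STRING.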
Note: the temp field in this table is a DECIMAL. Declaring it as DECIMAL in Flink SQL and then storing it as DOUBLE in HBase produces garbled (unreadable binary) values, so in the code above the fields are carried as STRING instead.
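The workaround described in the note can be sketched in plain Java: convert the DECIMAL to a plain string before writing, so HBase stores readable UTF-8 bytes rather than the binary encoding of a DOUBLE. This is a minimal sketch under that assumption; the class and method names are illustrative, not from the article:

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

// Sketch: carry DECIMAL(10,2) values as strings end-to-end so the stored
// HBase cell bytes are human-readable, instead of raw IEEE-754 DOUBLE bytes.
public class DecimalAsString {
    // Normalize to two decimal places and render without scientific notation.
    public static String encode(BigDecimal temp) {
        return temp.setScale(2).toPlainString();
    }

    public static void main(String[] args) {
        // The bytes written to the HBase cell are just the UTF-8 text "22.20".
        byte[] cell = encode(new BigDecimal("22.2")).getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(cell, StandardCharsets.UTF_8)); // 22.20
    }
}
```

In the Flink DDL this corresponds to declaring the columns as STRING, as done for temp in the HBase table above.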
Thank you for reading. That covers how Flink CDC monitors a MySQL table; hopefully you now have a deeper understanding, though the specifics should be verified in practice.