This article demonstrates monitoring data changes with SQL Server CDC and Kafka Connect. The content is meant to be easy to follow and clear; hopefully it helps resolve any doubts as you work through the example below.
Write at the front
Getting to the point: to support data statistics, data analysis, and data mining, to break down information silos, and to lay the foundation for BI, business analysis, and decision-support systems that unlock the value of data, enterprises typically build a data warehouse or data platform. The data comes from the enterprise's various business systems or is crawled from external sources. Moving data from business systems into the warehouse is an ETL (Extract-Transform-Load) process, covering collection, cleansing, and transformation. Heterogeneous data extraction and transformation is usually handled by tools such as Sqoop or DataX, while log collection uses Flume, Logstash, Filebeat, and the like.
Data extraction falls into full extraction and incremental extraction. Full extraction is similar to data migration or replication and is easy to understand. Incremental extraction builds on a full load and only monitors and captures data that has changed. Capturing those changes is the crux of incremental extraction, and there are two key concerns: accuracy, meaning the dynamic changes must be captured precisely, and performance, meaning the process must not put too much pressure on the business system.
Incremental extraction mode
There are several common ways to do incremental extraction, each with its own advantages and disadvantages.
1. Trigger
Create triggers on the source table in the source database that listen for insert, update, and delete operations, and write the captured changes into a temporary (staging) table.
Advantages: simple to implement, clear rules, no changes to the source table itself.
Disadvantages: intrusive to the source database, with some impact on the business system.
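As a rough sketch of this approach (the table, column, and trigger names below are hypothetical, not from the article):

-- Hypothetical example of trigger-based capture into a staging table.
CREATE TABLE dbo.Orders_Changes
(
    ChangeID   int IDENTITY(1,1) PRIMARY KEY,
    OrderID    int,
    ChangeType char(1),          -- 'I' = insert, 'U' = update, 'D' = delete
    ChangeTime datetime DEFAULT GETDATE()
)
GO
CREATE TRIGGER trg_Orders_Capture ON dbo.Orders
AFTER INSERT, UPDATE, DELETE
AS
BEGIN
    SET NOCOUNT ON;
    -- Rows present in both inserted and deleted are updates; rows only in inserted are inserts.
    INSERT INTO dbo.Orders_Changes (OrderID, ChangeType)
    SELECT i.OrderID, CASE WHEN d.OrderID IS NULL THEN 'I' ELSE 'U' END
    FROM inserted i
    LEFT JOIN deleted d ON i.OrderID = d.OrderID;
    -- Rows only in deleted are deletes.
    INSERT INTO dbo.Orders_Changes (OrderID, ChangeType)
    SELECT d.OrderID, 'D'
    FROM deleted d
    LEFT JOIN inserted i ON d.OrderID = i.OrderID
    WHERE i.OrderID IS NULL;
END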
2. Full table comparison
In the ETL process, a full extract is loaded into a temporary (staging) table, and the data is then compared against the target to find the differences.
Advantages: no changes to the source database or source table; everything is handled within the ETL process and managed in one place.
Disadvantages: the ETL process is inefficient and the design is complex; with large data volumes it becomes slow and timeliness is hard to guarantee.
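A rough sketch of the comparison step (stage.Orders and dwh.Orders are hypothetical staging and target tables):

-- New or changed rows: present in the fresh full extract but with no exact match in the target.
SELECT s.*
FROM stage.Orders AS s
EXCEPT
SELECT t.*
FROM dwh.Orders AS t;

-- Deleted rows: present in the target but missing from the fresh extract.
SELECT t.*
FROM dwh.Orders AS t
WHERE NOT EXISTS (SELECT 1 FROM stage.Orders AS s WHERE s.OrderID = t.OrderID);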
3. Delete the whole table and then re-insert
Before extracting the data, empty the target table, then extract all the data again.
Advantages: the ETL operation is simple and fast.
Disadvantages: full extraction is generally done in T+1 fashion, and extracting tables with a large amount of data can put significant pressure on the database.
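A minimal sketch of this pattern (the table names are hypothetical):

-- Empty the warehouse table, then reload everything from the source.
TRUNCATE TABLE dwh.Orders;

INSERT INTO dwh.Orders (OrderID, CustomerID, Amount, CreateTime)
SELECT OrderID, CustomerID, Amount, CreateTime
FROM SourceDB.dbo.Orders;   -- full reload; heavy for large tables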
4. Timestamp
The timestamp approach adds a timestamp column to the source table that is updated whenever a row changes; extraction then filters on that timestamp.
Advantages: simple to implement, clear ETL logic, good performance.
Disadvantages: intrusive to the business system, since the database tables need an extra column; older business systems may be hard to change.
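A minimal sketch of timestamp-based extraction (names are hypothetical; the watermark would normally be stored by the ETL job):

-- Extract only the rows changed since the last successful run.
DECLARE @LastExtractTime datetime = '2021-04-01 00:00:00';

SELECT OrderID, CustomerID, Amount, CreateTime, UpdateTime
FROM dbo.Orders
WHERE UpdateTime > @LastExtractTime;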
5. CDC mode
Change Data Capture (CDC for short). SQL Server provides a CDC mechanism for near-real-time data synchronization, similar in spirit to MySQL's binlog: data update operations are maintained in CDC change tables. When INSERT, UPDATE, or DELETE activity occurs on a CDC-enabled source table, the changes are recorded in the log; the capture process reads them into change tables, and the changes can then be read through the query functions CDC provides. For more information, see the official documentation: About Change Data Capture (SQL Server).
Advantages: an easy-to-use API for setting up the CDC environment, shorter ETL latency, and no changes to the business system's table structure.
Disadvantages: limited by the database edition, and the setup is relatively involved.
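Once CDC is enabled on a table, the captured changes can be read through the query functions CDC generates; a minimal sketch, using the capture instance dbo_T_LioCDC that is created for the table later in this article:

-- Read all changes captured for dbo.T_LioCDC between the earliest and latest LSN.
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_T_LioCDC');
DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();

SELECT *
FROM cdc.fn_cdc_get_all_changes_dbo_T_LioCDC(@from_lsn, @to_lsn, N'all');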
Prerequisites for CDC incremental extraction
1. Kafka cluster and Zookeeper cluster have been set up.
2. The source database supports CDC, and its edition is the Developer or Enterprise edition. (A quick edition check is shown after the environment list below.)
Case environment:
Ubuntu 20.04
Kafka 2.13-2.7.0
Zookeeper 3.6.2
SQL Server 2012
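To confirm the edition, you can query the server property (a simple check added here for convenience):

-- Returns the edition, e.g. 'Developer Edition (64-bit)' or 'Enterprise Edition (64-bit)'.
SELECT SERVERPROPERTY('Edition') AS Edition;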
Steps
Besides enabling CDC support in the database, the main work is moving the changed data out through Kafka Connect. Debezium is a widely recommended connector and supports most mainstream databases: MySQL, PostgreSQL, SQL Server, Oracle, and so on. For more information, see the Debezium Connectors documentation.
1. Database step
Enable database CDC support
Execute the following command in the source database:
EXEC sys.sp_cdc_enable_db
GO
The corresponding statement to disable it again:
EXEC sys.sp_cdc_disable_db
Check whether CDC is enabled:
SELECT * FROM sys.databases WHERE is_cdc_enabled = 1
Create a test data table: (skip this step for existing tables)
CREATE TABLE T_LioCDC
(
    ID int IDENTITY(1,1) PRIMARY KEY,
    Name nvarchar(16),
    Sex bit,
    CreateTime datetime,
    UpdateTime datetime
)
Enable CDC support for source tables:
EXEC sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'T_LioCDC',
    @role_name = NULL,
    @supports_net_changes = 1
Confirm that you have permission to access CDC Table:
EXEC sys.sp_cdc_help_change_data_capture
Verify that SQL Server Agent is turned on:
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'
This completes the CDC setup on the database side.
2. Kafka steps
Kafka Connect has two working modes: standalone and distributed. Standalone mode is for single-machine testing; this article uses distributed mode, as suits a production environment. (Kafka itself must already be running before performing the configuration steps below.)
Download the SQL Server connector
After downloading the connector, create a folder for it and extract the archive into that directory. Example path: /usr/soft/kafka/kafka_2.13_2.7.0/plugins (remember this path; it is used in the configuration below).
Download address: debezium-connector-sqlserver-1.5.0.Final-plugin.tar.gz
Edit connect-distributed.properties configuration
Modify the Kafka connect configuration file, $KAFKA_HOME/config/connect-distributed.properties, with the following changes:
# Kafka cluster ip+port
bootstrap.servers=172.192.10.210:9092,172.192.10.211:9092,172.192.10.212:9092
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
offset.storage.partitions=3
offset.storage.cleanup.policy=compact
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
status.storage.partitions=3
# path where the connector archive was extracted
plugin.path=/usr/soft/kafka/kafka_2.13_2.7.0/plugins
Note that three topics appear in the configuration:
config.storage.topic: stores the configuration of connectors and tasks. Note that this topic must have exactly one partition, though it can have multiple replicas.
offset.storage.topic: stores offset information.
status.storage.topic: stores connector status information.
These topics do not have to be created in advance; they are created automatically after startup. If you prefer to create them yourself, see the sketch below.
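If you do want to create them manually, something along these lines should work (a sketch, reusing the ZooKeeper address from the topic-listing command later in this article; adjust partitions and replication factors to your cluster):

# The config topic must have exactly 1 partition and use log compaction.
bin/kafka-topics.sh --create --zookeeper localhost:2000 --topic connect-configs --partitions 1 --replication-factor 1 --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper localhost:2000 --topic connect-offsets --partitions 3 --replication-factor 1 --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper localhost:2000 --topic connect-status --partitions 3 --replication-factor 1 --config cleanup.policy=compact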
Start the Kafka Connect cluster
After saving the configuration, distribute connect-distributed.properties to the other nodes in the cluster, then start Connect on each node:
bin/connect-distributed.sh config/connect-distributed.properties
Check whether it has started
Kafka Connect exposes a REST API for management, so you can use Postman, Fiddler, or curl to call the relevant endpoints. Check whether it is up:
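For example, with curl (assuming Kafka Connect's default REST port 8083; Postman or Fiddler work just as well):

# List the connectors currently registered on the Connect cluster.
curl -s http://192.168.1.177:8083/connectors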
Note that the cluster IPs configured above are in the 172 range, while 192.168.1.177 here is still one of the servers in the cluster; the machines have dual network cards. Since no connector has been configured yet, the endpoint returns an empty array. A new connector is added next.
Write sqlserver-cdc-source.json
{"name": "sqlserver-cdc-source", "config": {"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.server.name": "JnServer", "database.hostname": "172.192.20.2"-- ip "database.port" of the target database: "1433" -- Port "database.user": "sa" of the target database,-- account "database.password" of the target database: "123456",-- password "database.dbname": "Dis",-- database name of the target database "table.whitelist": "dbo.T_LioCDC" -- listener table name "schemas.enable": "false", "mode": "incrementing",-- incremental mode "incrementing.column.name": "ID",-- incremental column name "database.history.kafka.bootstrap.servers": "172.192.10.210 mode 9092172.192.211" -- kafka cluster "database.history.kafka.topic": "TopicTLioCDC",-- kafka topic internal use Not for consumers to use "value.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter"} / / Source address: https://www.cnblogs.com/EminemJK/p/14688907.html
There are other optional configurations as well; refer to the official documentation. Then submit this configuration to the Connect REST API.
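For example, the JSON file can be submitted with curl (a sketch, assuming the same REST endpoint as above; the original used Postman):

# Register the connector by POSTing its configuration to the Connect REST API.
curl -s -X POST -H "Content-Type: application/json" --data @sqlserver-cdc-source.json http://192.168.1.177:8083/connectors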
Run the check again and the connector now appears, confirming it was configured successfully:
Other APIs
GET /connectors - returns the names of all running connectors.
POST /connectors - creates a new connector; the request body must be JSON containing a name field and a config field, where name is the connector's name and config is a JSON object with the connector's configuration.
GET /connectors/{name} - gets information about the specified connector.
GET /connectors/{name}/config - gets the configuration of the specified connector.
PUT /connectors/{name}/config - updates the configuration of the specified connector.
GET /connectors/{name}/status - gets the status of the specified connector, including whether it is running, stopped, or failed, and the error details if it has failed.
GET /connectors/{name}/tasks - gets the tasks the specified connector is running.
GET /connectors/{name}/tasks/{taskid}/status - gets the status of a task of the specified connector.
PUT /connectors/{name}/pause - pauses the connector and its tasks and stops data processing until it is resumed.
PUT /connectors/{name}/resume - resumes a paused connector.
POST /connectors/{name}/restart - restarts a connector, typically after it has failed.
POST /connectors/{name}/tasks/{taskId}/restart - restarts a task, usually because it has failed.
DELETE /connectors/{name} - deletes a connector, stopping all its tasks and removing its configuration.
(Source: https://www.cnblogs.com/EminemJK/p/14688907.html)
View Topic
/usr/soft/kafka/kafka_2.13_2.7.0# bin/kafka-topics.sh --list --zookeeper localhost:2000
Among the topics, JnServer.dbo.T_LioCDC is the one we consume. Start a console consumer to monitor it:
bin/kafka-console-consumer.sh --bootstrap-server 172.192.10.210:9092 --consumer-property group.id=group1 --consumer-property client.id=consumer-1 --topic JnServer.dbo.T_LioCDC
Then insert, update, and delete some rows in the source table.
-- Test code (the literal values here are sample values)
insert into T_LioCDC (Name, Sex, CreateTime, UpdateTime) values (N'TestUser1', 1, getdate(), getdate())
insert into T_LioCDC (Name, Sex, CreateTime, UpdateTime) values (N'TestUser2', 0, getdate(), getdate())
insert into T_LioCDC (Name, Sex, CreateTime, UpdateTime) values (N'TestUser3', 1, getdate(), getdate())
update T_LioCDC set Name = N'TestUser1-updated', UpdateTime = getdate() where ID = 1
delete from T_LioCDC where ID = 3
The data changes are captured successfully. Comparing the JSON messages for the different operations shows insert, update, and delete events in turn:
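For reference, a Debezium change event has roughly the following shape (a simplified sketch of an update event; the exact fields depend on the converter settings and are abridged here):

{
  "before": { "ID": 1, "Name": "TestUser1", "Sex": true, "CreateTime": 1617866400000, "UpdateTime": 1617866400000 },
  "after":  { "ID": 1, "Name": "TestUser1-updated", "Sex": true, "CreateTime": 1617866400000, "UpdateTime": 1617870000000 },
  "source": { "version": "1.5.0.Final", "connector": "sqlserver", "db": "Dis", "schema": "dbo", "table": "T_LioCDC" },
  "op": "u",
  "ts_ms": 1617870000123
}

The op field is "c" for inserts, "u" for updates, and "d" for deletes; before is null for inserts, and after is null for deletes.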
The above is the whole content of "SQL Server CDC with Kafka Connect: monitoring data changes by example". Thank you for reading! I hope the content shared here is helpful.