
Example Analysis of Monitoring Data Changes with SQL Server CDC and Kafka Connect


This article presents an example analysis of monitoring data changes with SQL Server CDC and Kafka Connect. The content is easy to follow and clearly organized; I hope it helps resolve your doubts as we work through the topic below.

Before we begin

Getting to the topic: to achieve unified operation and management of enterprise-wide data, such as data statistics, analysis, and mining, to eliminate information silos, to lay the foundation for BI, business analysis, and decision-support systems, and to mine the value of the data, enterprises build data warehouses and data platforms. These data sources come from the enterprise's various business systems or from crawled external data. The journey from business-system data to the data warehouse is an ETL (Extract-Transform-Load) process, whose main steps include collection, cleaning, and transformation. Heterogeneous data extraction and transformation usually rely on tools such as Sqoop and DataX; log collection uses Flume, Logstash, Filebeat, and so on.

Data extraction comes in two flavors: full and incremental. Full extraction is similar to data migration or replication and is easy to understand. Incremental extraction builds on an initial full load and afterwards only monitors and captures the data that changes. Capturing those changes is the crux of incremental extraction, with two concerns: accuracy, precisely capturing the dynamic changes to the data, and performance, avoiding putting too much pressure on the business system.

Incremental extraction approaches

There are several common approaches to incremental extraction, each with its own advantages and disadvantages.

1. Triggers

Create triggers on the target tables in the source database, listen for insert, update, and delete operations, and write the captured changes to a temporary table.

Advantages: simple to implement, clear rules, no changes to the source table itself.

Disadvantages: intrusive to the source database, with some impact on the business system.
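A minimal T-SQL sketch of the trigger approach, assuming the example table T_LioCDC introduced later and a hypothetical shadow table T_LioCDC_Delta to hold the captured changes:

-- Hypothetical shadow table for captured changes
CREATE TABLE T_LioCDC_Delta (
    ID int,
    Op char(1),                           -- 'I' = insert, 'U' = update, 'D' = delete
    CapturedAt datetime DEFAULT GETDATE()
)
GO
CREATE TRIGGER trg_T_LioCDC_Capture
ON T_LioCDC
AFTER INSERT, UPDATE, DELETE
AS
BEGIN
    SET NOCOUNT ON;
    -- Rows only in inserted are inserts; rows in both inserted and deleted are updates
    INSERT INTO T_LioCDC_Delta (ID, Op)
    SELECT i.ID, CASE WHEN d.ID IS NULL THEN 'I' ELSE 'U' END
    FROM inserted i LEFT JOIN deleted d ON i.ID = d.ID;
    -- Rows only in deleted are deletes
    INSERT INTO T_LioCDC_Delta (ID, Op)
    SELECT d.ID, 'D'
    FROM deleted d LEFT JOIN inserted i ON d.ID = i.ID
    WHERE i.ID IS NULL;
END
GO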

2. Full-table comparison

In the ETL process, the extraction program performs a full extraction into a staging table and then compares it with the previous load to pick out the differences.

Advantages: no changes to the source database or source tables; everything is handled inside the ETL process and managed in one place.

Disadvantages: ETL efficiency is low and the design is complex; with large data volumes the comparison is slow and timeliness is hard to guarantee.
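As a sketch of the comparison step, SQL Server's EXCEPT operator keeps the diff concise; T_LioCDC_Stage and T_LioCDC_Previous are hypothetical staging tables holding the current and previous full extracts of the example table:

-- Rows new or changed since the previous load
SELECT * FROM T_LioCDC_Stage
EXCEPT
SELECT * FROM T_LioCDC_Previous;
-- Rows deleted since the previous load
SELECT * FROM T_LioCDC_Previous
EXCEPT
SELECT * FROM T_LioCDC_Stage;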

3. Truncate and reload

Before extracting, empty the target table, then extract all of the data again.

Advantages: simple to operate, and the ETL itself is fast.

Disadvantages: the full extraction generally runs as a T+1 daily batch, and extracting tables with large data volumes easily puts pressure on the database.
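The whole approach reduces to two statements; a sketch, assuming a hypothetical warehouse-side copy dw.T_LioCDC whose ID column is not defined as an identity column:

-- Empty the warehouse copy, then reload it in full from the source
TRUNCATE TABLE dw.T_LioCDC;
INSERT INTO dw.T_LioCDC (ID, Name, Sex, CreateTime, UpdateTime)
SELECT ID, Name, Sex, CreateTime, UpdateTime
FROM dbo.T_LioCDC;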

4. Timestamp

The timestamp approach adds a timestamp column to the source table, updates it whenever a row changes, and then extracts rows according to the timestamp.

Advantages: simple to implement, clear ETL logic, and good performance.

Disadvantages: intrusive to the business system, since the database tables need an extra column; older business systems may not be easy to change.
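A sketch of the extraction query, using the UpdateTime column of the example table; @last_sync stands in for a high-water mark saved by the previous run:

-- Pull only rows changed since the last extraction
DECLARE @last_sync datetime = '2021-04-01 00:00:00';  -- illustrative value
SELECT ID, Name, Sex, CreateTime, UpdateTime
FROM T_LioCDC
WHERE UpdateTime > @last_sync
ORDER BY UpdateTime;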

5. CDC

Change Data Capture (CDC for short). SQL Server provides a CDC mechanism for near-real-time data synchronization, similar to MySQL's binlog, which maintains data-change records in CDC tables. When INSERT, UPDATE, or DELETE activity occurs on a CDC-enabled source table, the changes appear in the log; the capture process copies them into change tables, from which they can be read with the query functions CDC provides. For more information, see the official introduction: About Change Data Capture (SQL Server).

Advantages: provides an easy-to-use API to set up the CDC environment, shortens ETL latency, and requires no changes to the business tables' structure.

Disadvantages: limited by database edition, and the implementation process is relatively complex.
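For illustration, a sketch of reading captured changes with the query functions CDC generates; the function name follows the dbo_T_LioCDC capture-instance pattern of the example table enabled below:

-- Read all changes for dbo.T_LioCDC within the currently valid LSN range
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_T_LioCDC');
DECLARE @to_lsn binary(10) = sys.fn_cdc_get_max_lsn();
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_T_LioCDC(@from_lsn, @to_lsn, N'all');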

Prerequisites for CDC incremental extraction

1. Kafka cluster and Zookeeper cluster have been set up.

2. The source database supports CDC, i.e. it is the Developer or Enterprise edition.

Case environment:

Ubuntu 20.04

Kafka 2.13-2.7.0

Zookeeper 3.6.2

SQL Server 2012

Steps

Besides enabling CDC support in the database, the main work is moving the changed data through Kafka Connect. Debezium is the officially recommended connector and supports most mainstream databases: MySQL, PostgreSQL, SQL Server, Oracle, and so on. For more information, see Connectors.

1. Database steps

Enable database CDC support

Execute the following command in the source database:

EXEC sys.sp_cdc_enable_db
GO

The corresponding statement to disable it:

EXEC sys.sp_cdc_disable_db

Check whether it is enabled:

SELECT * FROM sys.databases WHERE is_cdc_enabled = 1

Create a test data table: (skip this step for existing tables)

CREATE TABLE T_LioCDC (
    ID int IDENTITY(1,1) PRIMARY KEY,
    Name nvarchar(16),
    Sex bit,
    CreateTime datetime,
    UpdateTime datetime
)

Enable CDC support for source tables:

EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'T_LioCDC',
    @role_name = NULL,
    @supports_net_changes = 1

Confirm that you have permission to access the CDC tables:

EXEC sys.sp_cdc_help_change_data_capture

Verify that SQL Server Agent is turned on:

EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'

This completes the CDC setup on the database side.

2. Kafka steps

Kafka Connect has two working modes: standalone and distributed. Standalone mode is for single-machine testing; this article uses the distributed mode appropriate for production environments. (Kafka must already be running before performing the following configuration steps.)

Download the SQL Server connector

After downloading the connector, create a folder to store it and extract the archive into that directory. Example path: /usr/soft/kafka/kafka_2.13_2.7.0/plugins (remember this path; it is used in the configuration).

Download address: debezium-connector-sqlserver-1.5.0.Final-plugin.tar.gz
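A sketch of those commands, assuming the example path above and the downloaded archive in the current directory:

mkdir -p /usr/soft/kafka/kafka_2.13_2.7.0/plugins
tar -xzf debezium-connector-sqlserver-1.5.0.Final-plugin.tar.gz -C /usr/soft/kafka/kafka_2.13_2.7.0/plugins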

Edit the connect-distributed.properties configuration

Modify the Kafka Connect configuration file, $KAFKA_HOME/config/connect-distributed.properties, with the following changes:

# kafka cluster ip+port
bootstrap.servers=172.192.10.210:9092,172.192.10.211:9092,172.192.10.212:9092
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
offset.storage.partitions=3
offset.storage.cleanup.policy=compact
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
status.storage.partitions=3
# the path the connector was just extracted to
plugin.path=/usr/soft/kafka/kafka_2.13_2.7.0/plugins

Note the three topics in this configuration:

config.storage.topic: stores configuration information for connectors and tasks. Note that this topic must have exactly one partition, though it may have multiple replicas.

offset.storage.topic: stores offset information.

status.storage.topic: stores connector status information.

These topics do not have to be created in advance; they are created automatically on startup.
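Should you prefer to create them yourself, here is a sketch for the config topic (which needs a single partition and compaction), reusing the Zookeeper address that appears in the topic-listing command later in this article:

bin/kafka-topics.sh --create --zookeeper localhost:2000 --topic connect-configs --partitions 1 --replication-factor 1 --config cleanup.policy=compact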

Start the Kafka Connect cluster

After saving the configuration, distribute connect-distributed.properties to the machines in the cluster, then start Connect:

bin/connect-distributed.sh config/connect-distributed.properties

Check whether it started

Kafka Connect exposes a REST API for management, so you can call the relevant endpoints with Postman or Fiddler to check whether it is started:

Not surprisingly, the cluster IPs configured above start with 172, while the 192.168.1.177 here is still a server in my cluster; the servers use dual network cards. Because no connectors are configured yet, the endpoint returns an empty array. We will add a new connector next.
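For example, a quick check with curl, assuming Connect's default REST port 8083:

curl -s http://192.168.1.177:8083/connectors
# [] - an empty array: no connectors registered yet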

Write sqlserver-cdc-source.json

{"name": "sqlserver-cdc-source", "config": {"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.server.name": "JnServer", "database.hostname": "172.192.20.2"-- ip "database.port" of the target database: "1433" -- Port "database.user": "sa" of the target database,-- account "database.password" of the target database: "123456",-- password "database.dbname": "Dis",-- database name of the target database "table.whitelist": "dbo.T_LioCDC" -- listener table name "schemas.enable": "false", "mode": "incrementing",-- incremental mode "incrementing.column.name": "ID",-- incremental column name "database.history.kafka.bootstrap.servers": "172.192.10.210 mode 9092172.192.211" -- kafka cluster "database.history.kafka.topic": "TopicTLioCDC",-- kafka topic internal use Not for consumers to use "value.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter"} / / Source address: https://www.cnblogs.com/EminemJK/p/14688907.html

Other optional configuration is available; refer to the official documentation. Then register the connector by POSTing this JSON file to the Connect REST API.
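A sketch of that call, again assuming the default REST port 8083:

curl -s -X POST -H "Content-Type: application/json" --data @sqlserver-cdc-source.json http://192.168.1.177:8083/connectors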

Run the check again, and you can see the connector has been configured successfully:

Other API endpoints

GET /connectors: returns the names of all running connectors.
POST /connectors: creates a new connector; the request body must be JSON containing a name field and a config field, where name is the connector's name and config is a JSON object holding the connector's configuration.
GET /connectors/{name}: gets information about the specified connector.
GET /connectors/{name}/config: gets the configuration of the specified connector.
PUT /connectors/{name}/config: updates the configuration of the specified connector.
GET /connectors/{name}/status: gets the status of the specified connector, including whether it is running, stopped, or failed; if an error occurred, the error details are listed.
GET /connectors/{name}/tasks: gets the tasks the specified connector is running.
GET /connectors/{name}/tasks/{taskid}/status: gets the status of the specified connector's task.
PUT /connectors/{name}/pause: pauses the connector and its tasks, stopping data processing until it is resumed.
PUT /connectors/{name}/resume: resumes a paused connector.
POST /connectors/{name}/restart: restarts a connector; commonly used when a connector has failed.
POST /connectors/{name}/tasks/{taskId}/restart: restarts a task, usually because it has failed.
DELETE /connectors/{name}: deletes a connector, stopping all its tasks and deleting its configuration.

Source: https://www.cnblogs.com/EminemJK/p/14688907.html
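For example, checking the status of the connector created above (REST port 8083 assumed as before):

curl -s http://192.168.1.177:8083/connectors/sqlserver-cdc-source/status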

View topics

/usr/soft/kafka/kafka_2.13_2.7.0# bin/kafka-topics.sh --list --zookeeper localhost:2000

The topic JnServer.dbo.T_LioCDC is the one for us to consume. Start a console consumer to monitor it:

bin/kafka-console-consumer.sh --bootstrap-server 172.192.10.210:9092 --consumer-property group.id=group1 --consumer-property client.id=consumer-1 --topic JnServer.dbo.T_LioCDC

Then insert, update, and delete some rows in the source table.

-- Test statements (the original values were garbled in extraction; these are representative)
INSERT INTO T_LioCDC (Name, Sex, CreateTime, UpdateTime)
VALUES (N'Tom', 1, GETDATE(), GETDATE());
INSERT INTO T_LioCDC (Name, Sex, CreateTime, UpdateTime)
VALUES (N'Jerry', 0, GETDATE(), GETDATE());
UPDATE T_LioCDC
SET Name = N'Tommy', UpdateTime = GETDATE()
WHERE ID = 1;
DELETE FROM T_LioCDC WHERE ID = 2;

The data changes have been captured successfully. Comparing the JSON of the several operations, they are insert, update, and delete in turn:
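For reference, Debezium change events use a before/after envelope with an op code; trimmed sketches of the three operations (field values illustrative, metadata fields elided):

Insert (op "c", before is null):
{"before": null, "after": {"ID": 1, "Name": "Tom", "Sex": true, ...}, "op": "c", ...}

Update (op "u", both row images present):
{"before": {"ID": 1, "Name": "Tom", ...}, "after": {"ID": 1, "Name": "Tommy", ...}, "op": "u", ...}

Delete (op "d", after is null):
{"before": {"ID": 2, "Name": "Jerry", ...}, "after": null, "op": "d", ...}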

That is the entire content of "Example Analysis of Monitoring Data Changes with SQL Server CDC and Kafka Connect". Thank you for reading! I hope what was shared here helps you; if you want to learn more, welcome to follow the industry information channel!
