
How to try out Kafka Connect for listening to SQL Server


This article walks through an attempt to use Kafka Connect to listen to SQL Server, with a detailed analysis of each step, in the hope of helping anyone facing the same problem find a simple, workable approach.

Previously I used canal to listen to MySQL's binlog and push the messages to a Kafka topic, but canal only supports MySQL; if the database is SQL Server, Oracle, or MongoDB, it cannot help. Looking around online, the mainstream approach is to use Kafka Connect to listen to SQL Server, so below I share the process of my attempt.

Briefly, the setup involves Kafka Connect, Confluent, and Kafka. Kafka Connect is a feature built into Kafka for creating and managing data pipelines; it provides a simple model for exchanging data with other systems.

Confluent is a company that builds products around Kafka. It provides not only a data transmission system but also tooling around it, wrapping Kafka. Here we only use it to download the connector component that links Kafka to SQL Server.

The Kafka I use was installed through CDH (Cloudera Manager), so Kafka's bin directory, configuration directory, and logs are not in one place and there is no $KAFKA_HOME. Although this is only a test, I still downloaded Confluent so I can download more connector components later. I recommend downloading from the official website; no proxy was needed and the network speed was fine.

Confluent download address: https://www.confluent.io/download/ . Select "Download Confluent Platform" near the bottom and fill in an email address and purpose to download.

Version 5.2 download address: http://packages.confluent.io/archive/5.2/

In the directory where you plan to keep it, download and unzip:

wget http://packages.confluent.io/archive/5.2/confluent-5.2.3-2.11.zip
unzip confluent-5.2.3-2.11.zip

After unzipping you should see the following folders (I created the usr directory myself to hold the user's configuration files and statements):

Add CONFLUENT_HOME to the environment variables:

vi /etc/profile
export CONFLUENT_HOME=/usr/software/confluent-5.2.3
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME:$CONFLUENT_HOME/bin

The path here is my own; change it to your own file path.
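To make the new variable take effect in the current shell, you can reload the profile and do a quick sanity check (a minimal sketch, assuming a bash login shell):

source /etc/profile
echo $CONFLUENT_HOME      # should print /usr/software/confluent-5.2.3
which confluent-hub       # confirms the Confluent bin directory is on PATH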

Next, download the connector component. The configuration each component needs to connect over JDBC may differ, so check the official documentation. I chose debezium-connector-sqlserver. First enter the bin directory; you will see a confluent-hub command, which we rely on to download components.

[root@centos04 bin]# confluent-hub install debezium/debezium-connector-sqlserver:latest
The component can be installed in any of the following Confluent Platform installations:
  1. /usr/software/confluent-5.2.3 (based on $CONFLUENT_HOME)
  2. /usr/software/confluent-5.2.3 (where this tool is installed)
Choose one of these to continue the installation (1-2): 2
Do you want to install this into /usr/software/confluent-5.2.3/share/confluent-hub-components? (yN) y
Component's license:
Apache 2.0
https://github.com/debezium/debezium/blob/master/LICENSE.txt
I agree to the software license agreement (yN) y

After entering the command, it first asks where to install the component (the $CONFLUENT_HOME directory or the confluent installation directory), then asks whether to install into the default location {$confluent}/share/confluent-hub-components (choose n to enter a different location), then asks whether you agree to the license and whether to update components. If you have no special needs, just answer y.

Other components can be found at https://www.confluent.io/hub/, where the official documentation explains how to configure each one; this is very important. If you only follow online tutorials without understanding why, it is easy to take detours and not know what went wrong. Many articles I read were exactly the same and used the Confluent MSSQL Connector, but that component is no longer available, so the configuration has to change if you switch to another component. I lost a lot of time here; pay attention to the official documentation.

Debezium SQL Server description document address: https://docs.confluent.io/current/connect/debezium-connect-sqlserver/index.html#sqlserver-source-connector

After the download is complete, you can see the downloaded components in the {$confluent}/share/confluent-hub-components directory.
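As a quick check that the component landed where expected (the path comes from the $CONFLUENT_HOME set earlier), a simple listing is enough:

ls $CONFLUENT_HOME/share/confluent-hub-components
# a debezium connector directory should appear here after the install above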

Next, configure Kafka. Enter the Kafka configuration directory. If Kafka was installed standalone it is $KAFKA_HOME/config; for the CDH version the configuration file is under /opt/cloudera/parcels/CDH-6.3.0-1.cdh7.3.0.p0.1279813/etc/kafka/conf.dist. If you do not know the installation location, search for the file name connect-distributed.properties directly. If it cannot be found at all, your Kafka version may be too old and does not have this feature.
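If you do need to search for it, a plain filesystem search works; this is just a convenience, not part of the official procedure:

find / -name connect-distributed.properties 2>/dev/null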

Modify the connect-distributed.properties file.

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
# Kafka cluster location; you need to configure this.
bootstrap.servers=centos04:9092,centos05:9092,centos06:9092

# Unique name for the cluster, used in forming the Connect cluster group.
# Note that this must not conflict with consumer group IDs.
# group.id defaults to connect-cluster; just keep it consistent.
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the
# converter we want to apply it to.
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually
# create the topic before starting Kafka Connect if a specific topic configuration is needed.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=1

# Topic to use for storing connector and task configurations. Note that this should be a single-partition,
# highly replicated and compacted topic.
config.storage.topic=connect-configs
config.storage.replication.factor=3

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
status.storage.topic=connect-status
status.storage.replication.factor=3
#status.storage.partitions=1

offset.storage.file.filename=/var/log/confluent/offset-storage-file

# Flush much faster than normal, which is useful for testing/debugging.
offset.flush.interval.ms=10000

# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
# Kafka Connect REST port; you can modify it.
rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to, i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
# Component location: add the download location of the Confluent components here.
plugin.path=/usr/software/confluent-5.2.3/share/java/confluent-hub-client,/usr/software/confluent-5.2.3/share/confluent-hub-client,/usr/software/confluent-5.2.3/share/confluent-hub-components

First create the special topics that Kafka Connect uses, so that the worker does not fail to start because automatic topic creation fails. There are three special topics:

kafka-topics --create --zookeeper 192.168.49.104:2181 --topic connect-offsets --replication-factor 3 --partitions 1
kafka-topics --create --zookeeper 192.168.49.104:2181 --topic connect-configs --replication-factor 3 --partitions 1
kafka-topics --create --zookeeper 192.168.49.104:2181 --topic connect-status --replication-factor 3 --partitions 1
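To verify the topics were created with the intended replication factor and partition count, you can describe them, for example:

kafka-topics --describe --zookeeper 192.168.49.104:2181 --topic connect-offsets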

Then enter Kafka's bin directory; for the CDH version it is /opt/cloudera/parcels/CDH-6.3.0-1.cdh7.3.0.p0.1279813/lib/kafka/bin.

Run the connect-distributed.sh script:

sh connect-distributed.sh /opt/cloudera/parcels/CDH-6.3.0-1.cdh7.3.0.p0.1279813/etc/kafka/conf.dist/connect-distributed.properties

One note: with CDH's Kafka, running the command reports an error that the log configuration file cannot be found, because the various parts of a CDH-installed Kafka are not in one place. Just edit connect-distributed.sh directly and hard-code the path inside.

vi connect-distributed.sh
# modified:
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/cloudera/parcels/CDH-6.3.0-1.cdh7.3.0.p0.1279813/etc/kafka/conf.dist/connect-log4j.properties"
fi

With this change, the script runs without problems.

The command above runs in the foreground; if the foreground session exits, the Kafka Connect worker stops, which is fine for debugging. To run in the background, add the -daemon parameter:

sh connect-distributed.sh -daemon /opt/cloudera/parcels/CDH-6.3.0-1.cdh7.3.0.p0.1279813/etc/kafka/conf.dist/connect-distributed.properties
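To confirm the worker really is running in the background, you can look for the process and the REST port (8083 as configured above); a rough check, assuming net-tools is installed:

ps -ef | grep ConnectDistributed      # the Kafka Connect distributed worker main class
netstat -nltp | grep 8083             # the REST port configured in connect-distributed.properties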

If you use Debezium SQL Server for listening, you need to turn on SQL Server's CDC feature. CDC must first be enabled for the database, and then enabled for each table, before changes to the table can be captured.

I use Navicat to connect to the database; use whatever tool suits you.

Enable CDC for the database:

use database;
EXEC sys.sp_cdc_enable_db

After this step, the database gains an extra schema named cdc with five tables under it.

Query which databases have the CDC feature enabled:

Select * from sys.databases where is_cdc_enabled = 1

Enable CDC for a table:

use database;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'table_name', @role_name = null

See which tables have the CDC feature enabled:

use database;
select name, is_tracked_by_cdc from sys.tables where is_tracked_by_cdc = 1

That completes enabling CDC for the tables we want to listen to.
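If you prefer the command line to a GUI client, the same statements can also be run through sqlcmd; this is only an illustrative sketch, with host, credentials, database and table name as placeholders:

sqlcmd -S localhost -U sa -P 'password!' -d database -Q "EXEC sys.sp_cdc_enable_db"
sqlcmd -S localhost -U sa -P 'password!' -d database -Q "EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'table_name', @role_name = null"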

Once the Kafka Connect worker is running, we can query it and submit configuration through its REST interface.

View kafka connector information:

[root@centos04 huishui]# curl -s centos04:8083 | jq
{
  "version": "2.2.1-cdh7.3.0",
  "commit": "unknown",
  "kafka_cluster_id": "GwdoyDpbT5uP4k2CN6zbrw"
}

8083 is the port configured above; the same endpoint can also be opened in a browser.

See which connector plugins are installed:

[root@centos04 huishui]# curl -s centos04:8083/connector-plugins | jq
[
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "10.0.2"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "5.5.1"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "5.5.1"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "1.2.2.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.2.1-cdh7.3.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.2.1-cdh7.3.0"
  }
]

I have installed quite a few; as long as io.debezium.connector.sqlserver.SqlServerConnector appears in the list, the component is fine.

View the currently running connectors/tasks:

[root@centos04 huishui]# curl -s centos04:8083/connectors | jq
[]

Since we have not submitted any user configuration yet, there are no tasks and the call returns an empty JSON array. This shows the Kafka Connect worker started successfully and can be configured. Next comes the business part: write a user configuration in JSON and submit it through the API:

# I chose to save the user configuration. Since my kafka directories are not all in one place, I keep all
# the configuration files under confluent/usr. Whether you save it or not does not really matter; following
# the official documentation, I chose to save it.
# When the connector is created, the kafka topics are created automatically and named ${server.name}.${tableName}.
# Debezium cannot listen to only a single table; every table with CDC enabled gets a corresponding topic.
cd $CONFLUENT_HOME
mkdir usr
cd usr
vi register-sqlserver.json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.server.name": "server.name",
    "database.hostname": "localhost",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "password!",
    "database.dbname": "rcscounty_quannan",
    "database.history.kafka.bootstrap.servers": "centos04:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://centos04:8083/connectors/ -d @register-sqlserver.json

If the submission fails, an error message is returned; read it and adjust the configuration accordingly. Once the submission succeeds, looking at the currently running tasks again will show the connector:

[root@centos04 huishui]# curl -s centos04:8083/connectors | jq
[
  "inventory-connector"
]
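If the configuration needs adjusting, the standard Kafka Connect REST API also lets you remove the connector and submit it again, for example:

curl -X DELETE centos04:8083/connectors/inventory-connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://centos04:8083/connectors/ -d @register-sqlserver.json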

View kafka topic:

kafka-topics --list --zookeeper centos04:2181

You will see that Kafka has created the topics. If the expected topic is missing, there may be a problem with the connector at runtime. Check the status of the connector you created:

[root@centos04 usr]# curl -s centos04:8083/connectors/inventory-connector/status | jq
{
  "name": "inventory-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.49.104:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "192.168.49.104:8083"
    }
  ],
  "type": "source"
}

Mine is in good shape. If it is running without problems, start consuming the topic that corresponds to a CDC-enabled table to see whether changes to the table are picked up:

kafka-console-consumer --bootstrap-server centos04:9092 --topic server.name.tableName

You will see that the topic created by the Debezium connector carries quite large messages, so you may need to raise Kafka's maximum message size. I had already set it to 9 MB earlier, so there was no problem here.
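If you do need to raise the limit, the relevant broker settings are message.max.bytes and replica.fetch.max.bytes; the snippet below is only an illustration (9437184 bytes is roughly the 9 MB mentioned above), set in server.properties or the corresponding Cloudera Manager fields:

# broker configuration
message.max.bytes=9437184
replica.fetch.max.bytes=9437184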

The JSON that Debezium emits differs for inserts, updates, deletes, and schema changes. For details, see the Debezium Connector for SQL Server documentation.
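For orientation, a change event roughly follows Debezium's before/after envelope; the abbreviated sample below is illustrative only (the field values are made up and the schema section is omitted):

{
  "payload": {
    "before": null,
    "after": { "id": 1, "name": "test" },
    "source": { "connector": "sqlserver", "db": "rcscounty_quannan", "schema": "dbo", "table": "table_name" },
    "op": "c",
    "ts_ms": 1598000000000
  }
}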

In short, if you can see the changes arriving, the trial is successful.

That is the answer to how to listen to SQL Server with Kafka Connect. I hope the content above is of some help; if you still have questions, you can follow the industry information channel to learn more.
