How Kafka Connect Handles Updates and Deletes

This article describes in detail the strategies Kafka Connect offers for dealing with updates and deletions. The content is laid out step by step; I hope it helps resolve any doubts you have on the topic.

Kafka Connect is an excellent tool that lets you easily set up a continuous flow of data from a data source to a target database. Its configuration is very simple, and it is useful whenever, for one reason or another, a legacy system holds business data you need somewhere else. My typical use case is moving data from Oracle tables into the MongoDB collections used by our microservices. This allows for better scalability, because we do not have to hit a large number of source tables with production queries.

One of the things that is not easy to work out from the Kafka Connect documentation is how to handle modifications to data that has already been moved; in other words, updates and deletes. I think this is a limitation of the typical JDBC/MongoDB connector pair we use. For a while I explored the Debezium connectors, which promise to capture these kinds of events and replicate them to the target database. Our proof of concept with Oracle DB was not successful: we have limited access to those databases, and the level of configuration these connectors require meant they were not a simple solution for us.

As we kept using the connectors, we found that there are ways to deal with these scenarios. I will explain two strategies. The first is the ideal one and requires a specific design in our source database. If that design does not exist and cannot be changed for whatever reason, the second is an alternative solution.

Basic example

Suppose we have a legacy system that handles promotions. To keep the example simple, assume a basic table with three columns. We need to continuously move this data from the SQL database to a document-based database such as MongoDB.
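For reference, here is a minimal sketch of how the source table might look in Oracle. The column names are gathered from the queries used later in the article (PROMO_ID, TITLE, DISCOUNT, PRODUCT_CATEGORY, LAST_UPDATE_DATE); the data types and constraints are assumptions.

-- Hypothetical definition of the PROMOTIONS table used in the examples; types are assumptions
CREATE TABLE promotions (
    promo_id         NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    title            VARCHAR2(100) NOT NULL,
    discount         NUMBER(5,2),
    product_category VARCHAR2(50),
    last_update_date TIMESTAMP DEFAULT SYSTIMESTAMP
);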

Basic concept

First, we need a quick description of the two kinds of Kafka source connectors we can use: incremental and bulk. Strictly speaking, the JDBC connector has four modes: bulk, timestamp, incrementing, and timestamp+incrementing. I group the last three under "incremental" because they share the same basic concept: you only want to move the new data detected at the source.

Bulk connectors always move the entire dataset. Which mode to use depends heavily on the use case for the data being moved. Ideally, the incremental connectors are the best solution, because small chunks of new data are easier to handle in terms of resource usage and data readiness. The question is: Kafka Connect works with plain SQL queries, so how does it know when new data has been inserted into the source?

The source connector configuration can use one (or both) of two properties: incrementing.column.name and timestamp.column.name. The incrementing property uses a strictly increasing column, such as an auto-generated id, to detect when a new row is inserted. The timestamp property uses a datetime column to detect new and changed rows. Kafka Connect keeps an offset and appends it as a condition to the SQL query used to fetch data from the source.

For example, if our table is named "promotions", we would reference it in the query property of the source connector like this:

"query": "SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS", "timestamp.column.name": "LAST_UPDATE_DATE"

Kafka internally modifies the query to look like this:

SELECT * FROM (SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS) WHERE LAST_UPDATE_DATE > {OFFSET_DATE}
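Putting these pieces together, a complete source connector configuration for this table could look roughly like the sketch below. It assumes the Confluent JDBC source connector; the connector name, connection settings, polling interval, and topic prefix are placeholders, not values from the article.

{
  "name": "promotions-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@//db-host:1521/ORCLPDB",
    "connection.user": "kafka_connect",
    "connection.password": "********",
    "mode": "timestamp",
    "timestamp.column.name": "LAST_UPDATE_DATE",
    "query": "SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS",
    "topic.prefix": "promotions",
    "poll.interval.ms": "5000"
  }
}

Note that when the query property is used, topic.prefix becomes the full name of the topic the rows are written to.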

On the sink connector side, that is, the connector that writes the data into the target database, we need to set a strategy so that the correct upsert is made based on the id. You can read more about this in the documentation of the sink connector you use. For the MongoDB connector, the typical setting I use is:

"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy"

This indicates that the _id of our documents will come from the source data. In that case, our source query should provide an _id column:

"query": "SELECT PROMO_ID as\" _ id\ ", TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS"

At this point, we have the basic configuration to detect new inserts. Each time a new promotion with a new timestamp is added, the source connector grabs it and moves it to the desired destination. But with this exact same configuration we can also achieve our broader goal of detecting updates and deletions; all we need is to design our data source properly.

Change the timestamp column on each update

If we want updates to be processed and reflected in the target database, we need to make sure that every update made to the source table also updates the timestamp column. This can be done either by the application performing the write, passing the current timestamp as a parameter of the update operation, or by creating a trigger that fires on update events, as sketched below. Because the sink connector performs upserts based on the id, the update is also reflected in the target document.
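A minimal sketch of such a trigger in Oracle, assuming the PROMOTIONS table and LAST_UPDATE_DATE column from the example (the trigger name is arbitrary):

-- Refresh LAST_UPDATE_DATE on every update so the source connector detects the change
CREATE OR REPLACE TRIGGER trg_promotions_touch
BEFORE UPDATE ON promotions
FOR EACH ROW
BEGIN
  :NEW.last_update_date := SYSTIMESTAMP;
END;
/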

Soft deletion

To handle deletions, we need the previous step plus something that is considered good practice in database design anyway: soft deletes. With this practice, instead of deleting (hard deleting) a record when required, we mark it with a special flag indicating that it is no longer valid/active. This has its own benefits in terms of recoverability and auditing. It does, of course, mean that our applications or stored procedures need to be aware of this design and filter out inactive records when querying the data.

If the data source was not designed with this in mind and it is hard to change the application so that it performs soft deletes, we can again use triggers, this time to capture hard deletes and turn them into soft deletes instead.

For our Kafka Connect purposes, what matters is that the timestamp column also changes when a record is marked as inactive. In this example, we deactivate the HOT SUMMER promotion by setting its ACTIVE column to 0. LAST_UPDATE_DATE is updated to the current date at the same time, which causes the source connector to pick the record up.
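As a sketch, that soft delete could be issued like this, assuming the ACTIVE flag and column names used above (and note that if the update trigger from the previous section is in place, it would set LAST_UPDATE_DATE automatically):

-- Soft delete: flag the row as inactive and refresh the timestamp so the connector picks it up
UPDATE promotions
   SET active = 0,
       last_update_date = SYSTIMESTAMP
 WHERE title = 'HOT SUMMER';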

Once the data has been moved, to MongoDB for example, consumers also need to filter on this ACTIVE field in order to use it:

db.getCollection('promotions').find({ active: 1 })

Versioned batches

The last option is the one we can use when we have to deal with an immutable design, that is, when the source schema cannot be modified to add a timestamp column or an active flag. This is what I call versioned batches. As explained earlier, the bulk connector moves the entire dataset on every call. In most of the use cases I have come across, incremental updates are preferable, but in this situation we can take advantage of the bulk option.

Since we need to keep track of what was inserted, updated, or deleted, we can add an extra column that identifies a snapshot of the data each time it is moved, using the timestamp at which the query runs. Because the timestamp is a naturally sortable value, once the data is moved to the target we can easily filter for the last snapshot, or the penultimate one (I will explain why that may be the better choice).

The query in Oracle is as follows:

"query": "SELECT PROMO_ID as\" _ id\ ", TITLE, DISCOUNT, PRODUCT_CATEGORY, TO_CHAR (SYSDATE, 'yyyymmddhh34miss') AS SNAPSHOT FROM PROMOTIONS"

This approach requires some additional configuration that is critical for good performance when consuming the final dataset. As you can imagine, indexes matter here, most importantly on the new snapshot column. Another important consideration is space consumption: depending on the number of records in each snapshot, we may need to delete old versions. We can do that with a scheduled task, or with an expiration mechanism such as MongoDB's TTL indexes.
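As an illustration only: MongoDB TTL indexes expire documents based on a BSON date field, so using one here assumes that each document also carries a date field set at load time (called loadedAt below, which is hypothetical and not part of the article's schema); otherwise, a scheduled job that deletes documents belonging to old snapshots works just as well.

// TTL index: expire documents one week (604800 seconds) after their hypothetical loadedAt date
db.promotions.createIndex({ loadedAt: 1 }, { expireAfterSeconds: 604800 })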

When consuming the data, we first need to pick the snapshot to work with. I mentioned that the penultimate one may be better. The reason is that the latest snapshot may still be in progress: the data may still be moving at the moment you run your query. If your query against the target database involves any kind of aggregation, you may get incomplete results. In other words, with the latest snapshot we cannot be sure it is ready to use; if we take the penultimate one, we can be sure the snapshot is complete.
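A minimal mongo shell sketch of that read, assuming the same promotions collection and the SNAPSHOT field produced by the query above:

// List snapshot versions, newest first, and read from the penultimate (guaranteed complete) one
var snapshots = db.promotions.distinct("SNAPSHOT").sort().reverse();
var stable = snapshots.length > 1 ? snapshots[1] : snapshots[0];
db.promotions.find({ SNAPSHOT: stable });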

In the following example, two versions of the data have been moved: version 2021073012000 contains three documents, while the newer version 2021080112000 contains two.
