Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What does CDC mean in flink 1.11?

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article is to share with you about the meaning of CDC in flink 1.11. The editor thinks it is very practical, so I share it with you to learn. I hope you can get something after reading this article.

Introduction to CDC

CDC,Change Data Capture, short for change data acquisition, using CDC, we can get committed changes from the database and send them downstream for downstream use. These changes can include INSERT,DELETE,UPDATE, etc.

Users can use CDC in the following scenarios:

Using flink sql for data synchronization, you can synchronize data from one data to another, such as mysql, elasticsearch, and so on.

An aggregate view can be materialized in real time on the source database

Because it is only incremental synchronization, it can synchronize data with low latency in real time.

Use a temporal table in EventTime join so that you can get accurate results

Flink 1.11 extracts and converts these changelog to table apa and sql, which currently supports two formats: Debezium and Canal, which means that the source table is not only append operations, but also upsert and delete operations.

Canal

Next, we use canal as an example to briefly introduce the use of CDC.

Canal format:

{"data": [{"id": "13", "username": "13", "password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9", "name": "Canal Manager V2"}], "old": [{"id": "13", "username": "13", "password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9", "name": "Canal Manager"}] "database": "canal_manager", "es": 1568972368000, "id": 11, "isDdl": false, "mysqlType": {.}, "pkNames": ["id"], "sql": "", "sqlType": {...}, "table": "canal_user", "ts": 1568972369005, "type": "UPDATE"}

Briefly describe the following core fields:

Type: describes the type of operation, including the 'UPDATE',' INSERT', 'DELETE'.

Data: data that represents the operation. INSERT', indicates the contents of the row; UPDATE', indicates the updated state of the row; and DELETE', indicates the state before deletion.

Old: optional field, which, if present, represents the content before the update, or null if it is not a update operation.

The complete semantics are as follows

Private String destination; / / corresponding to the instance of canal or topic private String groupId; / / to the corresponding mq group id private String database; / / database or schema private String table of MQ / / Table name private List pkNames; private Boolean isDdl; private String type; / / Type: INSERT UPDATE DELETE / / binlog executeTime private Long es / / execution time / / dml build timeStamp private Long ts; / / synchronization time private String sql; / / sql executed, dml sql is empty private List data / / data list private List old / / Old data list, one-to-one mapping of size for update, size and data-the defined fields and the data in data want to match CREATE TABLE my_table (id BIGINT, name STRING, description STRING, weight DECIMAL (10,2)) WITH ('connector' =' kafka', 'topic' =' products_binlog', 'properties.bootstrap.servers' =' localhost:9092' 'properties.group.id' =' testGroup', 'canal-json.ignore-parse-errors'='true'-ignore parsing errors Default value false) CanalJson deserialization source code parsing

The canal format is also a flink format and is source, so it involves deserialization when reading data, so let's briefly take a look at the implementation of CanalJson deserialization. The concrete implementation class is CanalJsonDeserializationSchema.

Let's take a look at the core deserialization method:

@ Override public void deserialize (byte [] message, Collector out) throws IOException {try {/ / use json deserializer to deserialize message into RowData RowData row = jsonDeserializer.deserialize (message) / / get the type field, which is used to determine String type = row.getString (2). ToString () If (OP_INSERT.equals (type)) {/ / if the operation type is insert, then the data array represents the data to be inserted, iterate through the data, then add an identity INSERT, construct the RowData object, and send it downstream. ArrayData data = row.getArray (0); for (int I = 0; I < data.size (); iTunes +) {RowData insert = data.getRow (I, fieldCount); insert.setRowKind (RowKind.INSERT) Out.collect (insert) }} else if (OP_UPDATE.equals (type)) {/ / if it is a update operation, get the updated data from the data field, ArrayData data = row.getArray (0) The / / old field gets the data before the update ArrayData old = row.getArray (1); for (int I = 0; I < data.size (); iTunes +) {/ / the underlying JSON deserialization schema always produce GenericRowData. GenericRowData after = (GenericRowData) data.getRow (I, fieldCount); GenericRowData before = (GenericRowData) old.getRow (I, fieldCount); for (int f = 0; f < fieldCount) ) {if (before.isNullAt (f)) {/ / if the old field is not empty, the data has been updated. If the old field is null, the data is the same before and after the update. At this time, the data of before is also set to that of after. That is, the before and after data sent downstream are the same. Before.setField (f, after.getField (f));}} before.setRowKind (RowKind.UPDATE_BEFORE); after.setRowKind (RowKind.UPDATE_AFTER) / / send the data before and after the update to downstream out.collect (before); out.collect (after) }} else if (OP_DELETE.equals (type)) {/ / if it is a delete operation, the data field contains the data to be deleted, which is organized and sent to the downstream ArrayData data = row.getArray (0) For (int I = 0; I < data.size (); iTunes +) {RowData insert = data.getRow (I, fieldCount); insert.setRowKind (RowKind.DELETE); out.collect (insert) }} else {if (! ignoreParseErrors) {throw new IOException (format ("Unknown\" type\ "value\" s\ ". The Canal JSON message is'% s'", type, new String (message));}} catch (Throwable t) {/ / a big try catch to protect the processing. If (! ignoreParseErrors) {throw new IOException (format ("Corrupt Canal JSON message'% slots.", new String (message)), t) } this is what the CDC in flink 1.11 means. The editor believes that there are some knowledge points that we may see or use in our daily work. I hope you can learn more from this article. For more details, please follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report