Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to use flink sql cdc

2025-04-10 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article mainly explains "how to use flink sql cdc". Friends who are interested might as well take a look. The method introduced in this paper is simple, fast and practical. Now let the editor take you to learn how to use flink sql cdc.

Preface

CDC,Change Data Capture, short for change data acquisition, using CDC, we can get committed changes from the database and send them downstream for downstream use. These changes can include INSERT,DELETE,UPDATE and so on.

Users can use cdc in the following scenarios:

Real-time data synchronization: for example, we synchronize data from the mysql library to our data warehouse.

Real-time materialized view of the database.

Flink consumes cdc data

In the previous data synchronization, for example, if we want to obtain the data of the database in real time, the general architecture is to use third-party tools, such as canal, debezium, to collect the change log of the database in real time, and then send the data to kafka and other message queues. Then the data of kafka is consumed by other components, such as flink, spark, etc., and then calculated and sent to the downstream system. The overall architecture is as follows:

For the above architecture, flink plays the role of computing layer. Currently, there are two formats of format provided by flink: canal-json and debezium-json, which we will briefly introduce below.

Canal format

In China, we often use Alibaba's open source canal. We can use canal to subscribe to mysql's binlog log. Canal will organize the change data of the mysql library into its fixed JSON or protobuf format and send it to kafka for downstream use.

The format of json data parsed by canal is as follows:

{

"data": [

{

Id: 111l

"name": "scooter"

"description": "Big 2-wheel scooter"

"weight": "5.18"

}

]

"database": "inventory"

"es": 1589373560000

"id": 9

"isDdl": false

"mysqlType": {

"id": "INTEGER"

"name": "VARCHAR

"description": "VARCHAR"

"weight": "FLOAT"

}

"old": [

{

"weight": "5.15"

}

]

"pkNames": [

"id"

]

"sql":

"sqlType": {

"id": 4

"name": 12

"description": 12

"weight": 7

}

"table": "products"

"ts": 1589373560798

"type": "UPDATE"

}

Briefly describe the following core fields:

Type: describes the type of operation, including the 'UPDATE',' INSERT', 'DELETE'.

Data: data that represents the operation. INSERT', indicates the contents of the row; UPDATE', indicates the updated state of the row; and DELETE', indicates the state before deletion.

Old: optional field, which, if present, represents the content before the update, or null if it is not a update operation.

The complete semantics are as follows

Private String destination; / / corresponding instance of canal or topic of MQ

Private String groupId; / / group id of the corresponding mq

Private String database; / / Database or schema

Private String table; / / Table name

Private List pkNames

Private Boolean isDdl

Private String type; / / Type: INSERT UPDATE DELETE

/ / binlog executeTime

Private Long es; / / execution takes time

/ / dml build timeStamp

Private Long ts; / / synchronization time

Sql executed by private String sql; / /, dml sql is empty

Private List data; / / data list

Private List old; / / Old data list, used for one-to-one mapping of update, size and data

In flink sql, the sql for consuming this data is as follows:

CREATE TABLE topic_products (

Id BIGINT

Name STRING

Description STRING

Weight DECIMAL (10,2)

) WITH (

'connector' = 'kafka'

'topic' = 'products_binlog'

'properties.bootstrap.servers' = 'localhost:9092'

'properties.group.id' = 'testGroup'

'format' =' canal-json'-- using canal-json as the format

)

The fields and types of the table in DDL should match those in mysql, and then we can write flink sql to query our defined topic_products.

Debezium format

In foreign countries, a more famous open source tool similar to canal is debezium, which is more powerful than canal and not only supports mysql. Other database synchronization is also supported, such as PostgreSQL, Oracle, etc. Currently, debezium supports serialization formats such as JSON and Apache Avro.

The format provided by debezium is as follows:

{

"before": {

"id": 111

"name": "scooter"

"description": "Big 2-wheel scooter"

"weight": 5.18

}

"after": {

"id": 111

"name": "scooter"

"description": "Big 2-wheel scooter"

"weight": 5.15

}

Source: {...}

"op": "u"

"ts_ms": 1589362330904

"transaction": null

}

Similarly, when using flink sql for consumption, sql is similar to using canal above, except that you need to change foramt to debezium-json.

CanalJson deserialization source code parsing

Next, let's take a look at the implementation of canal-json format in the source code of flink. The canal format is a flink format and is source, so when it comes to deserialization when reading data, let's briefly take a look at the implementation of CanalJson deserialization. The concrete implementation class is CanalJsonDeserializationSchema.

Let's take a look at the core deserialization method:

@ Override

Public void deserialize (byte [] message, Collector out) throws IOException {

Try {

/ / use json deserializer to deserialize message into RowData

RowData row = jsonDeserializer.deserialize (message)

/ / get the type field, which is used for the following judgment

String type = row.getString (2). ToString ()

If (OP_INSERT.equals (type)) {

/ / if the operation type is insert and the data array represents the data to be inserted, iterate through the data, then add an identity INSERT, construct the RowData object, and send it downstream.

ArrayData data = row.getArray (0)

For (int I = 0; I

< data.size(); i++) { RowData insert = data.getRow(i, fieldCount); insert.setRowKind(RowKind.INSERT); out.collect(insert); } } else if (OP_UPDATE.equals(type)) { // 如果是update操作,从data字段里获取更新后的数据、 ArrayData data = row.getArray(0); // old字段获取更新之前的数据 ArrayData old = row.getArray(1); for (int i = 0; i < data.size(); i++) { // the underlying JSON deserialization schema always produce GenericRowData. GenericRowData after = (GenericRowData) data.getRow(i, fieldCount); GenericRowData before = (GenericRowData) old.getRow(i, fieldCount); for (int f = 0; f < fieldCount; f++) { if (before.isNullAt(f)) { //如果old字段非空,则说明进行了数据的更新,如果old字段是null,则说明更新前后数据一样,这个时候把before的数据也设置成after的,也就是发送给下游的before和after数据一样。 before.setField(f, after.getField(f)); } } before.setRowKind(RowKind.UPDATE_BEFORE); after.setRowKind(RowKind.UPDATE_AFTER); //把更新前后的数据都发送下游 out.collect(before); out.collect(after); } } else if (OP_DELETE.equals(type)) { // 如果是删除操作,data字段里包含将要被删除的数据,把这些数据组织起来发送给下游 ArrayData data = row.getArray(0); for (int i = 0; i < data.size(); i++) { RowData insert = data.getRow(i, fieldCount); insert.setRowKind(RowKind.DELETE); out.collect(insert); } } else { if (!ignoreParseErrors) { throw new IOException(format( "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'", type, new String(message))); } } } catch (Throwable t) { // a big try catch to protect the processing. if (!ignoreParseErrors) { throw new IOException(format( "Corrupt Canal JSON message '%s'.", new String(message)), t); } } } flink cdc connector背景 对于上面的架构,我们需要部署canal(debezium)+ kafka,然后flink再从kafka消费数据,这种架构下我们需要部署多个组件,并且数据也需要落地到kafka,有没有更好的方案来精简下这个流程呢?我们接下来讲讲flink提供的cdc connector。 这个connector并没有包含在flink的代码里,具体的地址是在https://github.com/ververica/flink-cdc-connectors里,详情大家可以看下这里面的内容。 这种架构下,flink直接消费数据库的增量日志,替代了原来作为数据采集层的canal(debezium),然后直接进行计算,经过计算之后,将计算结果 发送到下游。整体架构如下:

The benefits of using this architecture are:

Reduced maintenance costs for canal and kafka, shorter links and lower latency

Flink provides exactly once semantics

Can be read from the specified position

Kafka is removed to reduce the cost of message storage.

Mysql-cdc

At present, flink supports two kinds of built-in connector,PostgreSQL and mysql, so let's take mysql as an example.

Before using it, we need to introduce the pom of the corresponding pom,mysql as follows:

Com.alibaba.ververica

Flink-connector-mysql-cdc

1.1.0

If it is used by sql client, download flink-sql-connector-mysql-cdc-1.1.0.jar and put it under / lib/

An example sql for connecting to a mysql database is as follows:

CREATE TABLE mysql_binlog (

Id INT NOT NULL

Name STRING

Description STRING

Weight DECIMAL (10par 3)

) WITH (

'connector' = 'mysql-cdc'

'hostname' = 'localhost'

'port' =' 3306'

'username' = 'flinkuser'

'password' = 'flinkpw'

'database-name' = 'inventory'

'table-name' = 'products'

)

If you subscribe to a postgres database, we need to replace connector with an one-to-one correspondence between the schema of the table in postgres-cdc,DDL and the database.

For more detailed configuration, see:

Https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector

Mysql-cdc connector source code parsing

Let's take mysql-cdc as an example to see how the source level is implemented. Since it is a connector of sql, there will be a corresponding TableFactory first, then a corresponding source will be constructed in the factory class, and finally the consumed data will be converted into RowData format recognized by flink and sent downstream.

Let's follow this line of thinking to look at the implementation of the flink cdc source code.

In flink-connector-mysql-cdc module, find its corresponding factory class: MySQLTableSourceFactory, and enter the createDynamicTableSource (Context context) method. In this method, a MySQLTableSource class is constructed using the host, dbname, and other information obtained from the properties in ddl.

MySQLTableSource

In the MySQLTableSource#getScanRuntimeProvider method, we see that we first construct an object RowDataDebeziumDeserializeSchema for serialization, which is mainly used to convert the data obtained by Debezium in SourceRecord format into RowData objects recognized by flink. Let's take a look at the RowDataDebeziumDeserializeSchem#deserialize method. The main operation here is to judge the incoming data types (insert, update, delete), and then convert them for different types (short, int, etc.).

Finally, we see that the Source function used by flink to get the database change log is DebeziumSourceFunction, and the final return type is RowData.

In other words, the bottom layer of flink is the change data obtained from mysql, postgres and other databases using Debezium tools.

@ SuppressWarnings ("unchecked")

@ Override

Public ScanRuntimeProvider getScanRuntimeProvider (ScanContext scanContext) {

RowType rowType = (RowType) physicalSchema.toRowDataType () .getLogicalType ()

TypeInformation typeInfo = (TypeInformation) scanContext.createTypeInformation (physicalSchema.toRowDataType ())

DebeziumDeserializationSchema deserializer = new RowDataDebeziumDeserializeSchema (

RowType

TypeInfo

((rowData, rowKind)-> {})

ServerTimeZone)

MySQLSource.Builder builder = MySQLSource.builder ()

.hostname (hostname)

.

DebeziumSourceFunction sourceFunction = builder.build ()

Return SourceFunctionProvider.of (sourceFunction, false)

}

DebeziumSourceFunction

Let's take a look at the DebeziumSourceFunction class.

@ PublicEvolving

Public class DebeziumSourceFunction extends RichSourceFunction implements

CheckpointedFunction

ResultTypeQueryable {

.

}

We see that the DebeziumSourceFunction class inherits RichSourceFunction and implements the CheckpointedFunction interface, which means that this class is a SourceFunction of flink and takes data from the source side (the run method) and sends it downstream. In addition, this class also implements the CheckpointedFunction interface, that is, exactly once semantics are guaranteed through the mechanism of checkpoint.

Next, let's go to the run method to see how to get the change data from the database.

@ Override

Public void run (SourceContext sourceContext) throws Exception {

..

/ / DO NOT include schema change, e.g. DDL

Properties.setProperty ("include.schema.changes", "false")

..

/ / print out all attribute information for troubleshooting.

/ / dump the properties

String propsString = properties.entrySet () .stream ()

.map (t-> "\ t" + t.getKey () .toString () + "=" + t.getValue () .toString () + "\ n")

.notify (Collectors.joining ())

LOG.info ("Debezium Properties:\ n {}", propsString)

/ / Logic for specific data processing

This.debeziumConsumer = new DebeziumChangeConsumer (

SourceContext

Deserializer

RestoredOffsetState = = null, / / DB snapshot phase if restore state is null

This::reportError)

/ / create the engine with this configuration...

This.engine = DebeziumEngine.create (Connect.class)

.using (properties)

Notifying (debeziumConsumer) / / data is sent to the debeziumConsumer above

.using ((success, message, error)-> {

If (! success & & error! = null) {

This.reportError (error)

}

})

.build ()

If (! running) {

Return

}

/ / run the engine asynchronously

Executor.execute (engine)

/ / Loop judgment, when the program is interrupted, or there is an error, break the engine and throw an exception

/ / on a clean exit, wait for the runner thread

Try {

While (running) {

If (executor.awaitTermination (5, TimeUnit.SECONDS)) {

Break

}

If (error! = null) {

Running = false

ShutdownEngine ()

/ / rethrow the error from Debezium consumer

ExceptionUtils.rethrow (error)

}

}

}

Catch (InterruptedException e) {

/ / may be the result of a wake-up interruption after an exception.

/ / we ignore this here and only restore the interruption state

Thread.currentThread () .interrupt ()

}

}

At the beginning of the function, a lot of properties is set, for example, include.schema.changes is set to false, that is, DDL operations that do not include tables, and changes to the table structure are not captured. We only focus on the addition, deletion and modification of the data here.

Next, we construct a DebeziumChangeConsumer object, which implements the DebeziumEngine.ChangeConsumer interface, which is mainly to process a batch of data one by one.

Next, set a DebeziumEngine object, which is really used for work, and its underlying layer uses kafka's connect-api to get the data, and the result is an org.apache.kafka.connect.source.SourceRecord object. The resulting data is passed to the DebeziumChangeConsumer defined above through the notifying method to override the default implementation for complex operations.

Next, the engine is started asynchronously through a thread pool ExecutorService.

Finally, a loop judgment is made to break the engine and throw an exception when the program is interrupted or there is an error.

To sum up, it is in the source function of Flink that the Debezium engine is used to obtain the corresponding database change data (SourceRecord). After a series of deserialization operations, it is finally converted to a RowData object in flink and sent to the downstream.

Changelog format usage scenario

When we get the change data from the database from mysql-cdc, or write a query for group by, the result data is constantly changing. How can we send the change data to the kafka queue that only supports append mode?

So flink provides a kind of changelog format, in fact, we simply understand that flink wraps the incoming RowData data, and then adds a data operation type, including the following INSERT,DELETE, UPDATE_BEFORE,UPDATE_AFTER. In this way, when the downstream obtains this data, we can judge how to operate the data according to the type of data.

For example, our original data format is like this.

{"day": "2020-06-18", "gmv": 100}

After processing in changelog format, it becomes the following format:

{"data": {"day": "2020-06-18", "gmv": 100}, "op": "+ I"}

In other words, changelog format wraps the native format and adds an op field to indicate the type of operation of the data. Currently, there are the following:

+ I: insert operation.

-U: update the previous data content:

+ U: the content of the updated data.

-D: delete operation.

Example

When using it, you need to introduce the corresponding pom.

Com.alibaba.ververica

Flink-format-changelog-json

1.1.0

You can use flink sql as follows:

CREATE TABLE kafka_gmv (

Day_str STRING

Gmv DECIMAL (10,5)

) WITH (

'connector' = 'kafka'

'topic' = 'kafka_gmv'

'scan.startup.mode' = 'earliest-offset'

'properties.bootstrap.servers' = 'localhost:9092'

'format' = 'changelog-json'

);

We define a kafka connector whose format is changelog-json, and then we can write and query it.

For a complete code and configuration, please refer to:

Https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format

Analysis of source code

As a format of flink, we mainly look at its serialization and sending serialization methods. Changelog-json uses the flink-json package for json processing.

Deserialization

Deserialization uses the ChangelogJsonDeserializationSchema class, and in its constructor, we see that we mainly construct a json serializer jsonDeserializer to process the data.

Public ChangelogJsonDeserializationSchema (

RowType rowType

TypeInformation resultTypeInfo

Boolean ignoreParseErrors

TimestampFormat timestampFormatOption) {

This.resultTypeInfo = resultTypeInfo

This.ignoreParseErrors = ignoreParseErrors

This.jsonDeserializer = new JsonRowDataDeserializationSchema (

CreateJsonRowType (fromLogicalToDataType (rowType))

/ / the result type is never used, so it's fine to pass in Debezium's result type

ResultTypeInfo

False, / / ignoreParseErrors already contains the functionality of failOnMissingField

IgnoreParseErrors

TimestampFormatOption)

}

The createJsonRowType method specifies that the format of changelog is a format of type Row. Let's take a look at the code:

Private static RowType createJsonRowType (DataType databaseSchema) {

DataType payload = DataTypes.ROW (

DataTypes.FIELD ("data", databaseSchema)

DataTypes.FIELD ("op", DataTypes.STRING ())

Return (RowType) payload.getLogicalType ()

}

Here, you specify that the row format has two fields, one is data, which represents the content of the data, and the other is op, which represents the type of operation.

Finally, take a look at the core ChangelogJsonDeserializationSchema#deserialize (byte [] bytes, Collector out >)

@ Override

Public void deserialize (byte [] bytes, Collector out) throws IOException {

Try {

GenericRowData row = (GenericRowData) jsonDeserializer.deserialize (bytes)

GenericRowData data = (GenericRowData) row.getField (0)

String op = row.getString (1) .toString ()

RowKind rowKind = parseRowKind (op)

Data.setRowKind (rowKind)

Out.collect (data)

} catch (Throwable t) {

/ / a big try catch to protect the processing.

If (! ignoreParseErrors) {

Throw new IOException (format (

"Corrupt Debezium JSON message'% slots.", new String (bytes)), t)

}

}

}

Use jsonDeserializer to process the data, then judge the second field op and add the corresponding RowKind.

Serialization

Serialization method Let's take a look at the method: ChangelogJsonSerializationSchema#serialize

@ Override

Public byte [] serialize (RowData rowData) {

Reuse.setField (0, rowData)

Reuse.setField (1, stringifyRowKind (rowData.getRowKind ()

Return jsonSerializer.serialize (reuse)

}

This block is not difficult, just serialize the RowData of flink into a byte array using jsonSerializer.

At this point, I believe you have a deeper understanding of "how to use flink sql cdc". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report