The method of integrating Canal by Springboot2.3.x

2025-02-22 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

This article introduces how to integrate Canal with Springboot 2.3.x, walking through the setup with a practical example. The steps are simple, quick, and practical; I hope it helps you solve a similar problem.

I. Background

Recently, I ran into a data-synchronization problem at work.

A sub-business of our system depends on data from another system, and when that system's data changes, our database needs to pick up the change.

I could come up with only two ways to synchronize:

1. Subscribe to MQ: when the other system's data changes, it publishes a message and we consume it to apply the change.

2. A database trigger.

However, both were rejected by the team leader!

1. MQ requires code intrusion, but the other system will not be making any code changes for the time being.

2. A trigger is strongly coupled to the production database, competes with it for resources, and may even destabilize it.

This left me quite distressed.

So I could only turn to the mighty Google and Baidu to see whether the problem could be solved, and after some searching I learned about a very interesting tool.

II. What is Canal

Canal: Alibaba's open-source component for subscribing to MySQL binlog data.

In the early days, Alibaba's B2B company had a cross-datacenter synchronization requirement because of its dual deployment in Hangzhou and the United States. Early database synchronization was mainly trigger-based, but starting in 2010 Alibaba gradually moved to parsing the database's own log to obtain incremental changes, which gave rise to the incremental subscribe/consume business and opened a new era. P.S. the internally used synchronization already supports log parsing for some versions of MySQL 5.x and Oracle.

Canal, which translates as waterway / pipe / ditch, is mainly used for incremental log parsing of MySQL, providing incremental data subscription and consumption.

Working principle

Canal simulates the MySQL slave interaction protocol: it disguises itself as a MySQL slave and sends the dump protocol to the MySQL master.

The MySQL master receives the dump request and starts pushing binary logs to the slave (i.e. canal).

Canal parses the binary log objects (originally byte streams).

In short, canal exploits MySQL's own master-slave replication mechanism.

Implementation of MySQL master-slave replication

Replication follows a three-step process:

1. The master records changes in its binlog (these records are called binlog events and can be viewed with show binlog events).

2. The slave copies the master's binlog events into its relay log.

3. The slave replays the events in the relay log, updating its own (older) data.


From the official introduction we can see that canal essentially disguises itself as a slave: we subscribe to the master database whose data changes interest us, and canal, acting as a slave, reads that master's binlog. Once we receive the binlog entries parsed by canal, we have the changed data!

III. Canal installation

(1) Preliminary preparation

1. Enable binlog on the subscribed database

Canal has one prerequisite: the database being subscribed to must have binlog enabled.

How do you check whether binlog is enabled?

Log in to the database on the server, or run the following query in a visualization tool; if it returns log_bin = ON, binlog is enabled:

show variables like 'log_bin';

If the database on the server was installed by yourself, find the configuration file my.cnf and add the lines below. If you bought a cloud instance, ask the vendor to enable it for you.

Add the following three lines under [mysqld] in my.cnf:

log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode, row-level logging
server_id=1 # required for MySQL replication; must not clash with canal's slaveId

2. Create a new database account and grant it MySQL slave permissions

canaltest is the account used as the slave role; Canal123.. is its password.

CREATE USER canaltest IDENTIFIED BY 'Canal123..';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canaltest'@'%';
GRANT ALL PRIVILEGES ON *.* TO 'canaltest'@'%';
FLUSH PRIVILEGES;

Connection test

With that, the preparation work is done!

Perhaps some of you are a bit confused about what we just did. Let's recap — knock on the blackboard!

1. The preparation above targets the subscribed database (that is, the master).

2. There are really only two steps: first, change the configuration to enable binlog; second, create an account and grant it slave permissions so canal can read the binlog.

3. None of the above concerns canal itself; it is merely a prerequisite for using canal.

(2) Canal Admin installation

Canal-admin is a web-based visual management and operations project for canal, moving away from the old server-side operation and maintenance toward the web.

Canal-admin is designed to provide canal with operation and maintenance-oriented functions such as overall configuration management and node operation and maintenance, and provides a relatively friendly WebUI operation interface to facilitate more users to operate quickly and safely.

Prerequisites for canal-admin:

MySQL, used to store configuration, node and other related data

canal version >= 1.1.4 (relies on canal-server to expose a dynamic operation-and-maintenance management API for admin)

A JRE environment (install a JDK)

download

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

Decompression

mkdir /usr/local/canal-admin
tar zxvf canal.admin-1.1.4.tar.gz -C /usr/local/canal-admin

Go to the canal-admin directory to view

cd /usr/local/canal-admin

Modify configuration

vim conf/application.yml

The configuration inside is changed according to your own actual situation.

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

# the database canal-admin depends on; stores the configuration set in the web UI, etc.
spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

# account and password used for connections
canal:
  adminUser: admin
  adminPasswd: leitest

Import the database files required by canal-admin

Note that the database name must match the one in application.yml. You can import from the command line, or visually drag the sql file into Navicat — whichever you prefer.

The server I use to play with canal is newly installed; mysql can be installed directly with docker. For details, see my blogs:

The solution that Docker cannot download image timeout under CentOS7 (illustration)

CentOS 7 install Docker

Note that a mysql installed with docker cannot directly use the mysql -uroot -p command; you need to copy the script into the container first. If you are not proficient with docker or find that troublesome, just use the Navicat visualization tool.

Import sql files required for canal-admin services

If mysql was installed directly on the server, you can simply run:

mysql -uroot -p
# log in, then import the initialization SQL
source conf/canal_manager.sql

Start

Just execute the startup script directly.

cd bin
./startup.sh

Default account password:

Admin:123456

(3) Canal Server installation

Canal-server is the core of canal; the canal functionality we discussed earlier is really the functionality of canal-server. Admin is just a web management layer — don't confuse which is primary.

download

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

Decompression

mkdir /usr/local/canal-server
tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal-server

Start and connect to the canal-admin web side

First, we need to modify the configuration file

cd /usr/local/canal-server
vim conf/canal_local.properties

Pay attention to how the password is encrypted!

Remember that the account and password were set to admin:leitest in the application.yml of canal-admin earlier:

# account and password used for connections
canal:
  adminUser: admin
  adminPasswd: leitest

Therefore, we need to encrypt the plaintext leitest and use the encrypted value here.

Use the database function PASSWORD to encrypt

SELECT PASSWORD('plaintext to be encrypted');

Then remove the leading * from the result.
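As an aside: the PASSWORD() function was removed in MySQL 8.0, so it may not be available on newer servers. The same value can be computed by hand, since PASSWORD() is simply '*' followed by the uppercase hex of SHA1(SHA1(plaintext)). A minimal sketch — my own illustration, not part of canal; the class name is made up:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class MySqlPasswordHash {

    // Replicates MySQL's PASSWORD() function:
    // '*' followed by the uppercase hex of SHA1(SHA1(plaintext)).
    static String mysqlPassword(String plaintext) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] once = sha1.digest(plaintext.getBytes(StandardCharsets.UTF_8));
            sha1.reset();
            byte[] twice = sha1.digest(once);
            StringBuilder hex = new StringBuilder("*");
            for (byte b : twice) {
                hex.append(String.format("%02X", b));
            }
            return hex.toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // canal's adminPasswd field expects the hash without the leading '*'
        System.out.println(mysqlPassword("leitest").substring(1));
    }
}
```

The substring(1) call mirrors the "remove the leading *" step above.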

Start and connect to admin

sh bin/startup.sh local

Check whether ports 11110, 11111 and 11112 are listening:

netstat -untlp

I looked and found they were not there, meaning the server did not start successfully.

Check the log.

vim logs/canal/canal.log

Solution:

1. Stop canal-admin first, then start it again

2. canal-server had been running in the previous standalone form, started without the local argument shown above

3. Stop canal-server

4. Start it again in the admin-connected form (sh bin/startup.sh local)

Create a new server on the admin page

Modify the configuration and its comments (for the instance connection account we still follow the admin:leitest set above; those entries can all stay commented out here — if they are not, then our code must use that account and password when connecting).

Next, let's create instance.

How to understand server and instance? I think of them as class and bean in Java — that is, class and object.

The server is the class; an instance is a concrete object of it, and you can create multiple different instances.

Put differently, if we listen to changes on several master databases, we can create a differently configured instance for each, according to the business.

Filter data according to your own situation

canal.instance.filter.regex — the tables that mysql data parsing should watch, as Perl regular expressions. Multiple patterns are separated by commas (,); the escape character requires a double backslash (\\). Common examples:
1. All tables: .* or .*\\..*
2. All tables under the canal schema: canal\\..*
3. Tables under canal starting with canal: canal\\.canal.*
4. One table under the canal schema: canal\\.test1
5. Combined rules: canal\\..*,mysql.test1,mysql.test2 (comma separated)

canal.instance.filter.druid.ddl — use druid to parse all DDL and extract the schema and table names; default true

canal.instance.filter.query.dcl — ignore DCL statements; default false

canal.instance.filter.query.dml — ignore DML statements (since MySQL 5.6, row mode can also record the SQL of each DML statement in the binlog; see the MySQL documentation); default false

canal.instance.filter.query.ddl — ignore DDL statements; default false

For more settings, see the official website: https://github.com/alibaba/canal/wiki/AdminGuide
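To get a feel for how these comma-separated patterns select tables, here is a rough sketch of the matching idea using plain java.util.regex — my own illustration only; canal's real filter (its AviaterRegexFilter, as I understand it) may differ from Java regex semantics in details:

```java
import java.util.regex.Pattern;

public class CanalFilterDemo {

    // A canal-style filter is a comma-separated list of regexes,
    // each matched against the full "schema.table" name.
    static boolean matches(String filter, String schemaDotTable) {
        for (String pattern : filter.split(",")) {
            if (Pattern.matches(pattern, schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String filter = "canal\\..*,mysql.test1";
        System.out.println(matches(filter, "canal.orders")); // matches canal\..*
        System.out.println(matches(filter, "other.users"));  // no pattern matches
    }
}
```

Note that in a regex an unescaped dot matches any character, which is why the examples above escape it as \\. when a literal dot is meant.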

In this way, a simple canal environment is set up, and then let's start testing!

(4) springboot demo example

Introduce the dependencies required by canal

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

Configuration (application.yml)

canal:
  # instance ip
  host: 192.168.96.129
  # tcp communication port
  port: 11111
  # account set by canal-admin
  username: admin
  # password
  password: leitest
  # instance name
  instance: test

Code

package com.leilei;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @author lei
 * @version 1.0
 * @date 2020-09-27 22:23
 * @desc Reads the binlog.
 */
@Component
public class ReadBinLogService implements ApplicationRunner {

    @Value("${canal.host}")
    private String host;
    @Value("${canal.port}")
    private int port;
    @Value("${canal.username}")
    private String username;
    @Value("${canal.password}")
    private String password;
    @Value("${canal.instance}")
    private String instance;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        CanalConnector conn = getConn();
        while (true) {
            conn.connect();
            // Subscribe to all databases and tables in the instance
            conn.subscribe(".*\\..*");
            // Roll back to the last position that was not acked
            conn.rollback();
            // Fetch up to 100 change entries at a time
            Message message = conn.getWithoutAck(100);
            long id = message.getId();
            int size = message.getEntries().size();
            if (id != -1 && size > 0) {
                // Parse the data
                analysis(message.getEntries());
            } else {
                Thread.sleep(1000);
            }
            // Acknowledge the batch
            conn.ack(message.getId());
            // Close the connection
            conn.disconnect();
        }
    }

    /**
     * Parse the entries.
     */
    private void analysis(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers; only row data is parsed
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            // Parse the binlog entry
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("Failed to parse entry: " + entry.toString(), e);
            }
            // Operation type
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Database the operation belongs to
            String dbName = entry.getHeader().getSchemaName();
            // Table the operation belongs to
            String tableName = entry.getHeader().getTableName();
            // Transaction commit time
            long timestamp = entry.getHeader().getExecuteTime();
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(),
                        dbName, tableName, eventType, timestamp);
                System.out.println("-------------------------------------");
            }
        }
    }

    /**
     * Print the details of one binlog row change.
     *
     * @param dbName    database the operation belongs to
     * @param tableName table the operation belongs to
     * @param eventType operation type (insert, update, delete)
     */
    private static void dataDetails(List<CanalEntry.Column> beforeColumns,
                                    List<CanalEntry.Column> afterColumns,
                                    String dbName, String tableName,
                                    CanalEntry.EventType eventType, long timestamp) {
        System.out.println("Database: " + dbName);
        System.out.println("Table: " + tableName);
        System.out.println("Operation type: " + eventType);
        if (CanalEntry.EventType.INSERT.equals(eventType)) {
            System.out.println("Inserted data:");
            printColumn(afterColumns);
        } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
            System.out.println("Deleted data:");
            printColumn(beforeColumns);
        } else {
            System.out.println("Updated data: before ----------");
            printColumn(beforeColumns);
            System.out.println("Updated data: after ----------");
            printColumn(afterColumns);
        }
        System.out.println("Operation time: " + timestamp);
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue()
                    + "    update=" + column.getUpdated());
        }
    }

    /**
     * Get a connection.
     */
    public CanalConnector getConn() {
        return CanalConnectors.newSingleConnector(
                new InetSocketAddress(host, port), instance, username, password);
    }
}

Test View

When data in the monitored database is modified

When data is inserted

When data is deleted (deleting the Xiaoming we just inserted)

Whenever we run DML against the monitored database, canal picks it up. Through canal we obtain the modified database, table and fields, and can process the data according to our own business!

At this point, some of you may be about to ask: can I directly obtain the SQL statement of the operation?

Not directly — for now I parse the columns myself and manually assemble the SQL statement.
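The splicing idea, reduced to plain strings, looks like this — a simplified illustration of my own that uses an ordered map in place of canal's Column objects (class and method names are made up):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SqlSpliceDemo {

    // Splice an UPDATE statement from the after-image columns,
    // using the primary key from the before-image in the where clause.
    static String spliceUpdate(String table, Map<String, String> after,
                               String keyName, String keyValue) {
        StringBuilder sql = new StringBuilder("update " + table + " set ");
        int i = 0;
        for (Map.Entry<String, String> col : after.entrySet()) {
            sql.append(col.getKey()).append(" = '").append(col.getValue()).append("'");
            if (++i != after.size()) {
                sql.append(", ");
            }
        }
        sql.append(" where ").append(keyName).append("=").append(keyValue);
        return sql.toString();
    }

    public static void main(String[] args) {
        Map<String, String> after = new LinkedHashMap<>();
        after.put("age", "999");
        after.put("name", "tom");
        System.out.println(spliceUpdate("students", after, "id", "2"));
        // prints: update students set age = '999', name = 'tom' where id=2
    }
}
```

Note this naive quoting is fine for logging, but splicing values into SQL like this is vulnerable to injection if the result is ever executed; a real consumer should use prepared statements.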

Without further ado, let's look at the effect first:

canal detected a SQL change on the master ----> update students set id='2', age='999', ... where id=2

canal detected a SQL change on the master ----> delete from students where id=6

canal detected a SQL change on the master ----> insert into students (id,age,name,city,date,birth) VALUES (...)

canal detected a SQL change on the master ----> update students set id='89', age='98', name='test added', city='Shenzhen', ... where id=89

In fact, we just take the row data before the change and the row data after the change and splice them into a SQL statement. The code is attached below.

package com.leilei;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * @author lei
 * @version 1.0
 * @date 2020-09-27 22:33
 * @desc Reads the binlog and converts it to SQL.
 */
@Component
public class ReadBinLogToSql implements ApplicationRunner {

    // Queue caching the SQL converted from the binlog: the reader pushes, the consumer polls
    private final Queue<String> canalQueue = new ConcurrentLinkedQueue<>();

    @Value("${canal.host}")
    private String host;
    @Value("${canal.port}")
    private int port;
    @Value("${canal.username}")
    private String username;
    @Value("${canal.password}")
    private String password;
    @Value("${canal.instance}")
    private String instance;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        CanalConnector conn = getConn();
        while (true) {
            try {
                conn.connect();
                // Subscribe to all databases and tables in the instance
                conn.subscribe(".*\\..*");
                // Roll back to the last position that was not acked
                conn.rollback();
                // Fetch up to 100 change entries at a time
                Message message = conn.getWithoutAck(100);
                long id = message.getId();
                int size = message.getEntries().size();
                if (id != -1 && size > 0) {
                    analysis(message.getEntries());
                } else {
                    Thread.sleep(1000);
                }
                // Acknowledge the batch
                conn.ack(message.getId());
            } catch (CanalClientException | InvalidProtocolBufferException | InterruptedException e) {
                e.printStackTrace();
            } finally {
                // Close the connection
                conn.disconnect();
            }
        }
    }

    private void analysis(List<Entry> entries) throws InvalidProtocolBufferException {
        for (Entry entry : entries) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * Save an update statement.
     */
    private void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> dataList = rowChange.getRowDatasList();
            for (RowData rowData : dataList) {
                List<Column> afterColumnsList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set");
                for (int i = 0; i < afterColumnsList.size(); i++) {
                    sql.append(" ")
                            .append(afterColumnsList.get(i).getName())
                            .append(" = '").append(afterColumnsList.get(i).getValue())
                            .append("'");
                    if (i != afterColumnsList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        // Only the primary key is used in the where clause
                        sql.append(column.getName()).append("=").append(column.getValue());
                        break;
                    }
                }
                canalQueue.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Save a delete statement.
     */
    private void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        sql.append(column.getName()).append("=").append(column.getValue());
                        break;
                    }
                }
                canalQueue.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Save an insert statement.
     */
    private void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> datasList = rowChange.getRowDatasList();
            for (RowData rowData : datasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'").append(columnList.get(i).getValue()).append("'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                canalQueue.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get a connection.
     */
    public CanalConnector getConn() {
        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
    }

    /**
     * Simulate consuming the SQL statements converted by canal.
     */
    public void executeQueueSql() {
        int size = canalQueue.size();
        for (int i = 0; i < size; i++) {
            String sql = canalQueue.poll();
            System.out.println("canal detected a SQL change on the master ----> " + sql);
        }
    }
}

This concludes the introduction to "the method of Springboot2.3.x integrating Canal". Thank you for reading; if you want to learn more, you can follow the industry information channel.
