Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

The method of integrating Canal data synchronization with SpringBoot

2025-04-05 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/01 Report--

This article mainly introduces "the method of SpringBoot integrating Canal data synchronization". In the daily operation, I believe that many people have doubts about the method of SpringBoot integrating Canal data synchronization. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts of "SpringBoot integration Canal data synchronization method". Next, please follow the editor to study!

1. CentOS7 compilation and installation MySQL5.7.242, Mysql setting binLog configuration

(1) check whether the binlog function is enabled

(2) if the display status is OFF, it means that the feature is not enabled, and the binlog function is enabled.

Mysql > show variables like 'log_bin';+-+-+ | Variable_name | Value | +-+-+ | log_bin | OFF | +-+-+ 1 row in set (0.00 sec)

1. Modify the configuration file my.cnf of mysql

Vi / etc/my.cnf

Additional content:

Log-bin=mysql-bin # binlog filename binlog_format=ROW # Select row mode server_id=1 # mysql instance id, which cannot be duplicated with the slaveId of canal

2. Restart mysql:

Service mysql restart

3. Log in to the mysql client and view the log_bin variable

Mysql > show variables like 'log_bin';+-+-+ | Variable_name | Value | +-+-+ | log_bin | ON | +-+-+ 1 row in set (0.00 sec) if the display status is ON, the feature is enabled

(3) add the following relevant users and permissions to mysql

CREATE USER 'canal'@'%' IDENTIFIED BY' canal';GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *. * TO'canal'@'%';FLUSH PRIVILEGES;3, Linux download and install Canal service

Download address:

Https://github.com/alibaba/canal/releases

(1) after downloading, put it in the directory and decompress the file

Mkdir / usr/local/canalcd / usr/local/canalcanal.deployer-1.1.4.tar.gztar zxvf canal.deployer-1.1.4.tar.gz

(2) modify the configuration file

Vi conf/example/instance.properties

# need to change to your own database information canal.instance.master.address=39.106.224.236:3306# needs to change to your own database user name and password canal.instance.dbUsername=canalcanal.instance.dbPassword=canal# needs to be changed to synchronized database table rules # 1, synchronize all tables canal.instance.filter.regex=.*\\.. * # 2, the table in the library that needs to be synchronized # canal.instance.filter.regex=guli_ucenter.ucenter_member

Mysql data parsing concerned with the table, Perl regular expression.

Multiple regularities are separated by commas (,), and the escape character requires a double slash (\)

Common examples:

All tables:. * or.\...

All tables under canal schema: canal\... *

The table under canal that starts with canal: canal\ .charts. *

A table under canal schema: canal.test1

Multiple rules are combined: canal\... *, mysql.test1,mysql.test2 (separated by commas)

Note: this filter condition is only valid for row schema data (ps. Mixed/statement cannot accurately mention sql because it does not parse it.

Take tableName for filtering).

(3) start under the bin directory

. / startup.sh4 、 Introducing dependency org.springframework.boot spring-boot-starter-web mysql mysql-connector-java commons-dbutils commons-dbutils org.springframework.boot spring-boot-starter-jdbc into Boot projects Com.alibaba.otter canal.client 5 、 Modify properties configuration file # mysql database connection spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driverspring.datasource.url=jdbc:mysql://localhost:3306/guli?serverTimezone=GMT%2B8spring.datasource.username=rootspring.datasource.password=1234566, modify Application startup class

7. Create Canal configuration class for automatic listening

Note: use it directly after modifying the Ip address of the Linux virtual machine in the run () method!

Package com.atguigu.canal.client;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry.*;import com.alibaba.otter.canal.protocol.Message;import com.google.protobuf.InvalidProtocolBufferException;import org.apache.commons.dbutils.DbUtils;import org.apache.commons.dbutils.QueryRunner;import org.springframework.stereotype.Component;import javax.annotation.Resource;import javax.sql.DataSource;import java.net.InetSocketAddress;import java.sql.Connection Import java.sql.SQLException;import java.util.List;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;@Componentpublic class CanalClient {/ / sql queue private Queue SQL_QUEUE = new ConcurrentLinkedQueue (); @ Resource private DataSource dataSource / * canal storage method * / public void run () {CanalConnector connector = CanalConnectors.newSingleConnector (new / * * where Ip address is linux virtual machine address * port number is fixed 11111 * other no need to modify * / InetSocketAddress ("39.106.224.236", 11111), "example" ") Int batchSize = 1000; try {connector.connect (); connector.subscribe (". *\\. *"); connector.rollback () Try {while (true) {/ / attempts to pull data batchSize records from master, and how many Message message = connector.getWithoutAck (batchSize); long batchId = message.getId (); int size = message.getEntries () .size () If (batchId = =-1 | | size = = 0) {Thread.sleep (1000);} else {dataHandle (message.getEntries ());} connector.ack (batchId) / / simulate the execution of if (SQL_QUEUE.size () > = 1) {executeQueueSql ();} catch (InterruptedException e) {e.printStackTrace () when the sql accumulated in the queue is greater than a certain value } catch (InvalidProtocolBufferException e) {e.printStackTrace ();}} finally {connector.disconnect ();}} / * simulate the execution of sql statements in the queue * / public void executeQueueSql () {int size = SQL_QUEUE.size (); for (int I = 0; I

< size; i++) { String sql = SQL_QUEUE.poll(); System.out.println("[sql]---->

"+ sql); this.execute (sql.toString ());}} / * * data processing * @ param entrys * / private void dataHandle (List entrys) throws InvalidProtocolBufferException {for (Entry entry: entrys) {if (EntryType.ROWDATA = = entry.getEntryType ()) {RowChange rowChange = RowChange.parseFrom (entry.getStoreValue ()) EventType eventType = rowChange.getEventType (); if (eventType = = EventType.DELETE) {saveDeleteSql (entry);} else if (eventType = = EventType.UPDATE) {saveUpdateSql (entry);} else if (eventType = = EventType.INSERT) {saveInsertSql (entry) }} / * Save update statement * @ param entry * / private void saveUpdateSql (Entry entry) {try {RowChange rowChange = RowChange.parseFrom (entry.getStoreValue ()); List rowDatasList = rowChange.getRowDatasList () For (RowData rowData: rowDatasList) {List newColumnList = rowData.getAfterColumnsList (); StringBuffer sql = new StringBuffer ("update" + entry.getHeader (). GetTableName () + "set"); for (int I = 0; I < newColumnList.size ()) Sql.append ("" + newColumnList.get (I). GetName () + "='" + newColumnList.get (I). GetValue () + "'"); if (I! = newColumnList.size ()-1) {sql.append (",") } sql.append ("where"); List oldColumnList = rowData.getBeforeColumnsList () For (Column column: oldColumnList) {if (column.getIsKey ()) {/ / only supports a single primary key sql.append (column.getName () + "=" + column.getValue ()); break }} SQL_QUEUE.add (sql.toString ());}} catch (InvalidProtocolBufferException e) {e.printStackTrace () }} / * Save delete statement * * @ param entry * / private void saveDeleteSql (Entry entry) {try {RowChange rowChange = RowChange.parseFrom (entry.getStoreValue ()); List rowDatasList = rowChange.getRowDatasList (); for (RowData rowData: rowDatasList) {List columnList = rowData.getBeforeColumnsList () StringBuffer sql = new StringBuffer ("delete from" + entry.getHeader (). GetTableName () + "where"); for (Column column: columnList) {if (column.getIsKey ()) {/ / only supports a single primary key sql.append (column.getName () + "=" + column.getValue ()) Break;}} SQL_QUEUE.add (sql.toString ());}} catch (InvalidProtocolBufferException e) {e.printStackTrace () }} / * Save insert statement * * @ param entry * / private void saveInsertSql (Entry entry) {try {RowChange rowChange = RowChange.parseFrom (entry.getStoreValue ()); List rowDatasList = rowChange.getRowDatasList (); for (RowData rowData: rowDatasList) {List columnList = rowData.getAfterColumnsList () StringBuffer sql = new StringBuffer ("insert into" + entry.getHeader (). GetTableName () + "("); for (int I = 0; I < columnList.size ()) {sql.append (columnList.get (I). GetName ()) If (I! = columnList.size ()-1) {sql.append (",");}} sql.append (") VALUES ("); for (int I = 0; I < columnList.size () GetValue +) {sql.append ("'" + columnList.get (I). GetValue () + "); if (I! = columnList.size ()-1) {sql.append (", ");}} sql.append (")) SQL_QUEUE.add (sql.toString ());}} catch (InvalidProtocolBufferException e) {e.printStackTrace ();}} / * Storage * @ param sql * / public void execute (String sql) {Connection con = null; try {if (null = = sql) return Con = dataSource.getConnection (); QueryRunner qr = new QueryRunner (); int row = qr.execute (con, sql); System.out.println ("update:" + row);} catch (SQLException e) {e.printStackTrace ();} finally {DbUtils.closeQuietly (con) At this point, the study on "the method of SpringBoot integrating Canal data synchronization" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report