
How to use canal+Kafka for database synchronization


This article introduces how to use canal + Kafka to synchronize databases. The explanation is detailed but easy to follow and the steps are simple to reproduce, so it should serve as a useful reference; let's take a look.

Databases are part of our everyday work. In a microservice architecture, each service owns its own database, so we frequently run into the problem of moving data between services. For example, the data in service B's database may come from service A's database: whenever data changes in service A, the change needs to be synchronized to service B.

The first solution:

In the code path where service A writes the relevant data, also call an interface of service B, and have service B write the data into the new database. This approach looks simple, but it hides many pitfalls. A large amount of interface-calling synchronization code is added to service A's logic, which increases the complexity of the project and makes it harder and harder to maintain. Moreover, an interface call is not a reliable transport: there is no retry mechanism and no record of the synchronization position, and how to handle failed calls, or a sudden flood of calls, all has to be considered and dealt with in the business code. That is a lot of extra work, so this plan was ruled out.

The second solution:

Synchronize through the database's binlog. This solution is independent of service A and has no code coupling with it. The data can be transferred over a direct TCP connection, which is more reliable than interface calls. This is a mature production approach, and there are many middleware tools for binlog synchronization, so the focus shifts to which tool best builds a stable, high-performance, easy-to-deploy solution.

After some research we chose canal. Canal is Alibaba's incremental subscription & consumption component for the MySQL binlog. It is proven in production, integrates conveniently with other common middleware such as Kafka and Elasticsearch, and provides a Go client library, canal-go, which meets our needs on the Go side.

Workflow

1. Canal connects to database A and simulates a MySQL slave.

2. canal-client establishes a connection to canal and subscribes to the relevant database tables.

3. A change in database A is written to the binlog; canal sends a dump request to the database, receives and parses the binlog, and pushes the parsed data to canal-client.

4. canal-client receives the data and synchronizes it into the new database.
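
To make the flow concrete, here is a minimal canal-client sketch modeled on the canal-go sample client (exact signatures may differ between canal-go versions); the host, port, destination name "example", and table filter are placeholder assumptions:

```go
package main

import (
	"log"
	"time"

	"github.com/withlin/canal-go/client"
)

func main() {
	// Connect to the canal server; address, credentials and the
	// destination name "example" are placeholders for your deployment.
	connector := client.NewSimpleCanalConnector("127.0.0.1", 11111, "", "", "example", 60000, 60*60*1000)
	if err := connector.Connect(); err != nil {
		log.Fatal(err)
	}
	// Subscribe to all tables of the source database (regex filter).
	if err := connector.Subscribe("mydatabase\\..*"); err != nil {
		log.Fatal(err)
	}
	for {
		// Pull a batch of up to 100 parsed binlog entries.
		message, err := connector.Get(100, nil, nil)
		if err != nil {
			log.Fatal(err)
		}
		if message.Id == -1 || len(message.Entries) == 0 {
			time.Sleep(300 * time.Millisecond) // no new data yet
			continue
		}
		for _, entry := range message.Entries {
			_ = entry // hand each entry to a handler like the one below
		}
	}
}
```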

Protocol Buffers deserialization is very fast. The data obtained after deserialization is row by row: each row is an array of column name/value pairs. A simple code example:

```go
import (
	"github.com/golang/protobuf/proto"
	"github.com/withlin/canal-go/protocol" // canal-go client library
)

func Handler(entry protocol.Entry) {
	rowChange := &protocol.RowChange{}
	if err := proto.Unmarshal(entry.GetStoreValue(), rowChange); err != nil {
		return
	}
	eventType := rowChange.GetEventType()
	for _, rowData := range rowChange.GetRowDatas() { // traverse each row of data
		if eventType == protocol.EventType_DELETE || eventType == protocol.EventType_UPDATE {
			columns := rowData.GetBeforeColumns() // all column values before the change
			_ = columns
		} else if eventType == protocol.EventType_INSERT {
			columns := rowData.GetAfterColumns() // all column values after the change
			_ = columns
		}
	}
}
```

Problems encountered

For high availability and higher performance, we run multiple canal-client instances as a cluster to parse the binlog and synchronize it to the new database. This raises an important question: how do we ensure the canal-client cluster parses and consumes the binlog in order?

The binlog we use is in row mode, so every write operation produces a binlog event. A simple example: insert a record, then modify it immediately. Two messages are sent to canal-client. If, due to the network or other reasons, the update message is processed before the insert message, there is no record to update yet, and the effect of the update operation is lost.

What to do? Canal can be combined with a message queue, and it supports several options, Kafka, RabbitMQ, and RocketMQ, which is excellent. We implement message ordering at the message-queue layer.

Selecting the canal + Kafka solution

For the message queue we chose the industry benchmark: Kafka. UCloud provides Kafka and RocketMQ message-queue products, which make it quick and easy to build a message-queue system, speeding up development and simplifying operations.

Let's walk through the setup:

1. Select the Kafka message-queue product and apply for activation.

![kafka message queue](https://atts.yisu.com/attachments/image/20200817/1597643207499951.jpg "kafka message queue")

2. After activation, create a Kafka cluster in the management console, choosing the hardware configuration that fits your needs.

![hardware configuration](https://atts.yisu.com/attachments/image/20200817/1597643247605810.jpg "hardware configuration")

3. The Kafka + ZooKeeper cluster is now set up.

![kafka+ZooKeeper Cluster](https://atts.yisu.com/attachments/image/20200817/1597643275823817.jpg "kafka+ZooKeeper Cluster")

The console also provides node management, Topic management, and Consumer Group management, so configuration can be changed directly in the console, which is very convenient.

On the monitoring side, the collected metrics include Kafka produce and consume QPS, cluster monitoring, and ZooKeeper monitoring, providing a fairly complete set of indicators.

![monitoring metrics](https://atts.yisu.com/attachments/image/20200817/1597643316579478.jpg "monitoring metrics")

![monitoring metrics](https://atts.yisu.com/attachments/image/20200817/1597643347799100.jpg "monitoring metrics")

![monitoring metrics](https://atts.yisu.com/attachments/image/20200817/1597643363690805.jpg "monitoring metrics")

Kafka configuration of canal

Configuring canal to use Kafka is also very simple. Edit the configuration file: vi /usr/local/canal/conf/canal.properties

```properties
# optional: tcp (default), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq cluster configuration
# in flagMessage mode this value can be increased, but do not exceed the
# upper limit of the MQ message body size; in flatMessage mode please
# increase it, a batch size of 50-200 is recommended
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# canal batch size, default 50K; because of kafka's maximum message body
# limit, do not exceed 1M (stay below 900K)
canal.mq.canalBatchSize = 50
# timeout for canal to fetch data, in milliseconds; empty means no timeout
canal.mq.canalGetTimeout = 100
# whether messages are flat json objects
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# whether kafka message delivery uses transactions
canal.mq.transaction = false

# mq config
canal.mq.topic=default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=mydatabase.mytable
```

Solving the problem of sequential consumption

Note the following line of the configuration:

canal.mq.partitionHash=mydatabase.mytable

We configured partitionHash for Kafka, and our Topic corresponds to one table. The effect is that all data from one table is pushed to one fixed partition and then handed to a consumer for processing and synchronization into the new database. This solves the binlog ordering problem we ran into earlier: even if we deploy multiple Kafka consumers as a cluster, each partition is consumed by a single consumer, so each consumer processes the data of the same table. This sacrifices parallel processing within a single table, but with Kafka's processing capacity our business is unlikely to bottleneck on the Kafka nodes. And our goal is not real-time consistency anyway: within a certain delay, the two databases reach eventual consistency.
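
As a sketch of the consumer side, the snippet below uses the sarama Kafka client in a consumer group (a common Go choice, not prescribed by the article): Kafka assigns each partition to exactly one group member, so rows pinned to a partition by canal.mq.partitionHash are processed in binlog order. The broker address and group name are assumptions; the topic "default" matches canal.mq.topic in the configuration above.

```go
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler. Each partition is consumed
// by exactly one member of the group, so the rows of a given table (pinned to
// a single partition by canal.mq.partitionHash) arrive in binlog order.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		// Decode the canal message here and apply it to the new database.
		log.Printf("partition=%d offset=%d", msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "") // commit only after the row is applied
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0 // consumer groups need Kafka >= 0.10.2
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	// Broker address and group name are placeholders.
	group, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "canal-sync", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()
	for {
		if err := group.Consume(context.Background(), []string{"default"}, handler{}); err != nil {
			log.Fatal(err)
		}
	}
}
```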


The following figure shows the final synchronization architecture; every service node is deployed as a cluster. All of it runs on UCloud's UK8s service, which ensures the high availability of the service nodes.

Canal itself also runs as a cluster with active/standby switching, but at any given moment only one canal instance processes the binlog; the others are redundant standbys. When the active canal service goes down, one of the standby instances switches into the working state. Likewise, only one instance reads the binlog at a time, because the binlog must be read sequentially.

In addition, we use the same architecture to handle cache invalidation. Our caching pattern is Cache-Aside. If the cache-invalidation operation were performed at every place in the code where the data changes, the code would again become complicated. Therefore, on top of the architecture above, the logic that triggers cache invalidation is handled uniformly on the kafka-client side, achieving a degree of decoupling.
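
To illustrate this decoupling, here is a hypothetical invalidation handler on the kafka-client side using the go-redis library (our choice for the sketch, not named in the article); the "table:id" key convention and the Redis address are assumptions:

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

// invalidateCache deletes the cache entry for a changed row, so the next read
// under Cache-Aside repopulates the cache from the database. The "table:id"
// key convention is a hypothetical example.
func invalidateCache(ctx context.Context, rdb *redis.Client, table, rowID string) error {
	key := fmt.Sprintf("%s:%s", table, rowID)
	return rdb.Del(ctx, key).Err()
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"}) // placeholder address
	// Called from the kafka-client's message handler for UPDATE/DELETE events.
	if err := invalidateCache(context.Background(), rdb, "user", "42"); err != nil {
		fmt.Println("invalidate failed:", err)
	}
}
```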

This is the end of the article on "how to use canal+Kafka for database synchronization". Thank you for reading! Hopefully you now have a working understanding of synchronizing databases with canal + Kafka.
