In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-06 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)06/01 Report--
This article introduces the relevant knowledge of "how to achieve data synchronization between MySQL and Redis". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
Mind map
Preface
In many business cases, we will add redis cache to the system for query optimization.
If the database data is updated, you need to write a piece of code in the business code that updates the redis synchronously.
This kind of data synchronization code and business code will not be very elegant, can these data synchronization code be extracted to form a separate module, the answer is yes.
Architecture diagram
Canal is a binlog disguised as slave subscription mysql to achieve data synchronization. The previous article "getting started with canal"
I have introduced the easiest way to use it, which is the tcp pattern.
In fact, canal supports sending directly to MQ, and the latest version supports three mainstream MQ:Kafka, RocketMQ, and RabbitMQ. The RabbitMQ mode of canal currently has a certain bug, so it generally uses Kafka or RocketMQ.
This paper uses Kafka to realize the data synchronization between Redis and MySQL. The architecture diagram is as follows:
Through the architecture diagram, we have a clear idea of the components to be used: MySQL, Canal, Kafka, ZooKeeper, Redis.
The following is a demonstration of the construction of Kafka, and everyone should know how to build MySQL. There are also many references for ZooKeeper and Redis.
Set up Kafka
First, download the installation package on the official website:
Extract, open the / config/server.properties configuration file, and modify the log directory:
Log.dirs=./logs
To start ZooKeeper first, I used version 3.6.1:
Then start Kafka, open cmd in the bin directory of Kafka, and enter the command:
Kafka-server-start.bat.. /.. / config/server.properties
We can see that Kafka-related configuration information is registered on ZooKeeper:
Then you need to create a queue to receive data from canal, using the command:
Kafka-topics.bat-create-zookeeper localhost:2181-replication-factor 1-partitions 1-topic canaltopic
The queue name created is canaltopic.
Configure Cannal Server
Download the relevant installation packages on the canal official website:
Locate the canal.properties configuration file in the canal.deployer-1.1.4/conf directory:
# tcp, kafka, RocketMQ choose kafka mode here
Canal.serverMode = kafka
# the number of threads in the parser. If you open this configuration, if you do not open it, it will block or not parse.
Canal.instance.parser.parallelThreadSize = 16
# configure the service address of MQ. Here, the address and port corresponding to kafka are configured.
Canal.mq.servers = 127.0.0.1 purl 9092
# configure instance. You need to have a directory with the same name as example under the conf directory. You can configure multiple directories.
Canal.destinations = example
Then configure instance and locate the / conf/example/instance.properties configuration file:
# # mysql serverId, v1.0.26 + will autoGen (automatically generated, no configuration required)
# canal.instance.mysql.slaveId=0
# position info
Canal.instance.master.address=127.0.0.1:3306
# execute SHOW MASTER STATUS; in Mysql to view the binlog of the current database
Canal.instance.master.journal.name=mysql-bin.000006
Canal.instance.master.position=4596
# account password
Canal.instance.dbUsername=canal
Canal.instance.dbPassword=Canal@****
Canal.instance.connectionCharset = UTF-8
# MQ queue name
Canal.mq.topic=canaltopic
# Partition subscript for single queue mode
Canal.mq.partition=0
After the configuration is complete, you can start canal.
test
At this point, you can open the consumer window of kafka and test whether kafka received the message.
Use the command to monitor consumption:
Kafka-console-consumer.bat-- bootstrap-server 127.0.0.1 from-beginning-- topic canaltopic
There's a little hole. What I use here is the cmd command line of the win10 system. The default code of the win10 system is GBK, while Canal Server is the code of UTF-8, so garbled codes will appear on the console:
How to solve it?
Switch to UTF-8 encoding before cmd command line execution, using command line: chcp 65001
Then execute the command to open the consumer side of kafka to avoid garbled code:
The next step is to start Redis and synchronize the data to Redis.
Encapsulate Redis client
After setting up the environment, we can write code.
First introduce maven dependencies for Kafka and Redis:
Org.springframework.kafka
Spring-kafka
Org.springframework.boot
Spring-boot-starter-data-redis
Add the following configuration to the application.yml file:
Spring:
Redis:
Host: 127.0.0.1
Port: 6379
Database: 0
Password: 123456
Encapsulates a utility class that operates Redis:
@ Component
Public class RedisClient {
/ * *
* obtain redis template
, /
@ Resource
Private StringRedisTemplate stringRedisTemplate
/ * *
* set the key-value of redis
, /
Public void setString (String key, String value) {
SetString (key, value, null)
}
/ * *
* set the key-value of redis with expiration time
, /
Public void setString (String key, String value, Long timeOut) {
StringRedisTemplate.opsForValue () .set (key value)
If (timeOut! = null) {
StringRedisTemplate.expire (key, timeOut, TimeUnit.SECONDS)
}
}
/ * *
* obtain the corresponding value of key in redis
, /
Public String getString (String key) {
Return stringRedisTemplate.opsForValue () get (key)
}
/ * *
* Delete the corresponding value of key in redis
, /
Public Boolean deleteKey (String key) {
Return stringRedisTemplate.delete (key)
}
}
Create MQ consumers for synchronization
Add the configuration information of kafka to the application.yml configuration file:
Spring:
Kafka:
# Kafka service address
Bootstrap-servers: 127.0.0.1:9092
Consumer:
# specify a default group name
Group-id: consumer-group1
# serialization deserialization
Key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Producer:
Key-serializer: org.apache.kafka.common.serialization.StringDeserializer
Value-serializer: org.apache.kafka.common.serialization.StringDeserializer
# batch crawl
Batch-size: 65536
# Cache capacity
Buffer-memory: 524288
According to the above Kafka consumption command, we know the structure of the json data, and we can create a CanalBean object to receive:
Public class CanalBean {
/ / data
Private List data
/ / Database name
Private String database
Private long es
/ / increment, starting from 1
Private int id
/ / whether it is a DDL statement
Private boolean isDdl
/ / the field type of the table structure
Private MysqlType mysqlType
/ / UPDATE statement, old data
Private String old
/ / Primary key name
Private List pkNames
/ / sql statement
Private String sql
Private SqlType sqlType
/ / Table name
Private String table
Private long ts
/ / (add) INSERT, (update) UPDATE, (delete) DELETE, (delete table) ERASE, etc.
Private String type
/ / getter, setter method
}
Public class MysqlType {
Private String id
Private String commodity_name
Private String commodity_price
Private String number
Private String description
/ / getter, setter method
}
Public class SqlType {
Private int id
Private int commodity_name
Private int commodity_price
Private int number
Private int description
}
Finally, you can create a consumer CanalConsumer to consume:
@ Component
Public class CanalConsumer {
/ / logging
Private static Logger log = LoggerFactory.getLogger (CanalConsumer.class)
/ / redis operation tool class
@ Resource
Private RedisClient redisClient
/ / the name of the listening queue is: canaltopic
@ KafkaListener (topics = "canaltopic")
Public void receive (ConsumerRecord consumer) {
String value = (String) consumer.value ()
Log.info ("topic name: {}, key: {}, partition location: {}, subscript: {}, value: {}", consumer.topic (), consumer.key (), consumer.partition (), consumer.offset (), value)
/ / convert to javaBean
CanalBean canalBean = JSONObject.parseObject (value, CanalBean.class)
/ / get whether it is a DDL statement
Boolean isDdl = canalBean.getIsDdl ()
/ / get type
String type = canalBean.getType ()
/ / not a DDL statement
If (! isDdl) {
List tbCommodityInfos = canalBean.getData ()
/ / expiration time
Long TIME_OUT = 600L
If ("INSERT" .equals (type)) {
/ / add a statement
For (TbCommodityInfo tbCommodityInfo: tbCommodityInfos) {
String id = tbCommodityInfo.getId ()
/ / added to redis with an expiration time of 10 minutes
RedisClient.setString (id, JSONObject.toJSONString (tbCommodityInfo), TIME_OUT)
}
} else if ("UPDATE" .equals (type)) {
/ / Update statement
For (TbCommodityInfo tbCommodityInfo: tbCommodityInfos) {
String id = tbCommodityInfo.getId ()
/ / Update to redis. The expiration time is 10 minutes.
RedisClient.setString (id, JSONObject.toJSONString (tbCommodityInfo), TIME_OUT)
}
} else {
/ / Delete statement
For (TbCommodityInfo tbCommodityInfo: tbCommodityInfos) {
String id = tbCommodityInfo.getId ()
/ / remove from redis
RedisClient.deleteKey (id)
}
}
}
}
}
Test MySQL synchronization with Redis
The table structure corresponding to mysql is as follows:
CREATE TABLE `tb_commodity_ info` (
`id`varchar (32) NOT NULL
`commodity_ name`varchar (512) DEFAULT NULL COMMENT 'trade name'
`commodity_ price` varchar (36) DEFAULT'0' COMMENT 'commodity price'
`number`int (10) DEFAULT'0' COMMENT 'quantity of goods'
`promotion`varchar (2048) DEFAULT''COMMENT' Product description'
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=' Commodity Information Table'
First create the table in MySQL. Then start the project and add a new piece of data:
INSERT INTO `canaldb`.`tb _ commodity_ info` (`id`, `canaldname`, `commodity_ price`, `number`, `substitution`) VALUES ('3e71a81fd80711eaaed600163e046cc3', 'Steamed BBQ Pork Bun', '3.99', '3Qing,' Steamed BBQ Pork Bun is big and fragrant, old people and children like it')
The new data is found in the tb_commodity_info table:
Redis also found the corresponding data, which proved that the synchronization was successful!
What if it's updated? Try the Update statement:
UPDATE `canaldb`.`tb _ commodity_ info` SET `canaldb`` = 'green vegetable bag', `vegetion` = 'very cheap green vegetable bag', either buy or open it. Feed 'WHERE `id` =' 3e71a81fd80711eaaed600163e046cc3'
No problem!
Summary
So you would say, doesn't canal have any shortcomings?
There must be:
Canal can only synchronize incremental data. Not real-time synchronization, but quasi-real-time synchronization. There are some bug, but the community activity is high, and the proposed bug can be repaired in time. The problem of MQ sequencing. Here I list the answers on the official website for your reference. "how to achieve MySQL and Redis data synchronization" content is introduced here, thank you for reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.