Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to use RocketMQ to synchronize MySQL to Redis in Canal1.1.4

2025-02-22 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

Today, I will talk about how to use RocketMQ to synchronize MySQL to Redis in Canal 1.1.4. Many people may not be familiar with it, so I have summarized the following content to help you understand it better. I hope you get something out of this article.

Canal uses RocketMQ to synchronize MySQL

Combining Canal with RocketMQ to synchronize MySQL

2. Synchronize data to Redis

2.1 Install Redis

(Omitted)

2.2 Redis configuration

(Omitted)

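2.3 SpringBoot configuration

2.3.1 Add dependencies

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>javax.persistence</groupId>
    <artifactId>persistence-api</artifactId>
    <!-- no version is given in the original article -->
</dependency>

2.3.2 Generic code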

SQLType.java

Import lombok.AccessLevel;import lombok.NoArgsConstructor;/** * Canal snooping SQL type * * @ author Yu * @ date 2019-09-08 00:18 * * / @ NoArgsConstructor (access = AccessLevel.PRIVATE) public class SQLType {/ * * insert * / public static final String INSERT = "INSERT"; / * * update * / public static final String UPDATE = "UPDATE"; / * * delete * / public static final String DELETE = "DELETE";}

User.java

Import lombok.Data;import javax.persistence.Id;import java.io.Serializable;/** * UserPo object * * @ author Yu * @ date 2019-09-08 14:13 * * / @ Datapublic class User implements Serializable {private static final long serialVersionUID =-6845801275112259322L; @ Id private Integer uid; private String username; private String password; private String sex;}

CanalSynService.java

Import com.alibaba.otter.canal.protocol.FlatMessage;import java.util.Collection;/** * Canal synchronization Service * * @ author Yu * @ date 00:00 on 2019-09-08 * * / public interface CanalSynService {/ * processing data * * @ param flatMessage CanalMQ data * / void process (FlatMessage flatMessage) / * DDL statement deals with * * @ param flatMessage CanalMQ data * / void ddl (FlatMessage flatMessage); / * inserts * * @ param list new data * / void insert (Collection list); / * * updates * @ param list updates data * / void update (Collection list) / * * Delete * @ param list Delete data * / void delete (Collection list);}

AbstractCanalMQ2RedisService.java

Import com.alibaba.otter.canal.protocol.FlatMessage;import com.google.common.collect.Sets;import com.taco.springcloud.canal.constant.SQLType;import com.taco.springcloud.core.component.ApplicationContextHolder;import com.taco.springcloud.core.exception.BizException;import com.taco.springcloud.core.exception.constants.BaseApiCodeEnum;import com.taco.springcloud.core.utils.JsonUtil;import com.taco.springcloud.redis.utils.RedisUtils;import lombok.extern.slf4j.Slf4j;import org.springframework.data.redis.connection.RedisConnection Import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.RedisSerializer;import org.springframework.util.ReflectionUtils;import javax.annotation.Resource;import javax.persistence.Id;import java.lang.reflect.Field;import java.lang.reflect.ParameterizedType;import java.util.*;/** * Abstract CanalMQ Universal processing Service * * @ author Yu * @ date 00:05 on 2019-09-08 * * / @ Slf4jpublic abstract class AbstractCanalMQ2RedisService implements CanalSynService {@ Resource private RedisTemplate redisTemplate @ Resource private RedisUtils redisUtils; private Class cache; / * get Model name * * @ return Model name * / protected abstract String getModelName (); @ Override public void process (FlatMessage flatMessage) {if (flatMessage.getIsDdl ()) {ddl (flatMessage); return;} Set data = getData (flatMessage) If (SQLType.INSERT.equals (flatMessage.getType ()) {insert (data);} if (SQLType.UPDATE.equals (flatMessage.getType () {update (data);} if (SQLType.DELETE.equals (flatMessage.getType () {delete (data) } @ Override public void ddl (FlatMessage flatMessage) {/ / TODO: DDL needs synchronization, deletion and emptying, update field processing} @ Override public void insert (Collection list) {insertOrUpdate (list);} @ Override public void update (Collection list) {insertOrUpdate (list) } private void insertOrUpdate (Collection list) {redisTemplate.executePipelined ((RedisConnection redisConnection)-> {for 
(T data: list) {String key = getWrapRedisKey (data); RedisSerializer keySerializer = redisTemplate.getKeySerializer (); RedisSerializer valueSerializer = redisTemplate.getValueSerializer (); redisConnection.set (keySerializer.serialize (key), valueSerializer.serialize (data)) } return null;});} @ Override public void delete (Collection list) {Set keys = Sets.newHashSetWithExpectedSize (list.size ()); for (T data: list) {keys.add (getWrapRedisKey (data));} / / Set keys = list.stream (). Map (this::getWrapRedisKey) .requests (Collectors.toSet ()) RedisUtils.delAll (keys) } / * encapsulates redis's key * * @ param t original object * @ return key * / protected String getWrapRedisKey (T) {return new StringBuilder () .append (ApplicationContextHolder.getApplicationName ()) .append (":") .append (getModelName) () .append (":") .append (getIdValue (t)) .toString () } / * get class generics * * @ return generic Class * / protected Class getTypeArguement () {if (cache = = null) {cache = (Class) ((ParameterizedType) this.getClass (). GetGenericSuperclass ()). GetActualTypeArguments () [0];} return cache } / * get the field value of Object marked with @ Id annotation * * @ param t object * @ return id value * / protected Object getIdValue (T) {Field fieldOfId = getIdField (); ReflectionUtils.makeAccessible (fieldOfId); return ReflectionUtils.getField (fieldOfId, t) } / * get the field name of Class marked with @ Id annotation * * @ return id field name * / protected Field getIdField () {Class clz = getTypeArguement (); Field [] fields = clz.getDeclaredFields (); for (Field field: fields) {Id annotation = field.getAnnotation (Id.class) If (annotation! 
= null) {return field;}} log.error (@ Id annotation is not set for PO class); throw new BizException (BaseApiCodeEnum.FAIL) } / * convert data into generic objects in FlatMessage of Canal * * @ param flatMessage Canal send MQ information * @ return generic object collection * / protected Set getData (FlatMessage flatMessage) {List sourceData = flatMessage.getData (); Set targetData = Sets.newHashSetWithExpectedSize (sourceData.size ()) For (Map map: sourceData) {T t = JsonUtil.mapConvertPojo (map, getTypeArguement ()); targetData.add (t);} return targetData;}}

TestUsersConsumer.java

Import com.alibaba.otter.canal.protocol.FlatMessage;import com.taco.springcloud.canal.model.User;import com.taco.springcloud.canal.service.AbstractCanalMQ2RedisService;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;@Slf4j@Service@RocketMQMessageListener (topic = "test_users", consumerGroup = "users") public class TestUsersConsumer extends AbstractCanalMQ2RedisService implements RocketMQListener {@ Getter private String modelName = "user" @ Override public void onMessage (FlatMessage s) {process (s);}} after reading the above, do you have any further understanding of how to use RocketMQ in Canal1.1.4 to synchronize MySQL to Redis? If you want to know more knowledge or related content, please follow the industry information channel, thank you for your support.

Subscribe to "Shulou Technology Information" to get the latest news, interesting stories and hot topics in the IT industry, and to keep up with the latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report