
How to Synchronize Full and Incremental Data from a Specific MySQL Table to a Message Queue


This article explains how to synchronize both the full and the incremental data of a specific MySQL table to a message queue. The method is simple, fast, and practical, so let's walk through it step by step.

1. Requirements

Both the existing full data and the real-time incremental data of specific tables in a specific MySQL database need to be synchronized, and updates and deletes must be propagated correspondingly.

The synchronization must be non-intrusive: the business application cannot be modified, and it must not put much performance pressure on the business side.

Application scenario: data ETL synchronization that offloads work from the business servers.

2. Solution

In short: canal subscribes to the MySQL binlog of the source database and delivers the parsed change events to Kafka, and downstream consumers read them from the message queue.

3. Introduction and installation of canal

Canal is an open-source project from Alibaba, written in pure Java. Based on parsing the database's incremental log, it provides incremental data subscription and consumption. It currently supports mainly MySQL (MariaDB is supported as well).

Working principle: it piggybacks on MySQL master/slave replication.

At a high level, replication consists of three steps:

The master records changes in its binary log (these records are called binary log events and can be viewed with show binlog events, as sketched after this list).

The slave copies the master's binary log events into its own relay log.

The slave replays the events in the relay log, applying the changes to its own data.
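
For reference, a few statements to confirm the binlog setup and peek at the recorded events (run in any MySQL session with sufficient privileges; canal requires the ROW binlog format):

-- confirm the binary log is enabled and uses ROW format
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
-- list the events recorded in the current binary log
SHOW BINLOG EVENTS;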

How canal works

The principle is relatively simple:

Canal emulates the interaction protocol of a MySQL slave: it disguises itself as a slave and sends a dump request to the MySQL master.

The MySQL master receives the dump request and starts pushing its binary log to the slave (that is, to canal).

Canal parses the binary log objects (originally a byte stream).

Architecture

Description:

server: one running canal instance, corresponding to one JVM

instance: one data queue (one server holds 1..n instances)

Modules of an instance:

EventParser (data source access; emulates the slave protocol to interact with the master and parses the protocol)

EventSink (the link between Parser and Store; filters, processes, and distributes data)

EventStore (data storage)

MetaManager (manages incremental subscription and consumption metadata)

Installation

1. Prepare the MySQL and Kafka environments (a minimal MySQL-side sketch follows)
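
A minimal MySQL-side preparation sketch, following canal's QuickStart (the credentials and server_id are example values; adjust to your environment):

# my.cnf on the source MySQL: the binlog must be on and in ROW format for canal
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

-- a replication account for canal (example credentials from canal's QuickStart)
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;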

2. Canal download: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3. Decompress: tar -zxvf canal.deployer-1.1.3.tar.gz

4. Edit the configuration files under the conf directory

Configure canal.properties:
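
A minimal sketch of the Kafka-related settings (key names as in canal 1.1.x; the broker address is illustrative and should match your Kafka environment):

# deliver parsed events to Kafka instead of the TCP client
canal.serverMode = kafka
# Kafka broker list (illustrative; match your environment)
canal.mq.servers = 192.168.7.193:9092
# instance list; "example" matches conf/example
canal.destinations = example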

Go to conf/example and configure instance.properties:
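
A minimal sketch of conf/example/instance.properties (the address, credentials, and schema name are illustrative assumptions; sample-data matches the topic the consumer below subscribes to):

# source MySQL address (illustrative)
canal.instance.master.address = 127.0.0.1:3306
# the replication account created during preparation
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# capture only the table of interest ("testdb" is a stand-in for your schema)
canal.instance.filter.regex = testdb\\.bak1
# Kafka topic; matches the "sample-data" topic the consumer subscribes to
canal.mq.topic = sample-data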

5. Start: bin/startup.sh

6. Check the logs:
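
The deployer writes its logs under the logs directory; a quick way to confirm that the server and the example instance started cleanly:

tail -f logs/canal/canal.log
tail -f logs/example/example.log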

4. Verification

1. Write a corresponding Kafka consumer

package org.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Title: KafkaConsumerTest
 * Description: kafka consumer demo
 * Version: 1.0.0
 * @author pancm
 * @date January 26, 2018
 */
public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUPID = "groupA";

    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        // Kafka broker list; adjust to your environment
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------- start consumption ----------");
        try {
            for (;;) {
                msgList = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        // the printed records are not necessarily in this order
                        System.out.println(messageNo + "=== receive: key=" + record.key()
                                + ", value=" + record.value() + ", offset=" + record.offset());
                        // String v = decodeUnicode(record.value());
                        // System.out.println(v);
                        // stop after consuming 1000 messages
                        if (messageNo % 1000 == 0) {
                            break;
                        }
                        messageNo++;
                    }
                } else {
                    Thread.sleep(11);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }

    /** Encode Chinese characters as \\uXXXX unicode escapes. */
    public static String gbEncoding(final String gbString) {
        char[] utfBytes = gbString.toCharArray();
        String unicodeBytes = "";
        for (int i = 0; i < utfBytes.length; i++) {
            String hexB = Integer.toHexString(utfBytes[i]);
            if (hexB.length() <= 2) {
                hexB = "00" + hexB;
            }
            unicodeBytes = unicodeBytes + "\\u" + hexB;
        }
        return unicodeBytes;
    }

    /** Decode \\uXXXX unicode escapes back to characters; assumes the whole string consists of such escapes. */
    public static String decodeUnicode(final String dataStr) {
        int start = 0;
        int end;
        final StringBuffer buffer = new StringBuffer();
        while (start > -1) {
            end = dataStr.indexOf("\\u", start + 2);
            String charStr = "";
            if (end == -1) {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } else {
                charStr = dataStr.substring(start + 2, end);
            }
            char letter = (char) Integer.parseInt(charStr, 16); // parse the hex code point
            buffer.append(letter);
            start = end;
        }
        return buffer.toString();
    }
}

2. Add data to the table bak1

CREATE TABLE `bak1` (
  `vin` varchar(20) NOT NULL,
  `p1` double DEFAULT NULL,
  `p2` double DEFAULT NULL,
  `p3` double DEFAULT NULL,
  `p4` double DEFAULT NULL,
  `p5` double DEFAULT NULL,
  `p6` double DEFAULT NULL,
  `p7` double DEFAULT NULL,
  `p8` double DEFAULT NULL,
  `p9` double DEFAULT NULL,
  `p0` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

show create table bak1;

insert into bak1 select 'Li Lei abcv', `p1`, `p2`, `p3`, `p4`, `p5`, `p6`, `p7`, `p8`, `p9`, `p0` from moci limit 10;

3. View the output result:
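
If everything is wired up, inserting into bak1 makes the consumer print the change event canal published. With canal's flat-message JSON format, a single event looks roughly like this (illustrative values; field names follow canal's FlatMessage; "testdb" is a stand-in schema and the remaining p3..p0 columns are omitted for brevity):

{
  "data": [{"vin": "Li Lei abcv", "p1": "0.5", "p2": "0.7"}],
  "database": "testdb",
  "table": "bak1",
  "type": "INSERT",
  "isDdl": false,
  "es": 1559206016000,
  "ts": 1559206016111
}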

At this point, you should have a deeper understanding of how to synchronize full and incremental data from a specific MySQL table to a message queue. The best way to consolidate it is to try it in practice.
