2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains how to synchronize both the full data and the incremental data of specific MySQL tables to a message queue. The method is simple, fast, and practical.
1. Original requirements
Both the existing full data and the real-time incremental data of specific tables in a specific MySQL database must be synchronized, including the corresponding updates and deletes.
Synchronization must be non-intrusive: the business application cannot be modified, and the business side must not see significant extra load.
Application scenario: ETL data synchronization that reduces the load on business servers.
2. Solution
3. Introduction and installation of canal
Canal is an open-source project from Alibaba, written in pure Java. It parses the database's incremental log to provide incremental data subscription and consumption. It currently supports mainly MySQL (MariaDB is also supported).
Working principle: how MySQL master-slave replication is implemented
At a high level, replication has three steps:
The master records changes in its binary log (these records are called binary log events and can be viewed with show binlog events)
The slave copies the master's binary log events to its relay log
The slave replays the events in the relay log, applying the changes to its own data.
How canal works
The principle is relatively simple:
Canal implements the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master
The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal)
Canal parses the binary log objects (originally a byte stream)
Architecture
Description:
Server represents one running canal process and corresponds to one JVM
Instance corresponds to one data queue (1 server hosts 1..n instances)
Instance module:
EventParser (data source access; simulates the slave protocol to interact with the master; protocol parsing)
EventSink (connects Parser and Store; data filtering, processing, and distribution)
EventStore (data storage)
MetaManager (manager of incremental subscription and consumption information)
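To make the division of labor between these modules concrete, here is a toy Java sketch of one instance. It is not canal's code: the class ToyInstance, the "table|type|payload" line format, and every method name are hypothetical and only mirror the roles described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of one instance: parser -> sink -> store. All names are hypothetical. */
class ToyInstance {

    /** Simplified row-change event; real binlog entries carry far more metadata. */
    static final class RowEvent {
        final String table;
        final String type;     // INSERT, UPDATE or DELETE
        final String payload;

        RowEvent(String table, String type, String payload) {
            this.table = table;
            this.type = type;
            this.payload = payload;
        }
    }

    private final String subscribedTable;
    private final Queue<RowEvent> store = new ArrayDeque<>(); // plays the EventStore role
    private int handedOut = 0;                                // plays the MetaManager role

    ToyInstance(String subscribedTable) {
        this.subscribedTable = subscribedTable;
    }

    /** EventParser role: turn a raw "table|type|payload" line into an event. */
    static RowEvent parse(String raw) {
        String[] parts = raw.split("\\|", 3);
        return new RowEvent(parts[0], parts[1], parts[2]);
    }

    /** EventSink role: keep only events for the subscribed table and store them. */
    void sink(RowEvent event) {
        if (event.table.equals(subscribedTable)) {
            store.add(event);
        }
    }

    /** Consumer side: take up to batchSize events in order and advance the position. */
    List<RowEvent> fetch(int batchSize) {
        List<RowEvent> batch = new ArrayList<>();
        while (batch.size() < batchSize && !store.isEmpty()) {
            batch.add(store.poll());
        }
        handedOut += batch.size();
        return batch;
    }

    /** Number of events handed to consumers so far. */
    int handedOut() {
        return handedOut;
    }
}
```

Feeding it events for bak1 and for another table, then calling fetch, returns only the bak1 events in their original order, which is the filtering and ordering behavior the sink and store are responsible for.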
Installation
1. Prepare the MySQL and Kafka environments
2. Download canal: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
3. Decompress: tar -zxvf canal.deployer-1.1.3.tar.gz
4. Configure the parameters in the files under the conf directory
Configure canal.properties
Go to conf/example and configure instance.properties
5. Start: bin/startup.sh
6. Check the logs
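For step 4, the two files might look roughly like the fragment below. This is a sketch under assumptions, not the author's actual configuration: the key names are the ones used by canal 1.1.x and can differ between releases, so compare them with the files shipped in your download. Every value is a placeholder: the database address and credentials, the schema name test, and the broker address are assumed; only the topic name sample-data and the table name bak1 come from the verification section of this article.

```properties
# conf/canal.properties
# Deliver parsed changes to Kafka instead of serving canal's own TCP clients
canal.serverMode = kafka
# Kafka broker list (placeholder address)
canal.mq.servers = 192.168.7.193:9092
# Instance names; each one maps to a directory under conf/
canal.destinations = example

# conf/example/instance.properties
# MySQL master to read the binlog from (placeholder address and credentials)
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# Capture only the target table; "test" is a hypothetical schema name
canal.instance.filter.regex = test\\.bak1
# Kafka topic that receives the change messages
canal.mq.topic = sample-data
```

Restricting the filter to the target table keeps unrelated changes out of the topic, which matches the requirement of synchronizing specific tables only.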
4. Verification
1. Write a Kafka consumer
package org.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Title: KafkaConsumerTest
 * Description: kafka consumer demo
 * Version: 1.0.0
 * @author pancm
 * @date January 26, 2018
 */
public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUPID = "groupA";

    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        // Broker address; the port was garbled in the source, 9092 (the Kafka default) is assumed
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------start consumption---------");
        try {
            consume:
            for (;;) {
                // poll(Duration) needs kafka-clients 2.0+; older clients use poll(1000)
                msgList = consumer.poll(Duration.ofMillis(1000));
                if (null != msgList && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        // The printed data does not necessarily follow this pattern
                        System.out.println(messageNo + "=======receive: key = " + record.key()
                                + ", value = " + record.value() + " offset===" + record.offset());
                        // String v = decodeUnicode(record.value());
                        // System.out.println(v);
                        // Quit after consuming 1000 messages (the labeled break leaves both loops)
                        if (messageNo % 1000 == 0) {
                            break consume;
                        }
                        messageNo++;
                    }
                } else {
                    Thread.sleep(11);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }

    /** Chinese to unicode escape encoding */
    public static String gbEncoding(final String gbString) {
        char[] utfBytes = gbString.toCharArray();
        StringBuilder unicodeBytes = new StringBuilder();
        for (int i = 0; i < utfBytes.length; i++) {
            String hexB = Integer.toHexString(utfBytes[i]);
            // Pad short code points so every escape has four hex digits
            if (hexB.length() <= 2) {
                hexB = "00" + hexB;
            }
            unicodeBytes.append("\\u").append(hexB);
        }
        return unicodeBytes.toString();
    }

    /** Unicode escape decoding; expects input made up entirely of \uXXXX escapes */
    public static String decodeUnicode(final String dataStr) {
        int start = 0;
        int end = 0;
        final StringBuilder buffer = new StringBuilder();
        while (start > -1) {
            end = dataStr.indexOf("\\u", start + 2);
            String charStr;
            if (end == -1) {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } else {
                charStr = dataStr.substring(start + 2, end);
            }
            // Parse the hexadecimal digits into a single character
            char letter = (char) Integer.parseInt(charStr, 16);
            buffer.append(letter);
            start = end;
        }
        return buffer.toString();
    }
}
2. Add data to the table bak1
CREATE TABLE `bak1` (
  `vin` varchar(20) NOT NULL,
  `p1` double DEFAULT NULL,
  `p2` double DEFAULT NULL,
  `p3` double DEFAULT NULL,
  `p4` double DEFAULT NULL,
  `p5` double DEFAULT NULL,
  `p6` double DEFAULT NULL,
  `p7` double DEFAULT NULL,
  `p8` double DEFAULT NULL,
  `p9` double DEFAULT NULL,
  `p0` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

show create table bak1;

insert into bak1 select 'Li Lei abcv', `p1`, `p2`, `p3`, `p4`, `p5`, `p6`, `p7`, `p8`, `p9`, `p0` from moci limit 10;
3. View the output result:
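Once change messages arrive on the topic, a downstream job usually has to apply them so that updates and deletes are reflected as the requirements demand. The sketch below shows one way to do that in memory, assuming the message has already been parsed into a change type (INSERT, UPDATE or DELETE), a row key, and the column values. The class ReplicaApplier and its methods are hypothetical, and using vin as the row key is an assumption, since bak1 declares no primary key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory replica of bak1, keyed by vin (an assumed row key). */
class ReplicaApplier {

    private final Map<String, Map<String, Double>> rows = new LinkedHashMap<>();

    /** Apply one already-parsed change event to the replica. */
    void apply(String type, String vin, Map<String, Double> columns) {
        switch (type) {
            case "INSERT":
            case "UPDATE":
                // Upsert: replaying the same event twice leaves the replica unchanged
                rows.put(vin, new LinkedHashMap<>(columns));
                break;
            case "DELETE":
                rows.remove(vin);
                break;
            default:
                // Other event types (for example DDL) are ignored in this sketch
                break;
        }
    }

    /** Current column values for a key, or null if the row does not exist. */
    Map<String, Double> get(String vin) {
        return rows.get(vin);
    }

    /** Number of rows currently held in the replica. */
    int size() {
        return rows.size();
    }
}
```

Treating INSERT and UPDATE as the same upsert is a deliberate simplification: with auto-commit enabled the consumer can see a message more than once, and an upsert makes such replays harmless.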
You should now have a clearer picture of how to synchronize the full and incremental data of specific MySQL tables to a message queue. The best way to consolidate it is to try the steps yourself.