How to Implement Real-Time Incremental MySQL Data Transmission Based on Docker and Canal

This article introduces in detail how to implement real-time incremental data transmission from MySQL using Docker together with Canal. It has practical reference value, and interested readers should find it worth reading through.

Introduction to Canal

The Historical Origin of Canal

In the early days, Alibaba deployed database instances in server rooms in both Hangzhou and the United States. The business need to synchronize data across server rooms gave rise to Canal, which at first obtained incremental changes mainly through database triggers. Starting in 2010, Alibaba gradually switched to parsing database logs to obtain incremental changes for synchronization, which in turn spawned the incremental subscription and consumption business.

The data-source MySQL versions currently supported by Canal include 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

Application scenarios of Canal

At present, the businesses based on incremental log subscription and consumption mainly include:

Incremental data subscription and consumption based on database incremental log parsing

Database mirroring and real-time database backup

Index construction and real-time maintenance (splitting heterogeneous indexes, inverted indexes, etc.)

Business cache refresh

Incremental data processing with business logic

How Canal works

Before introducing how Canal works, let's first look at how MySQL master-slave replication works.

Principle of MySQL master-slave replication

The MySQL master writes data-change operations to its binary log (binlog); these records are called binary log events and can be viewed with the show binlog events command (see the example after this list)

The MySQL slave copies the master's binary log events into its own relay log

The MySQL slave replays the events in the relay log, applying the data changes to its own tables
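
As a quick illustration, the binlog can be inspected directly from a MySQL client on the master (the same statements are used for verification later in this article):

show binlog events;   -- list the events recorded in the binary log
show master status;   -- show the binlog file and position currently being written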

Having understood how MySQL replication works, we can roughly guess that Canal uses similar logic to implement incremental data subscription. Let's see how Canal actually works.

How Canal works

Canal simulates the MySQL slave interaction protocol, disguises itself as a MySQL slave, and sends the dump protocol to the MySQL master

The MySQL master receives the dump request and starts pushing binary logs to the slave (that is, to Canal)

Canal parses the binary log objects (the data arrives as a byte stream)

Based on this principle, we can obtain and parse the database's incremental logs, provide incremental data subscription and consumption, and thereby implement real-time incremental data transmission from MySQL.
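
To make the flow concrete, here is a minimal sketch of that interaction from the client side, using the same Canal Java client API as the demo application built later in this article (the address 127.0.0.1:11111 and the destination example are placeholders; the classes come from com.alibaba.otter.canal.client and com.alibaba.otter.canal.protocol):

// connect to a canal-server, pull one batch of parsed binlog entries, then acknowledge it
CanalConnector connector = CanalConnectors.newSingleConnector(
        new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
connector.connect();                               // connect to the canal-server
connector.subscribe();                             // subscribe to the destination (instance)
Message message = connector.getWithoutAck(100);    // fetch up to 100 entries without auto-ack
// ... parse message.getEntries() here, as CanalDataParser does below ...
connector.ack(message.getId());                    // acknowledge after successful processing
connector.disconnect();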

Since Canal is a framework written in pure Java, let's now learn how to use it and apply it to our actual work.

Docker environment preparation for Canal

Because containerization technology is so popular nowadays, this article uses Docker to build the development environment quickly; once you have learned how to build the environment with Docker containers, you can set up a traditional environment by following the same pattern. Since this article focuses on Canal, it does not cover Docker in depth and only introduces Docker's basic concepts and commands.

What is Docker?

Most people have probably used VMware virtual machines. To build an environment with VMware, you only need to provide an ordinary system image and install it; the rest of the software environment and application configuration is then done inside the virtual machine, just as on a local machine. However, VMware consumes a lot of host resources, which can easily make the host stutter, and the system images themselves take up a great deal of space.

To help you understand Docker quickly, let's introduce it by comparing it with VMware. Docker provides a platform for starting, packaging, and running apps, isolating the app (application) from the underlying infrastructure. The two main concepts in Docker are images (similar to VMware system images) and containers (similar to the systems installed in VMware).

What is an Image

A collection of files and metadata (a root filesystem)

Layered: each layer can add, change, or delete files, producing a new image

Different images can share the same layers

An image itself is read-only

What is a Container

Created from an image (a copy)

A container layer (readable and writable) is created on top of the image layers

An object-oriented analogy: classes and instances

The image is responsible for storing and distributing the app, while the container is responsible for running it (see the short example below)
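
A quick illustration of that analogy using commands that appear later in this article (the mysql image is just an example):

docker pull mysql                                            # obtain the image (the "class"), which is stored and distributed
docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=root mysql # create and start a container (an "instance") from the image
docker ps                                                    # list the running containers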

Introduction to Docker networking

Docker has three network types:

Bridge: bridged network. By default, Docker containers use the bridge network created during Docker installation. Each time a container is restarted it obtains its IP address sequentially, so a container's IP address can change across restarts.

None: no network specified. With --network=none, the Docker container is not assigned a LAN IP.

Host: host network. With --network=host, the container's network is attached to the host and the two share the same network stack. For example, if you run a web service in a container listening on port 8080, the service is reachable directly on the host's port 8080 without any explicit port mapping.

Create a custom network (to assign fixed IPs):

docker network create --subnet=172.18.0.0/16 mynetwork

View the existing networks with docker network ls.

Building the Canal environment

The Docker download and installation address is attached here => Docker Download.

Download the Canal image: docker pull canal/canal-server

Download the MySQL image: docker pull mysql

Check the downloaded images: docker images

Next, create the MySQL container and the canal-server container from the images:

## generate the mysql container
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql

## generate the canal-server container
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server

## option introduction
## --net mynetwork   use the custom network
## --ip              specify the assigned IP

View the containers running in Docker: docker ps

Modifying the MySQL configuration

The above only prepares the basic environment. But how do we get Canal to disguise itself as a slave and correctly obtain the binary log from MySQL?

For a self-built MySQL, you first need to enable binlog writing and configure binlog-format to ROW mode. Enable the binlog by modifying the MySQL configuration file; use find / -name my.cnf to locate my.cnf, then modify the file as follows:

[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # select ROW mode
server_id=1         # required for MySQL replication; must not clash with Canal's slaveId

Enter the MySQL container: docker exec -it mysql bash

Create an account named canal for Canal to connect to MySQL, and grant it the privileges of a MySQL slave. If the account already exists, you can GRANT directly:

mysql -uroot -proot

# create the account
CREATE USER canal IDENTIFIED BY 'canal';
# grant permissions
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
# refresh and apply
FLUSH PRIVILEGES;

After restarting the database, run a simple test to check whether the my.cnf configuration has taken effect:

show variables like 'log_bin';
show master status;

Modifying the canal-server configuration

Enter the canal-server container: docker exec -it canal-server bash

Edit the canal-server configuration: vi canal-server/conf/example/instance.properties
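
The original post shows this file in a screenshot; as a rough sketch, the key entries to check are the master address and the replication account created above (the values below assume the containers and the canal account from the previous steps):

canal.instance.mysql.slaveId=1234              # must not clash with the server_id configured in my.cnf
canal.instance.master.address=172.18.0.6:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal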

For more configuration options, see => Canal configuration instructions.

Restart the canal-server container with docker restart canal-server, then enter the container to view the startup log:

docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log

At this point, our environment preparation is complete!

Pulling data and synchronizing it to ElasticSearch

The ElasticSearch used in this article also runs in Docker, so readers can simply execute the following commands:

# download the images
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine

# create and run the containers
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine

The environment is ready; now it is time for the coding part: how does an application obtain the binlog data parsed by Canal? First, we build a Canal demo application based on Spring Boot, whose main classes are listed below:

Student.java

package com.example.canal.study.pojo;

import lombok.Data;

import java.io.Serializable;

// @Data generates the getter and setter methods
@Data
public class Student implements Serializable {
    private String id;
    private String name;
    private int age;
    private String sex;
    private String city;
}

CanalConfig.java

package com.example.canal.study.common;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

/**
 * @author
 */
@Configuration
public class CanalConfig {
    // @Value reads the corresponding entries from application.properties
    @Value("${canal.server.ip}")
    private String canalIp;
    @Value("${canal.server.port}")
    private Integer canalPort;
    @Value("${canal.destination}")
    private String destination;
    @Value("${elasticSearch.server.ip}")
    private String elasticSearchIp;
    @Value("${elasticSearch.server.port}")
    private Integer elasticSearchPort;
    @Value("${zookeeper.server.ip}")
    private String zkServerIp;

    // get a simple (single-node) canal-server connection
    @Bean
    public CanalConnector canalSimpleConnector() {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalIp, canalPort), destination, "", "");
        return canalConnector;
    }

    // get a cluster (HA) canal-server connection via ZooKeeper
    @Bean
    public CanalConnector canalHaConnector() {
        CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
        return canalConnector;
    }

    // elasticsearch 7.x client
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort)));
        return client;
    }
}

CanalDataParser.java

Because this class contains a lot of code, only the more important parts are shown here; the rest of the code can be obtained from GitHub:

public static class TwoTuple<A, B> {
    public final A eventType;
    public final B columnMap;

    public TwoTuple(A a, B b) {
        eventType = a;
        columnMap = b;
    }
}

public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
    List<TwoTuple<EventType, Map>> rows = new ArrayList<>();

    for (Entry entry : entrys) {
        // time of the binlog event
        long executeTime = entry.getHeader().getExecuteTime();
        // delay between the moment the application received the binlog and the event time
        long delayTime = System.currentTimeMillis() - executeTime;
        Date date = new Date(entry.getHeader().getExecuteTime());
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // the current entry (binary log event) belongs to a transaction
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                TransactionBegin begin = null;
                try {
                    begin = TransactionBegin.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error, data:" + entry.toString(), e);
                }
                // print transaction header information: executing thread id, transaction time
                logger.info(transaction_format, new Object[]{entry.getHeader().getLogfileName(),
                        String.valueOf(entry.getHeader().getLogfileOffset()),
                        String.valueOf(entry.getHeader().getExecuteTime()),
                        simpleDateFormat.format(date),
                        entry.getHeader().getGtid(), String.valueOf(delayTime)});
                logger.info("BEGIN ----> Thread id: {}", begin.getThreadId());
                printXAInfo(begin.getPropsList());
            } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                TransactionEnd end = null;
                try {
                    end = TransactionEnd.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error, data:" + entry.toString(), e);
                }
                // print transaction commit information: transaction id
                logger.info("----------------\n");
                logger.info("END ----> transaction id: {}", end.getTransactionId());
                printXAInfo(end.getPropsList());
                logger.info(transaction_format, new Object[]{entry.getHeader().getLogfileName(),
                        String.valueOf(entry.getHeader().getLogfileOffset()),
                        String.valueOf(entry.getHeader().getExecuteTime()),
                        simpleDateFormat.format(date),
                        entry.getHeader().getGtid(), String.valueOf(delayTime)});
            }
            continue;
        }

        // the current entry (binary log event) is row data
        if (entry.getEntryType() == EntryType.ROWDATA) {
            RowChange rowChage = null;
            try {
                // get the stored content
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("parse event has an error, data:" + entry.toString(), e);
            }
            // get the event type of the current content
            EventType eventType = rowChage.getEventType();
            logger.info(row_format, new Object[]{entry.getHeader().getLogfileName(),
                    String.valueOf(entry.getHeader().getLogfileOffset()),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType,
                    String.valueOf(entry.getHeader().getExecuteTime()),
                    simpleDateFormat.format(date),
                    entry.getHeader().getGtid(), String.valueOf(delayTime)});
            // if the event is a query or a DDL statement, print the SQL directly and continue with the next loop
            if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                logger.info("sql ----> " + rowChage.getSql() + SEP);
                continue;
            }
            printXAInfo(rowChage.getPropsList());
            // loop over the concrete row data of the current entry
            for (RowData rowData : rowChage.getRowDatasList()) {
                List<Column> columns;
                // for DELETE events return the columns before deletion, otherwise return the changed columns
                if (eventType == CanalEntry.EventType.DELETE) {
                    columns = rowData.getBeforeColumnsList();
                } else {
                    columns = rowData.getAfterColumnsList();
                }
                HashMap<String, Object> map = new HashMap<>(16);
                // put every column name and value into the map
                for (Column column : columns) {
                    map.put(column.getName(), column.getValue());
                }
                rows.add(new TwoTuple<>(eventType, map));
            }
        }
    }
    return rows;
}

ElasticUtils.java

package com.example.canal.study.common;

import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * @author
 */
@Slf4j
@Component
public class ElasticUtils {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * create
     * @param student
     * @param index index name
     */
    public void saveEs(Student student, String index) {
        IndexRequest indexRequest = new IndexRequest(index)
                .id(student.getId())
                .source(JSON.toJSONString(student), XContentType.JSON)
                .opType(DocWriteRequest.OpType.CREATE);
        try {
            IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("Saved data to ElasticSearch successfully: {}", response.getId());
        } catch (IOException e) {
            log.error("Failed to save data to ElasticSearch: {}", e);
        }
    }

    /**
     * read
     * @param index index name
     * @param id _id
     * @throws IOException
     */
    public void getEs(String index, String id) throws IOException {
        GetRequest getRequest = new GetRequest(index, id);
        GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
        Map<String, Object> fields = response.getSource();
        for (Map.Entry<String, Object> entry : fields.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
    }

    /**
     * update
     * @param student
     * @param index index name
     * @throws IOException
     */
    public void updateEs(Student student, String index) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
        updateRequest.upsert(JSON.toJSONString(student), XContentType.JSON);
        UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        log.info("Updated data in ElasticSearch successfully: {}", response.getId());
    }

    /**
     * delete data by id
     * @param index index name
     * @param id _id
     * @throws IOException
     */
    public void DeleteEs(String index, String id) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
        log.info("Deleted data from ElasticSearch successfully: {}", response.getId());
    }
}

BinLogElasticSearch.java

package com.example.canal.study.action;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * @author
 */
@Slf4j
@Component
public class BinLogElasticSearch {
    @Autowired
    private CanalConnector canalSimpleConnector;
    @Autowired
    private ElasticUtils elasticUtils;
    // @Qualifier("canalHaConnector") uses the bean named canalHaConnector
    @Autowired
    @Qualifier("canalHaConnector")
    private CanalConnector canalHaConnector;

    public void binLogToElasticSearch() throws IOException {
        openCanalConnector(canalHaConnector);
        // poll and pull data
        Integer batchSize = 5 * 1024;
        while (true) {
            Message message = canalHaConnector.getWithoutAck(batchSize);
            // Message message = canalSimpleConnector.getWithoutAck(batchSize);
            long id = message.getId();
            int size = message.getEntries().size();
            log.info("number of binLog messages currently fetched: {}", size);
            if (id == -1 || size == 0) {
                try {
                    // wait 2 seconds
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // 1. parse the message object
                List<CanalEntry.Entry> entries = message.getEntries();
                List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);
                for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
                    if (tuple.eventType == CanalEntry.EventType.INSERT) {
                        Student student = createStudent(tuple);
                        // 2. synchronize the parsed object to ElasticSearch
                        elasticUtils.saveEs(student, "student_index");
                        // 3. acknowledge that the message has been processed
                        // canalSimpleConnector.ack(id);
                        canalHaConnector.ack(id);
                    }
                    if (tuple.eventType == CanalEntry.EventType.UPDATE) {
                        Student student = createStudent(tuple);
                        elasticUtils.updateEs(student, "student_index");
                        // 3. acknowledge that the message has been processed
                        // canalSimpleConnector.ack(id);
                        canalHaConnector.ack(id);
                    }
                    if (tuple.eventType == CanalEntry.EventType.DELETE) {
                        elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
                        canalHaConnector.ack(id);
                    }
                }
            }
        }
    }

    /**
     * map the parsed column data to a Student object
     * @param tuple
     * @return
     */
    private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple) {
        Student student = new Student();
        student.setId(tuple.columnMap.get("id").toString());
        // Student.age is an int, so parse the column value
        student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
        student.setName(tuple.columnMap.get("name").toString());
        student.setSex(tuple.columnMap.get("sex").toString());
        student.setCity(tuple.columnMap.get("city").toString());
        return student;
    }

    /**
     * open the canal connection
     * @param canalConnector
     */
    private void openCanalConnector(CanalConnector canalConnector) {
        // connect to the CanalServer
        canalConnector.connect();
        // subscribe to the destination
        canalConnector.subscribe();
    }

    /**
     * close the canal connection
     * @param canalConnector
     */
    private void closeCanalConnector(CanalConnector canalConnector) {
        // disconnect from the CanalServer
        canalConnector.disconnect();
        // unsubscribe from the destination
        canalConnector.unsubscribe();
    }
}

CanalDemoApplication.java (Spring Boot startup class)

package com.example.canal.study;

import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author
 */
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
    @Autowired
    private BinLogElasticSearch binLogElasticSearch;

    public static void main(String[] args) {
        SpringApplication.run(CanalDemoApplication.class, args);
    }

    // the run method is executed when the application starts
    @Override
    public void run(ApplicationArguments args) throws Exception {
        binLogElasticSearch.binLogToElasticSearch();
    }
}

application.properties

server.port=8081
spring.application.name=canal-demo

canal.server.ip=192.168.124.5
canal.server.port=11111
canal.destination=example

zookeeper.server.ip=192.168.124.5:2181
zookeeper.sasl.client=false

elasticSearch.server.ip=192.168.124.5
elasticSearch.server.port=9200

Building a highly available Canal cluster

Through the study above we have learned how to use Canal in standalone, direct-connection mode. In today's Internet era, single-instance mode is gradually being replaced by highly available cluster mode, so how do we build Canal in multi-instance cluster mode?

Obtaining a Canal instance through ZooKeeper

Prepare the Docker image and container of ZooKeeper:

docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server

1. Machine preparation:

IPs of the containers running Canal: 172.18.0.4, 172.18.0.8

ZooKeeper container: 172.18.0.3:2181

MySQL container: 172.18.0.6:3306

2. Complete the single-machine configuration on each node following the deployment and configuration steps above; the instance name used in this demonstration is example.

3. Modify canal.properties, adding the ZooKeeper configuration and changing the Canal port:

canal.port=11113
canal.zkServers=172.18.0.3:2181
canal.instance.global.spring.xml=classpath:spring/default-instance.xml

4. Create an example directory and modify instance.properties:

canal.instance.mysql.slaveId=1235   # the previous canal slaveId was 1234; make sure the slaveIds do not repeat
canal.instance.master.address=172.18.0.6:3306

Note: the instance directory names on the two machines must be exactly the same; HA mode relies on the instance name for management, and both nodes must use the default-instance.xml configuration.

Start Canal in the two different containers. After startup, you can view the startup log with tail -100f logs/example/example.log; the successful-startup log will appear on only one of the machines.

For example, the successful startup here is 172.18.0.4:

Looking at the node information in ZooKeeper, you can also see that the current working node is 172.18.0.4:11111:

[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}

Client connection and data consumption

By specifying the ZooKeeper address and the Canal instance name, the canal client can automatically obtain the currently working node from the running node in ZooKeeper and then establish a connection with it:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}

The corresponding client code can take the following form; the canalHaConnector in CanalConfig.java above is exactly such an HA connection:

CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");

After the connection succeeds, the canal server records information about the canal client that is currently working, such as the client IP and the connection port (the observant reader will notice that the canal client also supports HA):

[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientId":1001}

After data is consumed successfully, the canal server records in ZooKeeper the binlog position of the last successful consumption (the next time the client restarts, it continues consuming from this position):

[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}

Stop the currently working canal server on 172.18.0.4:

docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh

At this point, 172.18.0.8 immediately starts the example instance and takes over providing data services:

[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1}

At the same time, as the canal server switches over, the client obtains the latest address from ZooKeeper, establishes a connection with the new canal server, and continues consuming data. The whole process is completed automatically.

Exception and summary

Elasticsearch-head cannot access Elasticsearch

es and es-head are two independent processes, so when es-head accesses the es service there is a cross-origin (CORS) problem. We therefore need to modify the es configuration file and add a few configuration items to solve it, as follows:

[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml

# add the following configuration at the end of the file
http.cors.enabled: true
http.cors.allow-origin: "*"

Restart the es service after modifying the configuration file.

Elasticsearch-head query reports 406 Not Acceptable

Solution:

1. Enter the head installation directory

2. cd _site/

3. Edit vendor.js in two places:

# line 6886
contentType: "application/x-www-form-urlencoded"
# change to
contentType: "application/json;charset=UTF-8"

# line 7574
var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
# change to
var inspectData = s.contentType === "application/json;charset=UTF-8" &&

Using elasticsearch-rest-high-level-client reports an error about org.elasticsearch.action.index.IndexRequest.ifSeqNo

<!-- in addition to the following dependency -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.1.1</version>
</dependency>

<!-- you also need to add the following to the pom -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.1.1</version>
</dependency>

Related reference: GitHub issues.

Why can't ElasticSearch use type in version 7.x?

Reference: why did ElasticSearch remove type in version 7.x?

Using spring-data-elasticsearch.jar reports org.elasticsearch.client.transport.NoNodeAvailableException

Because this article uses ElasticSearch 7.x or above, and the underlying layer of spring-data-elasticsearch still uses the official TransportClient, which es has planned to abandon, this article uses the officially recommended RestHighLevelClient to make the requests instead. Please refer to the RestHighLevelClient API.

Setting a Docker container to start automatically on boot

If --restart=always was not specified when the container was created, you can use the update command: docker update --restart=always [containerID]
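
For example, either form works (the container name is illustrative):

# set the restart policy when creating the container
docker run -d --restart=always --name canal-server canal/canal-server
# or update an existing container
docker update --restart=always canal-server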

Docker for Mac network host mode does not work

Host mode exists for performance, but it breaks Docker's isolation and reduces security. In performance-critical scenarios you can enable Host mode with --network host, but note that if you start the container locally on Windows or Mac, Host mode will fail: it is only supported on Linux hosts.

See the official document: https://docs.docker.com/network/host/.

The client connects to ZooKeeper and reports: authenticate using SASL (unknown error)

The ZooKeeper version in zookeeper.jar is inconsistent with the ZooKeeper version running in Docker

zookeeper.jar uses a version earlier than 3.4.6

This error means that ZooKeeper, as an external application, must request resources from the system, and requesting resources requires authentication; SASL is one such authentication method. We can simply bypass SASL authentication to avoid the wait and improve efficiency.

Add System.setProperty("zookeeper.sasl.client", "false"); to the project code. In a Spring Boot project, you can instead add zookeeper.sasl.client=false to application.properties.

Reference: Increased CPU usage by unnecessary SASL checks.

If you need to change the zookeeper.jar version that canal.client.jar depends on

Download the official Canal source code locally with git clone https://github.com/alibaba/canal.git, then modify the ZooKeeper entry in the pom.xml file under the client module, and rebuild with mvn install:
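
The original post shows this edit in a screenshot; as a rough sketch, the change is to the ZooKeeper dependency in the client module's pom.xml (the version below is only an illustrative assumption, pick the one matching your ZooKeeper server):

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependency>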

Then replace the package your project depends on with the package just produced by mvn install.

On the trade-offs of technology selection

This is all the content of this article, "How to Implement Real-Time Incremental MySQL Data Transmission Based on Docker and Canal". Thank you for reading! I hope the content shared here is helpful to you; for more related knowledge, welcome to follow the industry information channel!
