Some preliminary practices for an offline analysis platform based on binlog


Reference documentation:

http://quarterback.cn/%e9%80%9a%e8%bf%87kafka-nifi%e5%bf%ab%e9%80%9f%e6%9e%84%e5%bb%ba%e5%bc%82%e6%ad%a5%e6%8c%81%e4%b9%85%e5%8c%96mongodb%e6%9e%b6%e6%9e%84/

http://seanlook.com/2018/01/13/maxwell-binlog/

https://yq.aliyun.com/articles/338423

Let's go straight to the architecture diagrams:

Option 1:

Option 2:

Option 3:

Option 1 is relatively simple and covers the basic needs, so it is a reasonable choice, but its functionality is limited.

Option 2 is more complex and introduces more components, with the data finally stored in MongoDB. Introducing Kafka makes it better suited to scenarios with multiple heterogeneous databases or data warehouse (DW) extraction.

Option 3 is also more complex and similar to Option 2, except that the data is stored in ES and Graylog provides a built-in web query interface.

Here we experiment with Option 2: the binlog is first collected into Kafka, and from there it can be consumed freely, which is more flexible.

The software involved in the experiment:

OS version: CentOS 7.5

Maxwell version: 1.22.4

NiFi version: 1.9.2

Kafka-eagle version: 1.3.9

Maxwell deployment node: 192.168.20.10

ZK + Kafka deployment node: 192.168.2.4

Kafka-eagle deployment node: 192.168.2.4

NiFi deployment node: 192.168.2.4

Simulated business MySQL database: 192.168.2.4:3306

Kafka and zk

Deploying Kafka and ZK is not the focus here. My ZK and Kafka are both on 192.168.2.4, so I will skip the specific steps.

In my experiment, both ZK and Kafka run as a single standalone node; in a production environment they must be deployed in cluster mode.

1. It is best to add the hostname-to-IP mappings to /etc/hosts on each host, otherwise you may run into resolution failures.

2. Note that my ZK is a newer version that listens on port 8080 by default (the admin server); it is recommended to move it to another port so it does not conflict with other services.

[root@Test-dba01 /usr/local/zookeeper-3.5.5-bin]# cat conf/zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=./data/
clientPort=2181
admin.serverPort=12345

After startup, you can see that the listening port is up.

[root@Test-dba01 /usr/local/kafka]# ss -lnt | egrep 2181

LISTEN 0 50 :::2181 :::*

[root@Test-dba01 /usr/local/kafka]# ss -lnt | egrep 12345

LISTEN 0 50 :::12345 :::*
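
It is also worth confirming that the Kafka broker port is listening (9092 is assumed here, matching the kafka.bootstrap.servers value used later; adjust if your listener differs):

[root@Test-dba01 /usr/local/kafka]# ss -lnt | egrep 9092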

Kafka-eagle

Kafka-eagle was developed by a well-known Chinese developer. I use it here mainly because I like its built-in KSQL feature, which lets you query the data inside a Kafka topic directly.

The tool has many other useful features that I will not cover here.

Here is my configuration:

cd /root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9

egrep -v '^$|^#' /root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9/conf/system-config.properties

kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.2.4:2181
kafka.zk.limit.size=25
kafka.eagle.webui.port=8048
cluster1.kafka.eagle.offset.storage=kafka
cluster2.kafka.eagle.offset.storage=zk
kafka.eagle.metrics.charts=false
kafka.eagle.sql.fix.error=false
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.mail.enable=false
kafka.eagle.mail.sa=alert_sa@163.com
kafka.eagle.mail.username=alert_sa@163.com
kafka.eagle.mail.password=mqslimczkdqabbbh322222
kafka.eagle.mail.server.host=smtp.163.com
kafka.eagle.mail.server.port=25
kafka.eagle.topic.token=keadmin
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=PLAIN
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka-eagle";
cluster2.kafka.eagle.sasl.enable=false
cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster2.kafka.eagle.sasl.mechanism=PLAIN
cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka-eagle";
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:./db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org

The main changes are the ZK address and the path of the sqlite database; everything else keeps the defaults.

Start the process:

export KE_HOME=/root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9
export PATH=$PATH:$KE_HOME/bin

./bin/ke.sh start

Log in to the web page

http://192.168.2.4:8048/ke/

Username: admin

Password: 123456

Feel free to explore the specific features on your own; the tool as a whole is quite powerful.
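
For example, the KSQL panel in the web UI can query a topic directly. A query roughly like the following returns the latest records from the maxwell topic created later in this article (the exact KSQL syntax may vary slightly across kafka-eagle versions, so treat this as a sketch):

select * from "maxwell" where "partition" in (0) limit 5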

Maxwell

Maxwell version 1.22.4 is used.

0. Create an account in the MySQL instance on 192.168.2.4 so that Maxwell can connect and pull the binlog.

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';

mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';

mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
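
Maxwell also needs row-based binary logging on the source instance. These checks are standard MySQL commands (not part of the original article) that are worth running before going further:

mysql> SHOW VARIABLES LIKE 'log_bin';           -- should be ON
mysql> SHOW VARIABLES LIKE 'binlog_format';     -- Maxwell requires ROW
mysql> SHOW VARIABLES LIKE 'binlog_row_image';  -- FULL is recommended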

1. Deploy Maxwell on 192.168.20.10

cd /usr/local/

curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.22.4/maxwell-1.22.4.tar.gz | tar zxvf -

cd maxwell-1.22.4/

2. Output to Kafka

2.1 Copy kafka-clients-2.3.0.jar into Maxwell's lib/kafka-clients/ directory.

2.2 Modify the configuration file.

cp config.properties.example config.properties, then edit it; the modified content is as follows:

log_level=info
producer=kafka

# connection info for the MySQL instance where Maxwell stores its metadata
host=127.0.0.1
port=3306
user=maxwell
password=XXXXXX
schema_database=maxwell
gtid_mode=true
ssl=DISABLED
replication_ssl=DISABLED
schema_ssl=DISABLED

# connection info for the upstream MySQL whose binlog is replicated
replication_host=192.168.2.4
replication_user=maxwell
replication_password=XXXXXX
replication_port=3306

# define what data needs to be output
output_binlog_position=true
output_gtid_position=true
output_nulls=true
output_server_id=true
output_ddl=true
output_commit_info=true

# in production, list multiple Kafka brokers here
kafka.bootstrap.servers=192.168.2.4:9092
kafka_topic=maxwell
ddl_kafka_topic=maxwell_ddl
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
producer_partition_by=database

# replication filter rules; binlog events that do not match are dropped (regular expressions supported)
# filter= exclude: test.*, include: db.*, include: coupons.*, include: testdb.user

# expose a metrics endpoint for monitoring
metrics_type=http
metrics_prefix=MaxwellMetrics
metrics_jvm=true
http_port=8081
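
With metrics_type=http and http_port=8081, Maxwell serves metrics over an embedded HTTP server. Once the process is running, a quick sanity check could look like this (the /metrics path follows Maxwell's monitoring documentation; the host is the Maxwell node from this setup):

curl http://192.168.20.10:8081/metrics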

2.3 Start in the foreground

Before starting, create the 2 topics:

bin/kafka-topics.sh --zookeeper 192.168.2.4:2181 --create --topic maxwell --partitions 20 --replication-factor 2

bin/kafka-topics.sh --zookeeper 192.168.2.4:2181 --create --topic maxwell_ddl --partitions 6 --replication-factor 2
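
To confirm the topics were created as expected, you can list or describe them against the same ZooKeeper address:

bin/kafka-topics.sh --zookeeper 192.168.2.4:2181 --list

bin/kafka-topics.sh --zookeeper 192.168.2.4:2181 --describe --topic maxwell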

During the test, we first start the Maxwell process in the foreground:

bin/maxwell --config config.properties --producer=kafka --kafka_version=2.3.0

Another suggestion: start two foreground consumer processes on 192.168.2.4 to watch the data flowing into Kafka:

cd /opt/kafka1/bin/

./kafka-console-consumer.sh --bootstrap-server 192.168.2.4:9092 --topic maxwell

./kafka-console-consumer.sh --bootstrap-server 192.168.2.4:9092 --topic maxwell_ddl

The data in the maxwell topic looks something like this:

{"database": "test", "table": "resourcesinfo", "type": "delete", "ts": 1571644826, "xid": 5872872, "xoffset": 78, "position": "mysql-bin.000003:306", "data": {"id": 94, "name": "222", "hostname": "33", "spec": "", "belong": "", "createtime": "0000-00-00 00:00:00.000000"}}

The data in the maxwell_ddl topic looks something like this:

{"type": "table-create", "database": "leaf", "table": "d2sf", "def": {"database": "leaf", "charset": "utf8mb4", "table": "d2sf", "columns": [{"type": "varchar", "name": "biz_tag", "charset": "utf8mb4"}, {"type": "bigint", "name": "max_id", "signed": true}, {"type": "int", "name": "step", "signed": true}, {"type": "varchar", "name": "description", "charset": "utf8mb4"}, {"type": "timestamp", "name": "update_time", "column-length": 0}], "primary-key": ["biz_tag"]}, "ts": 1571642076000, "sql": "create table d2sf like leaf_alloc", "position": "mysql-bin.000003:172413504", "gtid": "fd2adbd9-e263-11e8-847a-141877487b3d:1386014"}

Build a MongoDB replica set

This is not the key step here.

A multi-instance MongoDB replica set is deployed on the single host 192.168.2.4:

192.168.2.4:27017 SECONDARY

192.168.2.4:27018 PRIMARY

192.168.2.4:27019 ARBITER

No password is set for login.
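
For reference, a replica set like this can be initiated roughly as follows (a sketch only; the article skips this step, the set name is taken from the shell prompt shown below, and the 27018 port is an assumption):

mongo --port 27017

> rs.initiate({
    _id: "Production",
    members: [
      { _id: 0, host: "192.168.2.4:27017" },
      { _id: 1, host: "192.168.2.4:27018" },
      { _id: 2, host: "192.168.2.4:27019", arbiterOnly: true }
    ]
  })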

Then, create a test database and collection:

Production:PRIMARY> use testdb

Production:PRIMARY> db.createCollection("maxwell")

NiFi

Building NiFi is the key step here. NiFi is an ETL tool and is relatively simple to use.

cd /root/

tar xf nifi-1.9.2.tar.gz -C ./

cd /root/nifi-1.9.2

We do not tune any parameters here; let's just run it and see the effect.

./bin/nifi.sh start

Wait about 3 minutes, then check the status:

./bin/nifi.sh status

Java home: /usr/local/jdk
NiFi home: /root/nifi-1.9.2
Bootstrap Config File: /root/nifi-1.9.2/conf/bootstrap.conf

2019-10-21 17:46:15,483 INFO [main] org.apache.nifi.bootstrap.Command Apache NiFi is currently running, listening to Bootstrap on port 43024, PID=130790

Access the web interface

http://192.168.2.4:8080/nifi/

Drag the "Process Group" button to the middle of the page and create a process group named test.

Then double-click the test box and, on that page, create two processors and connect them with a line.

Heads-up: the following configuration is a bit tricky. The screenshots I posted do not describe it very well and may not help much; if you run into problems you will have to work them out on your own. A rough sketch of the two processors' settings follows.
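
As a rough guide only (the processor types and property values below are assumptions based on this setup, not taken from the article's screenshots), the flow can be a ConsumeKafka processor feeding a PutMongo processor:

ConsumeKafka_2_0:
    Kafka Brokers: 192.168.2.4:9092
    Topic Name(s): maxwell
    Group ID: nifi-maxwell   (any otherwise unused consumer group name)
    Offset Reset: latest

PutMongo:
    Mongo URI: mongodb://192.168.2.4:27017,192.168.2.4:27018
    Mongo Database Name: maxwell
    Mongo Collection Name: maxwell
    Mode: insert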

Then, on 192.168.2.4, perform some random CRUD operations and watch whether the counters on the NiFi interface change.

If everything looks fine there, go to the MongoDB database and check whether the data has arrived.

Verifying the data and follow-up processing

Go to MongoDB and check whether data is coming in:

use maxwell

db.maxwell.findOne()

Once the data is there, we can keep working on it inside MongoDB:

db.maxwell.createIndex({ts: 1}, {background: true})

db.maxwell.createIndex({table: 1}, {background: true})

db.maxwell.createIndex({database: 1}, {background: true})

db.maxwell.createIndex({database: 1, table: 1}, {background: true})

db.maxwell.find({table: "tbsdb"}).pretty()

db.maxwell.find({table: "leaf_alloc"}).pretty()

db.maxwell.find({database: "leaf"}).pretty()

db.maxwell.find({"database": "test"}).pretty() lists the captured change events for the test database.

Count the operations within a certain time range:

db.maxwell.count({'ts': {$lt: 1571673600, $gt: 1571587200}, "database": "test", "type": "delete"})

db.maxwell.count({'ts': {$lt: 1571673600, $gt: 1571587200}, "database": "test", "type": "update"})

db.maxwell.count({'ts': {$lt: 1571673600, $gt: 1571587200}, "database": "test", "type": "insert"})
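
Going one step further, the same kind of counts can be produced for every table and operation type at once with an aggregation (a sketch using the same example time window as above):

db.maxwell.aggregate([
    { $match: { ts: { $gt: 1571587200, $lt: 1571673600 }, database: "test" } },
    { $group: { _id: { table: "$table", type: "$type" }, cnt: { $sum: 1 } } },
    { $sort: { cnt: -1 } }
])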
