Preliminary practices for an offline analysis platform based on binlog
Reference documentation:
http://quarterback.cn/%e9%80%9a%e8%bf%87kafka-nifi%e5%bf%ab%e9%80%9f%e6%9e%84%e5%bb%ba%e5%bc%82%e6%ad%a5%e6%8c%81%e4%b9%85%e5%8c%96mongodb%e6%9e%b6%e6%9e%84/
http://seanlook.com/2018/01/13/maxwell-binlog/
https://yq.aliyun.com/articles/338423
Here are the architecture diagrams:
Option 1:
Option 2:
Option 3:
Option 1 is relatively simple and basically meets the need, so it is also a decent choice, but its functionality is limited.
Option 2 is more complex and introduces more components, with the data stored in MongoDB. Introducing kafka makes it better suited to scenarios with multiple heterogeneous databases or data warehouse (DW) extraction.
Option 3 is also fairly complex and similar to option 2, except that the data is stored in ES, and graylog provides a built-in web query interface.
Here we experiment with option 2: binlog is first collected into kafka, after which it can be consumed freely, which is more flexible.
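For reference, the data flow we are building in option 2 is:
MySQL binlog -> Maxwell -> Kafka -> NiFi -> MongoDB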
The software involved in the experiment:
OS version: CentOS7.5
Maxwell version: 1.22.4
Nifi version: 1.9.2
Kafka-eagle version: 1.3.9
Maxwell deployment node: 192.168.20.10
Zk+kafka deployment node: 192.168.2.4
Node for kafka-eagle deployment: 192.168.2.4
Node for nifi deployment: 192.168.2.4
Simulated business MySQL database: 192.168.2.4:3306
Kafka and zk
The deployment of kafka and zk is not the focus here. My zk and kafka are both deployed on 192.168.2.4, so I will skip the specific steps.
In my experiment both zk and kafka run on a single node; in a production environment you must use cluster mode.
1. It is best to write the hostname-to-ip mappings into /etc/hosts on each host, otherwise you may run into resolution failures.
2. Note that my zk is a newer version, which by default also listens on port 8080 (the admin server). It is recommended to change that port so it does not conflict with other services.
[root@Test-dba01 /usr/local/zookeeper-3.5.5-bin]# cat conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=./data/
clientPort=2181
admin.serverPort=12345
After startup, you can see that the ports are listening.
[root@Test-dba01 /usr/local/kafka]# ss -lnt | egrep 2181
LISTEN     0      50        :::2181       :::*
[root@Test-dba01 /usr/local/kafka]# ss -lnt | egrep 12345
LISTEN     0      50        :::12345      :::*
Kafka-eagle
Kafka-eagle is developed by a well-known Chinese engineer. I use it here mainly for its built-in KSQL feature, which lets you query the data inside a kafka topic directly.
The tool has many other useful features, which I will not go into here.
Here is my configuration:
cd /root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9
egrep -v '^$|^#' /root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9/conf/system-config.properties
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.2.4:2181
kafka.zk.limit.size=25
kafka.eagle.webui.port=8048
cluster1.kafka.eagle.offset.storage=kafka
cluster2.kafka.eagle.offset.storage=zk
kafka.eagle.metrics.charts=false
kafka.eagle.sql.fix.error=false
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.mail.enable=false
kafka.eagle.mail.sa=alert_sa@163.com
kafka.eagle.mail.username=alert_sa@163.com
kafka.eagle.mail.password=mqslimczkdqabbbh322222
kafka.eagle.mail.server.host=smtp.163.com
kafka.eagle.mail.server.port=25
kafka.eagle.topic.token=keadmin
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=PLAIN
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka-eagle";
cluster2.kafka.eagle.sasl.enable=false
cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster2.kafka.eagle.sasl.mechanism=PLAIN
cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka-eagle";
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:./db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org
The main changes are the zk address and the sqlite database path; everything else keeps the defaults.
Start the process:
export KE_HOME=/root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9
export PATH=$PATH:$KE_HOME/bin
./bin/ke.sh start
Log in to the web page
http://192.168.2.4:8048/ke/
Username: admin
Password: 123456
You can explore the specific features on your own; the tool as a whole is quite powerful.
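As a quick illustration of the KSQL feature mentioned above (the exact syntax varies between kafka-eagle versions, so check the sample query shown on the SQL page of your own installation), querying a few messages from a topic such as maxwell (created later in this walkthrough) generally looks something like:
select * from "maxwell" where "partition" in (0) limit 10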
Maxwell
Maxwell version 1.22.4 is used.
0. Create an account in the MySQL instance on 192.168.2.4 so that maxwell can connect and pull the binlog.
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
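Maxwell also requires row-based binlog on the source MySQL. The original steps do not show this, so treat the following my.cnf fragment as an assumption to verify against your own 192.168.2.4 instance rather than the author's exact settings:
[mysqld]
# minimal binlog settings maxwell needs (assumed, not taken from the original article)
server_id=1
log-bin=mysql-bin
binlog_format=row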
1. Deploy maxwell on 192.168.20.10
cd /usr/local/
curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.22.4/maxwell-1.22.4.tar.gz | tar zxvf -
cd maxwell-1.22.4/
2. Output to kafka
2.1 Copy kafka-clients-2.3.0.jar into maxwell's lib/kafka-clients/ directory.
2.2 Modify the configuration file
cp config.properties.example config.properties, then edit it. The modified content is as follows:
log_level=info
producer=kafka
# connection info of the MySQL instance that stores maxwell's own metadata
host=localhost
user=maxwell
password=maxwell
producer=kafka
host=127.0.0.1
port=3306
user=maxwell
password=XXXXXX
schema_database=maxwell
gtid_mode=true
ssl=DISABLED
replication_ssl=DISABLED
schema_ssl=DISABLED
# connection info of the upstream MySQL
replication_host=192.168.2.4
replication_user=maxwell
replication_password=XXXXXX
replication_port=3306
# define what data needs to be output
output_binlog_position=true
output_gtid_position=true
output_nulls=true
output_server_id=true
output_ddl=true
output_commit_info=true
# in a production environment, list the addresses of multiple kafka brokers here
kafka.bootstrap.servers=192.168.2.4:9092
kafka_topic=maxwell
ddl_kafka_topic=maxwell_ddl
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=1
producer_partition_by=database
# replication filter rules; binlog events that do not match the conditions below are not kept [regular expressions are supported]
#filter= exclude: test.*, include: db.*, include: coupons.*, include: testdb.user
# expose a metrics endpoint for monitoring
metrics_type=http
metrics_prefix=MaxwellMetrics
metrics_jvm=true
http_port=8081
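With metrics_type=http and http_port=8081 above, maxwell exposes its metrics over HTTP once it is running. As a quick sanity check from the maxwell node (the /metrics path is what the Maxwell documentation describes; confirm it for your version):
curl -s http://192.168.20.10:8081/metrics | head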
2.3 Start in the foreground
Before starting, create the 2 topics:
bin/kafka-topics.sh --zookeeper 192.168.2.4:2181 --create --topic maxwell --partitions 20 --replication-factor 2
bin/kafka-topics.sh --zookeeper 192.168.2.4:2181 --create --topic maxwell_ddl --partitions 6 --replication-factor 2
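To confirm that both topics exist before starting maxwell, a simple check against the same zookeeper address:
bin/kafka-topics.sh --zookeeper 192.168.2.4:2181 --list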
During the test, we first start the maxwell process in the foreground:
bin/maxwell --config config.properties --producer=kafka --kafka_version=2.3.0
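Once the foreground run looks healthy, maxwell can also be left running in the background; versions of this vintage support a --daemon flag (verify that your build has it):
bin/maxwell --config config.properties --producer=kafka --kafka_version=2.3.0 --daemon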
Another suggestion: start two foreground consumer processes on 192.168.2.4 to observe how data enters kafka:
cd /opt/kafka1/bin/
./kafka-console-consumer.sh --bootstrap-server 192.168.2.4:9092 --topic maxwell
./kafka-console-consumer.sh --bootstrap-server 192.168.2.4:9092 --topic maxwell_ddl
The data in the maxwell topic looks something like this:
{"database": "test", "table": "resourcesinfo", "type": "delete", "ts": 1571644826, "xid": 5872872, "xoffset": 78, "position": "mysql-bin.0003306", "data": {"id": 94, "name": "222", "hostname": "33", "spec": "", "belong": "", "createtime": "0000-00-00 00:00:00.000000"}}
The data in the maxwell_ddl topic looks something like this:
{"type": "table-create", "database": "leaf", "table": "d2sf", "def": {"database": "leaf", "charset": "utf8mb4", "table": "d2sf", "columns": [{"type": "varchar", "name": "biz_tag", "charset": "utf8mb4"}, {"type": "bigint", "name": "max_id", "signed": true}, {"type": "int", "name": "step", "signed": true}, {"type": "varchar", "name": "description", "charset": "utf8mb4"}, {"type": "timestamp", "name": "update_time", "column-length": 0}], "primary-key": ["biz_tag"]}, "ts": 1571642076000, "sql": "create table d2sf like leaf_alloc", "position": "mysql-bin.000003:172413504", "gtid": "fd2adbd9-e263-11e8-847a-141877487b3d:1386014"}
Build a MongoDB replica set
This is not the key step here.
A stand-alone, multi-instance mongodb replica set is deployed on 192.168.2.4:
192.168.2.4:27018 standby
192.168.2.4:27017 primary
192.168.2.4:27019 ARBITER
No password is set for login.
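For completeness, a replica set like this is normally initialized once from the mongo shell along the following lines; the replica set name and member ports here are assumptions based on the list above, not the author's exact commands:
rs.initiate({
  _id: "Production",
  members: [
    { _id: 0, host: "192.168.2.4:27017" },
    { _id: 1, host: "192.168.2.4:27018" },
    { _id: 2, host: "192.168.2.4:27019", arbiterOnly: true }
  ]
})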
Then, create a test database and collection:
Production:PRIMARY> use testdb
Production:PRIMARY> db.createCollection("maxwell")
Building NiFi is the key part here.
NiFi is an ETL tool and is relatively easy to get started with.
cd /root/
tar xf nifi-1.9.2.tar.gz -C ./
cd /root/nifi-1.9.2
We won't tune any parameters here; let's just run it first and see the effect.
./bin/nifi.sh start
Wait about 3 minutes, then check the status:
./bin/nifi.sh status
Java home: /usr/local/jdk
NiFi home: /root/nifi-1.9.2
Bootstrap Config File: /root/nifi-1.9.2/conf/bootstrap.conf
2019-10-21 17:46:15,483 INFO [main] org.apache.nifi.bootstrap.Command Apache NiFi is currently running, listening to Bootstrap on port 43024, PID=130790
Access the web interface
http://192.168.2.4:8080/nifi/
Drag the "process group" button to the middle of the page to create a "process group" named test
Then double-click the test box, and on this page, create two processpor and connect them with lines
High-energy early warning: the following configuration operation, a little difficult, I posted the picture is not very good description, may not be able to help you, if there is a problem need to explore their own!
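For orientation only: since the flow reads from kafka and writes to MongoDB, the two processors are presumably a kafka consumer and a Mongo writer. The sketch below assumes the stock ConsumeKafka_2_0 and PutMongo processors; the property values are my assumptions based on the addresses used in this article, not the author's screenshots:
ConsumeKafka_2_0
  Kafka Brokers: 192.168.2.4:9092
  Topic Name(s): maxwell
  Group ID: nifi-maxwell
  Offset Reset: earliest
PutMongo
  Mongo URI: mongodb://192.168.2.4:27017
  Mongo Database Name: maxwell
  Mongo Collection Name: maxwell
  Mode: insert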
Then, on 192.168.2.4, run some random CRUD statements against the business MySQL and watch whether the counters on the NiFi interface change.
If everything looks fine up to here, go to the mongodb database and check whether the data has arrived.
Verifying the data and follow-up processing
Go to mongodb and check whether data is coming in:
use maxwell
db.maxwell.findOne()
Once the data is there, we can continue to work on it in mongodb.
db.maxwell.createIndex({ts:1}, {background:true})
db.maxwell.createIndex({table:1}, {background:true})
db.maxwell.createIndex({database:1}, {background:true})
db.maxwell.createIndex({database:1, table:1}, {background:true})
db.maxwell.find({table: "tbsdb"}).pretty()
db.maxwell.find({table: "leaf_alloc"}).pretty()
db.maxwell.find({database: "leaf"}).pretty()
db.maxwell.find({"database": "test"}).pretty() returns documents like the maxwell topic messages shown earlier.
Count the operations within a certain time range:
db.maxwell.count({'ts': {$lt:1571673600, $gt:1571587200}, "database": "test", "type": "delete"})
db.maxwell.count({'ts': {$lt:1571673600, $gt:1571587200}, "database": "test", "type": "update"})
db.maxwell.count({'ts': {$lt:1571673600, $gt:1571587200}, "database": "test", "type": "insert"})
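The three counts above can also be collapsed into a single aggregation that groups by operation type; a small illustrative sketch over the same time range:
db.maxwell.aggregate([
  { $match: { ts: { $gt: 1571587200, $lt: 1571673600 }, database: "test" } },
  { $group: { _id: "$type", cnt: { $sum: 1 } } },
  { $sort: { cnt: -1 } }
])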