How to Enable and Listen to the Binlog in a MySQL Database

2025-03-31 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)05/31 Report--

This article explains how to enable the binlog in a MySQL database and how to listen to it from code. It goes into some detail and should be a useful reference.

Preface

We often need to react to what users do with their own data.

For example, if a user deletes his account, we might send him a text message to scold him, and then another one to beg him to come back.

A feature like this can of course be implemented in the business logic layer, running right after the delete request is received. The database's binlog, however, gives us another way to do it.

Listening to the binlog takes two steps: first, MySQL must have the binlog enabled; second, you write a program that reads the log.

Enabling binlog in MySQL

First of all, MySQL's binlog is not always enabled by default (it is off by default in MySQL 5.7, the version used in this article), so we need to:

Find MySQL's configuration file, my.cnf. Its location depends on the operating system, so you will have to locate it yourself.

Add the following to it:

[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-format = ROW

Then restart mysql.

# ubuntu
service mysql restart

# mac
mysql.server restart

Check whether it was enabled successfully

Enter the MySQL command line and execute:

show variables like '%log_bin%';

If the result looks like the following figure, with log_bin set to ON, binlog was enabled successfully:

View the status of the binlog being written:

Reading binlog from code

Adding the dependency

We will use an existing open source implementation. For no particular reason I chose the mysql-binlog-connector-java package (official GitHub repository: [github.com/shyiko/mysq...]). The dependency is as follows:

<dependency>
    <groupId>com.github.shyiko</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.17.0</version>
</dependency>

Of course, there are many other open source binlog implementations. Alibaba's Canal is one, and you can use that instead.

Writing a demo

Following the README in the official repository, here is a simple demo.

public static void main(String[] args) throws IOException {
    BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "passwd");
    EventDeserializer eventDeserializer = new EventDeserializer();
    // Deliver date/time values as long and char/binary values as byte[]
    eventDeserializer.setCompatibilityMode(
            EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
            EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
    client.setEventDeserializer(eventDeserializer);
    client.registerEventListener(new BinaryLogClient.EventListener() {
        @Override
        public void onEvent(Event event) {
            // TODO dosomething();
            logger.info(event.toString());
        }
    });
    // connect() blocks and keeps delivering events to the listener
    client.connect();
}

This follows the official tutorial exactly. You can put your own business logic in onEvent; since I am only testing, I just print every event.

After that, I logged in to MySQL manually and performed an insert, an update and a delete. The log from the listener is as follows:

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000001', binlogPosition=886}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=100, nextPosition=0, flags=0}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.7.23-0ubuntu0.16.04.1-log', headerLength=19, dataLength=95}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=951, flags=0}, data=null}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1021, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1072, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1114, flags=0}, data=WriteRowsEventData{tableId=108, includedColumns={0,1}, rows=[
    [[B@546a03af, 2]
]}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1145, flags=0}, data=XidEventData{xid=28}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1210, flags=0}, data=null}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1280, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1331, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=1381, flags=0}, data=UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0,1}, includedColumns={0,1}, rows=[
    {before=[[B@6833ce2c, 1], after=[[B@725bef66, 3]}
]}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1412, flags=0}, data=XidEventData{xid=41}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1477, flags=0}, data=null}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1547, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1598, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1640, flags=0}, data=DeleteRowsEventData{tableId=108, includedColumns={0,1}, rows=[
    [[B@1888ff2c, 3]
]}}

INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1671, flags=0}, data=XidEventData{xid=42}}
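
The [B@546a03af entries in the rows above are Java's default rendering of a byte[]: with CHAR_AND_BINARY_AS_BYTE_ARRAY, string columns arrive as raw bytes. The header timestamp is in milliseconds since the Unix epoch. A minimal sketch of making both readable, using only the standard library; the RowPrinter class, the sample row and the UTF-8 charset are assumptions for illustration, not part of the library or of the article's code.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of mysql-binlog-connector-java.
final class RowPrinter {

    // Turns byte[] cells into strings (assuming UTF-8 column data); other cells are kept as-is.
    static List<Object> readable(Serializable[] row) {
        List<Object> out = new ArrayList<>();
        for (Serializable cell : row) {
            if (cell instanceof byte[]) {
                out.add(new String((byte[]) cell, StandardCharsets.UTF_8));
            } else {
                out.add(cell);
            }
        }
        return out;
    }

    // Converts an event header timestamp (epoch milliseconds) to an Instant.
    static Instant headerTime(long timestampMillis) {
        return Instant.ofEpochMilli(timestampMillis);
    }

    public static void main(String[] args) {
        // Made-up row shaped like the student rows above: a string column, then an integer column.
        Serializable[] row = {"Tom".getBytes(StandardCharsets.UTF_8), 2};
        System.out.println(readable(row));
        System.out.println(headerTime(1556468603000L));
    }
}
```

If the table uses a character set other than UTF-8, the charset passed to new String would need to match it.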

Wrapping it in a better, more customized utility class for your own business

At first I planned to paste all the code here, but it kept growing, so I put it on GitHub and only post part of the implementation here. Code link

Implementation approach

Support listening to a single table, because we don't really want to listen to every table in every database.

Support multi-threaded consumption.

Convert what we receive into a form that is pleasant to work with (the data structure in this article is not necessarily a good one; I could not think of anything more suitable).

So the implementation idea is roughly as follows:

Wrap a client that exposes only a method to obtain it and hides the initialization details.

Provide a method to register a (pseudo) listener on a table. A new listener interface is defined, and every registered listener implements it.

The only real listener is the client. It listens to all operations on the database instance, converts them into the desired LogItem format and puts them in a blocking queue.

Start multiple threads that consume the blocking queue. For each LogItem they call the listeners of the corresponding table, which run the business logic.
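
The per-table registration idea in the steps above can be sketched with standard-library collections alone. TableListener, ListenerRegistry and the "db.table" key format are hypothetical names chosen for this illustration, and plain strings stand in for LogItem; this is a sketch under those assumptions, not the article's implementation.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener interface standing in for the article's BinLogListener.
interface TableListener {
    void onEvent(String item);
}

// Hypothetical registry: maps "db.table" to the listeners registered for that table.
final class ListenerRegistry {
    private final Map<String, List<TableListener>> listeners = new ConcurrentHashMap<>();

    static String key(String db, String table) {
        return db + "." + table;
    }

    void register(String db, String table, TableListener listener) {
        // computeIfAbsent creates the list atomically on the first registration for a table.
        listeners.computeIfAbsent(key(db, table), k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Delivers an item to every listener of the table and returns how many were called.
    int dispatch(String db, String table, String item) {
        List<TableListener> targets =
                listeners.getOrDefault(key(db, table), Collections.emptyList());
        targets.forEach(l -> l.onEvent(item));
        return targets.size();
    }

    public static void main(String[] args) {
        ListenerRegistry registry = new ListenerRegistry();
        registry.register("pf", "student", item -> System.out.println("student: " + item));
        registry.dispatch("pf", "student", "row inserted");
        registry.dispatch("pf", "teacher", "no listener registered, so nothing runs");
    }
}
```

Tables without a registered listener are simply skipped, which is what makes single-table listening possible even though the client receives events for the whole instance.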

Initialization code:

public MysqlBinLogListener(Conf conf) {
    BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd);
    EventDeserializer eventDeserializer = new EventDeserializer();
    eventDeserializer.setCompatibilityMode(
            EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
            EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
    client.setEventDeserializer(eventDeserializer);
    this.parseClient = client;
    this.queue = new ArrayBlockingQueue<>(1024);
    this.conf = conf;
    listeners = new ConcurrentHashMap<>();
    dbTableCols = new ConcurrentHashMap<>();
    this.consumer = Executors.newFixedThreadPool(consumerThreads);
}

Registration Code:

public void regListener(String db, String table, BinLogListener listener) throws Exception {
    String dbTable = getdbTable(db, table);
    Class.forName("com.mysql.jdbc.Driver");
    // Save the column information of the table being registered;
    // try-with-resources closes the connection once the schema has been read
    try (Connection connection = DriverManager.getConnection(
            "jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd)) {
        Map cols = getColMap(connection, db, table);
        dbTableCols.put(dbTable, cols);
    }
    // Save the listener being registered
    List<BinLogListener> list = listeners.getOrDefault(dbTable, new ArrayList<>());
    list.add(listener);
    listeners.put(dbTable, list);
}

In this step, when a listener is registered, we also fetch the table's schema information and save it in a map to make later processing of the row data easier.
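
The column information is worth caching because a row event carries values by position only: the log earlier shows includedColumns={0,1} and no column names. A minimal sketch of pairing cached column names with a row follows; ColumnMapper and the sample column names are hypothetical, and the article's own getColMap and LogItem are not reproduced here.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that turns a positional binlog row into a name -> value map.
final class ColumnMapper {

    // columnNames must be in the table's ordinal order so that index i matches row[i].
    static Map<String, Serializable> toNamedRow(List<String> columnNames, Serializable[] row) {
        if (columnNames.size() != row.length) {
            throw new IllegalArgumentException(
                    "column count " + columnNames.size() + " does not match row length " + row.length);
        }
        Map<String, Serializable> named = new LinkedHashMap<>();
        for (int i = 0; i < row.length; i++) {
            named.put(columnNames.get(i), row[i]);
        }
        return named;
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("name", "age"); // assumed column names
        Serializable[] row = {"Tom", 2};                  // made-up row values
        System.out.println(toNamedRow(cols, row));
    }
}
```

One caveat of caching at registration time: if the table is altered afterwards, the cached column list goes stale, which is why the sketch rejects rows whose length no longer matches.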

Listening code:

@Override
public void onEvent(Event event) {
    EventType eventType = event.getHeader().getEventType();
    // A TABLE_MAP event precedes the row events and tells us which table they belong to
    if (eventType == EventType.TABLE_MAP) {
        TableMapEventData tableData = event.getData();
        String db = tableData.getDatabase();
        String table = tableData.getTable();
        dbTable = getdbTable(db, table);
    }
    // only handle the three row operations: insert, update and delete
    if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
        if (isWrite(eventType)) {
            WriteRowsEventData data = event.getData();
            for (Serializable[] row : data.getRows()) {
                // skip tables that nobody registered a listener for
                if (dbTableCols.containsKey(dbTable)) {
                    LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable));
                    e.setDbTable(dbTable);
                    queue.add(e);
                }
            }
        }
    }
}

I was lazy and only implemented handling for inserts; updates and deletes are not written.
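
The isWrite, isUpdate and isDelete helpers used above are not shown in the article. The log earlier contains the EXT_ variants (EXT_WRITE_ROWS, EXT_UPDATE_ROWS, EXT_DELETE_ROWS), but older event formats use names such as WRITE_ROWS, so a check against a single constant could miss events from other server versions. The sketch below classifies by event type name so that it runs without the dependency; RowOp and classify are hypothetical names, and the library's EventType enum appears to offer static helpers of its own, which is worth checking in the version you use.

```java
import java.util.Locale;

// Hypothetical classification of binlog row events, keyed on the event type name.
enum RowOp {
    INSERT, UPDATE, DELETE, OTHER;

    static RowOp classify(String eventTypeName) {
        String name = eventTypeName.toUpperCase(Locale.ROOT);
        // endsWith matches the plain, EXT_ and PRE_GA_ variants alike.
        if (name.endsWith("WRITE_ROWS")) {
            return INSERT;
        }
        if (name.endsWith("UPDATE_ROWS")) {
            return UPDATE;
        }
        if (name.endsWith("DELETE_ROWS")) {
            return DELETE;
        }
        return OTHER;
    }
}

final class RowOpDemo {
    public static void main(String[] args) {
        String[] types = {"EXT_WRITE_ROWS", "EXT_UPDATE_ROWS", "EXT_DELETE_ROWS", "TABLE_MAP", "XID"};
        for (String type : types) {
            System.out.println(type + " -> " + RowOp.classify(type));
        }
    }
}
```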

Consumer code:

public void parse() throws IOException {
    parseClient.registerEventListener(this);
    // start the consumer threads before connecting
    for (int i = 0; i < consumerThreads; i++) {
        consumer.submit(() -> {
            while (true) {
                if (queue.size() > 0) {
                    try {
                        LogItem item = queue.take();
                        String dbtable = item.getDbTable();
                        // hand the item to every listener registered for its table
                        listeners.get(dbtable).forEach(l -> {
                            l.onEvent(item);
                        });
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Thread.sleep(1000);
            }
        });
    }
    parseClient.connect();
}

When consuming, a thread takes an item from the queue, looks up the one or more listeners registered for its table, and lets them consume the item.
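
Because BlockingQueue.take() already waits until an item is available, a consumer loop can do without the size check and the one-second sleep. The following is a minimal sketch of that alternative using only the standard library; QueueConsumerDemo, the String items standing in for LogItem, and the use of thread interruption for shutdown are all assumptions, not the article's code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class QueueConsumerDemo {

    // Starts `threads` consumers that block on take() and pass each item to `handler`.
    static ExecutorService startConsumers(BlockingQueue<String> queue, int threads,
                                          Consumer<String> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        handler.accept(queue.take()); // blocks until an item arrives
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() ends the loop here
                }
            });
        }
        return pool;
    }

    // Feeds `items` through a queue and returns what the consumers handled.
    static List<String> runOnce(List<String> items, int threads) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(items.size());
        ExecutorService pool = startConsumers(queue, threads, item -> {
            handled.add(item);
            done.countDown();
        });
        try {
            for (String item : items) {
                queue.put(item); // put() waits when the queue is full
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // interrupts consumers blocked in take()
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(Arrays.asList("a", "b", "c"), 2));
    }
}
```

With more than one consumer thread the handling order is not guaranteed. Note also that add() on a full ArrayBlockingQueue throws IllegalStateException, whereas put() waits for space, so the producer side may deserve the same attention.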

Test code:

public static void main(String[] args) throws Exception {
    Conf conf = new Conf();
    conf.host = "hostname";
    conf.port = 3306;
    conf.username = conf.passwd = "hhsgsb";
    MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf);
    mysqlBinLogListener.parseArgsAndRun(args);
    // listener for the student table: print the inserted name and log the row
    mysqlBinLogListener.regListener("pf", "student", item -> {
        System.out.println(new String((byte[]) item.getAfter().get("name")));
        logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter());
    });
    // listener for the teacher table
    mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ="));
    mysqlBinLogListener.parse();
}

This short piece of code registers two listeners, one for the student table and one for the teacher table, and each prints what it receives. In testing, inserting data into the teacher table ran that table's business logic independently of the student listener.

Note: the utility class here cannot be used in production as is, because a lot of exception handling is missing and it only listens to insert statements. Treat it as a reference for your own implementation.
