Today I would like to talk about how the Debezium MySQL module is designed. Many people may not know much about it, so I have summarized the following content and hope you get something out of this article.
Note: this article does not cover the structure of the MySQL binlog format or how it is parsed; it focuses on Debezium's architectural design.
Debezium is an open source distributed platform for change data capture.
Quoting this sentence from Debezium's official website, you can see that Debezium is quite ambitious: it defines itself as a general-purpose CDC platform, and in practice it lives up to that. Since version 0.8 in particular, the developers have put a lot of energy into the PostgreSQL module; they have added support for SQL Server, Oracle, Db2, Cassandra and other databases, adapted to messaging engines such as Pulsar, Amazon Kinesis and Google Pub/Sub, and gradually refactored the code to decouple it from any specific database and from the dependency on a specific messaging system, moving closer to a unified, cloud-native architecture.
Early Debezium, in contrast, was tightly coupled to the Kafka Connect framework: Debezium was a Source plugin of Kafka Connect that mainly targeted MySQL, with some support for MongoDB.
At the time of writing, the latest Debezium release is 1.2, and 1.3 has entered beta. The MySQL module was essentially finalized in version 0.8; the only significant later change was DBZ-175, merged in 0.9, which adds online table addition as an internal experimental feature that is not mentioned in the documentation. Personally I find this design very elegant, and it is discussed at length later in this article. In subsequent development the similarly early MongoDB module was completely refactored, but the MySQL module stayed the same.
Rebase MySQL connector to common framework used by the other connectors.
This item has been on the roadmap for some time, but it is safe to predict that nothing will happen in the near future. A large part of the reason is that the MySQL module's code contains a lot of extra handling for quirks of MySQL and for defects of Kafka Connect, as well as features such as DBZ-175 that the unified architecture does not support; and because MySQL is so widely used, the community has found and fixed bugs in a huge range of scenarios over the years, so replacing a battle-tested module architecture is risky.
The subsequent analysis only focuses on the architecture and code of the MySQL module and basically does not involve the new unified architecture.
Kafka Connect
As mentioned above, Debezium was originally designed as a Source plugin for Kafka Connect, and although the developers are working to decouple it from Kafka Connect, the current implementation has not changed. The following figure, taken from the official Debezium documentation, shows where Debezium sits in a complete CDC system.
Kafka Connect provides a set of programming interfaces for a Source plugin, the most important of which is the poll method of SourceTask: the List<SourceRecord> it returns is delivered to Kafka with at-least-once semantics. For more details on Kafka Connect, see my other article: https://www.jianshu.com/p/538b2f0a7462
public abstract class SourceTask implements Task {
    ...
    public abstract List<SourceRecord> poll() throws InterruptedException;
    ...
}

Debezium MySQL architecture
The Reader hierarchy forms the backbone of the MySQL module's code, so our analysis starts with Reader.
Below is the full inheritance tree of Reader. Ignore ParallelSnapshotReader and ReconcilingBinlogReader for now; they were introduced by DBZ-175.
As the names suggest, the real workhorses are SnapshotReader and BinlogReader, which implement full (snapshot) and incremental (binlog) reading of MySQL data respectively. Both inherit from AbstractReader, which encapsulates the shared logic. The following figure shows the internal design of AbstractReader.
As you can see, AbstractReader does not deliver the records fed in by enqueue directly to Kafka; instead it decouples the two sides with an in-memory blocking queue (BlockingQueue). This design has several advantages (a minimal sketch of the pattern follows the list of advantages below):
Responsibility decoupling
In the figure above, a record is checked against the configured conditions before it is fed into the BlockingQueue, and the task's running state is checked before a record is delivered to Kafka. This keeps each kind of logic in one specific place.
Thread isolation
BlockingQueue is a thread-safe blocking queue, so the producer and consumer of this producer-consumer model can run in different threads, and a local blockage does not stall the whole pipeline. For example, on the right-hand side of the figure, the consumer periodically checks the running flag; if a stop signal sets running to false, the whole task can stop immediately rather than being held up by blocking MySQL I/O.
Converting between single records and batches
enqueue delivers records one at a time, while drainTo consumes them in batches; the same trick also works in the opposite direction, turning batches back into single records.
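To make this concrete, here is a minimal, hypothetical sketch of the producer-consumer structure described above. The class and method names are illustrative and simplified; they do not reproduce Debezium's actual AbstractReader code.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: a reader thread enqueues records one by one, while poll()
// drains them in batches, mirroring the decoupling described for AbstractReader.
public class QueueBackedReader {
    private final BlockingQueue<String> records = new LinkedBlockingQueue<>(8192);
    private final AtomicBoolean running = new AtomicBoolean(true);

    // Producer side: called by the reader thread (snapshot scan or binlog parsing).
    protected void enqueue(String record) throws InterruptedException {
        if (acceptRecord(record)) {        // filtering happens before the queue
            records.put(record);           // blocks if the queue is full (back-pressure)
        }
    }

    // Consumer side: called by the Kafka Connect task thread.
    public List<String> poll() {
        List<String> batch = new ArrayList<>();
        if (running.get()) {               // check the running flag before handing out data
            records.drainTo(batch, 2048);  // single records in, batches out
        }
        return batch;
    }

    public void stop() {
        running.set(false);                // unconsumed records left in the queue are dropped
    }

    private boolean acceptRecord(String record) {
        return !record.isEmpty();          // placeholder for whitelist/blacklist style filtering
    }
}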
Two readers remain: ChainedReader and TimedBlockingReader.
ChainedReader, as its name implies, wraps several Readers and executes them serially.
TimedBlockingReader simply sleeps for a configured period of time; it exists to work around the design flaws of Kafka Connect's rebalancing, which I discussed in the article linked above.
Seamless handover from snapshot to stream
If you have ever set up MySQL master-slave replication, you know that when building a slave you first export the full data set (MySQL 8.0.x seems to offer a more convenient way), record the binlog position, import the full data into the slave, and then continue incremental synchronization from that binlog position to keep the data consistent.
You may also know that canal, another MySQL CDC tool open-sourced by Alibaba, handles only the stream phase and leaves the snapshot phase to you; this is one of Debezium's advantages over canal.
Debezium essentially follows the same approach as building a slave. Let's walk through the detailed steps described in the official documentation (unless stated otherwise, the discussion below only considers the InnoDB engine):
1. Grabs a global read lock that blocks writes by other database clients. The snapshot itself does not prevent other clients from applying DDL which might interfere with the connector's attempt to read the binlog position and table schemas. The global read lock is kept while the binlog position is read, and is released in a later step.
2. Starts a transaction with repeatable read semantics to ensure that all subsequent reads within the transaction are done against the consistent snapshot.
3. Reads the current binlog position.
4. Reads the schema of the databases and tables allowed by the connector's configuration.
5. Releases the global read lock. This now allows other database clients to write to the database.
6. Writes the DDL changes to the schema change topic, including all necessary DROP... and CREATE... DDL statements, if applicable.
7. Scans the database tables and generates CREATE events on the relevant table-specific Kafka topics for each row.
8. Commits the transaction.
9. Records the completed snapshot in the connector offsets.
Debezium breaks the process down into nine steps, which is a little more involved than one might expect.
In the current version of Debezium these nine steps run serially in a single thread, and the main time sink is step 7, which reads the full data set in the simplest possible way: select * from table [where ...] over JDBC. If there are tables with tens of millions of rows or more, this step takes a very long time. It could in fact be parallelized: the global lock is already acquired in step 1, and before it is released multiple connections could be opened to pull the data in parallel, which would greatly improve efficiency.
Moreover, if the snapshot process fails partway through, it cannot be resumed: the transaction is gone, and the snapshot as of that moment can no longer be read, so data consistency cannot be guaranteed.
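To make steps 1 to 7 more tangible, here is a heavily simplified, hypothetical JDBC sketch of the snapshot core: take the global read lock, open a consistent-snapshot transaction, record the binlog position, release the lock, then scan a table. Schema capture, offset bookkeeping and error handling are omitted, and this is not Debezium's actual code; the connection URL, credentials and table name are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SnapshotSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/risk_control", "user", "password");
             Statement stmt = conn.createStatement()) {

            // Step 1: global read lock so the binlog position and table schemas stay consistent.
            stmt.execute("FLUSH TABLES WITH READ LOCK");

            // Step 2: repeatable-read transaction with a consistent InnoDB snapshot.
            stmt.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
            stmt.execute("START TRANSACTION WITH CONSISTENT SNAPSHOT");

            // Step 3: read the current binlog position.
            try (ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
                if (rs.next()) {
                    System.out.println("binlog file=" + rs.getString(1) + " pos=" + rs.getLong(2));
                }
            }

            // Step 5: release the global lock; the open transaction still sees the old snapshot.
            stmt.execute("UNLOCK TABLES");

            // Step 7: full table scan, the slow part for large tables.
            // Integer.MIN_VALUE asks MySQL Connector/J to stream rows instead of buffering them all.
            stmt.setFetchSize(Integer.MIN_VALUE);
            try (ResultSet rows = stmt.executeQuery("SELECT * FROM log_operation")) {
                while (rows.next()) {
                    // turn each row into a CREATE event and enqueue it (omitted)
                }
            }

            // Step 8: commit the snapshot transaction.
            stmt.execute("COMMIT");
        }
    }
}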
The long snapshot process and its unrecoverable interruptions, combined with Kafka Connect's crude rebalance strategy, were a major pain point when using Debezium early on. TimedBlockingReader was introduced to alleviate this problem to some extent.

ChainedReader
├── TimedBlockingReader
├── SnapshotReader
└── BinlogReader

When you plan to submit several synchronization tasks in a row, each submission triggers a rebalance. Inserting a TimedBlockingReader in front of the SnapshotReader and BinlogReader ensures that a task does not start executing the moment it is submitted; once all the tasks have been submitted and the cluster has stabilized, they begin executing concurrently.
Note that both the snapshot and stream phases are optional: like canal, Debezium can be configured to listen to the binlog only and capture stream data from the current moment onward. See the official documentation for details.
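For reference, here is a sketch of roughly what such a connector configuration looks like, written as Java Properties purely for illustration (in practice the same keys go into the JSON submitted to Kafka Connect). The values are placeholders, and the exact property names and snapshot.mode options depend on the Debezium version, so check the documentation for the version you run; schema_only, for example, is the mode that skips the data snapshot and streams from the current binlog position.

import java.util.Properties;

public class ConnectorConfigSketch {
    public static Properties mysqlConnectorConfig() {
        Properties props = new Properties();
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("database.hostname", "mysql.example.com");   // placeholder host
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "debezium");
        props.setProperty("database.password", "secret");
        props.setProperty("database.server.id", "184054");             // must be unique among "slaves"
        props.setProperty("database.server.name", "demo");             // logical name, used as topic prefix
        props.setProperty("table.whitelist", "risk_control.log_operation");
        // Where the DDL history (the schema timeline discussed below) is stored.
        props.setProperty("database.history.kafka.bootstrap.servers", "kafka:9092");
        props.setProperty("database.history.kafka.topic", "schema-changes.demo");
        // "initial" = snapshot then stream; "schema_only" = capture schemas only and stream from now on.
        props.setProperty("snapshot.mode", "schema_only");
        return props;
    }
}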
Schema timeline construction
Now let's look at the stream phase, i.e. binlog parsing. (For data synchronization the binlog must be set to ROW mode.)
Most readers who have made it this far have probably run the command below, which uses MySQL's official mysqlbinlog tool to dump the contents of a binlog file. Look closely: there are database names, table names and the value of every field, but no field names. In other words, the binlog does not carry schema information!
mysqlbinlog --no-defaults --base64-output=decode-rows -vvv ~/Downloads/mysql-bin.001192 | less

#190810 12:00:20 server id 206195699  end_log_pos 8624 CRC32 0x46912d80  GTID  last_committed=12  sequence_number=13  rbr_only=yes
SET @@SESSION.GTID_NEXT= '5358e6dc-d161-11e8-8a6c-7cd30ac4dc44:115781'/*!*/;
# at 8624
#190810 12:00:20 server id 206195699  end_log_pos 8687 CRC32 0xe14a2f5a  Query  thread_id=576127  exec_time=0  error_code=0
SET TIMESTAMP=1565409620/*!*/;
# at 8687
#190810 12:00:20 server id 206195699  end_log_pos 8775 CRC32 0xaf16fb7d  Table_map: `risk_control`.`log_operation` mapped to number 39286
# at 8775
#190810 12:00:20 server id 206195699  end_log_pos 9055 CRC32 0x9bdc15ae  Write_rows: table id 39286 flags: STMT_END_F
### INSERT INTO `risk_control`.`log_operation`
### SET
###   @1=8166048 /* INT meta=0 nullable=0 is_null=0 */
###   @2='7b0d526124ba40f6ac71cfe1d0d90665' /* VARSTRING(160) meta=160 nullable=0 is_null=0 */
###   @3=17 /* INT meta=0 nullable=1 is_null=0 */
###   @4='2' /* VARSTRING(64) meta=64 nullable=1 is_null=0 */
###   @5='... Method public void com.xxx.risk.service.impl.ActivitiEventServiceImpl.updateAuditStatus(java.lang.String,java.lang.String) Parameter {"auditStatus":3}' /* VARSTRING(1020) meta=1020 nullable=0 is_null=0 */
###   @6='...auditStatus...' /* JSON meta=4 nullable=1 is_null=0 */
###   @7='2019-08-10 12:...' /* DATETIME(0) meta=0 nullable=0 is_null=0 */
###   @8='...' /* VARSTRING(128) meta=128 nullable=0 is_null=0 */
###   @9='...' /* VARSTRING(256) meta=256 nullable=0 is_null=0 */
# at 9055
#190810 12:00:20 server id 206195699  end_log_pos 9086 CRC32 0xbe19a1b1  Xid = 180175929
COMMIT/*!*/;
This design actually makes sense: as a compact binary format, the binlog saves considerable space by not storing highly redundant column names, and given the table name, the table structure can be looked up in MySQL's information_schema, so why store another copy?
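As a quick illustration of where a real slave would get column metadata, the hypothetical snippet below asks information_schema for the columns of a table in ordinal order, which is exactly the mapping needed to give names to @1, @2, ... in the binlog. As the next paragraphs explain, doing this lookup at parse time is not reliable.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Illustrative only: the "naive" way to recover column names for a table seen in the
// binlog, by asking the master's information_schema at parse time.
public class ColumnLookupSketch {
    public static void printColumns(String schema, String table) throws Exception {
        String sql = "SELECT column_name, ordinal_position, column_type "
                   + "FROM information_schema.columns "
                   + "WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position";
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/information_schema", "user", "password");
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, schema);
            ps.setString(2, table);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // @1, @2, ... in the binlog map to these ordinal positions
                    System.out.printf("@%d -> %s %s%n",
                            rs.getInt("ordinal_position"),
                            rs.getString("column_name"),
                            rs.getString("column_type"));
                }
            }
        }
    }
}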
Debezium, however, takes a shortcut: it pretends to be a slave and pulls the binlog from the master for parsing. It is not a real slave, so it has no information_schema of its own to consult and can only query the MySQL master's. But is that really foolproof?
Consider the following scenario:
15:00 BinlogReader is consuming normally
15:05 The Kafka Connect cluster goes into maintenance and BinlogReader is suspended
15:10 Table A is altered: a column is added after the third column (new columns are not necessarily appended at the end)
15:15 Kafka Connect cluster maintenance ends and BinlogReader resumes
In this scenario, when BinlogReader resumes at 15:15 it continues reading and parsing the binlog from the 15:05 position. If it fetches the schema of table A from MySQL's information_schema, then for the events between 15:05 and 15:10 the binlog and the schema no longer match, and the data cannot be parsed correctly. In other words, whenever Debezium lags behind in reading the binlog and the master's schema is modified in the meantime, reading information_schema from the master breaks down.
To solve this, you need to emulate the information_schema mechanism yourself and maintain a current schema snapshot. But is even that enough?
Going back to the internal design of AbstractReader described earlier: BinlogReader, as the producer, delivers parsed records into the BlockingQueue, and whenever a DDL statement (such as alter table ... add column ...) is encountered while parsing the binlog, the current schema snapshot is updated.
Now suppose the task is stopped at this moment. As shown in the figure above, the unconsumed records in the BlockingQueue are discarded. If those discarded records include events parsed before the schema change, the binlog will be re-parsed from that earlier position next time, while the schema snapshot Debezium has stored already reflects the modified table, so once again the binlog and the schema do not match. In this edge case, keeping only the current schema snapshot is not enough; and as we will see later, this model fails in other scenarios as well.
At this point we need the ultimate solution: record every schema change and build a complete schema timeline, so that when parsing a binlog event at any position we can find the schema snapshot of the corresponding version.
In Debezium this is implemented by DatabaseHistory. It does the job, but the implementation is admittedly crude. MySQLDatabaseHistory starts by exporting all the create table statements at the beginning of the synchronization task (see step 6 of the snapshot process) and then records every DDL statement on top of that. Debezium ships in-memory, file and Kafka topic implementations of this DDL store; for the Kafka topic, the retention policy must be set to never expire.
When you want to recover the schema snapshot at an arbitrary point, Debezium starts from the beginning and parses every DDL statement one by one, stacking the changes until the last DDL before the specified position. As you can see, this implementation is fairly inefficient: when a task has been running for months, a large amount of DDL accumulates (especially on Aliyun RDS, where, for whatever reason, the binlog contains a large amount of DDL), and a recovery can take tens of minutes or even hours; and if any DDL fails to parse, every subsequent snapshot is wrong. The developers have been aware of this problem for a long time and have floated some ideas for improvement, so progress may come in the near future.
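The following is a minimal sketch of the idea behind this schema timeline, with simplified stand-in types rather than Debezium's actual classes: every DDL statement is appended together with the position at which it was observed, and recovering a snapshot means replaying the list from the beginning up to the requested position.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the schema timeline: every DDL is appended with the binlog
// position where it was observed; recovering a snapshot means replaying from the start.
public class SchemaTimelineSketch {

    static class DdlEntry {
        final long binlogPosition;
        final String ddl;
        DdlEntry(long binlogPosition, String ddl) {
            this.binlogPosition = binlogPosition;
            this.ddl = ddl;
        }
    }

    private final List<DdlEntry> history = new ArrayList<>();

    // Called whenever a DDL event is seen in the binlog (or during the initial snapshot, step 6).
    public void record(long binlogPosition, String ddl) {
        history.add(new DdlEntry(binlogPosition, ddl));
    }

    // Rebuild the table definitions that were in effect just before the given position.
    // The real implementation parses each statement and applies it to an in-memory model;
    // here we only keep the latest DDL text per table to show the shape of the replay loop.
    public Map<String, String> snapshotAt(long binlogPosition) {
        Map<String, String> tables = new HashMap<>();
        for (DdlEntry entry : history) {
            if (entry.binlogPosition >= binlogPosition) {
                break;                              // replay stops at the requested position
            }
            tables.put(extractTableName(entry.ddl), entry.ddl);
        }
        return tables;
    }

    private String extractTableName(String ddl) {
        // Placeholder: a real implementation uses a DDL parser, which is exactly where a
        // single parsing error can corrupt every later snapshot.
        String[] tokens = ddl.trim().split("\\s+");
        return tokens.length >= 3 ? tokens[2] : ddl;
    }
}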
Having read this far, you can probably see why some commercial data synchronization engines restrict schema changes during synchronization: fully supporting every case is not easy.
Changing the engine in mid-flight: adding tables online
We have mentioned DBZ-175 several times already; now let's talk about this wonderful design: https://issues.redhat.com/browse/DBZ-175
Note: this feature is only available as an internal experimental feature and is not mentioned in the official documentation. If you have questions, refer to the discussion on the JIRA issue or read and debug the source code.
Some background first. When configuring a synchronization task, Debezium lets you specify the tables to synchronize via table.whitelist and table.blacklist. Suppose we initially set table.whitelist to two tables, both of which have finished the snapshot phase and are steadily running in the stream phase. Now a new requirement arrives: table c must also be synchronized, ideally without disturbing the progress of the original two tables. The naive approach is to start a new synchronization task for table c, but over time you end up with many "slaves" hanging off one MySQL master, which puts pressure on it. The ideal solution is this: after the task's table.whitelist is modified, Debezium automatically performs full and incremental synchronization of the newly added table without interfering with the existing task, and once the old and new parts have reached a similar position, they are merged so that a single BinlogReader handles all subsequent stream synchronization.
How can this be achieved? With ParallelSnapshotReader and ReconcilingBinlogReader, the two readers we skipped earlier.
First, here is the structure of the whole Reader chain after tables are added online.
ChainedReader
├── ParallelSnapshotReader
│   ├── OldTablesBinlogReader
│   └── ChainedReader
│       ├── NewTablesSnapshotReader
│       └── NewTablesBinlogReader
├── ReconcilingBinlogReader
└── UnifiedBinlogReader
ParallelSnapshotReader, as its name suggests, starts the full and incremental synchronization of the newly added tables in parallel, without interfering with the running OldTablesBinlogReader.
Once the new tables have entered the stream phase, every pull of OldTablesBinlogReader and NewTablesBinlogReader compares their progress, and when the gap between them falls within a certain window (5 minutes by default), both are stopped.
ParallelSnapshotReader then exits, and ReconcilingBinlogReader aligns the progress of the two BinlogReaders, advancing the one that is behind until it catches up with the leader.
The original two BinlogReaders then exit, and a newly created UnifiedBinlogReader continues stream parsing for all tables, old and new, from that position. The merge is complete.
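A rough sketch of the progress check behind this merge, assuming each binlog reader can report the timestamp of the last event it processed; the interfaces and names here are hypothetical, only the 5-minute default mirrors the description above.

import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the check that decides when the old-tables and new-tables
// binlog readers are "close enough" to be merged into a single unified reader.
public class MergeDecisionSketch {

    interface BinlogProgress {
        Instant lastEventTimestamp();   // timestamp of the last binlog event this reader handled
    }

    private static final Duration MAX_GAP = Duration.ofMinutes(5);  // default gap mentioned above

    static boolean readyToMerge(BinlogProgress oldTablesReader, BinlogProgress newTablesReader) {
        Duration gap = Duration.between(
                newTablesReader.lastEventTimestamp(),
                oldTablesReader.lastEventTimestamp()).abs();
        return gap.compareTo(MAX_GAP) <= 0;   // stop both readers and hand over to a unified reader
    }
}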
I pieced this design together from several years of developer discussion on the JIRA issue, combined with debugging the code. When I was fresh out of school and first received a requirement for online table addition, I thought it was impossible; when I eventually found this design I could not help but marvel at it.
This is only a first-cut implementation: due to limitations in the metadata design, the whole process does not support schema changes. Perhaps for that reason the cautious developers chose not to publicize the feature and kept it as an internal experiment.
From an architectural perspective, this article has walked through the basic structure of the Debezium MySQL module, in the hope of conveying the overall layout and design ideas of the code; many details remain to be explored. If I get the chance later, I will share some of the problems we encountered in production and how we solved them.