
Example Analysis of DB Data Synchronization to a Data Warehouse


This article presents an example analysis of synchronizing DB data to a data warehouse. It is quite practical, so it is shared here as a reference for anyone facing the same problem.

Background

In data warehouse modeling, raw business-layer data that has not been processed in any way is called ODS (Operational Data Store) data. In Internet companies, two kinds of ODS data are common: business log data (Log) and business DB data (DB). For business DB data, collecting data from MySQL and other relational databases and importing it into Hive is an important part of data warehouse production.

How can MySQL data be synchronized to Hive accurately and efficiently? The commonly used solution is batch fetching plus Load: connect directly to MySQL, Select the data from the table, save it to a local file as intermediate storage, and finally Load the file into the Hive table. The advantage of this scheme is that it is easy to implement, but as the business grows, its disadvantages are gradually exposed:

Performance bottleneck: as the business scales up, the Select from MySQL -> save to local file -> Load into Hive pipeline takes longer and longer and cannot meet the timeliness requirements of downstream data warehouse production.

Selecting a large amount of data directly from MySQL puts heavy pressure on MySQL, easily causes slow queries, and affects normal services on the business side.

Because Hive itself does not support SQL primitives such as update and delete, data that undergoes Update/Delete in MySQL cannot be handled well.
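For concreteness, here is a minimal sketch of the batch "Select + Load" pattern described above, assuming pymysql and the Hive CLI are available; the connection details, databases, table, and columns (id, value) are illustrative only, not the actual production setup.

```python
import csv
import subprocess

import pymysql

# Step 1: Select the full table from MySQL and dump it to a local intermediate file.
conn = pymysql.connect(host="mysql-host", user="reader", password="***", database="user")
with conn.cursor() as cur, open("/tmp/userinfo.csv", "w", newline="") as f:
    cur.execute("SELECT id, value FROM userinfo")
    csv.writer(f).writerows(cur.fetchall())
conn.close()

# Step 2: Load the intermediate file into the target Hive table (assumes the Hive
# table is declared with ',' as its field delimiter).
subprocess.run(
    ["hive", "-e",
     "LOAD DATA LOCAL INPATH '/tmp/userinfo.csv' "
     "OVERWRITE INTO TABLE ods.userinfo PARTITION (dt='2024-01-01')"],
    check=True,
)
```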

To solve these problems thoroughly, we gradually turned to a CDC (Change Data Capture) + Merge solution: the Binlog is collected in real time and then processed offline to restore the business data. The Binlog is MySQL's binary log, which records all data changes in MySQL; the master-slave replication of a MySQL cluster is itself based on the Binlog.

This article describes how Binlog data enters the data warehouse accurately and efficiently, from two aspects: real-time collection of DB data and offline processing of the Binlog to restore the business data.

Overall architecture

The overall architecture is shown in the figure above. For real-time Binlog collection, we use Alibaba's open source project Canal, which pulls the Binlog from MySQL in real time and performs the appropriate parsing. After the Binlog is collected, it is temporarily stored in Kafka for downstream consumption. The whole real-time collection path is shown by the red arrows in the figure.

The offline processing part, shown by the black arrows in the figure, restores a MySQL table on Hive through the following steps:

LinkedIn's open source project Camus pulls the Binlog data from Kafka onto Hive every hour.

For each ODS table, a one-time snapshot is first taken to read the existing (stock) data in MySQL into Hive; under the hood, this step still connects directly to MySQL and Selects the data.

For each ODS table, a Merge is performed every day based on the stock data and the incremental Binlog generated that day, so as to restore the business data.

Looking back at the problems of the batch fetch and Load solution described in the background, why does this new solution solve them?

First, the Binlog is generated as a stream. By collecting it in real time, part of the data processing load is shifted from the daily batch job to the real-time stream, which brings a significant improvement both in performance and in the pressure placed on MySQL.

Second, the Binlog itself records the type of each data change (Insert/Update/Delete), so with some semantic processing the data can be restored accurately.

Real-time Binlog collection

Real-time Binlog collection consists of two main modules: CanalManager, which is mainly responsible for allocating collection tasks, monitoring and alerting, metadata management, and integration with external dependent systems; and the Canal and CanalClient components that actually perform the collection.

When a user submits a Binlog collection request for a DB, CanalManager first calls the relevant APIs of the DBA platform to obtain information about the MySQL instance where the DB resides, in order to select the machine most suitable for Binlog collection. It then distributes the collection instance (Canal Instance) to an appropriate Canal server, namely a CanalServer. When selecting a specific CanalServer, CanalManager considers factors such as load balancing and cross-datacenter transmission, giving priority to machines with low load that are in the same region.

After receiving the collection request, the CanalServer registers the collection information in ZooKeeper. The registration includes:

A permanent node named after the Instance name.

A temporary (ephemeral) node named with the server's own ip:port, registered under the permanent node.
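The registration can be pictured with a small sketch using the kazoo ZooKeeper client; the znode paths and names below are illustrative assumptions rather than Canal's actual layout.

```python
from kazoo.client import KazooClient

zk = KazooClient(hosts="zk1:2181,zk2:2181")
zk.start()

instance = "user_instance"      # hypothetical Canal Instance name
server = "10.0.0.12:11111"      # this CanalServer's ip:port

# Permanent node named after the Instance.
zk.ensure_path(f"/canal/{instance}")

# Temporary (ephemeral) node named with the server's ip:port under the permanent
# node; it disappears automatically if this CanalServer goes down.
zk.create(f"/canal/{instance}/{server}", ephemeral=True)

zk.stop()
```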

This registration serves two purposes:

High availability: when CanalManager distributes an Instance, it selects two CanalServers, one as the Running node and the other as the Standby node. The Standby node watches the Instance; when the Running node fails, its temporary node disappears and the Standby node takes over, achieving failover.

Interaction with CanalClient: when a CanalClient detects which CanalServer is Running for the Instance it is responsible for, it connects to that CanalServer and receives the Binlog data from it.

Binlog subscription is at the granularity of a MySQL DB, and one DB's Binlog corresponds to one Kafka topic. In the underlying implementation, all subscribed DBs under the same MySQL instance are handled by the same Canal Instance, because the Binlog is produced at the granularity of the MySQL instance. The CanalServer discards Binlog data for unsubscribed DBs, and the CanalClient distributes the received Binlog to Kafka at DB granularity.

Restore MySQL data offline

After the Binlog has been collected, the next step is to use it to restore the business data. The first problem to solve is synchronizing the Binlog from Kafka to Hive.

Kafka2Hive

The whole Kafka2Hive task is managed within the ETL framework of the Meituan data platform, including its task primitives and scheduling mechanism, just like other ETL jobs. The underlying layer uses LinkedIn's open source project Camus, with targeted secondary development to perform the actual Kafka-to-Hive data transfer.

The secondary development of Camus

The Binlog stored in Kafka carries no Schema, while a Hive table must have a Schema, and its partitions, fields, and so on need to be designed for efficient downstream consumption. The first modification to Camus is therefore to parse the Binlog from Kafka into a format that conforms to the target Schema.
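As a rough illustration of what "conforming to the target Schema" means, a parsed Binlog event might be flattened into fields like the following; these field names are assumptions made for illustration, not Canal's or Camus's actual schema.

```python
# One parsed Binlog event, flattened so it can be written into a Hive table
# partitioned by table_name and dt (field names are illustrative assumptions).
parsed_event = {
    "db": "user",
    "table_name": "userinfo",          # first-level Hive partition
    "dt": "2024-01-01",                # second-level Hive partition
    "event_type": "UPDATE",            # INSERT / UPDATE / DELETE
    "execute_time": 1704067200,        # when the change happened in MySQL
    "data": {"id": 1, "value": 120},   # row image after the change
}
```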

The second modification to Camus is driven by Meituan's ETL framework. At present, our task scheduling system only analyzes upstream/downstream dependencies between tasks in the same scheduling queue; dependencies cannot be established across queues. In the MySQL2Hive pipeline, the Kafka2Hive task runs every hour (hour queue) while the Merge task runs once a day (day queue), and the Merge task must strictly depend on the completion of all the hourly Kafka2Hive tasks.

To solve this problem, we introduced a Checkdone task. Checkdone is a daily task that checks whether the previous day's Kafka2Hive runs all completed successfully. If they did, the Checkdone task itself succeeds, so the downstream Merge task can start correctly.

Detection Logic of Checkdone

How does Checkdone perform its check? After each Kafka2Hive task successfully completes its data transfer, Camus records the task's start time in the corresponding HDFS directory. Checkdone scans all of the previous day's timestamps; if the largest timestamp has passed 0:00 of the current day, the hourly task covering the last hour of the previous day has finished, so all of that day's Kafka2Hive tasks completed successfully and Checkdone's check passes.
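A hedged sketch of this check follows, assuming the start times recorded by Camus are stored as epoch-second file names under a per-day "ready" directory on HDFS; the path layout and function name are illustrative, not the actual implementation.

```python
import subprocess
from datetime import datetime, timedelta


def checkdone(db: str, day: str) -> bool:
    """Return True once all of `day`'s hourly Kafka2Hive runs have completed."""
    ready_dir = f"/user/hive/warehouse/original_binlog.db/{db}/ready/{day}"
    listing = subprocess.run(
        ["hadoop", "fs", "-ls", ready_dir],
        capture_output=True, text=True, check=True,
    ).stdout

    # Each successful run is assumed to leave a file whose name is its start
    # time in epoch seconds, as recorded by Camus.
    stamps = []
    for line in listing.splitlines():
        parts = line.split()
        if not parts:
            continue
        name = parts[-1].rsplit("/", 1)[-1]
        if name.isdigit():
            stamps.append(int(name))

    # The day is done once some successful run started after the day had fully
    # elapsed, i.e. after 0:00 of the following day.
    midnight_after = datetime.strptime(day, "%Y-%m-%d") + timedelta(days=1)
    return bool(stamps) and max(stamps) >= midnight_after.timestamp()
```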

In addition, because Camus itself only reads from Kafka and writes HDFS files, the Hive partition must also be loaded before downstream queries can see the data. The final step of the whole Kafka2Hive task is therefore to load the Hive partition; only then is the task considered a successful execution.
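The partition-loading step itself amounts to a single Hive DDL statement; a minimal sketch via the Hive CLI is shown below, with the database, table, and partition values purely illustrative.

```python
import subprocess

# Register the newly written HDFS directory as a partition of the Binlog table.
subprocess.run(
    ["hive", "-e",
     "ALTER TABLE original_binlog.user ADD IF NOT EXISTS "
     "PARTITION (table_name='userinfo', dt='2024-01-01') "
     "LOCATION '/user/hive/warehouse/original_binlog.db/user/"
     "table_name=userinfo/dt=2024-01-01'"],
    check=True,
)
```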

Each Kafka2Hive task reads a specific topic and writes the Binlog data into a table under the original_binlog database, i.e. original_binlog.db in the figure above, which stores all the Binlog corresponding to one MySQL DB.

The figure above illustrates the directory structure of the files on HDFS after a Kafka2Hive run completes. If a MySQL DB is called user, its Binlog is stored in the original_binlog.user table. In the ready directory, the start times of all of the day's successful Kafka2Hive tasks are recorded, day by day, for Checkdone to use. The Binlog of each table is organized into its own partition; for example, the Binlog of the userinfo table is stored in the partition table_name=userinfo. Under each table_name first-level partition, second-level partitions are organized by dt. The xxx.lzo and xxx.lzo.index files in the figure store the lzo-compressed Binlog data.

Merge

After the Binlog has successfully landed in the warehouse, the next step is to restore the MySQL data based on it. The Merge process does two things: it first stores the Binlog data generated that day in a Delta table, and then performs a primary-key-based Merge with the existing stock data. The Delta table holds the latest data of the day; when a row changes several times within a day, only its state after the last change is kept in the Delta table.

During the Merge of the Delta data and the stock data, a unique key is needed to decide whether two rows are the same piece of data. If a row appears in both the stock table and the Delta table, it has been updated, and the Delta table's version is taken as the final result; otherwise it is unchanged, and the row in the original table is retained as the final result. The Merge result is written back with Insert Overwrite into the original table, i.e. origindb.table in the figure.
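A hedged sketch of the two Merge steps in HiveQL, written as Python strings, is shown below. The table names, the two columns (id, value), and the Binlog metadata columns (execute_time) are illustrative assumptions, as is the assumption that the Binlog table exposes the changed row's columns directly.

```python
# Step 1: build the Delta table -- keep only the last change of the day for each
# primary key (execute_time is an assumed column carrying the change timestamp).
delta_sql = """
INSERT OVERWRITE TABLE delta.userinfo
SELECT id, value
FROM (
  SELECT id, value,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY execute_time DESC) AS rn
  FROM original_binlog.user
  WHERE table_name = 'userinfo' AND dt = '2024-01-01'
) t
WHERE t.rn = 1
"""

# Step 2: merge the Delta table with the stock table -- take the Delta row
# whenever the same primary key exists on both sides, keep the stock row
# otherwise, and keep brand-new Delta rows as inserts.
merge_sql = """
INSERT OVERWRITE TABLE origindb.userinfo
SELECT
  IF(d.id IS NOT NULL, d.id, s.id)       AS id,
  IF(d.id IS NOT NULL, d.value, s.value) AS value
FROM origindb.userinfo s
FULL OUTER JOIN delta.userinfo d
  ON s.id = d.id
"""
```

This is only one way to express the described logic; in practice the Merge result might be staged in a temporary table before overwriting the stock table, and the statements would be submitted via the Hive CLI as in the earlier sketches.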

Example of Merge process

Let's use an example to illustrate the process of Merge.

The data table has two columns, id and value, where id is the primary key. When extracting the Delta data, only the last update is kept when the same row is updated multiple times; so for the row with id=1, the last updated value, value=120, is recorded in the Delta table. After merging the Delta data with the stock data, the final result contains one newly inserted row (id=4), two updated rows (id=1 and id=2), and one unchanged row (id=3).

By default, the primary key of the MySQL table is used as the unique key for this judgment, but the business can also configure a unique key different from the MySQL primary key according to its actual situation.

The overall architecture of Binlog-based data collection and ODS data restoration has been introduced above. The following sections describe two practical business problems we solved.

Practice 1: support for sub-databases and sub-tables

As the business grows, MySQL is split into more and more sub-databases and sub-tables, and many services have sub-tables numbering in the thousands. Data developers typically need to aggregate these data together for analysis. If each sub-table were synchronized manually and then aggregated on Hive, the cost would be hard to accept, so the aggregation of sub-tables needs to be completed at the ODS layer.

First, when collecting the Binlog in real time, we support writing the Binlog of different DBs into the same Kafka topic. When applying for Binlog collection, users can select multiple physical DBs belonging to the same business logic. Through this aggregation at the Binlog collection layer, the Binlog of all sub-databases is written into the same Hive table, so that the downstream Merge only needs to read one Hive table.

Second, the configuration of the Merge task supports regular-expression matching. By configuring regular expressions that match the naming rules of the business's sub-tables, the Merge task knows which MySQL tables' Binlog it needs to aggregate, and then selects the data of the corresponding partitions for execution.

In this way, through work at these two levels, the merging of sub-databases and sub-tables is completed at the ODS layer.

One technical optimization: during Kafka2Hive, table names are processed according to the business's sub-table rules, converting each physical table name into its logical table name. For example, userinfo123 is converted to userinfo, and its Binlog data is stored in the table_name=userinfo partition of the original_binlog.user table. The goal is to prevent the pressure on the underlying storage caused by too many small HDFS files and too many Hive partitions.
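A minimal sketch of this conversion, assuming the sub-tables follow a "logical name plus numeric suffix" naming rule; the actual rule is configured per business, and the pattern here is only an illustration.

```python
import re


def logical_table_name(physical_name: str) -> str:
    """Strip a trailing numeric shard suffix, e.g. 'userinfo123' -> 'userinfo'."""
    return re.sub(r"\d+$", "", physical_name)


assert logical_table_name("userinfo123") == "userinfo"
```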

Practice 2: support for delete events

Delete operations are very common in MySQL. Because Hive does not support Delete, data that has been deleted in MySQL needs to be removed from Hive in a roundabout way.

When the Merge process needs to handle Delete events, it takes the following two steps:

First, extract the data affected by Delete events, which is easy because the Binlog records the event type. Then left-outer-join the stock data (table A) with the deleted data (table B) on the primary key. Rows that find a match on both sides have been deleted; therefore the rows whose table-B columns are NULL in the join result are selected, and these are the data that should be retained, as sketched after these steps.

Then, perform a regular Merge, as described above, on the retained data.
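A hedged sketch of the delete-handling step in HiveQL follows, again as Python strings; the table holding the day's deleted keys (deleted_userinfo) and the column names (id, event_type) are illustrative assumptions.

```python
# Step 1: collect the primary keys deleted during the day from the Binlog
# (event_type is an assumed column carrying the Binlog event type).
deleted_keys_sql = """
SELECT DISTINCT id
FROM original_binlog.user
WHERE table_name = 'userinfo' AND dt = '2024-01-01' AND event_type = 'DELETE'
"""

# Step 2: left-outer-join the stock data (A) with the deleted keys (B), assumed
# to be staged in deleted_userinfo; rows with no match on the B side are the
# ones to retain for the regular Merge.
retained_sql = """
SELECT a.*
FROM origindb.userinfo a
LEFT OUTER JOIN deleted_userinfo b
  ON a.id = b.id
WHERE b.id IS NULL
"""
```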

Thank you for reading! This is the end of this article on "Example Analysis of DB Data Synchronization to a Data Warehouse". I hope the content above is helpful to you; if you found the article useful, please share it so more people can see it.
