
How to achieve both data consistency and real-time extraction based on logs?

Updated 2025-01-16. From: SLTechnology News&Howtos (shulou)

I. Background

This story starts with a requirement that came up at our company some time ago. Yixin is a financial technology company, and much of our data differs from that of a typical Internet company. Generally speaking:

Anyone who works with data knows how valuable it is. That data is stored in the databases of the individual systems, so how do we give the users who need it consistent, real-time data?

In the past there were several common practices:

The DBA opens up the standby database of each system, and users extract the data they need during the business trough (for example, at night). Because each user extracts at a different time, their data is inconsistent and conflicting, and the same data is extracted repeatedly. This has given many DBAs a headache.

The company's unified big data platform extracts data from each system with Sqoop during the business trough, saves it to Hive tables, and then provides data services to other data users. This solves the consistency problem, but timeliness is poor: it is basically limited to T+1.

Incremental changes are captured with triggers. The main problems are that this is intrusive to the business side and that triggers carry a performance cost.

None of these approaches is perfect. After studying and weighing different implementations, we borrowed the idea from LinkedIn: to solve data consistency and real-time delivery at the same time, the more reasonable approach is to start from the log.

(this figure is from: https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/)

The incremental log becomes the foundation for all systems. Downstream data users consume the log by subscribing to Kafka.

For example:

Big data users can save the data to Hive tables or Parquet files for querying with Hive or Spark.

Users providing search services can save it to Elasticsearch or HBase.

Users providing caching services can cache the log in Redis or Alluxio.

Users doing data synchronization can save the data to their own databases.

Because Kafka logs can be consumed repeatedly and are retained for a period of time, each user can stay consistent with the database and get real-time data by consuming the Kafka log.

Why use log and kafka as the basis instead of Sqoop for extraction? Because:

As for why we do not use dual writes, please refer to https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/

We will not explain it further here.

II. Overall structure

So we came up with the idea of building a company-level platform based on the log.

Let us explain the DWS platform. It consists of three sub-projects:

Dbus (data bus): extracts data from the source side in real time, converts it into an agreed JSON format that carries its own schema (UMS data), and puts it into Kafka.

Wormhole (data exchange platform): reads data from Kafka and writes it to the target.

Swifts (real-time computing platform): reads data from Kafka, computes in real time, and writes the results back to Kafka.

In the figure:

The log extractor and Dbus work together to complete data extraction and conversion, including both full and incremental extraction.

Wormhole can save all log data to HDFS. It can also land data in any database that supports JDBC, as well as in HBase, Elasticsearch and Cassandra.

Swifts supports stream computing through configuration and SQL, including streaming join, lookup, filter, window aggregation and other features.

Dbus web is the configuration management console for Dbus. Rider additionally covers runtime management of Wormhole and Swifts, data quality verification, and so on.

Due to time constraints, today I will focus on Dbus and Wormhole, touching on Swifts only where needed.

III. Dbus solution

3.1 Log parsing

As mentioned earlier, the main problem Dbus solves is extracting logs from the source in real time. Here we take MySQL as an example to briefly illustrate how this is implemented.

Although MySQL InnoDB has its own log, MySQL primary/standby replication is implemented through the binlog, as shown below:

Picture from: https://github.com/alibaba/canal

Binlog has three modes:

Row mode: the log records each row of data as it was modified, and the same rows are then modified on the slave side.

Statement mode: every SQL statement that modifies data is recorded in the master's binlog. During replication, the slave's SQL thread parses it into the same SQL that was executed on the master and executes it again.

Mixed mode: MySQL chooses the log format for each specific SQL statement it executes, that is, it picks between Statement and Row.

Their respective advantages and disadvantages are as follows:

Source: http://www.jquerycn.cn/a_13625

Because of the shortcomings of statement mode, we learned from our DBAs that row mode is what is actually used for replication in production. This makes it possible to read the complete log.

Our typical MySQL deployment consists of two master databases (behind a VIP), one slave database, and one backup disaster-recovery database. Because the disaster-recovery database is usually used for remote disaster recovery, its real-time performance is poor and it is not convenient to deploy against.

To minimize the impact on the source side, we should clearly read the binlog from the slave database.

There are many solutions for reading the binlog, and plenty of them on GitHub. See https://github.com/search?utf8=%E2%9C%93&q=binlog. In the end, we chose Alibaba's canal as the log extractor.

Canal was first used to synchronize data between Alibaba's data centers in China and the United States. Its principle is relatively simple:

Canal simulates the MySQL slave interaction protocol, disguises itself as a MySQL slave, and sends the dump protocol to the MySQL master.

The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal).

Canal parses the binary log objects (originally a byte stream).

Picture from: https://github.com/alibaba/canal

3.2 Solution

The main solutions for the MySQL version of Dbus are as follows:

For the incremental log, we obtain MySQL's incremental log by subscribing to Canal Server:

Canal outputs the log in protobuf format. We developed an incremental Storm program that converts the data in real time into our UMS format (a JSON format, described later) and saves it to Kafka.

The incremental Storm program also captures schema changes in order to control the version number.

The configuration of the incremental Storm program is stored in ZooKeeper to meet high-availability requirements.

Kafka serves both as the output and as a buffer and message decoupling layer during processing.

When we considered Storm as the solution, we saw the following advantages:

The technology is relatively mature and stable, and pairing it with Kafka is a standard combination.

It is real-time and meets our latency requirements.

It meets high-availability requirements.

Performance can be scaled by configuring Storm's parallelism.

For append-only journal tables, the incremental part is sufficient, but for many tables we also need to know the initial (existing) data. For these we need an initial load.

For the initial load, we also developed a full-extraction Storm program that pulls from the standby database of the source through a JDBC connection. The initial load pulls all the data, so we recommend running it during the business trough. Fortunately, it only has to be done once, not every day.

For full extraction we borrowed the idea from Sqoop. The full-extraction Storm program is divided into two parts:

Data sharding

Actual extraction

Data sharding has to pick a split column (either configured or selected automatically), divide the data into ranges on that column, and save the shard information to Kafka.

Here are the specific sharding strategies:

The full-extraction Storm program reads the shard information from Kafka and uses multiple parallel tasks to connect to the standby database and pull the data concurrently. Because extraction may take a long time, the real-time status is written to ZooKeeper during the process so that the heartbeat program can monitor it.
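The range-based sharding described above can be sketched as follows. This is a minimal illustration, not the Dbus implementation: it assumes an integer split column whose minimum and maximum are already known (Sqoop obtains them with a MIN/MAX query), and the function name `split_ranges` and the table `t` are hypothetical.

```python
from typing import List, Tuple


def split_ranges(lo: int, hi: int, num_shards: int) -> List[Tuple[int, int]]:
    """Split the inclusive key range [lo, hi] into contiguous half-open shards.

    Each tuple (start, end) stands for the predicate
    `start <= split_col AND split_col < end`.
    """
    if num_shards < 1:
        raise ValueError("num_shards must be at least 1")
    if hi < lo:
        raise ValueError("hi must not be smaller than lo")
    total = hi - lo + 1                      # number of key values to cover
    num_shards = min(num_shards, total)      # never create empty shards
    base, extra = divmod(total, num_shards)  # first `extra` shards get one more key
    shards = []
    start = lo
    for i in range(num_shards):
        size = base + (1 if i < extra else 0)
        shards.append((start, start + size))
        start += size
    return shards


# Hypothetical table with ids 1..10 split across 3 parallel extraction tasks.
for start, end in split_ranges(1, 10, 3):
    print(f"SELECT * FROM t WHERE id >= {start} AND id < {end}")
```

Each shard can then be handed to a separate extraction task, which is what allows the pull from the standby database to run in parallel.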

3.4 Unified message format

Whether incremental or full, the final message output to kafka is a unified message format we have agreed on, called UMS (unified message schema) format.

As shown in the following figure:

The schema part of the message defines the namespace, which is composed of type + data source name + schema name + table name + version number + database shard number + table shard number. It can describe every table in the company, and any table can be located uniquely by its namespace.

_ums_op_ indicates the type of change: I (insert), U (update) or D (delete).

_ums_ts_ is the timestamp of the insert, update or delete event. Naturally, newer data has a later timestamp.

_ums_id_ is the unique ID of the message. It guarantees that the message is unique, and it also preserves the order of messages (explained later).

The payload holds the actual data. One JSON package can contain one or more rows, which improves the payload ratio of each message.

The data types supported in UMS are modeled on the Hive types and simplified; they cover basically all data types.
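To make the layout concrete, here is a hypothetical UMS-style message built in Python. Only the three system fields (`_ums_id_`, `_ums_ts_`, `_ums_op_`) and the namespace components come from the text; the key names `schema`, `fields`, `payload` and `tuple`, the dot separator in the namespace, and the sample `orders` table are assumptions made for illustration.

```python
import json

# Hypothetical UMS-style message. Only _ums_id_, _ums_ts_ and _ums_op_ come
# from the article; all other key names and values are illustrative guesses.
ums_message = {
    "schema": {
        # type . data source . schema . table . version . db shard . table shard
        "namespace": "mysql.db1.schema1.orders.3.0.0",
        "fields": [
            {"name": "_ums_id_", "type": "long"},
            {"name": "_ums_ts_", "type": "datetime"},
            {"name": "_ums_op_", "type": "string"},
            {"name": "order_id", "type": "long"},
            {"name": "amount", "type": "decimal"},
        ],
    },
    # One message may carry several rows, each aligned with schema.fields.
    "payload": [
        {"tuple": [103000012345678, "2017-01-01 10:00:00.000", "I", 1, "99.50"]},
        {"tuple": [103000012345999, "2017-01-01 10:00:00.120", "U", 1, "89.50"]},
    ],
}


def rows(message: dict) -> list:
    """Zip each payload tuple with the field names so rows become dicts."""
    names = [field["name"] for field in message["schema"]["fields"]]
    return [dict(zip(names, item["tuple"])) for item in message["payload"]]


decoded = rows(json.loads(json.dumps(ums_message)))  # survives a JSON round trip
print(decoded[1]["_ums_op_"], decoded[1]["amount"])
```

Because the schema travels with the data, a consumer needs nothing beyond the message itself to interpret each row.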

3.5 Consistency between full and incremental data

Throughout the data pipeline, we use a single Kafka partition in order to preserve the order of log messages as far as possible. In general, messages are then basically ordered and unique.

However, writes to Kafka can fail and may be retried, and Storm also uses a redo mechanism, so we do not strictly guarantee exactly-once delivery or complete ordering. What we do guarantee is at-least-once delivery.

This is why _ums_id_ becomes particularly important.

For full extraction, _ums_id_ is also unique. Each parallel task obtains a distinct ID range from ZooKeeper, which ensures both uniqueness and performance. The IDs are negative numbers, so they never conflict with incremental data and always sort earlier than incremental messages.

For incremental extraction, we use the MySQL log file number plus the log offset as the unique ID. The ID is a 64-bit long integer in which the high 7 decimal digits hold the log file number and the low 12 decimal digits hold the log offset.

For example: 000103000012345678. 103 is the log file number and 12345678 is the log offset.

In this way, uniqueness is guaranteed physically at the log level (the ID stays the same even when a message is redone), and so is ordering (the ID can also be used to locate the position in the log). A consumer can tell which of two messages is newer simply by comparing their _ums_id_ values.

_ums_ts_ serves a similar purpose, except that _ums_ts_ values may repeat when several operations occur within the same millisecond, in which case _ums_id_ must be compared.
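The ID layout can be expressed as simple decimal arithmetic. The sketch below assumes the "7 and 12" split refers to decimal digits, as the example value suggests; the function names are hypothetical and the range checks are additions of this sketch.

```python
from typing import Tuple

OFFSET_DIGITS = 12                  # low decimal digits reserved for the log offset
OFFSET_BASE = 10 ** OFFSET_DIGITS
MAX_FILE_NO = 9_999_999             # seven decimal digits for the log file number


def encode_ums_id(file_no: int, offset: int) -> int:
    """Combine a binlog file number and offset into one sortable integer."""
    if not 0 <= file_no <= MAX_FILE_NO:
        raise ValueError("file number does not fit in 7 decimal digits")
    if not 0 <= offset < OFFSET_BASE:
        raise ValueError("offset does not fit in 12 decimal digits")
    ums_id = file_no * OFFSET_BASE + offset
    if ums_id >= 2 ** 63:
        # The very largest 7-digit file numbers would overflow a signed 64-bit long.
        raise OverflowError("id does not fit in a signed 64-bit integer")
    return ums_id


def decode_ums_id(ums_id: int) -> Tuple[int, int]:
    """Recover (file number, offset), which is how the id can locate a log position."""
    return divmod(ums_id, OFFSET_BASE)


example = encode_ums_id(103, 12345678)
print(f"{example:018d}", decode_ums_id(example))
```

Because the file number occupies the high digits, comparing two IDs as plain integers orders them first by log file and then by offset, which matches the order in which MySQL wrote the changes.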

3.6 Heartbeat monitoring and early warning

The whole system involves many links, such as database replication, Canal Server, and multiple parallel Storm processes.

Monitoring and early warning for the pipeline is therefore particularly important.

The heartbeat module inserts a heartbeat record for each extracted table at a configurable interval (for example, once a minute) and saves the send time. The heartbeat table is extracted as well, so the heartbeat travels through the whole pipeline and follows the same logical path as the synchronized table (this matters because the parallel Storm tasks may take different branches). When the heartbeat packet arrives, it proves that the whole link is working, even if no data has been inserted, updated or deleted.

The Storm programs and the heartbeat program send their data to a common statistics topic. A statistics program then saves the data to InfluxDB, and Grafana displays it. You can see results like the following:

The picture shows the real-time monitoring information of one business system. The upper part is the real-time traffic and the lower part is the real-time delay. As you can see, the real-time performance is very good: the data generally reaches the final Kafka topic within 1 to 2 seconds.

Grafana provides the real-time monitoring capability.

If there is a delay, the Dbus heartbeat module sends an email or SMS alarm.
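The delay check behind such an alarm can be sketched in a few lines. This is only an illustration of the idea, not the Dbus heartbeat module: the function name, the table names and the 120-second threshold are all hypothetical.

```python
from typing import Dict, List


def stale_tables(last_heartbeat: Dict[str, float], now: float,
                 max_delay_s: float) -> List[str]:
    """Return tables whose latest heartbeat reached Kafka too long ago."""
    return sorted(table for table, seen_at in last_heartbeat.items()
                  if now - seen_at > max_delay_s)


# Hypothetical state: epoch seconds of the last heartbeat received per table.
seen = {"db1.orders": 1000.0, "db1.users": 1170.0}
for table in stale_tables(seen, now=1180.0, max_delay_s=120.0):
    print(f"ALERT: no heartbeat for {table} within 120 s")
```

Since heartbeats are sent even when the business table is idle, a missing heartbeat points to a problem in the pipeline rather than to a quiet source.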

3.7 Real-time desensitization

For data security, the full and incremental Storm programs of Dbus also perform real-time desensitization for scenarios that require it. There are three ways to desensitize:

To sum up: put simply, Dbus exports data from various sources in real time and offers it for subscription as UMS, with support for real-time desensitization, real-time monitoring and alarms.

IV. Wormhole solution

Having covered Dbus, it is time to talk about Wormhole. Why are these two projects rather than one, connected through Kafka?

One major reason is decoupling. Kafka decouples naturally: programs can pass messages asynchronously through it. Dbus and Wormhole also use Kafka internally for messaging and decoupling.

Another reason is that UMS is self-describing. By subscribing to Kafka, any user with development capability can consume UMS directly.

Although UMS can be subscribed to directly, doing so still requires development work. Wormhole's answer is one-click configuration that lands the data in Kafka into various systems, so that data users without development capability can still use the data.

As shown in the figure, Wormhole can land the UMS data in Kafka into a variety of systems. The most commonly used are HDFS, JDBC databases and HBase.

For its technology stack, Wormhole chose Spark Streaming.

In Wormhole, a flow refers to one namespace going from the source side to the target side. One Spark Streaming application serves multiple flows.

There are strong reasons for choosing Spark:

Spark natively supports a variety of heterogeneous storage systems.

Although Spark Streaming has slightly worse latency than Storm, Spark has better throughput and better computing performance.

Spark is more flexible in its support for parallel computing.

Spark covers Spark jobs, Spark Streaming and Spark SQL within a single technology stack, which is convenient for later development.

A supplementary note on the role of Swifts:

The essence of Swifts is to read UMS data from Kafka, compute in real time, and write the result to another Kafka topic. The computation can take many forms, such as filter, projection, lookup, streaming join and window aggregation, which can be combined into a variety of streaming computations with business value.

Wormhole and Swifts are compared as follows:

4.1 Landing on HDFS

The Wormhole Spark Streaming program consumes UMS from Kafka. First of all, the UMS log can be saved to HDFS.

Kafka generally keeps only a few days of data, not everything, whereas HDFS can keep the complete history of inserts, updates and deletes. This makes many things possible:

By replaying the log in HDFS, we can restore a historical snapshot as of any point in time.

We can build a zipper table (a history table) that reconstructs the history of every record for analysis.

When a program goes wrong, we can re-consume the messages through backfill and build a fresh snapshot.

In short, the log in HDFS is the foundation for many things.
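Restoring a snapshot by replaying the log can be sketched as follows. The sketch assumes each change record carries the full row image (as row-mode binlog does), that timestamps are strings in a sortable format, and that the records are available as plain dicts; the function name and sample rows are hypothetical.

```python
from typing import Dict, Iterable, Optional


def snapshot(log: Iterable[dict], key: str,
             as_of_ts: Optional[str] = None) -> Dict[object, dict]:
    """Rebuild table state by replaying change records in _ums_id_ order."""
    state: Dict[object, dict] = {}
    for rec in sorted(log, key=lambda r: r["_ums_id_"]):
        if as_of_ts is not None and rec["_ums_ts_"] > as_of_ts:
            continue  # the change happened after the requested point in time
        if rec["_ums_op_"] == "D":
            state.pop(rec[key], None)
        else:  # "I" and "U" are both assumed to carry the full new row
            state[rec[key]] = rec
    return state


log = [
    {"_ums_id_": 1, "_ums_ts_": "2017-01-01 10:00:00", "_ums_op_": "I", "id": 1, "amount": 10},
    {"_ums_id_": 2, "_ums_ts_": "2017-01-02 10:00:00", "_ums_op_": "U", "id": 1, "amount": 20},
    {"_ums_id_": 3, "_ums_ts_": "2017-01-03 10:00:00", "_ums_op_": "D", "id": 1, "amount": 20},
]
print(snapshot(log, "id", as_of_ts="2017-01-02 23:59:59"))
```

Replaying the same log with a different `as_of_ts` yields the table as it looked at that moment, which is the property that makes backfill and history tables possible.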

Spark has good native support for Parquet, and Spark SQL queries Parquet well, so UMS data landing on HDFS is saved as Parquet files. The Parquet content is the complete log of inserts, updates and deletes, together with _ums_id_ and _ums_ts_.

Wormhole's Spark Streaming program distributes the data into different directories by namespace, that is, different tables and versions go into different directories.

Each write produces a small Parquet file, and HDFS does not handle small files well, so there is also a job that merges these Parquet files into large files at a fixed time every day.

Each Parquet directory records the start time and end time of the data it contains. When backfilling, we can therefore decide which Parquet files to read from the selected time range, without reading all the data.
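Choosing which directories to read for a backfill is an interval-overlap test. The sketch below assumes each directory is described by a hypothetical `(path, first_ts, last_ts)` tuple with sortable timestamp strings; the actual Wormhole directory layout is not given in the text.

```python
from typing import List, Tuple

DirInfo = Tuple[str, str, str]  # (path, first timestamp, last timestamp)


def overlapping_dirs(dirs: List[DirInfo], start: str, end: str) -> List[str]:
    """Return the paths whose [first_ts, last_ts] range overlaps [start, end]."""
    return [path for path, first_ts, last_ts in dirs
            if first_ts <= end and last_ts >= start]


# Hypothetical directories for one namespace, one per day.
dirs = [
    ("/ums/orders/v3/20170101", "2017-01-01 00:00:00", "2017-01-01 23:59:59"),
    ("/ums/orders/v3/20170102", "2017-01-02 00:00:00", "2017-01-02 23:59:59"),
    ("/ums/orders/v3/20170103", "2017-01-03 00:00:00", "2017-01-03 23:59:59"),
]
print(overlapping_dirs(dirs, "2017-01-02 12:00:00", "2017-01-03 06:00:00"))
```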

4.2 Idempotency of inserting or updating data

A requirement we often encounter is to process the data and land it in a database or HBase. One question this raises is: which incoming records should be allowed to update the stored data?

The most important principle here is idempotency.

Whenever any record is inserted, updated or deleted, the questions we face are:

Which row should be updated?

What is the update strategy?

For the first question, we need a unique key to locate the data. The common choices are:

Use the primary key of the business database.

Have the business side specify several columns as a composite unique index.

For the second question, _ums_id_ comes into play. Because a larger _ums_id_ always means newer data, once the matching row is found we replace or update it according to that rule.

Soft deletion, with an added _is_active_ column, exists for the following situation:

Suppose the record already applied has a relatively large _ums_id_ and is a delete (meaning the data has been deleted). If deletion were not soft, a record with a smaller _ums_id_ (old data) arriving at this point would actually be inserted.

The old data would then reappear, and the operation would no longer be idempotent. Keeping deleted rows (soft deletion) is therefore valuable: it is what preserves the idempotency of the data.
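The rule can be illustrated with an in-memory target. This is a minimal sketch of the principle, with a plain dict standing in for the target table; the function name and sample records are hypothetical.

```python
from typing import Dict


def apply_change(target: Dict[object, dict], rec: dict, key: str) -> bool:
    """Apply one change record idempotently; return True if the target changed."""
    current = target.get(rec[key])
    if current is not None and current["_ums_id_"] >= rec["_ums_id_"]:
        return False  # stale or duplicate record: ignore it
    row = dict(rec)
    # Soft delete: keep the row as a tombstone instead of removing it, so that
    # its _ums_id_ is still there to reject older records that arrive later.
    row["_is_active_"] = rec["_ums_op_"] != "D"
    target[rec[key]] = row
    return True


table: Dict[object, dict] = {}
delete = {"_ums_id_": 30, "_ums_op_": "D", "id": 1}
late_insert = {"_ums_id_": 10, "_ums_op_": "I", "id": 1}

print(apply_change(table, delete, "id"))       # the delete arrives first
print(apply_change(table, late_insert, "id"))  # the older insert is rejected
print(apply_change(table, delete, "id"))       # a redelivered delete is a no-op
```

With a hard delete, the row for `id` 1 would be gone when the late insert arrives, and nothing would stop the old data from being written back.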

4.3 Saving to HBase

Inserting data into HBase is fairly simple. The difference is that HBase can retain multiple versions of the data (or only one version, of course). The default is to keep three versions.

So the problems to solve when inserting data into HBase are:

Choosing an appropriate rowkey: the rowkey design is configurable. The user can select the primary key of the source table or several columns as a composite key.

Choosing an appropriate version: use _ums_id_ plus a large offset (such as 10 billion) as the version of the row.

The choice of version is an interesting one. It exploits the uniqueness and monotonic increase of _ums_id_, which matches how versions themselves are compared: a larger version corresponds to a larger _ums_id_, and therefore to newer data.

From a performance point of view, we can insert the entire Spark Streaming Dataset directly into HBase without any comparison, and let HBase decide automatically, based on the version, which data to keep and which to discard.
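The version mapping is a single addition. The sketch below does not call HBase; it only shows that adding a constant keeps the ordering of _ums_id_, with a small `latest` helper imitating "highest version wins". The text does not say why an offset is needed; presumably it keeps versions non-negative, since full-extraction IDs are negative. That reading, and the function names, are assumptions.

```python
from typing import List, Tuple

VERSION_OFFSET = 10_000_000_000  # the "10 billion" example offset from the text


def cell_version(ums_id: int) -> int:
    """Map _ums_id_ to a non-negative cell version with the same ordering."""
    version = ums_id + VERSION_OFFSET
    if version < 0:
        raise ValueError("offset too small for this _ums_id_")
    return version


def latest(cells: List[Tuple[int, str]]) -> str:
    """Pick what a versioned store would return: the highest version wins."""
    return max(cells, key=lambda cell: cell[0])[1]


# Writes arrive out of order; no comparison is needed before writing.
cells = [
    (cell_version(103000012345999), "updated row"),
    (cell_version(-5), "row from full extraction"),
    (cell_version(103000012345678), "older update"),
]
print(latest(cells))
```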

Inserting data via JDBC: although the principle for ensuring idempotency when inserting into a database is simple, the implementation becomes much more complex once performance matters. You cannot compare and then insert or update rows one at a time.

Spark's RDD/Dataset operates on sets in order to improve performance, so we need to achieve idempotency through set operations as well.

The specific idea is:

First, query the target database by the primary keys in the collection to obtain the set of existing rows. Comparing it with the collection in the Dataset divides the data into two categories:

A: data that does not exist yet. This part can simply be inserted.

B: data that already exists. Compare the _ums_id_ values, update the target database only with rows whose _ums_id_ is larger, and discard the smaller ones directly.

Anyone who uses Spark knows that an RDD/Dataset can be partitioned, so multiple workers can operate in parallel to improve efficiency.

Under concurrency, both inserts and updates may fail, so a failure strategy is needed as well.

For example, if another worker has already inserted the row, the insert fails on the uniqueness constraint. It then has to be turned into an update, which again compares _ums_id_ to see whether the update is allowed.

For other cases where the data cannot be written, such as a problem with the target system, Wormhole has a retry mechanism. We will not cover inserts into other stores here. The general principle is to design a set-based, concurrent insert implementation around the characteristics of each store. These are all efforts Wormhole makes for performance, and its users do not have to worry about them.
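The set-based approach, including the fallback from a failed insert to a conditional update, can be sketched with SQLite standing in for the JDBC target. This is not Wormhole's code: the `target` table, its columns and the function name are hypothetical, and a real implementation would also chunk very large batches to respect the database's limit on bound parameters.

```python
import sqlite3
from typing import Dict, List, Tuple

Row = Tuple[int, int, str]  # (primary key, _ums_id_, value)


def upsert_batch(conn: sqlite3.Connection, batch: List[Row]) -> None:
    """Insert new keys; update existing keys only when the batch row is newer."""
    # Keep only the newest record per key within the batch itself.
    newest: Dict[int, Row] = {}
    for pk, ums_id, value in batch:
        if pk not in newest or newest[pk][1] < ums_id:
            newest[pk] = (pk, ums_id, value)
    if not newest:
        return
    # One query fetches the current _ums_id_ of every key in the batch.
    marks = ",".join("?" * len(newest))
    existing = dict(conn.execute(
        f"SELECT id, ums_id FROM target WHERE id IN ({marks})", list(newest)))
    for pk, ums_id, value in newest.values():
        if pk not in existing:
            try:
                conn.execute(
                    "INSERT INTO target (id, ums_id, value) VALUES (?, ?, ?)",
                    (pk, ums_id, value))
                continue
            except sqlite3.IntegrityError:
                pass  # another worker inserted the key first: fall back to update
        elif existing[pk] >= ums_id:
            continue  # the target already holds a newer or equal version
        # The WHERE clause re-checks _ums_id_ so a concurrent newer write survives.
        conn.execute(
            "UPDATE target SET ums_id = ?, value = ? WHERE id = ? AND ums_id < ?",
            (ums_id, value, pk, ums_id))
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, ums_id INTEGER, value TEXT)")
upsert_batch(conn, [(1, 10, "a"), (2, 10, "b")])
upsert_batch(conn, [(1, 5, "stale"), (2, 20, "b2"), (3, 7, "c")])
print(conn.execute("SELECT id, ums_id, value FROM target ORDER BY id").fetchall())
```

Putting the `_ums_id_` comparison into the UPDATE statement itself means the final decision is made by the database, so two workers racing on the same key still converge on the newest row.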

V. Use cases

5.1 Real-time marketing

After all of this, what is DWS actually used for? Next I will introduce real-time marketing as implemented by one system using DWS.

As shown in the figure above:

The data of system A is stored in its own database. Yixin provides many financial services, including lending, and credit assessment is a very important part of the lending process.

Borrowers need to provide information that demonstrates their creditworthiness, such as the central bank credit report, which is the strongest form of credit data. Bank statements and online shopping records are also data with strong credit attributes.

When a borrower fills in credit information in system A through the web or the mobile app, they may abandon the process for some reason. The borrower may well be a high-quality potential customer, but in the past this information was either unavailable or only became known much later, so such customers were in effect lost.

With DWS, the information the borrower has already filled in is recorded in the database, then extracted, computed and landed in the target database in real time. High-quality customers are identified from their scores, and their information is immediately passed to the customer service system.

Customer service staff contact the borrower (the potential customer) by phone within a very short time (a few minutes), provide customer care, and turn the potential customer into a real one. Lending is time-sensitive: if it takes too long, the opportunity is worthless.

None of this could be achieved without the ability to extract, compute and store in real time.

5.2 Real-time report system

Another application is real-time reporting:

Our data users draw their data from multiple systems. In the past, report information was obtained on a T+1 basis and used to guide the next day's operations, which was not timely.

With DWS, data from multiple systems is extracted, computed and landed in real time and presented in reports, so that operations can make timely adjustments and respond quickly.

VI. Summary

DWS is built on mainstream real-time streaming big data frameworks, with high availability, high throughput, strong horizontal scalability, low latency and high fault tolerance.

DWS supports heterogeneous multi-source, multi-target systems, multiple data formats (structured, semi-structured and unstructured data), and real-time processing.

DWS merges three sub-projects into one platform, which gives us the real-time capability to drive a variety of real-time applications.

Suitable scenarios include: real-time synchronization / real-time computing / real-time monitoring / real-time reporting / real-time analysis / real-time insight / real-time management / real-time operation / real-time decision-making

Author: Wang Dong

Source: Yixin Institute of Technology
