2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
By Lu Peng, DataPipeline architect
In the era of big data, real-time jobs play an increasingly important role. This article explains DataPipeline's real-time data flow practice on big data platforms in the following parts.
I. Main problems and challenges facing enterprise-level data
1. Data volume keeps rising.
With the rapid growth of Internet+ and of user bases, enterprise data volumes are growing quickly. Data that used to be measured in GB is now measured in TB, PB, and EB, and even ZB and YB. At the same time, big data is reaching deeper into finance, retail, manufacturing, and other industries, where it plays an increasingly important role.
2. Data quality requirements keep rising.
AI and data modeling, both popular today, place high demands on data quality, and the demands are especially strict in the financial sector.
3. Data platform architectures are becoming more complex.
An enterprise's application architecture changes with its size. A small enterprise with few users and little data may get by with a single MySQL instance. A medium-sized enterprise, as business volume grows, may use the primary database for OLTP and the standby database for OLAP. Once an enterprise reaches large scale, the data volume is so large that the original OLTP database may no longer meet the demand. At that point, OLTP and OLAP are isolated so that business systems and BI systems do not affect each other. Isolation brings a new difficulty, however: the data streams must be synchronized in real time. The enterprise then needs a scalable and reliable streaming transfer tool.
II. Practical cases on the big data platform
The following figure shows a typical BI platform design scenario. Taking MySQL as an example, this section describes how DataPipeline implements the MySQL SourceConnector. When MySQL is the source:
Full + incremental synchronization
Full load: data is loaded into Kafka with SELECT
Incremental load: the binlog is read in real time
When using the binlog, note that row-based logging must be enabled (binlog_format=ROW) and the row image must be set to full (binlog_row_image=FULL).
1. Real-time full + incremental synchronization in the MySQL SourceConnector
The following flow chart shows the implementation. First, a repeatable-read transaction is set up so that the data can be read consistently. Then FLUSH TABLES WITH READ LOCK is executed, which takes a read lock and prevents new writes from affecting the data being read. Next, a transaction with a consistent snapshot is started, the current binlog offset is recorded, and a snapshot start marker is written. This offset is where incremental reading will later begin. Once the transaction has started, all the data can be read; each generated record is written to Kafka, and the transaction is then committed. When the full data push is complete, the read lock is released and a snapshot stop marker is written. At this point all the existing data is in Kafka, and incremental synchronization starts from the previously recorded offset.
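The sequence above can be sketched in Python as follows. This is a minimal illustration rather than DataPipeline's actual connector code: `execute` and `emit` are hypothetical callables standing in for a MySQL session and a Kafka producer, and the record layout is an assumption. The SQL statements are standard MySQL, although newer MySQL releases rename `SHOW MASTER STATUS`.

```python
from typing import Callable, Iterable, List, Tuple

Execute = Callable[[str], List[tuple]]  # hypothetical: runs one SQL statement, returns rows
Emit = Callable[[dict], None]           # hypothetical: hands one record to a Kafka producer


def snapshot_then_offset(execute: Execute, tables: Iterable[str], emit: Emit) -> Tuple[str, int]:
    """Run the full-load phase and return the binlog position to resume from."""
    execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")
    execute("FLUSH TABLES WITH READ LOCK")  # block writers while the snapshot runs
    try:
        execute("START TRANSACTION WITH CONSISTENT SNAPSHOT")
        # The first two columns of the status row are the binlog file and position.
        binlog_file, binlog_pos = execute("SHOW MASTER STATUS")[0][:2]
        emit({"marker": "snapshot_start", "file": binlog_file, "pos": binlog_pos})
        for table in tables:  # table names are assumed to be trusted identifiers
            for row in execute(f"SELECT * FROM {table}"):
                emit({"table": table, "row": row})
        execute("COMMIT")
    finally:
        execute("UNLOCK TABLES")  # always release the read lock
    emit({"marker": "snapshot_stop"})
    return binlog_file, binlog_pos
```

The returned file and position are what an incremental binlog reader would start from once the snapshot stop marker has been written.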
2. What optimizations has DataPipeline made?
1) Previously, data synchronization was split into full synchronization and incremental synchronization, with full synchronization run as a batch job. Batch jobs are all-or-nothing, but with large data volumes a single batch takes a long time, and the longer it runs, the harder it is to guarantee reliability. Interruptions are common, and reprocessing everything from scratch is painful. DataPipeline addresses this pain point by tracking the position during data transfer and resuming from the breakpoint. When a large synchronization task fails unexpectedly, it continues from the point of interruption, which greatly shortens synchronization time and improves efficiency.
2) When many tasks are synchronized at once, it is hard to balance the transfer load on the source against real-time delivery at the destination, especially with large data volumes. DataPipeline ran extensive tests to tune the different connection pools and exposed transfer-rate settings, so that customers can configure transfer tasks that suit their own business systems. Transfer was also tuned for each kind of database to keep it efficient.
3) Customizable conversion of heterogeneous data types. Open-source big data transfer tools such as Sqoop are often inflexible in their support for heterogeneous data types, and their type coverage is incomplete. In scenarios that demand high numeric precision, such as finance, precision loss when moving data from a traditional database to a big data platform is a serious problem. DataPipeline supports more data types, including the complex types supported by Hive as well as decimal and timestamp.
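The breakpoint-resume idea in point 1) can be illustrated with a small Python sketch. It is only a sketch under assumptions: `sink`, `checkpoint`, and `batch_size` are hypothetical names, and the checkpoint is an in-memory dict where a real system would persist the position durably.

```python
from typing import Callable, Dict, List, Sequence


def copy_with_resume(rows: Sequence[dict], sink: Callable[[List[dict]], None],
                     checkpoint: Dict[str, int], batch_size: int = 2) -> None:
    """Copy rows in batches, saving the position after each successful batch."""
    pos = checkpoint.get("position", 0)  # resume from the last saved position
    while pos < len(rows):
        batch = list(rows[pos:pos + batch_size])
        sink(batch)                      # may raise; the checkpoint is then unchanged
        pos += len(batch)
        checkpoint["position"] = pos     # record progress only after the write succeeded
```

If `sink` raises partway through, calling `copy_with_resume` again with the same `checkpoint` continues from the last completed batch instead of restarting the whole load.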
3. Hive as the sink
1) Characteristics of Hive
Internal and external tables
Relies on HDFS
Supports transactional and non-transactional tables
Multiple compression formats
Partitions and buckets
2) Problems with Hive synchronization
How can real-time writing be ensured?
What happens when the schema changes?
How can the supported storage formats be extended?
How can multiple partitioning methods be implemented?
What if synchronization is interrupted?
How can we make sure that no data is lost?
3) Hive synchronization in practice with the Kafka Connect HDFS connector
External tables: using Hive external tables improves write efficiency, because data is written directly to HDFS, which reduces I/O; an internal table incurs one more I/O step than an external table.
Schema change: currently the destination follows the source. When columns are added or dropped at the source, the destination changes accordingly.
Currently supported storage formats: Parquet, Avro, CSV
A pluggable partitioner provides several partitioning methods, such as Wallclock, Record, and RecordField. Wallclock uses the system time at the moment of writing to Hive, Record uses the time at which the record was generated when it was read, and RecordField uses a user-defined timestamp field to define partitions. Customizable partitioners will be implemented in the future to meet different needs.
The recover mechanism ensures that data is not lost after an interruption.
The WAL (Write-Ahead Logging) mechanism ensures data consistency at the destination.
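To make the three partitioning modes concrete, here is a small Python sketch. It is not the connector's partitioner: the record layout (`timestamp` in milliseconds plus a `value` payload), the hourly path format, and the `now_ms` parameter are assumptions chosen for illustration.

```python
import time
from datetime import datetime, timezone
from typing import Optional


def partition_path(record: dict, mode: str, field: Optional[str] = None,
                   now_ms: Optional[int] = None) -> str:
    """Return an hourly partition path for one record.

    The record is assumed to look like {"timestamp": <ms>, "value": {...}}.
    """
    if mode == "Wallclock":      # system time at the moment of writing
        ts_ms = now_ms if now_ms is not None else int(time.time() * 1000)
    elif mode == "Record":       # time at which the record was generated
        ts_ms = record["timestamp"]
    elif mode == "RecordField":  # user-chosen timestamp field in the payload
        if field is None:
            raise ValueError("RecordField mode needs a field name")
        ts_ms = record["value"][field]
    else:
        raise ValueError(f"unknown mode: {mode}")
    moment = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return moment.strftime("year=%Y/month=%m/day=%d/hour=%H")
```

The same record can land in different partitions depending on the mode, which is why the choice matters when late or replayed data must end up in the partition that matches its business time.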
4) The recover mechanism
Recover is a recovery mechanism. Various problems, such as network failures, can occur during data transfer, and when they do, synchronization must be restored. How does recover ensure that no data is lost? When recover starts, it acquires the lease on the target file in HDFS. If the HDFS file to be read or written is currently held elsewhere, recover waits until the lease can be acquired. With the lease in hand, it reads the log written earlier; if this is the first run, it creates a new log, marks a begin, and records the Kafka offset at that moment. It then cleans up any temporary data left over from before and restarts synchronization, marking an end when synchronization finishes. Until the end is marked, the task is considered in progress, and the currently synchronized offset is committed each time, so that after a failure the task can roll back to the last committed offset.
5) The WAL (Write-Ahead Logging) mechanism
The core idea of Write-Ahead Logging here is to write data to a temporary file before it is committed to its final location. When a batch ends, the temporary file is renamed to a final file, which keeps the final files consistent after every commit. If a write error occurs midway, the temporary file is deleted and rewritten, which amounts to a rollback. Hive synchronization relies on this approach for consistency: data is first written to a temporary HDFS file, and only once the whole batch is known to be good is the file renamed to its final name. The final file name contains the Kafka offsets. For example, an Avro file named xxxx+001+0020.avro contains the 20 records from offset 1 to offset 20.
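The write-to-temporary-then-rename pattern can be sketched in Python as follows. The sketch uses the local file system as a stand-in for HDFS and writes plain text lines rather than Avro; the function name and the `prefix+start+end` naming without zero padding are assumptions for illustration only.

```python
import os
import tempfile
from typing import List


def commit_batch(directory: str, prefix: str, start_offset: int, lines: List[str]) -> str:
    """Write one batch to a temporary file, then rename it to its final name."""
    if not lines:
        raise ValueError("empty batch")
    end_offset = start_offset + len(lines) - 1
    final_path = os.path.join(directory, f"{prefix}+{start_offset}+{end_offset}.csv")
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as handle:
            handle.write("\n".join(lines) + "\n")
        os.replace(tmp_path, final_path)  # readers only ever see complete files
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)           # roll back: drop the partial temporary file
        raise
    return final_path
```

Because the offsets are part of the final file name, a restarted task can inspect the committed files to find the last offset that was safely written.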
4. Greenplum as the sink
Greenplum is an MPP data warehouse built on multiple PostgreSQL databases acting as compute nodes. It is good at OLAP and performs well as a BI data warehouse.
1) DataPipeline's synchronization practice and optimization strategy for Greenplum
Greenplum supports several data loading methods. We currently use COPY.
Batch processing improves write efficiency at the sink. Instead of issuing INSERT and UPDATE statements, each batch is loaded with DELETE + COPY.
Multithreading plus a preloading mechanism:
➢ A separate offset is recorded for each synchronized table, so when a whole task fails, each table can be recovered separately.
➢ A thread pool manages the threads that load data. Each synchronized table has its own loading thread, and multiple tables synchronize at the same time.
➢ While data is being loaded, Kafka is consumed in advance and a processed data set is cached. When one thread finishes loading, a new thread starts loading immediately, which reduces the time spent preparing data.
DELETE + COPY ensures eventual consistency of the data.
For a source table with a primary key, the data in a batch can be merged by primary key. For example, if a batch contains an insert followed by an update of the same row, the row does not need to be synchronized twice; only its state after the update is copied to Greenplum.
Points to note when synchronizing to Greenplum: because data is written through COPY, the file must contain structured data, typically CSV. When writing CSV, pay attention to the delimiter, quote, and escape characters to avoid misaligned data. Primary key updates need care as well: when the source updates a primary key, the key value before the update must be recorded and deleted at the target. The \0 special character is another issue: because the core is written in C, \0 needs special handling during synchronization.
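The merge-by-primary-key step and the CSV precautions can be sketched in Python. This is an illustration under assumptions, not DataPipeline's code: change events are assumed to carry `before` and `after` row images, `merge_batch` and `to_copy_csv` are hypothetical names, and stripping \0 is just one possible way of handling it.

```python
import csv
import io
from typing import Dict, Iterable, List, Set, Tuple


def merge_batch(events: Iterable[dict], key: str = "id") -> Tuple[Set, List[dict]]:
    """Collapse a batch of change events into (keys to DELETE, rows to COPY)."""
    to_delete: Set = set()
    latest: Dict = {}
    for event in events:  # events are assumed to arrive in source order
        before, after = event.get("before"), event.get("after")
        if before is not None:
            to_delete.add(before[key])     # the old key must be removed at the target
            latest.pop(before[key], None)  # drop any earlier version from this batch
        if after is not None:
            to_delete.add(after[key])      # delete first so COPY never duplicates a key
            latest[after[key]] = after
    return to_delete, list(latest.values())


def to_copy_csv(rows: List[dict], columns: List[str]) -> str:
    """Render rows as CSV with quoting, with NUL characters stripped."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
    for row in rows:
        values = [row[c] for c in columns]
        writer.writerow([v.replace("\0", "") if isinstance(v, str) else v for v in values])
    return buffer.getvalue()
```

An insert followed by an update of the same key yields a single row to copy, and an update that changes the primary key puts both the old and the new key in the delete set.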
III. Future work of DataPipeline
1. We ran into problems with Kafka Connect rebalancing, so we modified it. Under the previous mechanism, adding or deleting a task triggered a rebalance of the entire cluster. This caused a lot of unnecessary overhead, and frequent rebalances hurt the stability of synchronization tasks. We therefore changed the rebalance mechanism into a sticky mechanism:
When a new task is added, we check which workers have relatively low usage and assign the task to a worker with fewer tasks, without a full rebalance. Some imbalance and wasted resources may remain, but we can tolerate that; the cost is lower than that of a full rebalance.
Under the previous mechanism, deleting a task also triggered a full rebalance. Under the new mechanism it does not. Over a long period this can also lead to resource imbalance, in which case a rebalance of the whole cluster can be run automatically.
If a node in the cluster goes down, what happens to its tasks? We do not reassign them immediately. Instead we wait 10 minutes, because the cause may be only a brief connection timeout that clears up after a while, and rebalancing for that would not be worth it. If the node has still not recovered after 10 minutes, we rebalance and assign the failed node's tasks to other nodes.
2. At present, the WAL mechanism ensures the consistency of the data on the source side.
3. Optimize synchronization under large data volumes and improve the stability of synchronization.
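The sticky rebalance behavior described in item 1 can be modeled with a short Python sketch. It is a toy model under assumptions, not Kafka Connect's or DataPipeline's implementation: the class and method names are hypothetical, time is passed in explicitly as seconds, and the sketch assumes at least one live worker remains.

```python
from typing import Dict, List

GRACE_SECONDS = 600  # the 10-minute wait described in the text


class StickyAssigner:
    def __init__(self, workers: List[str]) -> None:
        self.tasks: Dict[str, List[str]] = {w: [] for w in workers}
        self.down_since: Dict[str, float] = {}

    def add_task(self, task: str) -> str:
        """Place a new task on the least-loaded live worker; nothing else moves."""
        live = [w for w in self.tasks if w not in self.down_since]
        target = min(live, key=lambda w: len(self.tasks[w]))
        self.tasks[target].append(task)
        return target

    def remove_task(self, task: str) -> None:
        """Deleting a task does not trigger any reassignment."""
        for assigned in self.tasks.values():
            if task in assigned:
                assigned.remove(task)

    def mark_down(self, worker: str, now: float) -> None:
        self.down_since.setdefault(worker, now)

    def mark_up(self, worker: str) -> None:
        self.down_since.pop(worker, None)

    def tick(self, now: float) -> None:
        """Reassign tasks only from workers that stayed down past the grace period."""
        for worker, since in list(self.down_since.items()):
            if now - since >= GRACE_SECONDS:
                orphans = self.tasks.pop(worker)
                del self.down_since[worker]
                for task in orphans:
                    self.add_task(task)
```

A worker that comes back within the grace period keeps its tasks, which avoids moving work around for a brief connection timeout.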
IV. Summary
1. In the big data era, enterprise data integration faces a variety of complex architectures, and the demands on ETL for handling these complex systems keep rising. What we can do is weigh the pros and cons and choose a framework that meets the business needs.
2. Kafka Connect suits workloads with large data volumes and real-time requirements.
3. Building on the strengths of Kafka Connect, data timeliness and synchronization efficiency can be improved according to the characteristics of each data warehouse.
4. DataPipeline has reworked and optimized for the pain points of large-scale real-time data flows in enterprises. First, guaranteeing end-to-end data consistency is a problem almost every enterprise encounters during data synchronization. In addition, DataPipeline has optimized and reworked rebalancing within its Kafka Connect based framework.
-end-