In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces the Flink-based MQ-Hive real-time data integration how to achieve byte jump, the article is very detailed, has a certain reference value, interested friends must read it!
In the process of data center construction, a typical data integration scenario is to import the data of MQ (Message Queue, such as Kafka, RocketMQ, etc.) into Hive for downstream data warehouse construction and index statistics. Because MQ-Hive is the first layer of data warehouse construction, it requires high accuracy and real-time performance of data.
This paper mainly focuses on the MQ-Hive scenario, aiming at the pain points of the existing solutions in the byte runout, proposes a real-time solution based on Flink, and introduces the application status of the new scheme in the byte runout.
There are plans and pain points.
The existing solution in byte jump is shown in the following figure, which consists of two steps:
Write MQ data to HDFS file through Dump service
Then import the HDFS data into Hive through Batch ETL and add Hive partitions
The pain point task chain is long, and the original data needs to be converted many times before it can be entered into Hive. The delay of Dump Service and Batch ETL will lead to delayed storage of final data output and high computing overhead. The repeated storage and calculation of MQ data is based on native Java. After the data traffic continues to grow, there are some problems such as single point failure and unbalanced machine load. Architecturally unable to reuse existing infrastructure such as Hadoop/Flink/Yarn within the company does not support remote disaster recovery.
Real-time solution based on Flink
In view of the pain points of the company's traditional solutions, we propose a real-time solution based on Flink, which writes MQ data to Hive in real time, and supports event time and Exactly Once semantics. Compared with the old scheme, the advantages of the new scheme are as follows:
Based on streaming engine Flink development, it supports higher semantic real-time performance of Exactly Once, MQ data goes directly into Hive, and no intermediate computing link reduces intermediate storage. The whole process data will only support Yarn deployment mode once, making it convenient for users to migrate resource management flexibility, convenient for expansion and operation and maintenance to support dual computer room disaster recovery. The overall architecture is shown in the following figure, which mainly includes DTS (Data Transmission Service) Source, DTS Core and DTS Sink modules. The specific functions are as follows:
DTS Source connects to different MQ data sources, supports DTS Sink such as Kafka and RocketMQ to output data to the target data source, supports DTS Core such as HDFS and Hive to run through the whole data synchronization process, reads the source data through Source, processes it with DTS Framework, and finally outputs the data to the target side through Sink. Core functions such as DTS Framework integrated type system, file segmentation, Exactly Once, task information collection, event time and dirty data collection support Yarn deployment mode, and resource scheduling and management are more flexible.
The DTS Dump architecture diagram Exactly OnceFlink framework can provide Exactly Once or At Least Once semantics through the Checkpoint mechanism. In order to realize the full link of MQ-Hive to support Exactly-once semantics, it is also necessary for MQ Source and Hive Sink to support Exactly Once semantics. This paper is implemented by Checkpoint + 2PC protocol, and the specific process is as follows:
When the data is written, the Source side pulls the data from the upstream MQ and sends it to the Sink side; the Sink side writes the data to the temporary directory in the Checkpoint Snapshot stage, and the Source side saves the MQ Offset to the State; the Sink side closes the written file handle and saves the current Checkpoint ID to the State; in the Checkpoint Complete phase, the Source side Commit MQ Offset The Sink side moves the data in the temporary directory to the Checkpoint Recover stage under the official directory, loads the latest successful Checkpoint directory and recovers the State information, where the Source side takes the MQ Offset saved in the State as the starting location The Sink side restores the latest successful Checkpoint ID and moves the data from the temporary directory to the official directory for optimization. In practical use scenarios, especially in large concurrency scenarios, HDFS write latency is prone to burrs because individual Task Snapshot timeouts or failures lead to the failure of the entire Checkpoint. Therefore, in view of Checkpoint failure, it is more important to improve the fault tolerance and stability of the system.
Here, we make full use of the strict monotonous increment of Checkpoint ID. Every time you do Checkpoint, the current Checkpoint ID must be larger than before, so in the Checkpoint Complete phase, you can submit temporary data that is less than or equal to the current Checkpoint ID. The specific optimization strategies are as follows:
The temporary directory on the Sink side is {dump_path} / {next_cp_id}, where next_cp_id is defined as the latest cp_id + 1Checkpoint Snapshot stage. The Sink side saves the latest cp_id to State, and updates the next_cp_id to the cp_id + 1Checkpoint Complete stage. The Sink side moves all data less than or equal to the current cp_id in the temporary directory to the Checkpoint Recover stage under the official directory, and the Sink side restores the latest successful cp_id. And move the data smaller than or equal to the current cp_id in the temporary directory to the official directory type system. Due to the different data types supported by different data sources, in order to solve the problem of data synchronization between different data sources and compatibility of different type conversions, we support the DTS type system. DTS types can be refined into basic types and composite types, where composite types support type nesting. The specific conversion process is as follows:
On the Source side, the source data type is uniformly converted to the DTS type within the system on the Sink side, and the DTS type within the system is converted into the target data source type. The DTS type system supports the conversion between different types, such as the conversion between String types and Date types.
The Rolling PolicySink side of the DTS Dump architecture diagram is concurrent writes, and each Task handles different traffic. In order to avoid generating too many small files or generating files too large, it is necessary to support a custom file segmentation strategy to control the size of a single file. Currently, three file segmentation strategies are supported: file size, maximum unupdated time of a file, and Checkpoint. ■ optimization strategy Hive supports Parquet, Orc, Text and other storage formats. The data writing process for different storage formats is different. Specifically, it can be divided into two categories:
RowFormat: based on a single write, HDFS Truncate operations are supported according to Offset, such as Text format BulkFormat: write based on Block, HDFS Truncate operations are not supported, such as Parquet and ORC format
In order to ensure the semantics of Exactly Once and support Parquet, Orc, Text and other formats at the same time, file segmentation is forced in each Checkpoint to ensure that all written files are complete, and there is no need to do Truncate operation when Checkpoint is restored. Fault-tolerant processing ideally, streaming tasks will run all the time and do not need to be restarted, but the following scenarios are inevitable:
Flink computing engine upgrade, need to restart task upstream data increase, need to adjust task concurrency Task Failover ■ concurrency adjustment currently Flink native support State Rescale. In the specific implementation, when Task does Checkpoint Snapshot, save the MQ Offset to ListState; after Job restart, Job Master will distribute the ListState to each Task equally according to the concurrency of Operator. Due to the influence of external factors such as network jitter and write timeout in ■ Task Failover, write failure will inevitably occur in Task. How to do Task Failover quickly and accurately is more important. Currently, Flink natively supports a variety of Task Failover policies. This article uses Region Failover policy to restart all Task of the Region where the failed Task resides. Remote disaster recovery
■ background
In the era of big data, the accuracy and real-time performance of data are particularly important. This article provides a solution for multi-server room deployment and remote disaster recovery. When the main server room is temporarily unable to provide services due to network outage, power outage, earthquake, fire and other reasons, it can quickly switch the service to the disaster preparedness room and guarantee the Exactly Once semantics at the same time.
■ disaster recovery component
The overall solution requires multiple disaster recovery components to be implemented together. The disaster recovery components are shown in the figure below, including MQ, YARN and HDFS, as shown below:
MQ needs to support multi-computer room deployment. When the main computer room fails, the Leader can be switched to the standby computer room, so that the downstream consumer Yarn cluster can be deployed in both the main computer room and the standby computer room, so that the Flink Job migration downstream HDFS needs to support multi-computer room deployment. When the main computer room fails, the Master can be switched to the standby computer room Flink Job to run on the Yarn, while the task State Backend is saved to HDFS, and the multi-computer room of State Backend is guaranteed through HDFS's multi-computer room support.
■ disaster recovery process the overall disaster recovery process is as follows:
Normally, MQ Leader and HDFS Master are deployed in the main server room, and the data is synchronized to the slave server room. At the same time, Flink Job runs in the main computer room and writes the task State to HDFS. Note that State is also a multi-computer room deployment mode. In the case of disaster, MQ Leader and HDFS Master are migrated from the main computer room to the disaster preparedness room, while Flink Job is also migrated to the disaster preparedness room, and the pre-disaster Offset information is restored through State to provide Exactly Once semantics.
Event time archiving ■ background in data warehouse construction, the processing logic of processing time (Process Time) is different from that of event time (Event Time). For processing time, the data will be written to the time partition corresponding to the current system time; for event time, the data will be written to the corresponding time partition according to the production time of the data, which is also referred to as archiving.
In real scenarios, it is inevitable to encounter various upstream and downstream failures and recover after a period of time. If the Process Time processing strategy is adopted, the data during the accident will be written to the time partition after recovery, resulting in the problem of partition hole or data drift. If the archiving strategy is used, it will be written according to the event time, then there is no such problem.
Because the upstream data event time will be out of order, and the Hive partition should not continue to write after it is generated, it is impossible to achieve unlimited archiving in the actual writing process, but can only be archived within a certain time range. The difficulty of archiving lies in how to determine the global minimum archiving time and how to tolerate a certain disorder.
■ global minimum archiving time
The Source side is read concurrently, and a Task may read data from multiple MQ Partition at the same time. For each Parititon of MQ, the current partition archiving time is saved, and the minimum value in the partition is taken as the minimum archiving time of Task, and finally, the minimum value in Task is taken as the global minimum archiving time.
■ out-of-order processing to support out-of-order scenarios, an archive interval is supported, where Global Min Watermark is the global minimum archiving time, Partition Watermark is the current archiving time of the partition, and Partition Min Watermark is the minimum archiving time of the partition. Archiving will be carried out only if the event time meets the following conditions:
Event time is greater than global minimum archiving time event time is greater than partition minimum archiving time
Hive partition generation ■ principle the difficulty of Hive partition generation is how to determine whether the data of the partition is ready and how to add the partition. Since the Sink side writes concurrently and multiple Task writes to the same partition data at the same time, only when all Task partition data writes are completed can the partition data be considered to be ready. The solution of this article is as follows:
On the Sink side, for each Task to save the current minimum processing time, it needs to meet the characteristic of monotonously increasing. When Checkpoint Complete, Task reports the minimum processing time to the JM side after JM gets the minimum processing time of all Task, you can get the global minimum processing time, and use this as the minimum ready time for Hive partitions. When the minimum ready time is updated, you can determine whether to add Hive partitions.
■ dynamic partitioning determines which partition directory the data is written to according to the value of upstream input data, instead of writing to a fixed partition directory, such as date= {date} / hour= {hour} / app= {app}, and determines the final partition directory according to the partition time and the value of the app field, so that within every hour, the same app data is under the same partition.
In a static partition scenario, only one partition file is written to each Task at a time, but in a dynamic partition scenario, multiple partition files may be written to each Task at the same time. For writes in Parque format, the data is first written to the local cache, and then batches are written to Hive. When Task processes too many file handles at the same time, OOM is easy to occur. In order to prevent a single Task OOM, the file handle is periodically probed to release the file handle that has not been written for a long time.
The MessengerMessenger module is used to collect Job running status information in order to measure the health of Job and the construction of market indicators.
The principle of ■ meta-information collection is as follows: the core indicators of Task, such as flow, QPS, dirty data, writing Latency, event time writing effect, are collected through Messenger on the Sink side, and summarized by Messenger Collector. Dirty data needs to be output to external storage, and task operation metrics are output to Grafana for display of market metrics.
Under the ■ dirty data collection data integration scenario, dirty data will inevitably be encountered, such as type configuration errors, field overflow, type conversion incompatibility and other scenarios. For streaming tasks, because the task will run all the time, it is necessary to be able to count the dirty data traffic in real time, save the dirty data to external storage for troubleshooting, and sample the output in the running log.
■ market monitoring
Market metrics cover global metrics and individual Job metrics, including write success traffic and QPS, write Latency, write failure traffic and QPS, archive effect statistics, etc., as shown in the following figure:
The above is all the content of the article "how to achieve byte jump in Flink-based MQ-Hive real-time data integration". Thank you for reading! Hope to share the content to help you, more related knowledge, welcome to follow the industry information channel!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.