In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >
Share
Shulou(Shulou.com)06/01 Report--
Introduction
In recent years, enterprises have an increasing demand for real-time data services. This paper collates the performance characteristics and applicable scenarios of common real-time data components, and introduces how Meituan builds a real-time data warehouse through Flink engine, so as to provide efficient and robust real-time data services. Earlier, our Meituan technology blog published an article entitled "performance comparison between Flink and Storm, the streaming computing framework", which compared the computing performance of Flink and Storm engines. This paper mainly describes the experience of using Flink in actual data production.
Initial architecture of real-time platform
In the initial stage of the construction of the real-time data system, the business demand is also relatively less, and a complete data system has not been formed. We adopt the development mode of "all the way to the end": by deploying Storm job processing real-time data queue on the real-time computing platform to extract data indicators and push them directly to the real-time application service.
Figure 1 initial real-time data architecture
However, with the increasing demand for real-time data by products and business people, new challenges arise.
There are more and more data indicators, and the development of "chimney" leads to serious code coupling problems.
There is more and more demand, some need detailed data, some need OLAP analysis. A single development model is difficult to cope with multiple needs.
Lack of a sound monitoring system to identify and fix problems before they have an impact on the business.
Construction of Real-time data Warehouse
To solve the above problems, we choose to use a hierarchical design to build a real-time data warehouse based on our experience in producing offline data. The hierarchical structure is shown below:
Figure 2 hierarchical architecture of real-time data warehouse
The scheme consists of the following four layers:
1. ODS layer: Binlog and traffic logs as well as real-time queues for each business.
two。 Data detail layer: business domain integration to extract factual data, offline full-volume and real-time change data to build real-time dimensional data.
3. Data summary layer: use wide table model to supplement dimensional data to detail data and summarize common indicators.
4. App layer: an application layer built for specific needs, which provides services through the RPC framework.
Through multi-layer design, we can precipitate the process of processing data in each layer. For example, the data filtering, cleaning, standardization and desensitization process are completed uniformly in the data detail layer, and the common multi-dimensional index summary data is processed in the data summary layer. It improves the code reuse rate and the overall production efficiency. At the same time, the types of tasks handled at each level are similar, so a unified technical scheme can be adopted to optimize performance and make the technical architecture of the warehouse more concise.
Technical selection 1. Research on Storage engine
The design of real-time data warehouse is different from offline data warehouse, which uses the same storage scheme at all levels, such as the strategy that is stored in Hive and DB. First of all, for the table of intermediate process, the scheme of mixing structured data through message queue storage and high-speed KV storage is adopted. The real-time computing engine can consume the data in the message queue by listening to messages for real-time computing. On the other hand, the data on high-speed KV storage can be used for fast correlation computation, such as dimensional data.
Secondly, on the application layer, the storage scheme is configured to write directly according to the characteristics of data use. The processing delay caused by offline data warehouse application layer synchronization data flow is avoided. In order to meet the needs of different types of real-time data and reasonably design storage schemes at all levels, we investigated several storage schemes that are widely used in Meituan.
Table 1 list of storage schemes
According to different business scenarios, the storage scheme used at each model level of real-time data warehouse is roughly as follows:
Figure 3 hierarchical architecture of real-time data warehouse storage
The data detail layer can be associated with more than 100000 TPS in the scenario of dimension data. We choose Cellar (KV storage developed by Meituan based on Tair) as storage to encapsulate dimension service to provide dimension data for real-time data warehouse.
For the general summary index, the data aggregation layer needs to associate the historical data with the data, which is the same as the dimensional data through Cellar as storage, and carries on the association operation in the way of service.
The design of the application layer of the data application layer is relatively complex, and then several different storage schemes are compared. We have established the judgment basis based on the data reading and writing frequency of 1000 QPS. For real-time applications where the average frequency of reading and writing is higher than 1000 QPS but the query is not too complex, such as real-time business data of merchants. Cellar is used as storage to provide real-time data service. For some applications that query complex and require detailed lists, it is more appropriate to use Elasticsearch as storage. And some query frequency is low, such as some internal operation data. Druid builds the index by real-time processing messages, and provides real-time data OLAP analysis function quickly through pre-aggregation. For some historical versions of data products for real-time transformation, you can also use MySQL storage to facilitate product iteration.
two。 Research on Computing engine
In the early stage of the construction of the real-time platform, we use Storm engine for real-time data processing. Although the Storm engine performs well in terms of flexibility and performance. However, because API is too low-level, some common data operations need to be functionally implemented in the process of data development. For example, table association, aggregation, etc., have generated a lot of additional development work, not only introducing a lot of external dependencies such as caching, but also not very good performance in actual use. At the same time, the functions supported by the data object Tuple in Storm are also very simple, and usually need to be converted to Java objects to deal with.
For this code-defined data model, usually we can only maintain it through documentation. Not only does it require additional maintenance work, but it is also troublesome to add and change fields. Generally speaking, it is difficult to use Storm engine to build real-time data warehouse. We need a new real-time processing scheme that can achieve:
1. Provide advanced API, support common data operations such as association aggregation, preferably SQL.
two。 It has state management and automatic support for persistence scheme to reduce the dependence on storage.
3. It is easy to access metadata services and avoid managing data structures through code.
4. Processing performance should be at least the same as that of Storm.
We have conducted a technical survey of the main real-time computing engines. The various engine features are summarized as shown in the following table:
Table 2 list of real-time computing schemes
From the research results, the API, fault-tolerant mechanism and state persistence mechanism of Flink and Spark Streaming can solve some of the problems we encounter in using Storm. However, Flink is closer to Storm in terms of data latency and has the least impact on existing applications. And in the company's internal tests, the throughput performance of Flink is about ten times higher than that of Storm. Considering comprehensively, we choose Flink engine as the development engine of real-time data warehouse.
What is even more interesting is Flink's Table abstraction and SQL support. Although the Strom engine can also be used to deal with structured data. But after all, it is still message-based processing API, in the code layer can not fully enjoy the convenience of manipulating structured data. Flink not only supports a large number of commonly used SQL statements, but also basically covers our development scenarios. And Flink's Table can be managed through TableSchema, supporting rich data types and data structures as well as data sources. It can be easily integrated with existing metadata management systems or configuration management systems. From the following figure, we can clearly see the difference between Storm and Flink in the development process.
Fig. 4 Flink-Storm comparison chart
Dealing with logic and implementation needs to be solidified in Bolt code when developing with Storm. Flink can be developed through SQL, the code is more readable, the implementation of logic is guaranteed by the open source framework to ensure reliable and efficient, as long as the optimization of specific scenarios can modify the implementation of Flink SQL optimizer, without affecting the logic code. It allows us to focus more on data development rather than logical implementation. When we need a scenario with a unified caliber of offline data and real-time data, we only need to slightly modify the offline caliber SQL script, which greatly improves the development efficiency. At the same time, compared with the data model used by Flink and Storm in the figure, Storm needs to define the data structure through a Class of Java, while Flink Table can be defined by metadata. It can be well combined with metadata, data governance and other systems in data development to improve development efficiency.
Experience of using Flink
In the process of building a real-time data warehouse using Flink-Table. We summarize some experiences through Flink for some common operations of building data warehouse, such as dimension expansion of data indicators, data association by topic, and data aggregation operation.
1. Dimension expansion
For the dimension expansion of data indicators, we use dimension services to obtain dimension information. Although the usual response latency for Cellar-based dimension services can be below 1ms. However, in order to further optimize Flink throughput, we all use asynchronous interface access to the association of dimensional data, avoiding the use of RPC calls to affect data throughput.
For some streams with a large amount of data, for example, the amount of data in the traffic log is on the order of 100000 seconds per piece. There is a built-in cache mechanism when associating UDF, which can eliminate the cache according to the hit rate and time, and partition with the associated Key value, which significantly reduces the number of requests for external services, effectively reduces processing delay and pressure on the external system.
two。 Data association
Data topic merging is essentially the association of multiple data sources, which is simply a Join operation. Flink's Table is based on the concept of infinite stream. It is not possible to associate two complete tables in the same way as offline data when performing Join operations. The scheme of associating the data within the window time is adopted, which is equivalent to intercepting a period of data from the two data streams for Join operation. It is somewhat similar to the association of offline data by restricting partitions. It is also important to note that when Flink associates a table, there must be at least one "equal" association condition, because the values on both sides of the equal sign are used to group.
Because Flink caches all the data in the window for association, the amount of data cached is proportional to the size of the associated window. Therefore, the association query of Flink is more suitable for scenarios where the time range of associated data can be limited by business rules. For example, the browsing log of the associated order user within 30 minutes before the purchase. A large window not only consumes more memory, but also produces a larger Checkpoint, resulting in a drop in throughput or Checkpoint timeout. RocksDB and incremental SavePoint mode can be used in actual production to reduce the impact of Checkpoint process on throughput.
For some scenarios that require a long correlation window, for example, the associated data may be from a few days ago. For these historical data, we can understand it as a fixed "dimension". The historical data that needs to be associated can be processed in the same way as the dimensional data: "cache + offline" data is stored and associated with the interface. It is also important to note that Flink is directly sequentially linked to multiple table associations, so you need to pay attention to the association with a small result set first.
3. Aggregate operation
When using aggregation operations, Flink supports common aggregation operations such as summation, extremum, mean, and so on. The only drawback is the support for Distinct. The previous solution adopted by Flink-1.6 is to group the deduplicated fields and then aggregate them. For scenarios that need to de-reaggregate multiple fields, it is inefficient to calculate separately and then associate them. For this reason, we have developed a custom UDAF to achieve accurate weight removal of MapView, imprecise weight removal of BloomFilter, and ultra-low memory de-duplication scheme of HyperLogLog to deal with all kinds of real-time deduplication scenes.
However, when using a custom UDAF, it is important to note that the RocksDBStateBackend schema takes a lot of time to serialize and deserialize when updating a larger Key. Consider using FsStateBackend mode instead. It is also important to note that when calculating analysis functions such as Rank, the Flink framework needs to cache all the data under each grouping window in order to sort, which consumes a lot of memory. It is recommended that in this scenario, the logic of TopN should be converted first to see if the requirements can be solved.
The following figure shows a complete process of producing a real-time data table using the Flink engine:
Fig. 4 Real-time calculation flow chart real-time warehouse results
By using real-time data warehouse instead of the original process, we abstract each process in data production into each layer of real-time data warehouse. The data source of all real-time data applications is unified, and the caliber of application data indicators and dimensions is guaranteed. In several scenarios where the caliber of the data has been modified, we change the details and summary of the warehouse to complete the caliber switch of all applications without modifying the application code at all. In the process of development, through strict control of data layering, subject domain division, content organization standards and naming rules. It makes the link of data development clearer and reduces the coupling of code. Coupled with the use of Flink SQL for development, the code is more concise. The amount of code for a single job has shrunk from an average of 300 + lines of Java code to dozens of lines of SQL scripts. The development time of the project has also been greatly reduced, and it is not uncommon for one person to develop multiple real-time data indicators per day.
In addition, we can optimize the performance and configure the parameters according to the different characteristics of the work content of each level. For example, the ODS layer mainly performs data parsing, filtering and other operations, without the need for RPC calls and aggregation operations. We optimized the data parsing process to reduce unnecessary JSON field parsing and to use more efficient JSON packages. In terms of resource allocation, a single CPU can only configure the memory of 1GB to meet the demand.
The aggregation layer mainly carries out aggregation and association operations, which can improve the performance and reduce the cost by optimizing the aggregation algorithm and the common operation of internal and external memory. More memory will also be allocated in resource allocation to avoid memory overflows. Through these optimization methods, although the production link of the real-time warehouse is longer than that of the original process, the data delay does not increase significantly. At the same time, the computing resources used by real-time data applications are also significantly reduced.
Prospect
Our goal is to build a real-time warehouse into a data system comparable to the accuracy and consistency of offline warehouse data. Provide timely and reliable data services for businesses, business personnel and Meituan users. At the same time, it serves as a unified export of real-time data for meals, helping other business departments of the group.
In the future, we will pay more attention to data reliability and real-time data index management. Establish a sound data monitoring, data consanguinity detection, cross-check mechanism. Timely monitoring and early warning of abnormal data or data delay. At the same time, optimize the development process and reduce the learning cost of developing real-time data. So that more people with real-time data needs can solve problems on their own.
About the author
Wei Lun, Meituan, head of real-time data in the catering technology department of the hotel, joined Meituan in 2017 and has been engaged in the development of data platform, real-time data computing and data architecture for a long time. There are some experiences and outputs in using Flink for real-time data production and improving production efficiency. At the same time, it is also actively promoting the practical experience of Flink in real-time data processing.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.