How is Flink applied in Kuaishou's real-time multi-dimensional analysis scenario? Many readers new to the topic may not know where to start, so this article summarizes the scenario, the problems involved, and the solutions. Hopefully it helps you work through the same questions.
Application scenario and scale of Flink in Kuaishou
First of all, let's look at the application scenario and scale of Flink in Kuaishou.
1. Kuaishou application scenario
Kuaishou's computing pipeline sends DB/Binlog and web service logs to Kafka in real time, which then feeds Flink for real-time computing, including the real-time data warehouse, real-time analysis, and real-time training; the final results are stored in Druid, Kudu, HBase, or ClickHouse. At the same time, a copy of the Kafka data is dumped in real time to the Hadoop cluster for offline computing with Hive, MapReduce, or Spark. Finally, the results of both real-time and offline computing are displayed through KwaiBI, an internally developed BI tool.
The typical application scenarios of Flink in Kuaishou are mainly divided into three categories:
- 80% statistical monitoring: real-time statistics, including metrics over all kinds of data and alerting on monitored items, used to assist the business with real-time analysis and monitoring.
- 15% data processing: data cleaning, splitting, joins, and other logical processing, such as splitting and cleaning large Topics.
- 5% business processing: real-time handling of specific business logic, such as real-time scheduling.
Typical scenarios where Flink is applied in Kuaishou include:
- Kuaishou is a platform for sharing short videos and live streams. The quality monitoring of short videos and live streams is built on real-time statistics with Flink, covering indicators such as live-stream audience size, streamers' broadcast volume, stutter rate, playback-start failure rate, and other metrics related to streaming quality.
- User growth analysis: real-time statistics of the users acquired through each acquisition channel, with the spend on each channel adjusted in real time according to its effect.
- Real-time data processing: real-time join of the ad impression stream and the click stream, splitting of client logs, and so on.
- Live CDN scheduling: real-time monitoring of the quality of each CDN vendor, with the traffic ratio among vendors adjusted through Flink's real-time training.
2. Flink cluster size
At present, Kuaishou's Flink clusters comprise roughly 1,500 machines, processing about 3 trillion records per day with a peak of around 300 million records per second. The clusters are deployed on YARN as a mix of real-time and offline clusters, physically isolated through YARN labels. The real-time cluster is a dedicated Flink cluster for businesses with high isolation and stability requirements. Note: the figures in this article only reflect what was shared by the speaker.
Kuaishou real-time multidimensional analysis platform
Here we focus on sharing the real-time multi-dimensional analysis platform of Kuaishou.
1. Kuaishou real-time multi-dimensional analysis scene
Kuaishou has an application scenario with roughly ten billion records per day. The business side needs to pick any combination of up to five dimensions from the data and model all dimension combinations to compute cumulative metrics such as PV (page views), UV (unique visitors), new users, and retention, and the computed metrics then need to be displayed graphically in real time for business analysts.
2. Scheme selection
There are several OLAP real-time analysis tools in the community, such as Druid and ClickHouse; Kuaishou, however, adopted a Flink + Kudu solution. During the early research phase, the three options were compared against the real-time multi-dimensional query scenario in terms of computing capability, group-by aggregation capability, query concurrency, and query latency:
- Computing capability: multi-dimensional queries need to support Sum, Count, and count distinct. The Druid community edition does not support count distinct, and Kuaishou's internal version supports it only for numeric types, not string types; ClickHouse supports all of these; Flink, as a real-time computing engine, supports them as well.
- Group-by aggregation: Druid's group-by aggregation is mediocre, while both ClickHouse and Flink have strong group-by aggregation capabilities.
- Query concurrency: ClickHouse's indexes are weak and cannot support high query concurrency; Druid and Flink both support high concurrency, and the storage system Kudu also supports strong indexes and high concurrency.
- Query latency: Druid and ClickHouse both compute at query time, whereas in the Flink + Kudu solution the metric results are computed by Flink in real time and written directly into Kudu, so queries read the results from Kudu without further computation, and query latency is low.
The Flink + Kudu approach borrows from Kylin's idea: Kylin lets you specify many dimensions and metrics for offline pre-computation and stores the pre-computed results in HBase; Kuaishou's scheme instead computes the metrics in real time with Flink and writes them to Kudu in real time.
3. Scheme design
The overall flow of real-time multidimensional analysis is as follows:
Users configure the Cube data model on KwaiBI, Kuaishou's self-developed BI analysis tool, specifying the dimension columns, the metric columns, and how each metric is to be calculated.
The data tables available during configuration are the processed tables stored in the real-time data warehouse platform.
Then, according to the configured calculation rules, a Flink job pre-computes the modeling metrics and stores the results in Kudu.
Finally, KwaiBI queries the data from Kudu for real-time dashboard display.
Next, the main modules of real-time multidimensional analysis are introduced in detail.
■ Data preprocessing
The data table selected by KwaiBI when configuring dimension modeling is preprocessed in advance:
First, there is a metadata system that provides a unified schema service; all information is abstracted into logical tables. For example, Kafka topics and Redis and HBase tables are registered in the schema service. Most of Kuaishou's Kafka data is physically stored in Protobuf or JSON format, and the schema service can map both to logical tables. Users only need to work against the logical tables to clean and filter data in the real-time data warehouse.
■ Modeling metric computation
After data preprocessing, the most important step is computing the modeling metrics. Cube and GroupingSet dimension combinations are supported for computing hourly or daily cumulative UV (unique visitors), new-user, and retention metrics, and results can be output periodically according to the user's configuration. In the dimension aggregation logic, reducing dimensionality layer by layer would make the DAG of the job very complex (as shown in the DAG diagram of the original presentation). Kuaishou therefore designed a two-layer dimensionality-reduction model, split into a full-dimension layer and a remaining-dimension layer, which reuses the aggregation results of the full-dimension layer while keeping the DAG simple.
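As a rough illustration of what the Cube/GroupingSet dimension combinations look like, the sketch below enumerates every non-empty subset of a configured dimension list (at most five dimensions in this scenario). It is a generic Java utility, not Kuaishou's actual implementation, and the dimension names are made up.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of enumerating CUBE-style dimension combinations: given the configured dimension
 * columns, list every non-empty subset. In the two-layer model described above, the full
 * dimension set is computed first and the remaining subsets are derived from its results.
 */
public class DimensionCombos {

    public static List<List<String>> combinations(List<String> dims) {
        List<List<String>> result = new ArrayList<>();
        int n = dims.size();
        for (int mask = 1; mask < (1 << n); mask++) {      // every non-empty subset
            List<String> combo = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                if ((mask & (1 << i)) != 0) {
                    combo.add(dims.get(i));
                }
            }
            result.add(combo);
        }
        return result;
    }

    public static void main(String[] args) {
        // e.g. country, os, channel -> 7 combinations; the 3-dimension one is the "full-dimension" layer
        combinations(List.of("country", "os", "channel")).forEach(System.out::println);
    }
}
```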
Taking the UV metric as an example, the computation consists of two modules (shown as two dashed boxes in the original figure): full-dimension computation and dimension-reduction computation.
Full-dimension computation has two steps. To avoid data skew, the first step scatters the dimensions and pre-aggregates: records with the same dimension value are first spread by hash. Because the UV metric requires exact deduplication, bitmaps are used; every minute the bitmap of the data in the incremental window is computed and sent to the second step, full aggregation by dimension, where the incremental bitmap is merged into the full bitmap to obtain the exact UV. One question may arise: this works for numeric IDs such as user id, but what about string-typed data such as device id? In fact, at the source, before the data is aggregated, string-typed values are converted into unique Long values through a dictionary service, and UV is then computed with bitmaps as before. In the dimension-reduction computation, the results of the full-dimension computation are pre-aggregated and then fully aggregated, and the final results are output.
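The following is a minimal, non-Flink Java sketch of the two-phase exact-UV idea: phase one hash-shards users per dimension value and builds a partial bitmap per shard (pre-aggregation), and phase two ORs the shards together (full aggregation). It assumes user IDs have already been mapped to Long values by the dictionary service, uses the open-source RoaringBitmap library, and all names and the shard count are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

import org.roaringbitmap.longlong.Roaring64NavigableMap;

// Phase 1: per-shard partial bitmaps (pre-aggregation); Phase 2: OR shards per dimension
// value (full aggregation). Exact UV is the cardinality of the merged bitmap.
public class TwoPhaseUvSketch {

    static final int SHARDS = 8; // hypothetical shard count used to spread hot dimension values

    public static void main(String[] args) {
        // toy input: (dimension value, user id) pairs; user ids are already Longs
        String[] countries = {"CN", "CN", "US", "CN", "US"};
        long[] userIds     = {1L,   2L,   3L,   1L,   3L};

        // Phase 1: pre-aggregate into per-shard bitmaps, keyed by dimensionValue + shard
        Map<String, Roaring64NavigableMap> partial = new HashMap<>();
        for (int i = 0; i < userIds.length; i++) {
            int shard = (int) ((userIds[i] & Long.MAX_VALUE) % SHARDS);
            String key = countries[i] + "#" + shard;
            partial.computeIfAbsent(key, k -> new Roaring64NavigableMap()).addLong(userIds[i]);
        }

        // Phase 2: full aggregation, OR all shards of the same dimension value together
        Map<String, Roaring64NavigableMap> full = new HashMap<>();
        partial.forEach((key, bitmap) -> {
            String dimValue = key.substring(0, key.indexOf('#'));
            full.computeIfAbsent(dimValue, k -> new Roaring64NavigableMap()).or(bitmap);
        });

        // Exact UV per dimension value is simply the bitmap cardinality
        full.forEach((dim, bitmap) ->
            System.out.println(dim + " UV = " + bitmap.getLongCardinality()));
    }
}
```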
A few key points in the modeling metric computation deserve attention. To avoid skew on hot dimension values, the computation is split into pre-aggregation (hash-sharding records with the same dimension value) and full aggregation (merging the shards of the same dimension value).
For exact UV deduplication, as mentioned earlier, bitmaps are used, and string-typed data is converted into Long values through the dictionary service so it can be stored in a bitmap. Because UV statistics cover historical data, for example accumulating by day, the bitmap keeps growing over time, and reading and writing oversized key-value pairs in the RocksDB state backend hurts performance. Kuaishou therefore built a custom BitmapState that stores the bitmap in blocks: each block id corresponds to a local bitmap, so each KV stored in RocksDB stays relatively small, and an update only needs to rewrite the local bitmap of the affected block id rather than the whole bitmap.
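Below is a rough sketch of the blocked-bitmap idea. A plain HashMap stands in for RocksDB; the point is only that each block of the ID space becomes its own small key-value entry, so an update rewrites one small bitmap rather than the whole thing. The block size and class names are assumptions, not Kuaishou's actual BitmapState code.

```java
import java.util.HashMap;
import java.util.Map;

import org.roaringbitmap.RoaringBitmap;

/**
 * Sketch of the blocked-bitmap idea: instead of one huge bitmap per state key,
 * split the ID space into fixed-size blocks and keep one small bitmap per block.
 * A HashMap stands in for the RocksDB column family here; in a real state backend
 * each (stateKey, blockId) pair would be a separate, small KV entry.
 */
public class BlockedBitmapSketch {

    static final long BLOCK_SIZE = 1L << 20; // hypothetical block width: ~1M IDs per block

    private final Map<Long, RoaringBitmap> blocks = new HashMap<>();

    /** Adds an ID; only the bitmap of the affected block is touched and rewritten. */
    public void add(long id) {
        long blockId = id / BLOCK_SIZE;
        int offset = (int) (id % BLOCK_SIZE);              // position inside the block
        blocks.computeIfAbsent(blockId, b -> new RoaringBitmap()).add(offset);
    }

    /** Exact cardinality is the sum over all block bitmaps. */
    public long cardinality() {
        return blocks.values().stream().mapToLong(RoaringBitmap::getLongCardinality).sum();
    }

    public static void main(String[] args) {
        BlockedBitmapSketch state = new BlockedBitmapSketch();
        state.add(42L);
        state.add(42L);               // duplicates are absorbed by the bitmap
        state.add(5_000_000L);        // lands in a different block, i.e. a different KV entry
        System.out.println("UV = " + state.cardinality()); // prints 2
    }
}
```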
Next, consider the new-user metric. The difference from plain UV is that each user must be checked for being new, which is determined by asynchronously querying an external historical-user service; the new-user UV is then computed over the stream of new users with the same logic as the UV calculation.
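A possible shape of that asynchronous lookup, using Flink's async I/O operator, is sketched below. The HistoryUserClient and its existsAsync method are hypothetical stand-ins for the external historical-user service; timeouts and error handling are mostly omitted.

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

/**
 * Sketch of the "is this a new user?" check using Flink's async I/O operator.
 * HistoryUserClient is a hypothetical client for the external historical-user service.
 */
public class NewUserEnricher extends RichAsyncFunction<Long, Long> {

    private transient HistoryUserClient client; // hypothetical async client

    @Override
    public void open(Configuration parameters) {
        client = new HistoryUserClient();
    }

    @Override
    public void asyncInvoke(Long userId, ResultFuture<Long> resultFuture) {
        // Ask the external service whether the user has been seen before.
        CompletableFuture<Boolean> seenBefore = client.existsAsync(userId);
        seenBefore.thenAccept(seen -> {
            if (Boolean.FALSE.equals(seen)) {
                resultFuture.complete(Collections.singleton(userId)); // keep only new users
            } else {
                resultFuture.complete(Collections.emptyList());       // drop known users
            }
        });
    }

    /** Wiring: the resulting new-user stream then goes through the same bitmap UV pipeline. */
    public static DataStream<Long> newUsers(DataStream<Long> userIds) {
        return AsyncDataStream.unorderedWait(userIds, new NewUserEnricher(), 500, TimeUnit.MILLISECONDS);
    }

    /** Hypothetical stand-in for the external historical-user service client. */
    static class HistoryUserClient {
        CompletableFuture<Boolean> existsAsync(long userId) {
            return CompletableFuture.completedFuture(Boolean.FALSE); // stub: treat everyone as new
        }
    }
}
```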
The retention metric differs from UV in that it needs not only the current day's data but also the previous day's historical data in order to derive the retention rate. Internally this is implemented with double-buffered state: the retention rate is obtained by dividing the counts held in the two buffers.
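A simple sketch of the double-buffer idea follows: one bitmap for the previous day's active users and one for the current day, with retention derived from their intersection. In the real job the two buffers would live in Flink state; here they are plain fields, and the day-rollover trigger is left out.

```java
import org.roaringbitmap.longlong.Roaring64NavigableMap;

/**
 * Sketch of double-buffered retention: retained users are those active both today and
 * yesterday; the retention rate divides that count by yesterday's user count.
 */
public class RetentionSketch {

    private Roaring64NavigableMap yesterday = new Roaring64NavigableMap();
    private Roaring64NavigableMap today = new Roaring64NavigableMap();

    public void onEvent(long userId) {
        today.addLong(userId);
    }

    /** Retained users = active today AND active yesterday. */
    public double retentionRate() {
        Roaring64NavigableMap retained = new Roaring64NavigableMap();
        retained.or(today);
        retained.and(yesterday);
        long base = yesterday.getLongCardinality();
        return base == 0 ? 0.0 : (double) retained.getLongCardinality() / base;
    }

    /** At the day boundary the current buffer becomes the "yesterday" buffer. */
    public void rollOver() {
        yesterday = today;
        today = new Roaring64NavigableMap();
    }
}
```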
■ Kudu storage
Finally, after the computation described above, the results are stored in Kudu, which offers low-latency random reads and writes and fast columnar scans, making it well suited to real-time interactive analysis. For the storage layout, the dimensions are first encoded; the primary key is then composed of time plus the dimension combination plus the dimension values, and the table is partitioned by dimension combination, dimension values, and time, which helps queries locate the data quickly.
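To make the layout concrete, here is a sketch of creating such a result table with the Kudu Java client: the primary key combines a time bucket, the encoded dimension combination, and the encoded dimension values, with hash partitioning on the dimension columns and range partitioning on time. Column names, types, bucket counts, and the master address are assumptions, not Kuaishou's actual schema.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

/**
 * Sketch of a Kudu result table in the spirit of the storage layout described above:
 * primary key = (time bucket, dimension combination, dimension values), hash-partitioned
 * on the dimension columns and range-partitioned on time.
 */
public class CubeResultTable {

    public static void main(String[] args) throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();
        try {
            List<ColumnSchema> columns = Arrays.asList(
                new ColumnSchema.ColumnSchemaBuilder("time_bucket", Type.INT64).key(true).build(),
                new ColumnSchema.ColumnSchemaBuilder("dim_combo", Type.STRING).key(true).build(),   // encoded dimension combination
                new ColumnSchema.ColumnSchemaBuilder("dim_values", Type.STRING).key(true).build(),  // encoded dimension values
                new ColumnSchema.ColumnSchemaBuilder("pv", Type.INT64).build(),
                new ColumnSchema.ColumnSchemaBuilder("uv", Type.INT64).build());

            CreateTableOptions options = new CreateTableOptions()
                .addHashPartitions(Arrays.asList("dim_combo", "dim_values"), 16) // spread dimension combinations
                .setRangePartitionColumns(Arrays.asList("time_bucket"));         // a real table would add per-day ranges

            client.createTable("cube_metrics", new Schema(columns), options);
        } finally {
            client.close();
        }
    }
}
```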
4. KwaiBI display
The interface shown is a screenshot of the Cube model configuration: columns are configured with their types, and a SQL statement describes the metric calculation logic; the final results are also displayed through KwaiBI.
SlimBase: a more IO-efficient, embedded, shared state storage
Next, we introduce SlimBase, an embedded, shared state storage engine that is more IO-efficient than RocksDB.
1. Challenges faced
First, let's look at the problems Flink ran into with RocksDB, starting with the application scenario: the real-time join of the ad impression stream and the click stream. When a user opens the Kuaishou app they may be shown advertising videos recommended by the ad service, and they may click on those videos.
This behavior produces two data streams on the backend: the ad impression log and the client click log. The two are joined in real time, the join results are used as samples for model training, and the trained model is pushed to the online ad service.
In this scenario, a click within the 20 minutes following an impression is considered valid, so the real-time join logic joins each click with the impressions of the past 20 minutes. The impression stream is fairly large: 20 minutes of data exceeds 1 TB. The checkpoint interval is set to 5 minutes, and RocksDB is chosen as the state backend.
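The article does not spell out which operator is used, but the scenario maps naturally onto Flink's event-time interval join; the sketch below joins each impression with clicks on the same (hypothetical) impression id arriving within the following 20 minutes. The event classes, key field, and watermarking setup are assumptions.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Sketch of the impression/click join with Flink's event-time interval join: for every ad
 * impression, clicks on the same impression id within the next 20 minutes become samples.
 * Timestamps and watermarks are assumed to be assigned upstream.
 */
public class ShowClickJoin {

    public static class Show  { public String impressionId; public long ts; }
    public static class Click { public String impressionId; public long ts; }
    public static class Sample { public Show show; public Click click; }

    public static DataStream<Sample> join(DataStream<Show> shows, DataStream<Click> clicks) {
        return shows
            .keyBy(s -> s.impressionId)
            .intervalJoin(clicks.keyBy(c -> c.impressionId))
            // a click is valid only if it happens within 20 minutes after the impression
            .between(Time.minutes(0), Time.minutes(20))
            .process(new ProcessJoinFunction<Show, Click, Sample>() {
                @Override
                public void processElement(Show show, Click click, Context ctx, Collector<Sample> out) {
                    Sample sample = new Sample();
                    sample.show = show;
                    sample.click = click;
                    out.collect(sample);
                }
            });
    }
}
```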
Under this workload, disk IO utilization reached 70% in steady state, 50% of which came from compaction; during checkpoints, disk IO hit 100% and checkpoints took 1 to 5 minutes, sometimes even longer than the checkpoint interval, and the business could clearly feel the backpressure. Analysis identified the problems:
First, checkpoints cause large-scale data copying with roughly 4x amplification: all state is read out of RocksDB and then written to HDFS as three replicas. Second, for large-scale writes, RocksDB's default level compaction has significant IO amplification overhead.
2. Solution
Given these problems, Kuaishou started looking for solutions. The overall idea is to have data land directly on shared storage as it is written, avoiding the data copying caused by checkpoints, and to use a more IO-efficient compaction strategy, such as SizeTieredCompaction, or to adopt and adapt FIFOCompaction by exploiting the time-series nature of the data. After comparing shared storage support, SizeTieredCompaction, event-time-based FIFOCompaction, and the technology stack, a first consensus was reached: replace RocksDB with HBase.
In terms of shared storage, HBase supports it while RocksDB does not. For SizeTieredCompaction, HBase supports it by default while RocksDB does not. For FIFOCompaction with event-time pushdown, RocksDB does not support it, and it is easier to build on HBase. In terms of technology stack, HBase is written in Java and is therefore more convenient to modify.
But HBase is worse than RocksDB in some aspects:
- HBase is a heavyweight distributed system that depends on ZooKeeper and includes a Master and RegionServers, whereas RocksDB is just an embedded library and is very lightweight.
- In terms of resource isolation, HBase is harder to isolate since its memory and CPU are shared across multiple containers, while RocksDB's memory and CPU are naturally isolated within its own container.
- In terms of network overhead, HBase, being distributed, costs far more than embedded RocksDB.
Combining the above reasons, Kuaishou reached a second consensus to slim down HBase and transform it into an embedded shared storage system.
3. Implementation scheme
Next, let's introduce the implementation scheme of transforming HBase into SlimBase, which is mainly divided into two layers:
One layer is SlimBase itself, consisting of three sub-layers: the slimmed-down HBase, an adapter, and an interface layer; the other layer is SlimBaseStateBackend, which mainly implements ListState, MapState, ValueState, and ReducingState.
The following sections describe three steps in detail: slimming down HBase, adapting and implementing the operation interfaces, and implementing SlimBaseStateBackend.
■ HBase slimming down
First, the slimming of HBase, which involves both cutting things out and adding things in. On the cutting side:
HBase is first trimmed by removing the client, ZooKeeper, and Master, keeping only the RegionServer; the RegionServer is then trimmed further by removing the ZK listener, Master tracker, RPC, WAL, and meta table, keeping only the Cache, MemStore, Compaction, Flusher, and file-system layer.
On the adding side:
The HFileCleaner that cleans up HFiles, which normally runs on the Master, is moved onto the RegionServer; and because RocksDB supports a merge interface (read-modify-write) that HBase lacks, a merge interface is implemented for SlimBase.
The interface layer mainly has the following three implementations:
- Following RocksDB's example, the logical view is divided into two levels: DB and ColumnFamily.
- Basic interfaces are supported: put, get, delete, merge, and snapshot.
- In addition, a restore interface is supported for restoring from a snapshot.
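To make the description concrete, here is a hypothetical Java interface with the shape described above: a DB/ColumnFamily two-level view, put/get/delete/merge, plus snapshot and restore hooks. It is an illustration only, not Kuaishou's actual SlimBase API.

```java
import java.io.IOException;

/**
 * Hypothetical shape of the SlimBase interface layer: a two-level logical view
 * (DB -> ColumnFamily) modeled after RocksDB, basic KV operations plus merge,
 * and snapshot/restore hooks used by checkpointing.
 */
public interface SlimDb extends AutoCloseable {

    /** Column families mirror RocksDB's logical partitioning of the key space. */
    interface ColumnFamily {
        void put(byte[] key, byte[] value) throws IOException;
        byte[] get(byte[] key) throws IOException;
        void delete(byte[] key) throws IOException;
        /** Read-modify-write, added because HBase has no native merge operator. */
        void merge(byte[] key, byte[] delta) throws IOException;
    }

    ColumnFamily columnFamily(String name) throws IOException;

    /** Persist current state to the shared file system (used by checkpoints). */
    void snapshot(String snapshotPath) throws IOException;

    /** Rebuild state from a previously taken snapshot (used on restore). */
    void restore(String snapshotPath) throws IOException;
}
```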
The adaptation layer mainly has the following two concepts:
A SlimBase instance maps to an HBase namespace, and a SlimBase ColumnFamily maps to an HBase table.
The implementation of SlimBaseStateBackend is mainly reflected in two aspects:
One aspect is the implementation of the various state types, supporting multiple data structures: ListState, MapState, ValueState, and ReducingState; the other is reworking the Snapshot and Restore processes. As the comparison diagrams in the original presentation show, SlimBase saves a great deal of disk IO and avoids the repeated rounds of copying.
4. Test conclusion
After online comparison tests, the conclusions were:
- Checkpoint and restore latency dropped from minutes to seconds.
- Disk IO decreased by 66%.
- Disk write throughput decreased by 50%.
- CPU overhead decreased by 33%.
5. Later stage optimization
The current compaction strategy is SizeTieredCompaction. The plan is to later implement a FIFOCompaction strategy based on OldestUnexpiredTime, with the goal of zero disk IO overhead. FIFOCompaction is an IO-free compaction strategy based on TTL; OldestUnexpiredTime means, for example, that if OldestUnexpiredTime = t2 is set, all data older than t2 has expired and can be cleaned up by compaction, so a time-based FIFOCompaction can in theory reach zero disk IO overhead.
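The rule can be illustrated with a small sketch: any data file whose newest timestamp is older than OldestUnexpiredTime = now - TTL contains only expired data and can be dropped whole, with no read or rewrite IO. File and field names below are made up.

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Sketch of the OldestUnexpiredTime rule behind TTL-based FIFO compaction: a file whose
 * newest cell is older than (now - TTL) holds only expired data and can be deleted as a
 * whole, without reading or rewriting anything.
 */
public class FifoExpireSketch {

    /** Stand-in for an on-disk data file and the max timestamp recorded in its metadata. */
    public static class DataFile {
        final String name;
        final long maxTimestampMs;
        DataFile(String name, long maxTimestampMs) { this.name = name; this.maxTimestampMs = maxTimestampMs; }
    }

    /** Files eligible for IO-free deletion under the given TTL. */
    public static List<DataFile> expiredFiles(List<DataFile> files, long ttlMs, long nowMs) {
        long oldestUnexpiredTime = nowMs - ttlMs;
        return files.stream()
                .filter(f -> f.maxTimestampMs < oldestUnexpiredTime)
                .collect(Collectors.toList());
    }
}
```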
There are four optimizations to follow. The first three are based on HBase, and the last is for HDFS:
- SlimBase uses in-memory compaction to reduce memory flush and compaction overhead.
- SlimBase supports prefix Bloom filters to improve scan performance.
- SlimBase supports HDFS short-circuit reads.
- HDFS replica handling is modified: non-local replicas use DirectIO directly, improving the pagecache hit rate of local reads; this addresses the finding during testing that a single replica performed better than multiple replicas.
6. Future planning
In terms of language, storage, compaction strategy, event-time pushdown, garbage collection, checkpoint time, and restore time, SlimBase is a better fit than RocksDB for Kuaishou's real-time computing workloads. The future plan is to further optimize SlimBase's performance, with the vision of replacing RocksDB with SlimBase across all Flink business scenarios at Kuaishou.