2025-04-11 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how to build a real-time data analysis platform based on Flink + ClickHouse.
It mainly covers two scenarios: the hour-level Flink-to-Hive pipeline and the second-level Flink-to-ClickHouse pipeline.
I. Analysis of business scenario and current situation
The query pages are divided into offline query pages and real-time query pages. This year's rework integrates the ClickHouse computing engine into real-time queries. Depending on the business scenario, the real-time reports display metric charts and detailed metric tables. At present, metrics are collected and computed in five-minute time windows, with special cases of three minutes or one minute. All metric data is exported from Kafka in real time and imported into ClickHouse for computation.
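The windowing described above can be illustrated with a minimal sketch. It assumes events arrive as (epoch seconds, user ID) pairs and that the metric is a distinct-user count; both are illustrative assumptions, not the platform's actual schema or queries.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60  # five-minute window; use 180 or 60 for the special cases


def window_start(epoch_seconds, size=WINDOW_SECONDS):
    """Return the start of the tumbling window that contains the timestamp."""
    return epoch_seconds - (epoch_seconds % size)


def active_users_per_window(events, size=WINDOW_SECONDS):
    """Count distinct users per window from (epoch_seconds, user_id) pairs."""
    users = defaultdict(set)
    for ts, user_id in events:
        users[window_start(ts, size)].add(user_id)
    return {start: len(ids) for start, ids in sorted(users.items())}
```

In the platform itself this kind of aggregation runs inside ClickHouse as SQL; the sketch only shows what a five-minute window groups together.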
II. Flink-to-Hive hourly scenario
1. Hour-level implementation architecture diagram
As shown in the following figure, the Binlog of the Database is exported to Kafka, and Log Server data is also reported to Kafka. Once all the data has landed in Kafka in real time, Flink extracts it to HDFS. In the figure, the line between HDFS and Hive is dotted because Flink does not write to Hive directly: Flink writes to HDFS, and the data then lands in Hive at an hourly, half-hourly, or even minute-level granularity. You need to know how far the event time of the data has advanced, and then trigger alter table, add partition, add location, and so on to write the corresponding partition.
This requires a program that monitors how far the Flink task has consumed in data time. For example, for the 9:00 data, before landing you need to check whether the data consumed from Kafka has reached 9:00, and only then trigger the partition write in Hive.
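A minimal sketch of such a monitor follows. It assumes the monitor already knows a watermark (the event time that all consumed Kafka data has reached) and that the Hive table is partitioned by dt and hour; the table name, path layout, and partition columns are hypothetical.

```python
from datetime import datetime, timedelta


def completed_hours(last_committed, watermark):
    """Return the hourly partitions that became complete.

    last_committed: start of the last hour already added to Hive.
    watermark: event time that all consumed Kafka data has reached.
    """
    hours = []
    candidate = last_committed + timedelta(hours=1)
    # An hour is complete only once the watermark has passed its end.
    while candidate + timedelta(hours=1) <= watermark:
        hours.append(candidate)
        candidate += timedelta(hours=1)
    return hours


def add_partition_sql(table, base_path, hour):
    """Build the Hive statement that registers one hourly partition."""
    dt, hh = hour.strftime("%Y-%m-%d"), hour.strftime("%H")
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS PARTITION (dt='{dt}', hour='{hh}') "
        f"LOCATION '{base_path}/dt={dt}/hour={hh}'"
    )
```

With this shape, the 08:00 partition is added only once the watermark reaches 09:00, which matches the check described above.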
2. Implementation principle
The implementation mainly relies on StreamingFileSink, a feature of newer Flink versions. StreamingFileSink has several main functions.
First, forBulkFormat supports bulk-encoded formats such as Avro and Parquet, which makes columnar storage (Parquet) possible.
Second, withBucketAssigner customizes bucketing by data time. Here an EventtimeBucket is defined, so that data lands offline according to its event time rather than its arrival time.
Third, OnCheckpointRollingPolicy rolls files on checkpoints, so data lands and becomes stable within a given checkpoint interval. Other rolling policies exist as well, for example rolling by data size.
Fourth, StreamingFileSink implements Exactly-Once semantics.
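To make the event-time bucketing concrete, here is a small sketch in plain Python rather than the Flink API. It assumes event timestamps are epoch milliseconds in UTC and that buckets follow a dt=.../hour=... layout; both are illustrative assumptions.

```python
from datetime import datetime, timezone


def event_time_bucket(event_millis):
    """Map an event timestamp (epoch milliseconds, UTC) to an hourly bucket path."""
    t = datetime.fromtimestamp(event_millis / 1000, tz=timezone.utc)
    return t.strftime("dt=%Y-%m-%d/hour=%H")
```

Because the bucket depends only on the timestamp carried by the record, a record with a 09:59 event time still lands in the hour=09 bucket even if it is processed after 10:00.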
Flink has two sinks that implement Exactly-Once semantics: the first is Kafka and the second is StreamingFileSink. The following figure shows a demo in which OnCheckpointRollingPolicy lands files in HDFS every 10 minutes.
■ How Exactly-Once is implemented
The left side of the figure below shows a simple two-phase commit (2PC) model. The Coordinator sends a prepare request, and each executor performs the action and replies with an ack. After the Coordinator has received all the acks, it triggers commit, and all executors persist their data. Mapped onto Flink's model: when a Source receives the checkpoint barrier, it starts to trigger a snapshot.
After the checkpoint snapshot of every operator has completed, the operators acknowledge it to the JobManager, which then sends notifyCheckpointComplete. In the figure, the three lines on the left of the two-phase model correspond to the Flink model, so the two-phase commit protocol can be implemented with Flink.
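The generic two-phase commit flow can be sketched in a few lines. This is a toy model, not Flink code: the Participant class and run_checkpoint function are hypothetical names, with prepare standing in for the snapshot acknowledgement and commit for the notifyCheckpointComplete callback.

```python
class Participant:
    """A sink-like participant that stages records, then commits them."""

    def __init__(self, name, fail_prepare=False):
        self.name = name
        self.fail_prepare = fail_prepare
        self.staged = []
        self.committed = []

    def write(self, record):
        self.staged.append(record)

    def prepare(self):
        # Phase 1: acknowledge that staged data is safely persisted.
        return not self.fail_prepare

    def commit(self):
        # Phase 2: make the staged data visible.
        self.committed.extend(self.staged)
        self.staged.clear()

    def abort(self):
        self.staged.clear()


def run_checkpoint(participants):
    """Commit only if every participant acknowledged the prepare phase."""
    acks = [p.prepare() for p in participants]
    if all(acks):
        for p in participants:
            p.commit()
        return True
    for p in participants:
        p.abort()
    return False
```

The key property is that nothing becomes visible unless every participant acknowledged phase one.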
■ How Flink implements the two-phase commit protocol
First, StreamingFileSink implements two interfaces, CheckpointedFunction and CheckpointListener. CheckpointedFunction provides the initializeState and snapshotState methods, and CheckpointListener provides notifyCheckpointComplete. Together, these two interfaces are enough to implement two-phase commit semantics.
initializeState
initializeState triggers three actions when the task starts. The first is commitPendingFile. A file landing on HDFS in real time passes through three states: in-progress (being written), pending, and finished.
initializeState also triggers restoreInProgressFile at startup, which restores the file the operator was writing in real time. If the program fails before a checkpoint succeeds, then on restart initializeState commits the pending files and uses the truncate operation available in Hadoop 2.7+ to reset or truncate the in-progress file.
invoke
Writes data in real time.
snapshotState
When a checkpoint is triggered, the in-progress file is moved to the pending state and the data length is recorded (truncate mode needs the length to truncate to). snapshotState does not actually write data to HDFS; it writes to ListState. Flink implements Exactly-Once semantics internally through barrier alignment, but end-to-end Exactly-Once with external systems is much harder. Flink's internal Exactly-Once is achieved through ListState: all the data is stored in ListState, and once the checkpoints of all operators have completed, the data in ListState is flushed to HDFS.
notifyCheckpointComplete
notifyCheckpointComplete triggers the pending-to-finished transition. This is implemented with rename: the streaming job keeps writing temporary files to HDFS, and after all the actions have finished, a rename turns them into the final files.
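The file lifecycle above can be mimicked on a local file system. The sketch below is an analogy, not StreamingFileSink itself: BucketFile and restore_in_progress are hypothetical names, the ".inprogress" suffix is an assumption, and local rename and truncate stand in for the HDFS operations.

```python
import os


class BucketFile:
    """One output file moving through in-progress -> pending -> finished."""

    def __init__(self, directory, name):
        self.final_path = os.path.join(directory, name)
        self.tmp_path = self.final_path + ".inprogress"
        self.state = "in-progress"
        self.valid_length = 0
        self._fh = open(self.tmp_path, "wb")

    def invoke(self, record):
        # Append one record (bytes) to the temporary file.
        self._fh.write(record)

    def snapshot_state(self):
        # Close the file and record how many bytes the checkpoint covers.
        self._fh.close()
        self.valid_length = os.path.getsize(self.tmp_path)
        self.state = "pending"
        return {"tmp_path": self.tmp_path, "valid_length": self.valid_length}

    def notify_checkpoint_complete(self):
        # Publish the file by renaming it to its final name.
        os.rename(self.tmp_path, self.final_path)
        self.state = "finished"


def restore_in_progress(tmp_path, valid_length):
    """After a failure, drop bytes written after the last successful checkpoint."""
    with open(tmp_path, "r+b") as fh:
        fh.truncate(valid_length)
```

Readers only ever see files under their final names, so partially written data is never exposed.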
3. Cross-cluster multi-nameservices
Qutoutiao's real-time cluster and offline clusters are independent of each other. There are several offline clusters and currently one real-time cluster. Writing to offline clusters from the real-time cluster raises an HDFS nameservices problem: it is not appropriate to merge the nameservices of every offline cluster into the real-time cluster's namenode HA configuration. So how can a single task on the real-time cluster write to each offline cluster?
As shown in the following figure, under the resources of the Flink task, add entries to the HDFS XML file. In its property entries, add the nameservices: for example, stream is the namenode HA configuration of the real-time cluster, and data is the namenode HA configuration of the offline cluster to be written. This way the HDFS settings of the two clusters do not need to be modified on either side; everything is done on the client side.
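As a rough illustration of what such a client-side configuration contains, the sketch below renders the standard Hadoop HA client properties for several nameservices. The property keys are the usual HDFS HA ones; the namenode IDs and host names in the usage are made up, and the original figure may configure more than this.

```python
import xml.etree.ElementTree as ET

FAILOVER_PROVIDER = (
    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)


def hdfs_site(nameservices):
    """Render client-side hdfs-site.xml properties for several HA nameservices.

    nameservices maps a nameservice ID to {namenode ID: "host:port"}.
    """
    props = {"dfs.nameservices": ",".join(nameservices)}
    for ns, namenodes in nameservices.items():
        props[f"dfs.ha.namenodes.{ns}"] = ",".join(namenodes)
        for nn, address in namenodes.items():
            props[f"dfs.namenode.rpc-address.{ns}.{nn}"] = address
        props[f"dfs.client.failover.proxy.provider.{ns}"] = FAILOVER_PROVIDER
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return props, ET.tostring(root, encoding="unicode")
```

For example, calling hdfs_site with a stream entry and a data entry yields one file in which both nameservices resolve, so paths such as hdfs://data/... can be used from a job running on the real-time cluster.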
4. Multi-user write permission
Writing to offline HDFS from the real-time side may involve user permission issues. The user submitting real-time jobs is fixed to be the same user in all programs, whereas the offline side has multiple users, so the real-time and offline users do not match. Qutoutiao added withBucketUser to the API for writing to HDFS. Once the nameservices are configured, you only need to know which user writes to a given HDFS path, for example configuring a stream user for writing.
The advantage of doing this at the API level is that one Flink program can target several different HDFS clusters and different users. Multi-user writing is implemented by wrapping Hadoop file system calls in ugi.doAs to proxy the user. The above covers part of Qutoutiao's work on synchronizing real-time data to Hive with Flink. Small-file problems may occur; small files are merged periodically by a background daemon. If the checkpoint interval is short, such as once every 3 minutes, a large number of small files will be produced.
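To see why a short checkpoint interval matters, the sketch below gives a rough file-count estimate and a simple merge plan. The article does not describe how the merge daemon works, so both functions are hypothetical; the estimate assumes one file per writer subtask per checkpoint per active bucket.

```python
def files_per_hour(checkpoint_minutes, parallelism, active_buckets=1):
    """Rough estimate of files written per hour when files roll on every checkpoint."""
    checkpoints = 60 // checkpoint_minutes
    return checkpoints * parallelism * active_buckets


def plan_merges(file_sizes, target_bytes):
    """Group small files into merge batches of at most target_bytes each.

    file_sizes maps file name to size in bytes; files already at or above
    the target are left alone, and single-file batches are skipped.
    """
    batches, current, current_size = [], [], 0
    for name, size in sorted(file_sizes.items()):
        if size >= target_bytes:
            continue  # already large enough
        if current and current_size + size > target_bytes:
            if len(current) > 1:
                batches.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if len(current) > 1:
        batches.append(current)
    return batches
```

Under these assumptions, a 3-minute checkpoint interval with 50 writer subtasks produces about 1,000 files per hourly bucket, which is why periodic merging is needed.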
III. Flink-to-ClickHouse second-level scenario
1. Second-level implementation architecture diagram
Qutoutiao currently has many real-time metrics, computed every five or three minutes on average. If every real-time metric were written as its own Flink task or Flink SQL job, for example one that consumes a Kafka Topic, each job would have to compute daily active users, new users, flows, and so on. Whenever a user raises a new requirement, you would need to change the current Flink task or start a new Flink task to consume the Topic.
The result is that Flink tasks are constantly modified or new ones keep being created. Qutoutiao therefore connects ClickHouse behind Flink to provide OLAP as a whole. The following figure shows the second-level implementation architecture: data flows from Kafka to Flink, then to Hive and to the ClickHouse cluster, which serves the external Horizon (real-time reports), QE (real-time ad hoc queries), Thousand Fathom (data analysis), and user profiling (real-time audience selection).
2. Why Flink + ClickHouse
Metrics are described in SQL: the metrics proposed by analysts are basically described in SQL.
Bringing metrics online and offline does not affect other metrics: one Flink task consumes the Topic, and if other metrics are needed, they can be added or retired without affecting one another.
Data can be traced back to make troubleshooting easier: when daily active users drop, you need to trace which metric's logic is at fault, for example a difference in the reported data, data loss in the Kafka stream, or users failing to report a certain metric. Flink alone cannot trace back like this.
Computation is fast, and all metrics are computed within one cycle: metrics across hundreds or thousands of dimensions need to be computed within five minutes.
Real-time streaming, distributed deployment, and simple operation and maintenance: real-time streaming of Kafka data is supported.
At present, the Qutoutiao Flink cluster has 100+ machines, each with 32 cores, 128 GB of memory, and 3.5 TB of SSD. The daily data volume is 200 billion, the daily query volume is 210,000+, and 80% of queries complete within 1 s. The following figure shows single-table test results. ClickHouse is fast on single-table queries, but, limited by its architecture, its Join is weak.
The following figure shows a relatively complex SQL query (count + group by + order by); ClickHouse completes the computation over 2.6 billion rows in 3.6 seconds.
3. Why ClickHouse is so fast
First, ClickHouse uses columnar storage plus LZ4 and ZSTD data compression.
Second, compute and storage are co-located, combined with vectorized execution. With Presto, the data may live in a Hadoop cluster or HDFS and has to be pulled in at query time. In ClickHouse, each compute machine has a local SSD and only needs to process its own data before the per-node results are merged.
Third, LSM merge tree + Index. After data is written to ClickHouse, a background thread merges the data and builds indexes, such as the common DT index and hour-level data indexes, to improve query performance.
Fourth, SIMD + LLVM optimization. SIMD stands for single instruction, multiple data.
Fifth, SQL syntax and UDFs are comprehensive, and there is strong demand for them. Data analysis and dimension drill-down need more advanced features, such as some of the time-window functions.
Merge Tree: as shown in the following figure, the first layer is real-time data writing. Each level of data is merged in the background; a merge sorts the data and builds the Index.
ClickHouse Connector: ClickHouse has two concepts, the Local table and the Distributed table. Usually you write to Local tables and read from the Distributed table. Data is generally written to ClickHouse in batches of 50,000 to 100,000 records on a 5-second cycle. Qutoutiao also implemented RoundRobinClickHouseDataSource.
BalancedClickHouseDataSource: in MySQL you can write data after configuring a single IP and port number, whereas BalancedClickHouseDataSource needs to write to the Local tables, so you must know how many Local tables the cluster has and the IP and port number of each one. With a hundred machines, the IPs and port numbers of all of them have to be configured before writing. BalancedClickHouseDataSource has two schedules, scheduleActualization and scheduleConnectionsCleaning. With the IPs and ports of a hundred machines configured, some machines will fail to connect or their service will not respond. scheduleActualization periodically detects machines that cannot be connected and takes them offline or removes their IPs. scheduleConnectionsCleaning periodically cleans up stale HTTP connections to ClickHouse.
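The idea behind the periodic health check can be sketched independently of the JDBC driver. The actualize function and the ping callable below are hypothetical; this is not the BalancedClickHouseDataSource API, only the filtering logic it is described as performing.

```python
def actualize(all_urls, ping):
    """Return the subset of URLs that currently respond to a ping.

    ping is a caller-supplied callable that returns True for a healthy host
    and may raise OSError when the host cannot be reached.
    """
    enabled = []
    for url in all_urls:
        try:
            if ping(url):
                enabled.append(url)
        except OSError:
            # Treat connection errors as "host is down" and skip it for now.
            pass
    return enabled
```

Running this on a timer and writing only to the returned hosts keeps unreachable machines out of rotation until they recover.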
RoundRobinClickHouseDataSource: Qutoutiao's enhancement of BalancedClickHouseDataSource, which implements three semantics. testOnBorrow is set to true, so a ping is attempted to check that a connection can be obtained. Because each write to ClickHouse is a batch, testOnReturn is set to false. testWhileIdle is set to true, which covers the functions of the official scheduleActualization and scheduleConnectionsCleaning. ClickHouse keeps merging in the background. If inserts are too fast, the background merge cannot keep up with them and an error occurs. Writes therefore need to be kept as steady as possible: finish writing to the current machine before writing to the next one, at 5 s intervals, so that the merge speed stays as close as possible to the insert speed.
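A minimal sketch of the write pattern described above follows: rows are buffered, and each full or overdue batch goes to the next host in turn. It is not Qutoutiao's RoundRobinClickHouseDataSource; the class name, the send callable, and the injectable clock are assumptions made for illustration and testing.

```python
import itertools
import time


class RoundRobinBatchWriter:
    """Buffer rows and flush each batch to the next host in turn."""

    def __init__(self, hosts, send, batch_size=50_000, interval_seconds=5.0,
                 clock=time.monotonic):
        self._hosts = itertools.cycle(hosts)
        self._send = send  # hypothetical callable: send(host, rows)
        self._batch_size = batch_size
        self._interval = interval_seconds
        self._clock = clock
        self._buffer = []
        self._last_flush = clock()

    def write(self, row):
        self._buffer.append(row)
        self.maybe_flush()

    def maybe_flush(self):
        # Flush when the batch is full or the interval has elapsed.
        full = len(self._buffer) >= self._batch_size
        due = self._clock() - self._last_flush >= self._interval
        if self._buffer and (full or due):
            self.flush()

    def flush(self):
        host = next(self._hosts)  # one whole batch goes to one host
        self._send(host, self._buffer)
        self._buffer = []
        self._last_flush = self._clock()
```

Sending a whole batch to a single host and then moving on spreads inserts evenly and gives each node's background merge time to catch up.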
4. Backfill
When Flink imports data into ClickHouse, queries and reports can run into problems: the Flink task may fail, report errors, or suffer backpressure, or the ClickHouse cluster may become unresponsive, ZooKeeper may fail to keep up, inserts may be too fast, or the cluster load may be too high. Any of these causes problems for the whole task.
If the streaming data volume suddenly surges, a restarted Flink job may spend a long time catching up, and you need to adjust parallelism and similar settings to help it. But by then a backlog already exists, and raising Flink's concurrency to process it runs into ClickHouse's limit on insert speed, which leads to a vicious circle. Therefore, when Flink or the ClickHouse cluster fails, once the ClickHouse cluster has recovered the Flink task starts consuming from the latest data and no longer replays the past period; that period is imported into ClickHouse through Hive instead.
Because the data has already landed in Hive in real time through Kafka, it can be written to ClickHouse from Hive. ClickHouse has partitions, so you only need to delete the previous hour's data and import that hour from Hive, and queries can then continue. Backfill thus provides hour-level fault tolerance for both the Flink task and the ClickHouse cluster.
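The hourly backfill step can be sketched as a pair of statements. This assumes the ClickHouse table is partitioned by an hourly string key of the form YYYYMMDDHH and the Hive table by dt and hour; the table names and partition formats are hypothetical, and the transfer of the selected rows into ClickHouse is left out.

```python
from datetime import datetime


def backfill_statements(local_table, hive_table, hour):
    """Build the statements that replace one hour of ClickHouse data from Hive."""
    partition = hour.strftime("%Y%m%d%H")  # assumed hourly partition naming
    dt, hh = hour.strftime("%Y-%m-%d"), hour.strftime("%H")
    # Drop the possibly incomplete hour in ClickHouse.
    drop = f"ALTER TABLE {local_table} DROP PARTITION '{partition}'"
    # Read the same hour from Hive so it can be re-imported.
    export = f"SELECT * FROM {hive_table} WHERE dt = '{dt}' AND hour = '{hh}'"
    return drop, export
```

Dropping the whole partition before re-importing keeps the backfill idempotent: running it twice for the same hour does not duplicate rows.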
IV. Future development and thinking
1. Connector SQL
Currently, Flink-to-Hive and Flink-to-ClickHouse are fixed scenarios at Qutoutiao. You only need to specify the HDFS path and the user, and the rest of the process can be described in SQL.
2. Delta Lake
Flink is a unified stream and batch computing engine, but there is no unified stream and batch storage. Qutoutiao uses HBase, Kudu, Redis, and other KV stores that can interact with Flink in real time for data computation. For example, to compute new users, Qutoutiao's current solution is to load historical users from Hive into Redis or HBase, and have Flink query them in real time to determine whether a user is new. The drawback is that the data in Hive and the data in Redis are stored as two copies. In addition, Binlog extraction involves delete operations; HBase and Kudu support data modification and are synced back to Hive periodically. The resulting problem is that the data exists in HBase or Kudu while Hive holds yet another copy, so there are one or more redundant copies. If a unified stream and batch storage supported these scenarios, a Flink task could interact with offline data in real time, including querying Hive data in real time, determine in real time whether a user is new, modify, update, or delete data in real time, and still support Hive-style batch operations on the same storage.