In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly introduces "how to understand the construction of Weibo's real-time computing platform based on Flink". In daily operation, I believe many people have doubts about how to understand the construction of Weibo's real-time computing platform based on Flink. The editor consulted all kinds of materials and sorted out simple and easy-to-use methods of operation. I hope it will be helpful to answer the questions of "how to understand the construction of Weibo's real-time computing platform based on Flink". Next, please follow the editor to study!
one。 Technology selection
Compared with Spark, the overall ecology of Spark is more perfect, and it is temporarily ahead in the integration and application of machine learning. However, as a strong competitor of the next generation big data engine, Flink has obvious advantages in streaming computing. Flink belongs to the real sense of single processing in streaming computing, and each piece of data triggers the calculation, rather than Mini Batch like Spark as a compromise of streaming processing. The fault-tolerant mechanism of Flink is relatively lightweight, has little impact on throughput, and has some optimizations on graph and scheduling, so that Flink can achieve high throughput. The fault-tolerant mechanism of Strom needs to ack each piece of data, so its throughput bottleneck is also criticized.
Here is a diagram to compare the commonly used real-time computing frameworks.
Cdn.com/3da0ac542030556f0def525ccf6ec7ee9eec5b1f.jpeg ">
Characteristics of Flink
Flink is an open source distributed real-time computing framework. Flink is stateful and fault-tolerant and can seamlessly recover from failures while maintaining the state of an application; it supports large-scale computing power and can run concurrently on thousands of nodes; it has good throughput and latency characteristics. Meanwhile, Flink provides a variety of flexible window functions.
1) State management mechanism
Flink checkpoint mechanism can keep the computation of exactly-once semantics. State retention means that the application can save the results and states of the dataset that have been processed.
2) event mechanism
Flink supports flow processing and window event time semantics. The event time can be easily calculated from the order in which the events arrive and the possible arrival delay flow of the events.
3) window mechanism
Flink supports a very flexible window mechanism (window) based on time, number, and session. Window triggers can be customized to support more complex flow patterns.
4) Fault-tolerant mechanism
Flink's efficient fault-tolerant mechanism allows the system to support exactly-once semantic computing in the case of high throughput. Flink can recover from the failure accurately and quickly with zero data loss.
5) High throughput and low latency
Flink has the characteristics of high throughput and low latency (which can process large amounts of data quickly). The following figure shows the performance comparison of Apache Flink and Apache Storm in completing distributed project counting tasks.
two。 Architecture in the early stages of architecture evolution
The initial architecture consists of only two layers of computing and storage. After the new computing requirements are connected, a new real-time computing task needs to be developed to go online. The code reuse rate of repetition module is low and the repetition rate is high. The difference between computing tasks is mainly focused on the calculation index caliber of the task.
In the storage layer, the storage paths required by each demand side are different, the computing indicators may be duplicated on the impassable storage engine, and there is a waste of computing resources and storage resources. And the calculation caliber of the index is only limited to the requirements of a single task, and the calculation caliber of the same index is not uniformly limited to the guarantee. Various business parties also develop data acquisition services on different storage engines. For those teams that focus on the data application itself, there is no doubt that the current model has some drawbacks.
Late architecture
With the increase of the volume of data and the expansion of business lines, the disadvantages of the early architecture model gradually began to appear. From the original single-demand single-task model gradually changed to a general data architecture model. To this end, we have developed some general components based on the Flink framework to support rapid data access, and ensure the unity and maintenance of the code pattern. In the data layer, we use Clickhouse as the calculation and storage engine of our data warehouse, using its feature of supporting multi-dimensional OLAP computing to deal with the fast query requirements under multi-dimensional, multi-index and large amounts of data. In data layering, we refer to the experience and methods of offline data warehouse, build multi-layer real-time data warehouse service, and develop a variety of micro services to support data aggregation, index extraction, data export, data quality, alarm monitoring and so on.
The overall structure is divided into five layers:
1) access layer: access the original data for processing, such as Kafka, RabbitMQ, File, etc.
2) Computing layer: Flink is selected as the real-time computing framework to clean and correlate the real-time data.
3) Storage layer: the cleaned data is stored, and the model of real-time data warehouse is layered and built, and the data of different application scenarios are stored in storage such as Clickhouse,Hbase,Redis,Mysql. In the service, and abstract the common data layer and the dimension layer data, deal with the compressed data layer and unify the data caliber.
4) Service layer: provides unified data query service, and supports multi-dimensional computing services from underlying detail data to aggregation layer data 5min/10min/1hour. At the same time, the uppermost characteristic index data, such as computing layer input to Redis, Mysql and so on, are also obtained from this data interface.
5) Application layer: support each business line data scenario with the support of unified query service.
Monitoring alarm: monitor the survival status of Flink tasks, send email alarms to abnormal tasks, and automatically pull up and restore tasks according to the set parameters. According to the offset index such as Kafka consumption, an alarm is given to real-time tasks with delayed consumption processing.
Data quality: monitor the real-time data index, regularly compare the historical real-time data with the data calculated by offline hive, provide the data quality index of real-time data, and alarm the index data that exceed the threshold.
three。 Data processing flow 1. Overall process
After the whole data is accessed from the original data, it is processed by ETL and entered into the underlying data table of the real-time data warehouse, and the hierarchical data is aggregated upward through the configured aggregation micro-service components. According to the index requirements of different services, the feature extraction micro-service can also be directly extracted from the data warehouse, such as Redis, ES, Mysql. Most of the data requirements can be obtained through the unified data service interface.
two。 Problems and challenges
Because the original log data is different from each business log, the dimension or metric data is not complete. Therefore, it is necessary to carry out real-time log correlation in order to obtain the query results of index data under different dimensions. And the backhaul cycle of the associated logs is different. There are business logs that complete more than 95% of the backhaul within the 10min, and there are task logs that rely on third-party backhaul, such as the activation log, and the delay window may be longer than 1 day.
And the average daily data volume of the maximum log association task is above 1 billion, so how to quickly deal with the real-time association task is first in front of us. To solve this problem, we have developed configuration related components based on Flink framework. For the index extraction of different associated logs, we also develop a configuration index extraction component to quickly extract complex log formats. The above two self-developed components will be described in detail in the following content.
1) what to do with logs whose backhaul period exceeds the associated window?
For the logs that are sent back late, we did not get the association result in the association window. We use the method of real-time + offline to complete the data flashback. For logs processed in real time, we will output the unassociated original logs to another temporary storage (Kafka). At the same time, we will continue to consume and process this set of unassociated logs, setting the number of timeout re-associations and the timeout re-association time, and then re-associate when any threshold is exceeded. In the offline part, we use Hive to calculate yesterday's all-day log and associate the full amount of the associated log table within N days, and write back the final result to replace the real-time calculated yesterday's associated data.
2) how to improve the performance of Flink tasks?
① Operator Chain
In order to perform distributed execution more efficiently, Flink will try its best to chain the subtask of operator together to form a task. Each task executes in a thread. Linking operators to task is a very effective optimization: it reduces switching between threads, reduces message serialization / deserialization, reduces data exchange in buffers, reduces latency while improving overall throughput.
During the JobGraph generation phase, Flink optimizes the optimizable operators in the code into a chain of operators (Operator Chains) to be executed in a task (a thread) to reduce the overhead of switching and buffering between threads and improve overall throughput and latency. The following is illustrated by an example on the official website.
The long brown bar in the figure indicates the waiting time, which can be found that the network waiting time greatly hinders the throughput and delay. To solve the problem of synchronous access, asynchronous mode can handle multiple requests and replies concurrently. In other words, you can continuously send requests from users a, b, c, etc., to the database, and at the same time, the reply of which request is returned first will be processed, so that there is no need to block waiting between consecutive requests, as shown on the right side of the figure above. This is the principle of the implementation of Async Imax O.
③ Checkpoint optimization
Flink implements a powerful checkpoint mechanism that not only achieves high throughput performance, but also ensures fast recovery at the Exactly Once level.
The first consideration to improve the performance of each node checkpoint is the execution efficiency of the storage engine. Of the three checkpoint state storage schemes officially supported by Flink, Memory is only used at the debug level and cannot recover data after a failure. Secondly, there are Hdfs and Rocksdb, when the data size of the Checkpoint is large, we can consider using Rocksdb as the storage of checkpoint to improve efficiency.
The second idea is the resource setting, we all know that the checkpoint mechanism is carried out on every task, then when the total state data size is constant, how to allocate and reduce the checkpoint data divided by a single task has become the key to improve the efficiency of checkpoint implementation.
Finally, incremental snapshots. Under a non-incremental snapshot, each checkpoint contains all the status data of the job. In most scenarios, there are relatively few changes in the data before and after the checkpoint, so setting incremental checkpoint will only store and calculate the difference in status between the last checkpoint and this checkpoint, reducing the time-consuming of checkpoint.
3) how to ensure the stability of the task?
In the process of task execution, a variety of problems will be encountered, resulting in abnormal or even failure of the task. Therefore, how to do a good job in the recovery work under abnormal circumstances is extremely important.
① sets restart policy
Flink supports different restart strategies to control how jobs are restarted in the event of a failure. The cluster is started with a default restart policy, which is used when no specific restart policy is defined. If a restart policy is specified when the work is submitted, the policy overrides the default policy of the cluster.
The default restart policy can be specified through Flink's configuration file flink-conf.yaml. The configuration parameter restart-strategy defines which policy is used.
Common restart strategies:
Fixed interval (Fixed delay)
Failure rate (Failure rate)
No restart (No restart).
② sets HA
Flink assigns HA configuration at task startup mainly to utilize Zookeeper for distributed coordination among all running JobManager instances. Zookeeper provides highly available distributed coordination services through leader selection and lightweight consistent state storage.
③ task monitoring and alarm platform
In the real world, we have encountered task failures caused by unstable state of the cluster. In the Flink 1.6 release, we have even encountered the fake death of a task, that is, the job resource on Yarn still exists, while the Flink task is actually dead. In order to monitor and recover these abnormal tasks, and to manage real-time task submission, alarm monitoring and task recovery, we have developed a task submission and management platform. Pull the list of Running status and Flink Job status on Yarn by Shell to compare them, monitor all tasks on the heartbeat platform, and perform operations such as alarm and automatic recovery.
④ job index monitoring
When the Flink task is running, each Operator will generate its own metrics data. For example, Source will generate metrics information such as numRecordIn, numRecordsOut and other metrics. We will collect these metrics information and display them on our visualization platform. The metric platform is shown below:
⑤ task running node monitoring
Our Flink tasks are run on Yarn, and we need to monitor the environment in which each job runs. The indicators of JobManager and TaskManager are collected. The indicators collected are jobmanager-fullgc-count, jobmanager-younggc-count, jobmanager-fullgc-time, jobmanager-younggc-time, taskmanager-fullgc-count, taskmanager-younggc-count, taskmanager-fullgc-time, taskmanager-younggc-time and so on, which are used to judge the health of the running environment of the task and to troubleshoot possible problems. The monitoring interface is as follows:
four。 Data association component 1. How to choose the association method? 1) Flink Table
From the official documentation of Flink, we know that the programming model of Flink is divided into four layers. Sql is the highest-level api, Table api is the middle layer, DataSteam/DataSet Api is the core, and stateful Streaming process layer is the underlying implementation.
At the beginning, we directly used Flink Table as the data association method. After registering the incoming DataStream as Dynamic Table, we made an association query between the two tables, as shown below:
However, after trying, it is found that when doing the associated query with a large amount of log data, we can only do the query in a small time window, otherwise it will exceed the single memory limit of the datanode node and cause an exception. However, in order to meet the delay of arrival of different business logs, this implementation is not universal.
2) Rocksdb
After that, we deal with it directly on the DataStream, associate the data in the CountWindow window, store the associated data in the Rocksdb of each datanode node after the Hash is scattered, and make use of the feature that Flink State natively supports Rocksdb to do Checkpoint to backup and restore the data within the operator. This approach is feasible, but it is subject to the fact that the physical disk of the Rocksdb cluster is non-SSD, which is more time-consuming in our actual online scenario.
3) external storage association
For example, the KV storage of Redis does improve the query speed a lot, but the larger size of a single log such as advertising log data will take up a lot of valuable machine memory resources. After research, we chose Hbase as the associated data storage scheme of our log association component.
In order to build association tasks quickly, we have developed a configuration component platform based on Flink, which can generate data association tasks and automatically submit them to the cluster by submitting configuration files. The following figure shows the process flow of task execution.
The schematic diagram is as follows:
The following figure is an execution flowchart within the associated component:
two。 Problem and Optimization 1) join Interval Join
With the increase of the number of logs, the number of logs that need to be associated may reach the order of magnitude of more than one billion or even billions per day. The early configuration of related components to generate tasks does solve most of the online business requirements, but with the further increase of related requirements, Hbase is facing great query pressure. After we have optimized the Hbase table, including rowkey, and so on, we began to iterate and optimize the associated components.
The first step is to reduce the number of Hbase queries. Using Flink Interval Join, we first complete most of the related requirements within the program, and only a small number of logs that still need to be queried will query external storage (Hbase). It has been verified that, taking the association between the request log and the experiment log as an example, the number of Interval Join query requests can be reduced by 80% when the hbase window is set at about 10s.
Semantic schematic diagram of ① Interval Join
The interval of data JOIN-for example, EXP with time 3 will JOIN at the interval of IMP time [2,4].
WaterMark-for example, if the data time of EXP is 3 and the data time of IMP is 5, then WaterMark is generated based on the actual minimum minus UpperBound, that is, Min (3Magne5)-1 = 2
Expired data-for performance and storage reasons, the expired data should be cleared, as shown in the figure. When the WaterMark is 2, the data before 2 expires and can be cleared.
② Interval Join internal implementation logic
③ Interval Join transformation
Because the native Intervak Join of Flink implements Inner Join, while what we need in our business is Left Join. The specific modifications are as follows:
Cancel the join flag bit of the right data stream
There is no state when there is join data in the left data stream.
2) dynamic monitoring of correlation rate
In the execution of tasks, unexpected situations often occur, such as missing associated data logs or exceptions caused by log format errors, resulting in a serious decline in the correlation rate of related tasks. At this time, although the associated task continues to run, it has little significance or even reverse effect on the overall data quality. When the task recovers, you also need to clear the data within the exception interval and set the Kafka Offset to the location before the exception before processing.
Therefore, we have added dynamic monitoring to the optimization of related components, as shown in the following diagram:
In the associated task, regularly detect whether the latest data is written in the specified time range Hbase. If not, it indicates that there is a problem with the writing Hbase task, and the associated task is terminated.
When the write Hbase task accumulates, the correlation rate decreases accordingly, and the association task is terminated when the correlation rate is lower than the specified threshold.
An alarm will be issued when the associated task is terminated. After the upstream task is repaired, the associated task can be restored to ensure that the associated data is not lost.
five。 Data cleaning component
In order to quickly extract the index of log data, we develop the index extraction component Logwash based on Flink computing platform. The template engine based on Freemaker is encapsulated as the parsing module of the log format to extract the log, arithmetic operation, condition judgment, replacement, loop traversal and other operations.
The following figure shows the processing flow of the Logwash component:
The component supports parsing and extracting two types of logs: text and Json. At present, the cleaning component supports nearly 100 real-time cleaning requirements for Weibo advertisements, providing third-party non-real-time computing personnel such as operation and maintenance groups with the ability to quickly extract logs.
Example of the profile section:
VI. FlinkStream component Library
The development of DataStream in Flink extracts the general logic and the same code, and generates our general component library FlinkStream. FlinkStream includes the abstraction and default implementation of Topology, the abstraction and default implementation of Stream, the abstraction and some implementation of Source, the abstraction and some implementation of Operator, the abstraction of Sink and some implementation. The task submission uniformly uses the executable Jar and the configuration file, and Jar reads the configuration file to build the corresponding topology diagram.
1.Source abstraction
Abstract Source and create abstract classes and corresponding interfaces. For existing implementations in Flink Connector, such as kafka,Elasticsearch, directly create a new class and inherit the interface, and implement the corresponding methods. For the connector that needs to be implemented by itself, directly inherit the abstract class and the corresponding interface, and implement the method. Only KafkaSource has been implemented so far.
2.Operator abstraction
Similar to the Source abstraction, we implemented an Operator abstraction based on the Stream to Stream level. Create abstract Operate classes and abstract Transform methods. For the Transform operation to be implemented, directly inherit the abstract class and implement its abstract method. The current implementation of Operator is used directly according to the document. As follows:
3.Sink abstraction
For Sink, we also create abstract classes and interfaces. Encapsulate the existing Sink in Flink Connector. Currently, Sink can be configured for data output. At present, the Sink components that are implemented and encapsulated are: Kafka, Stdout, Elasticsearch, Clickhouse, Hbase, Redis, MySQL.
4.Stream abstraction
Create the Stream abstract class and the abstract method buildStream for building StreamGraph. We implement the default Stream,buildStream method to read the Source configuration to generate DataStream, generate the topology diagram sequentially through the Operator configuration list, and write out the components through the Sink configuration.
5.Topology abstraction
For a single Stream, the logic to deal with may be relatively simple, mainly reading a Source for various operations of the data and output. For complex multi-Stream business requirements, such as multi-stream Join, multi-stream Union, Split stream and so on, we abstract the multi-stream business and produce Topology. Multiple streams can be configured at the Topology layer. For general operations, we have implemented the default Topology, which can meet the business requirements directly through the configuration file. For more complex business scenarios, users can implement Topology by themselves.
6. Configuration
We can configure the abstract components directly by writing the configuration file, constructing the running topology of the task, and specifying the configuration file when starting the task.
Body text box Flink Environment configuration, including time processing type, restart strategy, checkpoint, etc.
Topology configuration, which can configure processing logic and Sink between different Stream
Stream configuration, configurable Source,Operator list, Sink.
Examples of configurations are as follows:
Run_env: timeCharacteristic: "ProcessingTime" # ProcessingTime\ IngestionTime\ EventTime restart: # restart Policy configuration type: # noRestart, fixedDelayRestart, fallBackRestart FailureRateRestart checkpoint: # enable checkpoint type: "rocksdb" # streams: impStream: # fan economic exposure log type: "DefaultStream" config: source: type: "Kafka011" # Source is kafka011 version config: parallelism: 20 operates:-type: "StringToMap" config:-type: "SplitElement" config:...-type: "SelectElement" config:transforms:-type: "KeyBy" config:-type: "CountWindowWithTimeOut" # Window needs to be used in combination with KeyBy config:-type: "SplitStream" config:-type: "SelectStream" config:sink:-type: Kafka Config:-type: Kafka config:7. Deployment
In the real-time task management platform, create a new task, fill in the task name, select the task type (Flink) and version, upload the executable Jar file, import the configuration or write the configuration manually, fill in the JobManager and TaskManager memory configuration, fill in the parallelism configuration, select whether to retry, select whether to recover from checkpoint and other options, save the task in the task list, and observe the startup log to troubleshoot startup errors.
VII. FlinkSQL extension
The SQL language is a declarative, simple, flexible language, and Flink itself provides support for SQL. Flink 1.6 and 1.8 have limited support for SQL language, do not support table-building statements, and do not support association operations on external data. So we extend Flink SQL API through Apache Calcite, and users only need to care about how to express business requirements in SQL language.
1. Support for creating source tables
Extended support for creating source table SQL, parsing SQL statements, obtaining data source configuration information, creating corresponding TableSource instances, and registering them with Flink environment. Examples are as follows:
two。 Support for creating dimension tables
SQL is parsed by Apache Calcite, dimension table is identified by dimension table keywords, dimension table data is read asynchronously by RichAsyncFunction operator, and associated DataStream is generated by flatMap operation, and then converted to Table to register to Flink Environment. Examples are as follows:
3. Support for view creation
Using the SQLQuery method, you can create a view chart from a previous table or view, and register a new view chart with Flink Environment. The creation statement needs to be written in order. For example, if the myView2 is created from the view myView1, the myView1 creation statement should precede the myView2 statement. As follows:
4. Support to create result table
You can create a result table, parse the SQL statement, obtain configuration information, create a corresponding AppendStreamTableSink or UpsertStreamTableSink instance, and register it with Flink Environment. Examples are as follows:
5. Support for custom UDF
Support custom UDF function, inherit ScalarFunction or TableFunction. There is a corresponding UDF resource configuration file in the resources directory, and all UDF configured in the executable Jar package are registered by default. You can use it directly according to the usage.
6. Deployment
Deploy in the same way as the Flink Stream component.
eight。 Construction of Real-time data Warehouse
In order to ensure the unified export of real-time data and the unified caliber of data indicators, we design and construct Weibo advertising real-time digital warehouse according to the experience of offline data warehouse in the industry.
1. Hierarchical overview
Data warehouse is divided into three layers, from bottom to top: data introduction layer (ODS,Operation Data Store), data common layer (CDM,Common Data Model) and data application layer (ADS,Application Data Service).
Data introduction layer (ODS,Operation Data Store): the original data is stored in the data warehouse system with almost no processing, and the structure is basically consistent with the source system, which is the data accuracy of the data warehouse.
Data common layer (CDM,Common Data Model, also known as general data model layer): contains DIM dimension tables, DWD and DWS, which are processed from ODS layer data. It mainly completes data processing and integration, establishes consistent dimensions, builds reusable analysis and statistics-oriented detailed fact tables, and aggregates indicators of common granularity.
Common Dimension layer (DIM): based on the idea of dimensional modeling, the consistent dimension of the whole enterprise is established. Reduce the risk of disunity of data calculation caliber and algorithm.
Tables at the common dimension level are often referred to as logical dimension tables, and dimensions and dimension logical tables usually correspond one to one.
Common summary granularity fact layer (DWS,Data Warehouse Service): take the analyzed subject object as the modeling driver, construct a common granularity summary index fact table based on the upper application and product index requirements, and use a wide tabular means to physicochemical model. Construct statistical indicators with standard naming and consistent caliber, provide public indicators for the upper layer, and establish summary width table and detail fact table.
Tables in the common summary granularity fact layer are often referred to as summary logical tables and are used to store derived metric data.
Fine-grained fact layer (DWD,Data Warehouse Detail): take the business process as the modeling driver, and build the finest detail layer fact table based on the characteristics of each specific business process. Combined with the data usage characteristics of the enterprise, some important dimension attribute fields of the detail fact table can be appropriately redundant, that is, wide table processing.
A table with a fine-grained fact layer is often called a logical fact table.
Data application layer (ADS,Application Data Service): stores the personalized statistical index data of data products. It is generated according to the CDM and ODS layers.
two。 Detailed hierarchical model
For the original log data, the ODS layer preserves almost every log field after extraction, so that the problem can be traced back and tracked. In the CDM layer, the data of ODS is only compressed in terms of time granularity, that is, in the specified time segmentation window, aggregate operations are performed on indicators under all dimensions, without involving business operations. In the ADS layer, we will have a configured extraction micro-service to customize and extract the underlying data, and output it to the user-specified storage service.
At this point, the study on "how to understand the construction of Weibo's Flink-based real-time computing platform" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.