This article introduces how to write the main program of a Flink Job. It first outlines the real-time data architecture that such a job fits into, and then walks through a concrete, easy-to-follow example that computes a UV metric with Flink SQL.
1. Flink real-time application scenarios
At present, Flink's main application scenarios in real-time computing fall into four categories: real-time data synchronization, streaming ETL, real-time data analysis, and complex event processing.
2. Real-time data architecture
Real-time data architectures roughly cover three types of scenario: traffic, business, and feature data, and each differs from the others.
In terms of data model, traffic data uses flattened wide tables, the business data warehouse leans more toward normalized modeling, and feature data is stored as key-value pairs.
In terms of data sources, the traffic warehouse generally ingests log data, the business warehouse ingests business binlog data, and the feature warehouse draws on a variety of sources.
In terms of data volume, the traffic and feature warehouses handle massive amounts of data, more than one billion records per day, while the business warehouse typically sees one to ten million records per day.
In terms of update frequency, traffic data is rarely updated, whereas business and feature data are updated often; traffic data is mostly concerned with timing and trends, while business and feature data focus on state changes.
In terms of accuracy, the requirements on traffic data are lower, while those on business and feature data are higher.
1. Overall architecture of the real-time data system
(Figure: overall architecture of the real-time data system)
The real-time data architecture is divided into five layers: the access layer, storage layer, computing layer, platform layer, and application layer. The figure above is only an overview; what each layer does is described in detail below.
1) Access layer: this layer uses various data collection tools to gather data from all systems, including binlogs, event tracking logs, and back-end service logs, all of which are collected into Kafka. This data feeds not only real-time computing but also offline computing, ensuring that the real-time and offline pipelines share the same raw data.
2) Storage layer: this layer stores the raw data as well as the detail data produced by cleansing and association. Following a unified layered real-time data model, data for different application scenarios is stored in engines such as Kafka, HDFS, Kudu, ClickHouse, HBase, Redis and MySQL. Which data types each engine stores is described in detail in the section on real-time data model layering.
3) Computing layer: the computing layer mainly relies on four engines: Flink, Spark, Presto, and ClickHouse's own computing capability. Flink is used for real-time data synchronization, streaming ETL, and second-level real-time metric computation for key systems. Spark SQL is used for quasi-real-time metric computation in complex multi-dimensional analysis. Presto and ClickHouse serve multi-dimensional self-service analysis scenarios that demand low query response times.
4) Platform layer: three main pieces of work are done here: providing a unified query service, metadata and metric management, and data quality and lineage.
5) Application layer: the unified query service supports the data scenarios of the various business lines, including real-time dashboards, real-time data products, real-time OLAP, real-time features, and so on.
The detailed work of the platform layer is as follows:
The unified query service supports queries from the underlying detail data up to the aggregation-layer data, and supports SQL-based access to data held in KV stores such as Redis and HBase.
Metadata and metric management: this mainly means managing the real-time Kafka, Kudu, ClickHouse and Hive tables in a unified way, naming tables according to the warehouse model conventions, and making the meaning and owner of each table's fields explicit. Metric management means bringing all real-time metrics under a metric management system as far as possible, defining their calculation calibers, and exposing them to the different business parties.
Data quality and lineage analysis: data quality is split into platform monitoring and data monitoring, while lineage analysis mainly maps the dependencies among real-time data sets and among real-time tasks.
Platform monitoring has two parts: one monitors the running status of tasks, alarms on abnormal tasks, and automatically restarts and recovers them according to configured parameters; the other monitors the Kafka consumption lag of Flink tasks and raises real-time alarms, as sketched below.
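As a minimal, standalone sketch of that second check, the code below computes the consumption lag of a Flink job's Kafka consumer group using the plain Kafka client APIs (assuming a reasonably recent kafka-clients dependency). The broker address, group id and alarm threshold are placeholders, and a real monitoring platform would report to an alerting system instead of printing.

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerLagCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092"); // assumed broker address
        props.put("group.id", "lag-checker");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (AdminClient admin = AdminClient.create(props);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // committed offsets of the Flink job's consumer group (group id is an assumption)
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("flink-uv-job").partitionsToOffsetAndMetadata().get();
            // latest offsets on the brokers for the same partitions
            Map<TopicPartition, Long> end = consumer.endOffsets(committed.keySet());
            for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
                long lag = end.get(e.getKey()) - e.getValue().offset();
                if (lag > 100_000) { // assumed alarm threshold
                    System.out.println("lag alarm on " + e.getKey() + ": " + lag);
                }
            }
        }
    }
}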
Data monitoring is divided into two parts:
First, streaming ETL is an important part of the whole real-time data flow, and various dimension tables are joined during ETL. During real-time joins, records that fail to be associated are reported as exception logs to the monitoring platform at regular intervals; when their number reaches a set threshold, an alarm is triggered.
Second, some key real-time metrics use a Lambda architecture, so the historical real-time figures need to be compared against the values recomputed offline in Hive. This provides data quality monitoring for the real-time data, and metrics whose deviation exceeds a threshold trigger an alarm (see the sketch below).
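As a minimal illustration of that reconciliation step, the sketch below compares a real-time figure with its offline recomputation and flags the metric when the relative deviation exceeds a threshold; the numbers and the 1% threshold are purely illustrative.

public final class MetricReconciliation {

    // returns true when the real-time value drifts too far from the offline value
    public static boolean exceedsThreshold(double realtimeValue,
                                           double offlineValue,
                                           double maxRelativeDiff) {
        if (offlineValue == 0.0) {
            return realtimeValue != 0.0;
        }
        double diff = Math.abs(realtimeValue - offlineValue) / Math.abs(offlineValue);
        return diff > maxRelativeDiff;
    }

    public static void main(String[] args) {
        // e.g. alert when a real-time amount drifts more than 1% from the offline figure
        if (exceedsThreshold(1_010_000d, 1_000_000d, 0.01)) {
            System.out.println("metric deviation beyond threshold, trigger alarm");
        }
    }
}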
To support data monitoring, real-time data lineage is also needed. This mainly means mapping the data dependencies in the real-time data system and the dependencies among real-time tasks, from the underlying ODS through DW to DM, including which models the DM layer consumes, so that the whole chain is connected. Downstream consumers can then be notified when data or tasks are deliberately changed, and lineage can be used to locate problems when a metric is abnormal. Lineage analysis also makes it possible to evaluate the application value of data and to account for its computing cost.
2. Real-time data model layering
(Figure: real-time data model layering)
For efficiency, an offline data warehouse generally trades space for time and has more layers. A real-time data warehouse must also account for latency, so the fewer layers the better, which additionally reduces the chance of errors in intermediate steps; here it is divided into four layers.
1) ODS layer
The operational data layer stores the raw data, structures the unstructured data, performs light cleansing, and almost never deletes raw data.
The data in this layer mainly comes from business database binlogs, event tracking logs, and application logs.
Binlogs are captured by Canal and written into the Kafka message queue; for the tracking and application logs, the nginx and Tomcat logs are collected by Filebeat and reported to Kafka.
Besides being stored in Kafka, the business database binlogs are also written by Flink into storage engines such as HDFS and Kudu, landing in a 5-minute Hive table so that detail data can be queried and also serving as raw data for the offline warehouse (a landing job is sketched below). The tracking log data, on the other hand, is still unstructured at the ODS layer and does not need to be landed.
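The sketch below shows one way such a landing job could look with Flink's StreamingFileSink, writing raw binlog strings from Kafka into time-bucketed HDFS directories. The topic, path and broker address are assumptions, the bucketing here is per minute (exact 5-minute buckets would need a custom BucketAssigner), and the Kafka connector version should match the cluster in use.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class BinlogToHdfsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // StreamingFileSink finalizes files on checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // assumed
        props.setProperty("group.id", "ods-binlog-to-hdfs");  // assumed

        DataStream<String> binlog = env.addSource(
                new FlinkKafkaConsumer011<>("ods_binlog_orders", new SimpleStringSchema(), props));

        // one directory per time bucket; a separate task adds them as Hive partitions
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs:///warehouse/ods/binlog_orders"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyyMMddHHmm"))
                .build();

        binlog.addSink(sink);
        env.execute("ods-binlog-to-hdfs");
    }
}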
2) DWD layer
The real-time detail data layer takes the business process as the modeling driver and builds the finest-grained detail fact tables for each specific business process. Based on how the enterprise uses the data, some important dimension attributes of the detail fact tables are deliberately made redundant, i.e. the tables are widened.
The data in this layer comes from the ODS layer and is produced by simple streaming ETL. Processing of binlog data mainly involves simple cleansing and handling of data drift, and may include a streaming join across several ODS-layer tables. For traffic logs the main work is general ETL that structures the unstructured data and joins in common dimension fields.
The data of this layer is stored in Kafka, and at the same time Flink writes it in real time into a 5-minute Hive table so that detail data can be queried and provided to the offline data warehouse as its raw data.
3) DIM layer
The common dimension layer, built on dimensional modeling, establishes consistent dimensions across the whole business process and reduces the risk of inconsistent calculation calibers and algorithms.
DIM layer data comes from two sources: part is produced by Flink programs processing ODS-layer data in real time, and part is produced by offline tasks.
DIM dimension data mainly uses three storage engines: MySQL, HBase, and Redis. MySQL can be used when the dimension table is small. When individual records are small and query QPS is high, Redis can be used to keep the memory footprint low. For large data volumes that are not particularly sensitive to dimension-table changes, HBase is a good fit. A simple Redis lookup is sketched below.
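As a minimal sketch of the Redis case, the function below enriches a (categoryId, amount) record with a dimension value looked up from Redis. The host, key layout and tuple shapes are assumptions, and a production job would typically add a local cache or use Flink's async I/O.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;

public class RedisDimLookup extends RichMapFunction<Tuple2<String, Long>, Tuple3<String, String, Long>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("redis-host", 6379); // assumed address
    }

    @Override
    public Tuple3<String, String, Long> map(Tuple2<String, Long> fact) {
        // small records with high query QPS: a point lookup per key; key layout is assumed
        String dimValue = jedis.get("dim:category:" + fact.f0);
        return Tuple3.of(fact.f0, dimValue != null ? dimValue : "unknown", fact.f1);
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}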
4) DM layer
① Data mart layer
A public aggregation layer is built around the notion of data domain plus business domain. The DM layer is more complex: the storage method has to be chosen by weighing the data landing requirements against the specific query engines, and it is split into a light aggregation layer and a high aggregation layer. The high aggregation layer serves relatively simple front-end KV queries to improve query performance, such as real-time dashboards and real-time reports, where data timeliness must be at the second level. The wide tables of the light aggregation layer in Kafka are written in real time into OLAP storage engines and serve the complex OLAP query scenarios of front-end products, such as self-service analysis and complex report generation, where a data latency of minutes can be tolerated.
② Light aggregation layer
The light aggregation layer is derived from the detail layer by streaming ETL and mainly takes the form of wide tables. Business detail aggregates are obtained by joining business fact tables with dimension tables, and traffic detail aggregates are obtained by splitting the traffic logs by business line and joining them with dimension tables.
Storage in the light aggregation layer is relatively diverse. First, Flink consumes the detail data in the DWD-layer Kafka in real time, joins it with the required dimension tables, and writes the result back into this layer's Kafka in JSON or PB format.
At the same time, multi-dimensional business aggregates are written in real time by Flink into Kudu, to serve detail queries and more complex multi-dimensional analysis. Traffic data is written by Flink into HDFS and ClickHouse for complex multi-dimensional analysis, while real-time feature data is written by Flink, after dimension-table joins, into HDFS for downstream offline ETL consumption.
For the wide-table data landed in Kudu and HDFS, Spark SQL can run minute-level pre-computation to serve complex data analysis with minute-level latency, thereby shortening the offline ETL pipeline. In addition, as the integration of Flink SQL with the Hive ecosystem keeps improving, Flink SQL can be tried for offline ETL and OLAP computation (Flink's stream computing is memory-based, much like Presto, which makes it a candidate OLAP engine), so that one computing engine covers both real-time and offline needs and batch-stream unification is achieved.
The business detail data in Kudu and the traffic detail data in ClickHouse can also satisfy personalized analysis needs from the business side: a powerful OLAP engine can query the detail data in real time and return results within about 10 seconds. This is the highly flexible class of real-time OLAP requirements.
③ High aggregation layer
The high aggregation layer is produced by aggregating the detail layer or the light aggregation layer and writing the results into storage engines. It serves part of the real-time metric requirements, and its flexibility is comparatively low.
The computation uses the Flink DataStream API and Flink SQL. Depending on the requirement, the metric storage differs: common, simple aggregate models go directly into MySQL; models with many dimensions and heavy write/update traffic go into HBase; and metrics that need sorting, very high query QPS and low response times but no persistence, such as the online TopN products during a promotion, go straight into Redis (see the sketch below).
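A minimal sketch of the Redis TopN case, assuming an upstream aggregation has already produced (itemId, salesCount) pairs; the Redis address and key name are placeholders, and the front end would read the TopN with ZREVRANGE.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class RedisTopNSink extends RichSinkFunction<Tuple2<String, Long>> {

    private final String zsetKey;
    private transient Jedis jedis;

    public RedisTopNSink(String zsetKey) {
        this.zsetKey = zsetKey;
    }

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("redis-host", 6379); // assumed address
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) {
        // keep the latest sales count per item in a sorted set; no persistence needed
        jedis.zadd(zsetKey, value.f1, value.f0);
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}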
For second-level metrics a mixture of Lambda and Kappa architectures is needed: most real-time metrics are computed with a Kappa architecture, while a few key metrics (for example, amount-related ones) are recomputed in batch under a Lambda architecture as an extra reconciliation step.
In general, the DM layer provides data at three levels of timeliness:
First, second-level real-time metrics pre-computed by real-time engines such as Flink. These have high timeliness requirements and serve real-time dashboards and real-time reports whose computation dimensions are not complex.
Second, quasi-real-time metrics pre-computed by Spark SQL with minute-level latency. These serve complex analysis scenarios that are less sensitive to timeliness and may involve joins across several fact tables, such as sales attribution.
Last are complex multi-dimensional analysis scenarios that need no pre-computation and are answered ad hoc. These needs are more personalized and flexible, and if the OLAP engine is powerful enough they can be met with second-level query times. The ratio of second-level real-time data to the other two quasi-real-time kinds is roughly 3:7, and the vast majority of business needs are served first by quasi-real-time computation or ad-hoc queries, which reduces resource usage, improves data accuracy, and covers complex business scenarios in a more flexible way.
3. Construction modes of the real-time data system
The real-time data system is built in two ways, real-time and quasi-real-time (implemented on a stream computing engine and on ETL plus OLAP engines respectively, with data timeliness of seconds and minutes respectively). They differ as follows:
1) In terms of scheduling overhead, quasi-real-time data is processed in batches, so it still needs the support of a scheduling system and is scheduled frequently, whereas real-time data has no scheduling overhead.
2) In terms of business flexibility, because quasi-real-time data is implemented on ETL or OLAP engines, it is more flexible than stream-based computation.
3) In terms of tolerance for late data, quasi-real-time pipelines can recompute fully over the data of a cycle, so their tolerance for late data is relatively high, whereas real-time pipelines compute incrementally and tolerate late data less well.
4) In terms of applicable scenarios, quasi-real-time data mainly suits cases with real-time requirements that are not too strict, multi-table joins, and frequent business changes, such as real-time analysis of transaction data. Real-time data better suits scenarios with high real-time requirements and large data volumes, such as real-time features and real-time analysis of traffic data.
4. Toward a unified stream-batch real-time data architecture
From the data warehouse concept proposed by Inmon in 1990 until today, big data architecture has evolved from the initial offline big data architecture through the Lambda and Kappa architectures to the unified stream-batch architecture enabled by Flink. The essence of this evolution is a move toward stream-batch unification, letting users complete real-time computation in the most natural way and at the lowest cost.
1) Offline big data architecture: data sources are imported into the offline warehouse by offline means, and downstream applications either read the DM layer directly or go through an added data service layer such as MySQL or Redis, depending on business needs. The storage engine is HDFS/Hive, and the ETL tools can be MapReduce scripts or HiveSQL. At the model level, the warehouse is layered into the operational data layer (ODS), the detail layer (DWD), and the data mart layer (DM).
2) Lambda architecture: as big data applications developed, people began to demand real-time results. To compute some metrics in real time, a real-time computation path is added next to the original offline warehouse: the data sources are streamed (i.e. sent to a message queue), real-time computation subscribes to the queue, computes the metric increments directly, and pushes them to the downstream data service, which merges the offline and real-time results.
3) Kappa architecture: although the Lambda architecture meets the real-time requirement, it brings extra development and operations work. Its background is that stream processing engines were not yet mature and stream results could only be treated as temporary, approximate reference values. Later, with the emergence of engines such as Flink, stream processing matured, and to solve the problem of maintaining two code bases, Jay Kreps of LinkedIn proposed the Kappa architecture.
4) Unified stream-batch architecture: the full realization of stream-batch unification is a dual-engine architecture of stream computing plus interactive analysis, with the interactive analysis engine at the center and the stream computing engine preparing the base data. The stream engine performs real-time ETL on the data, reducing latency compared with offline ETL, while the interactive analysis engine has its own storage and, through co-optimization of compute and storage, achieves high write TPS, high query QPS and low query latency. Together they make the whole pipeline real-time and SQL-driven, so that both real-time analysis and on-demand analysis are possible and the system can respond quickly to business changes; the two cooperate to an effect of 1 + 1 > 2. This architecture places very high demands on the interactive analysis engine, which may be a key focus and direction of database technology in the coming years.
To meet the business side's more complex multi-dimensional real-time analysis needs, the author has introduced Kudu as an OLAP storage engine in data development and is using a Presto + Kudu computation scheme for business data such as orders, exploring the feasibility of a unified stream-batch architecture for real-time data analysis. In addition, the currently popular data lake technologies such as Delta Lake and Hudi support upsert updates on HDFS; as streaming writes and SQL engine support mature, a single storage engine may in the future cover both real-time and offline data needs, reducing the development and operations cost of maintaining multiple engines.
3. Computing a UV metric in real time with Flink SQL
The previous part described how to build a real-time data system at the macro level, which can feel rather abstract. Perhaps what you need is a concrete case to understand what to actually do, so this part uses a down-to-earth example to show how to compute UV data in real time.
As everyone knows, in consumer-facing Internet companies UV is a very important metric and has a big influence on timely decisions by executives, business teams and operations. At an e-commerce company, the author's main job was to compute real-time data such as UV and sales, so the experience runs deep; the following simple demo shows how to use Flink SQL to consume PV data from Kafka, compute the UV metric in real time, and write it into HBase.
1. Kafka source data parsing
After the tracking data has been reported and cleansed by Filebeat, the PV data is written into downstream Kafka in ProtoBuffer format. The first step of consumption is to deserialize the PB-format data into the Row type that Flink understands, which requires a custom implementation of the DeserializationSchema interface, as shown in the code below. Only the mid and the event-time field time_local needed for the computation are extracted, and the log_date field is derived from time_local:
public class PageViewDeserializationSchema implements DeserializationSchema<Row> {

    public static final Logger LOG = LoggerFactory.getLogger(PageViewDeserializationSchema.class);

    protected SimpleDateFormat dayFormatter;
    private final RowTypeInfo rowTypeInfo;

    public PageViewDeserializationSchema(RowTypeInfo rowTypeInfo) {
        dayFormatter = new SimpleDateFormat("yyyyMMdd", Locale.UK);
        this.rowTypeInfo = rowTypeInfo;
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        Row row = new Row(rowTypeInfo.getArity());
        MobilePage mobilePage = null;
        try {
            mobilePage = MobilePage.parseFrom(message);
            String mid = mobilePage.getMid();
            row.setField(0, mid);
            Long timeLocal = mobilePage.getTimeLocal();
            String logDate = dayFormatter.format(timeLocal);
            row.setField(1, logDate);
            row.setField(2, timeLocal);
            return row;
        } catch (Exception e) {
            String mobilePageError = (mobilePage != null) ? mobilePage.toString() : "";
            LOG.error("error parse bytes payload is {}, pageview error is {}",
                    new String(message), mobilePageError, e);
            // returning null drops the corrupt record instead of failing the job
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return rowTypeInfo;
    }
}
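As a hypothetical usage sketch (the field names and types are assumptions derived from the fields set above, not from the original article), the schema could be wired up in the job's main method like this:

// needs org.apache.flink.api.common.typeinfo.TypeInformation, org.apache.flink.api.common.typeinfo.Types
// and org.apache.flink.api.java.typeutils.RowTypeInfo
// the field order must match what deserialize() fills in: mid, log_date, time_local
RowTypeInfo rowTypeInfo = new RowTypeInfo(
        new TypeInformation<?>[]{Types.STRING, Types.STRING, Types.LONG},
        new String[]{"mid", "log_date", "time_local"});
PageViewDeserializationSchema deserializationSchema =
        new PageViewDeserializationSchema(rowTypeInfo);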
2. Writing the Flink Job main program
With the PV data parsed into Flink's Row type, writing the main function and the SQL that counts the UV metric is very simple. The code is as follows:
public class RealtimeUV {

    public static void main(String[] args) throws Exception {
        // step1: parse the Kafka and HBase connection information and the checkpoint
        // parameters from the properties configuration file
        Map<String, String> config = PropertiesUtil.loadConfFromFile(args[0]);
        String topic = config.get("source.kafka.topic");
        String groupId = config.get("source.group.id");
        String sourceBootStrapServers = config.get("source.bootstrap.servers");
        String hbaseTable = config.get("hbase.table.name");
        String hbaseZkQuorum = config.get("hbase.zk.quorum");
        String hbaseZkParent = config.get("hbase.zk.parent");
        int checkPointPeriod = Integer.parseInt(config.get("checkpoint.period"));
        int checkPointTimeout = Integer.parseInt(config.get("checkpoint.timeout"));

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // step2: set checkpoint parameters for failover tolerance
        sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class, ProtobufSerializer.class);
        sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        sEnv.enableCheckpointing(checkPointPeriod, CheckpointingMode.EXACTLY_ONCE);
        sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout);
        sEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // step3: use the Blink planner, create the TableEnvironment, and set the idle state
        // retention time to avoid the job running out of memory
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, environmentSettings);
        tEnv.getConfig().setIdleStateRetentionTime(Time.days(1), Time.days(2));

        Properties sourceProperties = new Properties();
        sourceProperties.setProperty("bootstrap.servers", sourceBootStrapServers);
        sourceProperties.setProperty("auto.commit.interval.ms", "3000");
        sourceProperties.setProperty("group.id", groupId);

        // step4: initialize the schema of the KafkaTableSource. A TableSource is registered
        // here instead of a DataStream because the author wanted to show how to register a
        // KafkaTableSource with Flink.
        TableSchema schema = TableSchemaUtil.getAppPageViewTableSchema();
        Optional<String> proctimeAttribute = Optional.empty();
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.emptyList();
        Map<String, String> fieldMapping = new HashMap<>();
        List<String> columnNames = new ArrayList<>();
        RowTypeInfo rowTypeInfo = new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
        columnNames.addAll(Arrays.asList(schema.getFieldNames()));
        columnNames.forEach(name -> fieldMapping.put(name, name));
        PageViewDeserializationSchema deserializationSchema =
                new PageViewDeserializationSchema(rowTypeInfo);
        Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        Kafka011TableSource kafkaTableSource = new Kafka011TableSource(
                schema,
                proctimeAttribute,
                rowtimeAttributeDescriptors,
                Optional.of(fieldMapping),
                topic,
                sourceProperties,
                deserializationSchema,
                StartupMode.EARLIEST,
                specificOffsets);
        tEnv.registerTableSource("pageview", kafkaTableSource);

        // step5: initialize the HBase table schema and write options, and register the sink
        HBaseTableSchema hBaseTableSchema = new HBaseTableSchema();
        hBaseTableSchema.setRowKey("log_date", String.class);
        hBaseTableSchema.addColumn("f", "UV", Long.class);
        HBaseOptions hBaseOptions = HBaseOptions.builder()
                .setTableName(hbaseTable)
                .setZkQuorum(hbaseZkQuorum)
                .setZkNodeParent(hbaseZkParent)
                .build();
        HBaseWriteOptions hBaseWriteOptions = HBaseWriteOptions.builder()
                .setBufferFlushMaxRows(1000)
                .setBufferFlushIntervalMillis(1000)
                .build();
        HBaseUpsertTableSink hBaseSink =
                new HBaseUpsertTableSink(hBaseTableSchema, hBaseOptions, hBaseWriteOptions);
        tEnv.registerTableSink("uv_index", hBaseSink);

        // step6: the SQL that computes the current day's UV in real time. The simplest
        // group-by aggregation is used here, with neither minibatch nor windows; for large
        // data volumes it is better to enable one of those two optimizations.
        String uvQuery = "insert into uv_index "
                + "select log_date,\n"
                + "ROW(count(distinct mid)) as UV\n"
                + "from pageview\n"
                + "group by log_date";
        tEnv.sqlUpdate(uvQuery);

        // step7: execute the job
        sEnv.execute("UV Job");
    }
}

This concludes the study of how to write the Flink Job main program. Pairing the theory with hands-on practice is the best way to learn, so give it a try.