In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-25 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces the example analysis of stream batch integrated Hive warehouse in Flink 1.11. the article is very detailed and has a certain reference value. Interested friends must read it!
First of all, congratulations to Table/SQL 's blink planner on becoming the default Planner.
Flink 1.11 stream computing combined with Hive batch data warehouse brings the ability of real-time Flink stream processing and Exactly-once to the offline data warehouse. In addition, Flink 1.11 improves the Filesystem connector of Flink itself and greatly improves the ease of use of Flink.
Data warehouse architecture
Offline data warehouse
The traditional offline data warehouse is the solution of Hive plus HDFS. Hive data warehouse has mature and stable big data analysis capability. Combined with scheduling and upstream and downstream tools, a complete data processing and analysis platform is constructed. The process is as follows:
Flume imports data into Hive data warehouse
Scheduling tool, scheduling ETL jobs for data processing
Flexible Ad-hoc queries can be performed on the table of the Hive warehouse
Scheduling tool for scheduling aggregation jobs to be output to the database in the BI tier
The problems under this process are:
The import process is not flexible enough, which should be a flexible SQL stream calculation process.
Cascade computing based on scheduling jobs, the real-time performance is too poor.
ETL cannot have streaming incremental computation.
Real-time data warehouse
According to the characteristics of offline data warehouse, with the popularity of real-time computing, more and more companies introduce real-time data warehouse. Real-time data warehouse is based on Kafka + Flink streaming, and defines the whole flow computing job, which has real-time performance of seconds or even milliseconds.
However, one of the problems with real-time data warehouse is that the historical data is only 3-15 days, so it is impossible to query Ad-hoc on it. If the offline + real-time architecture of Lambda is built, the maintenance cost, storage cost, consistency guarantee and repeated development will bring great burden.
Hive real-time
In order to solve the problem of offline data warehouse, Flink 1.11 brings the ability of real-time to Hive data warehouse and strengthens the real-time performance of each link without causing too much burden to the architecture.
Hive streaming sink
How do you import real-time data into Hive data warehouse? Flume, Spark Streaming or Flink Datastream? For a long time, here comes the streaming file sink of the Table / SQL layer, Flink 1.11 supports Filesystem connector [1] and Hive connector's streaming sink [2].
(note: the Bucket concept of StreamingFileSink in the figure is Partition in Table/SQL)
The streaming sink of the Table/SQL layer not only:
Real-time / quasi-real-time capabilities of Flink streaming support all formats of Filesystem connector (csv,json,avro,parquet,orc) support all formats of Hive table inherits all the features of Datastream StreamingFileSink: Exactly-once, support HDFS, S3.
And a new mechanism is introduced: Partition commit.
A reasonable warehouse data import that contains not only the writing of the data file, but also the visibility submission of the Partition. When a Partition finishes writing, you need to notify Hive metastore or add a SUCCESS file in the folder. The Partition commit mechanism of Flink 1.11 allows you to:
Trigger: controlling the timing of Partition submission can be determined by Watermark plus the time extracted from Partition, or by Processing time. You can control whether you want to see the unfinished Partition; as soon as possible or make sure that you can see it downstream after you finish writing the Partition. Policy: commit strategy, built-in support for SUCCESS file and Metastore commit, you can also extend the commit implementation, such as triggering Hive's analysis during the commit phase to generate statistics, or merging small files, etc.
An example:
-- use Hive DDL syntax SET table.sql-dialect=hive with Hive dialect CREATE TABLE hive_table (user_id STRING, order_amount DOUBLE) PARTITIONED BY (dt STRING, hour STRING) STORED AS PARQUET TBLPROPERTIES (--use partition to extract time, plus watermark to determine the timing of partiton commit 'sink.partition-commit.trigger'='partition-time',-- configure partition time extraction policy at hour level. In this example, the dt field is the day in yyyy-MM-dd format, and the hour is 0-23 hours. Timestamp-pattern defines how to deduce the complete timestamp 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', from these two partition fields-- configure dalay to be hourly. When watermark > partition time + 1 hour, it will commit the partition' sink.partition-commit.delay'='1 hackers.-- partitiion commit's policy is: update metastore (addPartition) first. Then write the SUCCESS file 'sink.partition-commit.policy.kind'='metastore,success-file') SET table.sql-dialect=default CREATE TABLE kafka_table (user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP (3), WATERMARK FOR log_ts AS log_ts-INTERVAL'5' SECOND)-can be combined with Table Hints to dynamically specify table properties [3] INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT (log_ts, 'yyyy-MM-dd'), DATE_FORMAT (log_ts,' HH') FROM kafka_table
Hive streaming source
There are a large number of Hive tasks in the ETL warehouse, which are often run periodically through scheduling tools. There are two main problems in doing so:
The real-time performance is not strong, and the minimum scheduling is the hour level. The process is complex, there are many components, and problems are easy to occur.
For these offline ETL jobs, Flink 1.11 developed real-time Hive streaming reading for this purpose, supporting:
Partition table, monitor Partition generation, incrementally read new Partition. Non-Partition table, monitor the generation of new files in the folder and read new files incrementally.
You can even use a 10-minute partition strategy. Using Flink's Hive streaming source and Hive streaming sink can greatly improve the real-time performance of Hive data warehouse to quasi-real-time minute level [4] [5]. While being real-time, it also supports full Ad-hoc queries for Table to improve flexibility.
SELECT * FROM hive_table/*+ OPTIONS ('streaming-source.enable'='true','streaming-source.consume-start-offset'='2020-05-20') * /
Real-time data association Hive table
After the release of Flink's integration with Hive, one of the most feedback we received from users was that we wanted to be able to associate Flink's real-time data with offline Hive tables. Therefore, in Flink 1.11, we support temporal join between real-time tables and Hive tables [6]. Following the example in the official Flink documentation, assume that Orders is a real-time table and LatestRates is a Hive table. Users can temporal join with the following statement:
SELECT o.amout, o.currency, r.rate, o.amount * r.rateFROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency
Currently, temporal join with Hive tables only supports processing time. We will cache the data of Hive tables in memory and update the cached data at regular intervals. Users can control the interval of cache updates with the parameter "lookup.join.cache.ttl", which is one hour by default.
"lookup.join.cache.ttl" needs to be configured in the property of the Hive table, so each table can have a different configuration. In addition, because the entire Hive table needs to be loaded into memory, it is currently only applicable to scenarios where the Hive table is small.
Hive enhancement
Hive Dialect syntax compatibility
Flink on Hive users do not use DDL very well, mainly because:
DDL is further improved in Flink 1.10, but because of the difference in metadata semantics between Flink and Hive, the usability of manipulating Hive metadata through Flink DDL is poor and can only cover a few application scenarios. Users who use Flink to dock with Hive often need to switch to Hive CLI to execute DDL.
In order to solve the above two problems, we propose FLIP-123 [7] to provide users with Hive syntax compatibility through Hive Dialect. The ultimate goal of this function is to provide users with a similar Hive CLI/Beeline experience, so that users do not need to switch between Flink and Hive's CLI, and can even directly migrate part of the Hive scripts to Flink for execution.
In Flink 1.11, Hive Dialect can support most commonly used DDL, such as CREATE/ALTER TABLE, CHANGE/REPLACE COLUMN, ADD/DROP PARTITION, and so on. To do this, we implemented a separate parser,Flink for Hive Dialect that decides which parser to use to parse the SQL statement based on the user-specified Dialect. The user can specify the SQL Dialect to use through the configuration item "table.sql-dialect". Its default value is "default", which is the native Dialect of Flink, and Hive Dialect is turned on when it is set to "hive". For SQL users, you can set "table.sql-dialect" in the yaml file to specify the initial Dialect of the session, or you can dynamically adjust the Dialect you need to use through the set command without restarting the session.
The specific features currently supported by Hive Dialect can be found in the official documentation of FLIP-123 or Flink. In addition, some design principles and considerations for the use of this feature are as follows:
Hive Dialect can only be used to manipulate Hive tables, not Flink native tables (such as Kafka, ES tables), which means that Hive Dialect needs to be used with HiveCatalog. When using Hive Dialect, some of the syntax of the original Flink may not be available (such as type aliases defined by Flink), and you can dynamically switch to the default Dialect when you need to use the Flink syntax. Hive Dialect's DDL syntax definition is based on Hive's official documentation, while the syntax may vary slightly between different Hive versions, requiring some adjustments by the user. The syntax implementation of Hive Dialect is based on Calcite, while Calcite and Hive have different reserved keywords. Therefore, some keywords that can be directly used as identifiers in Hive, such as "default", may need to be escaped with "`" in Hive Dialect.
Vectorization reading
In Flink 1.10, Flink already supports vectorization read support for ORC (Hive 2 +), but this is very limited. For this reason, Flink 1.11 adds more vector support:
ORC for Hive 1.x [8] Parquet for Hive 1,2,3 [9]
In other words, all versions of Parquet and ORC vectorization support have been completed, which is on by default and provides switches.
Simplify Hive dependency
In Flink 1.10, the required Hive-related dependencies are listed in the Flink documentation, and users are recommended to download them themselves. But this is still a bit troublesome, so in 1.11, Flink provides built-in dependency support [10]:
Dependent version of flink-sql-connector-hive-1.2.2_2.11-1.11.jar:Hive 1. Dependent version of flink-sql-connector-hive-2.2.0_2.11-1.11.jar:Hive 2.0-2.2. Flink-sql-connector-hive-2.3.6_2.11-dependent version of 1.11.jar:Hive 2.3. Dependent version of flink-sql-connector-hive-3.1.2_2.11-1.11.jar:Hive 3.
Now, all you need is a separate package, and then fix HADOOP_CLASSPATH, you can run Flink on Hive.
Flink enhancement
In addition to Hive-related features,Flink 1.11, a large number of other enhancements to batch integration have been completed.
Flink Filesystem connector
Flink table has long supported only one csv file system table, and it does not support Partition, and its behavior is somewhat inconsistent with big data's computing intuition in some aspects.
In Flink 1.11, the implementation of the entire Filesystem connector is reconstructed [1]:
Combined with Partition, Filesystem connector now supports all the semantics of Partition in SQL, DDL of Partition, Partition Pruning, static / dynamic Partition insertion, and overwrite insertion. Support all kinds of Formats:CSVJSONAparch AVROApache ParquetApache ORC. Support Batch read and write. Support Streaming sink, also support the above Hive support Partition commit, support to write Success files.
Example:
CREATE TABLE fs_table (user_id STRING, order_amount DOUBLE, dt STRING, hour STRING) PARTITIONED BY (dt, hour) WITH ('connector'='filesystem',' path'='...', 'format'='parquet',' partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 hacks,' sink.partition-commit.policy.kind'='success-file'))
-- stream environment or batch environmentINSERT INTO TABLE fs_table SELECT user_id, order_amount, DATE_FORMAT (log_ts, 'yyyy-MM-dd'), DATE_FORMAT (log_ts,' HH') FROM kafka_table
-- query SELECT * FROM fs_table WHERE dt='2020-05-20' and hour='12' via Partition
Introduction of Max Slot
Yarn perJob or session mode is infinitely expanded before 1.11, there is no way to limit its resource use, it can only be restricted by ways such as Yarn queue. However, the traditional batch jobs are actually large concurrency, running on limited resources, and some of them run periodically. For this reason, Flink 1.11 introduces the configuration of Max Slot [11] to limit the use of Yarn application resources.
Slotmanager.number-of-slots.max
Defines the maximum number of Slot allocated by the Flink cluster. This configuration option is used to limit resource consumption for batch workloads. It is not recommended to configure this option for streaming jobs. If there is not enough Slot, the streaming job may fail.
The above is all the contents of the article "sample Analysis of Integrated Hive data Storage in Flink 1.11". Thank you for reading! Hope to share the content to help you, more related knowledge, welcome to follow the industry information channel!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.