Shulou (Shulou.com), SLTechnology News&Howtos. 06/01 report, updated 2025-03-18.
This article walks through an example analysis of Flink 1.11 and Hive as an integrated batch and streaming data warehouse. Many readers may not be familiar with the topic, so it is shared here for reference.
I. Background of Flink and Hive integration
Why integrate Flink with Hive? The original motivation was to tap Flink's batch processing capabilities. Flink is already a successful stream computing engine with a large user base, and in Flink's design philosophy batch computing is a special case of stream processing. If Flink does stream computing well, its architecture can also support batch scenarios. SQL is an important entry point for batch computing, because data analysts are more accustomed to developing in SQL than to writing DataStream or DataSet programs.
In the Hadoop ecosystem, Hive is the de facto standard SQL engine. Most user environments rely on some Hive functionality to build their data warehouses. Newer SQL engines such as Spark SQL and Impala also provide integration with Hive. To connect easily with users' existing scenarios, we consider Hive integration an indispensable feature for Flink.
As a result, we began providing Hive integration in Flink 1.9, where the feature was released as a preview. With Flink 1.10, the Hive integration became ready for production use. When Flink 1.10 was released, we also used a 10 TB TPC-DS test set to compare Flink with Hive on MapReduce. The results are as follows:
The blue boxes show the time taken by Flink, and the orange boxes show the time taken by Hive on MapReduce. Overall, Flink performs about seven times better than Hive on MapReduce, which verifies that Flink SQL supports batch computing scenarios well.
Next, we introduce the architecture of the Flink and Hive integration. The integration involves several layers:
Access to Hive metadata
Read and write Hive table data
Production Ready
1. Accessing Hive metadata
Anyone who has used Hive knows that Hive metadata is managed through Hive Metastore, so Flink needs to communicate with Hive Metastore. To make Hive metadata easier to access, a newly designed Catalog API was introduced on the Flink side.
This new interface is a general-purpose design. It is not only for Hive metadata; in principle it can connect to the metadata of different external systems.
Multiple Catalogs can be created in one Flink Session, each corresponding to an external system. You can define Catalogs through the Flink Table API or, if you use SQL Client, in the YAML file. SQL Client loads these Catalogs when it creates the TableEnvironment, and TableEnvironment manages the Catalog instances through CatalogManager. SQL Client can then use these Catalogs to access external metadata when it submits SQL statements.
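The relationship between a session, its CatalogManager, and several named Catalogs can be sketched in a few lines of Python. This is a toy model for illustration only, not Flink's implementation: the class names `InMemoryCatalog` and `CatalogManager`, their methods, and the catalog names `default_catalog` and `myhive` are all assumptions made for this sketch.

```python
class InMemoryCatalog:
    """Toy catalog: table metadata lives only in this process (hypothetical)."""

    def __init__(self):
        self.tables = {}

    def create_table(self, name, schema):
        self.tables[name] = schema

    def get_table(self, name):
        return self.tables[name]


class CatalogManager:
    """Toy registry that maps catalog names to catalog instances."""

    def __init__(self):
        self.catalogs = {}
        self.current = None

    def register_catalog(self, name, catalog):
        self.catalogs[name] = catalog
        # The first registered catalog becomes the current one.
        if self.current is None:
            self.current = name

    def use_catalog(self, name):
        if name not in self.catalogs:
            raise KeyError(f"unknown catalog: {name}")
        self.current = name

    def resolve(self, table_path):
        # Accepts "table" (looked up in the current catalog) or "catalog.table".
        if "." in table_path:
            catalog_name, table = table_path.split(".", 1)
        else:
            catalog_name, table = self.current, table_path
        return self.catalogs[catalog_name].get_table(table)


manager = CatalogManager()
manager.register_catalog("default_catalog", InMemoryCatalog())
manager.register_catalog("myhive", InMemoryCatalog())
manager.catalogs["myhive"].create_table("orders", {"id": "BIGINT"})
manager.use_catalog("myhive")
orders_schema = manager.resolve("orders")
```

Each catalog stands in for one external system, and the manager only decides which catalog a table name refers to.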
Two Catalog implementations are listed in the figure above. One is GenericInMemoryCatalog, which keeps all metadata in memory on the Flink client side. It behaves the way Flink did before the Catalog interface existed: the life cycle of all metadata matches the SQL Client Session, and metadata created in the Session is lost when the Session ends.
The other is HiveCatalog, which is the focus of the Hive integration. Behind HiveCatalog is a Hive Metastore instance, and HiveCatalog communicates with it to read and write metadata. Because the APIs of different Hive Metastore versions may be incompatible, a HiveShim layer is added between HiveCatalog and Hive Metastore so that multiple Hive versions can be supported.
HiveCatalog lets Flink access Hive's own metadata, and it also gives Flink the ability to persist metadata. That is, HiveCatalog can store both Hive metadata and metadata used by Flink. For example, a Kafka table created in Flink can also be stored in HiveCatalog. Before HiveCatalog, Flink had no such persistence capability.
2. Read and write Hive table data
Once Hive metadata is accessible, the other important aspect is reading and writing Hive table data. Hive tables are stored in a Hadoop file system, typically HDFS but possibly another file system. In theory, any file system that implements Hadoop's file system interface can store Hive tables.
In Flink:
HiveTableSource is implemented for reading data
HiveTableSink is implemented for writing data
One design principle is to reuse Hive's original Input/Output Formats, SerDes and so on as much as possible when reading and writing Hive data. This has two main benefits. First, reuse reduces development work. Second, reuse keeps written data as compatible with Hive as possible. The goal is that Hive can read data written by Flink, and Flink can read data written by Hive.
3. Production Ready
In Flink 1.10, the Hive integration became Production Ready, mainly because it is functionally complete. The specific functions are as follows:
II. New features in Flink 1.11
Some of the new Hive integration features in Flink 1.11 are described below.
1. Simplified dependency management
The first improvement simplifies dependency management for the Hive connector. One pain point of the Hive connector is that you need to add several jar dependencies, and which jars you need depends on the Hive version you use. For example, see the following figures:
The first figure shows the jars required for Hive 1.0.0, and the second shows the jars required for Hive 2.2.0. Both the number of jars and their versions differ between Hive versions. If you do not read the documentation carefully, it is easy to add the wrong dependencies. Once a jar is missing or has the wrong version, strange and hard-to-understand errors are reported. This is one of the problems users hit most often with the Hive connector.
So we want to simplify dependency management and give users a better experience. Specifically, starting with Flink 1.11, several pre-bundled Hive dependency packages are provided:
Users can choose the dependency package that matches their Hive version.
If you are not using an open source version of Hive, you can still add the individual jars yourself, as in 1.10.
2. Enhancement of Hive Dialect
Hive Dialect was introduced in Flink 1.10, but few people used it because it was weak in that version. Its only function was a switch that controlled whether partitioned tables could be created: with Hive Dialect set, you could create a partitioned table in Flink SQL; without it, you could not.
More importantly, it did not provide compatibility with Hive syntax. With Hive Dialect set you could create a partitioned table, but the DDL for doing so was not Hive syntax.
Hive Dialect is enhanced in Flink 1.11. The goal is for users of Flink SQL Client to have an experience similar to Hive CLI or Beeline: they can write Hive-specific syntax in Flink SQL Client, and Hive scripts need not be modified at all when users migrate to Flink.
To achieve the above goals, the following improvements have been made in Flink 1.11:
Dialect is parameterized. The parameter currently supports two values, default and hive. default is Flink's own Dialect, and hive is the Hive Dialect.
It can be used from both SQL Client and the API.
Dialect can be switched dynamically at the statement level. For example, after a Session is created, set the Dialect to default to write the first statements in Flink's Dialect. After a few statements, set it to hive to write in the Hive Dialect. Switching does not require restarting the Session.
It is compatible with commonly used Hive DDL and basic DML.
It provides an experience similar to Hive CLI or Beeline.
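Statement-level switching can be illustrated with a small Python sketch that walks a script and records which Dialect applies to each statement. The `SET table.sql-dialect=...` form follows the Flink documentation for SQL Client rather than this article; the function `tag_statements` and the sample statements are hypothetical, and the sketch does not parse or execute any SQL.

```python
import re

SUPPORTED_DIALECTS = {"default", "hive"}

# Matches statements such as "SET table.sql-dialect=hive;"
SET_DIALECT = re.compile(
    r"^\s*SET\s+table\.sql-dialect\s*=\s*(\w+)\s*;?\s*$", re.IGNORECASE
)


def tag_statements(statements, initial_dialect="default"):
    """Return (dialect, statement) pairs, switching dialect on SET statements."""
    dialect = initial_dialect
    tagged = []
    for stmt in statements:
        match = SET_DIALECT.match(stmt)
        if match:
            requested = match.group(1).lower()
            if requested not in SUPPORTED_DIALECTS:
                raise ValueError(f"unsupported dialect: {requested}")
            dialect = requested  # takes effect for the following statements
            continue
        tagged.append((dialect, stmt))
    return tagged


script = [
    "CREATE TABLE t1 (id INT) WITH ('connector' = 'datagen')",
    "SET table.sql-dialect=hive;",
    "CREATE TABLE t2 (id INT) PARTITIONED BY (dt STRING) STORED AS parquet",
    "SET table.sql-dialect=default;",
    "SELECT * FROM t1",
]
tagged = tag_statements(script)
dialects = [dialect for dialect, _ in tagged]
```

The session object never changes here; only the Dialect attached to the next statement does, which is the point of switching without a restart.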
3. Turn on Hive Dialect
The figure above shows how to turn on Hive Dialect in SQL Client. The initial Dialect can be set in the YAML file, or you can switch dynamically after SQL Client has started.
You can also turn on Hive Dialect through the Flink Table API:
As shown, you get the Config from the TableEnvironment and then set the Dialect on it.
4. Syntax supported by Hive Dialect
The Hive Dialect syntax is enhanced mainly for DDL. In 1.10, writing DDL in Flink SQL to manipulate Hive metadata was not very usable, so the work focused on DDL to resolve this pain point.
Currently, the supported DDL is as follows:
5. Writing streaming data to Hive
Flink 1.11 also adds features that combine streaming scenarios with Hive, so that Flink and Hive together can help make the Hive data warehouse real-time.
Streaming writes to Hive are built on Streaming File Sink. They are fully SQL-based and require no user code. Both partitioned and non-partitioned tables are supported. Hive data warehouses generally hold offline data and users have high requirements for data consistency, so Exactly-Once semantics are supported. Streaming writes to Hive have a latency of about 5 to 10 minutes. Pushing the latency lower produces more small files, which are unfriendly to HDFS and hurt its performance. In that case, you can run a small-file merge operation.
Streaming writes to Hive require several configurations:
For partitioned tables, set the Partition Commit Delay parameter. It reflects how long a span of data each partition holds, such as a day or an hour.
Partition Commit Trigger specifies when a Partition Commit is triggered. Version 1.11 supports the process-time and partition-time trigger mechanisms.
Partition Commit Policy specifies how a partition is committed. For Hive, the partition must be committed to the metastore to become visible; the metastore policy supports only Hive tables. The other policy is success-file, which writes a success file to tell downstream jobs that the partition's data is ready. Users can also implement a custom commit policy. Multiple policies can be specified together, for example metastore and success-file.
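How the trigger and delay interact can be sketched as a simple decision function. The rule encoded here follows the description in the Flink documentation (partition-time compares the watermark with the partition's time plus the delay; process-time compares the machine clock with the partition's creation time plus the delay); it is an assumption relative to this article, and `should_commit` and `parse_policies` are hypothetical helper names, not Flink APIs.

```python
from datetime import datetime, timedelta


def should_commit(trigger, delay, partition_time=None, watermark=None,
                  created_at=None, now=None):
    """Decide whether a partition is ready to be committed (toy model)."""
    if trigger == "partition-time":
        # Commit once the watermark reaches partition time + delay.
        return watermark >= partition_time + delay
    if trigger == "process-time":
        # Commit once wall-clock time reaches creation time + delay.
        return now >= created_at + delay
    raise ValueError(f"unsupported trigger: {trigger}")


def parse_policies(kind):
    """Split a comma-separated policy list such as 'metastore,success-file'."""
    return [name.strip() for name in kind.split(",") if name.strip()]


hour_partition = datetime(2020, 5, 20, 12, 0)
one_hour = timedelta(hours=1)

early = should_commit("partition-time", one_hour,
                      partition_time=hour_partition,
                      watermark=datetime(2020, 5, 20, 12, 59))
on_time = should_commit("partition-time", one_hour,
                        partition_time=hour_partition,
                        watermark=datetime(2020, 5, 20, 13, 0))
policies = parse_policies("metastore,success-file")
```

With an hourly partition and a one-hour delay, the partition for 12:00 is committed only after the watermark shows that the hour has fully passed, and both policies then run for that partition.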
Let's look at how streaming writes to Hive are implemented:
There are two main parts. The first is StreamingFileWriter, which writes the data. It distinguishes Buckets, where a Bucket is similar to a Hive partition, and each Subtask writes data to different Buckets. For each Bucket a Subtask writes, three kinds of files may exist at the same time: In-progress Files are being written, Pending Files have been fully written but not yet committed, and Finished Files have been written and committed.
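The three file states can be modeled as a small state machine. The sketch below is a simplified illustration and not the StreamingFileWriter code: the `Bucket` class, its `write`, `roll`, and `commit` methods, and the `part-N` file names are hypothetical, and what triggers a roll or a commit in Flink is deliberately left out.

```python
from enum import Enum


class FileState(Enum):
    IN_PROGRESS = "in-progress"  # currently being written
    PENDING = "pending"          # fully written, not yet committed
    FINISHED = "finished"        # written and committed


class Bucket:
    """Toy bucket that tracks the state of each part file."""

    def __init__(self, bucket_id):
        self.bucket_id = bucket_id
        self.files = {}
        self._counter = 0
        self._current = None

    def write(self, record):
        # Open a new in-progress file if none is open.
        if self._current is None:
            self._counter += 1
            self._current = f"part-{self._counter}"
            self.files[self._current] = FileState.IN_PROGRESS
        # A real writer would append `record` to the open file here.

    def roll(self):
        # Close the open file; it waits as pending until the next commit.
        if self._current is not None:
            self.files[self._current] = FileState.PENDING
            self._current = None

    def commit(self):
        # Promote every pending file to finished.
        for name, state in self.files.items():
            if state is FileState.PENDING:
                self.files[name] = FileState.FINISHED


bucket = Bucket("dt=2020-05-20/hour=12")
bucket.write("a")
bucket.roll()
bucket.write("b")
bucket.commit()
states = {name: state.value for name, state in bucket.files.items()}
```

After the commit, the first file is finished while the second is still in progress, which is the mixed situation the paragraph above describes for a single Bucket.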
The second is StreamingFileCommitter, which runs after StreamingFileWriter. It commits partitions, so it is not needed for non-partitioned tables. When the data of a partition is ready, StreamingFileWriter sends a Commit Message to StreamingFileCommitter to tell it that the data is ready. The Commit Trigger then decides when the commit happens, and the Commit Policy decides how it is carried out.
Here is a concrete example:
The example creates a partitioned table called hive_table with two partition columns, dt and hour. dt is a date string and hour is an hour string. The Commit Trigger is set to partition-time, the Commit Delay to 1 hour, and the Commit Policy to metastore and success-file.
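Since the original figure is not reproduced here, the following Python sketch renders a DDL statement of the kind described. It is a reconstruction under assumptions, not the article's exact example: the property keys are taken from the Flink documentation, the data columns `user_id` and `order_amount` and the `STORED AS parquet` clause are illustrative choices, `hour` is back-quoted in case it is a reserved word, and `render_hive_ddl` is a hypothetical helper.

```python
def render_hive_ddl(table, columns, partitions, properties):
    """Render a Hive-dialect CREATE TABLE statement from plain Python data."""
    cols = ",\n".join(f"  {name} {typ}" for name, typ in columns)
    parts = ", ".join(f"{name} {typ}" for name, typ in partitions)
    props = ",\n".join(f"  '{key}'='{value}'" for key, value in properties.items())
    return (
        f"CREATE TABLE {table} (\n{cols}\n) "
        f"PARTITIONED BY ({parts}) STORED AS parquet "
        f"TBLPROPERTIES (\n{props}\n)"
    )


ddl = render_hive_ddl(
    "hive_table",
    columns=[("user_id", "STRING"), ("order_amount", "DOUBLE")],  # illustrative
    partitions=[("dt", "STRING"), ("`hour`", "STRING")],
    properties={
        # How to turn partition values into a timestamp for partition-time.
        "partition.time-extractor.timestamp-pattern": "$dt $hour:00:00",
        "sink.partition-commit.trigger": "partition-time",
        "sink.partition-commit.delay": "1 h",
        "sink.partition-commit.policy.kind": "metastore,success-file",
    },
)
print(ddl)
```

The generated statement would be run with the Hive Dialect enabled; a streaming INSERT INTO hive_table from a source with watermarks would then drive the partition commits.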
6. Streaming consumption of Hive
In Flink 1.10, Hive data could only be read in batch mode. Starting with 1.11, Hive data can be read in streaming mode.
Flink continuously monitors the Hive table for new data and, when there is any, consumes it incrementally.
To enable streaming consumption for a Hive table, you can set it in the table properties, or you can use the dynamic options feature added in 1.11 to specify at query time whether the table should be read in streaming mode.
Streaming consumption of Hive supports both partitioned and non-partitioned tables. For a non-partitioned table, Flink monitors the table directory for new files and reads them incrementally. For a partitioned table, it monitors the partition directories and the Metastore for new partitions and reads the data of any new partition. Note that reading a new partition is a one-time process: once a partition is added, its data is read in full and the partition is not monitored afterwards. So if you stream a partitioned Hive table with Flink, make sure a partition's data is complete when the partition is added.
Streaming consumption of Hive data also requires some additional parameters. First, specify the consumption order: because data is read incrementally, Flink needs to know the order in which to consume it. Two consumption orders are currently supported, create-time and partition-time.
Users can also specify the starting point of consumption, similar to specifying an offset when consuming from Kafka: the point in time from which data should be consumed. Flink then checks and reads only data after that point in time.
Finally, you can specify the monitoring interval. New data is currently detected by scanning the file system, and scanning too often puts heavy pressure on the file system, so the interval lets you limit how frequently it happens.
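The three parameters (order, starting point, interval) can be seen together in a toy scan function for a non-partitioned table. The option keys in `STREAMING_READ_OPTIONS` are quoted from the Flink documentation and the values are only examples; `find_new_files` is a hypothetical helper that works on a plain dictionary instead of a real file system, with times given as plain numbers.

```python
# Table options as documented for Flink 1.11 (values are examples only).
STREAMING_READ_OPTIONS = {
    "streaming-source.enable": "true",
    "streaming-source.consume-order": "create-time",
    "streaming-source.consume-start-offset": "2020-05-20",
    "streaming-source.monitor-interval": "1 min",
}


def find_new_files(listing, seen, start_offset):
    """Return unseen files at or after start_offset, oldest first.

    listing maps file path -> creation time; seen is updated in place.
    """
    fresh = sorted(
        (created, path)
        for path, created in listing.items()
        if path not in seen and created >= start_offset
    )
    seen.update(path for _, path in fresh)
    return [path for _, path in fresh]


seen = set()
start_offset = 200

# First scan: part-0 is older than the start offset and is skipped.
first_scan = find_new_files({"/t/part-0": 100, "/t/part-1": 205}, seen, start_offset)

# Second scan, one monitoring interval later: only the new file is returned.
second_scan = find_new_files(
    {"/t/part-0": 100, "/t/part-1": 205, "/t/part-2": 300}, seen, start_offset
)
```

Each call stands for one scan; in a real job the monitoring interval decides how often such a scan runs, which is why a very short interval increases file system load.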
Finally, let's look at how streaming consumption works. First, streaming consumption of a non-partitioned table:
In the figure, ContinuousFileMonitoringFunction constantly monitors the files under the non-partitioned table's directory by interacting with the file system. When it finds new files, it generates Splits for them and passes the Splits to ContinuousFileReaderOperator. After receiving the Splits, ContinuousFileReaderOperator reads the data from the file system and passes it downstream for processing.
Streaming consumption of partitioned tables is not very different. HiveContinuousMonitoringFunction also scans the file system constantly, but it scans for new partition directories. When it finds one, it checks in the metastore whether the partition has been committed. If it has, the data in the partition can be consumed: Splits are generated for the partition's data and passed to ContinuousFileReaderOperator, which then consumes the data.
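The extra metastore check for partitioned tables can be sketched as a filter over discovered partition directories. This is a toy model of the behavior described above, not HiveContinuousMonitoringFunction itself: `consumable_partitions` is a hypothetical function, and the metastore is represented by a plain set of committed partition names.

```python
def consumable_partitions(partition_dirs, metastore_partitions, consumed):
    """Return partitions that exist on disk, are committed, and are unread.

    consumed is updated in place so each partition is returned only once.
    """
    ready = [
        partition
        for partition in sorted(partition_dirs)
        if partition in metastore_partitions and partition not in consumed
    ]
    consumed.update(ready)
    return ready


dirs = {"dt=2020-05-20/hour=12", "dt=2020-05-20/hour=13"}
metastore = {"dt=2020-05-20/hour=12"}  # hour=13 exists on disk but is uncommitted
consumed = set()

first_scan = consumable_partitions(dirs, metastore, consumed)

metastore.add("dt=2020-05-20/hour=13")  # the writer commits the partition
second_scan = consumable_partitions(dirs, metastore, consumed)
third_scan = consumable_partitions(dirs, metastore, consumed)
```

The empty third scan reflects the one-time nature of partition reads: once a partition has been handed out, later changes to it are not picked up.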
7. Joining Hive dimension tables
Another scenario that combines Hive with streaming data is joining Hive dimension tables, for example joining streaming data with an offline Hive dimension table as it is consumed.
Joining a Hive dimension table uses Flink's Temporal Table syntax: the Hive dimension table is treated as a Temporal Table and joined with the streaming table. To learn more about Temporal Tables, see Flink's website.
In the implementation, each sub-task caches the entire Hive table in memory. If the Hive dimension table is larger than the memory available to a sub-task, the job fails.
Because the Hive dimension table may be updated while it is being joined, users can set a timeout for the Hive table cache. After this time, the sub-task reloads the Hive dimension table. Note that this approach is not suitable for frequently updated Hive dimension tables, because reloading puts heavy pressure on the HDFS file system. It suits slowly changing dimension tables, and the cache timeout is generally set long, usually on the order of hours.
This diagram shows how joining Hive dimension tables works. Streaming Data is the streaming input, and LookupJoinRunner is the Join operator. It extracts the join key from the streaming data and passes it to FileSystemLookupFunction.
FileSystemLookupFunction is a Table function that interacts with the underlying file system to load the Hive table, and then looks up the join key in the Hive table to determine which rows can be joined.
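The cache-and-reload behavior can be sketched as a small Python class. It is a simplified model under stated assumptions, not FileSystemLookupFunction: `CachedDimTable`, its parameters, and the sample rows are hypothetical, and the clock is injectable so the timeout can be demonstrated without waiting. In the Flink documentation the corresponding table option is named `lookup.join.cache.ttl`.

```python
import time


class CachedDimTable:
    """Toy lookup function: caches a whole table and reloads it after a TTL."""

    def __init__(self, load_rows, key_field, ttl_seconds, clock=time.monotonic):
        self.load_rows = load_rows      # callable returning all rows of the table
        self.key_field = key_field
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self.index = None
        self.loaded_at = None
        self.reloads = 0

    def _reload(self, now):
        # Load the entire table and index it by join key.
        index = {}
        for row in self.load_rows():
            index.setdefault(row[self.key_field], []).append(row)
        self.index = index
        self.loaded_at = now
        self.reloads += 1

    def lookup(self, join_key):
        now = self.clock()
        if self.index is None or now - self.loaded_at >= self.ttl_seconds:
            self._reload(now)
        return self.index.get(join_key, [])


rates = [{"currency": "EUR", "rate": 1.1}]
fake_now = [0.0]
table = CachedDimTable(lambda: list(rates), "currency",
                       ttl_seconds=3600, clock=lambda: fake_now[0])

first = table.lookup("EUR")[0]["rate"]
rates[0] = {"currency": "EUR", "rate": 1.2}   # the dimension table is updated
stale = table.lookup("EUR")[0]["rate"]        # still served from the cache
fake_now[0] = 3600.0                          # the cache timeout elapses
fresh = table.lookup("EUR")[0]["rate"]        # triggers a full reload
```

Every reload reads the whole table again, which is why a short timeout on a large or frequently changing table translates directly into file system pressure.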
Here is an example of joining a Hive dimension table:
This example comes from the Flink website. The streaming table is Orders, and LatestRates is the Hive dimension table.
III. Hive batch and stream integrated data warehouse
As the introduction above shows, Flink 1.11 focuses on integrating batch and stream processing for the Hive data warehouse. Because Flink is a stream processing engine, we hope to help users combine batch and stream more effectively, make the Hive data warehouse real-time, and make it easier to mine the value of data.
Before Flink 1.11, the Flink and Hive integration covered batch computation and supported only offline scenarios. One problem with offline scenarios is relatively high latency. Batch jobs are generally scheduled through a scheduling framework, and the latency accumulates: the second job can run only after the first finishes, and so on in sequence. End-to-end latency is therefore the sum of the latencies of all jobs.
From 1.11 on, streaming capabilities for Hive are supported, so the Hive data warehouse can be made real-time.
For example, Flink can run ETL on online data and write it to Hive in real time. Once the data is in Hive, another Flink job can run real-time or near-real-time queries on it and return results quickly. Other Flink jobs can also use the data written to the Hive warehouse as a dimension table, joining it with other online data to produce analysis results.