How to Use Hive Catalog
This article explains how to use Hive Catalog. The content is simple, clear, and easy to learn and understand.
What is Hive Catalog?
We know that Hive uses the Hive Metastore (HMS) to store metadata and persists that metadata in a relational database. For Flink to integrate with Hive, it needs to go through the Hive Metastore to manage Flink's metadata; this is the job of Hive Catalog.
The main role of Hive Catalog is to use the Hive Metastore to manage Flink's metadata. Hive Catalog persists metadata, so subsequent operations can reuse the metadata of these tables without re-registering them every time they are used. Without a persistent catalog, every session that fetches and processes data has to recreate the metadata objects, which is time-consuming.
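To make this concrete, here is a minimal sketch (the class name and flow are illustrative; the catalog name and conf path follow the examples later in this article). Because HiveCatalog persists DDL to the Hive Metastore, a table created in an earlier run is still visible in a later run without re-executing its CREATE TABLE statement:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CatalogPersistenceDemo {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // register and use a catalog backed by the Hive Metastore
        tableEnv.registerCatalog("myhive",
                new HiveCatalog("myhive", "default", "/opt/modules/apache-hive-2.3.4-bin/conf"));
        tableEnv.useCatalog("myhive");

        // no DDL needed here: tables registered in previous sessions are already listed
        tableEnv.executeSql("SHOW TABLES").print();
    }
}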
How to use Hive Catalog
HiveCatalog works right out of the box: once you have configured Flink to integrate with Hive, you can use it. For example, if we create a Kafka source table with a Flink SQL DDL statement, we can immediately view that table's metadata in the Hive Metastore.
HiveCatalog can handle two types of tables: Hive-compatible tables and plain tables (generic tables). Hive-compatible tables are stored in a Hive-compatible manner, so they can be manipulated from either Flink or Hive.
Generic tables are specific to Flink. When you create a generic table with HiveCatalog, the Hive Metastore is only used to persist its metadata. You can therefore view the metadata of these tables through Hive (with the DESCRIBE FORMATTED command), but you cannot process the tables through Hive, because the syntax is not compatible.
Flink uses the is_generic property to mark whether a table is generic. By default, created tables are generic, i.e. is_generic=true. To create a Hive-compatible table, you need to specify is_generic=false in the table creation properties.
Note:
Since HiveCatalog relies on the Hive Metastore, the Hive Metastore service must be running (for example, started with hive --service metastore).
Use Hive Catalog in code

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
// use the registered catalog
tableEnv.useCatalog("myhive");

Use Hive Catalog in Flink SQL Cli
Using Hive Catalog in the Flink SQL Cli is easy: just configure the sql-client-defaults.yaml file as follows:
catalogs:
   - name: myhive
     type: hive
     default-database: default
     hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
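After restarting the SQL Cli, a quick sanity check (a sketch; myhive is the catalog name from the config above) confirms the catalog is loaded:

Flink SQL> show catalogs;
Flink SQL> use catalog myhive;
Flink SQL> show tables;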
Create a Kafka table in the Flink SQL Cli. By default it is a generic table, i.e. is_generic=true:
CREATE TABLE user_behavior (
  `user_id` BIGINT,    -- user id
  `item_id` BIGINT,    -- item id
  `cat_id` BIGINT,     -- category id
  `action` STRING,     -- user behavior
  `province` INT,      -- the user's province
  `ts` BIGINT,         -- timestamp of the user's behavior
  `proctime` AS PROCTIME(),  -- processing-time column generated as a computed column
  `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')),  -- event time
  WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- define the watermark
) WITH (
  'connector' = 'kafka',  -- use the kafka connector
  'topic' = 'user_behavior',  -- kafka topic
  'scan.startup.mode' = 'earliest-offset',  -- start from the earliest offset
  'properties.group.id' = 'group1',  -- consumer group
  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  'format' = 'json',  -- the source data format is json
  'json.fail-on-missing-field' = 'true',
  'json.ignore-parse-errors' = 'false'
);
We can then view the table's metadata in the Hive client:
hive (default)> desc formatted user_behavior;
Table Parameters:
	...
	is_generic	true
	...
As the metadata above shows, is_generic=true means the table is a generic table, and querying it in Hive will raise an error.
The table created above is a generic table and cannot be queried with Hive. So how do we create a Hive-compatible table? We only need to explicitly specify is_generic=false in the table properties, as shown below:
CREATE TABLE hive_compatible_tbl (
  `user_id` BIGINT,   -- user id
  `item_id` BIGINT,   -- item id
  `cat_id` BIGINT,    -- category id
  `action` STRING,    -- user behavior
  `province` INT,     -- the user's province
  `ts` BIGINT         -- timestamp of the user's behavior
) WITH (
  'connector' = 'kafka',  -- use the kafka connector
  'topic' = 'user_behavior',  -- kafka topic
  'scan.startup.mode' = 'earliest-offset',  -- start from the earliest offset
  'properties.group.id' = 'group1',  -- consumer group
  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  'format' = 'json',  -- the source data format is json
  'json.fail-on-missing-field' = 'true',
  'json.ignore-parse-errors' = 'false',
  'is_generic' = 'false'
);
When we look at the table's metadata in Hive, we can see is_generic=false:
hive (default)> desc formatted hive_compatible_tbl;
Table Parameters:
	...
	is_generic	false
	...
We can write data to the table from either the Flink SQL Cli or the Hive Cli, and then observe the table's data changes from both.
hive (default)> insert into hive_compatible_tbl select 2020, 1221, 100, 'buy', 11, 1574330486;
hive (default)> select * from hive_compatible_tbl;
Then view the table in the Flink SQL Cli:
Flink SQL> select user_id, item_id, action from hive_compatible_tbl;
user_id  item_id  action
2020     1221     buy
Similarly, we can write data to the table from the Flink SQL Cli:
Flink SQL> insert into hive_compatible_tbl select 2020, 1222, 101, 'fav', 11, 1574330486;
Flink SQL> select user_id, item_id, action from hive_compatible_tbl;
user_id  item_id  action
2020     1221     buy
2020     1222     fav
Note:
For Hive-compatible tables, you need to pay attention to data types. The Flink-to-Hive data type mapping is as follows:

Flink data type    Hive data type
CHAR(p)            CHAR(p)
VARCHAR(p)         VARCHAR(p)
STRING             STRING
BOOLEAN            BOOLEAN
TINYINT            TINYINT
SMALLINT           SMALLINT
INT                INT
BIGINT             LONG
FLOAT              FLOAT
DOUBLE             DOUBLE
DECIMAL(p, s)      DECIMAL(p, s)
DATE               DATE
TIMESTAMP(9)       TIMESTAMP
BYTES              BINARY
ARRAY<T>           LIST<T>
MAP<K, V>          MAP<K, V>
ROW                STRUCT
Note:
- Hive's CHAR(p) type has a maximum length of 255.
- Hive's VARCHAR(p) type has a maximum length of 65535.
- Hive's MAP only supports primitive key types, while the key of Flink's MAP can be any data type.
- Hive's TIMESTAMP has a fixed precision of 9, and Hive UDFs can only handle TIMESTAMP values with precision <= 9.
Hive Dialect

Flink also supports writing SQL in the Hive dialect. In the SQL Cli, you can switch between dialects at any time via the table.sql-dialect property:

Flink SQL> set table.sql-dialect=hive;    -- use the hive dialect
Flink SQL> set table.sql-dialect=default; -- use the default dialect
Note:
Once you switch to the Hive dialect, you can only use Hive syntax to create tables. If you try to create a table with Flink syntax, you will get an error.
Use the dialect in the Table API

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// use the hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// use the default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

Example

Flink SQL> set table.sql-dialect=hive;
-- create a table using Hive syntax
CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (
  `id` int COMMENT 'id',
  `name` string COMMENT 'name',
  `age` int COMMENT 'age'
)
COMMENT 'hive dialect table test'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Go to the Hive client to view the table's metadata:
hive (default)> desc formatted hive_dialect_tbl;
col_name        data_type       comment
# col_name      data_type       comment
id              int
name            string
age             int

# Detailed Table Information
Database:               default
Owner:                  null
CreateTime:             Mon Dec 21 17:23:48 CST 2020
LastAccessTime:         UNKNOWN
Retention:              0
Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl
Table Type:             MANAGED_TABLE
Table Parameters:
	comment               	hive dialect table test
	is_generic            	false
	transient_lastDdlTime 	1608542628

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
	field.delim           	,
	serialization.format  	,
Clearly, this is a Hive-compatible table, i.e. is_generic=false.
Use the Flink SQL Cli to write a row of sample data into the table, for example:

Flink SQL> insert into hive_dialect_tbl select 1, 'tom', 20;
We can also operate on the table from the Hive Cli, for example:

hive (default)> select * from hive_dialect_tbl;
hive (default)> insert into hive_dialect_tbl select 2, 'jack', 22;
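The same end-to-end flow can be scripted through the Table API. Below is a minimal sketch (the table name hive_dialect_tbl2 is illustrative; the catalog name and conf path follow the earlier examples): register the HiveCatalog, switch to the Hive dialect for the Hive-syntax DDL, then switch back to the default dialect for Flink-syntax statements.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveDialectDemo {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        tableEnv.registerCatalog("myhive",
                new HiveCatalog("myhive", "default", "/opt/modules/apache-hive-2.3.4-bin/conf"));
        tableEnv.useCatalog("myhive");

        // Hive-syntax DDL only parses under the Hive dialect
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS hive_dialect_tbl2 ("
                + " `id` int COMMENT 'id',"
                + " `name` string COMMENT 'name')"
                + " ROW FORMAT DELIMITED FIELDS TERMINATED BY ','");

        // switch back to the default dialect for Flink-syntax statements
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("SHOW TABLES").print();
    }
}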
Here are some considerations for using the Hive dialect:
- The Hive dialect can only be used to operate on Hive tables, not on generic tables. The Hive dialect should be used together with HiveCatalog.
- While all versions of Hive support the same syntax, whether a specific feature is available still depends on the Hive version you use. For example, updating the database location is supported only in Hive 2.4.0 or later.
- Hive and Calcite have different reserved keywords. For example, default is a reserved keyword in Calcite and a non-reserved keyword in Hive. When using the Hive dialect, such keywords must be quoted with backticks (`) to be used as identifiers, as in the example after this list.
- Views created in Flink cannot be queried in Hive.
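For instance, a minimal sketch (the table name reserved_kw_tbl is illustrative) of backtick-quoting the Calcite reserved keyword default when it is used as a column name under the Hive dialect:

Flink SQL> set table.sql-dialect=hive;
Flink SQL> create table reserved_kw_tbl (`default` string);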
Of course, once the Hive dialect is enabled, we can process Hive data in Flink in the same way we would in Hive. The specific operations are the same as in Hive, so this article does not cover them in detail.
Thank you for reading. That concludes "how to use Hive Catalog". After studying this article you should have a deeper understanding of how to use Hive Catalog; the specifics still need to be verified in practice.