In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces how to connect Flink SQL to Hive and write / read data. It is very detailed and has certain reference value. Interested friends must finish reading it.
1. Add dependency 1.11.2 2.11 org.apache.flink flink-streaming-scala_$ {scala.version} ${flink.version} org.apache.flink flink-connector-kafka-0.11_$ {scala.version} ${flink.version} Org.apache.flink flink-clients_$ {scala.version} ${flink.version} org.apache.flink flink-table-api-java-bridge_$ {scala.version} ${flink.version} org.apache.flink Flink-table-planner-blink_$ {scala.version} ${flink.version} org.apache.flink flink-connector-hive_$ {scala.version} ${flink.version} org.apache.hive hive-exec 2.1. 1 org.apache.flink flink-shaded-hadoop-2-uber 2.6.5-7.0 org.apache.flink flink-json ${flink.version} org.apache.flink flink-connector-elasticsearch7_$ {scala.version} ${flink.version} org.apache.flink flink-csv ${flink.version} com.fasterxml.jackson.core jackson-databind 2.4.0 com.fasterxml.jackson.core jackson-annotations 2.4.0 com.fasterxml.jackson.core jackson-core 2.4.0 2. Create a blink version of the batch Table execution environment EnvironmentSettings bbSettings = EnvironmentSettings.newInstance () .useBlinkPlanner () .inBatchMode () .build (); TableEnvironment bbTableEnv = TableEnvironment.create (bbSettings)
After actual testing, HiveTableSink does not support streaming writing (AppendStreamTableSink is not implemented). It must be a batch environment to write data to hive, but cannot write streaming data to hive. For example, creating a temporary table with kafka, and then continuously inserting the data flow from the table into hive is not allowed. Version 1.11 of the official website can realize streaming writing of hive through flink sql-client, which has yet to be verified.
3. Connect to the file system, create a hive catalog, and manipulate the table, just like Spark on Hive,flink can directly get the metadata of Hive and use flink to calculate. / / Connect to the external file bbTableEnv.connect (new FileSystem (). Path ("file:///E:/d.txt")) .withFormat (new Csv (). FieldDelimiter (',')) .withSchema (new Schema (). Field (" id ", DataTypes.STRING () .createTemporaryTable (" output ") / set the hive dialect bbTableEnv.getConfig (). SetSqlDialect (SqlDialect.HIVE); / / get the hive-site.xml directory String hiveConfDir = Thread.currentThread (). GetContextClassLoader (). GetResource ("). GetPath (). Substring (1); HiveCatalog hive = new HiveCatalog (" hive "," warningplatform ", hiveConfDir); bbTableEnv.registerCatalog (" hive ", hive); bbTableEnv.useCatalog (" hive ") BbTableEnv.useDatabase ("warningplatform"); bbTableEnv.executeSql ("insert into test select id from default_catalog.default_database.output")
The way to create temporary tables through bbTableEnv.connect () is out of date. It is recommended to use bbTableEnv.executeSql () to create temporary tables through DDL. It is not clear which catalog the temporary tables belong to, and what the rules are. According to the data, temporary tables are related to the life cycle of a single Flink session, and temporary tables are always stored in memory. The permanent table needs a catalog to manage the corresponding metadata of the table, such as hive metastore, which will exist until the table is explicitly deleted. So guess: default_catalog is stored in memory, and if we create a temporary table before switching to hive catalog, then we can use default_catalog.default_database.tableName to get the temporary table. If we switch catalog and then create a temporary table, we will not be able to get the temporary table, because it is not in the default_catalog and is stored in memory. Directly querying the temporary table will go to the current catalog to find the temporary table, so we must create a temporary table in default_catalog. The temporary view seems to be stored in the current catalog.
The view created by bbTableEnv.createTemporaryView () belongs to the current database
BbTableEnv.createTemporaryView ("output", bbTableEnv.sqlQuery ("select * from default_catalog.default_database.output"))
Note that the method of executing sql in version 1.11 has changed, inserting or executing sql statements by executing the environment's executeSql (), executeInsert (), etc.
These are all the contents of the article "how Flink SQL connects to Hive and writes / reads data". Thank you for reading! Hope to share the content to help you, more related knowledge, welcome to follow the industry information channel!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.