This article walks through, by example, how SQL, HiveCatalog, and event time work together in Flink 1.10.
Compared with 1.9, Flink 1.10 is a major step forward, with improvements in many areas we care about, especially Flink SQL. This article uses a simple example, computing PV and UV from event tracking logs, to try out two important new features of Flink 1.10: support for event time in SQL DDL, and the use of the Hive Metastore as Flink's metadata store (i.e. HiveCatalog). Both are a great convenience when building a real-time data warehouse.
Add dependencies
The example uses Hive version 1.1.0 and Kafka version 0.11.0.2.
To integrate Flink with Hive and use HiveCatalog, first place the following JAR packages in the ${FLINK_HOME}/lib directory.
flink-connector-hive_2.11-1.10.0.jar
flink-shaded-hadoop-2-uber-2.6.5-8.0.jar
hive-metastore-1.1.0.jar
hive-exec-1.1.0.jar
libfb303-0.9.2.jar
The last three JAR packages ship with Hive and can be found in the ${HIVE_HOME}/lib directory. The first two can be located by searching their GAV coordinates on Aliyun's Maven repository and downloading them manually (both have groupId org.apache.flink).
Then add the relevant Maven dependencies to the pom.xml.
Maven repository search: https://maven.aliyun.com/mvn/search

<properties>
    <scala.bin.version>2.11</scala.bin.version>
    <flink.version>1.10.0</flink.version>
    <hive.version>1.1.0</hive.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka-0.11_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
    </dependency>
</dependencies>
Finally, find the configuration file hive-site.xml for Hive, and the preparation is done.
Register the HiveCatalog and create a database
Without further ado, here is the code. It is concise and easy to follow.
// Imports used by the snippets throughout this article
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.types.Row

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(5)
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val tableEnvSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)

val catalog = new HiveCatalog(
  "rtdw",                   // catalog name
  "default",                // default database
  "/Users/lmagic/develop",  // directory containing hive-site.xml
  "1.1.0"                   // Hive version
)
tableEnv.registerCatalog("rtdw", catalog)
tableEnv.useCatalog("rtdw")

val createDbSql = "CREATE DATABASE IF NOT EXISTS rtdw.ods"
tableEnv.sqlUpdate(createDbSql)
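As an optional sanity check (not part of the original code), the catalog registration and the new database can be verified directly from the TableEnvironment. A minimal sketch:

// Optional: confirm that the "rtdw" catalog is registered and that the "ods"
// database now exists in the current catalog.
println(tableEnv.listCatalogs().mkString(", "))   // should include "rtdw"
println(tableEnv.listDatabases().mkString(", "))  // should include "ods"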
Create a Kafka streaming table and specify the event time
Our tracking logs are written to a Kafka topic in JSON format; a simplified version of the schema looks roughly like this.
"eventType": "clickBuyNow", "userId": "97470180", "shareUserId": "," platform ":" xyz "," columnType ":" merchDetail "," merchandiseId ":" 12727495 "," fromType ":" wxapp "," siteId ":" 20392 "," categoryId ":"," ts ": 1585136092541
The ts field is the timestamp (in milliseconds) of the tracking event. Back in Flink 1.9, a table defined with a CREATE TABLE statement could not declare an event time attribute; only processing time was available. With Flink 1.10 you can write the following.
CREATE TABLE rtdw.ods.streaming_user_active_log (
  eventType STRING COMMENT '...',
  userId STRING,
  shareUserId STRING,
  platform STRING,
  columnType STRING,
  merchandiseId STRING,
  fromType STRING,
  siteId STRING,
  categoryId STRING,
  ts BIGINT,
  procTime AS PROCTIME(),  -- processing time
  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),  -- event time
  WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND  -- watermark
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'ng_log_par_extracted',
  'connector.startup-mode' = 'latest-offset',  -- specify the starting offset
  'connector.properties.zookeeper.connect' = 'zk109:2181,zk110:2181,zk111:2181',
  'connector.properties.bootstrap.servers' = 'kafka112:9092,kafka113:9092,kafka114:9092',
  'connector.properties.group.id' = 'rtdw_group_test_1',
  'format.type' = 'json',
  'format.derive-schema' = 'true',  -- derive the JSON parsing schema from the table schema
  'update-mode' = 'append'
)
Flink SQL introduces the concept of computed columns, with the syntax column_name AS computed_column_expression. A computed column produces a column that does not exist in the source schema; it can be built from the original columns together with operators and built-in functions. In the statement above, the processing-time column procTime is generated by the built-in PROCTIME() function, and the event-time column eventTime is generated from the original ts field via the two time-conversion functions FROM_UNIXTIME() and TO_TIMESTAMP().
Why can't the ts field be used directly as the event time? Because Flink SQL requires the time attribute to be of type TIMESTAMP(3), i.e. a timestamp in a form like 'yyyy-MM-dd HH:mm:ss', a raw Unix timestamp naturally does not qualify, so it has to be converted first.
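To make the conversion concrete, here is a minimal sketch (not part of the original job; the variable name conversionCheck is illustrative) that runs the same expression as an ad-hoc query. It assumes the table defined by the DDL above has already been created, which happens a little further below via sqlUpdate.

// Run the ts -> TIMESTAMP(3) conversion as an ad-hoc query to see what the
// computed eventTime column will contain.
val conversionCheck = tableEnv.sqlQuery(
  """
    |SELECT
    |  ts,
    |  TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')) AS eventTime
    |FROM rtdw.ods.streaming_user_active_log
  """.stripMargin)
conversionCheck.toAppendStream[Row].print()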
Now that there is an event time, a watermark naturally follows. Flink SQL introduces the syntax WATERMARK FOR rowtime_column_name AS watermark_strategy_expression for generating watermarks. There are two common strategies:
Monotonically non-decreasing watermark (corresponding to AscendingTimestampExtractor in the DataStream API):
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

Bounded out-of-order watermark (corresponding to BoundedOutOfOrdernessTimestampExtractor in the DataStream API):
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'n' TIME_UNIT

A rough DataStream API counterpart of both strategies is sketched below.
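For readers who know the DataStream API better, here is a rough sketch of those counterparts in Flink 1.10. The UserActiveLog case class, the helper names, and the ten-second bound are assumptions made for this illustration; they are not the code of the original job.

// Illustrative only: DataStream-API counterparts of the two SQL watermark strategies.
import org.apache.flink.streaming.api.functions.timestamps.{AscendingTimestampExtractor, BoundedOutOfOrdernessTimestampExtractor}
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time

case class UserActiveLog(eventType: String, userId: String, ts: Long)

// Monotonically non-decreasing timestamps.
def withAscendingWatermark(stream: DataStream[UserActiveLog]): DataStream[UserActiveLog] =
  stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserActiveLog] {
    override def extractAscendingTimestamp(element: UserActiveLog): Long = element.ts
  })

// Bounded out-of-orderness with a 10-second bound, matching the DDL above.
def withBoundedOutOfOrdernessWatermark(stream: DataStream[UserActiveLog]): DataStream[UserActiveLog] =
  stream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[UserActiveLog](Time.seconds(10)) {
      override def extractTimestamp(element: UserActiveLog): Long = element.ts
    })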
The SQL statement above sets a ten-second out-of-order interval. If watermarks, AscendingTimestampExtractor, and BoundedOutOfOrdernessTimestampExtractor are still unfamiliar, the earlier article at https://www.jianshu.com/p/c612e95a5028 explains why the syntax takes this form. Now let's actually create the table.
val createTableSql =
  """
    |-- the CREATE TABLE statement shown above
  """.stripMargin
tableEnv.sqlUpdate(createTableSql)
After it executes, we can go to Hive and run DESCRIBE FORMATTED ods.streaming_user_active_log. The table has no actual columns on the Hive side; all of its properties (schema, connector, format, and so on) are recorded in the Hive Metastore as metadata.
In addition, all tables created through Flink SQL carry the tag property is_generic=true (it does not show up in the DESCRIBE output).
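The same metadata can also be read back programmatically through the catalog API. A minimal sketch against the HiveCatalog registered earlier; the expected value in the comment is an assumption based on the description above:

// Read the table's metadata back through the catalog API.
import org.apache.flink.table.catalog.ObjectPath

val tablePath = new ObjectPath("ods", "streaming_user_active_log")
val catalogTable = catalog.getTable(tablePath)
println(catalogTable.getSchema)                        // the schema stored in the Metastore
println(catalogTable.getProperties.get("is_generic"))  // expected: "true"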
Windowed calculation of PV and UV
We use a 30-second tumbling window, grouped by event type; the query is as follows.
SELECT
  eventType,
  TUMBLE_START(eventTime, INTERVAL '30' SECOND) AS windowStart,
  TUMBLE_END(eventTime, INTERVAL '30' SECOND) AS windowEnd,
  COUNT(userId) AS pv,
  COUNT(DISTINCT userId) AS uv
FROM rtdw.ods.streaming_user_active_log
WHERE platform = 'xyz'
GROUP BY eventType, TUMBLE(eventTime, INTERVAL '30' SECOND)
See the official documentation for how windows are expressed in SQL; the 1.10 SQL documentation is quite good:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows

Rather than bothering to write the result to a sink table, we convert it directly to a stream and print it to the screen.

val queryActiveSql =
  """
    |-- the SELECT statement shown above
  """.stripMargin
val result = tableEnv.sqlQuery(queryActiveSql)
result
  .toAppendStream[Row]
  .print()
  .setParallelism(1)
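One detail left implicit above: the streaming job still has to be submitted explicitly before anything runs. A minimal sketch, with an arbitrary job name chosen for this example:

// Submit the job; the name is an arbitrary placeholder.
streamEnv.execute("rtdw-pv-uv-demo")

Once the job is running, the windowed PV and UV results are printed continuously to standard output.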