Flink 1.11 New Features: SQL Hive Streaming by Example

This article introduces the new SQL Hive Streaming capability of Flink 1.11 through a worked example. It should serve as a useful reference for anyone interested in integrating Flink SQL with Hive; hopefully you will learn a lot by following along.
Add related dependencies
The Hive version on the test cluster is 1.1.0, the Hadoop version is 2.6.0, and the Kafka version is 1.0.1.
<properties>
    <scala.bin.version>2.11</scala.bin.version>
    <flink.version>1.11.0</flink.version>
    <flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version>
    <hive.version>1.1.0</hive.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>${flink-shaded-hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
    </dependency>
</dependencies>
Also, don't forget to find hdfs-site.xml and hive-site.xml and add them to the project.
Create an execution environment
In the Table/SQL API of Flink 1.11, the FileSystem connector is implemented by an enhanced version of the StreamingFileSink component, called StreamingFileWriter in the source code. We know that files written by StreamingFileSink only move from the pending state to the finished state when a checkpoint succeeds, and only then can they be safely read downstream. Therefore, we must enable checkpointing and set a reasonable interval.
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)
val tableEnvSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
Register HiveCatalog
val catalogName = "my_catalog"
val catalog = new HiveCatalog(
  catalogName,              // catalog name
  "default",                // default database
  "/Users/lmagic/develop",  // Hive config (hive-site.xml) directory
  "1.1.0"                   // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)
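As a quick sanity check (a minimal sketch, not part of the original example), you can list the databases now visible to Flink after switching catalogs; they should be the databases managed by the configured Hive Metastore:

// Hedged verification sketch: after useCatalog, the databases returned here
// come from the Hive Metastore configured above (e.g. "default" plus any
// databases that already exist in Hive).
tableEnv.listDatabases().foreach(println)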
Create a Kafka stream table
The Kafka topic stores tracking (buried-point) logs in JSON format. When creating the table, computed columns are used to generate the event time and the watermark. The parameters of the 1.11 SQL Kafka connector are somewhat simpler than those of 1.10.
TableEnv.executeSql ("CREATE DATABASE IF NOT EXISTS stream_tmp") tableEnv.executeSql ("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")
TableEnv.executeSql ("" | CREATE TABLE stream_tmp.analytics_access_log_kafka (| ts BIGINT, | userId BIGINT, | eventType STRING, | fromType STRING, | columnType STRING, | siteId BIGINT, | grouponId BIGINT, | partnerId BIGINT, | merchandiseId BIGINT, | procTime AS PROCTIME (), | eventTime AS TO_TIMESTAMP (ts / 1000 MMMurdd HH:mm:ss')) | | WATERMARK FOR eventTime AS eventTime-INTERVAL '15' SECOND |) WITH (|' connector' = 'kafka', |' topic' = 'ods_analytics_access_log', |' properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092' |' properties.group.id' = 'flink_hive_integration_exp_1', |' scan.startup.mode' = 'latest-offset', |' format' = 'json' | | | 'json.fail-on-missing-field' =' false', | 'json.ignore-parse-errors' =' true' |) "" .stripMargin)
Since HiveCatalog has been registered previously, you can observe the metadata of the newly created Kafka stream table in Hive (note that the table has no actual columns on the Hive side).
hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka;
OK
# col_name              data_type               comment

# Detailed Table Information
Database:               stream_tmp
Owner:                  null
CreateTime:             Wed Jul 15 18:25:09 CST 2020
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Location:               hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafka
Table Type:             MANAGED_TABLE
Table Parameters:
        flink.connector                         kafka
        flink.format                            json
        flink.json.fail-on-missing-field        false
        flink.json.ignore-parse-errors          true
        flink.properties.bootstrap.servers      kafka110:9092,kafka111:9092,kafka112:9092
        flink.properties.group.id               flink_hive_integration_exp_1
        flink.scan.startup.mode                 latest-offset
        flink.schema.0.data-type                BIGINT
        flink.schema.0.name                     ts
        flink.schema.1.data-type                BIGINT
        flink.schema.1.name                     userId
        flink.schema.10.data-type               TIMESTAMP(3)
        flink.schema.10.expr                    TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss'))
        flink.schema.10.name                    eventTime
        flink.schema.2.data-type                VARCHAR(2147483647)
        flink.schema.2.name                     eventType
        # ...... (omitted)
        flink.schema.9.data-type                TIMESTAMP(3) NOT NULL
        flink.schema.9.expr                     PROCTIME()
        flink.schema.9.name                     procTime
        flink.schema.watermark.0.rowtime        eventTime
        flink.schema.watermark.0.strategy.data-type     TIMESTAMP(3)
        flink.schema.watermark.0.strategy.expr  `eventTime` - INTERVAL '15' SECOND
        flink.topic                             ods_analytics_access_log
        is_generic                              true
        transient_lastDdlTime                   1594808709

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        serialization.format    1
Time taken: 1.797 seconds, Fetched: 61 row(s)
Create a Hive table
Flink SQL provides DDL compatible with HiveQL style, and you can specify SqlDialect.HIVE (DML compatibility is still under development).
To make the results easy to observe, the table below uses a three-level partitioning scheme of day / hour / minute. Such fine-grained partitioning should be avoided in practice (a partition of 10 minutes or even an hour is usually more appropriate).
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
TableEnv.executeSql ("CREATE DATABASE IF NOT EXISTS hive_tmp") tableEnv.executeSql ("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")
TableEnv.executeSql ("" | CREATE TABLE hive_tmp.analytics_access_log_hive (| ts BIGINT, | user_id BIGINT, | event_type STRING, | from_type STRING, | column_type STRING, | site_id BIGINT, | groupon_id BIGINT, | partner_id BIGINT, | merchandise_id BIGINT |) PARTITIONED BY (| ts_date STRING, | ts_hour STRING | | ts_minute STRING |) STORED AS PARQUET | TBLPROPERTIES (| 'sink.partition-commit.trigger' =' partition-time', | 'sink.partition-commit.delay' =' 1 min', | 'sink.partition-commit.policy.kind' =' metastore,success-file', | 'partition.time-extractor.timestamp-pattern' =' $ts_date $ts_hour:$ts_minute:00' |) "" .stripMargin) |
The Hive table's parameters reuse those of the SQL FileSystem connector and are closely related to partition commit. The four parameters that appear above are briefly explained below.
sink.partition-commit.trigger: the time characteristic that triggers partition commit. The default is processing-time, which obviously leads to data landing in the wrong partitions when there are delays. So partition-time is used here, meaning commits are driven by the partition timestamp (i.e. the event time corresponding to the data in the partition).

partition.time-extractor.timestamp-pattern: the pattern used to extract the partition timestamp. It must be written in the form yyyy-MM-dd HH:mm:ss, with the corresponding partition fields of the Hive table used as placeholders. Obviously, the partition field values of the Hive table come from the event time defined in the stream table, as you will see later.

sink.partition-commit.delay: the delay before partition commit is triggered. With the partition-time trigger, a partition is committed only after the watermark passes the partition's timestamp plus this delay. This value should preferably match the partition granularity: if the Hive table is partitioned by 1 hour, this parameter can be set to 1 h; if partitioned by 10 minutes, it can be set to 10 min.

sink.partition-commit.policy.kind: the partition commit policy, which can be understood as the extra action that makes a partition visible to the downstream. metastore means updating the table metadata in Hive Metastore, and success-file means creating a _SUCCESS marker file inside the partition directory.
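To make the relationship between partition granularity and sink.partition-commit.delay concrete, here is a minimal sketch, not from the original example, of how a coarser hourly-partitioned variant of the same table could be declared. The table name analytics_access_log_hive_hourly and the abridged column list are hypothetical; the option keys are the same ones explained above.

// Hedged sketch of an hourly-partitioned variant (hypothetical table name, abridged columns).
// With 1-hour partitions, the commit delay is set to match the partition granularity,
// and the timestamp pattern only needs date and hour placeholders.
// (The Hive dialect set above is assumed to still be in effect.)
tableEnv.executeSql(
  """
    |CREATE TABLE hive_tmp.analytics_access_log_hive_hourly (
    |  ts BIGINT,
    |  user_id BIGINT,
    |  merchandise_id BIGINT
    |) PARTITIONED BY (
    |  ts_date STRING,
    |  ts_hour STRING
    |) STORED AS PARQUET
    |TBLPROPERTIES (
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '1 h',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file',
    |  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:00:00'
    |)
  """.stripMargin
)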
Of course, the SQL FileSystem connector is not limited to this; there is plenty of room for customization (for example, you can write a custom partition commit policy to merge small files). Please refer to the official documentation for details.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#streaming-sink
Streaming write to Hive
Note how the event time in the stream table is converted into the Hive table's partition fields.
tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tableEnv.executeSql(
  """
    |INSERT INTO hive_tmp.analytics_access_log_hive
    |SELECT
    |  ts, userId, eventType, fromType, columnType, siteId, grouponId, partnerId, merchandiseId,
    |  DATE_FORMAT(eventTime, 'yyyy-MM-dd'),
    |  DATE_FORMAT(eventTime, 'HH'),
    |  DATE_FORMAT(eventTime, 'mm')
    |FROM stream_tmp.analytics_access_log_kafka
    |WHERE merchandiseId > 0
  """.stripMargin
)
Let's take a look at the results of streaming Sink.
The checkpoint interval set above is 20 seconds, so new data files are written exactly at 20-second intervals. Because the parallelism is 3, each write produces 3 files. Once all the data of a partition has been written, the _SUCCESS file is generated in it; a partition that is still being written will contain .inprogress files.
Query through Hive to make sure that the time of the data is correct.
hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))), from_unixtime(max(cast(ts / 1000 AS BIGINT)))
    > FROM hive_tmp.analytics_access_log_hive
    > WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23';
OK
23:23:59
Time taken: 1.115 seconds, Fetched: 1 row(s)
Streaming read from Hive
To use a Hive table as a streaming source, you need to enable dynamic table options and pass the parameters of the Hive stream through table hints. The following is a simple example that computes per-merchandise PV (page views) from the Hive table.
tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)
val result = tableEnv.sqlQuery(
  """
    |SELECT merchandise_id, count(1) AS pv
    |FROM hive_tmp.analytics_access_log_hive
    |/*+ OPTIONS(
    |  'streaming-source.enable' = 'true',
    |  'streaming-source.monitor-interval' = '1 min',
    |  'streaming-source.consume-start-offset' = '2020-07-15 23:30:00'
    |) */
    |WHERE event_type = 'shtOpenGoodsDetail'
    |AND ts_date >= '2020-07-15'
    |GROUP BY merchandise_id
    |ORDER BY pv DESC LIMIT 10
  """.stripMargin
)
result.toRetractStream[Row].print().setParallelism(1)
streamEnv.execute()
The meaning of the three Table Hint parameters is explained as follows.
streaming-source.enable: set to true to indicate that this Hive table can be used as a streaming source.

streaming-source.monitor-interval: the interval at which new data in the Hive table is detected, set to 1 minute above. For partitioned tables, this means monitoring the creation of new partitions so that data can be read incrementally.

streaming-source.consume-start-offset: the timestamp from which consumption starts, which also needs to be written in the form yyyy-MM-dd HH:mm:ss.
More specific instructions can still be found in the official documentation.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-reading

Finally, because the SQL statement contains ORDER BY and LIMIT logic, you need to call the toRetractStream() method to convert the result into a retract stream in order to output it.
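As a small supplementary sketch (an assumption, not code from the original article): in the Scala API the retract stream is a DataStream of (Boolean, Row) pairs, where the flag distinguishes accumulate from retract messages. If only the inserted records are of interest, they can be filtered like this, reusing the result table defined above:

// Hedged sketch: the Boolean flag is true for accumulate (insert) messages
// and false for retract messages; keep only the accumulate side here.
result.toRetractStream[Row]
  .filter(_._1)   // drop retraction messages
  .map(_._2)      // keep the Row payload
  .print()
  .setParallelism(1)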
Thank you for reading this article carefully. I hope this walkthrough of the new SQL Hive Streaming features in Flink 1.11 has been helpful to you.