This article explains in detail how to read Kafka messages with the Table API & SQL in Flink. The editor shares it as a reference; I hope you will have a good understanding of the relevant knowledge after reading it.
Use the Table API & SQL and the Flink Kafka connector to read data from a Kafka message queue.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Sample data source (project code available for download on Gitee)
Flink examples: building the development environment and sample data
Sample module (pom.xml)
Flink examples: Table API & SQL sample module
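The full pom.xml ships with the linked sample project. As a minimal sketch, assuming Flink 1.11.1 with Scala 2.11 artifacts, the dependencies this example relies on would look roughly like this:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.11.1</version>
    </dependency>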
SelectToKafka.java
package com.flink.examples.kafka;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @Description Uses Table API & SQL with the Flink Kafka connector to read data from a Kafka message queue.
 */
public class SelectToKafka {

    /**
     * Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
     *
     * Start offset position: the config option scan.startup.mode specifies the startup mode of the
     * Kafka consumer. Valid enumerations are:
     *   group-offsets: start from offsets committed in ZK / Kafka brokers for a specific consumer group.
     *   earliest-offset: start from the earliest offset.
     *   latest-offset: start from the latest offset.
     *   timestamp: start from a user-supplied timestamp for each partition.
     *   specific-offsets: start from user-supplied specific offsets for each partition.
     * The default value, group-offsets, consumes from the offsets last committed in ZK / Kafka brokers.
     *
     * Consistency guarantee: the sink.semantic option selects one of three operation modes:
     *   NONE: Flink guarantees nothing; records may be lost or duplicated.
     *   AT_LEAST_ONCE (default): guarantees no records are lost (although they may be duplicated).
     *   EXACTLY_ONCE: Kafka transactions are used to provide exactly-once semantics. Whenever you
     *     write to Kafka using transactions, remember to set the required isolation.level
     *     (read_committed, or read_uncommitted which is the default) for any application reading
     *     those records.
     */
    static String table_sql = "CREATE TABLE KafkaTable (\n" +
            "  `user_id` BIGINT,\n" +
            "  `item_id` BIGINT,\n" +
            "  `behavior` STRING,\n" +
            "  `ts` TIMESTAMP(3)\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'user_behavior',\n" +
            "  'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
            "  'properties.group.id' = 'testGroup',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'format' = 'json'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Default stream time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Build EnvironmentSettings and specify the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        // Register the Kafka source table
        tEnv.executeSql(table_sql);
        String sql = "select user_id,item_id,behavior,ts from KafkaTable";
        Table table = tEnv.sqlQuery(sql);
        // Print the field structure
        table.printSchema();
        // Convert the table to an append-only DataStream
        DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
        behaviorStream.print();
        env.execute();
    }
}
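To have something to read, the user_behavior topic needs JSON records whose fields match the declared schema. Below is a minimal hypothetical producer sketch for feeding test data; the broker address and topic name come from the DDL above, while the class name and field values are invented for illustration. Note that Flink's 'json' format by default parses TIMESTAMP(3) values in SQL format such as "2021-01-26 10:25:44".

package com.flink.examples.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper class, not part of the sample project.
public class UserBehaviorProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address taken from the table DDL above
        props.put("bootstrap.servers", "192.168.110.35:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One JSON record matching the KafkaTable schema; the values are made up.
            String value = "{\"user_id\": 1, \"item_id\": 1, \"behavior\": \"pv\", \"ts\": \"2021-01-26 10:25:44\"}";
            producer.send(new ProducerRecord<>("user_behavior", value));
        }
    }
}

With a few such records in the topic, running SelectToKafka prints the schema and the rows shown below.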
Print the result
root
 |-- user_id: BIGINT
 |-- item_id: BIGINT
 |-- behavior: STRING
 |-- ts: TIMESTAMP(3)

3> 1,1,1,2021-01-26T10:25:44

So much for how to read Kafka messages with the Table API & SQL in Flink. I hope the above content is helpful to you and helps you learn more. If you think the article is good, you can share it for more people to see.