How to master the interchange between Table and DataStream

2025-04-05 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)05/31 Report--

This article explains how to convert between Table and DataStream in Flink. Many people get stuck on this when working through real cases, so the sections below walk through how to handle the common situations.

1. Use Kafka as the input stream

Flink's Kafka connector, flink-kafka-connector, has supported the Table API since version 1.10. We can pass an instance of a class named Kafka directly to the connect method; this class is the ConnectorDescriptor of the Kafka connector.

Prepare the data (one id,name record per line):

    1,language number
    2,English object
    3,Hua Sheng
    4,Literature
    5,Linguistics
    6,Learning object

Create a Kafka topic

    ./kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic FlinkSqlTest

Start a producer from the command line and send the prepared records

    [root@node01 bin]# ./kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic FlinkSqlTest
    >1,language number
    >2,English object
    >3,Hua Sheng
    >4,Literature
    >5,Linguistics
    >6,Learning object

Write Flink code to connect to Kafka

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.descriptors.{Csv, Kafka, Schema}

    /**
     * @Package
     * @author Brother big data
     * @date 2020-12-17 0:35
     * @version V1.0
     */
    object FlinkSQLSourceKafka {
      def main(args: Array[String]): Unit = {
        // Get the stream processing execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Get the table execution environment
        val tableEnv = StreamTableEnvironment.create(env)
        tableEnv.connect(
          new Kafka()
            .version("0.11") // Kafka version
            .topic("FlinkSqlTest") // topic to read from
            .property("zookeeper.connect", "node01:2181,node02:2181,node03:2181") // ZooKeeper addresses and ports
            .property("bootstrap.servers", "node01:9092,node02:9092,node03:9092") // Kafka broker addresses and ports
        )
          .withFormat(new Csv()) // data format
          .withSchema(new Schema() // schema of the table
            .field("id", DataTypes.STRING())
            .field("name", DataTypes.STRING())
          )
          .createTemporaryTable("kafkaInputTable") // create the temporary table
        // Define the SQL query
        val result = tableEnv.sqlQuery("select * from kafkaInputTable")
        // Print the data
        result.toAppendStream[(String, String)].print()
        // Start execution
        env.execute("source kafkaInputTable")
      }
    }

Running result diagram

You can also connect to other external systems such as Elasticsearch, MySQL, HBase and Hive; the implementation is largely the same.

2. Query the table

Using the connector of an external system, we can read and write data and register tables in the environment's Catalog. We can then run queries and transformations on those tables. Flink provides two ways to query: the Table API and SQL.

3. Table API calls

The Table API is a query API integrated into the Scala and Java languages. Unlike SQL, Table API queries are not expressed as strings; they are built step by step through method calls in the host language. The Table API is based on the Table class, which represents a table and provides a complete set of methods for operating on it. Each of these methods returns a new Table object that represents the result of applying the transformation to the input table. Relational transformations can be composed from several method calls into a call chain, for example table.select(…).filter(…), where select(…) picks the specified fields of the table and filter(…) specifies the filter condition. In code:

    val kafkaInputTable = tableEnv.from("kafkaInputTable")
    kafkaInputTable.select("*").filter('id !== "1")

4. SQL queries

Flink's SQL integration is based on Apache Calcite, which implements the SQL standard. In Flink, SQL queries are defined as ordinary strings. The result of a SQL query is a new Table.

The code is as follows:

    val result = tableEnv.sqlQuery("select * from kafkaInputTable")

You can also add aggregations, such as counting the number of records for each name.

Using the Table API:

    val result: Table = tableEnv.from("kafkaInputTable")
    result.groupBy('name).select('name, 'name.count as 'count)

Using SQL (the alias is cnt because COUNT is a reserved keyword in Flink SQL):

    val result = tableEnv.sqlQuery("select name, count(1) as cnt from kafkaInputTable group by name")

In the Table API code above, field names are prefixed with a single quotation mark ('). This is the Expression type defined by the Table API (a Scala Symbol that is implicitly converted to an Expression), and it makes it easy to refer to a field in a table. A field can be written either as a string in double quotes or as a single leading quotation mark followed by the field name. The rest of the code in this article generally uses the latter form.
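The grouped count can be tried without Kafka. The following is a minimal sketch, assuming Flink 1.10 with the Scala Table API and a planner on the classpath; the object name GroupCountSketch, the alias cnt and the in-memory sample records are hypothetical and only stand in for the Kafka source. It also shows that an aggregated Table is converted back with toRetractStream, because each new record updates a previously emitted count.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

// Hypothetical example: in-memory records replace the Kafka topic.
object GroupCountSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // (id, name) records; the tuple fields are renamed by position
    val input: DataStream[(String, String)] =
      env.fromElements(("1", "a"), ("2", "b"), ("3", "a"))
    val table = tableEnv.fromDataStream(input, 'id, 'name)

    // Field references written as Scala symbols
    val counts = table.groupBy('name).select('name, 'name.count as 'cnt)

    // A grouped count revises earlier results, so the Table is converted
    // with toRetractStream; every element carries a Boolean flag that
    // marks it as an added (true) or retracted (false) row.
    counts.toRetractStream[(String, Long)].print()

    env.execute("GroupCountSketch")
  }
}
```

A plain select or filter without aggregation only ever appends rows, which is why the Kafka example earlier can use toAppendStream instead.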

5. Convert a DataStream to a Table

Flink lets us convert between Table and DataStream. Starting from a DataStream, we can read the data source, map the records into a case class, and then convert the stream to a Table. The column fields of the Table are the fields of the case class, so there is no need to define the schema separately.

5.1 Code implementation

The implementation is very simple: just call tableEnv.fromDataStream(). By default, the fields of the resulting Table schema correspond one to one with the fields of the DataStream; they can also be specified explicitly.

Specifying them explicitly lets us change the order of the fields, rename them, or select only some of them, which is equivalent to a map operation (or a select in the Table API).

The code is as follows:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._

    /**
     * @Package
     * @author Brother big data
     * @date 2020-12-17 21:21
     * @version V1.0
     */
    object FlinkSqlReadFileTable {
      def main(args: Array[String]): Unit = {
        // Build the stream processing execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Build the Table execution environment
        val tableEnv = StreamTableEnvironment.create(env)
        // Read the data as a stream
        val readData = env.readTextFile("./data/word.txt")
        // Split each line into words with flatMap (a space delimiter is assumed here)
        val word: DataStream[String] = readData.flatMap(_.split(" "))
        // Convert word to a Table
        val table = tableEnv.fromDataStream(word)
        // Compute the word count
        val wordCount = table.groupBy("f0").select('f0, 'f0.count as 'count)
        wordCount.printSchema()
        // Convert back to a stream and print
        tableEnv.toRetractStream[(String, Long)](wordCount).print()
        env.execute("FlinkSqlReadFileTable")
      }
    }
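The word count above relies on the default mapping. The explicit mapping described in this section (reordering, renaming and selecting fields) might look like the following sketch, assuming Flink 1.10 with the Scala Table API; the User case class, its fields and the object name FromDataStreamSketch are hypothetical and exist only for illustration.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

// Hypothetical case class used only for this illustration
case class User(username: String, id: Long, age: Int)

object FromDataStreamSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val users: DataStream[User] =
      env.fromElements(User("alice", 1L, 30), User("bob", 2L, 25))

    // Default mapping: the schema mirrors the case class fields
    val allFields = tableEnv.fromDataStream(users)
    allFields.printSchema()

    // Explicit mapping: id comes first, both fields are renamed,
    // and age is left out of the resulting Table
    val projected = tableEnv.fromDataStream(users, 'id as 'myid, 'username as 'name)
    projected.printSchema()

    // No aggregation is involved, so an append stream is sufficient
    projected.toAppendStream[(Long, String)].print()
    env.execute("FromDataStreamSketch")
  }
}
```

Because the mapping is resolved when the Table is created, the rest of the query only sees the renamed fields myid and name.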

5.2 Correspondence between data types and Table schema

The fields of the DataStream's data type can be matched to the table Schema by the field names of the case class (name-based mapping), in which case fields can also be renamed with as.

The other approach is position-based mapping, which matches fields by their position; new field names are specified directly in the process. The new names must not already exist in the input type, otherwise Flink treats the mapping as name-based.

Name-based mapping:

    val userTable = tableEnv.fromDataStream(dataStream, 'username as 'name, 'id as 'myid)

Position-based mapping:

    val userTable = tableEnv.fromDataStream(dataStream, 'name, 'myid)

Flink's DataStream and DataSet APIs support many types. Composite types, such as tuples (built-in Scala and Java tuples), POJOs, Scala case classes and Flink's Row type, allow nested data structures with multiple fields that can be accessed in Table expressions. All other types are treated as atomic types.

For tuple types and atomic types, position-based mapping is generally the better choice. Name-based mapping also works if you need it: for a Scala tuple the default field names are _1, _2 and so on (f0, f1 and so on for Java tuples), and for an atomic type the default name is f0.
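The default names can be checked by printing the schema of a few converted streams. This is a small sketch under the same assumptions as before (Flink 1.10, Scala Table API); the object name DefaultFieldNamesSketch, the new field names word and cnt, and the sample data are hypothetical. No job is executed, since building a Table and printing its schema does not require one.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

// Hypothetical example that only inspects schemas; nothing is executed.
object DefaultFieldNamesSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val tuples: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2))

    // Position-based: neither word nor cnt exists in the tuple type,
    // so the names are assigned in field order
    tableEnv.fromDataStream(tuples, 'word, 'cnt).printSchema()

    // Name-based: the default Scala tuple names _1 and _2 are referenced,
    // which also allows the fields to be reordered
    tableEnv.fromDataStream(tuples, '_2 as 'cnt, '_1 as 'word).printSchema()

    val words: DataStream[String] = env.fromElements("a", "b")

    // Atomic type with the default field name f0
    tableEnv.fromDataStream(words).printSchema()

    // Atomic type with its single field renamed
    tableEnv.fromDataStream(words, 'word).printSchema()
  }
}
```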

6. Create a temporary view (Temporary View)

The first way to create a temporary view is to convert it directly from a DataStream. As before, the fields can be taken over as they are, or the field mapping can be specified during the conversion. The code is as follows:

    tableEnv.createTemporaryView("sensorView", dataStream)
    tableEnv.createTemporaryView("sensorView", dataStream, 'id, 'temperature, 'timestamp as 'ts)

You can also create a view from a Table:

    tableEnv.createTemporaryView("sensorView", sensorTable)

The Schema for View and Table is exactly the same. In fact, in Table API, View and Table can be considered equivalent.
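Once registered, a view can be queried by name just like a table. The following sketch assumes Flink 1.10 with the Scala Table API; the object name TemporaryViewSketch, the view name hotView, the sample readings and the threshold of 30 are hypothetical values chosen for illustration.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

// Hypothetical example: sensor readings as (id, temperature, timestamp) tuples.
object TemporaryViewSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val readings: DataStream[(String, Double, Long)] =
      env.fromElements(("sensor_1", 35.8, 1L), ("sensor_2", 15.4, 2L))

    // Register the stream as a view, naming the fields by position
    tableEnv.createTemporaryView("sensorView", readings, 'id, 'temperature, 'ts)

    // The view is referenced by name in SQL, exactly like a table
    val hot = tableEnv.sqlQuery(
      "select id, temperature from sensorView where temperature > 30")

    // A query result is itself a Table and can be registered as another view
    tableEnv.createTemporaryView("hotView", hot)

    // A filter only appends rows, so toAppendStream is sufficient
    tableEnv.from("hotView").toAppendStream[(String, Double)].print()
    env.execute("TemporaryViewSketch")
  }
}
```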
