
How to use Flink streaming SQL

Updated 2025-02-25 · Source: Shulou (Shulou.com), SLTechnology News&Howtos · 06/01

This article explains how to use Flink streaming SQL. The approach described here is simple, fast, and practical, so readers who are interested can follow along step by step.

Background

SQL (Structured Query Language) is a general-purpose, widely used query language. It is increasingly popular not only in traditional databases but also in big data: components such as Hive, Spark, Kafka, and Flink all support SQL queries. SQL lets people who do not know the internals of these components work with them, which greatly lowers the barrier to entry. This article briefly covers how to use SQL in Flink stream processing.

Constructing the StreamTableEnvironment object, by example

To use SQL in Flink stream processing, first construct a StreamTableEnvironment object, which is straightforward.

Any catalog, table, or function used in SQL must be registered with the StreamTableEnvironment before it can be used.

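import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment; // package as of Flink 1.10
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);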

Register tables

Next, register the table information with the StreamTableEnvironment object. There are several ways to do this.

The following code is based on Flink 1.10.0; the API differs slightly between versions.

Use Tuple

// Use a Flink tuple; the field names must be specified explicitly
Tuple2<String, Integer> tuple2 = Tuple2.of("jack", 10);
// Build a DataStream of tuples
DataStream<Tuple2<String, Integer>> tupleStream = env.fromElements(tuple2);
// Register the stream with the StreamTableEnvironment and name its fields
tableEnv.createTemporaryView("usersTuple", tupleStream, "name,age");
// Run a SQL query, which returns a Table object
Table table = tableEnv.sqlQuery("select name,age from usersTuple");
// Convert the Table back to a Flink DataStream for further processing; here we just print it
tableEnv.toAppendStream(table, Row.class).print();
// Nothing is printed until the job is submitted with env.execute()

Use Row

Flink's built-in tuple classes stop at Tuple25, so for tables with more fields you can use Flink's Row object instead.

// Use a Row with two fields
Row row = new Row(2);
row.setField(0, "zhangsan");
row.setField(1, 20);
DataStream<Row> rowDataStream = env.fromElements(row);
tableEnv.createTemporaryView("usersRow", rowDataStream, "name,age");
Table tableRow = tableEnv.sqlQuery("select name,age from usersRow");
tableEnv.toAppendStream(tableRow, Row.class).print();

Use a Java POJO class

First define a POJO class:

public static class User {
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

The POJO class must satisfy Flink's serialization rules for POJO types. For details, see [1]:

The class is public and standalone (not a non-static inner class).
The class has a public no-argument constructor.
Every non-static, non-transient field in the class (and all its superclasses) is either public and non-final, or private with public getter and setter methods that follow the Java bean rules.
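The rules above can be approximated with plain Java reflection. The sketch below is an illustration only: `PojoRuleCheck`, `looksLikePojo`, `hasGetterAndSetter`, and `NoSetter` are hypothetical names, no Flink classes are used, and Flink's own type extraction may differ in detail.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical helper, not part of Flink: a rough check of the POJO rules listed above.
class PojoRuleCheck {

    // The User class from this article.
    public static class User {
        private String name;
        private int age;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    // Counter-example: a private field with a getter but no setter.
    public static class NoSetter {
        private String name;
        public String getName() { return name; }
    }

    static boolean looksLikePojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        // Rule 1: the class is public and is not a non-static inner class.
        if (!Modifier.isPublic(mods)) return false;
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) return false;
        // Rule 2: a public no-argument constructor exists.
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every non-static, non-transient field, including inherited ones,
        // is public and non-final, or has a public getter and setter.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (Modifier.isStatic(fm) || Modifier.isTransient(fm) || f.isSynthetic()) continue;
                boolean publicNonFinal = Modifier.isPublic(fm) && !Modifier.isFinal(fm);
                if (!publicNonFinal && !hasGetterAndSetter(clazz, f)) return false;
            }
        }
        return true;
    }

    static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = false;
        boolean setter = false;
        // getMethods() returns public methods, including inherited ones.
        for (Method m : clazz.getMethods()) {
            if (Modifier.isStatic(m.getModifiers())) continue;
            String n = m.getName();
            if ((n.equals("get" + suffix) || n.equals("is" + suffix))
                    && m.getParameterCount() == 0 && m.getReturnType() == f.getType()) {
                getter = true;
            }
            if (n.equals("set" + suffix)
                    && m.getParameterCount() == 1 && m.getParameterTypes()[0] == f.getType()) {
                setter = true;
            }
        }
        return getter && setter;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(User.class));     // true
        System.out.println(looksLikePojo(NoSetter.class)); // false
    }
}
```

A check like this is only a quick sanity test while writing the class; the authoritative answer is whether Flink itself treats the type as a POJO at runtime.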

User user = new User();
user.setName("Tom");
user.setAge(20);
DataStream<User> userDataStream = env.fromElements(user);
// No field names are passed here; they are taken from the POJO
tableEnv.createTemporaryView("usersPojo", userDataStream);
Table tablePojo = tableEnv.sqlQuery("select name,age from usersPojo");
tableEnv.toAppendStream(tablePojo, Row.class).print();

With a DataStream of a Java POJO type, you do not have to declare the field names: Flink derives the table's field names and types from the fields of the POJO class.
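To picture what "deriving the fields from the POJO" means, here is a minimal reflection sketch in plain Java. It does not call Flink; `PojoSchemaSketch` and `columnsOf` are hypothetical names, and the compact `User` with public fields is a stand-in for the class above. The columns are sorted by name only to make the output deterministic; to see the schema Flink actually derives, call `printSchema()` on the resulting Table.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not Flink code: list a POJO's fields as (column name -> type).
class PojoSchemaSketch {

    // Compact stand-in for the article's User class (public, non-final fields).
    public static class User {
        public String name;
        public int age;
    }

    static Map<String, String> columnsOf(Class<?> pojo) {
        // TreeMap keeps the columns sorted by name so the result is stable.
        Map<String, String> columns = new TreeMap<>();
        for (Field f : pojo.getDeclaredFields()) {
            int mods = f.getModifiers();
            // Static and transient fields are not part of the record.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) continue;
            columns.put(f.getName(), f.getType().getSimpleName());
        }
        return columns;
    }

    public static void main(String[] args) {
        System.out.println(columnsOf(User.class)); // {age=int, name=String}
    }
}
```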

Use external storage

// Connect to an external system such as files or Kafka
Schema schema = new Schema()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT());
// Describe the connector, format, and schema, then register the table as "usersFile"
tableEnv.connect(new FileSystem().path("...."))
    .withFormat(new Csv())
    .withSchema(schema)
    .createTemporaryTable("usersFile");
Table tableFile = tableEnv.sqlQuery("select name,age from usersFile");
tableEnv.toAppendStream(tableFile, Row.class).print();
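To make the roles of the format and the schema concrete, the following plain-Java sketch shows roughly what reading one CSV line against the name/age schema amounts to. It is a conceptual illustration only: `CsvSchemaSketch`, `Column`, `ColumnType`, and `parseLine` are hypothetical names, no Flink classes are involved, and Flink's real CSV format also deals with quoting, escaping, nulls, and errors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Flink code: apply a (name, type) schema to one CSV line.
class CsvSchemaSketch {

    enum ColumnType { STRING, INT }

    static final class Column {
        final String name;
        final ColumnType type;

        Column(String name, ColumnType type) {
            this.name = name;
            this.type = type;
        }
    }

    // Split the line on commas and convert each piece to the type its column declares.
    static List<Object> parseLine(String line, List<Column> schema) {
        String[] parts = line.split(",", -1);
        if (parts.length != schema.size()) {
            throw new IllegalArgumentException(
                    "expected " + schema.size() + " fields but got " + parts.length);
        }
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < parts.length; i++) {
            String raw = parts[i].trim();
            switch (schema.get(i).type) {
                case INT:
                    row.add(Integer.parseInt(raw));
                    break;
                default:
                    row.add(raw);
            }
        }
        return row;
    }

    public static void main(String[] args) {
        List<Column> schema = new ArrayList<>();
        schema.add(new Column("name", ColumnType.STRING));
        schema.add(new Column("age", ColumnType.INT));
        System.out.println(parseLine("jack,10", schema)); // [jack, 10]
    }
}
```

The split of responsibilities is the point here: the format decides how raw bytes become separate values, and the schema decides what each value is called and which type it has in SQL.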

The following need to be specified when using external storage:

tableEnv.connect(ConnectorDescriptor...): specifies the connector. Flink currently supports Elasticsearch, HBase, Kafka, and the filesystem.
withFormat(FormatDescriptor format): specifies the format of the data read from the source above, such as JSON, CSV, or Parquet.
withSchema(Schema schema): defines the table's schema, that is, the field names and types used in SQL queries.
createTemporaryTable("usersFile"): gives the table a name and registers it with the StreamTableEnvironment.

That covers the basics of using Flink streaming SQL. The best way to absorb it is to try the examples in practice.
