The Function and Example Usage of Windows in FlinkSQL

This article introduces the window functions available in FlinkSQL and how to use them, with a runnable example for each window type. If windows in the Table API and SQL have been a source of confusion, the sections below walk through simple, easy-to-follow usage. Let's begin!
Preface
Time semantics only become useful when combined with window operations: the typical use is to open a window over a period of time and compute over it. Let's look at how to use a time field to perform window operations in the Table API and SQL. Both offer two main kinds of windows: Group Windows and Over Windows.
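Every example in this article depends on one prerequisite: the table must carry an event-time (rowtime) attribute. The following minimal sketch shows how that attribute is declared; the env and dataStream values are assumed here and appear in full in the examples below.

import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._

// create a table environment with the Blink planner (Flink 1.10-era API,
// matching the imports used throughout this article)
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)

// 'timestamp.rowtime exposes the stream's event time (set earlier via
// assignTimestampsAndWatermarks) as the table field 'ts that the
// window clauses below reference
val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)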
I. Group Windows

A group window (Group Window) aggregates rows into finite groups according to a time or row-count interval and evaluates an aggregate function once per group. In the Table API, a group window is defined with the .window(w: GroupWindow) clause, and an alias must be assigned to it with the as clause. To group the table by window, the window alias must be referenced in the groupBy clause like a regular grouping field. Example:
val table = input
  .window([w: GroupWindow] as 'w)
  .groupBy('w, 'a)
  .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)
The Table API provides a set of predefined Window classes with specific semantics, which are translated into underlying DataStream or DataSet window operations.
The Table API supports three familiar window definitions: tumbling (Tumble), sliding (Slide), and session (Session) windows.
1.1 Tumbling window
A tumbling window (Tumbling window) is defined with the Tumble class, which has three methods:

over: defines the window length
on: the time field to group by (for time-interval windows) or sort by (for row-count windows)
as: an alias, which must be used later in the groupBy clause (a minimal combined sketch follows)
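Put together, and borrowing the sensorTable and field names from the full example below, a 10-second event-time tumbling window looks like this minimal sketch:

// over sets the window length, on selects the rowtime field 'ts,
// and as names the window 'w so that groupBy can reference it
sensorTable
  .window(Tumble over 10.seconds on 'ts as 'w)
  .groupBy('id, 'w)
  .select('id, 'id.count)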
Implementation case
1. Requirement
Use a 10-second tumbling window to count the number of occurrences of each id.
2. Data preparation
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
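As a quick sanity check on what the job should output (Flink aligns tumbling windows to epoch multiples of the window size): timestamp 1547718199 falls into the window [1547718190, 1547718200); timestamps 1547718201 through 1547718208 fall into [1547718200, 1547718210), where sensor_1 appears twice; and timestamps 1547718210 and 1547718213 fall into [1547718210, 1547718220), where sensor_1 again appears twice.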
3. Code implementation
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble}
import org.apache.flink.types.Row

/**
 * @Package windows
 * @File: FlinkSQLTumBlingTie.java
 * @author Brother Big Data
 * @date 2020/12/25 21:58
 * @version V1.0
 * Tumbling window example.
 */
object FlinkSQLTumBlingTie {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // map to the case class type (simple conversion) and assign
    // timestamps/watermarks with 1 second of allowed out-of-orderness
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
        })

    // expose the event time as the rowtime attribute 'ts
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

    // register a temporary view
    tableEnv.createTemporaryView("sensor", sensorTable)

    // Table API implementation: count ids per 10-second tumbling window
    val resultTable = sensorTable
      .window(Tumble over 10.seconds on 'ts as 'tw)
      .groupBy('id, 'tw)
      .select('id, 'id.count, 'tw.end)

    // SQL implementation
    val sqlTable = tableEnv.sqlQuery(
      """
        |select
        |  id,
        |  count(id),
        |  tumble_end(ts, interval '10' second)
        |from sensor
        |group by
        |  id,
        |  tumble(ts, interval '10' second)
      """.stripMargin)

    // other ways to define a tumbling window:
    // .window(Tumble over 10.minutes on 'rowtime as 'w)  // event-time field rowtime
    // .window(Tumble over 10.minutes on 'proctime as 'w) // processing-time field proctime
    // .window(Tumble over 10.rows on 'proctime as 'w)    // like a count window: sorted by processing time, 10 rows per group

    resultTable.toAppendStream[Row].print("table")
    sqlTable.toRetractStream[Row].print("sqlTable")
    env.execute("FlinkSQLTumBlingTie")
  }

  case class SensorReading(id: String, timestamp: Long, temperature: Double)
}
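A note on the two sink calls above: an event-time group window aggregation emits each result exactly once, when the window closes, so its output is append-only and toAppendStream is sufficient. toRetractStream (used for the SQL variant here) also works for such queries; it simply wraps each record in a (flag, row) pair when printed, where the flag marks an insert (true) or a retraction (false).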
Running result
1.2 Sliding window
A sliding window (Sliding window) is defined with the Slide class, which has four methods:

over: defines the window length
every: defines the slide step
on: the time field to group by (for time-interval windows) or sort by (for row-count windows)
as: an alias, which must be used later in the groupBy clause (see the minimal sketch below)
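The only new piece relative to the tumbling window is every. A minimal sketch, again borrowing names from the full example below:

// a 10-second window that slides every 5 seconds, so each row
// falls into window_length / slide_step = 2 overlapping windows
sensorTable
  .window(Slide over 10.seconds every 5.seconds on 'ts as 'w)
  .groupBy('id, 'w)
  .select('id, 'id.count)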
Implementation case
1. Requirement description
Set the window size to 10 seconds and the slide to 5 seconds, and count the number of occurrences of each id.
2. Data preparation
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
3. Implementation code
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Slide, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import windows.FlinkSQLTumBlingTie.SensorReading

/**
 * @Package windows
 * @File: FlinkSQLSlideTime.java
 * @author Brother Big Data
 * @date 2020/12/27 22:19
 * @version V1.0
 * Sliding window example.
 */
object FlinkSQLSlideTime {
  def main(args: Array[String]): Unit = {
    // build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 to make testing easier
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // map to the case class type (simple conversion) and assign timestamps/watermarks
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
        })

    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

    // register a temporary view
    tableEnv.createTemporaryView("sensor", sensorTable)

    // Table API implementation
    val tableApi = sensorTable
      .window(Slide over 10.seconds every 5.seconds on 'ts as 'w)
      .groupBy('w, 'id)
      .select('id, 'id.count, 'w.end)

    // SQL implementation; note that HOP takes (time_attr, slide, size),
    // so the 5-second slide comes before the 10-second window size
    val tableSql = tableEnv.sqlQuery(
      """
        |select
        |  id,
        |  count(id),
        |  HOP_END(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) as w
        |from sensor
        |group by
        |  HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND),
        |  id
      """.stripMargin)

    tableApi.toAppendStream[Row].print("tableApi")
    tableSql.toAppendStream[Row].print("tableSql")

    // other ways to define a sliding window:
    // .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)  // event-time field rowtime
    // .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w) // processing-time field proctime
    // .window(Slide over 10.rows every 5.rows on 'proctime as 'w)       // like a count window: sorted by processing time, 10 rows per group

    env.execute("FlinkSQLSlideTime")
  }
}
4. Running result
1.3 Session window
A session window (Session window) is defined with the Session class, which has three methods:

withGap: the session gap (a window closes when no event arrives within this interval)
on: the time field to group by (for time-interval windows) or sort by (for row-count windows)
as: an alias, which must be used later in the groupBy clause (a minimal sketch follows)
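The session variant swaps the fixed length for a gap; with the same assumed names as the sketches above:

// rows whose 'ts values lie within 10 seconds of each other end up
// in the same session window for the same id
sensorTable
  .window(Session withGap 10.seconds on 'ts as 'w)
  .groupBy('id, 'w)
  .select('id, 'id.count)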
Implementation case
1. Requirement description
Set the session gap to 10 seconds and count the number of occurrences of each id.
2. Data preparation
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
3. Code implementation
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Session, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import windows.FlinkSQLTumBlingTie.SensorReading

/**
 * @Package windows
 * @File: FlinkSqlSessionTime.java
 * @author Brother Big Data
 * @date 2020/12/27 22:52
 * @version V1.0
 * Session window example.
 */
object FlinkSqlSessionTime {
  def main(args: Array[String]): Unit = {
    // build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 to make testing easier
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // map to the case class type (simple conversion) and assign timestamps/watermarks
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
        })

    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

    // register a temporary view
    tableEnv.createTemporaryView("sensor", sensorTable)

    // Table API implementation
    val tableApi = sensorTable
      .window(Session withGap 10.seconds on 'ts as 'w)
      .groupBy('id, 'w)
      .select('id, 'id.count, 'w.end)

    // SQL implementation
    val tableSQL = tableEnv.sqlQuery(
      """
        |SELECT
        |  id,
        |  COUNT(id),
        |  SESSION_END(ts, INTERVAL '10' SECOND) AS w
        |FROM sensor
        |GROUP BY
        |  id,
        |  SESSION(ts, INTERVAL '10' SECOND)
      """.stripMargin)

    tableApi.toAppendStream[Row].print("tableApi")
    tableSQL.toAppendStream[Row].print("tableSQL")

    // other ways to define a session window:
    // .window(Session withGap 10.minutes on 'rowtime as 'w)  // event-time field rowtime
    // .window(Session withGap 10.minutes on 'proctime as 'w) // processing-time field proctime

    env.execute("FlinkSqlSessionTime")
  }
}
4. Running result
II. Over Windows
Over window aggregations come from standard SQL (the OVER clause) and are defined in the SELECT clause of a query. Unlike a group window, an over window aggregation computes an aggregate over a range of adjacent rows for each input row, producing one output row per input row. Over windows are defined with the .window(w: OverWindow*) clause and referenced by an alias in the select() method. Example:
val table = input
  .window([w: OverWindow] as 'w)
  .select('a, 'b.sum over 'w, 'c.min over 'w)
The Table API provides the Over class to configure the properties of an over window. Over windows can be defined on event time or processing time, and over a range specified either as a time interval or as a row count.
An unbounded over window is specified with a constant: UNBOUNDED_RANGE for a time-interval range, or UNBOUNDED_ROW for a row-count range. A bounded over window is specified with the size of the range instead.
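Although this section shows the Table API, the same unbounded/bounded distinction can be written in Flink SQL with the standard OVER clause. A minimal sketch against the sensor table registered in the earlier examples (Flink requires ORDER BY to be on an ascending time attribute, and all OVER aggregates in one query must share the same window specification):

// an unbounded row-count over window in SQL; for a bounded window, swap
// "UNBOUNDED PRECEDING" for e.g. "10 PRECEDING", or use a RANGE frame such as
// "RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW"
val overSql = tableEnv.sqlQuery(
  """
    |SELECT
    |  id,
    |  COUNT(id) OVER (
    |    PARTITION BY id
    |    ORDER BY ts
    |    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cnt
    |FROM sensor
  """.stripMargin)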
2.1 Unbounded over window
// unbounded event-time over window (time field "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)

// unbounded processing-time over window (time field "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)

// unbounded event-time row-count over window (time field "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)

// unbounded processing-time row-count over window (time field "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
2.2 Bounded over window
// bounded event-time over window (time field "rowtime", previous 1 minute)
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)

// bounded processing-time over window (time field "proctime", previous 1 minute)
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)

// bounded event-time row-count over window (time field "rowtime", previous 10 rows)
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)

// bounded processing-time row-count over window (time field "proctime", previous 10 rows)
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
2.3 Code exercise
We can now pull together what we have learned and implement a concrete requirement in one complete piece of code: for each sensor, compute for every row the average temperature over that row and the previous two rows.
Data preparation
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
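Before reading the code, it helps to hand-compute what the three-row average should produce for sensor_1 (assuming rows arrive in timestamp order): the first row's average is just 35.8; the second is (35.8 + 32) / 2 = 33.9; the third is (35.8 + 32 + 36.2) / 3 ≈ 34.67; and from the fourth row on the frame holds exactly three rows, e.g. (32 + 36.2 + 29.7) / 3 ≈ 32.63.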
Code analysis:
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Over}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @Package windows
 * @File: FlinkSqlTumBlingOverTime.java
 * @author Brother Big Data
 * @date 2020/12/28 21:45
 * @version V1.0
 * Over window example.
 */
object FlinkSqlTumBlingOverTime {
  def main(args: Array[String]): Unit = {
    // build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 to make testing easier
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // build the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // read data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // parse the data into the case class and assign timestamps/watermarks
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
        })

    // register the data as a temporary table
    val dataTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    tableEnv.createTemporaryView("sensor", dataTable)

    // Table API implementation: current row plus the previous two rows
    val tableRes = dataTable
      .window(Over partitionBy 'id orderBy 'ts preceding 2.rows as 'ow)
      .select('id, 'ts, 'id.count over 'ow, 'temperature.avg over 'ow)

    // SQL implementation
    val tableSql = tableEnv.sqlQuery(
      """
        |select
        |  id,
        |  ts,
        |  count(id) over ow,
        |  avg(temperature) over ow
        |from sensor
        |window ow as (
        |  partition by id
        |  order by ts
        |  rows between 2 preceding and current row
        |)
      """.stripMargin)

    tableRes.toAppendStream[Row].print("tableRes")
    tableSql.toAppendStream[Row].print("tableSql")
    env.execute("FlinkSqlTumBlingOverTime")
  }

  case class SensorReading(id: String, timestamp: Long, temperature: Double)
}
Running result
At this point, the study of the function and example usage of windows in FlinkSQL is complete. I hope it has resolved your doubts. Theory works best when paired with practice, so go and try the examples yourself!