
IV. Flink--window, eventTime and wate

2025-01-20 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--

I. Overview of the window mechanism

1.1 Windows in Flink

Stream processing engines are designed to handle unbounded data sets, that is, data sets that keep growing and are essentially infinite. A window is a means of cutting such unbounded data into finite blocks for processing.

Windows are at the core of processing unbounded streams. A window splits an infinite stream into finite-size "buckets" on which computations can be run.

1.2 Types of window

Windows fall into two main categories:

CountWindow: generates a window based on a specified number of elements, regardless of time. Less commonly used.

TimeWindow: generates windows by time. Very commonly used. The following sections cover the main types of time window. There are four: tumbling windows (Tumbling Window), sliding windows (Sliding Window), session windows (Session Window) and global windows (Global Window, less commonly used).

1.2.1 Tumbling windows (Tumbling Windows)

Overview: slices the data according to a fixed window length. There is only one parameter, the window size.

Features: time-aligned, fixed window length, no overlap.

The tumbling window assigner assigns each element to a window of the specified size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a 5-minute tumbling window, windows are created as shown in the following figure:

Figure 1.2.1 Tumbling window

Applicable scenarios: BI statistics (aggregate calculations for each time period).
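The slicing described above can be sketched in plain Java. This is only an illustration of the arithmetic, not Flink's API: the class and method names are made up for this example, and it assumes windows are aligned to the epoch with no offset.

```java
final class TumblingWindowSketch {

    /** Start of the tumbling window containing the timestamp (epoch-aligned, no offset). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestampMs, long windowSizeMs) {
        return windowStart(timestampMs, windowSizeMs) + windowSizeMs;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long ts = 7 * 60 * 1000L + 30_000L; // 7 min 30 s after the epoch
        // Each timestamp belongs to exactly one window: [300000, 600000)
        System.out.println("[" + windowStart(ts, fiveMinutes) + ", " + windowEnd(ts, fiveMinutes) + ")");
    }
}
```

Because the start is derived purely from the timestamp and the window size, every element lands in exactly one window, which is why tumbling windows never overlap.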

1.2.2 Sliding windows (Sliding Windows)

Overview: a sliding window is a more general form of the fixed window. It has two parameters, a fixed window length and a slide interval.

Features: time-aligned, fixed window length, windows may overlap.

The sliding window assigner assigns elements to fixed-length windows. As with tumbling windows, the window size is configured by the window size parameter; a second parameter, the window slide, controls how often a new window starts. If the slide is smaller than the window size, windows overlap, and an element is assigned to multiple windows.

For example, with a 10-minute window and a 5-minute slide, a window is produced every 5 minutes and contains the data generated in the last 10 minutes, as shown in the following figure:

Figure 1.2.2 Sliding window

Applicable scenario: statistics over the most recent time period (for example, computing the failure rate of an interface over the last 5 minutes to decide whether to raise an alert).
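To make the "one element, several windows" behaviour concrete, here is a minimal plain-Java sketch. It is not Flink code; the names are hypothetical, windows are assumed to be epoch-aligned with no offset, and the unit (minutes below) is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {

    /** Start times of every sliding window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-minute window, 5-minute slide, element at minute 12:
        // it belongs to [10, 20) and [5, 15)
        System.out.println(windowStarts(12, 10, 5)); // prints [10, 5]
    }
}
```

When the slide equals the window size, the loop yields a single start, which is the tumbling-window case.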

1.2.3 Session windows (Session Windows)

Overview: a session window consists of a series of events combined with a timeout gap of a specified length, similar to a session in a web application. A new window is started when no new data has been received for that period of time.

Features: not time-aligned; windows have no fixed length.

The session window assigner groups elements by session activity. Unlike tumbling and sliding windows, session windows do not overlap and do not have fixed start and end times. Instead, a session window closes when it has received no elements for a fixed period of time, that is, when a gap of inactivity occurs. A session window is configured with a session gap that defines the length of the inactive period. When this period expires, the current session closes and subsequent elements are assigned to a new session window.
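The gap-based grouping can be sketched as follows. This is a simplified plain-Java illustration with made-up names, not Flink's session window implementation: it assumes the timestamps are already sorted and treats a difference larger than the gap as the start of a new session.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionWindowSketch {

    /** Groups ascending timestamps into sessions; a pause longer than the gap starts a new session. */
    static List<List<Long>> sessions(long[] sortedTimestamps, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            boolean inactiveTooLong =
                    !current.isEmpty() && ts - current.get(current.size() - 1) > gap;
            if (inactiveTooLong) {
                result.add(current);          // close the current session
                current = new ArrayList<>();  // and open a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // With a gap of 5, the pauses 3 -> 10 and 11 -> 30 each end a session
        System.out.println(sessions(new long[] {1, 2, 3, 10, 11, 30}, 5));
        // prints [[1, 2, 3], [10, 11], [30]]
    }
}
```

The sessions have different lengths and start wherever activity begins, which is what "not time-aligned, no fixed length" means.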

Figure 1.2.3 Session window

1.3 Window API

1.3.1 Window API classification

A window operates on one of two kinds of stream: a keyed stream of key-value data (KeyedStream), or a non-keyed stream.

Differences:

KeyedStream:

You call keyBy to partition the data by key before applying a window operation, and then call the window API, such as countWindow or timeWindow.

Non-keyed stream:

If keyBy is not called before the window operation, the stream is non-keyed and the window API to call is xxxWindowAll, such as countWindowAll or timeWindowAll. Because the stream has no key it cannot be partitioned, so there is only one partition and the parallelism of the window operator can only be 1. Keep this in mind.

1.3.2 countWindow

countWindow triggers execution based on the number of elements with the same key in the window, and computes a result only for keys whose element count has reached the window size.

There are two usages:

countWindow(window_size): only the window size is specified; the window is a tumbling window.

countWindow(window_size, slide): the window size and the slide interval are specified; the window is a sliding window.

Note: the window_size of countWindow refers to the number of elements with the same key, not the total number of input elements.
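The note above is easy to miss, so here is a small plain-Java simulation of a per-key tumbling count window. It is only a sketch with hypothetical names and does not use Flink; each element counts as 1, as in a word count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CountWindowSketch {

    /** Emits "key=count" each time a key has collected windowSize elements, then resets that key. */
    static List<String> tumblingCountWindow(String[] keys, int windowSize) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> fired = new ArrayList<>();
        for (String key : keys) {
            int count = counts.merge(key, 1, Integer::sum);
            if (count == windowSize) {
                fired.add(key + "=" + count);
                counts.remove(key); // the next element of this key starts a new window
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Six elements in total, but only key "a" reaches three elements
        System.out.println(tumblingCountWindow(new String[] {"a", "b", "a", "a", "b", "a"}, 3));
        // prints [a=3]
    }
}
```

Key "b" never fires even though six elements were read in total, because the window size is counted per key.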

1. Tumbling window

By default countWindow is a tumbling window. You only need to specify the window size; when the number of elements reaches the window size, the window is triggered.

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class WindowTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.readTextFile("/test.txt");

            source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        // split each line into words and emit (word, 1)
                        @Override
                        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                            for (String s1 : s.split(" ")) {
                                collector.collect(new Tuple2<>(s1, 1));
                            }
                        }
                    })
                    .keyBy(0)
                    // tumbling count window: fires once a key has 5 elements
                    .countWindow(5)
                    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                            return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                        }
                    })
                    .print();

            env.execute("scroll window");
        }
    }

2. Sliding window

The sliding window uses the same function name as the tumbling window, except that two parameters are passed: window_size and sliding_size.

In the following code sliding_size is 2, so a computation is triggered every time two elements with the same key are received, and each computation covers a window of 5 elements.

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class WindowTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.readTextFile("/test.txt");

            source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                            for (String s1 : s.split(" ")) {
                                collector.collect(new Tuple2<>(s1, 1));
                            }
                        }
                    })
                    .keyBy(0)
                    // sliding count window: window_size = 5, sliding_size = 2
                    .countWindow(5, 2)
                    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                            return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                        }
                    })
                    .print();

            env.execute("sliding window");
        }
    }

1.3.3 timeWindow

timeWindow groups all data within a specified time range into one window and computes over all the data in a window at once. The tumbling and sliding modes described above are both supported. There are two parameters, window_size and slide; when only window_size is specified the window is a tumbling window.

1. Tumbling window

By default Flink's time windows are divided by Processing Time: data is assigned to windows according to the time at which Flink processes it.

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class WindowTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.readTextFile("/test.txt");

            source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                            for (String s1 : s.split(" ")) {
                                collector.collect(new Tuple2<>(s1, 1));
                            }
                        }
                    })
                    .keyBy(0)
                    // The window call was lost from the source text; the 5-second
                    // tumbling window here is an assumed, illustrative value.
                    .timeWindow(Time.seconds(5))
                    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                            return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                        }
                    })
                    .print();

            env.execute("scroll window");
        }
    }

2. Sliding window

As above, with an additional slide parameter, the slide interval. Intervals can be specified with Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.

1.3.4 Window reduce

The reduce operator is applied after the window operator. Usage is the same as a normal reduce, except that the unit of reduction is a window: each window returns one reduce result. The programs above already show this, so it is not repeated here.

1.3.5 Window fold

The fold operator is applied after the window operator. Usage is the same as a normal fold, except that the unit of folding is a window: each window returns one fold result. The usage mirrors the reduce programs above, so it is not repeated here.

1.3.6 Window aggregation operations

These are the aggregation operators such as max and min, applied after the window operator. Each window returns one aggregate result, rather than an updated aggregate for every incoming element as on a non-windowed stream.

II. Time, watermark and window

2.1 Classification of time in Flink

In Flink, time falls into the following categories:

Event Time:

The time at which the event was created. It is usually described by a timestamp in the event itself; for example, in collected log data each log line records when it was generated. Flink accesses the event timestamp through a timestamp assigner.

Ingestion Time:

The time at which the data enters Flink.

Processing Time:

The local system time of the machine running each operator that performs a time-based operation. It is machine-dependent, and it is the default time attribute. In other words, it is the current time at which the data is processed.

How do these times differ? Because network transmission takes time, Ingestion Time is generally not equal to Event Time. Likewise, Processing Time is the time at which the data is processed; if the data was collected long ago and is processed only now, the three times will clearly differ.

Figure 2.1 The concept of time in Flink

Example:

The time for a log to enter Flink is 2017-11-12 10, and the system time for the window is 2017-11-12. The content of the log is as follows:

    2017-11-02 18:37:15.624 INFO Fail over to rm2

As you can see, none of the three times are equal. For the business, which time is most meaningful for counting the number of failure logs within one minute? Event time, because the statistics should be based on when the log was generated. But the default window time in Flink is Processing Time, so how is event time introduced?

2.2 Introducing eventTime

In Flink stream processing the vast majority of jobs use eventTime. ProcessingTime or IngestionTime is generally used only when eventTime is not available. ProcessingTime is the default. So how do you tell Flink which time to use?

2.2.1 Method 1: set the time type on env

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(timeType); // timeType is one of the three values below

    // The three time types:
    TimeCharacteristic.EventTime;       // event time
    TimeCharacteristic.IngestionTime;   // time of arrival at Flink
    TimeCharacteristic.ProcessingTime;  // time at which the data is processed

This approach takes effect globally for the whole env: it sets the default time of the env, for example to event time, and later window operations use that time by default. It does not work if different windows need different time types.

2.2.2 Method 2: set the time type per window

    stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))

window is the general API underlying all windows; the other window APIs are wrappers around it. You can call it directly and pass the window type as the argument. The example above specifies an event-time tumbling window, and it does not affect the time type of the env as a whole. Other window types work the same way, such as SlidingEventTimeWindows for sliding event-time windows. The class name generally tells you both the time type and the window type (sliding, tumbling or session). Note: session window assigners are provided for event time and processing time (EventTimeSessionWindows and ProcessingTimeSessionWindows).

2.3 Principle of watermark

2.3.1 Background to the introduction of watermark

In stream processing there is a path, and a delay, from the generation of an event through the source to the operator. In most cases data reaches the operator in event-time order, but disorder caused by the network, back pressure and other factors cannot be ruled out. Disorder means that the order in which Flink receives events is not strictly their Event Time order.

Figure 2.3 Out-of-order data

This raises a problem. Once data can arrive out of order, a window driven by eventTime alone cannot know whether all of its data has arrived, yet it cannot wait indefinitely either. There must be a mechanism guaranteeing that after a specific time the window is triggered to compute. That mechanism is the watermark.

Explanation:

Suppose window operations were triggered solely by the eventTime of arriving events, and there are events event1 to event5. If they arrive out of order, say event5 arrives first and then event1, how would Flink know whether there is still data in between? It cannot. There is no way to be sure that all the data will arrive, and waiting indefinitely is not an option. So a mechanism is needed to handle this situation.

2.3.2 Principle of the watermark mechanism

A watermark is a mechanism for measuring the progress of Event Time. It is a hidden attribute of the data: the data itself carries the corresponding watermark. Watermarks are used to handle out-of-order events, and correct handling of out-of-order events is usually achieved by combining the watermark mechanism with windows.

A watermark in the data stream indicates that all data with a timestamp less than the watermark has arrived, so the execution of a window is also triggered by the watermark.

A watermark can be understood as a delayed-trigger mechanism. We set a delay t for the watermark. Each time, the system determines the largest event time seen so far, maxEventTime, and then assumes that all data with eventTime less than maxEventTime - t has arrived. If the end time of a window is no later than maxEventTime - t, the window is triggered by the watermark.

Explanation:

A watermark is a probabilistic mechanism. Take event1 to event5 again: if event5 has already arrived then, given the order in which the events were generated, event1 to event4 should normally have arrived too. To allow more of event1 to event4 to arrive (more of them, though not necessarily all), a delay t is granted after event5 arrives. Once event5 has arrived and t has elapsed, event1 to event4 will, with high probability, have arrived. In the rare case that some have not, all events before event5 are nevertheless assumed to have arrived, whether or not they actually have. Data that arrives after the delay has passed is discarded as late. So the watermark is a mechanism that lets more out-of-order events be included by granting a bounded delay, and because the delay is bounded, Flink never waits indefinitely.

The watermark of an ordered data stream is as follows (watermark delay set to 0):

Figure 2.4 Watermarks in an ordered data stream

The watermark of an out-of-order data stream is as follows (watermark delay set to 2):

Figure 2.5 Watermarks in an out-of-order data stream

Each time Flink receives a piece of data it generates a watermark equal to the maxEventTime of all data received so far minus the delay t. In other words, the watermark is carried by the data. Once the watermark carried by the data reaches or passes the end time of an untriggered window, that window is triggered. Because the watermark is carried by the data, if no new data arrives during the run, windows that have not been triggered will never be triggered.

In the figure above the maximum allowed delay is set to 2s, so an event with timestamp 7s produces a watermark of 5s, and an event with timestamp 12s produces a watermark of 10s. If window 1 is 1s~5s and window 2 is 6s~10s, the watermark of the event with timestamp 7s triggers window 1, and the watermark of the event with timestamp 12s triggers window 2.

Windows are created continuously, and data within a window's range keeps being added to it. All untriggered windows wait to be triggered. As long as a window has not been triggered, data belonging to its range is added to it; once the window has been triggered, data arriving for it afterwards is discarded. If no new data arrives, no watermark is generated, and the window is not triggered for computation.

2.3.3 Conditions for a watermark to trigger window computation

Both conditions must hold:

Watermark time (max_eventTime - t) >= window_end_time

There is data in [window_start_time, window_end_time).
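The two conditions can be exercised with a small plain-Java simulation. This is a sketch under stated assumptions, not Flink's implementation: the class and method names are invented, windows are epoch-aligned tumbling windows [start, start + size), and the firing rule is the one stated in this section (watermark >= window end, and the window holds data). Units are seconds, with a 5-second window and a 2-second delay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class WatermarkTriggerSketch {
    private final long windowSize;
    private final long maxDelay;
    private long maxEventTime = Long.MIN_VALUE;
    // window start -> number of buffered elements, for windows that have not fired yet
    private final TreeMap<Long, Integer> pending = new TreeMap<>();

    WatermarkTriggerSketch(long windowSize, long maxDelay) {
        this.windowSize = windowSize;
        this.maxDelay = maxDelay;
    }

    /** Watermark = largest event time seen so far minus the allowed delay. */
    long currentWatermark() {
        return maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - maxDelay;
    }

    /** Feeds one event time and returns the starts of the windows fired by the new watermark. */
    List<Long> onEvent(long eventTime) {
        long start = eventTime - Math.floorMod(eventTime, windowSize);
        // Buffer the element unless its window end has already been passed (late data is dropped)
        if (start + windowSize > currentWatermark()) {
            pending.merge(start, 1, Integer::sum);
        }
        maxEventTime = Math.max(maxEventTime, eventTime);
        long watermark = currentWatermark();
        List<Long> fired = new ArrayList<>();
        // Only windows that hold data are in 'pending', so both conditions are checked here
        while (!pending.isEmpty() && pending.firstKey() + windowSize <= watermark) {
            fired.add(pending.pollFirstEntry().getKey());
        }
        return fired;
    }

    public static void main(String[] args) {
        WatermarkTriggerSketch sketch = new WatermarkTriggerSketch(5, 2);
        for (long eventTime : new long[] {3, 7, 4, 12}) {
            System.out.println("event " + eventTime + " -> watermark " + sketch.currentWatermarkAfter(eventTime));
        }
    }

    // Helper for the demo: applies the event and reports the watermark and fired windows
    private String currentWatermarkAfter(long eventTime) {
        List<Long> fired = onEvent(eventTime);
        return currentWatermark() + ", fired window starts " + fired;
    }
}
```

In this run the event at 7 raises the watermark to 5 and fires the window [0, 5); the event at 4 then arrives too late and is dropped; the event at 12 raises the watermark to 10 and fires [5, 10). If no further events arrive, the window [10, 15) never fires, because nothing advances the watermark.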

2.3.4 How watermarks are generated

Punctuated: generated continuously

Every increase in EventTime in the data stream produces a watermark.

In production, punctuated mode generates a large number of watermarks in high-TPS scenarios, which puts some pressure on downstream operators, so it is chosen only where real-time requirements are very high.

Periodic: generated periodically

A watermark is produced periodically (at a fixed time interval or after a certain number of records).

In production, the periodic approach must combine both dimensions, elapsed time and number of accumulated records, when generating watermarks; otherwise extreme cases can suffer very large delays.
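The difference in volume between the two modes can be seen in a toy plain-Java comparison. This is only a sketch with hypothetical names: it uses no delay, and the periodic variant ticks every N records as a stand-in for a timer, which is an assumption made to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

final class WatermarkEmissionSketch {

    /** Punctuated style: emit a watermark every time the event time advances. */
    static List<Long> punctuated(long[] eventTimes) {
        List<Long> emitted = new ArrayList<>();
        long max = Long.MIN_VALUE;
        for (long t : eventTimes) {
            if (t > max) {
                max = t;
                emitted.add(max);
            }
        }
        return emitted;
    }

    /** Periodic style: track the maximum, but emit only on every N-th record. */
    static List<Long> periodic(long[] eventTimes, int everyNRecords) {
        List<Long> emitted = new ArrayList<>();
        long max = Long.MIN_VALUE;
        long lastEmitted = Long.MIN_VALUE;
        for (int i = 0; i < eventTimes.length; i++) {
            max = Math.max(max, eventTimes[i]);
            boolean tick = (i + 1) % everyNRecords == 0;
            if (tick && max > lastEmitted) {
                lastEmitted = max;
                emitted.add(max);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] eventTimes = {1, 2, 3, 2, 5, 6};
        System.out.println(punctuated(eventTimes));  // prints [1, 2, 3, 5, 6]
        System.out.println(periodic(eventTimes, 3)); // prints [3, 6]
    }
}
```

The punctuated variant emits five watermarks for six records, while the periodic variant emits two, at the cost of the watermark lagging until the next tick.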

These two approaches have different API implementations, which are discussed below.

2.4 Introducing watermarks, and the interfaces

2.4.1 Introducing watermarks

Event time must be enabled first, then the watermark:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStreamSource<String> source = env.readTextFile("/test.txt");
    // pass in the watermark implementation class
    source.assignTimestampsAndWatermarks(xx);

There are two kinds of watermark implementation, with two interfaces corresponding to the two generation modes above:

AssignerWithPeriodicWatermarks: generates watermarks periodically (Periodic)

AssignerWithPunctuatedWatermarks: generates watermarks continuously (Punctuated)

2.4.2 The AssignerWithPeriodicWatermarks interface

Take a look at the source code of AssignerWithPeriodicWatermarks, which is mainly used to generate watermarks periodically.

    public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
        // get the current watermark
        @Nullable
        Watermark getCurrentWatermark();
    }

    // parent interface
    public interface TimestampAssigner<T> extends Function {
        // get the timestamp of the element
        long extractTimestamp(T var1, long var2);
    }

There are two methods to override: getCurrentWatermark() generates the watermark, and extractTimestamp gets the timestamp of each event.

Because this API generates watermarks periodically, the generation period must be specified in the env configuration, for example:

    env.getConfig().setAutoWatermarkInterval(n); // n is the interval in milliseconds

Example:

    /**
     * Handles watermarks.
     */
    public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

        // watermark delay t relative to event time, in milliseconds
        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        // the largest timestamp seen so far
        private long currentMaxTimestamp;

        // Takes the time from the incoming event and keeps it as the maximum if it is
        // larger than the current one. Out-of-order data does not guarantee that
        // timestamps increase.
        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            long timestamp = element.getCreationTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        // returns the watermark
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }

Combined with the configured setAutoWatermarkInterval(n), watermarks are then generated periodically.

2.4.3 The AssignerWithPunctuatedWatermarks interface

Take a look at the source code of AssignerWithPunctuatedWatermarks, which is mainly used to generate watermarks in real time.

    public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
        // get the latest watermark
        @Nullable
        Watermark checkAndGetNextWatermark(T var1, long var2);
    }

    // parent interface
    public interface TimestampAssigner<T> extends Function {
        // get the timestamp from the event
        long extractTimestamp(T var1, long var2);
    }

It is written in much the same way as above, except that no watermark generation interval is set.

2.4.4 Watermark implementation classes included with Flink

1. BoundedOutOfOrdernessTimestampExtractor

A class that implements the AssignerWithPeriodicWatermarks interface. Its source code:

    package org.apache.flink.streaming.api.functions.timestamps;

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
        private static final long serialVersionUID = 1L;
        private long currentMaxTimestamp;
        private long lastEmittedWatermark = -9223372036854775808L;
        private final long maxOutOfOrderness;

        // the constructor takes one parameter, the delay t
        public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
            if (maxOutOfOrderness.toMilliseconds() < 0L) {
                throw new RuntimeException("Tried to set the maximum allowed lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
            } else {
                this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
                this.currentMaxTimestamp = -9223372036854775808L + this.maxOutOfOrderness;
            }
        }

        public long getMaxOutOfOrdernessInMillis() {
            return this.maxOutOfOrderness;
        }

        // the method to override; extracts the timestamp
        public abstract long extractTimestamp(T var1);

        // the watermark method is already implemented; it computes the watermark
        // from the delay t that was passed in
        public final Watermark getCurrentWatermark() {
            long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness;
            if (potentialWM >= this.lastEmittedWatermark) {
                this.lastEmittedWatermark = potentialWM;
            }
            return new Watermark(this.lastEmittedWatermark);
        }

        public final long extractTimestamp(T element, long previousElementTimestamp) {
            long timestamp = this.extractTimestamp(element);
            if (timestamp > this.currentMaxTimestamp) {
                this.currentMaxTimestamp = timestamp;
            }
            return timestamp;
        }
    }

This class lets users customize the delay t of the watermark.

2. AscendingTimestampExtractor

This is also a class that implements the AssignerWithPeriodicWatermarks interface. It suits data sources whose timestamps increase steadily, such as Kafka partition data where each record is incremented by 1. You only need to override the extractAscendingTimestamp method.

2.5 Example of using eventTime, window and watermark together

    package flinktest;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class EventTimeTest {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // use event time, and emit watermarks every 1000 ms
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.getConfig().setAutoWatermarkInterval(1000);

            DataStreamSource<String> source = env.readTextFile("/tmp/test.txt");

            source.assignTimestampsAndWatermarks(
                            // watermark delay t = 3000 ms
                            new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(3000)) {
                                // the first field of each line is the event timestamp
                                @Override
                                public long extractTimestamp(String s) {
                                    return Long.parseLong(s.split(" ")[0]);
                                }
                            })
                    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                            Tuple2<String, Integer> tmpTuple = new Tuple2<>();
                            for (String s1 : s.split(" ")) {
                                tmpTuple.setFields(s1, 1);
                                collector.collect(tmpTuple);
                            }
                        }
                    })
                    // The keyBy and window calls were lost from the source text; keying by
                    // field 0 and a 5-second tumbling window are assumed here for illustration.
                    .keyBy(0)
                    .timeWindow(Time.seconds(5))
                    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                            return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                        }
                    })
                    .print();

            try {
                env.execute("eventtime test");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

Class inheritance structure of window api
