This article explains Flink's watermark delay and window allowed lateness (allowedLateness): what each concept means, how they differ, and how they behave in practice.
When Flink processes event-time (Event Time) data in windows, a watermark delay and a window allowed lateness (allowedLateness) can both be configured to improve the completeness of the results. The two are easy to confuse at first because each of them sets a delay. The rest of this article discusses the concepts of "watermark delay" and "window allowed lateness" and the difference between them.
Watermark delay (Watermark)
(1) Watermark
Because processing is driven by event time rather than the physical wall clock, a window cannot tell on its own when it should close and compute. Watermarks solve this problem: when the watermark passes the end of a window, Flink assumes by default that all data for that window period has arrived and triggers the window computation.
(2) Watermark delay
The purpose of setting a watermark delay is to hold back the advance of the watermark in order to handle out-of-order data. Because the watermark arrives later, late data that shows up within the delay can still be added to the window computation, which improves the completeness of the result. The window computation is triggered when the (delayed) watermark arrives; late data that arrives after the watermark is discarded by default.
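As a minimal sketch of how such a 2-second delay is declared (assuming a record type WC with a getEventTime() accessor that returns epoch seconds, as in the demo later in this article, and an already defined stream variable):

WatermarkStrategy<WC> strategy = WatermarkStrategy
        // tolerate events that arrive up to 2 seconds out of order
        .<WC>forBoundedOutOfOrderness(Duration.ofSeconds(2))
        // tell Flink where the event-time timestamp lives (in milliseconds)
        .withTimestampAssigner((wc, recordTimestamp) -> wc.getEventTime() * 1000);

// attach the strategy to the stream
stream.assignTimestampsAndWatermarks(strategy);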
Window allowed lateness (allowedLateness)
When using the DataStream API, allowedLateness can be set after the window assigner. The official documentation explains it as follows:
By default, late elements are dropped once the watermark passes the end of the window. However, Flink allows you to specify a maximum allowed lateness for window operators. Allowed lateness specifies how long elements may be late before they are dropped, and its default value is 0. Elements that arrive after the watermark has passed the end of the window, but before the end of the window plus the allowed lateness, are still added to the window. Depending on the trigger used, a late but not dropped element may cause the window to fire again; this is the case for the EventTimeTrigger. To make this possible, Flink keeps the window's state until its allowed lateness expires. Once that happens, Flink removes the window and deletes its state, as described in the window lifecycle section.
Put simply: late data would normally be dropped once the watermark has passed, while allowed lateness is how long the window (and its state) is kept after that before it is cleaned up. In other words, late data that arrives after the watermark but within the allowed lateness is still added to the window and triggers the window computation again.
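A minimal sketch of where allowedLateness sits in the API (keyedStream is assumed to be a stream already keyed by word, and WC is the record type used in the demo below):

keyedStream
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))   // 5-second event-time tumbling window
        .allowedLateness(Time.seconds(2))                        // keep window state 2 s past the watermark
        .reduce((a, b) -> new WC(a.getWord(), a.getCount() + b.getCount()))
        .print();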
One Demo and two conjectures
Let me use a Demo and two conjectures to help you understand these two concepts.
Example: the job receives Kafka data in JSON format such as {"word": "a", "count": 1, "time": 1604286564}. We open a 5-second tumbling window, key the stream by word, and accumulate the count values within each window. At the same time, the watermark delay is set to 2 seconds and the window allowed lateness to 2 seconds. The code is as follows:
// imports (paths may vary slightly across Flink / Kafka connector versions)
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

import java.time.Duration;
import java.util.Properties;

public class MyExample {

    public static void main(String[] args) throws Exception {
        // create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // set the time characteristic to event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Watermark strategy: it needs a TimestampAssigner (how to read the event timestamp)
        // and a Watermark Generator (how far out of order the stream may be).
        // forBoundedOutOfOrderness is a periodic generator: it observes incoming events in onEvent()
        // and emits watermarks when the framework calls onPeriodicEmit().
        WatermarkStrategy<WC> watermarkStrategy = WatermarkStrategy
                .<WC>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<WC>() {
                    @Override
                    public long extractTimestamp(WC wc, long l) {
                        return wc.getEventTime() * 1000;
                    }
                });

        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "Kafka address:9092");
        properties.setProperty("group.id", "test");

        // Flink needs to know how to turn Kafka messages into Java objects (deserialization).
        // Besides a hand-written KafkaDeserializationSchema, JsonDeserializationSchema,
        // AvroDeserializationSchema and TypeInformationSerializationSchema are provided out of the box.
        env.addSource(new FlinkKafkaConsumer<>("flinktest1",
                        new JSONKeyValueDeserializationSchema(true), properties)
                        .setStartFromLatest())
                // map: build the WC object
                .map(new MapFunction<ObjectNode, WC>() {
                    @Override
                    public WC map(ObjectNode jsonNode) throws Exception {
                        JsonNode valueNode = jsonNode.get("value");
                        return new WC(valueNode.get("word").asText(),
                                valueNode.get("count").asInt(),
                                valueNode.get("time").asLong());
                    }
                })
                // apply the watermark strategy
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .keyBy(WC::getWord)
                // window settings: here a 5-second tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // set the window allowed lateness
                .allowedLateness(Time.seconds(2))
                .reduce(new ReduceFunction<WC>() {
                    @Override
                    public WC reduce(WC wc, WC t1) throws Exception {
                        return new WC(wc.getWord(), wc.getCount() + t1.getCount());
                    }
                })
                .print();

        env.execute();
    }

    static class WC {
        public String word;
        public int count;
        public long eventTime;

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        public WC(String word, int count, long eventTime) {
            this.word = word;
            this.count = count;
            this.eventTime = eventTime;
        }

        public long getEventTime() { return eventTime; }

        public void setEventTime(long eventTime) { this.eventTime = eventTime; }

        public String getWord() { return word; }

        public void setWord(String word) { this.word = word; }

        public int getCount() { return count; }

        public void setCount(int count) { this.count = count; }

        @Override
        public String toString() {
            return "WC{" + "word='" + word + '\'' + ", count=" + count + '}';
        }
    }
}
Conjecture 1:
The watermark is delayed by 2 s, so when event time reaches the 7th second (5 + 2 = 7), all the data of the [0,5) window (here [11:09:20, 11:09:25)) is considered complete and the window computation is triggered.
// write data
{"word": "a", "count": 1, "time": 1604286560}   // 2020-11-02 11:09:20
{"word": "a", "count": 1, "time": 1604286561}   // 2020-11-02 11:09:21
{"word": "a", "count": 1, "time": 1604286562}   // 2020-11-02 11:09:22
{"word": "a", "count": 1, "time": 1604286566}   // 2020-11-02 11:09:26
{"word": "a", "count": 1, "time": 1604286567}   // 2020-11-02 11:09:27 (window computation triggered)
Console output
Analysis: the test shows that the window computation is indeed triggered at the 7th second, i.e. at 11:09:27, which matches conjecture 1: with the watermark delayed by 2 s, the data of the [0,5) window is considered complete at 5 + 2 = 7 s and the window computation is triggered. The result is 3, because only the first three records fall within the [0,5) window.
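For intuition, here is a rough timeline, assuming the watermark is approximately the largest event time seen so far minus the 2 s delay (Flink's generator additionally subtracts 1 ms, which is ignored here):
- 11:09:20 arrives: watermark ≈ 11:09:18, the window [11:09:20, 11:09:25) stays open
- 11:09:21 and 11:09:22 arrive: watermark ≈ 11:09:20, still open
- 11:09:26 arrives: watermark ≈ 11:09:24, still open, since the watermark has not yet passed the window end 11:09:25
- 11:09:27 arrives: watermark ≈ 11:09:25, the watermark reaches the window end, the window fires, and the three records from 11:09:20 to 11:09:22 are aggregated into a count of 3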
Conjecture 2:
With the window allowed lateness set to 2 seconds, late data that arrives after the watermark but within the allowed lateness should still be added to the window and trigger the window computation again:
// continue writing data
{"word": "a", "count": 1, "time": 1604286568}   // 2020-11-02 11:09:28, event time reaches the 8th second
{"word": "a", "count": 1, "time": 1604286563}   // 2020-11-02 11:09:23, simulates a record that is later than the watermark but within the window's allowed lateness; it belongs to the [0,5) window and still triggers and participates in that window's computation
A new line appears in the console output: the late record is added to the [0,5) window and the window computation fires again.
// continue writing data
{"word": "a", "count": 1, "time": 1604286569}   // 2020-11-02 11:09:29, event time reaches the 9th second
{"word": "a", "count": 1, "time": 1604286563}   // 2020-11-02 11:09:23, simulates a record that is later than the watermark and beyond the window's allowed lateness; although it belongs to the [0,5) window, it neither participates in nor triggers that window's computation
No new output appears on the console.
Analysis: because of the watermark delay, the [0,5) window is first triggered when event time reaches the 7th second (11:09:27). Without allowed lateness, data that arrives after the watermark and belongs to the [0,5) window would be discarded. In the experiment above the window allowed lateness is 2 s, so within the allowed lateness after the watermark (up to 7 + 2 = 9 s, i.e. 11:09:29) a late record that belongs to the [0,5) window is still added to the window and triggers another window computation. After the 9th second, i.e. beyond the allowed lateness, late records that belong to the [0,5) window are discarded.
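In the demo such records are simply dropped. As a hedged sketch that is not part of the original code (keyedStream and the WC type are assumed to match the demo, and org.apache.flink.util.OutputTag plus org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator also need to be imported), Flink's side-output mechanism can capture records that arrive even later than the allowed lateness instead of losing them silently:

// an OutputTag created as an anonymous subclass so that the type information is retained
final OutputTag<WC> lateTag = new OutputTag<WC>("late-data") {};

SingleOutputStreamOperator<WC> result = keyedStream
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(2))
        .sideOutputLateData(lateTag)    // records later than watermark + allowed lateness go here
        .reduce((a, b) -> new WC(a.getWord(), a.getCount() + b.getCount()));

// print what would otherwise be silently discarded
result.getSideOutput(lateTag).print("dropped-late");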
Thank you for reading. That covers the concepts of Flink watermark delay and window allowed lateness; hopefully this article has given you a clearer understanding of the two settings, and their exact behaviour is best verified in practice.