Shulou
2025-01-26 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
Many people without much Flink experience are unsure how Flink 1.11.0 implements its Watermark mechanism. This article therefore summarizes the problem that watermarks solve and how to use them. I hope it helps you solve this problem.
How should out-of-order data be handled when using EventTime? In stream processing, a record takes time to travel from the moment its event is generated, through the source, to an operator. Although in most cases data reaches an operator in event-time order, network delays and other factors can make it arrive out of order; with Kafka in particular, ordering across multiple partitions is not guaranteed. A window computation therefore cannot wait indefinitely: there must be a mechanism that forces the window to be computed after a certain point. That mechanism is the watermark. A Watermark is used to handle out-of-order events and to measure the progress of Event Time. It is also called the "water level line".
I. The core principle of Watermark
At its core, a Watermark can be understood as a delayed trigger mechanism.
In Flink window processing, once all of a window's data has arrived, window calculations (such as aggregation or grouping) can be performed on it. If not all of the data has arrived, Flink has to keep waiting for the rest of the window's data before processing it. This is where the watermark (water level line) mechanism comes in: it measures the progress of data processing (that is, how complete the arrived data is), so that results are computed correctly and continuously as expected even when events arrive out of order or late. Whenever an event enters the Flink system, a watermark timestamp is generated from the current maximum event time.
So how does Flink calculate the value of the Watermark?
Watermark = maxEventTime - t, where maxEventTime is the largest event time that has entered Flink and t is the specified delay time.
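To make the formula concrete, the following plain-Java sketch (no Flink dependency) tracks the largest event time seen so far and derives the watermark from it, following the simplified formula above. `WatermarkTracker` is a hypothetical class written only for illustration, not part of Flink's API, and timestamps are assumed to be milliseconds.

```java
// Hypothetical helper illustrating: watermark = maxEventTime - t
final class WatermarkTracker {
    private final long delayMillis;             // t, the allowed out-of-orderness
    private long maxEventTime = Long.MIN_VALUE; // largest event time seen so far

    WatermarkTracker(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Record one event and return the watermark after seeing it.
    long onEvent(long eventTime) {
        maxEventTime = Math.max(maxEventTime, eventTime);
        return maxEventTime - delayMillis;
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(2_000); // t = 2 s
        long[] eventTimes = {1_000, 4_000, 3_000, 7_000, 5_000};
        for (long ts : eventTimes) {
            System.out.println("event " + ts + " -> watermark " + tracker.onEvent(ts));
        }
    }
}
```

In the sample sequence, the out-of-order event at 3000 leaves the watermark at 2000: the watermark only moves forward when a new maximum event time arrives, so it never goes backwards.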
So how does a window with a Watermark trigger its window function?
If a window's end time is less than or equal to maxEventTime - t (that is, the watermark), the window is triggered and executed.
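The trigger rule can be sketched for tumbling windows as follows. This is a hypothetical illustration (`WindowTriggerRule` is not a Flink class); it assumes non-negative millisecond timestamps and no window offset, so the window containing an event starts at `ts - ts % size`. In Flink itself, the bounded-out-of-orderness strategy emits maxEventTime - t - 1 as the watermark, and the event-time trigger fires once the watermark reaches the window's last timestamp, end - 1; the two 1 ms offsets cancel, so the firing point matches the simplified rule used here.

```java
// Hypothetical sketch of the trigger rule for tumbling event-time windows.
final class WindowTriggerRule {
    // Start of the tumbling window that contains eventTime
    // (assumes non-negative timestamps and no window offset).
    static long windowStart(long eventTime, long windowSize) {
        return eventTime - eventTime % windowSize;
    }

    // A window [start, end) fires once end <= watermark, where watermark = maxEventTime - t.
    static boolean shouldFire(long windowEnd, long maxEventTime, long delay) {
        long watermark = maxEventTime - delay;
        return windowEnd <= watermark;
    }

    public static void main(String[] args) {
        long size = 5_000, delay = 3_000;
        long start = windowStart(4_200, size); // 0
        long end = start + size;               // 5000
        System.out.println(shouldFire(end, 7_900, delay)); // watermark 4900 -> prints false
        System.out.println(shouldFire(end, 8_000, delay)); // watermark 5000 -> prints true
    }
}
```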
The core processing flow is shown in the following figure.
II. Three uses of Watermark
1. Watermark in an ordered stream
If the event times of the data elements are ordered, watermark timestamps are generated in step with those event times, so the watermark always keeps pace with event time. Because the data is ordered, no delay is needed: t is 0, so watermark = maxEventTime - 0 = maxEventTime. This is the watermark in its ideal state. When the watermark reaches (is greater than or equal to) a window's end time, that window's computation is triggered, and the same applies to each following window. This case is really a special case of out-of-order data.
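A minimal simulation of the ordered case, assuming t = 0 and millisecond timestamps: events arrive in event-time order, the watermark equals the largest event time seen, and the method reports which event's arrival closes a given window. `OrderedStreamDemo` and `firingEventTime` are hypothetical names used only for this sketch.

```java
// Hypothetical simulation of an ordered stream with delay t = 0,
// so watermark = maxEventTime - 0 = maxEventTime.
final class OrderedStreamDemo {
    // Returns the event time whose arrival makes the window ending at windowEnd fire,
    // or -1 if the window is still open after all events.
    static long firingEventTime(long[] orderedEventTimes, long windowEnd) {
        long maxEventTime = Long.MIN_VALUE;
        for (long ts : orderedEventTimes) {
            maxEventTime = Math.max(maxEventTime, ts);
            long watermark = maxEventTime; // t = 0
            if (windowEnd <= watermark) {  // rule: window end <= watermark
                return ts;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] events = {1_000, 2_000, 3_000, 4_000, 5_000, 6_000};
        // The window [0, 5000) closes when the event at 5000 arrives.
        System.out.println(firingEventTime(events, 5_000));
        // The window [5000, 10000) is still open after these events.
        System.out.println(firingEventTime(events, 10_000));
    }
}
```

With ordered input no extra waiting is added: a window closes as soon as the first event at or beyond its end time arrives.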
2. Watermark in out-of-order streams
In practice, data elements often do not enter the Flink system in the order in which they were generated; they frequently arrive out of order or late, and watermarks are needed to handle this. For example, in the following figure the delay time t is set to 2.
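The effect of the delay can be sketched with a small simulation of a single window [0, 5000). Assumptions: millisecond timestamps, the article's t = 2 interpreted as 2 seconds, and the rule that a window fires once its end <= maxEventTime - t; `OutOfOrderDemo` is a hypothetical class. The method counts how many of the window's events arrive before it fires; any event belonging to the window that arrives afterwards would be late.

```java
// Hypothetical simulation: counts the events of window [0, windowEnd)
// that arrive before the window fires under watermark = maxEventTime - delay.
final class OutOfOrderDemo {
    static int eventsCounted(long[] arrivalOrder, long windowEnd, long delay) {
        long maxEventTime = Long.MIN_VALUE;
        int counted = 0;
        for (long ts : arrivalOrder) {
            maxEventTime = Math.max(maxEventTime, ts);
            if (ts < windowEnd) {
                counted++; // event belongs to the still-open window
            }
            if (windowEnd <= maxEventTime - delay) {
                return counted; // window fires; later events for it are late
            }
        }
        return counted;
    }

    public static void main(String[] args) {
        // Event times in arrival order: 4500 arrives after 6000.
        long[] arrivals = {1_000, 3_000, 6_000, 4_500, 8_000};
        System.out.println(eventsCounted(arrivals, 5_000, 0));     // t = 0: prints 2
        System.out.println(eventsCounted(arrivals, 5_000, 2_000)); // t = 2 s: prints 3
    }
}
```

With t = 0 the out-of-order event at 4500 misses its window; with t = 2 s the window waits long enough to include it, at the cost of firing later (after the event at 8000 instead of 6000).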
3. Watermark in parallel data streams
With parallelism greater than one, an operator receives watermarks from several input channels, and Flink aligns them: the operator uses the smallest watermark among all of its input channels as its own watermark.
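The alignment rule can be sketched as follows, assuming an operator with a fixed number of input channels: it remembers the latest watermark from each channel and uses the minimum as its own watermark, so a single slow channel holds back event time for the whole operator. `WatermarkAligner` is a hypothetical name; Flink's real implementation also deals with idle channels, which this sketch ignores.

```java
import java.util.Arrays;

// Hypothetical sketch: combine per-channel watermarks by taking the minimum.
final class WatermarkAligner {
    private final long[] channelWatermarks; // latest watermark seen on each input channel

    WatermarkAligner(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // no watermark received yet
    }

    // Record a watermark from one channel and return the operator's aligned watermark.
    long onWatermark(int channel, long watermark) {
        // a channel's watermark never moves backwards
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long aligned = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            aligned = Math.min(aligned, wm);
        }
        return aligned;
    }

    public static void main(String[] args) {
        WatermarkAligner aligner = new WatermarkAligner(3);
        aligner.onWatermark(0, 5_000);
        aligner.onWatermark(1, 3_000);
        System.out.println(aligner.onWatermark(2, 7_000)); // 3000: channel 1 is the slowest
        System.out.println(aligner.onWatermark(1, 6_000)); // 5000: now channel 0 is the slowest
    }
}
```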
III. Core code for setting Watermark
1. First, set the time semantics for event processing correctly; Event Time is generally used:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2. Next, specify how Watermarks are generated, including the delay time and the field that holds the EventTime, as follows:
Note: the code above works whether or not the data is ordered, because ordered data is just a special case of out-of-order data (one with a delay of 0).
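To see why one piece of code covers both cases, here is a plain-Java sketch modeled on our reading of Flink 1.11's bounded-out-of-orderness watermark generator: events only update the maximum timestamp, and a watermark of maxTimestamp - delay - 1 is emitted periodically (at the auto-watermark interval) rather than per event. With a delay of 0 the same class handles ordered data; Flink's `WatermarkStrategy.forMonotonousTimestamps()` is likewise built on a zero delay. `PeriodicBoundedGenerator` is a hypothetical standalone class, not Flink's `WatermarkGenerator` interface.

```java
// Hypothetical standalone sketch of a periodic bounded-out-of-orderness watermark generator.
final class PeriodicBoundedGenerator {
    private final long delayMillis; // maximum out-of-orderness t
    private long maxTimestamp;      // largest event timestamp seen so far

    PeriodicBoundedGenerator(long delayMillis) {
        this.delayMillis = delayMillis;
        // chosen so that the first emitted watermark is Long.MIN_VALUE without underflow
        this.maxTimestamp = Long.MIN_VALUE + delayMillis + 1;
    }

    // Called for every event: only remembers the maximum timestamp.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called on a timer, not per event, which keeps watermark generation cheap.
    long onPeriodicEmit() {
        return maxTimestamp - delayMillis - 1;
    }

    public static void main(String[] args) {
        PeriodicBoundedGenerator ordered = new PeriodicBoundedGenerator(0);        // ordered data
        PeriodicBoundedGenerator outOfOrder = new PeriodicBoundedGenerator(3_000); // t = 3 s
        for (long ts : new long[] {1_000, 2_000, 3_000}) {
            ordered.onEvent(ts);
        }
        for (long ts : new long[] {10_000, 8_000, 12_000}) {
            outOfOrder.onEvent(ts);
        }
        System.out.println(ordered.onPeriodicEmit());    // 2999
        System.out.println(outOfOrder.onPeriodicEmit()); // 8999
    }
}
```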
IV. Watermark programming example
Test data: mobile phone call records from base stations, as follows:
Requirement: every 5 seconds, find the call record with the longest talk time for each base station.
StationLog encapsulates one base-station call record:
package watermark;

// Input format: stationID,from,to,duration,callTime
// e.g. station1,18688822219,18684812319,10,1595158485855
public class StationLog {
    private String stationID; // base station ID
    private String from;      // calling number
    private String to;        // called number
    private long duration;    // duration of the call
    private long callTime;    // call time (epoch milliseconds), used as the event time

    // no-arg constructor so that Flink can treat StationLog as a POJO
    public StationLog() {}

    public StationLog(String stationID, String from, String to, long duration, long callTime) {
        this.stationID = stationID;
        this.from = from;
        this.to = to;
        this.duration = duration;
        this.callTime = callTime;
    }

    public String getStationID() { return stationID; }
    public void setStationID(String stationID) { this.stationID = stationID; }

    public long getCallTime() { return callTime; }
    public void setCallTime(long callTime) { this.callTime = callTime; }

    public String getFrom() { return from; }
    public void setFrom(String from) { this.from = from; }

    public String getTo() { return to; }
    public void setTo(String to) { this.to = to; }

    public long getDuration() { return duration; }
    public void setDuration(long duration) { this.duration = duration; }
}
Code implementation: WaterMarkDemo performs the calculation (note: to make testing easier, the job parallelism is set to 1).
package watermark;

import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Every 5 seconds, output the call log with the longest talk time in that 5-second window, per base station.
public class WaterMarkDemo {
    public static void main(String[] args) throws Exception {
        // get the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // Interval for periodically generating watermarks. When the data volume is large,
        // generating a watermark for every event would hurt performance.
        env.getConfig().setAutoWatermarkInterval(100); // every 100 ms

        // get the input stream
        DataStreamSource<String> stream = env.socketTextStream("bigdata111", 1234);

        stream.flatMap(new FlatMapFunction<String, StationLog>() {
                @Override
                public void flatMap(String data, Collector<StationLog> output) throws Exception {
                    // fields: ID, from, to, call duration, callTime
                    String[] words = data.split(",");
                    output.collect(new StationLog(words[0], words[1], words[2],
                            Long.parseLong(words[3]), Long.parseLong(words[4])));
                }
            })
            .filter(new FilterFunction<StationLog>() {
                @Override
                public boolean filter(StationLog value) throws Exception {
                    return value.getDuration() > 0; // drop records with zero call duration
                }
            })
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // delay t = 3 s
                    .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
                        @Override
                        public long extractTimestamp(StationLog element, long recordTimestamp) {
                            return element.getCallTime(); // field used as the EventTime
                        }
                    }))
            .keyBy(new KeySelector<StationLog, String>() {
                @Override
                public String getKey(StationLog value) throws Exception {
                    return value.getStationID(); // group by base station
                }
            })
            .timeWindow(Time.seconds(5)) // 5-second tumbling event-time window
            .reduce(new MyReduceFunction(), new MyProcessWindows())
            .print();

        env.execute();
    }
}

// Processes the data in the window: finds the record with the longest talk time.
class MyReduceFunction implements ReduceFunction<StationLog> {
    @Override
    public StationLog reduce(StationLog value1, StationLog value2) throws Exception {
        // keep the longer call
        return value1.getDuration() >= value2.getDuration() ? value1 : value2;
    }
}

// Defines what is output once a window has been processed.
class MyProcessWindows extends ProcessWindowFunction<StationLog, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<StationLog> elements, Collector<String> out) throws Exception {
        // after the reduce step, the window holds a single element: the longest call
        StationLog maxLog = elements.iterator().next();
        StringBuilder sb = new StringBuilder();
        sb.append("window range is: ")
          .append(context.window().getStart()).append(" -- ").append(context.window().getEnd()).append("\n");
        sb.append("Base Station ID: ").append(maxLog.getStationID())
          .append("\tcall time: ").append(maxLog.getCallTime())
          .append("\tcalling number: ").append(maxLog.getFrom())
          .append("\tcalled number: ").append(maxLog.getTo())
          .append("\ttalk length: ").append(maxLog.getDuration()).append("\n");
        out.collect(sb.toString());
    }
}
After reading the above, have you mastered how to implement Flink's Watermark mechanism with Flink 1.11.0?
© 2024 shulou.com SLNews company. All rights reserved.