2025-03-26 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
Watermarks in Flink mainly address the problem of preserving event order. The root cause of ordering problems is that multiple parallel tasks process data from the stream at the same time, so the order in which records arrive cannot be guaranteed.
Upstream: generating watermarks
Watermarks are generally generated before the WINDOW operation. There are two types of watermark assigners:
AssignerWithPeriodicWatermarks:
Watermarks are injected into the stream automatically at a fixed interval, configured with ExecutionConfig.setAutoWatermarkInterval (in milliseconds). On each interval, getCurrentWatermark is called; if the returned watermark is non-null and larger than the previous one, it is injected into the stream (emitWatermark).
See TimestampsAndPeriodicWatermarksOperator (processElement extracts the timestamps; onProcessingTime polls getCurrentWatermark and emits the watermark).
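A minimal, Flink-free sketch of the periodic rule above, for illustration only: PeriodicWatermarkEmitter is a hypothetical class, watermarks are modeled as Long timestamps (null meaning "no watermark"), and onTick stands in for one firing of the auto-watermark interval timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the polling loop of a periodic watermark operator (not Flink's API).
// Watermarks are modeled as Long timestamps; null means "no watermark".
class PeriodicWatermarkEmitter {
    private final Supplier<Long> getCurrentWatermark; // plays the role of the assigner's getCurrentWatermark()
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkEmitter(Supplier<Long> getCurrentWatermark) {
        this.getCurrentWatermark = getCurrentWatermark;
    }

    // Called once per auto-watermark interval tick.
    void onTick() {
        Long candidate = getCurrentWatermark.get();
        // Forward only non-null watermarks that move event time forward.
        if (candidate != null && candidate > lastEmitted) {
            lastEmitted = candidate;
            emitted.add(candidate);
        }
    }

    List<Long> emitted() {
        return emitted;
    }
}
```

In a real job the tick would come from the interval set with ExecutionConfig.setAutoWatermarkInterval; here onTick is called by hand. With a supplier that returns null, 5, 5, 3, 9 on successive ticks, only 5 and 9 are forwarded.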
AssignerWithPunctuatedWatermarks:
Watermarks are injected based on events: every element gets a chance to decide whether a watermark should be generated (checkAndGetNextWatermark). If the resulting watermark is non-null and larger than the previous one, it is injected into the stream (emitWatermark).
See TimestampsAndPunctuatedWatermarksOperator.processElement
Each newly generated watermark supersedes the watermark already in the stream.
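The punctuated variant differs only in when the check happens. In the sketch below, Event and PunctuatedWatermarkEmitter are hypothetical classes, and an application-defined endOfBatch flag is assumed to be the punctuation. Every element is inspected by a method playing the role of checkAndGetNextWatermark, and the same "non-null and larger than before" rule decides whether a watermark is emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event: a timestamp plus a flag marking the end of a batch (assumed punctuation).
class Event {
    final long timestamp;
    final boolean endOfBatch;

    Event(long timestamp, boolean endOfBatch) {
        this.timestamp = timestamp;
        this.endOfBatch = endOfBatch;
    }
}

// Hypothetical stand-in for a punctuated watermark operator (not Flink's API).
class PunctuatedWatermarkEmitter {
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    // Plays the role of checkAndGetNextWatermark: null means this element carries no watermark.
    private Long checkAndGetNextWatermark(Event e) {
        if (e.endOfBatch) {
            return e.timestamp;
        }
        return null;
    }

    // Called for every element, so each element may produce a watermark.
    void processElement(Event e) {
        Long candidate = checkAndGetNextWatermark(e);
        if (candidate != null && candidate > lastEmitted) {
            lastEmitted = candidate;
            emitted.add(candidate);
        }
    }

    List<Long> emitted() {
        return emitted;
    }
}
```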
Downstream: processing watermarks
StatusWatermarkValve is responsible for aligning the watermarks arriving on the different input channels before passing them downstream in the pipeline. Alignment means that the watermark forwarded downstream is the minimum of the watermarks of all channels, so it advances only when that minimum advances.
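The alignment rule can be sketched without Flink. WatermarkAligner below is a hypothetical class, not StatusWatermarkValve's real API, and it omits the stream-status (idle channel) handling: it tracks the last watermark seen on each input channel and forwards a new watermark only when the minimum across all channels increases. A single channel that never receives a watermark holds the output back, which is the situation analyzed in the parallelism tests later in this article.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical sketch of min-across-channels watermark alignment.
// Simplification: every channel is treated as active (no idle-status handling).
class WatermarkAligner {
    private final long[] channelWatermarks;
    private long lastOutput = Long.MIN_VALUE;

    WatermarkAligner(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Returns the watermark to forward downstream, or empty if the aligned watermark did not advance.
    OptionalLong onWatermark(int channel, long watermark) {
        // A channel's watermark never moves backwards.
        if (watermark > channelWatermarks[channel]) {
            channelWatermarks[channel] = watermark;
        }
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > lastOutput) {
            lastOutput = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```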
How WindowOperator handles elements:
WindowOperator.processElement
WindowAssigner.assignWindows assigns the current element to its window(s).
Commonly used assigners: TumblingEventTimeWindows assigns windows according to the element's event time (exactly one window per element).
TumblingProcessingTimeWindows assigns windows according to the current processing time (exactly one window per element).
The assigner must match the time characteristic set with StreamExecutionEnvironment.setStreamTimeCharacteristic (the default is TimeCharacteristic.ProcessingTime);
otherwise, the windows cannot be triggered normally.
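To make "one window per element" concrete: a tumbling assigner maps each timestamp to exactly one window [start, start + size). The arithmetic below follows the formula used by Flink's TimeWindow.getWindowStartWithOffset; TumblingAssigner itself is a hypothetical helper. For event-time windows the input is the element's event time; for processing-time windows it would be the wall-clock time on arrival.

```java
// Hypothetical helper reproducing tumbling-window assignment arithmetic.
class TumblingAssigner {
    // Same arithmetic as TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize).
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Returns {start, end}; the window covers [start, end), so its max timestamp is end - 1.
    static long[] assign(long timestamp, long windowSize) {
        long start = windowStart(timestamp, 0, windowSize);
        return new long[] {start, start + windowSize};
    }
}
```

For example, with 10-second windows, timestamp 12345 lands in [10000, 20000) and timestamp 9999 in [0, 10000).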
Observed behavior in practice:
With ProcessingTimeWindows, windows fire whether or not watermarks are used, even when an event's own timestamp lags behind the window time. Data is then out of order across windows: data belonging to a later window may be output before data belonging to an earlier one.
With EventTimeWindows, data is aligned with window time, so it is not out of order across windows, but order within a single window is still not strictly guaranteed; you need to sort it.
The last batch of data is never emitted, and how much is missing depends on the watermark's maxOutOfOrderness. The default watermark algorithm is based on the maximum element timestamp seen so far. When no new element enters the stream, the watermark stops rising, and because maxOutOfOrderness is subtracted from that maximum, the watermark never passes the last batch of data. As a result, the last WINDOW cannot fire.
WindowOperator.processElement then adds the current window and element to WindowState. Different WindowState types are used in different scenarios: the type is determined by the specific operation applied to the WindowedStream, which generates the corresponding StateDescriptor, and add/get behave differently for each type. For example, HeapListWindowState appends the current object to the list for currentNamespace (that is, the TimeWindow), while HeapAggregateState applies the aggregate function to the current object and updates the result.
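The "last batch never emitted" effect follows from simple arithmetic. A minimal sketch, assuming a watermark computed as the maximum timestamp seen minus maxOutOfOrderness and the fire condition watermark >= window max timestamp; LastWindowStall is hypothetical, and the 10-second window size and 10-second out-of-orderness are assumed values. The within-window sort mentioned above is included as sortByTimestamp.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: why the newest event-time window does not fire once input stops.
class LastWindowStall {
    static final long WINDOW_SIZE = 10_000;          // 10 s tumbling windows (assumed)
    static final long MAX_OUT_OF_ORDERNESS = 10_000; // 10 s (assumed)

    // Watermark of a generator based on the maximum timestamp seen.
    static long watermark(long maxTimestampSeen) {
        return maxTimestampSeen - MAX_OUT_OF_ORDERNESS;
    }

    // A window [start, start + size) may fire once watermark >= its max timestamp (end - 1).
    static boolean canFire(long windowStart, long watermark) {
        return watermark >= windowStart + WINDOW_SIZE - 1;
    }

    // Order inside one window is not guaranteed, so sort buffered timestamps before processing.
    static List<Long> sortByTimestamp(List<Long> windowContents) {
        List<Long> sorted = new ArrayList<>(windowContents);
        sorted.sort(Comparator.naturalOrder());
        return sorted;
    }
}
```

If the last event seen has timestamp 25000, the watermark stays at 15000: window [0, 10000) can fire, but window [20000, 30000) waits until some event with timestamp of at least 39999 arrives.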
Conditions for triggering a window
WindowOperator checks whether a window should fire in two places, and the two checks use different conditions.
processElement: invoked when a new stream element arrives.
Check: watermark time >= the window's maximum timestamp. See EventTimeTrigger.onElement.
If the window cannot fire yet, InternalTimerService.registerEventTimeTimer is called to register a timer scoped to KEY + window and set to the window's maximum timestamp. The timer and the window state are cleaned up automatically at window maximum timestamp + WindowOperator.allowedLateness. allowedLateness can be set with stream.window(...).allowedLateness(...); it should generally be slightly larger than the WatermarkGenerator's maxOutOfOrderness.
onEventTime (or onProcessingTime for processing-time windows): for event time, this is invoked when the watermark is updated (advanceWatermark in the internal timer service). The current watermark is compared with the time of each previously registered timer; if the timer still exists and the watermark has reached its time, the window can fire. See EventTimeTrigger.onEventTime.
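Both checks can be condensed into a small simulation. The sketch below is hypothetical and is not Flink's InternalTimerService or EventTimeTrigger API: onElement fires immediately if the watermark already covers the window's max timestamp and otherwise registers one timer for that time; advanceWatermark fires every pending timer whose time is <= the new watermark, in timestamp order. Keys and windows are reduced to a string label plus the window's max timestamp.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical simulation of the two window-trigger checks (not Flink's API).
class EventTimeTriggerSim {
    // A timer scoped to key + window, due at the window's max timestamp.
    static final class Timer {
        final String keyAndWindow;
        final long time;

        Timer(String keyAndWindow, long time) {
            this.keyAndWindow = keyAndWindow;
            this.time = time;
        }
    }

    private final PriorityQueue<Timer> timers =
            new PriorityQueue<Timer>((a, b) -> Long.compare(a.time, b.time));
    private final Set<String> registered = new HashSet<>(); // at most one pending timer per key + window
    private long currentWatermark = Long.MIN_VALUE;
    final List<String> fired = new ArrayList<>();

    // Check 1, element path: fire at once if the watermark already covers the window,
    // otherwise register a timer for the window's max timestamp.
    // (In Flink, an element for such an old window only gets here while allowedLateness has not expired.)
    void onElement(String keyAndWindow, long windowMaxTimestamp) {
        if (windowMaxTimestamp <= currentWatermark) {
            fired.add(keyAndWindow);
        } else if (registered.add(keyAndWindow)) {
            timers.add(new Timer(keyAndWindow, windowMaxTimestamp));
        }
    }

    // Check 2, watermark path: fire every pending timer whose time the watermark has reached.
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        while (!timers.isEmpty() && timers.peek().time <= currentWatermark) {
            Timer t = timers.poll();
            registered.remove(t.keyAndWindow);
            fired.add(t.keyAndWindow);
        }
    }
}
```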
Reference http://blog.csdn.net/lmalds/article/details/52704170
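Watermarks and ordinary records are processed separately.
If an element arrives too late, i.e. element.getTimestamp() + allowedLateness < currentWatermark, it can be separated from the normal stream data through a special OutputTag (a side output, configured with WindowedStream.sideOutputLateData). See https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html
If a window is too late, i.e. window.maxTimestamp + allowedLateness < currentWatermark, the window is discarded directly.
Problems with the watermark:
The default watermark mechanism is data-driven: the watermark rises only when new data arrives. Because of maxOutOfOrderness, watermark < maximum event time in the stream < end time of the current window.
Following the analysis above, the newest time window is therefore never fired until newer data arrives and pushes the watermark past the current window's end time. If data arrives infrequently, or no new data enters the stream at all, the newest window is processed with very high latency or never fires, which is fatal for streaming systems with strict real-time requirements.
For example, a banking system must preserve order at the level of the customer account, and each account may see only a few transactions a day, or even just one. If we KEY BY account for window processing, we run into exactly this problem.
One option is to KEY BY HASH(account) modulo some number instead, and then group by account again inside the window processing. This makes processing somewhat more complex, but it keeps enough data flowing through each window.
Another option is to improve how watermarks are generated: if the watermark has not changed for some time, automatically advance it once to the end of the current window, which puts an upper bound on window-processing latency. One implementation of this idea:

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodic watermark generator that, when no new data arrives, advances the watermark
 * once to the end of the current window so that the window is eventually triggered.
 */
public abstract class AbstractWatermarkGenerator<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = -2006930231735705083L;
    private static final Logger logger = LoggerFactory.getLogger(AbstractWatermarkGenerator.class);

    private final long maxOutOfOrderness;     // in ms; the default constructor uses 10 seconds
    private final long windowSize;            // in ms; must match the tumbling window size
    private long currentMaxTimestamp;         // largest event timestamp seen so far
    private long lastTimestamp = 0;           // currentMaxTimestamp at the previous getCurrentWatermark() call
    private long lastWatermarkChangeTime = 0; // wall-clock time of the last data arrival or watermark change
    private long windowPurgeTime = 0;         // end of the latest window + maxOutOfOrderness

    public AbstractWatermarkGenerator(long maxOutOfOrderness, long windowSize) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.windowSize = windowSize;
    }

    public AbstractWatermarkGenerator() {
        this(10000, 10000);
    }

    /** Extracts the event time of an element; implemented by subclasses. */
    protected abstract long extractCurTimestamp(T element) throws Exception;

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        try {
            long curTimestamp = extractCurTimestamp(element);
            lastWatermarkChangeTime = System.currentTimeMillis();
            currentMaxTimestamp = Math.max(curTimestamp, currentMaxTimestamp);
            windowPurgeTime = Math.max(windowPurgeTime, getWindowExpireTime(currentMaxTimestamp));
            if (logger.isDebugEnabled()) {
                logger.debug("Extracting timestamp: {}", currentMaxTimestamp);
            }
            return curTimestamp;
        } catch (Exception e) {
            logger.error("Error extracting timestamp", e);
        }
        return 0;
    }

    /** Max timestamp at which the watermark reaches the end of the window containing currentMaxTimestamp. */
    protected long getWindowExpireTime(long currentMaxTimestamp) {
        long windowStart = TimeWindow.getWindowStartWithOffset(currentMaxTimestamp, 0, windowSize);
        long windowEnd = windowStart + windowSize;
        return windowEnd + maxOutOfOrderness;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long curTime = System.currentTimeMillis();
        if (currentMaxTimestamp > lastTimestamp) {
            // New data has raised the max timestamp since the last call.
            if (logger.isDebugEnabled()) {
                logger.debug("Current max timestamp has been increased since last");
            }
            lastTimestamp = currentMaxTimestamp;
            lastWatermarkChangeTime = curTime;
        } else {
            // No progress: once the wall-clock time since the last change exceeds the event-time gap
            // to windowPurgeTime, jump once so that the watermark reaches the end of the current window.
            long diff = windowPurgeTime - currentMaxTimestamp;
            if (diff > 0 && curTime - lastWatermarkChangeTime > diff) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Increase currentMaxTimestamp once");
                }
                currentMaxTimestamp = windowPurgeTime;
                lastTimestamp = currentMaxTimestamp;
                lastWatermarkChangeTime = curTime;
            }
        }
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
```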
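The first workaround (KEY BY HASH(account) modulo N, then regroup by account) can be sketched in plain Java. AccountBuckets and its Txn type are hypothetical, and the bucket count of 16 is an arbitrary assumption. The job would keyBy bucketOf(account) instead of the account itself, so many accounts share one window key and keep it supplied with data; the window function then regroups its contents by account, with each account's transactions in timestamp order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "KEY BY HASH(account) mod N, then regroup by account inside the window".
class AccountBuckets {
    static final int NUM_BUCKETS = 16; // assumed value; tune to the data rate

    // Hypothetical transaction record.
    static final class Txn {
        final String account;
        final long timestamp;

        Txn(String account, long timestamp) {
            this.account = account;
            this.timestamp = timestamp;
        }
    }

    // Key for keyBy: floorMod keeps the bucket non-negative even for negative hash codes.
    static int bucketOf(String account) {
        return Math.floorMod(account.hashCode(), NUM_BUCKETS);
    }

    // Inside the window function: regroup one bucket's contents by account, each list in timestamp order.
    static Map<String, List<Txn>> regroupByAccount(List<Txn> windowContents) {
        Map<String, List<Txn>> byAccount = new LinkedHashMap<>();
        for (Txn t : windowContents) {
            byAccount.computeIfAbsent(t.account, k -> new ArrayList<>()).add(t);
        }
        for (List<Txn> txns : byAccount.values()) {
            txns.sort(Comparator.comparingLong((Txn t) -> t.timestamp));
        }
        return byAccount;
    }
}
```

The trade-off is the one noted above: a busier window key in exchange for an extra grouping step inside the window.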
In actual tests, I found that whether windows are triggered depends on the parallelism of the operators and on where watermarks are generated.
The test results are as follows:
Env default parallelism 10, Source parallelism 20, window parallelism 6, watermark generation defined before keyBy.
Result: Source runs as its own subtask with parallelism 20, the operators before the WINDOW operator are chained into one subtask with parallelism 10, and the WINDOW subtasks have parallelism 6. The window fires normally.
Env default parallelism 20, Source parallelism 20, window parallelism 6, watermark generation defined before keyBy.
Result: Source and the operators up to the WINDOW operator are chained into one subtask with parallelism 20, and the WINDOW subtasks have parallelism 6. The window fires normally.
Env default parallelism 60, Source parallelism 20, window parallelism 10, watermark generation defined before keyBy.
Result: Source runs as its own subtask with parallelism 20, and the operators before the WINDOW operator are chained into one subtask with parallelism 60. The window does not fire normally. (My understanding is that raising the operator parallelism leaves the processing threads of some channels without data. As explained above, watermark alignment takes the minimum watermark across all channels, so the watermark does not rise. This is visible in the WATERMARKS view of the Flink console.)
Env default parallelism 60, Source parallelism 20, window parallelism 10, watermark generation defined after Source.
Result: Source runs as its own subtask with parallelism 20, the operators before the WINDOW operator are chained into one subtask with parallelism 60, and the WINDOW subtasks have parallelism 10. The window fires normally.
Env default parallelism 10, Source parallelism 20, window parallelism 20, watermark generation defined before keyBy.
Result: Source runs as its own subtask with parallelism 20, the operators before the WINDOW operator are chained into one subtask with parallelism 10, and the WINDOW subtasks have parallelism 20. The window fires normally.
Env default parallelism 30, Source parallelism 20, window parallelism 20, watermark generation defined before keyBy.
Result: Source runs as its own subtask with parallelism 20, the operators before the WINDOW operator are chained into one subtask with parallelism 30, and the WINDOW subtasks have parallelism 20. The window does not fire normally.
Env default parallelism 30, Source parallelism 20, window parallelism 20, watermark generation defined after Source.
Result: Source runs as its own subtask with parallelism 20, the operators before the WINDOW operator are chained into one subtask with parallelism 30, and the WINDOW subtasks have parallelism 20. The window fires normally.
Therefore, before the WINDOW operator, it is best not to give a downstream operator a higher parallelism than its upstream operator. If that cannot be avoided, place watermark generation as close to the front of the DAG as possible (for example, right after the Source) so that watermarks can propagate to the downstream operators.
© 2024 shulou.com SLNews company. All rights reserved.