Updated 2025-01-15. From: SLTechnology News&Howtos (shulou)
This article introduces window operations in Flink in some detail and may serve as a useful reference.
We often need to aggregate data over a time window, and windowing is a problem that stream applications frequently have to solve. Flink's window operator provides an easy-to-use API: we can cut the data stream into windows and process the data in each window.
1. Types of windows
There are two main kinds of window operations, for KeyedStream and DataStream respectively. The main difference is how the window is set up: .window(...) for a KeyedStream and .windowAll(...) for a non-keyed DataStream. Windows on a KeyedStream can be computed by multiple parallel tasks, because each logical keyed stream is processed independently.
    stream
        .keyBy(...)                          <- keyed versus non-keyed windows
        .window(...)                         <- required: "assigner"
       [.trigger(...)]                       <- optional: "trigger" (else default trigger)
       [.evictor(...)]                       <- optional: "evictor" (else no evictor)
       [.allowedLateness(...)]               <- optional: "lateness" (else zero)
       [.sideOutputLateData(...)]            <- optional: "output tag" (else no side output for late data)
        .reduce/aggregate/fold/apply()       <- required: "function"
       [.getSideOutput(...)]                 <- optional: "output tag"
According to the window assigner, windows can be divided into:
tumbling windows, sliding windows, session windows, global windows, and custom windows.
Each time-based window can use either processing time or event time, so strictly speaking there are many kinds of windows.
There is also the count window, which is assigned according to the number of elements that arrive.
A window is created as soon as the first element that belongs to it arrives. A time-based window is removed completely once the time (event time or processing time) passes its end timestamp plus the allowed lateness (see section 4.3).
2. Window operations
2.1 Tumbling windows
A tumbling window assigner assigns each element to a window of a fixed size. The windows all have the same length and do not overlap, so every element belongs to exactly one window.
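The assignment rule can be sketched in plain Java without any Flink dependency. The class and method names below are hypothetical and only illustrate the arithmetic; this is not Flink's own implementation, and it assumes windows are aligned to timestamp 0.

```java
/** Hypothetical helper (not part of Flink): maps a timestamp to its tumbling window. */
final class TumblingWindowSketch {

    /** Start (inclusive) of the window of length sizeMs that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** End (exclusive) of the window that contains timestampMs. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }
}
```

With a size of 3 milliseconds, an element with timestamp 7 falls into the window [6, 9), and an element with timestamp 5 into [3, 6).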
The following example defines a stream with a tumbling window of 3 milliseconds:
    WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
        .keyBy(MovieRate::getUserId)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(3)));
2.2 Sliding windows
As with tumbling windows, a sliding window assigner assigns elements to windows of a fixed size, but a second parameter, the slide, controls how often a new window starts. If the slide is smaller than the window size, the windows overlap and an element may be assigned to multiple windows.
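To make the overlap concrete, here is a minimal plain-Java sketch that lists every window a timestamp falls into. The class and method names are hypothetical, the code is not taken from Flink, and it assumes windows are aligned to timestamp 0.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink): finds the sliding windows for a timestamp. */
final class SlidingWindowSketch {

    /** Returns the start of every window [start, start + sizeMs) that contains timestampMs. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a size of 10 milliseconds and a slide of 5 milliseconds, timestamp 12 belongs to two windows, [10, 20) and [5, 15). When the slide equals the size, each timestamp maps to a single window, which is the tumbling case.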
The following example shows a stream with a window size of 10 milliseconds and a slide of 5 milliseconds:
    WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
        .keyBy(MovieRate::getUserId)
        .window(SlidingEventTimeWindows.of(Time.milliseconds(10), Time.milliseconds(5)));
2.3 Session windows
Session windows group elements by periods of activity. They do not overlap and have no fixed start or end time. A session window closes when no element has been received for a certain period of time, the session gap. For a stream of user interaction events, for example, the natural choice is to aggregate events into sessions (periods of continuous user activity) separated by gaps of inactivity.
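The grouping idea can be sketched in plain Java as follows. The names are hypothetical, the input is assumed to be sorted in ascending order, and the exact handling of a pause equal to the gap is a choice made for this sketch rather than a statement about Flink's behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink): splits timestamps into sessions. */
final class SessionWindowSketch {

    /** Starts a new session whenever the pause since the previous element exceeds gapMs. */
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            boolean pauseTooLong = !current.isEmpty()
                    && ts - current.get(current.size() - 1) > gapMs;
            if (pauseTooLong) {
                result.add(current);          // close the running session
                current = new ArrayList<>();  // and open a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

With a gap of 10 milliseconds, the timestamps 1, 3, 5, 20, 22, 40 form three sessions: [1, 3, 5], [20, 22], and [40].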
    // static gap
    WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
        .keyBy(MovieRate::getUserId)
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(10)));

    // dynamic gap: pass a SessionWindowTimeGapExtractor that returns the gap for each element
    WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
        .keyBy(MovieRate::getUserId)
        .window(EventTimeSessionWindows.withDynamicGap(...));
2.4 Global windows
A global window assigner assigns all elements with the same key to a single window:
    WindowedStream<MovieRate, Integer, GlobalWindow> Rates = rates
        .keyBy(MovieRate::getUserId)
        .window(GlobalWindows.create());
3. Window functions
There are four window functions: ReduceFunction, AggregateFunction, FoldFunction, and ProcessWindowFunction. The first two are executed more efficiently because Flink can incrementally aggregate each element as it arrives in the window.
A ProcessWindowFunction, by contrast, receives all elements of the window at once, so Flink must buffer every element internally before calling the function, which makes it less efficient. However, a ProcessWindowFunction can be combined with one of the other window functions: the other function aggregates elements incrementally as they arrive, and the ProcessWindowFunction receives the aggregated result together with the window metadata.
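The difference in state size can be illustrated with a small plain-Java sketch. The names are hypothetical and nothing here uses Flink's interfaces; it only contrasts the two styles of computing an average.

```java
import java.util.List;

/** Hypothetical illustration (not Flink code): incremental versus buffered averaging. */
final class AggregationSketch {

    /** Incremental style: the state is two numbers, however many elements arrive. */
    static final class RunningAverage {
        private double sum;
        private long count;

        void add(double value) {
            sum += value;
            count++;
        }

        double result() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    /** Buffered style: every element must be kept until the window is evaluated. */
    static double averageOfBuffered(List<Double> buffered) {
        double sum = 0.0;
        for (double value : buffered) {
            sum += value;
        }
        return buffered.isEmpty() ? 0.0 : sum / buffered.size();
    }
}
```

Both approaches return the same average, but the incremental one never holds more than a sum and a count, while the buffered one grows with the number of elements in the window.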
As an example of AggregateFunction, the following code keys the MovieRate stream by user, assigns a 5-millisecond tumbling window, and returns the average of all ratings given by each user within the window.
    DataStream<Tuple2<Integer, Double>> Rates = rates
        .keyBy(MovieRate::getUserId)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
        .aggregate(new AggregateFunction<MovieRate, AverageAccumulator, Tuple2<Integer, Double>>() {
            @Override
            public AverageAccumulator createAccumulator() {
                return new AverageAccumulator();
            }

            @Override
            public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {
                // the key has to be copied into the accumulator on every call
                acc.userId = movieRate.userId;
                acc.sum += movieRate.rate;
                acc.count++;
                return acc;
            }

            @Override
            public Tuple2<Integer, Double> getResult(AverageAccumulator acc) {
                return Tuple2.of(acc.userId, acc.sum / (double) acc.count);
            }

            @Override
            public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {
                acc0.count += acc1.count;
                acc0.sum += acc1.sum;
                return acc0;
            }
        });

    public static class AverageAccumulator {
        int userId;
        int count;
        double sum;
    }
The following is a partial output:
    ...
    1> (44,3.0)
    4> (96. 5)
    2> (51,0.5)
    3> (90,2.75)
    ...
Looking at the code above, the add() function is awkward. We want to return a Tuple2 whose first field is the Integer key, but createAccumulator() takes no arguments, so AggregateFunction offers no way to pass the key to the AverageAccumulator constructor, and add() has to copy the user ID into the accumulator on every call. A combination of ProcessWindowFunction and AggregateFunction avoids this: the AggregateFunction performs the incremental aggregation, and when the window fires, the ProcessWindowFunction receives the result returned by the AggregateFunction and wraps it in a Tuple2 together with the key:
    DataStream<Tuple2<Integer, Double>> Rates = rates
        .keyBy(MovieRate::getUserId)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
        .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());

    public static class MyAggregateFunction
            implements AggregateFunction<MovieRate, AverageAccumulator, Double> {
        @Override
        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }

        @Override
        public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {
            acc.sum += movieRate.rate;
            acc.count++;
            return acc;
        }

        @Override
        public Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }

        @Override
        public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {
            acc0.count += acc1.count;
            acc0.sum += acc1.sum;
            return acc0;
        }
    }

    public static class MyProcessWindowFunction
            extends ProcessWindowFunction<Double, Tuple2<Integer, Double>, Integer, TimeWindow> {
        @Override
        public void process(Integer key, Context context, Iterable<Double> results,
                            Collector<Tuple2<Integer, Double>> out) throws Exception {
            // the iterable holds exactly one element: the pre-aggregated average
            Double result = results.iterator().next();
            out.collect(new Tuple2<>(key, result));
        }
    }

    public static class AverageAccumulator {
        int count;
        double sum;
    }
The result is the same as above, but the code is much cleaner.
4. Other operations
4.1 Triggers
A trigger determines when a window is ready to be processed by the window function. Each window assigner comes with a default trigger. If the default trigger does not meet your requirements, you can specify a custom trigger with trigger(...).
Generally speaking, the default trigger is suitable for most scenarios. For example, the event-time window assigners all use EventTimeTrigger as their default trigger. This trigger fires once the watermark passes the end of the window.
PS: The default trigger of GlobalWindow is NeverTrigger, which never fires, so you must always define a custom trigger when using GlobalWindow.
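One common choice for a global window is a count-based trigger; Flink ships a CountTrigger for this purpose. The plain-Java sketch below only illustrates the firing rule. Its class name and method are hypothetical, it does not implement Flink's Trigger interface, and resetting the counter after each firing is an assumption of the sketch.

```java
/** Hypothetical illustration (not Flink's Trigger API): fires on every n-th element. */
final class CountTriggerSketch {
    private final long maxCount;
    private long seen;

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Called once per arriving element; returns true when the window should be evaluated. */
    boolean onElement() {
        seen++;
        if (seen >= maxCount) {
            seen = 0; // start counting again for the next firing
            return true;
        }
        return false;
    }
}
```

With maxCount set to 3, the first two elements do not fire, the third one does, and counting then starts again.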
4.2 Evictors
An evictor can remove elements from a window after the trigger fires, either before or after the window function is applied, or both. Using an evictor prevents pre-aggregation, because all elements of the window must be passed to the evictor before the computation is applied.
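As an illustration of what an evictor does, the following plain-Java sketch keeps only the most recent elements of a window buffer before the window function would run. The names are hypothetical and the code does not implement Flink's Evictor interface; it also shows why the full buffer is needed, since the decision depends on all elements currently in the window.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration (not Flink's Evictor API): count-based eviction. */
final class CountEvictorSketch {

    /** Returns the last maxCount elements of the buffer, dropping older ones. */
    static <T> List<T> evictBefore(List<T> buffer, int maxCount) {
        int from = Math.max(0, buffer.size() - maxCount);
        return new ArrayList<>(buffer.subList(from, buffer.size()));
    }
}
```

For a buffer holding 1, 2, 3, 4, 5 and a maxCount of 2, only 4 and 5 are passed on; a buffer smaller than maxCount is passed on unchanged.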
4.3 Allowed Lateness
When using event-time windows, elements may arrive late: the watermark that Flink uses to track event-time progress has already passed the end timestamp of the window to which the element belongs.
By default, late elements are dropped once the watermark has passed the end of the window. However, Flink lets you specify a maximum allowed lateness for the window operator, which determines how late an element may be before it is dropped. The default value is 0.
To support this, Flink keeps the state of the window until its allowed lateness expires. Once it has expired, Flink removes the window and deletes its state.
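The decision described above can be sketched in plain Java. The names are hypothetical, and the exact boundary comparisons are assumptions of this sketch rather than a copy of Flink's code.

```java
/** Hypothetical illustration (not Flink code): what happens to an arriving element. */
final class LatenessSketch {

    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    /**
     * windowEndMs is the exclusive end of the element's window and watermarkMs is the
     * current watermark at the moment the element arrives.
     */
    static Outcome classify(long windowEndMs, long watermarkMs, long allowedLatenessMs) {
        long maxTimestamp = windowEndMs - 1; // last timestamp that still belongs to the window
        if (watermarkMs < maxTimestamp) {
            return Outcome.ON_TIME;
        }
        if (watermarkMs < maxTimestamp + allowedLatenessMs) {
            return Outcome.LATE_BUT_ACCEPTED; // window state is still kept
        }
        return Outcome.DROPPED; // window state has already been removed
    }
}
```

For a window ending at 10 with an allowed lateness of 5, an element arriving at watermark 12 is still added to the window, while one arriving at watermark 14 is dropped. With the default lateness of 0, any element arriving after the watermark has reached the end of the window is dropped.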
Late elements that would otherwise be dropped can be emitted as a side output; they can then be read from the result with getSideOutput(lateOutputTag):
    SingleOutputStreamOperator<T> result = input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .allowedLateness(<time>)
        .sideOutputLateData(lateOutputTag)
        .<windowed transformation>(<window function>);