Flink Windows and Time Operations
This article explains Flink's window and Time operations. The content is concise and easy to follow, and I hope you get something out of it.
Time Types
Time types commonly used in Flink:
Processing time
Ingestion time
Event time
Processing time
Processing time is the time of the server that executes the operation, that is, the local clock of the machine on which an operator runs.
If you use processing time as the time basis for stream processing, all time-based operations run on the server's clock. For example, a one-hour processing-time window contains all the data that arrived at the server within that hour: if the application starts at 9:15am, the first one-hour window covers events processed between 9:15 and 10:15, the next window those between 10:15 and 11:15, and so on.
Processing time is the simplest way to handle events and requires no time coordination between the stream and the machine, so it offers high performance and low latency. However, processing time is not deterministic in distributed or asynchronous environments: because of network jitter, a processing-time window such as 9:15 to 10:15 may well include events that actually occurred around 9:00.
Event time
Event time is the time at which each individual event occurred on the device that produced it, for example the timestamp logged when a phone signs in to an app. Each record carries a timestamp describing when its event happened, so this time depends on the data itself, not on the machine clock. Event-time processing must specify how to generate event-time watermarks, the mechanism Flink uses to track progress in event time.
When data is processed according to event time, the result is completely deterministic: no matter how many times the data is processed, the result is the same, regardless of when events arrive or whether they arrive in order. (In a production environment, the time and order in which data reaches the server is not fixed; data generated first may arrive later, depending on many network factors.)
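As a minimal sketch of event-time processing, assuming the same socket source used elsewhere in this article, a hypothetical input format of "word,timestampMillis", and the pre-1.12 Flink Scala API: a timestamp is extracted from each record, and watermarks are emitted that tolerate a bounded amount of out-of-order data.

Scala:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object EventTimeApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.socketTextStream("192.168.227.128", 9999)
      .map { line =>
        val parts = line.split(",")
        (parts(0), parts(1).trim.toLong) // (word, event timestamp in ms)
      }
      // emit watermarks that lag 3 seconds behind the highest timestamp seen,
      // so events up to 3 seconds late still land in the right window
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(3)) {
          override def extractTimestamp(element: (String, Long)): Long = element._2
        })
      .map(x => (x._1, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5)) // 5-second windows measured in event time
      .sum(1)
      .print()
    env.execute("EventTimeApp")
  }
}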
Ingestion time
Ingestion time is the time at which event data enters Flink. In the source operator, each record receives the source's current timestamp as its ingestion time, and time-based operations such as time windows are then processed against that timestamp.
Ingestion time sits conceptually between event time and processing time. It costs slightly more than processing time, but its results are more predictable: because the ingestion timestamp is stable (assigned exactly once, at the source), every window operation sees the same timestamp for a given record, whereas with processing time each window operator may place the same record in a different window according to its own local clock.
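Which of the three clocks timeWindow() uses is selected once per job through the environment's time characteristic. A minimal sketch with the pre-1.12 API this article is written against (the class name is illustrative):

Scala:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

object TimeCharacteristicDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // pick exactly one; ProcessingTime is the default
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // timestamps assigned once at the source
    // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)     // also requires timestamps and watermarks
  }
}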
Windows
Windows are the core component for processing infinite data streams (data that keeps arriving). Windows split the stream into finite buckets, and we apply our operators to those buckets.
The first thing to decide is whether the stream is keyed: the operators for keyed and non-keyed streams are completely different.
Keyed windows
Used in conjunction with keyBy. Any attribute of the incoming data can serve as the key. Windows on a keyed stream allow parallel computation across multiple tasks: each logical key can be processed independently, and all data with the same key is sent to the same parallel task.
Non-Keyed windows
Specified with windowAll(). The original stream is not split into multiple logical streams; all window logic is executed by a single window task, so the parallelism is 1. A sketch contrasting the two styles follows.
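A minimal sketch, under the same socket source as the other examples (the class name is illustrative):

Scala:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object KeyedVsNonKeyed {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words = env.socketTextStream("192.168.227.128", 9999)
      .flatMap(_.split(","))
      .map((_, 1))

    // keyed: one logical window per key, windows evaluated in parallel
    words.keyBy(0).timeWindow(Time.seconds(5)).sum(1).print()

    // non-keyed: one window over the whole stream, parallelism forced to 1
    words.timeWindowAll(Time.seconds(5)).sum(1).print()

    env.execute("KeyedVsNonKeyed")
  }
}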
Windows lifecycle
In short, a window is created as soon as the first element that belongs to it arrives, and it is removed once time (event time or processing time, as appropriate) passes its end timestamp. Each window also has a Trigger and a window function: the window function contains the computation to apply to the window's contents, while the Trigger specifies the condition under which the window fires and that function runs.
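To make the lifecycle concrete, here is a minimal sketch (not from the original article) in which the assigner creates the windows, a count-based Trigger decides when to fire, and sum() is the window function. Note that supplying CountTrigger replaces the window's default time-based trigger, so this window fires on every 10th element rather than when the minute is up:

Scala:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

object TriggerDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("192.168.227.128", 9999)
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // assigner creates windows
      .trigger(CountTrigger.of(10)) // fire the window function after every 10 elements
      .sum(1)                       // window function applied to the window contents
      .print()
    env.execute("TriggerDemo")
  }
}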
Windows Assigners
After specifying whether the stream is keyed, the next step is to define the window assigner. The window assigner's job is to define how each incoming element is assigned to a window. Use the window() method on keyed streams and the windowAll() method on non-keyed streams.
A WindowAssigner is responsible for assigning each incoming element to one or more windows.
Flink's built-in window assigners are sufficient for most scenarios: tumbling windows, sliding windows, session windows and global windows. You can also write a custom assigner by extending WindowAssigner. All built-in assigners except the global window assign data based on time (processing time or event time).
A time-based window has a start timestamp (inclusive) and an end timestamp (exclusive) to indicate the size of the window.
There are two categories of windows in Flink, the first is based on time (the most used), and the second is based on count.
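The four built-in assigners can be used explicitly via window(); the timeWindow() shorthand used in the examples below maps to the first two. A minimal sketch (processing-time variants chosen arbitrarily; the session and global lines are shown without a downstream function, and the class name is illustrative):

Scala:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, ProcessingTimeSessionWindows, SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object AssignerDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val keyed = env.socketTextStream("192.168.227.128", 9999)
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)

    // tumbling: fixed size, no overlap
    keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1).print()
    // sliding: size 10s, a new window every 5s, so windows overlap
    keyed.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1).print()
    // session: a window for a key closes after 1 minute of inactivity
    keyed.window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)))
    // global: one window per key that never ends; only useful with a custom trigger
    keyed.window(GlobalWindows.create())

    env.execute("AssignerDemo")
  }
}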
Tumbling Windows
The tumbling window assigner assigns each element to a window of a specified size. Tumbling windows have a fixed size and do not overlap: as time flows, the stream is simply cut into consecutive windows at the specified interval.
Simple example code:
Scala:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowsApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.227.128", 9999)
    text.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5)) // tumbling window of 5 seconds
      .sum(1)
      .print()
      .setParallelism(1)
    env.execute("WindowsApp")
  }
}
The code above monitors a socket data stream and aggregates the data every 5 seconds. timeWindow means windows are divided by time (there is also countWindow, which divides them by element count; a sketch follows after the Java version below). The default time characteristic is processing time (ProcessingTime).
Java:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class JavaWindowApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1)
        .print()
        .setParallelism(1);
        env.execute("JavaWindowApp");
    }
}
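As promised above, a minimal sketch of the countWindow variant, keeping the document's Scala style (the class name is illustrative): the window for a key closes once a fixed number of elements for that key has arrived, independent of time.

Scala:

import org.apache.flink.streaming.api.scala._

object CountWindowDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("192.168.227.128", 9999)
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .countWindow(5) // emit a result for a key once 5 elements of that key have arrived
      .sum(1)
      .print()
      .setParallelism(1)
    env.execute("CountWindowDemo")
  }
}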
Sliding Windows

The sliding window assigner assigns each element to windows of a fixed size, similar to the tumbling window assigner, and the window size is set by configuration. But a sliding window has an additional slide parameter that controls how frequently a new window starts, so sliding windows can overlap.
For example, if window1 has a size of 10 minutes and the slide is 5 minutes, then a new 10-minute window starts every 5 minutes: window2 starts 5 minutes after window1, window3 starts 5 minutes after window2, and so on. The data processed by sliding windows can therefore overlap, and a single element may be processed in multiple windows.
Usage scenario: every half hour, compute the Top N of the previous hour.
Scala:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowsApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.227.128", 9999)
    text.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      // .timeWindow(Time.seconds(5))                 // tumbling window
      .timeWindow(Time.seconds(10), Time.seconds(5))  // sliding window: size 10s, slide 5s
      .sum(1)
      .print()
      .setParallelism(1)
    env.execute("WindowsApp")
  }
}
This counts the data of the last 10 seconds every 5 seconds. So when the server sends:
a,a,a,b,b,b
a,a,a,b,b,b
a,b,a,b,a,a
The console prints the results twice:
(a,10)
(b,8)
(b,8)
(a,10)

Window Functions
After defining the window assigner, you need to specify the computation to apply to each window (in the examples above it was keyBy plus sum). The window function processes every element in the window. The window functions include the following:
ReduceFunction
AggregateFunction
FoldFunction
ProcessWindowFunction
ReduceFunction and AggregateFunction execute more efficiently because Flink aggregates incrementally as each element arrives in the window. A ProcessWindowFunction instead receives an Iterable over all the elements contained in the window, plus additional meta information; it is a full aggregation, and it is less efficient because Flink must buffer all of the window's data.
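The article only demonstrates ReduceFunction and ProcessWindowFunction below, so here is a minimal AggregateFunction sketch under the same assumptions (socket of comma-separated integers, illustrative class name): an incremental average whose accumulator holds a running (sum, count).

Scala:

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowAggregateApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("192.168.227.128", 9999)
      .flatMap(_.split(","))
      .map(x => (1, x.toInt))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .aggregate(new AggregateFunction[(Int, Int), (Long, Long), Double] {
        // the accumulator is updated incrementally as each element arrives
        override def createAccumulator(): (Long, Long) = (0L, 0L)
        override def add(value: (Int, Int), acc: (Long, Long)): (Long, Long) =
          (acc._1 + value._2, acc._2 + 1)
        override def getResult(acc: (Long, Long)): Double =
          acc._1.toDouble / acc._2
        override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
          (a._1 + b._1, a._2 + b._2)
      })
      .print()
      .setParallelism(1)
    env.execute("WindowAggregateApp")
  }
}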
ReduceFunction
A ReduceFunction combines two input elements into one output element of the same type. Here we feed in numeric data to demonstrate the incremental behavior.
Scala:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowReduceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.227.128", 9999)
    text.flatMap(_.split(","))
      .map(x => (1, x.toInt)) // "1,2,3,4,5" => (1,1) (1,2) (1,3) (1,4) (1,5)
      .keyBy(0) // the key is always 1, so all elements go to the same task
      .timeWindow(Time.seconds(5)) // tumbling window
      .reduce((v1, v2) => {
        // the reduce function works incrementally inside the window:
        // elements are combined pairwise as they arrive, instead of
        // being buffered until all the data has arrived
        println(v1 + "..." + v2)
        (v1._1, v1._2 + v2._2)
      })
      .print()
      .setParallelism(1)
    env.execute("WindowReduceApp")
  }
}
Server-side input:
1,2,3,4,5
The output in the console is as follows:
(1,1)...(1,2)
(1,3)...(1,3)
(1,6)...(1,4)
(1,10)...(1,5)
(1,15)
The reduce function operates on the window incrementally: data is combined pairwise as it arrives, rather than being held until all the data is in and processed in one pass.
Java:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class JavaWindowReduceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<>(1, Integer.parseInt(token)));
                    }
                }
            }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
                                                   Tuple2<Integer, Integer> value2) throws Exception {
                System.out.println("value1 = [" + value1 + "], value2 = [" + value2 + "]");
                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
            }
        })
        .print()
        .setParallelism(1);
        env.execute("JavaWindowReduceApp");
    }
}
The output is as follows:
value1 = [(1,1)], value2 = [(1,2)]
value1 = [(1,3)], value2 = [(1,3)]
value1 = [(1,6)], value2 = [(1,4)]
value1 = [(1,10)], value2 = [(1,5)]
(1,15)

ProcessWindowFunction
A ProcessWindowFunction receives an Iterable containing all the elements of the window, along with a Context object that gives access to time and state information, which makes it more flexible than a ReduceFunction. This flexibility comes at a cost in resources and performance, because elements cannot be aggregated incrementally; all of the window's data must be buffered.
Java:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class JavaWindowProcessApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<>(1, Integer.parseInt(token)));
                    }
                }
            }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .process(new ProcessWindowFunction<Tuple2<Integer, Integer>, String, Tuple, TimeWindow>() {
            @Override
            public void process(Tuple tuple, Context context,
                                Iterable<Tuple2<Integer, Integer>> elements,
                                Collector<String> out) throws Exception {
                System.out.println("tuple = [" + tuple + "], context = [" + context
                        + "], elements = [" + elements + "], out = [" + out + "]");
                long count = 0;
                for (Tuple2<Integer, Integer> in : elements) {
                    count++;
                }
                out.collect("window: " + context.window() + " count: " + count);
            }
        })
        .print()
        .setParallelism(1);
        env.execute("JavaWindowProcessApp");
    }
}
Server input:
1,2,3,4,5
Console output:
tuple = [(1)], context = [org.apache.flink.streaming.runtime.operators.windowing.functions.InternalProcessWindowContext@40e09d6c], elements = [[(1,1), (1,2), (1,3), (1,4), (1,5)]], out = [org.apache.flink.streaming.api.operators.TimestampedCollector@4e277b00]
window: TimeWindow{start=1568542160000, end=1568542165000} count: 5
The output appears only once, which shows that processing waits until all the data for the window has been collected.
Usage scenario: sorting the data in a window. Sorting is not possible with a ReduceFunction; a sketch follows.
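A minimal Scala sketch of that use case (illustrative class name, same comma-separated-integers input as above): because a ProcessWindowFunction sees the full buffered window, it can sort, which a ReduceFunction cannot do since it only ever sees two elements at a time.

Scala:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object WindowSortApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("192.168.227.128", 9999)
      .flatMap(_.split(","))
      .map(x => (1, x.toInt))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .process(new ProcessWindowFunction[(Int, Int), String, Int, TimeWindow] {
        override def process(key: Int, context: Context,
                             elements: Iterable[(Int, Int)],
                             out: Collector[String]): Unit = {
          // all elements of the window are buffered, so they can be sorted
          val sorted = elements.map(_._2).toList.sorted
          out.collect(s"window: ${context.window} sorted: $sorted")
        }
      })
      .print()
      .setParallelism(1)
    env.execute("WindowSortApp")
  }
}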
That covers Flink's windows and Time operations. I hope the walkthrough above has added something to your knowledge or skills.