
[Flink] Flink's processing of late data

2025-01-19 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--

allowedLateness(lateness: Time) sets how long late data is still allowed to arrive.

sideOutputLateData(outputTag: OutputTag[T]) saves data that arrives too late to a side output.

DataStream.getSideOutput(tag: OutputTag[X]) retrieves that late data.

Below, we explain each of these methods in turn and then give a concrete example to deepen our understanding.

1. allowedLateness(lateness: Time)

def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
  javaStream.allowedLateness(lateness)
  this
}

This method takes a time value that sets how long data is allowed to be late. This is a different notion from the out-of-orderness allowance built into the waterMark, so let's review the waterMark first.

waterMark = largest event time seen so far - allowed out-of-order time

As new data arrives, the waterMark is updated to the latest event time minus the allowed out-of-order time. If an older (historical) record arrives, the waterMark is not updated. In short, the waterMark is designed to let as much out-of-order data as possible be received.
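To make the rule concrete, here is a small Scala sketch that replays it outside Flink. WatermarkTracker and WatermarkDemo are hypothetical names, not Flink classes; the logic follows the periodic watermark assigner in this article's example program (largest event time seen minus a 3 s allowance, never decreasing), with event times in milliseconds.

```scala
// Hypothetical helper (not a Flink class) applying the rule
// waterMark = largest event time seen so far - allowed out-of-orderness
final class WatermarkTracker(maxOutOfOrderness: Long) {
  private var currentMaxTimestamp: Long = 0L
  private var lastEmitted: Long = Long.MinValue

  // Records one element's event time (ms) and returns the resulting watermark
  def onEvent(eventTime: Long): Long = {
    // only a newer event time raises the maximum; older records leave it unchanged
    if (eventTime > currentMaxTimestamp) currentMaxTimestamp = eventTime
    val potential = currentMaxTimestamp - maxOutOfOrderness
    // the watermark never moves backwards
    if (potential >= lastEmitted) lastEmitted = potential
    lastEmitted
  }
}

object WatermarkDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new WatermarkTracker(maxOutOfOrderness = 3000L)
    // 11000 arrives out of order, after 13000
    val marks = Seq(10000L, 13000L, 11000L, 16000L).map(tracker.onEvent)
    println(marks.mkString(", ")) // prints 7000, 10000, 10000, 13000
  }
}
```

The out-of-order record 11000 leaves the watermark at 10000 instead of pulling it back.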

What, then, is the time value passed to allowedLateness for? Its main purpose is to wait for late data: if data belonging to a window arrives within that period, it is still included in the window's computation. How this period is calculated is explained in detail later.

Note: this method only applies to event-time windows. For a processing-time window, specifying a non-zero value throws an exception.
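As we understand Flink's WindowOperator, it stops accepting data for an event-time window once window.maxTimestamp() + allowedLateness <= current watermark, where maxTimestamp() is end - 1 ms, and the default EventTimeTrigger fires a window as soon as the watermark reaches maxTimestamp(). The Scala sketch below models those two checks with hypothetical names (Window, isWindowLate, hasFired, classify); it is a simplified model for reasoning, not Flink code.

```scala
object LatenessRule {
  // Hypothetical model of an event-time window [start, end) in ms (not Flink's TimeWindow)
  final case class Window(start: Long, end: Long) {
    // Flink treats end - 1 as the last timestamp that still belongs to the window
    def maxTimestamp: Long = end - 1
  }

  // The window stops accepting data once maxTimestamp + allowedLateness <= watermark
  def isWindowLate(w: Window, allowedLateness: Long, watermark: Long): Boolean =
    w.maxTimestamp + allowedLateness <= watermark

  // An event-time trigger fires the window once the watermark reaches maxTimestamp
  def hasFired(w: Window, watermark: Long): Boolean =
    watermark >= w.maxTimestamp

  // What happens to a record for window w that arrives at the given watermark
  def classify(w: Window, allowedLateness: Long, watermark: Long): String =
    if (isWindowLate(w, allowedLateness, watermark)) "too late" // dropped, or routed to a side output
    else if (hasFired(w, watermark)) "late but allowed"         // the window fires again with this record
    else "on time"                                              // buffered until the first firing

  def main(args: Array[String]): Unit = {
    val w = Window(10000L, 15000L) // the 10-15 s window
    for (wm <- Seq(12000L, 15500L, 17000L)) {
      println(s"watermark $wm -> ${classify(w, allowedLateness = 2000L, watermark = wm)}")
    }
  }
}
```

For the 10-15 s window with 2 s of allowed lateness, a record arriving at waterMark 12 s is on time, at 15.5 s it triggers the window again, and from 16.999 s onwards it is too late.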

2. sideOutputLateData(outputTag: OutputTag[T])

def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
  javaStream.sideOutputLateData(outputTag)
  this
}

This method saves late data to the side output identified by the given outputTag; an OutputTag is the object used to mark the late data.

3. DataStream.getSideOutput(tag: OutputTag[X])

This method is called on the DataStream returned by window and similar operations; passing in the tag used to mark late data returns that late data.
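The effect of sideOutputLateData and getSideOutput can be mimicked with plain collections. In this sketch Tag, Result and process are hypothetical stand-ins, not Flink's OutputTag and DataStream: records judged too late are not discarded but collected under the tag's id, and getSideOutput looks them up with the same tag. The "before 15 s" cut-off is an arbitrary rule chosen only for the demo.

```scala
object SideOutputSketch {
  // Hypothetical stand-in for Flink's OutputTag: a typed label identified by its id
  final case class Tag[T](id: String)

  // Hypothetical operator result: the main output plus side outputs keyed by tag id
  final case class Result[T](main: Vector[T], side: Map[String, Vector[T]]) {
    // Analogue of DataStream.getSideOutput: fetch what was routed to this tag
    def getSideOutput(tag: Tag[T]): Vector[T] = side.getOrElse(tag.id, Vector.empty)
  }

  // Routes records that are too late to the tagged side output instead of discarding them
  def process[T](records: Seq[T], lateTag: Tag[T])(tooLate: T => Boolean): Result[T] = {
    val (late, kept) = records.partition(tooLate)
    Result(kept.toVector, Map(lateTag.id -> late.toVector))
  }

  def main(args: Array[String]): Unit = {
    val lateTag = Tag[(String, Long)]("late")
    // demo rule only: anything before 15 s counts as too late
    val result = process(Seq(("flink", 21000L), ("flink", 13000L)), lateTag)(_._2 < 15000L)
    println(result.main)                   // Vector((flink,21000))
    println(result.getSideOutput(lateTag)) // Vector((flink,13000))
  }
}
```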

4. Understanding late data

Late data refers to the following situation:

After the current window (assume its range is 10-15) has already been computed, another record belonging to that window arrives (assume its event time is 13). The window operation is still triggered for it, and such a record is called late data.
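Whether a record such as event time 13 "belongs to" the 10-15 window is decided by tumbling-window assignment. A minimal sketch, assuming times in milliseconds and no window offset; the start formula mirrors the one we recall from Flink's TimeWindow.getWindowStartWithOffset, and WindowAssignment is a hypothetical name. Windows are half-open, so an event at exactly 15 s opens the 15-20 window.

```scala
object WindowAssignment {
  // Start of the tumbling window containing `timestamp` (offset 0),
  // using the same arithmetic as TimeWindow.getWindowStartWithOffset
  def windowStart(timestamp: Long, windowSize: Long): Long =
    timestamp - (timestamp + windowSize) % windowSize

  // The half-open window [start, end) that an event with this timestamp falls into
  def assign(timestamp: Long, windowSize: Long): (Long, Long) = {
    val start = windowStart(timestamp, windowSize)
    (start, start + windowSize)
  }

  def main(args: Array[String]): Unit = {
    println(assign(13000L, 5000L)) // (10000,15000): event time 13 s belongs to the 10-15 window
    println(assign(15000L, 5000L)) // (15000,20000): 15 s opens the next window
  }
}
```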

So the question is: how is the allowed lateness calculated?

Suppose the window range is 10-15 and the allowed lateness is 2 s. Once the waterMark reaches 15 + 2 = 17, the 10-15 window can no longer be triggered, even if the event time of the new data falls within 10-15.

      // input data format: name:timestamp
      // e.g. flink:1559223685000
      try {
        val items = data.split(":")
        (items(0), items(1).toLong)
      } catch {
        case _: Exception =>
          println("input data does not conform to format: " + data)
          ("0", 0L)
      }
    ).filter(data => !data._1.equals("0") && data._2 != 0L)

    // assign timestamps to the elements and periodically emit watermarks that track event-time progress
    val waterStream: DataStream[(String, Long)] = data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
      // largest event time seen so far
      var currentMaxTimestamp = 0L
      // allowed out-of-orderness: 3 seconds
      val maxOutOfOrderness = 3000L
      var lastEmittedWatermark: Long = Long.MinValue

      // returns the current watermark
      override def getCurrentWatermark: Watermark = {
        // watermark = largest event time seen - 3 s of allowed out-of-orderness
        val potentialWM = currentMaxTimestamp - maxOutOfOrderness
        // never let the watermark move backwards
        if (potentialWM >= lastEmittedWatermark) {
          lastEmittedWatermark = potentialWM
        }
        new Watermark(lastEmittedWatermark)
      }

      // assigns a timestamp to an element, in milliseconds since the epoch
      override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
        // use the element's time field as its event timestamp
        val time = element._2
        if (time > currentMaxTimestamp) {
          currentMaxTimestamp = time
        }
        val outData = String.format("key:%s EventTime:%s waterMark:%s",
          element._1,
          sdf.format(time),
          sdf.format(getCurrentWatermark.getTimestamp))
        println(outData)
        time
      }
    })

    val lateData = new OutputTag[(String, Long)]("late")

    val result: DataStream[String] = waterStream.keyBy(0) // group by the name field
      .window(TumblingEventTimeWindows.of(Time.seconds(5L))) // 5 s tumbling windows based on event time
      /**
       * Allow 2 seconds of late data for each window: the first firing happens when the
       * watermark reaches the end of the window; further firings happen whenever late data
       * for the window arrives while the watermark is still below end-of-window + allowedLateness.
       */
      .allowedLateness(Time.seconds(2L))
      .sideOutputLateData(lateData)
      .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow] {
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
          // collect the formatted event times of all elements in this window
          val timeArr = ArrayBuffer[String]()
          val iterator = input.iterator
          while (iterator.hasNext) {
            val tup2 = iterator.next()
            timeArr.append(sdf.format(tup2._2))
          }
          val outData = String.format("key:%s data:%s startTime:%s endTime:%s",
            key.toString,
            timeArr.mkString("-"),
            sdf.format(window.getStart),
            sdf.format(window.getEnd))
          out.collect(outData)
        }
      })

    result.print("window calculation result:")

    // late data that was routed to the side output
    val late = result.getSideOutput(lateData)
    late.print("late data:")

    env.execute(this.getClass.getName)
  }
}

Next, enter some data to verify this behavior:

You can see that the window range is [15-20]. Now enter a few more records that belong to this range:

Three records with event times 17, 16 and 15 are entered, and each of them triggers the window operation. Now let's try entering data that belongs to the [10-15] window:

Data in the [10-15] window range is now late data that has exceeded the maximum waiting time. We can confirm this by calculating the largest waterMark value at which the previous window still accepts late data:

window end time + allowed lateness = maximum waterMark value

15 + 2 = 17

The current waterMark is 20, which is greater than 17, so data in the 10-15 window range is already too late.

Now let's calculate the critical value for the 15-20 window:

20 + 2 = 22

That is, once the waterMark rises to 22, data for the 15-20 window is too late and can no longer take part in the computation.

Keep the critical value 22 in mind and continue entering test data:

When data A is entered, the waterMark rises to 21; entering data B, which falls within the 15-20 window, still triggers the window operation.

When data C is entered, the waterMark rises to 22, which equals the critical value calculated above. Entering data B again at this point no longer triggers the window operation, so it is now late data.
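The test above can be replayed offline. This sketch hard-codes the example's 5 s windows and 2 s allowed lateness; the event times 13 s and 16 s are hypothetical stand-ins for the article's input, and LatenessReplay is a hypothetical name. It assumes Flink's convention that a window's last timestamp is end - 1 ms, so the critical values are strictly 16.999 s and 21.999 s, which the whole-second waterMarks 20, 21 and 22 used in the test bracket in the same way.

```scala
object LatenessReplay {
  val windowSize = 5000L      // 5 s tumbling windows, as in the example program
  val allowedLateness = 2000L // allowedLateness(Time.seconds(2L))

  // Half-open window [start, end) containing the event time (offset 0)
  def windowOf(eventTime: Long): (Long, Long) = {
    val start = eventTime - (eventTime + windowSize) % windowSize
    (start, start + windowSize)
  }

  // A record is still processed while (end - 1) + allowedLateness > watermark
  def accepted(eventTime: Long, watermark: Long): Boolean = {
    val (_, end) = windowOf(eventTime)
    (end - 1) + allowedLateness > watermark
  }

  def main(args: Array[String]): Unit = {
    // hypothetical event times: 13 s falls in 10-15, 16 s falls in 15-20
    println(accepted(13000L, watermark = 20000L)) // false: 10-15 window at waterMark 20
    println(accepted(16000L, watermark = 21000L)) // true: 15-20 window at waterMark 21
    println(accepted(16000L, watermark = 22000L)) // false: 15-20 window at waterMark 22
  }
}
```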

Finally, a summary of how Flink handles late data:

If late data matters to the business, set an allowed lateness. Each window then has its own limit on how long it waits for late data: the waterMark value window end time + allowed lateness.
