Shulou (Shulou.com), 06/01 report. Updated 2025-01-21.
This article introduces the new watermark strategy in Flink: why the watermark interfaces were redesigned in Flink 1.11, how the new WatermarkStrategy and WatermarkGenerator interfaces work, and which built-in strategies Flink provides.
Background
In versions prior to Flink 1.11, two strategies were provided for generating watermarks: AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks. Both extend the TimestampAssigner interface.
To switch between watermark generation methods, users had to implement a different interface. This made it awkward to add general, shared functionality to watermark generation, because every new feature had to be added to both interfaces at the same time, which duplicated code.
To avoid this duplication, Flink 1.11 refactored its watermark generation interface.
New watermark generation interface
After building a DataStream, we call its assignTimestampsAndWatermarks method to generate watermarks. The new interface takes a WatermarkStrategy object:
DataStream#assignTimestampsAndWatermarks(WatermarkStrategy<T>)
What does the WatermarkStrategy interface contain? It has many static methods and methods with default implementations; only one method is abstract, with no default implementation:
/**
 * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
 */
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
So in the simplest case we only need to implement this method, which returns a WatermarkGenerator. Let's take a look at that interface.
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the
     * event timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
The interface is simple and has two methods:
onEvent: called for every element. If we want to generate a watermark based on individual elements and send it downstream, we implement this method. Emitting is optional: the method may or may not call output.emitWatermark.
onPeriodicEmit: when the data volume is large, generating a watermark for every record can hurt performance, so watermarks can also be generated periodically. The interval, in milliseconds, is configured like this: env.getConfig().setAutoWatermarkInterval(5000L)
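To see the difference between the two callbacks without starting a Flink job, here is a plain-Java sketch. RecordingOutput and SimpleWatermarkGenerator are hypothetical stand-ins that only mimic the shape of WatermarkOutput and WatermarkGenerator; they are not Flink classes. The driver also simplifies the timer: it calls onPeriodicEmit after every `period` records, whereas Flink uses the time-based auto-watermark interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for WatermarkOutput: collects emitted watermarks in a list.
class RecordingOutput {
    final List<Long> watermarks = new ArrayList<>();

    void emitWatermark(long watermark) {
        watermarks.add(watermark);
    }
}

// Hypothetical interface with the same two callbacks as WatermarkGenerator.
interface SimpleWatermarkGenerator {
    void onEvent(long eventTimestamp, RecordingOutput output);

    void onPeriodicEmit(RecordingOutput output);
}

// Emits from onEvent: one watermark per record (the largest timestamp so far).
class PerEventGenerator implements SimpleWatermarkGenerator {
    private long maxTimestamp = Long.MIN_VALUE;

    public void onEvent(long eventTimestamp, RecordingOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        output.emitWatermark(maxTimestamp);
    }

    public void onPeriodicEmit(RecordingOutput output) {
        // nothing to do: watermarks were already emitted per event
    }
}

// Only remembers the largest timestamp in onEvent; emits it from onPeriodicEmit.
class PeriodicGenerator implements SimpleWatermarkGenerator {
    private long maxTimestamp = Long.MIN_VALUE;

    public void onEvent(long eventTimestamp, RecordingOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    public void onPeriodicEmit(RecordingOutput output) {
        output.emitWatermark(maxTimestamp);
    }
}

class GeneratorDriver {
    // Calls onEvent for each timestamp and onPeriodicEmit after every `period` records.
    static List<Long> run(SimpleWatermarkGenerator generator, long[] timestamps, int period) {
        RecordingOutput output = new RecordingOutput();
        for (int i = 0; i < timestamps.length; i++) {
            generator.onEvent(timestamps[i], output);
            if ((i + 1) % period == 0) {
                generator.onPeriodicEmit(output);
            }
        }
        return output.watermarks;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 1200L, 1100L, 1500L, 1400L, 1600L};
        System.out.println(run(new PerEventGenerator(), timestamps, 3)); // [1000, 1200, 1200, 1500, 1500, 1600]
        System.out.println(run(new PeriodicGenerator(), timestamps, 3)); // [1200, 1600]
    }
}
```

Both generators end at the same watermark; the periodic one simply sends far fewer of them, which is the performance argument made above.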
Below is a simple example that emits watermarks periodically.
In onEvent we extract a time field from each element, but instead of emitting a watermark downstream we keep the largest value in a variable. In onPeriodicEmit we send downstream a watermark equal to that maximum event time minus the desired delay.
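The arithmetic can be checked in plain Java before wiring it into Flink. MaxMinusDelayGenerator below is a hypothetical class, not part of Flink; it keeps the example's 3000 ms delay and, like the example's uninitialized long field, starts the maximum at 0.

```java
// Hypothetical plain-Java version of the example's logic (no Flink classes involved).
class MaxMinusDelayGenerator {
    private final long delay;
    // Starts at 0, the Java default for the uninitialized long field in the Flink example.
    private long maxTimestamp = 0L;

    MaxMinusDelayGenerator(long delay) {
        this.delay = delay;
    }

    // Like onEvent: remember the largest event time, emit nothing.
    void onEvent(long eventTime) {
        maxTimestamp = Math.max(maxTimestamp, eventTime);
    }

    // Like onPeriodicEmit: the watermark is the largest event time minus the delay.
    long onPeriodicEmit() {
        return maxTimestamp - delay;
    }

    public static void main(String[] args) {
        MaxMinusDelayGenerator generator = new MaxMinusDelayGenerator(3000L);
        System.out.println(generator.onPeriodicEmit()); // -3000 (no events yet)
        generator.onEvent(10_000L);
        generator.onEvent(14_000L);
        generator.onEvent(12_000L); // out of order: does not lower the maximum
        System.out.println(generator.onPeriodicEmit()); // 11000
    }
}
```

Because only the maximum is kept, an out-of-order record never pulls the watermark backwards.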
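import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// element type assumed to be Tuple2<String, Long>, with f1 holding the event time in ms
DataStream<Tuple2<String, Long>> withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(
        new WatermarkStrategy<Tuple2<String, Long>>() {
            @Override
            public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(
                    WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Tuple2<String, Long>>() {
                    private long maxTimestamp;        // largest event time seen so far
                    private final long delay = 3000;  // accepted delay in ms

                    @Override
                    public void onEvent(
                            Tuple2<String, Long> event,
                            long eventTimestamp,
                            WatermarkOutput output) {
                        // only remember the maximum; no watermark is emitted here
                        maxTimestamp = Math.max(maxTimestamp, event.f1);
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        // called once per auto-watermark interval
                        output.emitWatermark(new Watermark(maxTimestamp - delay));
                    }
                };
            }
        });
Built-in watermark generation strategies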
To simplify development, Flink provides several built-in watermark generation strategies.
Generating watermarks with a fixed delay
This strategy is created with the static method forBoundedOutOfOrderness, whose Duration parameter is the maximum delay (out-of-orderness) we are willing to accept. To use it, we need a rough estimate of how late our data can arrive.
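One rough way to make that estimate, offered here as an assumption rather than anything Flink provides, is to replay a sample of historical timestamps in arrival order and measure how far each record trails the largest timestamp seen before it. The hypothetical OutOfOrdernessEstimator below returns the largest such lag. With the formula Flink's built-in generator uses (maxTimestamp - maxOutOfOrderness - 1), a bound at least this large would not have left any sampled record at or behind the watermark.

```java
// Hypothetical helper, not a Flink API: estimates a bound from sample data.
class OutOfOrdernessEstimator {
    // Returns the largest amount (ms) by which any record trails the maximum
    // timestamp seen before it; timestamps must be given in arrival order.
    static long maxLag(long[] arrivalOrderTimestamps) {
        long maxSeen = Long.MIN_VALUE;
        long worstLag = 0L;
        for (long ts : arrivalOrderTimestamps) {
            if (maxSeen != Long.MIN_VALUE && maxSeen - ts > worstLag) {
                worstLag = maxSeen - ts;
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return worstLag;
    }

    public static void main(String[] args) {
        long[] sample = {1_000L, 4_000L, 2_500L, 6_000L, 3_000L, 7_000L};
        // 3_000 arrives after 6_000, the worst lag in this sample
        System.out.println(maxLag(sample)); // 3000
    }
}
```

Live data can be later than anything in a sample, so leaving some headroom on top of the measured value is usually prudent.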
WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)
For example, a fixed-delay watermark with a delay of 3 seconds can be set up as follows:
DataStream<Tuple2<String, Long>> dataStream = ...;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
Under the hood this strategy uses BoundedOutOfOrdernessWatermarks, an implementation of the WatermarkGenerator interface. Its two methods in the Flink source are very similar to what we wrote above:
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
    maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
    output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
Generating watermarks for monotonically increasing timestamps
This strategy is provided by the static method forMonotonousTimestamps:
WatermarkStrategy.forMonotonousTimestamps()
It is equivalent to the fixed-delay strategy above with the delay set to zero: the watermark is derived directly from the largest event timestamp seen so far (minus one millisecond, as in the formula above).
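Why the extra "- 1"? A watermark with time t declares that no more records with a timestamp at or below t are expected, so subtracting one keeps a record whose timestamp equals maxTimestamp - bound on time. The plain-Java sketch below (hypothetical class BoundedLatenessCheck, not Flink code) applies the same formula and uses a bound of 0 to mimic forMonotonousTimestamps. It starts maxTimestamp at Long.MIN_VALUE + bound + 1 so the first watermark cannot underflow, which, as we understand it, mirrors how Flink's BoundedOutOfOrdernessWatermarks initializes the field.

```java
// Hypothetical sketch of the bounded-out-of-orderness formula (not a Flink class).
class BoundedLatenessCheck {
    private final long bound;
    private long maxTimestamp;

    BoundedLatenessCheck(long bound) {
        this.bound = bound;
        // The lowest possible watermark is then exactly Long.MIN_VALUE (no underflow).
        this.maxTimestamp = Long.MIN_VALUE + bound + 1;
    }

    void onEvent(long ts) {
        maxTimestamp = Math.max(maxTimestamp, ts);
    }

    // Same formula as onPeriodicEmit above; assumes a watermark is emitted after every event.
    long currentWatermark() {
        return maxTimestamp - bound - 1;
    }

    // True if a record with this timestamp is at or behind the current watermark.
    boolean isBehindWatermark(long ts) {
        return ts <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedLatenessCheck monotonous = new BoundedLatenessCheck(0L); // like forMonotonousTimestamps
        System.out.println(monotonous.currentWatermark() == Long.MIN_VALUE); // true
        monotonous.onEvent(5_000L);
        System.out.println(monotonous.currentWatermark()); // 4999
        System.out.println(monotonous.isBehindWatermark(5_000L)); // false: equal timestamps are fine
        System.out.println(monotonous.isBehindWatermark(4_999L)); // true: any step backwards is behind
    }
}
```

With a zero bound, any record whose timestamp goes backwards lands behind the watermark, which is why this strategy only suits sources whose timestamps really do increase.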
It can be used in a program like this:
DataStream<Tuple2<String, Long>> dataStream = ...;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
Its underlying implementation is AscendingTimestampsWatermarks, a subclass of BoundedOutOfOrdernessWatermarks with the delay fixed at zero. Here is the source:
@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

    /**
     * Creates a new watermark generator with for ascending timestamps.
     */
    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0));
    }
}
Extracting event time
The two strategies above are built into Flink. When we use event-time semantics, however, we usually want to extract the event time from our own data, which is what TimestampAssigner is for:
@Public
@FunctionalInterface
public interface TimestampAssigner<T> {
    // ...
    long extractTimestamp(T element, long recordTimestamp);
}
In practice, an implementation simply pulls the desired event time out of the element itself.
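Here is a plain-Java sketch of the same contract, for trying extraction logic in isolation. EventTimeExtractor and Click are hypothetical stand-ins (Click mirrors a Tuple2<String, Long> whose f1 is the event time); they are not Flink types. Flink's Javadoc describes recordTimestamp as the element's current internal timestamp, or a negative value if none has been assigned yet; falling back to it when the element carries no time of its own is an assumption of this sketch, not a Flink default.

```java
// Hypothetical interface with the same method shape as TimestampAssigner<T>.
@FunctionalInterface
interface EventTimeExtractor<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// Hypothetical element type mirroring Tuple2<String, Long>; f1 is the event time in ms.
class Click {
    final String f0;
    final Long f1; // may be null if the producer did not set a time

    Click(String f0, Long f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

class ExtractorDemo {
    // Use the element's own time when present, otherwise keep the incoming record timestamp.
    static final EventTimeExtractor<Click> EXTRACTOR =
            (event, recordTimestamp) -> event.f1 != null ? event.f1 : recordTimestamp;

    public static void main(String[] args) {
        System.out.println(EXTRACTOR.extractTimestamp(new Click("a", 1_700L), 42L)); // 1700
        System.out.println(EXTRACTOR.extractTimestamp(new Click("b", null), 42L)); // 42
    }
}
```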
To combine one of Flink's built-in watermark strategies with event-time extraction:
DataStream<Tuple2<String, Long>> dataStream = ...;
dataStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // f1 is assumed to hold the event time in milliseconds
                .withTimestampAssigner((event, timestamp) -> event.f1));
Handling idle data sources
In some cases a source produces so little data that no records arrive for a while, and therefore no watermarks are generated. This causes problems for downstream operations that depend on watermarks. For example, when an operator has several upstream inputs, its watermark is the minimum of the watermarks of those inputs. If one upstream input produces no watermark for a long time because it has no data, the operator's event time stops advancing (the event-time skew problem), and downstream computations are never triggered.
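A plain-Java sketch of the stall, using a hypothetical MultiInputWatermark class rather than Flink code: the combined watermark is the minimum over all inputs, so a single silent input pins it at its initial value no matter how far the other input advances.

```java
// Hypothetical sketch (not Flink code) of how a multi-input operator combines watermarks.
class MultiInputWatermark {
    // The operator's watermark is the minimum of the latest watermark from each input.
    static long combined(long[] inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : inputWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        long[] inputs = {Long.MIN_VALUE, Long.MIN_VALUE};
        // Input 0 keeps receiving data; input 1 is silent and never advances.
        for (long t = 1_000L; t <= 5_000L; t += 1_000L) {
            inputs[0] = t;
            System.out.println("input0=" + t + " combined=" + combined(inputs));
        }
        // combined stays at Long.MIN_VALUE, so event-time triggers downstream never fire.
    }
}
```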
For this reason, Flink provides WatermarkStrategy.withIdleness(), which lets users mark a stream as idle when no records arrive within a configured timeout. Downstream operators then no longer wait for watermarks from that stream.
Once the stream produces data and watermarks again, it becomes active again.
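The idea can be sketched in plain Java. IdleAwareWatermark is hypothetical and only illustrates the concept, not Flink's implementation: each input records the (processing) time of its last record, inputs that have been silent for at least the timeout are left out of the minimum, and the combined watermark only ever moves forward. An empty result means every input is idle.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical sketch of idleness handling; not Flink's implementation.
class IdleAwareWatermark {
    private final long[] watermarks; // latest watermark reported by each input
    private final long[] lastSeenAt; // processing time of each input's last record
    private final long idleTimeout;
    private long emitted = Long.MIN_VALUE; // combined watermark already sent downstream

    IdleAwareWatermark(int inputs, long idleTimeout) {
        this.watermarks = new long[inputs];
        this.lastSeenAt = new long[inputs]; // every input counts as seen at time 0
        this.idleTimeout = idleTimeout;
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    void onRecord(int input, long watermark, long now) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        lastSeenAt[input] = now;
    }

    // Minimum over inputs that saw a record within the timeout; never moves backwards.
    OptionalLong advance(long now) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (now - lastSeenAt[i] < idleTimeout) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (!anyActive) {
            return OptionalLong.empty();
        }
        emitted = Math.max(emitted, min);
        return OptionalLong.of(emitted);
    }

    public static void main(String[] args) {
        IdleAwareWatermark op = new IdleAwareWatermark(2, 60_000L);
        op.onRecord(0, 1_000L, 0L);
        op.onRecord(1, 500L, 0L);
        System.out.println(op.advance(0L)); // OptionalLong[500]
        op.onRecord(0, 90_000L, 90_000L); // input 1 has now been silent for 90 s
        System.out.println(op.advance(90_000L)); // OptionalLong[90000]: input 1 is idle
        System.out.println(op.advance(200_000L)); // OptionalLong.empty: both inputs idle
    }
}
```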
The following code configures handling of an idle data stream:
WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));