In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains "how to use the SessionWindow of Flink". The content of the explanation is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "how to use the SessionWindow of Flink".
SessionWindows session window: cut into different partition windows according to inactivity time, and calculate the window
Sample environment
Java.version: 1.8.xflink.version: 1.11.1
Sample data source (project code cloud download)
Building Development Environment and data of Flink system example
SessionWindow.java
Import com.flink.examples.DataSource;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows Import org.apache.flink.streaming.api.windowing.time.Time;import java.util.List / * * @ Description sessionWindows session window: cut into different partition windows according to the inactivity time, and calculate the window * / public class SessionWindow {/ * traversing the collection, and return the * @ param args * @ throws Exception * / public static void main (String [] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment () / / set stream processing time event. This time type must be set for session window. There are three types: / / 1.ProcessingTime: based on the time of operator processing, it uses the machine's system time as the time of data stream / / 2.IngestionTime: based on the time when the data enters flink streaming data flow / / 3.EventTime: based on the timestamp field that comes with the data. The application needs to specify how to extract the timestamp field env.setStreamTimeCharacteristic (TimeCharacteristic.IngestionTime) from record Env.setParallelism (4); DataStream inStream = env.addSource (new MyRichSourceFunction ()) DataStream dataStream = inStream.keyBy ((KeySelector) k-> k.f1) / / scroll by session window. If no partitioned data flow is specified within 2 seconds, the / / session window is calculated once based on the fact that there is no active data access after the specified time, and the window is considered to end. Window (EventTimeSessionWindows.withGap (Time.seconds (2). Reduce (new ReduceFunction () {@ Override public Tuple3 reduce (Tuple3 T1, Tuple3 T2) throws Exception {/ / returns the oldest return t1.f2 > t2.f2? T1: T2;}}); dataStream.print (); env.execute ("flink EventTimeSessionWindows job");} / * continuous output of analog data * / public static class MyRichSourceFunction extends RichParallelSourceFunction {@ Override public void run (SourceContext ctx) throws Exception {List tuple3List = DataSource.getTuple3ToList () For (Tuple3 tuple3: tuple3List) {ctx.collect (tuple3); / / output a Thread.sleep (2 * 1000) in 1 second;} @ Override public void cancel () {try {super.close () } catch (Exception e) {e.printStackTrace ();}
Print the result
2 > (Zhang San, man,20) 4 > (Li Si, girl,24) 2 > (Wang Wu, man,29) 4 > (Liu Liu, girl,32) 2 > (Wu Ba, man,30) Thank you for your reading. The above is the content of "how to use the SessionWindow of Flink". After the study of this article, I believe you have a deeper understanding of how to use Flink SessionWindow, and the specific use needs to be verified by practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.