Introduction to Flink's Window Mechanism

2025-03-31 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 06/02 report

This article introduces Flink's window mechanism. Many people run into difficulties with it when working through real cases, so the sections below walk through how to handle these situations. I hope you read carefully and come away having learned something!

The differences between window types:

How the components inside a window relate to each other in the source code:

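WindowAssigner: determines which window (or windows) each element is assigned to.

Trigger: determines when a window is evaluated or purged. Each window has its own Trigger.

Evictor: literally "the one that evicts". After the Trigger fires and before the window function processes the window, the Evictor (if one is configured) removes unwanted elements from the window, acting like a filter. (Flink's Evictor interface can also evict elements after the window function has run.)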
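To make the evictor's "filter" role concrete, here is a minimal plain-Scala sketch. It does not use Flink's `Evictor` interface. The `Event` class, the `EvictorSketch` object, and the `keepMs` parameter are hypothetical names chosen for illustration. The time-based rule, which keeps only elements within `keepMs` of the newest one, is loosely modeled on the idea behind Flink's `TimeEvictor`.

```scala
// Hypothetical element type: an event timestamp (ms) plus a value
final case class Event(timestamp: Long, value: Int)

// Simplified evictor model (not Flink's API). It runs after the trigger has fired
// and before the window function.
object EvictorSketch {
  // Drop every element that is more than `keepMs` older than the newest element
  def evictOlderThan(window: Seq[Event], keepMs: Long): Seq[Event] =
    if (window.isEmpty) window
    else {
      val cutoff = window.map(_.timestamp).max - keepMs
      window.filter(_.timestamp > cutoff) // behaves like a filter over the window contents
    }

  // Trigger fired -> evict -> evaluate (here: sum the remaining values)
  def evaluate(window: Seq[Event], keepMs: Long): Int =
    evictOlderThan(window, keepMs).map(_.value).sum
}
```

For example, take events at 1000, 2000 and 3500 ms with `keepMs = 2000`. The cutoff is 1500 ms, so the event at 1000 ms is evicted before evaluation. The window function then sums only the last two values, giving 5.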

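Understanding how WindowAssigner works internally: its main job is to distribute incoming data into different windows. As a simple example, suppose I give a window a start time and an end time. Once the trigger detects that the window has reached its end time, the window fires.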
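The assign-then-fire idea can be sketched in plain Scala. This is a simplified model, not Flink's API, and `Window`, `SlidingAssigner`, `assign` and `shouldFire` are hypothetical names. The assignment arithmetic follows the sliding-window logic used by the custom assigner later in this article, with no offset. The firing rule assumes a processing-time trigger that fires once the clock reaches the window's last timestamp (`end - 1`).

```scala
// Simplified model: a window is the half-open interval [start, end)
final case class Window(start: Long, end: Long) {
  def maxTimestamp: Long = end - 1
}

object SlidingAssigner {
  // Assign a timestamp to every window of length `size` that contains it, stepping
  // back by `slide` from the most recent window start
  def assign(timestamp: Long, size: Long, slide: Long): List[Window] = {
    val lastStart = timestamp - Math.floorMod(timestamp, slide)
    Iterator
      .iterate(lastStart)(_ - slide)
      .takeWhile(_ > timestamp - size)
      .map(start => Window(start, start + size))
      .toList
  }

  // A time-based trigger fires once the clock has reached the window's last timestamp
  def shouldFire(w: Window, currentTime: Long): Boolean =
    currentTime >= w.maxTimestamp
}
```

For instance, `SlidingAssigner.assign(7, size = 10, slide = 5)` places timestamp 7 into the windows [5, 15) and [0, 10). The window [0, 10) fires once the clock reaches 9.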

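One diagram explaining the execution order of trigger, evictor, and emit

Suppose there is a sliding count window that computes the sum of the most recent 4 elements every 2 elements. The window then works as shown in the diagram below:

Code for testing and verification: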
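The trigger, evictor, emit order for this example can be simulated in plain Scala. The sketch assumes the behavior of Flink's `countWindow(size, slide)` on a keyed stream. That method is built from a global window, a count trigger that fires every `slide` elements, and a count evictor that keeps the last `size` elements. The `SlidingCountWindowSim` object and its `run` method are hypothetical and use no Flink classes.

```scala
import scala.collection.mutable.ArrayBuffer

// Plain-Scala simulation of a sliding count window (no Flink classes involved)
object SlidingCountWindowSim {
  def run(input: Seq[Int], size: Int, slide: Int): List[Int] = {
    val buffer = ArrayBuffer.empty[Int] // contents of the single (global) window
    val emitted = List.newBuilder[Int]
    var countSinceFire = 0              // per-window counter kept by the count trigger
    for (x <- input) {
      buffer += x                       // 1. the element is added to window state
      countSinceFire += 1
      if (countSinceFire == slide) {    // 2. the trigger fires every `slide` elements
        countSinceFire = 0
        if (buffer.size > size)         // 3. the evictor keeps only the last `size` elements
          buffer.remove(0, buffer.size - size)
        emitted += buffer.sum           // 4. the window function runs and its result is emitted
      }
    }
    emitted.result()
  }
}
```

Feeding the elements 1 to 8 produces four results: 3, 10, 18 and 26. The first firing sees only [1, 2] and the second sees [1, 2, 3, 4]. After that, the evictor trims the window to the four most recent elements, first [3, 4, 5, 6] and then [5, 6, 7, 8].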

```scala
// Written against the Flink 1.x Scala DataStream API
import java.util

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.streaming.api.{TimeCharacteristic, environment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, Trigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object FlinkWindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val input = env.socketTextStream("localhost", 9001)
    input
      .flatMap(f => f.split("\\W+"))
      .filter(_.nonEmpty) // drop empty tokens produced by leading separators
      .map(word => (word, 1))
      .keyBy(_._1)
      .window(new WindowAssigner[Object, TimeWindow] {
        override def isEventTime: Boolean = false

        override def getDefaultTrigger(
            env: environment.StreamExecutionEnvironment): Trigger[Object, TimeWindow] =
          ProcessingTimeTrigger.create()

        override def assignWindows(
            element: Object,
            timestamp: Long,
            context: WindowAssigner.WindowAssignerContext): util.Collection[TimeWindow] = {
          // Processing-time records carry no timestamp, so read the clock from the context
          val now = context.getCurrentProcessingTime
          val slide = 1000L * 60
          val lastStart = now - now % slide
          val windows = new util.ArrayList[TimeWindow](6)

          // Every 1 minute, aggregate the last 5 minutes (5 overlapping windows).
          // Add first, then step back, so every window contains `now`.
          val size = 1000L * 60 * 5
          var start = lastStart
          while (start > now - size) {
            windows.add(new TimeWindow(start, start + size))
            start -= slide
          }

          // Every 1 minute, aggregate the last 1 minute (size == slide: exactly one window)
          windows.add(new TimeWindow(lastStart, lastStart + slide))
          windows
        }

        override def getWindowSerializer(executionConfig: ExecutionConfig) =
          new TimeWindow.Serializer
      })
      .sum(1) // 5-minute and 1-minute sums are printed to the same output
      .print()

    env.execute()
  }
}
```

Summary

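WindowAssigner is mainly responsible for distributing data into different windows, and each window then has its own trigger. Until a window fires, all of its data is kept in Flink's state, that is, in state storage. Whenever the Trigger is consulted, it returns one of four results: CONTINUE (do nothing), FIRE (evaluate the window's data), PURGE (remove the window and its data), or FIRE_AND_PURGE (evaluate, then remove).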
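The four possible results can be modeled as follows. The case-object names and the `isFire`/`isPurge` flags mirror Flink's `TriggerResult` enum, but this is a standalone sketch. `TriggerResultDemo.applyResult` is a hypothetical helper that shows what each result does to a window's buffered state.

```scala
// Standalone model of the four trigger results (names follow Flink's TriggerResult)
sealed abstract class TriggerResult(val isFire: Boolean, val isPurge: Boolean)

object TriggerResult {
  case object CONTINUE extends TriggerResult(false, false)
  case object FIRE extends TriggerResult(true, false)
  case object PURGE extends TriggerResult(false, true)
  case object FIRE_AND_PURGE extends TriggerResult(true, true)
}

object TriggerResultDemo {
  // Apply a trigger result to a window's buffered state.
  // Returns (emitted result, if any; remaining window state).
  def applyResult[A](state: Vector[A], result: TriggerResult)(
      windowFn: Vector[A] => A): (Option[A], Vector[A]) = {
    val emitted = if (result.isFire) Some(windowFn(state)) else None
    val remaining = if (result.isPurge) Vector.empty[A] else state
    (emitted, remaining)
  }
}
```

Take a buffer of [1, 2, 3] and a summing window function. FIRE emits 6 and keeps the buffer, while FIRE_AND_PURGE emits 6 and clears it. PURGE emits nothing and clears the buffer, and CONTINUE leaves everything unchanged.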

That concludes "Introduction to Flink's Window Mechanism". Thank you for reading.

© 2024 shulou.com SLNews company. All rights reserved.