In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article will explain in detail how Flink calculates topN in real time. The content of the article is of high quality, so the editor shares it for you as a reference. I hope you will have a certain understanding of the relevant knowledge after reading this article.
1. The knowledge points used
Flink creates kafka data sources
How to specify Watermark based on EventTime processing
Window, tumbling window and sliding window in Flink
Use of State statu
Implementing TopN function with ProcessFunction
two。 Case introduction
Through the user access log, the topN of the most active users of the platform in the recent period of time is calculated.
Create a kafka producer and send test data to kafka
Consume kafka data, use a sliding window to update the rankings at regular intervals
3. Data source
Here, kafka api is used to send test data to kafka. The code is as follows:
@ Data @ NoArgsConstructor @ AllArgsConstructor @ ToString public class User {private long id; private String username; private String password; private long timest} Map config = Configuration.initConfig ("commons.xml"); @ Test public void sendData () throws InterruptedException {int cnt = 0; while (cnt)
< 200){ User user = new User(); user.setId(cnt); user.setUsername("username" + new Random().nextInt((cnt % 5) + 2)); user.setPassword("password" + cnt); user.setTimestamp(System.currentTimeMillis()); Future future = KafkaUtil.sendDataToKafka(config.get("kafka-topic"), String.valueOf(cnt), JSON.toJSONString(user)); while (!future.isDone()){ Thread.sleep(100); } try { RecordMetadata recordMetadata = future.get(); System.out.println(recordMetadata.offset()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("发送消息:" + cnt + "******" + user.toString()); cnt = cnt + 1; } } 这里通过随机数来扰乱username,便于使用户名大小不一,让结果更加明显。KafkaUtil是自己写的一个kafka工具类,代码很简单,主要是平时做测试方便。 4. 主要程序 创建一个main程序,开始编写代码。 创建flink环境,关联kafka数据源。 Map config = Configuration.initConfig("commons.xml"); Properties kafkaProps = new Properties(); kafkaProps.setProperty("zookeeper.connect", config.get("kafka-zookeeper")); kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport")); kafkaProps.setProperty("group.id", config.get("kafka-groupid")); StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); EventTime 与 Watermark senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 设置属性senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime),表示按照数据时间字段来处理,默认是TimeCharacteristic.ProcessingTime /** The time characteristic that is used if none other is set. */ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime; 这个属性必须设置,否则后面,可能窗口结束无法触发,导致结果无法输出。取值有三种: ProcessingTime:事件被处理的时间。也就是由flink集群机器的系统时间来决定。 EventTime:事件发生的时间。一般就是数据本身携带的时间。 IngestionTime:摄入时间,数据进入flink流的时间,跟ProcessingTime还是有区别的; 指定好使用数据的实际时间来处理,接下来需要指定flink程序如何get到数据的时间字段,这里使用调用DataStream的assignTimestampsAndWatermarks方法,抽取时间和设置watermark。 senv.addSource( new FlinkKafkaConsumer010( config.get("kafka-topic"), new SimpleStringSchema(), kafkaProps ) ).map(x ->{return JSON.parseObject (x, User.class);}) .assignTimestampsAndWatermarks (new BoundedOutOfOrdernessTimestampExtractor (Time.milliseconds (1000)) {@ Override public long extractTimestamp (User element) {return element.getTimestamp ();}})
As you can see from the code given earlier, since the User object is converted into a json string when it is sent to kafka, fastjson is used here, and it can be converted into JsonObject to handle it. I still convert it to the User object JSON.parseObject (x, User.class) here, which is easy to deal with.
Taking into account that the data may be out of order, we use the abstract class BoundedOutOfOrdernessTimestampExtractor that can deal with disorder, and implement the only unimplemented method extractTimestamp, disordered data, which will cause data delay. A Time.milliseconds (1000) is passed into the constructor, indicating that the data can be delayed by one second. For example, if the window length of 10s is calculated at 11s and the watermark is 10, the calculation will only be triggered. That is to say, if watermark is introduced to deal with out-of-order data, the data in this window can be tolerated at most, and the data of the window will arrive at the latest.
For a specific explanation of watermark, please refer to this article.
Https://blog.csdn.net/qq_39657909/article/details/106081543
Window statistics
In terms of business requirements, it may be an hour or the data of the past 15 minutes, and the ranking is updated every 5 minutes. Here, to demonstrate the effect, the window length is 10 seconds, and the slide is 5 seconds, that is, the ranking data of the past 10 seconds are updated every 5 seconds.
.keyby ("username") .timeWindow (Time.seconds (10), Time.seconds (5)) .timewindow (new CountAgg (), new WindowResultFunction ())
We use .keyby ("username") to group users and use .timeWindow (Time size, Time slide) to slide windows (10s window, 5s slide once) for each user. Then we use .aggregate (AggregateFunction af, WindowFunction wf) to do incremental aggregation operations, which can use AggregateFunction to aggregate data in advance and reduce the storage pressure on state. The last calculation is much more efficient than. Apply (WindowFunction wf) stores all the data in the window. The first parameter of the aggregate () method is used for
The CountAgg here implements the AggregateFunction interface, which is used to count the number of entries in the window, that is, add one when you encounter a piece of data.
Public class CountAgg implements AggregateFunction {@ Override public Long createAccumulator () {return 0L;} @ Override public Long add (User value, Long accumulator) {return accumulator + 1;} @ Override public Long getResult (Long accumulator) {return accumulator;} @ Override public Long merge (Long a, Long b) {return a + b;}}
The second parameter WindowFunction of .aggregate (AggregateFunction af, WindowFunction wf) outputs the aggregated result of each key and each window with other information. The WindowResultFunction we implement here encapsulates the user name, window, and traffic into UserViewCount for output.
Private static class WindowResultFunction implements WindowFunction {@ Override public void apply (Tuple key, TimeWindow window, Iterable input, Collector out) throws Exception {Long count = input.iterator () .next (); out.collect (new UserViewCount ((Tuple1) key) .f0, window.getEnd (), count));} @ Data @ NoArgsConstructor @ AllArgsConstructor @ ToString public static class UserViewCount {private String userName; private long windowEnd; private long viewCount;}
TopN calculates the most active users
In order to count the active users under each window, we need to group by window again, where the keyBy () operation is performed according to the windowEnd in UserViewCount. Then use ProcessFunction to implement a custom TopN function TopNHotItems to calculate the top three users in the number of clicks, and format the ranking results into strings for subsequent output.
.keyby ("windowEnd") .process (new TopNHotUsers (3)) .print ()
ProcessFunction is a low-level API provided by Flink to implement more advanced functions. It mainly provides the function of timer timer (supports EventTime or ProcessingTime). In this case, we will use timer to determine when the access data of all users under a window has been collected. Because the progress of Watermark is global, in the processElement method, whenever we receive a piece of data (ItemViewCount), we register a timer for windowEnd+1 (the Flink framework automatically ignores repeated registrations at the same time). When the timer of windowEnd+1 is triggered, it means that the Watermark of windowEnd+1 is received, that is, all the user window statistics under the windowEnd are collected. We sort all the items and clicks collected in onTimer (), select TopN, and format the ranking information into a string for output.
Here we also use ListState to store each UserViewCount message received to ensure that the state data is not lost and consistent in the event of a failure. ListState is a State API similar to Java List interface provided by Flink. It integrates the checkpoint mechanism of the framework and automatically achieves the semantic guarantee of exactly-once.
Private static class TopNHotUsers extends KeyedProcessFunction {private int topSize; private ListState userViewCountListState; public TopNHotUsers (int topSize) {this.topSize = topSize;} @ Override public void onTimer (long timestamp, OnTimerContext ctx, Collector out) throws Exception {super.onTimer (timestamp, ctx, out); List userViewCounts = new ArrayList (); for (UserViewCount userViewCount: userViewCountListState.get ()) {userViewCounts.add (userViewCount) } userViewCountListState.clear (); userViewCounts.sort (new Comparator () {@ Override public int compare (UserViewCount o1, UserViewCount O2) {return (int) (o2.viewCount-o1.viewCount);}}) / / format the ranking information into String to make it easy to print StringBuilder result = new StringBuilder (); result.append ("= =\ n"); result.append ("time:") .append (new Timestamp (timestamp-1)). Append ("\ n"); for (int I = 0; I < topSize; iTunes +) {UserViewCount currentItem = userViewCounts.get (I) / / No1: merchandise ID=12224 views = 2413 result.append ("No") .append (I) .append (":") .append ("user name =") .append (currentItem.userName) .append ("views =") .append (currentItem.viewCount) .append ("\ n") } result.append ("= =\ n\ n"); Thread.sleep (1000); out.collect (result.toString ());} @ Override public void open (org.apache.flink.configuration.Configuration parameters) throws Exception {super.open (parameters) ListStateDescriptor userViewCountListStateDescriptor = new ListStateDescriptor ("user-state", UserViewCount.class); userViewCountListState = getRuntimeContext (). GetListState (userViewCountListStateDescriptor);} @ Override public void processElement (UserViewCount value, Context ctx, Collector out) throws Exception {userViewCountListState.add (value); ctx.timerService () .registerEventTimeTimer (value.windowEnd + 1000);}}
Result output
As you can see, the output data is updated every 5 seconds.
On how to Flink real-time calculation of topN to share here, I hope the above content can be of some help to you, can learn more knowledge. If you think the article is good, you can share it for more people to see.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.