How to understand the KeyedProcessFunction class among Flink's process functions

Updated 2025-03-29, from SLTechnology News&Howtos


Many newcomers are unclear about how to understand KeyedProcessFunction, one of Flink's process functions. This article explains it in detail; I hope you get something out of it.

Today we look at KeyedProcessFunction and the features this class brings.

About KeyedProcessFunction

Comparing the class diagrams shows that KeyedProcessFunction and ProcessFunction have no direct relationship: neither class extends the other. KeyedProcessFunction processes the elements of a KeyedStream and offers more than ProcessFunction does on a non-keyed stream. As the official documentation points out, state handling and timers are available only on a keyed stream. With the introduction done, let's learn through an example.

Version information

Development environment operating system: MacBook Pro 13 inch, macOS Catalina 10.15.3

Development tools: IDEA ULTIMATE 2018.3

JDK: 1.8.0_211

Maven: 3.6.0

Flink: 1.9.2

Source code download

If you don't want to write code, the source code for the entire series can be downloaded from GitHub. The address and link information are shown in the following table (https://github.com/zq2599/blog_demos):

Name | Link | Note
Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub
Git repository address (https) | https://github.com/zq2599/blog_demos.git | Repository address of the project source code, https protocol
Git repository address (ssh) | git@github.com:zq2599/blog_demos.git | Repository address of the project source code, ssh protocol

This git project contains multiple folders. The application for this chapter is in the flinkstudy folder.

Overview of the hands-on exercise

The goal of this exercise is to learn KeyedProcessFunction. The steps are as follows:

Read strings from local port 9999.

Split each string on spaces and turn each word into a Tuple2 instance, where f0 is the word and f1 equals 1.

Partition the Tuple2 instances by the f0 field to get a KeyedStream.

Hand the KeyedStream to a custom KeyedProcessFunction for processing.

The custom KeyedProcessFunction records the last time each word appeared and registers a timer for ten seconds later. If the word has not appeared again by the time the timer fires, the word and the total number of times it has appeared are sent to the downstream operator.

Coding

Continue using the flinkstudy project created in the article "Flink processing functions II: ProcessFunction classes".

Create the bean class CountWithTimestamp with three fields, declared public for convenience:

    package com.bolingcavalry.keyedprocessfunction;

    public class CountWithTimestamp {
        public String key;        // the word
        public long count;        // how many times it has appeared
        public long lastModified; // when it last appeared
    }

Create Splitter, an implementation of FlatMapFunction that splits each string and emits one Tuple2 instance per word, where f0 is the word and f1 equals 1:

    package com.bolingcavalry;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.StringUtils;

    public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            if (StringUtils.isNullOrWhitespaceOnly(s)) {
                System.out.println("invalid line");
                return;
            }

            // emit one (word, 1) tuple per space-separated word
            for (String word : s.split(" ")) {
                collector.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
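The splitting step can be tried without Flink. The following is a minimal plain-Java sketch, assuming the delimiter is a single space as the step description says; the class SplitSketch and the helper words are hypothetical names introduced only for illustration. It shows that two consecutive spaces produce an empty token, which a Splitter written this way would forward as an empty word.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSketch {
    // Hypothetical helper: mimics the flatMap logic but returns the words
    // instead of collecting Tuple2 instances.
    static List<String> words(String line) {
        List<String> result = new ArrayList<>();
        if (line == null || line.trim().isEmpty()) {
            return result; // corresponds to the "invalid line" branch
        }
        for (String word : line.split(" ")) {
            result.add(word);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(words("aaa bbb aaa")); // prints [aaa, bbb, aaa]
        System.out.println(words("aaa  bbb"));    // prints [aaa, , bbb]
    }
}
```

If empty words are unwanted, one option is to skip tokens whose length is zero before emitting them.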

Finally, the main body of the logic: ProcessTime.java, which contains a custom KeyedProcessFunction subclass and the program's main entry point. The code is listed below, followed by notes on the key parts:

    package com.bolingcavalry.keyedprocessfunction;

    import com.bolingcavalry.Splitter;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.util.Collector;

    import java.text.SimpleDateFormat;
    import java.util.Date;

    /**
     * @author will
     * @email zq2599@gmail.com
     * @date 2020-05-17 13:43
     * @description Try out a KeyedProcessFunction subclass (time type is processing time)
     */
    public class ProcessTime {

        /**
         * KeyedProcessFunction subclass that records the latest occurrence time of each word
         * in the state backend and creates a timer. When the timer fires, it checks whether
         * 10 seconds have passed since the word last appeared. If so, the word is sent to the
         * downstream operator.
         */
        static class CountWithTimeoutFunction
                extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {

            // custom state
            private ValueState<CountWithTimestamp> state;

            @Override
            public void open(Configuration parameters) throws Exception {
                // initialize the state; its name is myState
                state = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
            }

            @Override
            public void processElement(
                    Tuple2<String, Integer> value,
                    Context ctx,
                    Collector<Tuple2<String, Long>> out) throws Exception {

                // get the current word (the key)
                Tuple currentKey = ctx.getCurrentKey();

                // read the myState state of the current word from the backend
                CountWithTimestamp current = state.value();

                // if myState has never been assigned, initialize it here
                if (current == null) {
                    current = new CountWithTimestamp();
                    current.key = value.f0;
                }

                // increase the word count by one
                current.count++;

                // use the timestamp of the current element as the word's last occurrence time
                current.lastModified = ctx.timestamp();

                // save back to the backend: the word's count and its last occurrence time
                state.update(current);

                // create a timer for the current word that fires ten seconds later
                long timer = current.lastModified + 10000;

                ctx.timerService().registerProcessingTimeTimer(timer);

                // print everything, to check that the data is correct
                System.out.println(String.format(
                        "process, %s, %d, lastModified: %d (%s), timer: %d (%s)\n\n",
                        currentKey.getField(0),
                        current.count,
                        current.lastModified,
                        time(current.lastModified),
                        timer,
                        time(timer)));
            }

            /**
             * Method executed when a timer fires.
             * @param timestamp the trigger time of the timer
             * @param ctx
             * @param out
             * @throws Exception
             */
            @Override
            public void onTimer(
                    long timestamp,
                    OnTimerContext ctx,
                    Collector<Tuple2<String, Long>> out) throws Exception {

                // get the current word (the key)
                Tuple currentKey = ctx.getCurrentKey();

                // read the myState state of the word
                CountWithTimestamp result = state.value();

                // flag: has the current word been absent for 10 consecutive seconds?
                boolean isTimeout = false;

                // timestamp is the timer's trigger time. If it equals the last update time
                // + 10 seconds, the word has not been received again during those ten seconds,
                // so it is sent to the downstream operator.
                if (timestamp == result.lastModified + 10000) {
                    // emit
                    out.collect(new Tuple2<String, Long>(result.key, result.count));

                    isTimeout = true;
                }

                // print the data, to check that it matches expectations
                System.out.println(String.format(
                        "ontimer, %s, %d, lastModified: %d (%s), stamp: %d (%s), isTimeout: %s\n\n",
                        currentKey.getField(0),
                        result.count,
                        result.lastModified,
                        time(result.lastModified),
                        timestamp,
                        time(timestamp),
                        String.valueOf(isTimeout)));
            }
        }

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // parallelism 1
            env.setParallelism(1);

            // processing time
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

            // read strings from local port 9999
            DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

            // every input word that does not appear again for 10 seconds
            // is produced by CountWithTimeoutFunction
            DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream
                    // split each received string on spaces to get the words
                    .flatMap(new Splitter())
                    // set the timestamp assigner; the current time is used as the timestamp
                    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {

                        @Override
                        public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                            // use the current system time as the timestamp
                            return System.currentTimeMillis();
                        }

                        @Override
                        public Watermark getCurrentWatermark() {
                            // this example does not need watermarks, so return null
                            return null;
                        }
                    })
                    // partition by word
                    .keyBy(0)
                    // hand the partitioned data to the custom KeyedProcessFunction
                    .process(new CountWithTimeoutFunction());

            // print every word that has not appeared again for 10 seconds
            timeOutWord.print();

            env.execute("ProcessFunction demo: KeyedProcessFunction");
        }

        public static String time(long timeStamp) {
            // HH is the 24-hour clock; hh would print 12-hour values without an AM/PM marker
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timeStamp));
        }
    }

There are several key points to focus on in the above code:

When timestamps are assigned through assignTimestampsAndWatermarks, getCurrentWatermark returns null because this example does not need watermarks.

In the processElement method, state.value() reads the state of the current word and state.update(current) writes it back. For more on this, see "A closer look at ProcessFunction state operations (Flink-1.10)".

The registerProcessingTimeTimer method sets the trigger time of the timer. Note that the timer here is based on processing time, unlike the official demo, which uses event time.

When the timer fires, the onTimer method is executed. Its parameters carry all the information about the timer, in particular timestamp, which is the trigger time originally set for that timer.
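The check made in onTimer can be modeled in plain Java, without Flink. This is only a sketch under stated assumptions: the class TimeoutModel, its HashMap standing in for the keyed ValueState, and the hand-fired timers are all hypothetical and introduced for illustration; in the real job Flink manages the state and fires the timers. The null check is an extra safeguard that the job above does not have.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TimeoutModel {
    static final long TIMEOUT_MS = 10_000L;

    // Stand-in for the per-key state: word -> {count, lastModified}
    private final Map<String, long[]> state = new HashMap<>();
    // Records that would go downstream, formatted like "(word,count)"
    final List<String> emitted = new ArrayList<>();

    // Mirrors processElement: bump the count, remember the time,
    // and return the trigger time of the timer that would be registered.
    long processElement(String word, long now) {
        long[] current = state.computeIfAbsent(word, k -> new long[2]);
        current[0]++;      // count
        current[1] = now;  // lastModified
        return now + TIMEOUT_MS;
    }

    // Mirrors onTimer: emit only if this timer belongs to the latest occurrence.
    boolean onTimer(String word, long timestamp) {
        long[] current = state.get(word);
        if (current != null && timestamp == current[1] + TIMEOUT_MS) {
            emitted.add("(" + word + "," + current[0] + ")");
            return true;
        }
        return false; // a newer occurrence moved lastModified: stale timer
    }

    public static void main(String[] args) {
        TimeoutModel model = new TimeoutModel();
        long t1 = model.processElement("aaa", 0L);       // first aaa
        System.out.println(model.onTimer("aaa", t1));    // prints true
        long t2 = model.processElement("aaa", 20_000L);  // second aaa
        long t3 = model.processElement("aaa", 25_000L);  // third aaa, 5 s later
        System.out.println(model.onTimer("aaa", t2));    // prints false
        System.out.println(model.onTimer("aaa", t3));    // prints true
        System.out.println(model.emitted);               // prints [(aaa,1), (aaa,3)]
    }
}
```

Every element registers its own timer, so the equality test against lastModified + 10 seconds is what filters out timers that a later occurrence of the same word has made stale.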

Verification

Run the command nc -l 9999 in a console, so that strings typed there are sent over port 9999 of the local machine.

Run the main method of the ProcessTime class directly in IDEA. The program connects to port 9999 of the local machine and starts reading from it.

Type aaa in the nc console and press Enter. After about 10 seconds, the IDEA console should print the process and ontimer lines for aaa, with isTimeout true, followed by the emitted record (aaa,1), which matches expectations.

Then type aaa and press Enter twice more, less than 10 seconds apart. Each Tuple2 element gets its own timer, but before the timer of the second aaa fires, the third input updates the word's last occurrence time. When the second timer fires, the comparison in onTimer therefore finds that less than 10 seconds have passed since the last (that is, the third) occurrence of aaa, so nothing is emitted to the downstream operator for the second element.

The downstream operator prints every timeout record it receives. Only the records with counts 1 and 3 are printed. The record with count 2 is not, because aaa was entered again within 10 seconds, so no timeout was emitted for it and nothing reached the downstream operator.

This completes the walkthrough of KeyedProcessFunction. Its state reads and writes and its timer operations are very practical.
