Flink from Scratch, Hands-On: Computing Real-Time Hot Items

2025-03-28 Update From: SLTechnology News&Howtos


In the previous introductory tutorial, we quickly built a basic Flink program. This article walks step by step through a more complex Flink application: real-time hot items. We recommend working through the previous article first, because this one builds on its my-flink-project project skeleton.

Through this article you will learn:

How to process data based on EventTime and how to specify Watermarks
How to use Flink's flexible Window API
When State is needed and how to use it
How to use ProcessFunction to implement a TopN function

We can restate the "real-time hot items" requirement in terms programmers understand better: every 5 minutes, output the top N items with the most clicks in the last hour. Breaking this requirement down, we need to do the following:

Extract the business timestamp and tell the Flink framework to window based on business time
Filter out the click behavior data
Aggregate over a sliding window (Sliding Window) that is one hour long and advances every 5 minutes
Aggregate per window and output the top N items by clicks in each window

We have prepared a Taobao user behavior dataset (from the Aliyun Tianchi public datasets, with special thanks). It contains all the behaviors of 1 million randomly selected users on Taobao during one day (clicks, purchases, add-to-cart actions, and favorites). The dataset is organized like MovieLens-20M: each row represents one user behavior and consists of the user ID, item ID, item category ID, behavior type, and timestamp, separated by commas. Each column is described below:

Column           Description
User ID          Integer; encrypted user ID
Item ID          Integer; encrypted item ID
Category ID      Integer; encrypted item category ID
Behavior type    String enumeration: 'pv', 'buy', 'cart', 'fav'
Timestamp        Time at which the behavior occurred, in seconds
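To make the row layout concrete, the following is a minimal plain-Java sketch of how one comma-separated row maps onto the five columns. It is only an illustration: the class name UserBehaviorLine and the sample row are made up here and are not part of the tutorial's project or the real dataset.

```java
/** One parsed row of UserBehavior.csv (hypothetical helper, not the tutorial's POJO). */
final class UserBehaviorLine {
    final long userId;
    final long itemId;
    final int categoryId;
    final String behavior;
    final long timestampSeconds;

    private UserBehaviorLine(long userId, long itemId, int categoryId,
                             String behavior, long timestampSeconds) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestampSeconds = timestampSeconds;
    }

    /** Splits a row on commas and converts each field to its column type. */
    static UserBehaviorLine parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 5) {
            throw new IllegalArgumentException("expected 5 fields but got " + fields.length);
        }
        return new UserBehaviorLine(
                Long.parseLong(fields[0].trim()),
                Long.parseLong(fields[1].trim()),
                Integer.parseInt(fields[2].trim()),
                fields[3].trim(),
                Long.parseLong(fields[4].trim()));
    }

    public static void main(String[] args) {
        // Made-up sample row, not taken from the real dataset.
        UserBehaviorLine row = parse("1001,2002,3003,pv,1511658000");
        System.out.println(row.behavior + " on item " + row.itemId
                + " at " + row.timestampSeconds * 1000L + " ms");
    }
}
```

The timestamp column is in seconds, which is why it is multiplied by 1000 wherever milliseconds are expected.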

You can download the dataset to the project's resources directory with the following command:

    $ cd my-flink-project/src/main/resources
    $ curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv

It does not matter whether you use curl here; wget or opening the link directly and downloading the file works just as well. What matters is that the data file is saved to the project's resources directory so the application can access it easily.

Write a program

Create a HotItems.java file under src/main/java/myflink:

    package myflink;

    public class HotItems {

        public static void main(String[] args) throws Exception {

        }
    }

As above, we will fill it with code step by step. The first step is still to create a StreamExecutionEnvironment, which we add to the main function.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // To keep the results printed to the console in order, set the global parallelism to 1.
    // Changing the parallelism does not affect the correctness of the results.
    env.setParallelism(1);

Create a mock data source

In the data preparation section, we have downloaded the test dataset locally. Since it is a csv file, we will use CsvInputFormat to create a mock data source.

Note: a streaming application should run continuously and consume an unbounded data source. In this tutorial, however, to avoid the tedious work of setting up a real data source, we use a file to simulate one. This does not affect the concepts introduced below, and it is also a common way to verify the correctness of a Flink application locally.

We first create a UserBehavior POJO class (with all member variables declared public, the class qualifies as a POJO). Strong typing makes the subsequent processing easier.

    /** User behavior data structure */
    public static class UserBehavior {
        public long userId;       // user ID
        public long itemId;       // item ID
        public int categoryId;    // item category ID
        public String behavior;   // behavior type: "pv", "buy", "cart", or "fav"
        public long timestamp;    // time at which the behavior occurred, in seconds
    }

Next we create a PojoCsvInputFormat, which reads the CSV file and converts each line to the specified POJO type (in our case, UserBehavior).

    // Local file path of UserBehavior.csv
    URL fileUrl = HotItems.class.getClassLoader().getResource("UserBehavior.csv");
    Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
    // Extract the TypeInformation of UserBehavior, which is a PojoTypeInfo
    PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);
    // The order of fields extracted by Java reflection is not deterministic,
    // so the order of the fields in the file must be specified explicitly
    String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
    // Create the PojoCsvInputFormat
    PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);

Next we use PojoCsvInputFormat to create the input source.

    DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);

This creates a DataStream of type UserBehavior.

EventTime and Watermark

When we say "count hits in the past hour", what does "one hour" mean? In Flink, it can mean ProcessingTime or EventTime, as determined by the user.

ProcessingTime: the time at which the event is processed, determined by the system clock of the machine doing the processing.
EventTime: the time at which the event occurred, generally a timestamp carried by the data itself.

In this case we need to count clicks per hour in business time, so we must process based on EventTime. How do we get Flink to process according to the business time we want? There are two main things to do.

The first is to tell Flink that we are processing in EventTime mode. Flink uses ProcessingTime by default, so we need to set this explicitly.

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

The second is to specify how to obtain the business time and how to generate Watermarks. A Watermark tracks the progress of business time: it can be understood as the clock of the EventTime world, indicating up to what point in time the data has been processed. Because the data in our source has already been sorted and contains no out-of-order records, that is, the event timestamps increase monotonically, the business time of each record can serve as the Watermark. Here we use AscendingTimestampExtractor to extract timestamps and generate Watermarks.

Note: data in real business scenarios is generally out of order, so BoundedOutOfOrdernessTimestampExtractor is generally used instead.

    DataStream<UserBehavior> timedData = dataSource
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
            @Override
            public long extractAscendingTimestamp(UserBehavior userBehavior) {
                // The raw data is in seconds; convert it to milliseconds
                return userBehavior.timestamp * 1000;
            }
        });

In this way, we have a data stream with a time stamp, and we can do some window operations later.
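The difference between an ascending stream and an out-of-order one can be sketched without Flink. The class below is a conceptual model only, not Flink's implementation: the name WatermarkSketch and its methods are invented for this illustration. It assumes the watermark simply trails the highest timestamp seen so far by a fixed bound, and treats an event as late when its timestamp is not ahead of the current watermark.

```java
/** Conceptual watermark model: the watermark lags the highest timestamp seen by a fixed bound. */
final class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event timestamp and reports whether it arrived behind the watermark. */
    boolean observe(long timestampMs) {
        boolean late = timestampMs <= currentWatermark();
        maxTimestampMs = Math.max(maxTimestampMs, timestampMs);
        return late;
    }

    /** Watermark derived from the highest timestamp seen so far. */
    long currentWatermark() {
        return maxTimestampMs == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 2000L, 1500L}; // the third event is out of order
        WatermarkSketch strict = new WatermarkSketch(0L);
        WatermarkSketch tolerant = new WatermarkSketch(1000L);
        for (long ts : timestamps) {
            System.out.println(ts + ": late with bound 0 = " + strict.observe(ts)
                    + ", late with bound 1000 = " + tolerant.observe(ts));
        }
    }
}
```

With a bound of 0 the out-of-order event falls behind the watermark, while a bound of 1000 ms keeps it in time at the cost of results being emitted later. That trade-off is the reason out-of-order sources need a bounded-delay extractor rather than an ascending one.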

Filter out click events

Before starting the window operation, recall the requirement: "every 5 minutes, output the top N items with the most clicks in the past hour." The raw data contains click, purchase, add-to-cart, and favorite events, but we only need to count clicks, so we first use a FilterFunction to keep only the click behavior data.

    DataStream<UserBehavior> pvData = timedData
        .filter(new FilterFunction<UserBehavior>() {
            @Override
            public boolean filter(UserBehavior userBehavior) throws Exception {
                // Keep only click events
                return userBehavior.behavior.equals("pv");
            }
        });

Window statistics

Since the number of clicks on each item in the last hour is counted every 5 minutes, the window size is one hour and the window slides every 5 minutes. In other words, we count the clicks on each item in the windows [09:00, 10:00), [09:05, 10:05), [09:10, 10:10), and so on. This is a typical sliding window (Sliding Window) requirement.
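To make the overlap concrete, here is a small plain-Java sketch that lists every window a single event falls into. It is an illustration under stated assumptions, not Flink's code: the class and method names are invented, windows are assumed to be aligned to multiples of the slide with no offset, and the timestamp is expressed as milliseconds since midnight for readability.

```java
import java.util.ArrayList;
import java.util.List;

/** Lists the sliding windows that contain a given event timestamp (illustrative helper). */
final class SlidingWindowSketch {

    /** Returns {start, end} pairs, end exclusive, ordered from the latest window to the earliest. */
    static List<long[]> windowsFor(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that already contains the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long clickAt = (9 * 60 + 7) * minute; // a click at 09:07
        List<long[]> windows = windowsFor(clickAt, 60 * minute, 5 * minute);
        System.out.println("windows containing the click: " + windows.size());
        for (long[] w : windows) {
            System.out.printf("[%02d:%02d, %02d:%02d)%n",
                    w[0] / minute / 60, w[0] / minute % 60,
                    w[1] / minute / 60, w[1] / minute % 60);
        }
    }
}
```

A one-hour window sliding every 5 minutes means each click contributes to 12 windows, which is why the per-window state should be kept as small as possible.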

    DataStream<ItemViewCount> windowedData = pvData
        .keyBy("itemId")
        .timeWindow(Time.minutes(60), Time.minutes(5))
        .aggregate(new CountAgg(), new WindowResultFunction());

We use .keyBy("itemId") to group the stream by item, and .timeWindow(Time size, Time slide) to apply a sliding window to each item (a 1-hour window that slides every 5 minutes). We then use .aggregate(AggregateFunction af, WindowFunction wf) for incremental aggregation: the AggregateFunction aggregates data as it arrives, which reduces the storage pressure on state. This is much more efficient than .apply(WindowFunction wf), which stores all the data in the window and computes everything at the end.

The first parameter of the aggregate() method is the AggregateFunction. The CountAgg here implements the AggregateFunction interface and counts the number of records in the window, that is, it adds one for each record it encounters.

    /** COUNT aggregate function: adds one for each record */
    public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(UserBehavior userBehavior, Long acc) {
            return acc + 1;
        }

        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        @Override
        public Long merge(Long acc1, Long acc2) {
            return acc1 + acc2;
        }
    }
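The saving from incremental aggregation can be seen in a plain-Java stand-in that keeps one running count per item instead of the records themselves. This is a sketch only: IncrementalCountSketch and its methods are hypothetical names, and a HashMap stands in for the per-key window state that Flink manages.

```java
import java.util.HashMap;
import java.util.Map;

/** Keeps one running count per item instead of buffering every click (illustrative stand-in). */
final class IncrementalCountSketch {
    // One Long accumulator per itemId; a stand-in for per-key window state.
    private final Map<Long, Long> accumulators = new HashMap<>();

    /** Folds one click into the accumulator of its item. */
    void add(long itemId) {
        accumulators.merge(itemId, 1L, Long::sum);
    }

    /** Current count for an item, or 0 if it has not been clicked. */
    long result(long itemId) {
        return accumulators.getOrDefault(itemId, 0L);
    }

    /** Number of values held in state, independent of how many clicks were seen. */
    int storedValues() {
        return accumulators.size();
    }

    public static void main(String[] args) {
        IncrementalCountSketch counts = new IncrementalCountSketch();
        long[] clickedItems = {7L, 7L, 9L, 7L, 9L}; // made-up item IDs
        for (long itemId : clickedItems) {
            counts.add(itemId);
        }
        System.out.println("clicks seen: " + clickedItems.length
                + ", values stored: " + counts.storedValues());
    }
}
```

However many clicks arrive, the stored state grows only with the number of distinct items, whereas buffering the whole window would grow with the number of records.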

The second parameter of .aggregate(AggregateFunction af, WindowFunction wf), the WindowFunction, outputs the aggregated result for each key and each window together with other information. The WindowResultFunction we implement here wraps the key (the item ID), the window, and the click count into an ItemViewCount for output.

    /** Outputs the result of a window */
    public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

        @Override
        public void apply(
                Tuple key,                           // the key of the window, that is, itemId
                TimeWindow window,                   // the window
                Iterable<Long> aggregateResult,      // the result of the aggregate function, that is, the count
                Collector<ItemViewCount> collector   // the output type is ItemViewCount
        ) throws Exception {
            Long itemId = ((Tuple1<Long>) key).f0;
            Long count = aggregateResult.iterator().next();
            collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
        }
    }

    /** Item click count (the output type of the window operation) */
    public static class ItemViewCount {
        public long itemId;     // item ID
        public long windowEnd;  // window end timestamp
        public long viewCount;  // number of clicks on the item

        public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
            ItemViewCount result = new ItemViewCount();
            result.itemId = itemId;
            result.windowEnd = windowEnd;
            result.viewCount = viewCount;
            return result;
        }
    }

Now we have the data stream of the number of clicks for each item in each window.

TopN: computing the hottest items

To find the most popular items in each window, we need to group by window again, so we call keyBy() on the windowEnd field of ItemViewCount. We then use ProcessFunction to implement a custom TopN function, TopNHotItems, which computes the top 3 items by clicks and formats the ranking as a string for output.

    DataStream<String> topItems = windowedData
        .keyBy("windowEnd")
        .process(new TopNHotItems(3));  // find the top 3 items by number of clicks

ProcessFunction is a low-level API provided by Flink for implementing more advanced functionality. Its main feature is timers (supporting both EventTime and ProcessingTime). In this case, we use a timer to determine when the click counts for all items under a given window have been collected.

Because Watermark progress is global, in the processElement method we register a timer for windowEnd+1 each time we receive a piece of data (ItemViewCount); the Flink framework automatically ignores repeated registrations for the same time. When the windowEnd+1 timer fires, a Watermark of at least windowEnd+1 has been received, which means the window statistics of all items for that windowEnd have been collected. In onTimer(), we sort all the collected items by clicks, select the TopN, and format the ranking information into a string for output.
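The interplay between buffering, timer registration, and watermark progress can be simulated in a few lines of plain Java. The sketch below is a toy model rather than Flink's timer service: WindowTimerSketch and all of its methods are invented for illustration, a TreeSet models the de-duplication of timers, and timers are assumed to fire once the watermark reaches their timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Toy model of "buffer per window, fire once the watermark passes windowEnd" (not Flink code). */
final class WindowTimerSketch {
    private final Map<Long, List<Long>> countsByWindowEnd = new HashMap<>();
    private final TreeSet<Long> timers = new TreeSet<>(); // set semantics drop duplicate registrations
    private final List<String> fired = new ArrayList<>();

    /** Buffers one per-item count and registers a timer for windowEnd + 1. */
    void processElement(long windowEnd, long viewCount) {
        countsByWindowEnd.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(viewCount);
        timers.add(windowEnd + 1);
    }

    /** Fires every timer whose timestamp the watermark has reached. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long timerTimestamp = timers.pollFirst();
            long windowEnd = timerTimestamp - 1;
            List<Long> counts = countsByWindowEnd.remove(windowEnd);
            fired.add("windowEnd=" + windowEnd + " items=" + counts.size());
        }
    }

    int pendingTimers() {
        return timers.size();
    }

    List<String> fired() {
        return fired;
    }

    public static void main(String[] args) {
        WindowTimerSketch sketch = new WindowTimerSketch();
        sketch.processElement(1000L, 5L);
        sketch.processElement(1000L, 9L);
        sketch.processElement(1000L, 7L);
        sketch.processElement(2000L, 3L);
        System.out.println("pending timers: " + sketch.pendingTimers());
        sketch.advanceWatermark(1000L); // timer 1001 not reached yet
        sketch.advanceWatermark(1001L); // timer 1001 fires
        System.out.println(sketch.fired());
    }
}
```

Three elements for the same windowEnd leave a single pending timer, and nothing is emitted until the watermark reaches windowEnd+1, at which point every count for that window is available at once.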

Here we also use ListState to store each ItemViewCount received, so that the state data is neither lost nor left inconsistent in the event of a failure. ListState is a State API provided by Flink that resembles the Java List interface. It integrates with the framework's checkpoint mechanism and automatically provides exactly-once semantic guarantees.

    /** Finds the top N most-clicked items in a window. The key is the window end timestamp; the output is the TopN result string. */
    public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {

        private final int topSize;

        public TopNHotItems(int topSize) {
            this.topSize = topSize;
        }

        // State holding the items and their click counts; TopN is computed once all data of a window has been collected
        private ListState<ItemViewCount> itemState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Register the state
            ListStateDescriptor<ItemViewCount> itemsStateDesc =
                new ListStateDescriptor<>("itemState-state", ItemViewCount.class);
            itemState = getRuntimeContext().getListState(itemsStateDesc);
        }

        @Override
        public void processElement(ItemViewCount input, Context context, Collector<String> collector) throws Exception {
            // Save every record to the state
            itemState.add(input);
            // Register an EventTime timer for windowEnd+1. When it fires, all item data
            // belonging to the windowEnd window has been collected
            context.timerService().registerEventTimeTimer(input.windowEnd + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Fetch the click counts of all items received
            List<ItemViewCount> allItems = new ArrayList<>();
            for (ItemViewCount item : itemState.get()) {
                allItems.add(item);
            }
            // Clear the state early to release space
            itemState.clear();
            // Sort by click count in descending order; Long.compare avoids the
            // overflow that casting a long difference to int can cause
            allItems.sort(new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    return Long.compare(o2.viewCount, o1.viewCount);
                }
            });
            // Format the ranking information as a String so that it is easy to print
            StringBuilder result = new StringBuilder();
            result.append("====================================\n");
            result.append("time: ").append(new Timestamp(timestamp - 1)).append("\n");
            for (int i ...
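The sorting step performed in onTimer() can be tried in isolation with plain Java. The following is a minimal sketch under assumptions: TopNSketch and ItemCount are hypothetical names standing in for the tutorial's classes, and the sample counts are made up. It orders by click count with Comparator.comparingLong, which compares the long values directly instead of narrowing a long difference to int.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Picks the N most-clicked items from the counts of one window (illustrative helper). */
final class TopNSketch {

    /** Minimal stand-in for an item and its click count. */
    static final class ItemCount {
        final long itemId;
        final long viewCount;

        ItemCount(long itemId, long viewCount) {
            this.itemId = itemId;
            this.viewCount = viewCount;
        }
    }

    /** Returns at most n items, highest click count first; the input list is left untouched. */
    static List<ItemCount> topN(List<ItemCount> items, int n) {
        List<ItemCount> sorted = new ArrayList<>(items);
        sorted.sort(Comparator.comparingLong((ItemCount c) -> c.viewCount).reversed());
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    public static void main(String[] args) {
        // Made-up counts for a single window.
        List<ItemCount> counts = Arrays.asList(
                new ItemCount(1L, 5L),
                new ItemCount(2L, 9L),
                new ItemCount(3L, 7L),
                new ItemCount(4L, 1L));
        List<ItemCount> top = topN(counts, 3);
        for (int rank = 0; rank < top.size(); rank++) {
            ItemCount c = top.get(rank);
            System.out.println("No" + (rank + 1) + ": itemId=" + c.itemId + " clicks=" + c.viewCount);
        }
    }
}
```

Sorting the full list is simple and adequate when each window holds a modest number of items; a bounded priority queue would be an alternative if the list were very large.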
