
What is the aggregation operator in Flink

2025-04-05 Update From: SLTechnology News&Howtos


Shulou (Shulou.com) 06/01

This article explains what the aggregation operator in Flink is, walking through the AggregateFunction interface and a small worked example.

Preface

org.apache.flink.api.common.functions.AggregateFunction is an interface in Flink. It can be applied to a windowed stream to compute statistics over the elements in each window.

Note: besides this interface, Flink also has an abstract class with the same simple name, org.apache.flink.table.functions.AggregateFunction. Do not confuse the two. The interface can be thought of as an operator-level function in Flink, on the same level as MapFunction, FlatMapFunction, and so on. The abstract class is used to write user-defined aggregate functions, on the same level as functions such as max and min.

Principle analysis

For example, suppose we want to implement the equivalent of this SQL:

    SELECT TUMBLE_START(proctime, INTERVAL '2' SECOND) AS starttime, user, COUNT(*)
    FROM logs
    GROUP BY user, TUMBLE(proctime, INTERVAL '2' SECOND)

This SQL counts how many times each user appears in each two-second tumbling window. We will use this simple query as the running example for Flink's aggregate operator; in other words, we will implement the same behavior in a program.
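To make the grouping in that query concrete, here is a minimal plain-Java sketch of what "count per user per two-second tumbling window" means. It does not use Flink at all: the class name `TumblingWindowSketch`, the helper names, and the sample user names and timestamps are assumptions made for illustration. Aligning windows to multiples of the window size is also an assumption of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mimics "GROUP BY user, TUMBLE(..., 2 s)" on in-memory data.
final class TumblingWindowSketch {

    // Start of the tumbling window containing the timestamp, assuming windows
    // are aligned to multiples of the window size.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // Counts records per (user, window start) pair; the map key is "user@windowStart".
    static Map<String, Integer> countPerUserAndWindow(
            String[] users, long[] timestamps, long windowSizeMillis) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < users.length; i++) {
            String key = users[i] + "@" + windowStart(timestamps[i], windowSizeMillis);
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] users = {"alice", "bob", "alice", "alice"};
        long[] timestamps = {100L, 1_900L, 1_999L, 2_000L};
        // prints {alice@0=2, bob@0=1, alice@2000=1}
        System.out.println(countPerUserAndWindow(users, timestamps, 2_000L));
    }
}
```

Each record falls into exactly one window, which is what distinguishes a tumbling window from a sliding one.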

First, take a look at the interface of the aggregate function:

    @PublicEvolving
    public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

        ACC createAccumulator();

        ACC add(IN value, ACC accumulator);

        ACC merge(ACC a, ACC b);

        OUT getResult(ACC accumulator);
    }
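As a rough illustration of why the interface separates the intermediate type from the output type, the sketch below computes an average: the intermediate state is a (sum, count) pair, while the output is a single number. It is plain Java and does not depend on Flink. `SimpleAggregate` is a hypothetical stand-in that only mirrors the four method signatures above, and `AverageSketch` and `Average` are names invented for this example.

```java
// Illustrative only: a stand-in with the same four methods, so the example
// runs without Flink on the classpath.
final class AverageSketch {

    // Hypothetical local interface; it is NOT org.apache.flink...AggregateFunction.
    interface SimpleAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        ACC merge(ACC a, ACC b);
        OUT getResult(ACC accumulator);
    }

    // IN = Long, ACC = long[]{sum, count}, OUT = Double.
    static final class Average implements SimpleAggregate<Long, long[], Double> {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        @Override
        public long[] add(Long value, long[] acc) {
            return new long[] {acc[0] + value, acc[1] + 1};
        }

        @Override
        public long[] merge(long[] a, long[] b) {
            return new long[] {a[0] + b[0], a[1] + b[1]};
        }

        @Override
        public Double getResult(long[] acc) {
            // An empty accumulator yields 0.0 rather than dividing by zero.
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }
    }

    // Drives the aggregate by hand: create, add each value, then read the result.
    static double average(long... values) {
        Average agg = new Average();
        long[] acc = agg.createAccumulator();
        for (long v : values) {
            acc = agg.add(v, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(average(2L, 4L, 9L)); // prints 5.0
    }
}
```

A plain count, as in the running example, is the simpler case where the intermediate and output types happen to be the same.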

The AggregateFunction interface declares four methods, which we explain one by one.

AggregateFunction is a generic interface with three type parameters: IN, ACC, and OUT. IN is the input type of the aggregate function, ACC is the type that holds the intermediate result, and OUT is the output type of the aggregate function.

createAccumulator

This method is the first step: it creates the accumulator and performs any initialization. For a count, for example, it gives the accumulator its initial value.

add

The add method holds the core aggregation logic. For a count, it simply takes the current number and adds one.

As with the SQL above, when writing the business logic we can assume that all data entering this method belongs to one user: the system has already grouped the records by key (hash partitioning) before the method is called, and the method is then invoked repeatedly for each user's records. Its input parameter is therefore of type IN and its return value of type ACC.

merge

Because Flink is a distributed computing framework, computation may run on many nodes at the same time. If add has produced partial aggregates for the same user in more than one place, what we want is the overall result, so those partial results have to be combined. That is what merge does, which is why both its parameters and its return value are of the intermediate type ACC. Note that in a keyed window like the one in this example, all records for a given key are routed to the same parallel subtask, so merge mainly matters when window state itself has to be combined, for instance with session windows.

getResult

This method takes the final aggregate for each user and returns it as type OUT, which is the output of the aggregate function.

Worked example

Custom source

First, we define a custom source that generates user records. Each record is a Tuple2 of a user id and the current timestamp.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public static class MySource implements SourceFunction<Tuple2<String, Long>> {

        private volatile boolean isRunning = true;

        String[] userids = {
            "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
            "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
            "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
            "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
            "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
        };

        @Override
        public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
            while (isRunning) {
                // Emit roughly one record every 10 ms.
                Thread.sleep(10);
                // Pick a random user id; scaling by userids.length (not length - 1)
                // keeps the last entry reachable.
                String userid = userids[(int) (Math.random() * userids.length)];
                ctx.collect(Tuple2.of(userid, System.currentTimeMillis()));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

Custom aggregate function

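    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    public static class CountAggregate
            implements AggregateFunction<Tuple2<String, Long>, Integer, Integer> {

        // The count starts at zero.
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        // Each incoming record increases the count by one.
        @Override
        public Integer add(Tuple2<String, Long> value, Integer accumulator) {
            return accumulator + 1;
        }

        // The accumulator is already the final count.
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        // Two partial counts combine by addition.
        @Override
        public Integer merge(Integer a, Integer b) {
            return a + b;
        }
    }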
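The counting logic above can be exercised by hand to see why merge is simply an addition: counting two batches of records separately and merging the partial counts gives the same result as counting everything in one pass. The sketch below is plain Java and deliberately avoids Flink types. `MergeSketch`, its static helpers, and the use of a `String` user id in place of the `Tuple2` record are assumptions for illustration, and the sketch says nothing about when Flink itself calls merge.

```java
// Illustrative only: the same four steps as CountAggregate, as static helpers
// operating on plain user-id strings.
final class MergeSketch {

    static Integer createAccumulator() {
        return 0;
    }

    static Integer add(String userId, Integer accumulator) {
        return accumulator + 1;
    }

    static Integer merge(Integer a, Integer b) {
        return a + b;
    }

    static Integer getResult(Integer accumulator) {
        return accumulator;
    }

    // Folds one batch of records into a partial count.
    static Integer partialCount(String[] records) {
        Integer acc = createAccumulator();
        for (String record : records) {
            acc = add(record, acc);
        }
        return acc;
    }

    // Counts two batches independently, then merges the partial results.
    static int mergedCount(String[] first, String[] second) {
        return getResult(merge(partialCount(first), partialCount(second)));
    }

    public static void main(String[] args) {
        String[] first = {"u1", "u1", "u1"};
        String[] second = {"u1", "u1"};
        System.out.println(mergedCount(first, second)); // prints 5
    }
}
```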

Custom result output function

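    import java.util.Date;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    /**
     * Outputs the aggregate result as (user id, window start, count).
     */
    public static class WindowResult
            implements WindowFunction<Integer, Tuple3<String, Date, Integer>, Tuple, TimeWindow> {

        @Override
        public void apply(
                Tuple key,
                TimeWindow window,
                Iterable<Integer> input,
                Collector<Tuple3<String, Date, Integer>> out) throws Exception {

            // keyBy(0) wraps the key in a Tuple1, so unwrap the user id.
            String k = ((Tuple1<String>) key).f0;
            long windowStart = window.getStart();
            // The iterable holds the single pre-aggregated count for this key and window.
            int result = input.iterator().next();
            out.collect(Tuple3.of(k, new Date(windowStart), result));
        }
    }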

Main process

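    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Long>> dataStream = env.addSource(new MySource());

    // Key by user id (field 0), use 2-second tumbling processing-time windows,
    // count incrementally, then format the result per key and window.
    dataStream.keyBy(0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
            .aggregate(new CountAggregate(), new WindowResult())
            .print();

    env.execute();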
