How to use a custom aggregate function to compute website TP metrics in Flink

2025-04-05 Update From: SLTechnology News&Howtos


This article explains, step by step, how to use a custom aggregate function in Flink to compute TP metrics for a website.

Background

In website performance testing, we often use TP50, TP95 or TP99 as performance metrics. Let's look at what these metrics mean and how to compute them in real time with Flink:

TP50 (top percentile 50): 50% of the requests have a response time at or below this value, i.e. the median.

TP95 (top percentile 95): 95% of the requests have a response time at or below this value.

TP99 (top percentile 99): 99% of the requests have a response time at or below this value.
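Stated precisely, and assuming the common nearest-rank definition (other percentile definitions interpolate between neighbouring values): if the n response times in a window are sorted in ascending order as x_(1) ≤ x_(2) ≤ … ≤ x_(n), then

$$
\mathrm{TP}_p = x_{(k)}, \qquad k = \max\!\left(1, \left\lceil \frac{p}{100}\, n \right\rceil\right)
$$

For example, with n = 200 requests, TP90 is the 180th smallest response time, since 0.9 × 200 = 180. This matches the count × 0.9 calculation used in the next example.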

Take an example: suppose we want the TP90 of a website's response time within one minute. The usual approach is to sort all response times in that minute in ascending order, count the total number of requests, and take the response time at position count × 0.9; that value is the TP90.
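A minimal sketch of this sort-based calculation in plain Java, assuming the nearest-rank rule (1-based rank = ceil(count × p / 100)). The class name NaiveTp is hypothetical. Note that it keeps every value in memory before sorting.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Naive TP: sort all values, then take the element at the nearest rank.
class NaiveTp {
    // Returns the TP value (e.g. tp = 90 for TP90), or null for empty input.
    static Integer tp(List<Integer> responseTimes, int tp) {
        if (responseTimes.isEmpty()) {
            return null;
        }
        List<Integer> sorted = new ArrayList<>(responseTimes);
        Collections.sort(sorted); // from small to large
        int count = sorted.size();
        // 1-based rank = ceil(count * tp / 100), integer math avoids rounding errors
        int rank = (int) (((long) count * tp + 99) / 100);
        rank = Math.max(1, Math.min(rank, count)); // clamp into [1, count]
        return sorted.get(rank - 1);
    }
}
```

For the five values 5, 1, 4, 2, 3, TP50 is the 3rd smallest value (3) and TP90 the 5th (5).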

Custom aggregate function

This is clearly a job for an aggregate function. Flink provides many built-in aggregate functions, such as COUNT, MAX and MIN, but none of them meets this requirement, so we need to write a custom aggregate function.

We previously discussed Flink's aggregation operators (see: flink practice-talk about the aggregation operator in flink). An aggregation operator is what we use when writing DataStream code; an aggregate function plays a similar role, but it is used in SQL.

A custom aggregate function extends the abstract class org.apache.flink.table.functions.AggregateFunction<T, ACC>, where T is the result type and ACC is the accumulator type, and implements the following methods:

createAccumulator(): called when a new aggregation (for example, a new group or window) starts. It creates the accumulator, the object that holds the intermediate state of the aggregation.

accumulate(): called for every input row; this is where the actual aggregation logic goes. It is not declared in the base class: Flink finds it by name, it must be public, and its first parameter is the accumulator.

getValue(): computes the final result from the accumulator. Its return value is the result that appears in the SQL query.
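To make the call order concrete, here is a plain-Java sketch that needs no Flink dependency. SimpleAggregate, MaxAccum, MaxAggregate and AggregateDriver are hypothetical names introduced only for illustration; in a real job the base class is AggregateFunction and Flink's runtime plays the role of the driver.

```java
import java.util.List;

// Hypothetical mini-interface mirroring the three methods above.
// NOT Flink's API: in Flink, accumulate() is found by name, not declared here.
interface SimpleAggregate<T, ACC> {
    ACC createAccumulator();
    void accumulate(ACC acc, Integer value);
    T getValue(ACC acc);
}

// Accumulator for MAX: the largest value seen so far (null if none).
class MaxAccum {
    Integer max;
}

class MaxAggregate implements SimpleAggregate<Integer, MaxAccum> {
    public MaxAccum createAccumulator() {
        return new MaxAccum();
    }

    public void accumulate(MaxAccum acc, Integer value) {
        if (value != null && (acc.max == null || value > acc.max)) {
            acc.max = value;
        }
    }

    public Integer getValue(MaxAccum acc) {
        return acc.max;
    }
}

// Hypothetical driver standing in for the runtime, for a single group/window.
class AggregateDriver {
    static <T, ACC> T run(SimpleAggregate<T, ACC> agg, List<Integer> rows) {
        ACC acc = agg.createAccumulator(); // once, when the group starts
        for (Integer row : rows) {
            agg.accumulate(acc, row);      // once per incoming row
        }
        return agg.getValue(acc);          // when the result is emitted
    }
}
```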

Worked example

For TP metrics, the obvious approach is to give the accumulator a list: every incoming value is appended to it, and getValue sorts the list and reads the value at the TP position.

This approach has a problem, however. If a window contains a lot of data, the list holds every single value. The accumulator is part of the job's state, so a huge list makes checkpoints very large and slow, which can eventually make the job fail and keep it from producing correct results.

So we need a different approach. Since all we ultimately need is the values in sorted order, we can replace the list with a TreeMap whose key is the metric value (for example, the response time) and whose value is the number of times that value occurred. In getValue, we sum the map's values to get the total count and compute the rank of the requested TP value from it. We then walk the map in ascending key order, adding up the counts until the running total reaches that rank; the key at that point is the result. (The example below stores the counts in a HashMap inside the accumulator and copies them into a TreeMap in getValue to get the sorted order.)
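The TreeMap idea can be sketched in plain Java without Flink. TreeMapTp is a hypothetical name: add() plays the role of accumulate() and tp() the role of getValue(). The rank uses the nearest-rank rule, ceil(total × p / 100), computed with integer arithmetic to avoid floating-point rounding.

```java
import java.util.Map;
import java.util.TreeMap;

// Histogram-based TP: key = response time, value = occurrence count.
class TreeMapTp {
    // TreeMap keeps keys sorted, so iteration goes from smallest to largest.
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    // Like accumulate(): increment the count for this response time.
    void add(int responseTime) {
        counts.merge(responseTime, 1, Integer::sum);
    }

    // Number of distinct keys, i.e. how many entries the state holds.
    int distinctValues() {
        return counts.size();
    }

    // Like getValue(): returns the TP value, or null if there is no data.
    Integer tp(int tp) {
        if (counts.isEmpty()) {
            return null;
        }
        long total = 0;
        for (int c : counts.values()) {
            total += c;
        }
        // 1-based rank of the target value: ceil(total * tp / 100), at least 1
        long rank = Math.max(1, (total * tp + 99) / 100);
        long running = 0;
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            running += e.getValue();
            if (running >= rank) { // running total has reached the rank
                return e.getKey();
            }
        }
        return counts.lastKey(); // only reached if tp > 100
    }
}
```

The state now grows with the number of distinct response times rather than the number of requests: with the example source below (1000 rows per second, response_time between 1 and 1000), a one-minute window holds about 60,000 rows, but the map never has more than 1,000 keys.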

First, build a source table. It uses the datagen connector to generate random data: response_time is the site's response time, a random integer between 1 and 1000, generated at 1000 rows per second.

```java
// Source table: datagen emits 1000 rows/s with a random response_time in [1, 1000]
String sql = "CREATE TABLE source (\n" +
        "  response_time INT,\n" +
        "  ts AS LOCALTIMESTAMP,\n" +
        "  WATERMARK FOR ts AS ts,\n" +
        "  proctime AS PROCTIME()\n" +
        ") WITH (\n" +
        "  'connector' = 'datagen',\n" +
        "  'rows-per-second' = '1000',\n" +
        "  'fields.response_time.min' = '1',\n" +
        "  'fields.response_time.max' = '1000'\n" +
        ")";
```

Next, define the accumulator that holds the intermediate state of the aggregate function:

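```java
import java.util.HashMap;
import java.util.Map;

// Accumulator: intermediate state of one aggregation (e.g. one window)
public static class TPAccum {
    // Requested percentile, e.g. 50 for TP50
    public Integer tp;
    // Histogram: key = response time, value = number of occurrences
    public Map<Integer, Integer> map = new HashMap<>();
}
```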

Then implement the custom aggregate function class:

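```java
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.table.functions.AggregateFunction;

// Result type: Integer (the TP value); accumulator type: TPAccum
public static class TP extends AggregateFunction<Integer, TPAccum> {

    @Override
    public TPAccum createAccumulator() {
        return new TPAccum();
    }

    @Override
    public Integer getValue(TPAccum acc) {
        if (acc.map.isEmpty()) {
            return null; // no rows in this window
        }
        // Copy into a TreeMap so entries are visited by ascending response time
        TreeMap<Integer, Integer> map = new TreeMap<>(acc.map);
        long sum = 0; // total number of rows
        for (int count : map.values()) {
            sum += count;
        }
        // 1-based rank of the TP value: ceil(sum * tp / 100), at least 1
        long position = Math.max(1, (sum * acc.tp + 99) / 100);
        long p = 0; // running count of rows seen so far
        for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
            p += entry.getValue();
            if (p >= position) {
                return entry.getKey();
            }
        }
        return map.lastKey(); // only reached when tp > 100
    }

    // Called for every input row. Not an override: Flink looks it up by name.
    public void accumulate(TPAccum acc, Integer iValue, Integer tp) {
        acc.tp = tp;
        if (iValue == null) {
            return; // skip rows without a response time
        }
        acc.map.merge(iValue, 1, Integer::sum); // increment the count for this value
    }
}
```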

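Register the function under the name mytp (for example with tableEnv.createTemporarySystemFunction("mytp", TP.class), where tableEnv is your TableEnvironment). The actual query is then: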

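```java
// TP50 of response_time for each 1-second processing-time tumbling window
String sqlSelect =
        "SELECT TUMBLE_START(proctime, INTERVAL '1' SECOND) AS starttime, " +
        "mytp(response_time, 50) AS tp50 FROM source " +
        "GROUP BY TUMBLE(proctime, INTERVAL '1' SECOND)";
```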

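To see what this query returns, here is a plain-Java simulation (no Flink) of the same computation: events are assigned to 1-second tumbling windows, whose start plays the role of TUMBLE_START, and each window's TP value is read from its own histogram. TumblingTp and the {timestampMillis, responseTime} event layout are hypothetical; in the real job Flink manages the windows and their state.

```java
import java.util.Map;
import java.util.TreeMap;

// Simulates TUMBLE(proctime, INTERVAL '1' SECOND) + mytp(response_time, tp).
class TumblingTp {
    static final long WINDOW_MS = 1000; // INTERVAL '1' SECOND

    // events[i] = {timestampMillis, responseTime}; returns windowStart -> TP value
    static Map<Long, Integer> compute(long[][] events, int tp) {
        // windowStart -> (responseTime -> count), windows ordered by start time
        Map<Long, TreeMap<Integer, Integer>> windows = new TreeMap<>();
        for (long[] ev : events) {
            long start = ev[0] - Math.floorMod(ev[0], WINDOW_MS); // like TUMBLE_START
            windows.computeIfAbsent(start, k -> new TreeMap<>())
                   .merge((int) ev[1], 1, Integer::sum);
        }
        Map<Long, Integer> result = new TreeMap<>();
        for (Map.Entry<Long, TreeMap<Integer, Integer>> w : windows.entrySet()) {
            TreeMap<Integer, Integer> hist = w.getValue();
            long total = 0;
            for (int c : hist.values()) total += c;
            long rank = Math.max(1, (total * tp + 99) / 100); // ceil(total * tp / 100)
            long running = 0;
            for (Map.Entry<Integer, Integer> e : hist.entrySet()) {
                running += e.getValue();
                if (running >= rank) {
                    result.put(w.getKey(), e.getKey());
                    break;
                }
            }
        }
        return result;
    }
}
```

For events at 0, 200 and 999 ms (response times 30, 10, 20) and at 1000 and 1500 ms (500, 100), TP50 is 20 for the window starting at 0 and 100 for the window starting at 1000.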