
Example Analysis of Apache Flink CEP


This article walks through Apache Flink CEP with examples: what CEP is and where it is used, how to develop a Flink CEP program, and how Flink CEP can be extended.

1. Flink CEP concepts and usage scenarios

1. What is CEP?

CEP stands for complex event processing. A pattern formed by a series of cascading events, such as getting up -> washing up -> eating breakfast -> going to work, is what CEP matches. If someone skips washing up after getting up, or goes straight to work after eating, that abnormal flow of events can be matched and analyzed, for example to tell whether they got up late today.

Several examples are listed in the following figure:

The first is abnormal behavior detection: suppose, in a vehicle maintenance scenario, that when a car breaks down it is sent to a repair point and then put back on the market. If the car reports a fault again shortly after being put back on the market, before it has really been used, the previous repair was probably ineffective.

The second is strategic marketing: suppose, in a ride-hailing scenario, a user places an order for a trip in the app. If no driver accepts the order within a certain period of time after it is placed, the order needs to be exported downstream so that the relevant strategy can be adjusted.

The third is ops monitoring: ops teams usually monitor server metrics such as CPU and network IO and raise alarms when they exceed thresholds. In practice, however, background service restarts and network jitter cause instantaneous traffic spikes. On non-critical links these spikes can be ignored, and only frequent anomalies should be alerted on, to reduce false positives.

2. Flink CEP application scenarios

Risk control: detect abnormal user behavior patterns in real time; when a user performs actions that should not occur, determine whether the user is suspected of illegal operations.

Strategic marketing: track users' behavior trajectories in real time against predefined rules, and push the corresponding strategy in real time to users whose trajectories match the rules.

Operation and maintenance monitoring: flexibly configure combinations of multiple metrics and dependencies to implement more complex monitoring patterns.

3. Flink CEP principles

Flink CEP is implemented internally as an NFA (non-deterministic finite automaton): a state graph of nodes and edges that starts from an initial state and reaches a final state through a series of intermediate states. Nodes come in three types (initial, intermediate, and final), and edges come in three types: take, ignore, and proceed.

Take: carries a condition. When an incoming message satisfies the condition on the take edge, the message is put into the result set and the state transfers to the next state.

Ignore: when a message arrives, it can be ignored, with the state spinning in place; this is a transition from a state back to itself.

Proceed: also known as an empty transition. The current state can transfer directly to the next state without waiting for a message to arrive. For example, when a user buys a product: if a consult-customer-service action happened before the purchase, both the consult action and the purchase action need to be put into the result set and output downstream; if there was no consult action, only the purchase action is put into the result set and output. In other words, the consult state may or may not hold a message, and it is connected to the downstream purchase state by a proceed edge.
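In the pattern API, this behavior corresponds to marking the consulting step optional. A minimal sketch, assuming a hypothetical Event type with a getType() accessor:

Pattern<Event, ?> buyPattern = Pattern.<Event>begin("consult")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return "consult".equals(event.getType()); // hypothetical accessor
        }
    })
    .optional() // proceed edge: the match may skip this state entirely
    .followedBy("buy")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return "buy".equals(event.getType());
        }
    });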

Here is a taxi-hailing example that shows how the states flow. The rule is shown in the figure below.

Starting from the passenger creating a trip, the pattern matches the passenger's order event; if the order times out without being accepted by a driver, the trip event and the order event are output downstream as the result set.
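A rough sketch of this rule in the pattern API (TaxiEvent and its accessors are hypothetical, and the 10-minute timeout is illustrative); when the accept step never arrives within the window, the timed-out partial match containing the trip and order events is what gets emitted downstream:

Pattern<TaxiEvent, ?> rule = Pattern.<TaxiEvent>begin("trip")
    .where(new SimpleCondition<TaxiEvent>() {
        @Override
        public boolean filter(TaxiEvent e) {
            return "trip".equals(e.getType()); // passenger creates a trip
        }
    })
    .followedBy("order")
    .where(new SimpleCondition<TaxiEvent>() {
        @Override
        public boolean filter(TaxiEvent e) {
            return "order".equals(e.getType()); // passenger places an order
        }
    })
    .followedBy("accept")
    .where(new SimpleCondition<TaxiEvent>() {
        @Override
        public boolean filter(TaxiEvent e) {
            return "accept".equals(e.getType()); // a driver accepts the order
        }
    })
    .within(Time.minutes(10)); // no acceptance within 10 minutes triggers the timeout

The timed-out partial matches are retrieved through a timeout side output, as shown in section 2.6 below.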

Suppose the messages arrive in the order: trip -> other -> place order -> other.

The state flow is as follows:

(1) At the beginning, the automaton is in the trip state, waiting for the user to create a trip.

(2) When the trip event is received, it matches the trip state's condition; the trip event is put into the result set and the state transfers down to the order state through the take edge.

(3) Because the order state has an ignore edge, other events received can be ignored until the order event is matched and put into the result set, at which point the state transfers down to the timeout-unaccepted state. The result set now holds two events: the trip event and the order event.

(4) While waiting in the timeout-unaccepted state, other events can likewise be ignored through the ignore edge until the timeout event fires and the state transfers down to the final state. The whole pattern has then matched successfully, and the trip event and order event in the result set are output downstream.

That is a successful match. What does a failed match look like?

If an order-accepted event is received while in the timeout-unaccepted state, the timeout trigger condition is not met and the whole pattern match fails. The trip event and order event previously placed in the result set are then cleaned up.

When a new trip event is later received, it again matches the trip state's condition and is put into the result set, the state transfers down to the order state through the take edge, and matching starts over.

2. Flink CEP program development

This section describes the program structure and API of Flink CEP in detail.

1. Flink CEP program structure

It is mainly divided into two parts: defining event patterns and processing matched results.

The official example is as follows:

DataStream<Event> input = ...;

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getId() == 42;
        }
    })
    .next("middle")
    .subtype(SubEvent.class)
    .where(new SimpleCondition<SubEvent>() {
        @Override
        public boolean filter(SubEvent subEvent) {
            return subEvent.getVolume() >= 10.0;
        }
    })
    .followedBy("end")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getName().equals("end");
        }
    });

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void processMatch(Map<String, List<Event>> match,
                                 Context ctx,
                                 Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(match));
        }
    });

The program structure has three parts: first define a pattern (the Pattern.begin(...) chain), then bind the defined pattern to the DataStream (the CEP.pattern(...) call), and finally process the matching results on the resulting PatternStream (the process(...) call). The key parts in detail:

Defining the pattern: the example above has three steps. First match an event whose ID is 42, then match an event whose volume is greater than or equal to 10.0, and finally wait for an event whose name equals "end".

Output of matching results: here you need to pay attention to the match parameter of type Map<String, List<Event>> (note: this article is based on Flink 1.7). Each key is the name of a pattern stage: "start" from the begin node, "middle" from the next step, or "end" from the third step. The corresponding value holds the events matched at that stage; because a stage can carry a loop attribute and match multiple times, the value is a collection of all the events it matched.
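For illustration, a stage's events can be read out of that map by name (stage names taken from the example above):

List<Event> starts = match.get("start"); // all events matched by the "start" stage
Event first = starts.get(0);             // a single element when no loop attribute is set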

2. Flink CEP composition

In the figure above, the blue boxes represent individual patterns; the light yellow ellipses represent attributes that can be added to a pattern, such as how many times it may loop, or whether it is greedy or optional; the orange ellipses represent the relationships between patterns, defining how multiple patterns are linked together. Defining patterns, adding the corresponding attributes, and connecting multiple patterns: these three steps make up a complete Flink CEP program.

2.1 Defining a pattern

Here is the sample code:

pattern.next("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getId() == 42;
        }
    });

Defining a pattern consists of five main parts:

pattern: the preceding pattern object

next/followedBy/...: starts a new pattern stage

"start": the name of the new stage

where: the condition attached to the stage

filter: the core matching logic

2.2 Pattern attributes

Next, how to set a pattern's attributes. Attributes fall into two groups: loop attributes and optional attributes.

Loop attributes let a pattern match a fixed number of times (times), one or more times (oneOrMore), or a given number of times or more (timesOrMore).

Optional attributes let a pattern be greedy (greedy), matching the longest possible sequence, or optional (optional), either matching or being skipped.
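A minimal sketch of these attributes applied to a stage (each stage takes at most one loop attribute, so the lines below are alternatives; the start stage is assumed to be defined as above):

start.times(3);             // loop: match exactly 3 times
start.times(2, 4);          // loop: match between 2 and 4 times
start.oneOrMore();          // loop: match one or more times
start.timesOrMore(2);       // loop: match 2 or more times
start.oneOrMore().greedy(); // greedy: repeat as many times as possible
start.optional();           // optional: the stage may be skipped entirely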

2.3 Pattern validity period

Because a pattern's matched events are kept in state, you need to set a global validity period (within). If none is specified, matched events stay in state forever and are never cleared. How long the period should be has to be weighed against the specific scenario and data volume; the key factor is the number of matched events. As matched events accumulate, every newly arrived message has to be checked against the earlier matches, increasing CPU and memory consumption, and as the state grows, data skew becomes more and more severe.
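Setting the validity period is a single call on the pattern; for example (the duration is illustrative):

pattern.within(Time.minutes(10)); // partial matches older than 10 minutes are pruned from state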

2.4 Connections between patterns

There are three main types: strict contiguity (next/notNext), relaxed contiguity (followedBy/notFollowedBy), and non-deterministic relaxed contiguity (followedByAny).

The differences between the three are shown in the following table:

Pattern (a b), input stream: 'a', 'c', 'b1', 'b2'

Strict contiguity: no match
Relaxed contiguity: {a, b1}
Non-deterministic relaxed contiguity: {a, b1}, {a, b2}

The summary is as follows:

Strict contiguity: messages must arrive in exactly the order defined by the pattern.

Relaxed contiguity: non-matching events in between are allowed and ignored.

Non-deterministic relaxed contiguity: not only can non-matching events be ignored, already-matched events can be skipped as well, producing alternative matches.
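In code, the three connection types differ only in the method used to chain the next stage (aCond and bCond stand for hypothetical SimpleConditions):

// strict contiguity: "b" must be the very next event after "a"
Pattern.<Event>begin("a").where(aCond).next("b").where(bCond);

// relaxed contiguity: non-matching events between "a" and "b" are skipped
Pattern.<Event>begin("a").where(aCond).followedBy("b").where(bCond);

// non-deterministic relaxed contiguity: also emits alternative matches such as {a, b2}
Pattern.<Event>begin("a").where(aCond).followedByAny("b").where(bCond);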

2.5 Combining multiple patterns

Besides the pattern definitions and inter-pattern relationships covered earlier, you can also combine several connected patterns into a pattern group, similar to a view, and perform operations on the group as a whole.

In the example above, we first match a login event, and then match users for whom the three events browse, place order, and buy occur in sequence three times in a row.

Without a pattern group, browse, place order, and buy would have to be written out three times in the code. With a pattern group, you treat these three events as one group and add times(3) to its attributes, as sketched below.
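A sketch of that pattern group, assuming hypothetical conditions loginCond, browseCond, orderCond, and buyCond:

// the group: browse -> place order -> buy
Pattern<Event, ?> group = Pattern.<Event>begin("browse").where(browseCond)
    .followedBy("order").where(orderCond)
    .followedBy("buy").where(buyCond);

// a login followed by the whole group occurring three times
Pattern<Event, ?> rule = Pattern.<Event>begin("login").where(loginCond)
    .followedBy(group)
    .times(3);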

2.6 Processing results

There are four main interfaces for processing matched results: PatternFlatSelectFunction, PatternSelectFunction, PatternFlatTimeoutFunction, and PatternTimeoutFunction.

As the names suggest, they divide along two axes: select vs. flatSelect determines whether one result or multiple results are emitted per match, and the Timeout variants determine whether timed-out partial matches are emitted through a side output.

Below is a comprehensive example of the output handling.
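A sketch using the Flink 1.7 flatSelect call with a timeout side output (Alert and TimeoutAlert are hypothetical result types):

// side output tag for timed-out partial matches
final OutputTag<TimeoutAlert> timeoutTag = new OutputTag<TimeoutAlert>("timeout"){};

SingleOutputStreamOperator<Alert> result = patternStream.flatSelect(
    timeoutTag,
    new PatternFlatTimeoutFunction<Event, TimeoutAlert>() {
        @Override
        public void timeout(Map<String, List<Event>> pattern,
                            long timeoutTimestamp,
                            Collector<TimeoutAlert> out) throws Exception {
            // a partial match exceeded the within() period
            out.collect(new TimeoutAlert(pattern, timeoutTimestamp));
        }
    },
    new PatternFlatSelectFunction<Event, Alert>() {
        @Override
        public void flatSelect(Map<String, List<Event>> pattern,
                               Collector<Alert> out) throws Exception {
            // a complete match; zero, one, or many results may be emitted
            out.collect(new Alert(pattern));
        }
    });

// consume the timed-out partial matches from the side output
DataStream<TimeoutAlert> timedOut = result.getSideOutput(timeoutTag);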

2.7 State storage optimization

When an event arrives and belongs to several partial-match result sets at the same time, how is it stored?

Flink CEP uses Dewey counting so that multiple result sets share a single copy of the same event, achieving resource sharing across event copies.

3. Extensions of Flink CEP

This chapter introduces some extensions to Flink CEP: how to implement precise control of the timeout mechanism, and how to load and update rules dynamically.

1. Extending the timeout trigger mechanism

Native Flink CEP can implement timeout triggering through the combination of within + OutputTag, but this runs into problems in complex scenarios. As shown in the figure below, suppose a prepayment event follows the order event. How do we get the orders that were placed and prepaid but still not accepted when the timeout expires?

Following the earlier placed-but-not-accepted practice, the rule "placed and prepaid but not accepted" would be expressed as placeOrder.followedBy(prepay).followedBy(accept) with a timeout. Would this implementation have problems?

This approach produces dirty data in the results: the rule matches not only orders that are placed and prepaid but not accepted in time (the desired result), but also orders that time out after only the place-order action (dirty data with no prepayment). The reason is that the within timeout is applied to the whole rule rather than to a single state node, so no matter which state node the match is currently in, it is emitted through the side output once the timeout fires.

So we need to consider whether state transitions can be driven directly by time, instead of reaching the goal indirectly through a whole-rule timeout. Besides transitions triggered by messages, transitions triggered by time need to be supported. To implement this, the notion of a time attribute is added to states and state transitions. As shown in the figure below, a waiting state is obtained through a wait operator, and a ten-second time attribute is set on it to define a ten-second window.

The wait operator corresponds to the ignore state in the NFA. It spins while the window's end time has not been reached, records the start time of the wait in ComputationState, handles incoming data for the waiting state in the NFA's doProcess, and transfers the state once the waiting end time is reached.

Two ignore edges are set on the waiting state (the red box in the figure above):

waitingStatus.addIgnore(lastSink, waitingCondition): the logic in waitingCondition gets the current time (event time is supported), checks whether the configured waiting threshold has been exceeded, and if so moves the state onward.

waitingStatus.addIgnore(waitingCondition): if the waiting threshold has not been reached, the state keeps spinning in place in the waiting state.

2. Dynamic rule injection

Rule changes are common when CEP runs online, and restarting and redeploying the job for every change would be very inelegant. Especially in real-time scenarios such as marketing or risk control, if the rule window is long (one or two weeks) and the state is large, restarts take a long time, during which abnormal behaviors that should be handled may go undetected.

So how to update and load rules dynamically?

Looking at the overall architecture, Flink CEP runs inside a Flink job while the rule base sits in external storage. First, the running job must notice rule changes in the external storage in time, which means the job needs the ability to access the external store. Second, the changed rules must be loaded into CEP dynamically, which means the external rule descriptions must be parsed into Pattern structures that Flink CEP can recognize. Finally, the generated Pattern is converted into an NFA that replaces the historical NFA, so that newly arriving messages are matched against the new rules.

Below is an interface that supports dynamic injection and updating of external rules (a sketch of its definition follows the method list).

Four main methods are implemented in this interface:

initialize: the initialization method, used to set up the connection to the external rule store.

inject: the main point of interaction with the external store. It listens for changes in the external rule base, fetches the latest rules, loads them dynamically (for example through Groovy), and returns a Pattern.

getPeriod: sets the polling interval. In simple scenarios with low real-time requirements, the external store can be checked periodically by polling.

getNfaKeySelector: unrelated to dynamic updates; used to support multiple rule groups on one stream.
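This interface is not part of open-source Flink; based on the description above it could look roughly like the following sketch (all names are assumed):

public interface RuleInjector<IN, KEY> {

    // initialize the connection to the external rule store
    void initialize() throws Exception;

    // watch the external store, parse changed rule descriptions
    // (for example via Groovy), and return the latest pattern
    Pattern<IN, ?> inject() throws Exception;

    // polling interval in milliseconds, for simple periodic-check scenarios
    long getPeriod();

    // unrelated to dynamic update: supports multiple rule groups on one stream
    KeySelector<IN, KEY> getNfaKeySelector();
}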

3. Cleaning up historical match results

After new rules are loaded dynamically into the Flink CEP job and the original NFA is replaced, the result set of historical matches may also need to be cleaned up. Refreshing the NFA is implemented in AbstractKeyedCEPPatternOperator. Whether the historical state needs to be cleared depends on the business:

If the modified logic does not affect events already matched by the rule, the state of the historical result set can be kept.

If the modified logic affects the already-matched part, the state data in the previously matched result set needs to be cleared to prevent incorrect output.

That is all of "Example Analysis of Apache Flink CEP". Thanks for reading, and I hope it helped resolve your doubts.
