2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article explains what Flink CEP is, how its API is used, and how it works.
What is Flink CEP?
Flink CEP is a complex event processing library based on Flink, which can find complex events from multiple data streams, identify meaningful events (such as opportunities or threats), and respond as soon as possible, rather than waiting for days or months to find problems.
Flink CEP API
At the core of the CEP API is the Pattern API, which lets you quickly define complex event patterns. Each pattern consists of multiple stages, also called states. To move from one state to the next, the user specifies conditions, which can apply to adjacent events or to events that are not adjacent.
Before introducing the API, let's go over a few concepts:
1. Patterns and pattern sequences
A simple pattern is called a pattern. The complex sequence that is finally searched for and matched in the data stream is called a pattern sequence, and each pattern sequence is composed of multiple simple patterns.
A match is a sequence of input events that visits every pattern of the complex pattern graph through a series of valid pattern transitions.
Each pattern must have a unique name, and we can use the pattern name to identify the events to which the pattern matches.
2. Individual patterns
A pattern can be either a singleton pattern or a looping pattern. A singleton pattern accepts a single event, while a looping pattern can accept more than one.
3. Example of a pattern:
Consider the following pattern: a b+ c? d
Here the letters "a", "b", "c", and "d" represent patterns. "+" means looping, so "b+" is a looping pattern. "?" means optional, so "c?" is an optional pattern.
So the pattern above means: a, followed by one or more b, followed by an optional c, and finally d.
Here a, c?, and d are singleton patterns, and b+ is a looping pattern.
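The notation above reads much like a regular expression, so a regex over a string of event types gives a rough feel for it. The sketch below is only an analogy and assumes one character per event and strict adjacency between events; the object name PatternAsRegex is made up for illustration and is not part of Flink.

```scala
object PatternAsRegex {
  // One character per event type: the pattern "a b+ c? d" becomes the regex "ab+c?d".
  private val pattern = "ab+c?d".r

  /** True if the whole event sequence is exactly one match of a b+ c? d. */
  def matches(events: String): Boolean =
    pattern.pattern.matcher(events).matches()

  def main(args: Array[String]): Unit = {
    println(matches("abd"))    // true: a, one b, no c, d
    println(matches("abbbcd")) // true: a, three b, one c, d
    println(matches("acd"))    // false: b+ needs at least one b
    println(matches("abccd"))  // false: c? allows at most one c
  }
}
```

Unlike a regex, a CEP pattern decides each step with an arbitrary condition on the event, and the adjacency between steps is configurable.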
In general, patterns are singleton patterns; they can be converted into looping patterns with quantifiers.
Each pattern can have one or more conditions that decide whether an incoming event is accepted. In other words, each pattern matches and accepts events through one or more conditions.
With these concepts in place, let's introduce the few CEP APIs that the cases below need.
CEP API used in the case:
begin: defines the starting pattern state
Usage: val start = Pattern.begin("start")
next: appends a new pattern state. The matching event must directly follow the previously matched event
Usage: val next = start.next("next")
where: defines the filter condition for the current pattern state. An event can match the state only if it passes the filter
Usage: patternState.where(_.message == "TMD")
within: defines the maximum time interval in which an event sequence must match the pattern. An unfinished event sequence that exceeds this time is discarded
Usage: patternState.within(Time.seconds(10))
times: an event of the given type must occur the specified number of times
Usage: patternState.times(5)
That is enough API for now. Next, we work through the cases mentioned at the beginning of the article:
A case of monitoring users' bullet-screen comment behavior
Case 1: monitoring malicious users
Rule: if a user sends "TMD" 5 or more times within 10 seconds, the user is treated as malicious and is flagged.
Use Flink CEP to detect malicious users:
import org.apache.flink.api.scala._
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object BarrageBehavior01 {
  case class LoginEvent(userId: String, message: String, timestamp: Long) {
    override def toString: String = userId
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // use event time; timestamps are taken from the events below
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // parallelism 1 makes the processing order of the test data easy to observe
    env.setParallelism(1)

    // simulated data source (timestamps in seconds)
    val loginEventStream: DataStream[LoginEvent] = env.fromCollection(
      List(
        LoginEvent("1", "TMD", 1618498576),
        LoginEvent("1", "TMD", 1618498577),
        LoginEvent("1", "TMD", 1618498579),
        LoginEvent("1", "TMD", 1618498582),
        LoginEvent("2", "TMD", 1618498583),
        // the timestamp of this last event is missing in the source; 1618498585 is an assumed value
        LoginEvent("1", "TMD", 1618498585)
      )).assignAscendingTimestamps(_.timestamp * 1000)

    // define the pattern: five "TMD" messages within 10 seconds
    val loginEventPattern: Pattern[LoginEvent, LoginEvent] =
      Pattern.begin[LoginEvent]("begin")
        .where(_.message == "TMD")
        .times(5)
        .within(Time.seconds(10))

    // apply the pattern per user
    val patternStream: PatternStream[LoginEvent] =
      CEP.pattern(loginEventStream.keyBy(_.userId), loginEventPattern)

    import scala.collection.Map
    val result = patternStream.select((pattern: Map[String, Iterable[LoginEvent]]) => {
      val first = pattern.getOrElse("begin", null).iterator.next()
      (first.userId, first.timestamp)
    })

    // malicious users; real handling would act on each user, here we only print them
    result.print("malicious user>>>")
    env.execute("BarrageBehavior01")
  }
}
Case 2: monitoring screen-flooding users
Rule: if a user sends the same sentence 5 or more times within 10 seconds, it is considered malicious screen flooding.
Use Flink CEP to detect screen-flooding users:
import org.apache.flink.api.scala._
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object BarrageBehavior02 {
  case class Message(userId: String, ip: String, msg: String)

  def main(args: Array[String]): Unit = {
    // initialize the runtime environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set parallelism
    env.setParallelism(1)

    // simulated data source
    val loginEventStream: DataStream[Message] = env.fromCollection(
      List(
        Message("1", "192.168.0.1", "beijing"),
        Message("1", "192.168.0.2", "beijing"),
        Message("1", "192.168.0.3", "beijing"),
        Message("1", "192.168.0.4", "beijing"),
        Message("2", "192.168.10.10", "shanghai"),
        Message("3", "192.168.10.10", "beijing"),
        Message("3", "192.168.10.11", "beijing"),
        Message("4", "192.168.10.10", "beijing"),
        Message("5", "192.168.10.11", "shanghai"),
        Message("4", "192.168.10.12", "beijing"),
        Message("5", "192.168.10.13", "shanghai"),
        Message("5", "192.168.10.14", "shanghai"),
        Message("5", "192.168.10.15", "beijing"),
        Message("6", "192.168.10.16", "beijing"),
        Message("6", "192.168.10.17", "beijing"),
        Message("6", "192.168.10.18", "beijing"),
        Message("5", "192.168.10.18", "shanghai"),
        Message("6", "192.168.10.19", "beijing"),
        Message("6", "192.168.10.19", "beijing"),
        Message("5", "192.168.10.18", "shanghai")
      ))

    // define the pattern: five non-empty messages within 10 seconds
    val loginbeijingPattern = Pattern.begin[Message]("start")
      .where(_.msg != null)
      .times(5).optional
      .within(Time.seconds(10))

    // match per user
    val loginbeijingDataPattern =
      CEP.pattern(loginEventStream.keyBy(_.userId), loginbeijingPattern)

    // keep only matches in which every message has the same user and the same text
    val loginbeijingResult: DataStream[Option[Iterable[Message]]] =
      loginbeijingDataPattern.select((pattern: collection.Map[String, Iterable[Message]]) =>
        pattern.get("start").filter(events =>
          events.map(x => (x.userId, x.msg)).toList.distinct.size == 1))

    // print the matches for testing
    loginbeijingResult.filter(_.isDefined).map(_.get).print()
    env.execute("BarrageBehavior02")
  }
}
Flink CEP API
In addition to the APIs introduced in the cases, here are other commonly used ones:
1. Condition API
For an incoming event to be accepted by a pattern, you specify the conditions it must meet. A condition can be based on the properties of the event itself or on statistics over previously matched events. For example, a value of the event is greater than 5, or greater than the average value of the previously accepted events.
You can specify conditions with the pattern.where(), pattern.or(), and pattern.until() methods. A condition can be either an iterative condition (IterativeCondition) or a simple condition (SimpleCondition).
FlinkCEP supports three forms of contiguity between events:
next(): strict contiguity
Example: the pattern is begin("first").where(_.name == "a").next("second").where(_.name == "b"). The pattern is matched if and only if the data is a, b. If the data is a, c, b, then because a is followed by c, a is discarded and the pattern is not matched.
followedBy(): relaxed contiguity
Example: the pattern is begin("first").where(_.name == "a").followedBy("second").where(_.name == "b"). The pattern is matched if the data is a, b or a, c, b; the c in the middle is ignored.
followedByAny(): non-deterministic relaxed contiguity
Example: the pattern is begin("first").where(_.name == "a").followedByAny("second").where(_.name == "b"). If the data is a, c, b1, b2, followedBy produces one match, {a, b1}, while followedByAny produces two matches, {a, b1} and {a, b2}.
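To make the difference between the three forms concrete, here is a minimal plain-Scala sketch that mimics them for the two-step pattern "a then b". It does not use Flink; the object and function names (ContiguitySketch, strict, relaxed, relaxedAny) are hypothetical, and events are reduced to strings classified by their first letter.

```scala
object ContiguitySketch {
  // Events are plain strings; an event counts as "a" or "b" by its first letter.
  private def isA(e: String): Boolean = e.startsWith("a")
  private def isB(e: String): Boolean = e.startsWith("b")

  /** Like next(): b must be the event immediately after a. */
  def strict(events: Vector[String]): Vector[(String, String)] =
    events.zip(events.drop(1)).filter { case (x, y) => isA(x) && isB(y) }

  /** Like followedBy(): skip non-matching events and stop at the first b after a. */
  def relaxed(events: Vector[String]): Vector[(String, String)] =
    for {
      (a, i) <- events.zipWithIndex if isA(a)
      b <- events.drop(i + 1).find(isB).toList
    } yield (a, b)

  /** Like followedByAny(): every later b forms its own match with a. */
  def relaxedAny(events: Vector[String]): Vector[(String, String)] =
    for {
      (a, i) <- events.zipWithIndex if isA(a)
      b <- events.drop(i + 1).filter(isB)
    } yield (a, b)

  def main(args: Array[String]): Unit = {
    val data = Vector("a", "c", "b1", "b2")
    println(strict(data))     // Vector()
    println(relaxed(data))    // Vector((a,b1))
    println(relaxedAny(data)) // Vector((a,b1), (a,b2))
  }
}
```

The sketch ignores time windows, keying, and looping patterns; it only illustrates which pairs each contiguity mode would accept.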
2. Quantifier API
Recall what we said when explaining the concept of patterns: in general, patterns are singleton patterns, which can be converted into looping patterns with quantifiers. Those quantifiers are the Quantifier API described here.
The following quantifiers turn a pattern into a looping pattern:
pattern.oneOrMore(): the given event occurs one or more times, like the b+ mentioned above.
pattern.times(#ofTimes): an event of the given type occurs exactly the specified number of times, for example 4 times.
pattern.times(#fromTimes, #toTimes): an event of the given type occurs a number of times within the specified range, for example 2 to 4 times.
You can make a looping pattern greedy with the pattern.greedy() method, but you cannot make a group of patterns greedy. Greedy means repeating as many times as possible.
Use the pattern.optional() method to make a pattern optional, whether it is a looping pattern or a singleton pattern.
3. After-match skip strategy
The after-match skip strategy filters the successful matches. In other words, when a pattern produces many matches and you do not need all of them, the skip strategy decides which ones are kept.
There are five skip strategies in Flink:
NO_SKIP: no filtering; every possible match is emitted.
SKIP_TO_NEXT: discards every partial match that started with the same event as the emitted match.
SKIP_PAST_LAST_EVENT: discards every partial match that started after the emitted match started but before it ended.
SKIP_TO_FIRST[PatternName]: discards every partial match that started after the emitted match started but before the first event matched by PatternName.
SKIP_TO_LAST[PatternName]: discards every partial match that started after the emitted match started but before the last event matched by PatternName.
To understand these strategies, take NO_SKIP and SKIP_PAST_LAST_EVENT as examples:
The pattern is begin("start").where(_.name == "a").oneOrMore().followedBy("second").where(_.name == "b"), and the input data is a, a, a, a, b. With the NO_SKIP strategy, that is, no filtering, the pattern matches {a, b}, {a, a, b}, {a, a, a, b}, and {a, a, a, a, b}. With the SKIP_PAST_LAST_EVENT strategy, which discards matches that begin after the emitted match starts but before it ends, the pattern matches only {a, a, a, a, b}.
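The filtering effect of a skip strategy can be sketched as a post-processing step over a list of candidate matches. The code below is a simplified model in plain Scala, not Flink's implementation: a match is assumed to be a non-empty list of event positions in the stream, the candidate matches are supplied by hand, and the names SkipStrategySketch, noSkip, and skipPastLastEvent are hypothetical.

```scala
object SkipStrategySketch {
  /** A match is the non-empty list of stream positions of the events it contains. */
  type Match = List[Int]

  /** NO_SKIP: every candidate match is emitted. */
  def noSkip(candidates: List[Match]): List[Match] = candidates

  /** SKIP_PAST_LAST_EVENT (simplified): after emitting a match, drop every
    * candidate that starts at or before the last event of the emitted match. */
  def skipPastLastEvent(candidates: List[Match]): List[Match] = {
    val ordered = candidates.sortBy(_.head) // earliest start first (stable sort)
    ordered.foldLeft((List.empty[Match], -1)) { case ((kept, lastEnd), m) =>
      if (m.head > lastEnd) (kept :+ m, m.last) else (kept, lastEnd)
    }._1
  }

  def main(args: Array[String]): Unit = {
    // Stream positions 0..4 hold a, a, a, a, b; assumed candidates end at b and differ in where they start.
    val candidates = List(List(0, 1, 2, 3, 4), List(1, 2, 3, 4), List(2, 3, 4), List(3, 4))
    println(noSkip(candidates).size)       // 4
    println(skipPastLastEvent(candidates)) // List(List(0, 1, 2, 3, 4))
  }
}
```

With non-overlapping candidates, for example List(0, 1) and List(2, 3), both would be kept, because the second one starts after the first one has ended.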
Usage scenarios of Flink CEP
In addition to the cases above, Flink CEP is widely used in network fraud detection, fault detection, risk avoidance, intelligent marketing, and other fields.
1. Real-time anti-cheating and risk control
For e-commerce platforms, coupon abusers (bargain hunters who exploit promotions) are unavoidable. Pinduoduo in China once exposed a 100-yuan no-threshold coupon that anyone could claim freely, and was reportedly exploited for tens of billions that night. Timely risk control was clearly missing in that case. In addition, merchants frequently change product names and stuff titles with keywords to improve search ranking when listing products, or register batches of bot accounts to place fake orders quickly and inflate sales. Cheating methods keep changing, so rules that match this behavior must be updated constantly.
2. Real-time marketing
Analyze users' real-time behavior in a mobile app, measure their activity cycles, and make recommendations based on user profiles. For example: within 1 minute after logging in to the app, the user only browses products and does not place an order; after browsing a product, the user checks other similar products within 3 minutes to compare prices; or whether an order is paid within 1 minute after it is placed. If this data is put to good use, similar products can be recommended to users, which can greatly increase the purchase rate.
3. Real-time network attack detection
The Internet security situation remains grim, and network attacks are frequent. Here, we use the inbound traffic generated by DDoS (distributed denial of service) attacks as the basis for detecting an attack. For real-time detection and early warning of potential attacks, the data centers of a cloud service vendor regularly report their instantaneous traffic to a monitoring center. If the traffic is within the preset normal range, it is considered normal and nothing is done. If the traffic reported by a data center exceeds the threshold of the normal range 5 consecutive times within 10 seconds, a warning event is triggered. If the traffic reported by a data center exceeds the threshold 30 consecutive times within 30 seconds, a serious alarm is triggered.
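The two alarm rules can be sketched without Flink as a check over the ordered reports of a single data center. This is a minimal sketch under assumptions: the Report type, the field names, and the functions consecutiveOver and level are made up for illustration, timestamps are in seconds, and reports arrive in time order.

```scala
object TrafficAlarmSketch {
  /** One traffic report from a data center (hypothetical shape). */
  final case class Report(timestampSec: Long, traffic: Double)

  /** True if some run of `count` consecutive reports is entirely above
    * `threshold` and spans at most `windowSec` seconds. */
  def consecutiveOver(reports: Seq[Report], threshold: Double, count: Int, windowSec: Long): Boolean =
    reports.sliding(count).exists { run =>
      run.size == count &&
      run.forall(_.traffic > threshold) &&
      run.last.timestampSec - run.head.timestampSec <= windowSec
    }

  /** Applies the serious rule first, then the warning rule. */
  def level(reports: Seq[Report], threshold: Double): String =
    if (consecutiveOver(reports, threshold, 30, 30)) "serious"
    else if (consecutiveOver(reports, threshold, 5, 10)) "warning"
    else "normal"

  def main(args: Array[String]): Unit = {
    val burst = (0 until 5).map(t => Report(t.toLong, 150.0))
    println(level(burst, threshold = 100.0)) // warning
  }
}
```

In Flink CEP the same rules would more naturally be two patterns using where, times, and within on a stream keyed by data center, with strict contiguity between the repeated events.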
A brief introduction to the principle of Flink CEP
When implementing CEP, Apache Flink borrows the NFA model from the paper "Efficient Pattern Matching over Event Streams". The paper also describes some optimizations, which we skip here; we only cover the concept of an NFA.
NFA stands for Non-deterministic Finite Automaton, also called a nondeterministic finite state machine: the number of states is finite, but each state may transition to several different states (hence nondeterministic).
Nondeterministic finite state machine:
Let's start with two concepts:
State: states fall into three categories, the initial state, intermediate states, and the final state.
Transition: take, ignore, and proceed are the names of the transitions.
Matching rules with an NFA is essentially a process of state transitions. The three transitions mean the following:
Take: a conditional transition. When an event arrives and satisfies the condition, the event is taken and put into the result set, and the current state moves to the next state.
Proceed: the current state moves to the next state without depending on any event, like a pass-through.
Ignore: when an event arrives, it can be ignored and the current state stays the same, which is equivalent to a transition back to the same state.
Characteristics of an NFA: given the current state, there may be multiple next states. The next state can be chosen at random or all next states can be followed in parallel (simultaneously). The input symbol can be empty.
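The three transitions can be illustrated with a toy NFA in plain Scala. This is only a sketch of the idea, not Flink's NFA: Stage, Run, and matches are hypothetical names, a pattern is a fixed list of stages, proceed is modeled by skipping optional stages, ignore is allowed only in stages flagged canIgnore, and a trailing optional stage is not handled.

```scala
object NfaSketch {
  /** One pattern stage: a condition plus flags enabling proceed (optional) and ignore. */
  final case class Stage(name: String, accepts: String => Boolean,
                         optional: Boolean = false, canIgnore: Boolean = false)

  /** A partial match: the stage it is waiting in and the events taken so far. */
  final case class Run(stage: Int, taken: Vector[String])

  /** Stages reachable without consuming an event (proceed skips optional stages). */
  private def reachable(stages: Vector[Stage], from: Int): List[Int] =
    if (from < stages.length && stages(from).optional) from :: reachable(stages, from + 1)
    else List(from)

  def matches(stages: Vector[Stage], events: Seq[String]): Vector[Vector[String]] = {
    var active = Vector.empty[Run]
    var done = Vector.empty[Vector[String]]
    for (e <- events) {
      val next = Vector.newBuilder[Run]
      // A fresh run is added for every event, so a match may start anywhere.
      for (r <- active :+ Run(0, Vector.empty)) {
        // take: consume the event in any stage reachable through proceed
        for (j <- reachable(stages, r.stage) if j < stages.length && stages(j).accepts(e))
          next += Run(j + 1, r.taken :+ e)
        // ignore: a started run may stay in its state; together with take this
        // yields several next states for one event (nondeterminism)
        if (r.taken.nonEmpty && stages(r.stage).canIgnore) next += r
      }
      val (finished, pending) = next.result().partition(_.stage == stages.length)
      done = done ++ finished.map(_.taken)
      active = pending
    }
    done
  }
}
```

For example, with the stages a, an optional c, and d, the input a, d matches through a proceed past c, while the stages a and b (with canIgnore set on b) applied to a, c, b1, b2 produce both {a, b1} and {a, b2}, because the run that ignored b1 is still alive when b2 arrives.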
Rule engine
A rule engine separates business decisions from application code and lets you write those decisions with predefined semantic modules. It accepts data input, interprets the business rules, and makes business decisions according to them.
Using a rule engine can lower the maintenance and extension costs of an application, because it reduces the complexity of the components that implement complex business logic.
1. Drools
Drools is an open source rule engine written in Java, commonly used to separate business code from business rules. Its built-in Drools Fusion module also provides CEP functions.
Advantages:
Relatively complete functionality, including system monitoring and an operations platform.
Rules support dynamic updates.
Disadvantages:
Time windows are implemented in memory, so windows with a long span cannot be supported.
Timed triggers are not supported effectively (for example, evaluating a condition some time after the user's browsing event).
2. Aviator
Aviator is a high-performance, lightweight expression evaluation engine implemented in Java, mainly used for the dynamic evaluation of various expressions.
Advantages:
Supports most operators.
Supports function calls and custom functions.
Supports regular expression matching.
Supports passing in variables and has excellent performance.
Disadvantages:
No if/else, do/while, or similar statements, no assignment statements, and no bitwise operators.
3. EasyRules
EasyRules is a lightweight rule engine that integrates MVEL and SpEL expressions.
Advantages:
Lightweight framework with a low learning cost.
Based on POJOs.
Provides useful abstractions and a simple way to define business rules.
Supports composing complex rules from simple ones.
4. Esper
Esper is designed as a lightweight CEP solution that can be easily embedded in services to provide CEP functions.
Advantages:
Lightweight and embeddable; the commonly used CEP functions are simple and easy to use.
The EPL syntax is similar to SQL, so the learning cost is low.
Disadvantages:
It is a single-machine, fully in-memory solution; distribution and storage require integration with other systems.
Time windows are implemented in memory, so windows with a long span cannot be supported.
Timed triggers are not supported effectively (for example, evaluating a condition some time after the user's browsing event).
5. Flink CEP
Flink is a stream processing system with high throughput and low latency. Flink CEP is a general-purpose, easy-to-use solution for real-time streaming event processing.
Advantages:
Inherits Flink's high throughput.
Events can be stored externally, so longer time windows are supported.
Supports timed triggers (implemented with followedBy + PatternTimeoutFunction).
Thank you for reading. That is all for "what is Flink CEP". After working through this article you should have a deeper understanding of Flink CEP; how it behaves in a specific use case still needs to be verified in practice.