In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
Original text link
Flink CEP0. An overview of this article
FlinkCEP is a complex event handling (CEP) library implemented on top of Flink. It allows you to detect event patterns in a stream of events, giving you the opportunity to master important things in the data.
This article describes the API calls available in Flink CEP. First introduce Pattern API, which allows you to specify the patterns to be detected in the flow, and then describe how to detect and manipulate a sequence of matching events.
Then, we will introduce the assumptions made by the CEP library when dealing with event delays.
1. Introduction
The first step is to introduce the CEP library into your pom.xml file.
Org.apache.flink flink-cep_2.11 1.5.0
Note that events in the DataStream to which pattern matching is to be applied must implement the correct equals () and hashCode () methods, because FlinkCEP uses them to compare and match events.
The first demo is as follows:
Java:
DataStream input =... Pattern pattern = Pattern.begin ("start") .where (new SimpleCondition () {@ Override public boolean filter (Event event) {return event.getId () = = 42 }}) .next ("middle") .subtype (SubEvent.class) .where (new SimpleCondition () {@ Override public boolean filter (SubEvent subEvent) {return subEvent.getVolume () > = 10.0 }) .followedBy ("end") .where (new SimpleCondition () {@ Override public boolean filter (Event event) {return event.getName () .equals ("end");}}); PatternStream patternStream = CEP.pattern (input, pattern) DataStream result = patternStream.select (new PatternSelectFunction {@ Override public Alert select (Map pattern) throws Exception {return createAlertFrom (pattern);})
Scala:
Val input: DataStream [Event] =... val pattern = Pattern.begin [Event] ("start"). Where (_ .getId = = 42). Next ("middle") .subtype (classOf [subevent]). Where (_ .getVolume > = 10.0) .followedBy ("end"). Where (_ .ge tName = "end") val patternStream = CEP.pattern (input, pattern) val result: DataStream [Alert] = patternStream.select (createAlert (_) 2.Pattern API)
Pattern API allows you to define a complex sequence of patterns to extract from the input stream.
Each complex pattern sequence consists of multiple simple patterns, that is, patterns that look for a single event with the same attributes. We can first define some simple patterns and then combine them into a complex sequence of patterns.
You can think of a sequence of patterns as a structural diagram of such patterns, transforming from one pattern to the next based on user-specified conditions, for example, event.getName (). Equals ("start").
A match is a series of input events that access all patterns in a complex pattern graph through a series of valid pattern transformations.
Note that each pattern must have a unique name so that it can be used later to identify matching events.
Note that the pattern name cannot contain the character ":".
In the next part of this section, we will first describe how to define a single pattern, and then how to combine individual patterns into complex patterns.
2.1 single mode
The Pattern can be a single or circular mode. A single pattern accepts a single event, while a loop mode can accept multiple events. In the pattern matching symbol, the pattern "a b + c? d" (or "a" followed by one or more "b", optionally followed by "c" followed by "d"), a. , and d are singleton mode, while b + is circular mode.
By default, the pattern is a single mode, which you can convert to circular mode using Quantifiers. Each pattern can have one or more conditions based on which it accepts events.
2.1.1 Quantifiers
In FlinkCEP, you can specify circular patterns using the following methods: pattern.oneOrMore () for patterns that expect one or more events to occur (for example, b + mentioned earlier); and pattern.times (# ofTimes)
Patterns for expecting specific occurrences of events of a given type, such as 4; and patterntimes (# fromTimes,#toTimes), for patterns that expect minimum and maximum occurrences of events of a given type, for example, 2-4.
You can use the pattern.greedy () method to make circular mode greedy, but not group mode greedy. You can use the pattern.optional () method to make all patterns, cyclic or not, optional.
For the schema named start, the following is a valid Quantifiers:
/ / expecting 4 occurrences start.times (4); / / expecting 0 or 4 occurrences start.times (4). Optional (); / expecting 2,3 or 4 occurrences start.times (2,4); / / expecting 2,3 or 4 occurrences and repeating as many as possible start.times (2,4). Greedy (); / expecting 0,2,3 or 4 occurrences start.times (2,4). Optional () / / expecting 0,2,3 or 4 occurrences and repeating as many as possible start.times (2,4). Optional (). Greedy (); / expecting 1 or more occurrences start.oneOrMore (); / / expecting 1 or more occurrences and repeating as many as possible start.oneOrMore (). Greedy (); / / expecting 0 or more occurrences start.oneOrMore (). Optional (); / / expecting 0 or more occurrences and repeating as many as possible start.oneOrMore (). Optional (). Greedy () / / expecting 2 or more occurrences start.timesOrMore (2); / / expecting 2 or more occurrences and repeating as many as possible start.timesOrMore (2). Greedy (); / / expecting 0,2 or more occurrences and repeating as many as possible start.timesOrMore (2). Optional (). Greedy (); 2.1.2 Conditions- condition
In each mode, you can specify additional conditions when you move from one mode to the next. You can use the following conditions:
The properties of the incoming event, for example, its value should be greater than 5, or greater than the average of previously accepted events.
For the continuity of matching events, such as the detection pattern aphib _ c, there can be no unmatched events in the middle of the sequence. 2.1.3 Conditions on Properties- conditions on attributes
You can specify the condition of the event property through the pattern.where (), pattern.or (), or pattern.until () methods. The condition can be IterativeConditions or SimpleConditions.
Iteration conditions:
This is the most common type of condition. You can specify a condition that accepts subsequent events based on the properties of previously accepted events or the statistics of their subsets.
The following code says that if the name starts with "foo" and if the sum of the previously accepted events of the pattern plus the price of the current event does not exceed that value, the iterative condition accepts the next event of the pattern named "middle".
Iterative conditions can be powerful, especially when combined with circular patterns, such as oneOrMore ().
Java
Middle.oneOrMore () .where (new IterativeCondition () {@ Override public boolean filter (SubEvent value, Context ctx) throws Exception {if (! value.getName (). StartsWith ("foo")) {return false;} double sum = value.getPrice (); for (Event event: ctx.getEventsForPattern ("middle")) {sum + = event.getPrice () } return Double.compare (sum, 5.0)
< 0; }}); Scala: middle.oneOrMore() .subtype(classOf[SubEvent]) .where( (value, ctx) =>{lazy val sum = ctx.getEventsForPattern ("middle") .map (_ .getPrice) .sum value.getName.startsWith ("foo") & & sum + value.getPrice
< 5.0 } ) 注意对context.getEventsForPattern(...)的调用,将为给定潜在匹配项查找所有先前接受的事件。 此操作的代价可能会变化巨大,因此在使用条件时,请尽量减少其使用。 简单条件: 这种类型的条件扩展了前面提到的IterativeCondition类,并且仅根据事件本身的属性决定是否接受事件。 start.where(new SimpleCondition() { @Override public boolean filter(Event value) { return value.getName().startsWith("foo"); }}); 最后,还可以通过pattern.subtype(subClass)方法将接受事件的类型限制为初始事件类型的子类型。 Java: start.subtype(SubEvent.class).where(new SimpleCondition() { @Override public boolean filter(SubEvent value) { return ... // some condition }}); Scala: start.where(event =>Event.getName.startsWith ("foo") combination condition:
As shown above, you can combine subtype conditions with other conditions. This applies to all conditions. You can combine any condition by calling where () sequentially.
The end result will be the logical AND of the result of each condition. To combine conditions using OR, you can use the or () method, as shown below.
Pattern.where (new SimpleCondition () {@ Override public boolean filter (Event value) {return... / / some condition}}) .or (new SimpleCondition () {@ Override public boolean filter (Event value) {return... / / or condition}})
Scala:
Pattern.where (event = >... / * some condition * /) .or (event = >... / * or condition * /) stop condition:
In the case of loop mode (oneOrMore () and oneOrMore (). Optional ()), you can also specify stop conditions, such as accepting events with values greater than 5 until the sum of the values is less than 50.
To better understand, take a look at the following example:
Given pattern: (a + until b), before b, one or more an appears.
Given input sequence: A1, crec, a2, b, 3.
Output: {A1 a2} {A1} {a2} {a3}
We can see that there is no output for the two words {a1mena2recovera3} and {a2jina3}. This is the effect of the stop condition.
Continuous event condition
FlinkCEP supports continuity between events in the following forms:
Strict continuity: you want all matching events to occur one after another without any mismatched events.
Loose continuity: ignores mismatches between matching events. Matching events between two events cannot be ignored.
Non-deterministic easy continuity: further relaxes continuity to allow other matches for certain matching events to be ignored.
In order to explain the above, let's give an example. If there is a pattern sequence "a + b", and the input sequence "a1Magnec 2reb", there are different differences under different continuous conditions:
Strict continuity: {a2b}-A1 is abandoned due to the existence of c
Loose continuity: {A1 _ 2b} and {A1 _ a2b}-c are ignored
Non-deterministic easing continuity: {A1 b}, {a2 b}, and {A1 a2 b}
For circular modes such as oneOrMore () and times (), the default is loose continuity. If you want strict continuity, you must specify it explicitly using consecutive ()
If you want non-deterministic relaxation continuity, you can use the allowCombinations () method.
Note that in this section, we are talking about continuity in a single loop pattern, and we need to understand consecutive () and allowCombinations () in that context.
When we talk about combinational patterns later, we will discuss other methods, such as next () and followedBy (), to specify continuity conditions between patterns.
2.1.4 introduction to API where (condition)
Defines the conditions for the current mode. In order to match the pattern, the event must meet the condition. Multiple consecutive where (), whose condition is AND:
Pattern.where (new IterativeCondition () {@ Override public boolean filter (Event value, Context ctx) throws Exception {return... / / some condition}}); or (condition)
Add a new condition for OR operations with existing conditions. The event can match the pattern only if at least one of the conditions is passed:
Pattern.where (new IterativeCondition () {@ Override public boolean filter (Event value, Context ctx) throws Exception {return... / / some condition}) .or (new IterativeCondition () {@ Override public boolean filter (Event value, Context ctx) throws Exception {return... / / alternative condition}}); until (condition)
Specifies the stop condition for loop mode. This means that if an event that matches a given condition occurs, the event in the pattern is no longer accepted.
Applies only to oneOrMore ()
Note: it allows you to clear the state of the corresponding pattern under event-based conditions.
Pattern.oneOrMore () .until (new IterativeCondition () {@ Override public boolean filter (Event value, Context ctx) throws Exception {return... / / alternative condition}}); subtype (subClass)
Defines the subtype conditions of the current schema. If the event belongs to this subtype, the event can only match the pattern:
Pattern.subtype (SubEvent.class); oneOrMore ()
Specifies that at least one matching event occurs in this pattern.
By default, loose internal continuity is used.
Note: it is recommended to use until () or within () to enable stateful cleanup
Pattern.oneOrMore () .until (new IterativeCondition () {@ Override public boolean filter (Event value, Context ctx) throws Exception {return... / / alternative condition}}); timesOrMore (# times)
Specifies that this pattern requires at least # times matching events.
By default, loose internal continuity is used (between subsequent events).
Pattern.timesOrMore (2); times (# ofTimes)
Specifies the exact number of times this pattern needs to match the event.
By default, loose internal continuity is used (between subsequent events).
Pattern.times (2); times (# fromTimes, # toTimes)
Specifies that this pattern is expected to occur between # fromTimes and # toTimes times of matching events.
By default, loose internal continuity is used.
Pattern.times (2,4); optional ()
Specify that this mode is optional, that is, it may not happen at all. This applies to all the above quantifiers.
Pattern.oneOrMore (). Optional (); greedy ()
Specify that this pattern is greedy, that is, it will be repeated as much as possible. This applies only to quantifiers, and group mode is not currently supported.
Pattern.oneOrMore (). Greedy (); consecutive ()
Used with oneOrMore () and times () and imposes strict continuity between matching events, that is, any mismatched element will break the match.
If not, use loose continuity (such as followBy ()).
For example, this mode:
Pattern.begin ("start") .where (new SimpleCondition () {@ Override public boolean filter (Event value) throws Exception {return value.getName () .equals ("c");}}) .followedBy ("middle") .where (new SimpleCondition () {@ Override public boolean filter (Event value) throws Exception {return value.getName () .equals ("a")) }}) .oneOrMore () .consecutive () .followedBy ("end1") .where (new SimpleCondition () {@ Override public boolean filter (Event value) throws Exception {return value.getName () .equals ("b");}})
For the above pattern, if we enter a sequence such as C D A1 A 2 A 3 D A4 B
Use consecutive: {C A1 B}, {C A1 A 2B}, {C A1 A 2A 3 B}
Do not use: {C A1 B}, {C A1 A 2B}, {C A1 A 2A 3 B}, {C A1 A 2A 3 A 4 B}
AllowCombinations ()
Used with oneOrMore () and times (), and imposes non-deterministic loose continuity between matching events (such as followedByAny ()).
If not, use loose continuity (such as followBy ()).
For example, this mode:
Pattern.begin ("start") .where (new SimpleCondition () {@ Override public boolean filter (Event value) throws Exception {return value.getName () .equals ("c");}}) .followedBy ("middle") .where (new SimpleCondition () {@ Override public boolean filter (Event value) throws Exception {return value.getName () .equals ("a")) }}) .oneOrMore () .allowCombinations () .followedBy ("end1") .where (new SimpleCondition () {@ Override public boolean filter (Event value) throws Exception {return value.getName () .equals ("b");}})
For the above pattern, if we enter a sequence such as C D A1 A 2 A 3 D A4 B
Use allowCombinations: {C A1B}, {C A1A2B}, {CA1A3B}, {CA1A4B}, {CA1A2A3B}, {CA1A3A4B}, {CA1A3A4B}, {CA1AA3A4B}
Do not use: {C A1 B}, {C A1 A 2B}, {C A1 A 2A 3 B}, {C A1 A 2A 3 A 4 B}
2.2 introduction to combination Mode 2.2.1
Now that you know what a single pattern looks like, it's time to look at how to combine them into a complete sequence of patterns.
The pattern sequence must start with the initial pattern, as follows:
Pattern start = Pattern.begin ("start")
Next, you can add more patterns to the pattern sequence by specifying the required continuity conditions between them. In the previous section, we described the different adjacency modes supported by Flink, namely strict, loose, and non-deterministic easing, and how to apply them in circular mode.
To apply them between continuous modes, you can use:
The next () correspondence is strict.
FollowedBy () corresponds to loose continuity
FollowedByAny () corresponds to non-deterministic loose continuity
Or
NotNext () if you don't want one event type to appear immediately after another.
NotFollowedBy () does not want this event to occur anywhere between two events.
Note that the pattern sequence cannot end with notFollowedBy ().
Note that the NOT mode cannot be preceded by an optional mode.
/ / strict contiguityPattern strict = start.next ("middle"). Where (...); / / relaxed contiguityPattern relaxed = start.followedBy ("middle"). Where (...); / / non-deterministic relaxed contiguityPattern nonDetermin = start.followedByAny ("middle"). Where (...); / / NOT pattern with strict contiguityPattern strictNot = start.notNext ("not"). Where (...); / / NOT pattern with relaxed contiguityPattern relaxedNot = start.notFollowedBy ("not"). Where (...)
Loose continuity means that only the first successfully matched event will be matched, while non-deterministic loose continuity will have multiple matching results at the same start. Distance, if a pattern is "a b", the given input sequence is "a c b1 b2". There will be different outputs for different continuity.
Strict continuity between an and b will return {}, that is, there is no match. Because the appearance of c causes a to be abandoned.
The loose continuity between an and b returns {a _ journal b1}, because the loose continuity will be discarded as a matching successful element until it matches to the next event to match.
The non-deterministic loose continuity between an and b returns {acentine b1}, {amenade b2}.
You can also define time constraints for patterns. For example, you can use the pattern.within () method to define that the pattern should occur within 10 seconds. The time mode supports processing time and event time.
Note that pattern sequences can only have one time constraint. If multiple such constraints are defined on different separate modes, the smallest constraint is applied.
Next.within (Time.seconds (10))
You can define a pattern sequence as a condition for begin,followBy,followByAny and next. The pattern sequence will be logically treated as a matching condition and will return GroupPattern and
You can use oneOrMore (), times (# ofTimes), times (# fromTimes,#toTimes), optional (), consecutive (), allowCombinations () and other methods for GroupPattern.
PatternPatte start = Pattern.begin (Pattern.begin ("start"). Where (...). FollowedBy ("start_middle"). Where (...); / / strict contiguityPattern strict = start.next (Pattern.begin ("next_start"). Where (.). FollowedBy ("next_middle"). Where (...) .times (3) / / relaxed contiguityPattern relaxed = start.followedBy (Pattern.begin ("followedby_start"). Where (...). FollowedBy ("followedby_middle"). Where (...). OneOrMore (); / / non-deterministic relaxed contiguityPattern nonDetermin = start.followedByAny (Pattern.begin ("followedbyany_start"). Where (.). FollowedBy ("followedbyany_middle"). Where (...). Optional (); 2.2.2 APIbegin (# name)
Define a start mode
Pattern start = Pattern.begin ("start"); begin (# pattern_sequence)
Define a start mode
Pattern start = Pattern.begin (Pattern.begin ("start"). Where (...). FollowedBy ("middle"). Where (...); next (# name)
Add a new mode. The matching event must directly follow the previous matching event (strict continuity):
Pattern next = start.next ("middle"); next (# pattern_sequence)
Add a new mode. Matching events must be directly followed by previous matching events (strict continuity):
Pattern next = start.next (Pattern.begin ("start"). Where (...). FollowedBy ("middle"). Where (...); followedBy (# name)
Add a new mode. Other mismatched events may occur between matching events and previous matching events (loose contiguous):
Pattern followedBy = start.followedBy ("middle"); followedBy (# pattern_sequence)
Add a new mode. Other mismatched events may occur between matching events and previous matching events (loose contiguous):
Pattern followedBy = start.followedBy (Pattern.begin ("start"). Where (...). FollowedBy ("middle"). Where (...); followedByAny (# name)
Add a new mode. Other events may occur between matching events and previous matching events, and an alternative match will be presented for each alternative matching event (non-deterministic relaxation continuity):
Pattern followedByAny = start.followedByAny ("middle"); followedByAny (# pattern_sequence)
Add a new mode. Other events may occur between matching events and previous matching events, and an alternative match will be presented for each alternative matching event (non-deterministic relaxation continuity):
Pattern followedByAny = start.followedByAny (Pattern.begin ("start"). Where (...). FollowedBy ("middle"). Where (...); notNext ()
Add a new negative mode. Matching (negative) events must directly follow previous matching events (strict continuity) in order to discard partial matches:
Pattern notNext = start.notNext ("not"); notFollowedBy ()
Append a new negative pattern match. Even if other events occur between matching (negative) events and previous matching events (loose continuity), partial matching event sequences are discarded:
Pattern notFollowedBy = start.notFollowedBy ("not"); within (time)
Defines the maximum time interval for pattern matching in a sequence of events. If the unfinished sequence of events exceeds this time, it is discarded:
Pattern.within (Time.seconds (10)); 2.3Skip strategy after matching
For a given pattern, the same event can be assigned to multiple successful matches. To control the number of matches that will be assigned to events, you need to specify a skip policy named AfterMatchSkipStrategy.
There are four types of skip policies, as follows:
NO_SKIP: every possible match will be issued.
SKIP_PAST_LAST_EVENT: discards every partial match that contains a matching event.
SKIP_TO_FIRST: discards each partial match that contains the first previous match event of the PatternName.
SKIP_TO_LAST: discard each partial match that contains the last matching event of the PatternName.
Note that when using SKIP_TO_FIRST and SKIP_TO_LAST skip policies, you should also specify a valid PatternName.
For example, for a given pattern ab {2} and data flow ab1,ab2,ab3,ab4,ab5,ab6, the differences between the four skip strategies are as follows:
To specify the skip policy to use, simply call the following command to create the AfterMatchSkipStrategy:
How to use it:
AfterMatchSkipStrategy skipStrategy =... Pattern.begin ("patternName", skipStrategy); 2.4Detection mode-Detecting Patterns
After you specify the sequence of patterns to find, you can apply it to the input stream to detect potential matches. To run an event flow against a pattern sequence, you must create a PatternStream.
Given the input stream input, mode pattern, and optional comparator comparator, to sort or arrive at the same time for events with the same timestamp in the case of EventTime, create a PatternStream by calling the following command:
DataStream input =... Pattern pattern =... EventComparator comparator =... / / optionalPatternStream patternStream = CEP.pattern (input, pattern, comparator)
Depending on the actual situation, the created flow can be key-free or key-free.
Note that using patterns on key-free streams will result in job parallelism of 1.
2.5 Selecting from Patterns
After you get the PatternStream, you can query from the detected sequence of events through the select or flatSelect method.
The select () method requires the implementation of PatternSelectFunction. PatternSelectFunction has a select method called for each sequence of matching events.
It receives matches in the form of Map, where key is the name of each pattern in the pattern sequence, and the value is a list of all accepted events for that pattern (IN is the type of input element).
Events for a given pattern are sorted by timestamp. The reason for returning a list of accepted events for each pattern is that when circular patterns such as oneToMany () and times () are used, multiple events can be accepted for a given pattern.
The selection function returns only one result.
Class MyPatternSelectFunction implements PatternSelectFunction {@ Override public OUT select (Map pattern) {IN startEvent = pattern.get ("start") .get (0); IN endEvent = pattern.get ("end") .get (0); return new OUT (startEvent, endEvent);}}
PatternFlatSelectFunction is similar to PatternSelectFunction, except that it can return any number of results. To do this, the select method has an additional Collector parameter that forwards the output element downstream.
Class MyPatternFlatSelectFunction implements PatternFlatSelectFunction {@ Override public void flatSelect (Map pattern, Collector collector) {IN startEvent = pattern.get ("start") .get (0); IN endEvent = pattern.get ("end") .get (0); for (int I = 0; I
< startEvent.getValue(); i++ ) { collector.collect(new OUT(startEvent, endEvent)); } }}2.6 处理超时部分模式 每当模式具有通过within关键字附加的时间窗口长度时,部分事件序列可能因为超出时间窗口长度而被丢弃。 为了对这些超时的部分匹配作出相应的处理,select和flatSelect API调用允许指定超时处理程序。 为每个超时的部分事件序列调用此超时处理程序。 超时处理程序接收到目前为止由模式匹配的所有事件,以及检测到超时时的时间戳。 为了处理部分模式,select和flatSelect API提供了一个带参数的重载版本 PatternTimeoutFunction/ PatternFlatTimeoutFunction。OutputTag 超时的匹配将会在其中返回。PatternSelectFunction / PatternFlatSelectFunction。PatternStreamPatte patternStream = CEP.pattern(input, pattern);OutputTag outputTag = new OutputTag("side-output"){};SingleOutputStreamOperator result = patternStream.select( new PatternTimeoutFunction() {...}, outputTag, new PatternSelectFunction() {...});DataStream timeoutResult = result.getSideOutput(outputTag);SingleOutputStreamOperator flatResult = patternStream.flatSelect( new PatternFlatTimeoutFunction() {...}, outputTag, new PatternFlatSelectFunction() {...});DataStream timeoutFlatResult = flatResult.getSideOutput(outputTag);2.7 事件事件模式下处理滞后数据 在CEP中,元素处理的顺序很重要。为了保证在采用事件事件时以正确的顺序处理事件,最初将传入的事件放入缓冲区,其中事件基于它们的时间戳以升序排序, 并且当watermark到达时,处理该缓冲区中时间戳小于watermark时间的所有元素。这意味着watermark之间的事件按事件时间顺序处理。 请注意,在采用事件时间时,CEP library会假设watermark是正确的。 为了保证跨watermark的记录按照事件时间顺序处理,Flink的CEP库假定watermark是正确的,并将时间戳小于上次可见watermark的时间视为滞后事件。滞后事件不会被进一步处理。 2.8 例子 以下示例检测事件的带key数据流上的模式start,middle(name ="error") - >End (name = "critical"). The key of the event is its id, and the valid mode must occur within 10 seconds. The whole process is done in event time.
StreamExecutionEnvironment env =... env.setStreamTimeCharacteristic (TimeCharacteristic.EventTime); DataStream input =... DataStream partitionedInput = input.keyBy (new KeySelector () {@ Override public Integer getKey (Event value) throws Exception {return value.getId ();}}) Pattern pattern = Pattern.begin ("start") .next ("middle") .where (new SimpleCondition () {@ Override public boolean filter (Event value) throws Exception {return value.getName () .equals ("error");}) .followedBy ("end") .where (new SimpleCondition () {@ Override public boolean filter (Event value) throws Exception {return value.getName () .equals ("critical")) (Time.seconds (10)); PatternStream patternStream = CEP.pattern (partitionedInput, pattern); DataStream alerts = patternStream.select (new PatternSelectFunction () {@ Override public Alert select (Map pattern) throws Exception {return createAlert (pattern);}}))
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.