Chapter V: Flink Complex Event Processing

2025-04-05 Update From: SLTechnology News&Howtos

Reprint should be marked with the source mythmoon@163.com.

Complex Event Processing

In the previous chapter, we talked about the Table API provided by Apache Flink and how we can use it to process relational data structures. From this chapter onwards, we will learn more about the libraries provided by Apache Flink and how we can use them for specific use cases. To start with, let's look at a library called Complex Event Processing (CEP). CEP is a very interesting but complex topic that has value in various industries. Wherever a stream of events is expected, people naturally want to perform complex event processing on it. Let's try to understand what CEP is all about.

What is complex event processing?

CEP analyzes streams of disparate events that occur at high frequency and must be processed with low latency. These days, streaming events can be found in various industries, for example:

In the oil and gas domain, sensor data comes from various drilling tools or from upstream oil pipeline equipment

In the security domain, activity data, malware information, and usage pattern data come from various endpoints

In the wearable domain, data comes from various wristbands with information about your heart rate, your activity, and so on

In the banking domain, data comes from credit card usage, banking activities, and so on

It is very important to analyze variations in these patterns so that we are notified in real time about any departure from regular behavior. CEP can understand patterns across streams of events, sub-events, and their sequences. It helps identify meaningful patterns and complex relationships among seemingly unrelated events, and sends notifications in real time or near real time to prevent damage:

[Diagram: how the CEP flow works]

The preceding diagram shows how the CEP flow works. Even though the flow looks simple, CEP has various abilities, such as:

The ability to produce results as soon as the input event stream is available

The ability to provide computations such as aggregation over time and timeouts between two events of interest

The ability to provide real-time/near real-time alerts and notifications on detection of complex event patterns

The ability to connect and correlate heterogeneous sources and analyze patterns in them

The ability to achieve high-throughput, low-latency processing
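One of the abilities listed above, a timeout between two events of interest, can be illustrated without any CEP engine. The following is a minimal plain-Java sketch and not Flink code: the Event class, its fields, the event type names, and the withinWindow helper are all hypothetical names introduced for illustration, and the input list is assumed to be sorted by timestamp.

```java
import java.util.ArrayList;
import java.util.List;

class WindowedSequenceDemo {

    // Hypothetical event: a type label plus a timestamp in milliseconds.
    static final class Event {
        final String type;
        final long timestampMillis;

        Event(String type, long timestampMillis) {
            this.type = type;
            this.timestampMillis = timestampMillis;
        }
    }

    // For every event of type `first`, finds the next event of type `second`
    // that arrives no later than windowMillis afterwards.
    // Assumes `events` is ordered by timestamp.
    static List<Event[]> withinWindow(List<Event> events, String first,
                                      String second, long windowMillis) {
        List<Event[]> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            Event start = events.get(i);
            if (!start.type.equals(first)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                Event next = events.get(j);
                if (next.timestampMillis - start.timestampMillis > windowMillis) {
                    break; // the window for this start event has expired
                }
                if (next.type.equals(second)) {
                    matches.add(new Event[] {start, next});
                    break;
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("warning", 0L));
        events.add(new Event("failure", 4_000L));
        events.add(new Event("warning", 10_000L));
        events.add(new Event("failure", 25_000L));

        for (Event[] m : withinWindow(events, "warning", "failure", 10_000L)) {
            System.out.println(m[0].type + "@" + m[0].timestampMillis
                    + " -> " + m[1].type + "@" + m[1].timestampMillis);
        }
    }
}
```

With a 10-second window, only the first warning/failure pair qualifies; the second failure arrives 15 seconds after its warning. A CEP engine performs this kind of matching continuously over an unbounded stream rather than over a finished list.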

There are various solutions available on the market. With advancements in big data technology, we have multiple options such as Apache Spark, Apache Samza, and Apache Beam, among others, but none of them has a dedicated library that fits all of these needs. Now let us try to understand what we can achieve with Flink's CEP library.

Flink CEP

Apache Flink provides the Flink CEP library, which offers APIs for complex event processing. The library consists of the following core components:

Event stream

Pattern definition

Pattern detection

Alert generation

Flink CEP works on Flink's streaming API, called DataStream. A programmer defines the pattern to be detected in the stream of events, and Flink's CEP engine then detects the pattern and takes the appropriate action, such as generating alerts.

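In order to get started, we need to add the following Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>1.1.4</version>
</dependency>

Note that the CEP classes used later in this chapter (Pattern, CEP, PatternSelectFunction) are shipped in the separate flink-cep artifact, which is not in this list and also needs to be on the classpath with a matching Scala suffix and version.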

Next, we need to do the following to use Kafka.

First, we need to define a custom Kafka deserializer. It reads bytes from a Kafka topic and converts them into a TemperatureEvent. The following is the code to do this.

EventDeserializationSchema.java:

package com.demo.chapter05;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class EventDeserializationSchema implements DeserializationSchema<TemperatureEvent> {

    @Override
    public TypeInformation<TemperatureEvent> getProducedType() {
        return TypeExtractor.getForClass(TemperatureEvent.class);
    }

    // Each message is expected in the form "machineName=temperature", for example "Boiler=27.0".
    @Override
    public TemperatureEvent deserialize(byte[] arg0) throws IOException {
        String str = new String(arg0, StandardCharsets.UTF_8);
        String[] parts = str.split("=");
        return new TemperatureEvent(parts[0], Double.parseDouble(parts[1]));
    }

    // The Kafka stream is unbounded, so no element marks its end.
    @Override
    public boolean isEndOfStream(TemperatureEvent arg0) {
        return false;
    }
}
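The deserializer above builds TemperatureEvent objects, but the class itself is not listed in this chapter. The following is a minimal sketch, assuming only what the surrounding code uses: a (String, double) constructor and the getMachineName() and getTemperature() getters. The field names, the no-argument constructor, the setters, and the parse helper are assumptions added for illustration; parse simply mirrors the split-on-"=" logic of the deserializer.

```java
// Hypothetical event type; the field names are inferred from the getters
// used elsewhere in this chapter, not taken from the original listing.
class TemperatureEvent {
    private String machineName;
    private double temperature;

    // No-argument constructor and setters are assumed here so the class
    // follows the usual POJO conventions.
    public TemperatureEvent() {
    }

    public TemperatureEvent(String machineName, double temperature) {
        this.machineName = machineName;
        this.temperature = temperature;
    }

    public String getMachineName() {
        return machineName;
    }

    public void setMachineName(String machineName) {
        this.machineName = machineName;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    // Illustrative helper: turns a "name=value" line into an event,
    // in the same way the deserializer does.
    static TemperatureEvent parse(String line) {
        String[] parts = line.split("=");
        return new TemperatureEvent(parts[0], Double.parseDouble(parts[1]));
    }

    @Override
    public String toString() {
        return "TemperatureEvent [machineName=" + machineName
                + ", temperature=" + temperature + "]";
    }
}
```

Note that neither this helper nor the deserializer guards against malformed input: a message without "=" or with a non-numeric value would throw an exception, so a production job would likely want to validate or skip such records.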

Next, we create a topic in Kafka called temperature:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic temperature

Now we move to the Java code that listens to these events in a Flink stream:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<TemperatureEvent> inputEventStream = env.addSource(
        new FlinkKafkaConsumer09<TemperatureEvent>("temperature", new EventDeserializationSchema(), properties));

Next, we define the pattern to check whether the temperature is greater than or equal to 26.0 degrees Celsius within 10 seconds:

Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("first")
        .subtype(TemperatureEvent.class)
        .where(new FilterFunction<TemperatureEvent>() {
            private static final long serialVersionUID = 1L;

            @Override
            public boolean filter(TemperatureEvent value) {
                // Match readings at or above 26.0 degrees Celsius.
                return value.getTemperature() >= 26.0;
            }
        })
        .within(Time.seconds(10));

Next, we match this pattern against the stream of events and select the matching event. We also collect the alert messages into a result stream, as shown here:

DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
        .select(new PatternSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Alert select(Map<String, TemperatureEvent> event) throws Exception {
                // "first" is the name given to the pattern stage in Pattern.begin().
                return new Alert("Temperature Rise Detected:" + event.get("first").getTemperature()
                        + " on machine name:" + event.get("first").getMachineName());
            }
        });
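The Alert type used above is likewise not listed in this chapter. The following is a minimal sketch, assuming a single message field; only the (String) constructor is implied by the code above. The getter, setter, and no-argument constructor are assumptions, and the toString format is chosen to match the "Alert [message=...]" lines in the sample output shown later.

```java
// Hypothetical Alert type; only the (String) constructor is implied
// by the select function in this chapter.
class Alert {
    private String message;

    public Alert() {
    }

    public Alert(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    // Format assumed from the sample output: Alert [message=...]
    @Override
    public String toString() {
        return "Alert [message=" + message + "]";
    }
}
```

Because print() writes each element using its toString() method, the format of this method determines what appears on the console.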

To see which alerts were generated, we print the results:

patternStream.print();

And we execute the stream:

env.execute("CEP on Temperature Sensor");

Now we are all set to execute the application. As messages arrive in the Kafka topic, the CEP job keeps evaluating them against the pattern.

The actual execution looks like the following. Here is how we can provide sample input:

xyz=21.0
xyz=30.0
LogShaft=29.3
Boiler=23.1
Boiler=24.2
Boiler=27.0
Boiler=29.0

Here is what the sample output looks like:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1010488393]
10/09/2016 18:15:55 Job execution switched to status RUNNING.
10/09/2016 18:15:55 Source: Custom Source (1/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source (1/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source (2/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source (2/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source (3/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source (3/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source (4/4) switched to SCHEDULED
10/09/2016 18:15:55 Source: Custom Source (4/4) switched to DEPLOYING
10/09/2016 18:15:55 CEPPatternOperator (1/1) switched to SCHEDULED
10/09/2016 18:15:55 CEPPatternOperator (1/1) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed (1/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed (1/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed (2/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed (2/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed (3/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed (3/4) switched to DEPLOYING
10/09/2016 18:15:55 Map -> Sink: Unnamed (4/4) switched to SCHEDULED
10/09/2016 18:15:55 Map -> Sink: Unnamed (4/4) switched to DEPLOYING
10/09/2016 18:15:55 Source: Custom Source (2/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source (3/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed (1/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed (2/4) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed (3/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source (4/4) switched to RUNNING
10/09/2016 18:15:55 Source: Custom Source (1/4) switched to RUNNING
10/09/2016 18:15:55 CEPPatternOperator (1/1) switched to RUNNING
10/09/2016 18:15:55 Map -> Sink: Unnamed (4/4) switched to RUNNING
1> Alert [message=Temperature Rise Detected:30.0 on machine name:xyz]
2> Alert [message=Temperature Rise Detected:29.3 on machine name:LogShaft]
3> Alert [message=Temperature Rise Detected:27.0 on machine name:Boiler]
4> Alert [message=Temperature Rise Detected:29.0 on machine name:Boiler]
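The four alerts above follow directly from applying the >= 26.0 condition to the seven sample readings. The following plain-Java sketch reproduces that reasoning without Flink or Kafka; the ThresholdDemo class and its alertsFor helper are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration (no Flink involved) of which sample readings
// satisfy the temperature condition used in the pattern.
class ThresholdDemo {

    // Parses "name=value" lines and returns one alert message for every
    // reading at or above the given threshold.
    static List<String> alertsFor(List<String> lines, double threshold) {
        List<String> alerts = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.split("=");
            double temperature = Double.parseDouble(parts[1]);
            if (temperature >= threshold) {
                alerts.add("Temperature Rise Detected:" + temperature
                        + " on machine name:" + parts[0]);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "xyz=21.0", "xyz=30.0", "LogShaft=29.3",
                "Boiler=23.1", "Boiler=24.2", "Boiler=27.0", "Boiler=29.0");
        for (String alert : alertsFor(input, 26.0)) {
            System.out.println(alert);
        }
    }
}
```

The readings 21.0, 23.1, and 24.2 fall below the threshold and produce nothing. The numeric prefixes in the Flink output (1>, 2>, and so on) identify the parallel subtask that printed each line and are not part of the alert itself.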

We can also configure a mail client and use some external web hook to send e-mail or messenger notifications.

Summary

In this chapter, we learned about CEP. We discussed the challenges involved and how we can use the Flink CEP library to solve CEP problems. We also learned about the Pattern API and the various operators we can use to define a pattern. In the final section, we connected the dots and walked through one complete use case. With some changes, this setup can be reused in various other domains as well.

In the next chapter, we will see how to use Flink's built-in Machine Learning library to solve complex problems.
