Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the Pulsar Functions-based event handling design pattern?

2025-01-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article is to share with you what the Pulsar Functions-based event handling design pattern is, the editor thinks it is very practical, so I share it with you to learn. I hope you can get something after reading this article.

Some common real-time streaming modes and their implementations are introduced below.

Mode 1: dynamic routing

Let's first review how to implement content-based routing using Apache Pulsar Functions. Content-based routing is an integrated mode. This pattern has been around for many years and is commonly used in event centers and messaging frameworks. The basic idea is to examine the content of each message and route the message to a different destination according to the message content.

The following example uses Apache Pulsar SDK,SDK to allow the user to configure three different values:

A regular expression used to find a match in a message

The topic to which the message is sent when it matches the expression pattern

The topic to which the message is sent when it does not match the expression pattern

This example demonstrates the power of Pulsar Functions to dynamically decide where to send events based on functional logic.

Import java.util.regex.*; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function

Public ContentBasedRoutingFunction implements Function {

String process (String input, Context context) throws Exception {String regex = context .getUserConfigValue ("regex"). ToString (); String matchedTopic = context .getUserConfigValue ("matched-topic"). ToString (); String unmatchedTopic = context .getUserConfigValue ("unmatched-topic"). ToString ()

Pattern p = Pattern.compile (regex); Matcher m = p.matcher (input); if (m.matches ()) {context.publish (matchedTopic, input);} else {context.publish (unmatchedTopic, input);}

Mode 2: filterin

If you want to exclude most events on the topic by retaining only events that meet a given condition, apply the selective filtering mode. The filtering mode is particularly effective for finding only events of interest, such as credit card payments exceeding a certain amount, ERROR messages in log files, sensor readings exceeding a specific threshold, and so on (see pattern 4).

Suppose the user is monitoring the event flow of credit card transactions and tries to detect fraudulent or suspicious behavior. Due to the large volume of transactions and the limited time to choose "agree / disagree", users must first filter out transactions with "risk" characteristics, such as cash in advance, large payments and so on.

Import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import com.company.creditcard.Purchase

Public class FraudFilter implements Function {

Purchase process (Purchase p, Context context) throws Exception {if (p.getTransactionType () = = 'CASH ADVANCE') | | p.getAmount > 500.00) {return p;} return null;}}

You can use filters to filter transactions with "risk" characteristics. The filter can identify these "risk" characteristics and route these transactions only to a separate topic for further evaluation.

After filtering, all credit card payments can be routed to a "potentially fraudulent" topic for further evaluation, while other events are filtered out, and the filter does not take any action on filtered events.

The figure above is based on the FraudFilter function of three separate payment objects. The first payment meets the given criteria and is routed to the "potential fraud" topic for further evaluation, while the second and third payments do not meet the fraud criteria and are directly filtered out (not routed to the "potential fraud" filter).

Mode 3: conversion

Conversion patterns are used to convert events from one type to another, or to add, delete, or modify the value of an input event.

| | projection |

The projection pattern is similar to the projection operator in relational algebra, selecting a subset of attributes for input events and creating output events that contain only those attributes. Projection mode can be used to delete sensitive fields in an event or to retain only the necessary attributes in the event. The following figure shows an application of projection mode that "blocks" incoming social security numbers before posting records to downstream topic.

| | enrichment mode |

The enrichment mode is used to add data to output events that do not exist in the input attributes. A typical enrichment pattern involves some lookup of reference data based on a key value in the input event. The following example shows how to add a geolocation to an output event based on the IP address contained in the input event.

Import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import com.company.creditcard.Purchase;import com.company.services.GeoService

Public class IPLookup implements Function {Purchase process (Purchase p) throws Exception {Geo g = GeoService.getByIp (p.getIPAddress ()); / / By default, these fields are blank, so we just modify the object p.setLongitude (g.getLon ()); p.setLatitiude (g.getLat ()); return p;}}

| | detach mode |

In detached mode, the event handler receives a single input event and divides it into a plurality of output events. The detach mode works well when the input event is a batch that contains multiple individual events, such as entry in a log file, and you want to handle each event separately. The following figure shows the process of separating mode: the input is separated by newline characters, and then published line by line to the configured output topic.

The implementation of this function is as follows:

Import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function

Public class Splitter implements Function {

String process (String s, Context context) throws Exception {Arrays.asLists (s.split ("\ R")) .forEach (line-> context.publish (context.getOutputTopic (), line)); return null;}}

Mode 4: alerts and thresholds

Alarm and threshold modes can be detected and alerts (such as high temperature alerts) are generated according to the detection conditions. Alerts can be generated based on simple values or based on more complex conditions, such as growth rates, continuous changes in quantities, and so on.

The following example generates an alert based on a user-configured threshold parameter (for example, 100.0010, 38.7, etc.) and the mailbox address that receives alert notifications. When this function receives a sensor event that exceeds the configured threshold, an email is sent.

Import javax.mail.*;import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function

Public SimpleAlertFunction implements Function {Void process (Sensor sensor, Context context) throws Exception {Double threshold = context .getUserConfigValue ("threshold") .toString (); String alertEmail = context .getUserConfigValue ("alert-email") .toString ()

If (sensor.getReading () > = threshold) {Session s = Session.getDefaultInstance (); MimeMessage msg = new MineMessage (s); msg.setText ("Alert for Sensor:" + sensor.getId ()); Transport.send (msg);} return null;}}

The following is an example of a stateful function that generates alerts based on the growth rate of readings for a particular sensor. When deciding whether to generate an alert, you need to access previous sensor readings.

Import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function

Public ComplexAlertFunction implements Function {

Void process (Sensor sensor, Context context) throws Exception {Double threshold = context .getUserConfigValue ("threshold") .toString (); String alertTopic = context .getUserConfigValue ("alert-topic") .toString ()

/ / Get previous & current metric values Float previous = context.getState (sensor.getId () + "- metric"); Long previous_time = context.getState (sensor.getId () + "- metric-time"); Float current = sensor.getMetric (); Long current_time = sensor.getMetricTime ()

/ / Calculate Rate of change & compare to threshold. Double rateOfChange = (current-previous) / (current_time-previous_time); if (abs (rateOfChange) > = threshold) {/ / Publish the sensor ID to the alert topic for handling context.publish (alertTopic, sensor.getId ();}

/ / Update metric values context.putState (sensor.getId () + "- metric", current); context.putState (sensor.getId () + "- metric-time", current_time);}}

Only the previous metric readings and time are retained through the Apache Pulsar Functions state management feature, and sensor ID is added to these values (sensor ID is required because metrics from multiple sensors will be processed). For simplicity, assume that events arrive in the correct order, that is, they are always the latest readings and there are no out-of-order readings.

In addition, this time we forwarded the sensor ID to a dedicated alarm topic for further processing, rather than just sending an email. In this way, we can do additional enrichment processing for events (through Pulsar Functions). For example, find the geographic location of the sensor, and then notify the person concerned.

Mode 5: simple count and window count

The simple count and window count modes use aggregate functions, which take a collection of events as input and generate a desired output event by applying a function to the input event. Aggregate functions include: summation, average, maximum, minimum, percentile and so on.

The following is an example of using Pulsar Functions to implement word count, calculating the sum of the number of times each word appears in a given topic.

Import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function

Public WordCountFunction implements Function {

Void process (String s, Context context) throws Exception {Arrays.asLists (s.split ("\\.") .forEach (word-> context.incrCounter (word, 1)); return null;}}

Given the endless nature of streaming data source, indefinite aggregation is not useful because these calculations are usually done on the data window (such as the number of failures in the previous hour).

The data window represents a limited subset of the event flow, as shown in the figure above. But how do you define the boundaries of the data window? There are two common properties that define windows:

Trigger policy: controls when function code is executed or triggered. The Apache Pulsar Function framework uses these rules to inform the code to process all the data collected in the window.

Purge policy: controls the amount of data retained in the window. These rules are used to determine whether to clear data elements from the window.

Both strategies are driven by time or the amount of data in the window. What is the difference between the two? And how do they work together? Among a variety of window technologies, scrolling windows and sliding windows are the most commonly used.

| | Scroll window |

The window is full of the only condition for the scrolling window cleanup policy, so you only need to specify that you want to use the trigger policy (count-based or time-based). How does a count-based scrolling window work?

In the first example in the figure below, the trigger policy is set to 2, that is, when there are two items in the window, the trigger will trigger and start executing the Pulsar Function code. This series of behaviors have nothing to do with time. It doesn't matter whether it takes 5 seconds or 5 hours for the window count to reach 2. The important thing is that the window count reaches 2.

Compare the count-based scroll window with the time-based scroll window (the time is set to 10 seconds). After a 10-second interval, the function code is triggered no matter how many events there are in the window. In the following figure, there are seven events in the first window and only three events in the second window.

| | sliding window |

The sliding window count defines the window length, the window length sets the cleanup policy to limit the amount of data to be processed, and the sliding interval defines the trigger policy. Both the rolling window policy and the sliding window policy can be defined based on time (time period) or length (number of data elements).

In the following figure, the window length is 2 seconds, that is, the data from 2 seconds ago is cleared and will not be used for calculation. The sliding interval is 1 second, that is, the Pulsar function code is executed every 1 second. In this way, you can process data throughout the length of the window.

The previous examples define cleanup and trigger policies based on time, or you can define cleanup or trigger policies based on length, or both.

Implementing both types of window function in Pulsar Functions is easy, just specify a java.util.Collection as the input type, as shown below, and specify the appropriate window configuration property in the-userConfig flag when you create the function.

The configuration parameters for implementing the four scenarios of the time window mentioned earlier are as follows:

"- windowLengthCount": number of messages per window

"- windowLengthDurationMs": window time (in milliseconds)

"- slidingIntervalCount": the number of messages after the window slides

"- slidingIntervalDurationMs": the time after the window slides

The correct combination is as follows:

Time, sliding window-windowLengthDurationMs = XXXX

-slidingIntervalDurationMs = XXXX time, Batch Window (that is, scroll window)-windowLengthDurationMs = XXXX length, sliding window-windowLengthCount = XXXX

-slidingIntervalCount = XXXX length, Batch Window (that is, scrolling window)-windowLengthCount = XXXX is what the event handling design pattern based on Pulsar Functions is. The editor believes that there are some knowledge points that we may see or use in our daily work. I hope you can learn more from this article. For more details, please follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 278

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report