2025-03-28 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how to use Apache Pulsar Functions for simple event handling. It is aimed at readers who are new to the topic.
Event-based programming
The defining feature of event-driven architecture (EDA) is the central importance of events. In an EDA, event consumers react to events as they arrive, a programming style known as event-based programming (EBP). Unlike batch-oriented or procedural programming, software written in the EBP style performs its processing in response to one or more event notifications and communicates with other software components asynchronously, entirely through events.
Although every event-based application is different, most follow the structure shown in the figure below: event producers introduce events into an intermediate event handling framework, which is responsible for persisting events and delivering them to event consumers.
Figure 1 event-based architecture
In addition to routing events, the intermediate event handling framework hosts event handler components. Event handlers receive events and can also forward or publish new events, so in a sense they are both event consumers and event producers. However, we will not call these event handlers event producers or event consumers, because we want to distinguish them from entities outside the event handling framework.
Event handler types
In EDA, event handlers generally fall into the following categories:
Simple event handlers: an arriving event immediately triggers an action in the event handler. A stateless handler executes all of its logic based only on the contents of the current event; a stateful handler can retain information across invocations to perform slightly more complex logic.
Complex event handlers: these handlers process a series of events and perform more sophisticated analysis to identify meaningful patterns or relationships, such as event correlation, causality, or timing. Typical use cases include e-commerce, fraud detection, network security, financial trading, and other environments that require an immediate response.
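The difference between stateless and stateful simple handlers can be illustrated with a small sketch in plain Java. The names below (HandlerKinds, TO_FAHRENHEIT, RunningMax) are hypothetical and not part of any Pulsar API; the in-memory field merely stands in for whatever state a framework would retain between calls.

```java
import java.util.function.Function;

class HandlerKinds {
    // Stateless: the result depends only on the current event.
    static final Function<Double, Double> TO_FAHRENHEIT = c -> c * 9.0 / 5.0 + 32.0;

    // Stateful: the handler remembers the largest reading seen so far.
    static final class RunningMax implements Function<Double, Double> {
        private double max = Double.NEGATIVE_INFINITY;

        @Override
        public Double apply(Double reading) {
            max = Math.max(max, reading);
            return max;
        }
    }

    public static void main(String[] args) {
        System.out.println(TO_FAHRENHEIT.apply(100.0));
        RunningMax runningMax = new RunningMax();
        for (double reading : new double[] {3.0, 7.5, 5.0}) {
            System.out.println(runningMax.apply(reading));
        }
    }
}
```

Feeding the same reading to the stateless handler always yields the same result, whereas the output of the stateful handler depends on the events that came before it.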
Event processing network
An event-based application usually consists of many event handlers arranged in a specific order or flow. We call the collection of event producers, event handlers, and event consumers an event processing network. Event processing networks are used to solve one or more specific business problems.
Figure 2 event processing network
As shown in figure 2, the external event producers and event consumers sit at the edges, with multiple event handlers in the middle. The arrows in figure 2 show the flow of events between event handlers. These arrows, also known as implicit channels, push events directly from one event handler to another. In an implementation based on Apache Pulsar, topics serve as these implicit channels.
Figure 2 also shows another way of communicating between event handlers: shared state management. Event handlers generally need to retain the computational state between multiple events, so the event handling architecture needs to provide a mechanism to persist state information and allow event handlers to access it directly. Shared state provides another mechanism for sharing information between event handlers and supports stateful event handling, which we will discuss in more detail in the next section.
Multiple event-based applications can also be chained together. The figure above shows the flow from one event-based application (blue) into another: the output of the first application is sent both to event consumer 1 and to another event handler for further processing before it reaches event consumer 2.
There are many reasons for chaining event-based applications together. Consider a scenario in which you need to monitor IoT sensor readings for patterns or anomalies, and you also want to keep these events in long-term storage (a platform such as HDFS or Amazon S3) to train a data model.
The first sequence of event handlers performs ETL-type processing, that is, it converts events into a consumable format. The resulting records are sent to event consumer 1, in this case HDFS. At the same time, we want to forward the cleaned events to a second sequence of event handlers that implements the anomaly detection workflow. The next section discusses how Apache Pulsar Functions can serve as a framework for implementing event-based processing with simple functions.
Using Apache Pulsar Functions for event-based programming
Apache Pulsar Functions provides an easy-to-use framework that lets developers create and deploy processing logic that is executed by Apache Pulsar. You can write simple or complex functions in Java or Python and deploy them to a Pulsar cluster without running a separate stream processing engine. Pulsar Functions is a lightweight computing framework that:
Executes each time a message is published to a specified input topic.
Applies user-defined processing logic to each message.
Publishes the results to one or more topics.
Figure 3 Pulsar Functions programming model
You can write Pulsar Functions in Java and Python in two ways:
Using the native language interface, which requires no Pulsar-specific libraries or special dependencies. For example, to implement a Pulsar Function in Java, you only need to write a class that implements the java.util.function.Function interface, as follows:
import java.util.function.Function;
public class EchoFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        // Logic here
        return input;
    }
}
Using the Pulsar Functions SDK, which provides capabilities that are not available through the native interface, such as the state management offered by the org.apache.pulsar.functions.api.Context object. SDK functions implement the org.apache.pulsar.functions.api.Function interface:
package org.apache.pulsar.functions.api;
public interface Function<I, O> {
    O process(I input, Context context) throws Exception;
}
The native language approach provides a clean, API-free way of writing Pulsar Functions and is well suited to developing stateless event handlers. However, it does not provide access to previous state information.
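As a minimal sketch of the native-interface style, the following class depends only on the JDK. The class name ExclamationFunction is an illustrative assumption, and the main method only exercises the function locally; it plays no role in deployment.

```java
import java.util.function.Function;

// A stateless function written against the JDK interface only.
class ExclamationFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        // All logic depends solely on the current event.
        return input + "!";
    }

    public static void main(String[] args) {
        System.out.println(new ExclamationFunction().apply("hello"));
    }
}
```

Because the class has no Pulsar dependencies, it can be unit tested like any other Java code.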
Deploy Apache Pulsar Functions
After compiling and testing your Pulsar Functions, you need to deploy them to a Pulsar cluster. Pulsar Functions is designed to support multiple deployment scenarios. Currently, there are two ways to run a Pulsar Function:
Local mode: in this mode, the Pulsar Function runs on the machine on which the command is executed (a laptop, an edge node, and so on). Here is an example of running a function locally:
$ bin/pulsar-admin functions localrun \
  --py myfunc.py \
  --className myfunc.SomeFunction \
  --inputs input-topic-1 \
  --output output-topic-1
Cluster mode: in this mode, the Pulsar Function code is uploaded to a broker in the Pulsar cluster and runs alongside the brokers rather than in your local environment. You can create a function in cluster mode with the command shown below, which is executed on a Pulsar broker node.
$ bin/pulsar-admin functions create \
  --jar target/my-functions.jar \
  --className org.example.functions.MyFunction \
  --inputs input-topic-1 \
  --output output-topic-1 \
  --parallelism 4 \
  --cpu 2 \
  --ram 8589934592 \
  --disk 10737418240
The above command starts four instances of org.example.functions.MyFunction, each with 2 CPU cores, 8 GB of RAM, and 10 GB of disk space. (Note that RAM and disk must be specified in bytes, and that the CPU and disk settings apply only when running in a Docker environment.)
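Because RAM and disk are given as byte counts, it can help to compute the values rather than type them by hand. The sketch below reproduces the two figures used in the command above, assuming 1 GB is taken as 2^30 bytes; the class ResourceBytes and its helper gib are hypothetical names.

```java
class ResourceBytes {
    // Converts a size in GB (taken here as 2^30 bytes) to bytes.
    static long gib(long n) {
        return n * 1024L * 1024L * 1024L;
    }

    public static void main(String[] args) {
        System.out.println("--ram " + gib(8));   // 8 GB of RAM
        System.out.println("--disk " + gib(10)); // 10 GB of disk
    }
}
```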
You can also supply user configuration properties when creating a Pulsar Function, which is useful when you want to reuse a function. The following command passes in a set of key-value pairs by specifying a JSON string for the userConfig property. At run time, these values can be accessed through the Context object of the Pulsar Functions SDK, which we discuss in more detail in the next section.
$ bin/pulsar-admin functions create \
  --jar target/my-functions.jar \
  --className org.example.functions.MyFunction \
  --inputs input-topic-1 \
  --output output-topic-1 \
  --parallelism 4 \
  --cpu 2 \
  --ram 8589934592 \
  --disk 10737418240 \
  --userConfig '{"key-1": "value-1", "key-2": "value-2"}'
Best practices for using the Apache Pulsar Functions SDK
The Context object defined in the Java and Python SDKs provides functions with a variety of information and capabilities, including the ability to retain intermediate results, which enables stateful event processing. The Context object exposes the following:
The name and ID of the Pulsar Function
The message ID of each message. Each Pulsar message is automatically assigned an ID.
The name of the topic on which the message was sent
The names of all input topics and of the output topic associated with the function
The class name of the SerDe
The tenant and namespace associated with the function
The ID of the Pulsar Functions instance running the function
The version of the function
The logger object used by the function, which can be used to write log messages
Access to any user configuration values provided through the CLI
An interface for recording metrics
Next, we will introduce some usage patterns that take advantage of the properties of the Context object.
Best practice 1: dynamic configuration
When you run or update a Pulsar Function created with the SDK, you can use the --userConfig flag to pass in arbitrary key/value pairs from the command line. The key/value pairs must be specified as JSON. The following example creates a function and passes in one user-configured key/value pair.
$ bin/pulsar-admin functions create \
  --name word-filter \
  --userConfig '{"filter": "$.sensors[?(@.Type == \"Temp\")]"}' \
  # Other function configs
This feature lets us write generic functions that can be deployed multiple times with slightly different configurations. For example, suppose you want to write a function that filters JSON events based on a JSON path expression. When an event arrives, the function compares its contents with the configured expression and filters out entries that do not match.
Clearly, the behavior of this function depends entirely on the JSON path expression it filters on. To make the function reusable, we use the Pulsar SDK and defer specifying the path expression until the function is deployed.
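The reuse pattern itself can be sketched without the Pulsar SDK. In the hypothetical example below, a Map stands in for the user configuration that the Context would supply, and a simple substring match stands in for JSON path evaluation; ConfigurableFilter and the "keyword" key are assumptions made purely for illustration.

```java
import java.util.Map;
import java.util.function.Function;

// One generic filter class, configured differently per deployment.
class ConfigurableFilter implements Function<String, String> {
    private final String keyword;

    ConfigurableFilter(Map<String, Object> userConfig) {
        // Fall back to the empty string (which matches every event) when unset.
        this.keyword = userConfig.getOrDefault("keyword", "").toString();
    }

    @Override
    public String apply(String event) {
        // Returning null means "emit nothing" for a non-matching event.
        return event.contains(keyword) ? event : null;
    }

    public static void main(String[] args) {
        Function<String, String> tempOnly = new ConfigurableFilter(Map.of("keyword", "Temp"));
        Function<String, String> passAll = new ConfigurableFilter(Map.of());
        System.out.println(tempOnly.apply("{\"Type\":\"Temp\"}"));
        System.out.println(tempOnly.apply("{\"Type\":\"Humidity\"}"));
        System.out.println(passAll.apply("{\"Type\":\"Humidity\"}"));
    }
}
```

The same compiled class behaves as two different filters, which is the property that deployment-time configuration is meant to provide.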
As the following example shows, the JSON path filter to use is unknown at compile time and is obtained from the Context with the getUserConfigValueOrDefault method.
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.jayway.jsonpath.JsonPath;
public class JsonPathFilterFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) throws Exception {
        // Get the filter from the context, defaulting to the document root
        String filter = context.getUserConfigValueOrDefault("filter", "$").toString();
        Object filtered = JsonPath.read(input, filter);
        return filtered.toString();
    }
}
Best practice 2: stateful event handlers
A stateful event handler uses the memory of previous events to generate its output. The ability to store state is essential for processing that spans multiple events. In the Apache Pulsar Functions framework, state information is stored in a dedicated key-value store based on Apache BookKeeper. The Pulsar SDK provides access to this state through the Context object.
Figure 4 Apache Pulsar state management
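To make the idea of state retained between invocations concrete, here is a minimal sketch in which an in-memory HashMap stands in for the BookKeeper-backed key-value store. The class EventCounter and its process method are hypothetical; a real function would read and write state through the Context object instead.

```java
import java.util.HashMap;
import java.util.Map;

class EventCounter {
    // Stand-in for the persistent key-value store (assumption: in-memory only).
    private final Map<String, Long> state = new HashMap<>();

    // Handles one event and returns how many events this sensor has produced so far.
    long process(String sensorId) {
        return state.merge(sensorId, 1L, Long::sum);
    }

    public static void main(String[] args) {
        EventCounter counter = new EventCounter();
        for (String id : new String[] {"s1", "s2", "s1"}) {
            System.out.println(id + " -> " + counter.process(id));
        }
    }
}
```

Unlike this in-memory map, state kept in a persistent store is not lost when the handler instance stops.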
Let's look at an example of a stateful handler. Suppose an application receives temperature reading events from IoT sensors and we want to know the average temperature of a sensor. We can use an event handler that continuously updates the average with the following function:
import java.nio.ByteBuffer;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class AvgTempFunction implements Function<Float, Float> {
    @Override
    public Float process(Float currentTemp, Context context) {
        // Increment and get the counter
        context.incrCounter("num-measurements", 1);
        long n = context.getCounter("num-measurements");
        // Calculate the new average based on the old average and the count
        // (state values are stored as ByteBuffer; no stored value means no previous reading)
        ByteBuffer stored = context.getState("avg-temp");
        float oldAverage = (stored == null) ? 0.0f : stored.getFloat(0);
        float newAverage = (oldAverage * (n - 1) + currentTemp) / n;
        context.putState("avg-temp", ByteBuffer.allocate(Float.BYTES).putFloat(0, newAverage));
        return newAverage;
    }
}
Best practice 3: void functions
Pulsar Functions can publish results to one or more output topics, but they are not required to. A function may instead only write logs, write its results to an external database, or simply monitor the stream for anomalies. The function in the following example only logs the events it receives:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
public class LogFunction implements Function<String, Void> {
    @Override
    public Void process(String input, Context context) throws Exception {
        Logger LOG = context.getLogger();
        LOG.info("Received {}", input);
        return null;
    }
}
A Java function with an output type of Void must always return null. A function with a non-Void output type can also return null when it should not generate an output event, for example when it acts as a filter and an event should not be passed on.
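The convention that a null return value produces no output event can be sketched as follows. The small run method is a hypothetical stand-in for the framework, written only to show the pattern; NullMeansNoOutput and POSITIVE_ONLY are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class NullMeansNoOutput {
    // A filter: passes positive readings through and drops everything else.
    static final Function<Integer, Integer> POSITIVE_ONLY = x -> x > 0 ? x : null;

    // Stand-in for the framework: publishes only non-null results.
    static <I, O> List<O> run(Function<I, O> fn, List<I> inputs) {
        List<O> published = new ArrayList<>();
        for (I input : inputs) {
            O result = fn.apply(input);
            if (result != null) {
                published.add(result);
            }
        }
        return published;
    }

    public static void main(String[] args) {
        System.out.println(run(POSITIVE_ONLY, List.of(3, -1, 0, 8)));
    }
}
```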
Best practice 4: handling events from multiple input topics
As shown in figure 3, Pulsar Functions can consume events from multiple topics. Let's take a look at how to write such a function:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class MultiTopicFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) throws Exception {
        String sourceTopic = context.getSourceTopic();
        if (sourceTopic.equals("TopicA")) {
            // parse as TopicA Object
        } else if (sourceTopic.equals("TopicB")) {
            // parse as TopicB Object
        } else if (sourceTopic.equals("TopicC")) {
            // parse as TopicC Object
        }
        ...
    }
}
As you can see from the code, we first get the name of the input topic from the Context object, and then parse / handle the event accordingly based on the name of the input topic.
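When the number of input topics grows, the if/else chain can be replaced by a lookup table. The sketch below is a plain-Java illustration of that design choice and is not part of the original function; the topic names are taken from the example above, while TopicDispatcher and the parser lambdas are hypothetical.

```java
import java.util.Map;
import java.util.function.Function;

class TopicDispatcher {
    // Maps each input topic to the logic that handles its events.
    private static final Map<String, Function<String, String>> PARSERS = Map.of(
            "TopicA", payload -> "A:" + payload.trim(),
            "TopicB", payload -> "B:" + payload.toUpperCase(),
            "TopicC", payload -> "C:" + payload.length());

    static String process(String sourceTopic, String payload) {
        Function<String, String> parser = PARSERS.get(sourceTopic);
        if (parser == null) {
            throw new IllegalArgumentException("Unexpected topic: " + sourceTopic);
        }
        return parser.apply(payload);
    }

    public static void main(String[] args) {
        System.out.println(process("TopicA", " hello "));
        System.out.println(process("TopicB", "hello"));
        System.out.println(process("TopicC", "hello"));
    }
}
```

Adding a topic then means adding one map entry, and an event from an unknown topic fails loudly instead of being silently ignored.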
Best practice 5: metric collection
The Apache Pulsar SDK provides a metric collection mechanism that can record any user-defined metrics of your choice. The following example uses one metric to track the total number of invocations of the function and another to track the number of invocations with invalid input. For more on reading and using metrics, see the monitoring guide.
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class MetricFunction implements Function<Integer, Void> {
    @Override
    public Void process(Integer input, Context context) throws Exception {
        context.recordMetric("invocation count", 1);
        if (input < 0) {
            context.recordMetric("Invalid data", 1);
        }
        return null;
    }
}