Pulsar IO

2025-03-29 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

This article explains Pulsar IO in detail. We hope it gives you a working understanding of the topic.

Apache Pulsar is an industry-leading messaging system. A common question when using a messaging system is: what is the best way to move data into or out of the messaging platform?

Of course, users can use Pulsar's consumer and producer API to write custom code to transfer data. But apart from that, is there any other way?

The following are some related questions raised by users:

1. Where should I run the program that publishes data to Pulsar or consumes data from Pulsar?

2. How should I run the program that publishes data to Pulsar or consumes data from Pulsar?

Users ask these questions because other messaging and publish-subscribe systems do not provide an organized, fault-tolerant way to import data from or export data to external systems, so users have to build custom solutions and run them manually.

In order to solve the above problems and simplify the process, we launched Pulsar IO.

Pulsar IO imports and exports data by leveraging the existing Pulsar Functions framework. Applications that move data into or out of Pulsar can directly use all the advantages of the Pulsar Functions framework, such as fault tolerance, parallelism, flexibility, load balancing, and on-demand updates.

Moreover, we often see users spend a great deal of effort writing custom programs to move data into or out of a messaging system, even though they are not messaging experts and may have no wish to become experts.

These applications are difficult to write, and many users end up duplicating one another's work by building applications that perform the same functions. Ultimately, a messaging system is just a tool for moving data, so ease of use was one of our main goals when designing the Pulsar IO framework.

We want users to be able to import data from or export data to external systems without writing any code and without becoming experts in both Pulsar and the external system.

What does the Pulsar IO framework look like?

First, we define two kinds of applications: a source, which feeds data into Pulsar, and a sink, which receives data from Pulsar.

A source imports data from an external system into Pulsar, while a sink exports data from Pulsar to an external system. Specifically, a source reads data from an external system and writes it to a Pulsar topic, while a sink reads data from one or more Pulsar topics and writes it to an external system.
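
The direction of data flow described above can be sketched in a few lines of plain Java. This is only an illustrative model, not Pulsar code: InMemoryTopic, runSource, runSink, and pipe are hypothetical names, and an in-memory queue stands in for a Pulsar topic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class DataFlowSketch {
    // Stand-in for a Pulsar topic: a simple in-memory FIFO queue (assumption, not Pulsar API).
    static final class InMemoryTopic {
        private final Queue<String> messages = new ArrayDeque<>();

        void publish(String message) {
            messages.add(message);
        }

        // Returns the oldest message, or null when the topic is empty.
        String poll() {
            return messages.poll();
        }
    }

    // Source side: read every item from the "external system" and publish it to the topic.
    static void runSource(List<String> externalInput, InMemoryTopic topic) {
        for (String item : externalInput) {
            topic.publish(item);
        }
    }

    // Sink side: drain the topic and write each message to the "external system".
    static void runSink(InMemoryTopic topic, List<String> externalOutput) {
        String message;
        while ((message = topic.poll()) != null) {
            externalOutput.add(message);
        }
    }

    // Wires a source and a sink together through one topic and returns what the sink wrote.
    static List<String> pipe(List<String> input) {
        InMemoryTopic topic = new InMemoryTopic();
        List<String> output = new ArrayList<>();
        runSource(input, topic);
        runSink(topic, output);
        return output;
    }

    public static void main(String[] args) {
        System.out.println(pipe(List.of("a", "b", "c"))); // prints [a, b, c]
    }
}
```

In the real framework the source and the sink are independent applications, so either one can be used without the other.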

The Pulsar IO framework runs on top of the existing Pulsar Functions framework. Individual sources and sinks can run alongside Pulsar brokers, just as functions do, as shown in the following figure.

Therefore, all the advantages of the Pulsar Functions framework apply to the Pulsar IO framework, that is, sink and source applications.

As mentioned earlier, our design goals include that users can move data into or out of Pulsar without writing any custom applications or code.

As a result, the Pulsar IO framework ships with a variety of built-in sources and sinks (Kafka, Twitter Firehose, Cassandra, Aerospike, and more) that users can run with a single command. Users can therefore focus on the business logic without worrying about the implementation details.

How to use Pulsar IO

Using the Pulsar IO framework is easy. Users can start a built-in source or sink with one simple command in the command line interface. For example, users can submit a source to an existing Pulsar cluster with a command in the following format:

$ ./bin/pulsar-admin source create \
    --tenant <tenant> \
    --namespace <namespace> \
    --name <name> \
    --destinationTopicName <topic> \
    --source-type <source-type>

The following example command runs the Twitter Firehose source to import data from Twitter into Pulsar:

$ ./bin/pulsar-admin source create \
    --tenant test \
    --namespace ns1 \
    --name twitter-source \
    --destinationTopicName twitter_data \
    --sourceConfigFile examples/twitter.yml \
    --source-type twitter

With this command, users can import data into Pulsar without having to write or compile any code. The only thing that may be needed is a configuration file that specifies some settings for the source or sink. Users can submit a built-in sink to an existing Pulsar cluster with a command in the following format:

$ ./bin/pulsar-admin sink create \
    --tenant <tenant> \
    --namespace <namespace> \
    --name <name> \
    --inputs <topics> \
    --sink-type <sink-type>

The following example command runs the Cassandra sink to export data from Pulsar to Cassandra:

$ ./bin/pulsar-admin sink create \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink \
    --sink-type cassandra \
    --sinkConfigFile examples/cassandra-sink.yml \
    --inputs test_cassandra

For more information about how to run the Cassandra sink, see the Quick Start Guide:

https://pulsar.apache.org/docs/en/2.1.1-incubating/io-quickstart/

The commands above show how to run a source or sink in "cluster" mode (that is, as part of an existing Pulsar cluster). Sources and sinks can also be run as separate processes in "local run" mode, which starts a local process on the machine to run the source or sink logic.

Local run mode is helpful for testing and debugging, but the user is responsible for monitoring and supervising the process. The following is an example of a command that runs a sink in local run mode:

$ ./bin/pulsar-admin sink localrun \
    --tenant public \
    --namespace default \
    --name cassandra-test-sink \
    --sink-type cassandra \
    --sinkConfigFile examples/cassandra-sink.yml \
    --inputs test_cassandra

Because the Pulsar IO framework runs on Pulsar Functions, you can update a source or sink dynamically by changing its parameters and configuration. For example, to make the Twitter Firehose source mentioned earlier write data to a different Pulsar topic, you can execute the following command:

$ ./bin/pulsar-admin source update \
    --tenant test \
    --namespace ns1 \
    --name twitter-source \
    --destinationTopicName twitter_data_2 \
    --sourceConfigFile examples/twitter.yml \
    --source-type twitter

You can also update sink with commands in the same format. Most updates to source and sink can be configured at run time to simplify the process of modification, testing, deployment, and so on.

If you want to customize the implementation of a niche use case, you can create a source or sink by implementing a simple interface. However, the purpose of Pulsar IO is to help users directly use the existing built-in source or sink without having to implement source or sink manually.

Implement a custom source

To create a custom source, you need to write a Java class that implements the Source interface:

public interface Source<T> extends AutoCloseable {
    /**
     * Open source with configuration
     *
     * @param config initialization config
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(final Map<String, Object> config) throws Exception;

    /**
     * Reads the next message from source.
     * If source does not have any new messages, this call should block.
     *
     * @return next message from source. The return result should never be null
     * @throws Exception
     */
    Record<T> read() throws Exception;
}

This is a simple example of a source implementation:

public class TestSource implements Source<Integer> {

    private int i = 0;

    @Override
    public void open(Map<String, Object> config) throws Exception {
    }

    @Override
    public Record<Integer> read() throws Exception {
        // Each record carries the next integer in the sequence
        return () -> i++;
    }

    @Override
    public void close() throws Exception {
    }
}

In the source example above, monotonically increasing integers are passed into Pulsar.

Objects that implement the "Record" interface need to be returned through the "read" method, because the "Record" interface contains fields that can be used to implement different messaging semantics or guarantees, such as exactly-once/effectively-once. In future articles, I will discuss how to do this in detail.
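
To make the open/read/close life cycle concrete, here is a minimal, self-contained sketch of how a runtime might drive a source such as TestSource. The nested Record and Source interfaces are simplified stand-ins for the ones shown above, and CountingSource and drain are hypothetical names; the actual Pulsar runtime is more involved than this loop.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SourceLoopSketch {
    // Simplified stand-ins for the interfaces shown above (assumptions, not the Pulsar classes).
    interface Record<T> {
        T getValue();
    }

    interface Source<T> extends AutoCloseable {
        void open(Map<String, Object> config) throws Exception;

        Record<T> read() throws Exception;
    }

    // Same idea as TestSource: emits 0, 1, 2, ...
    static class CountingSource implements Source<Integer> {
        private int i = 0;

        @Override
        public void open(Map<String, Object> config) {
        }

        @Override
        public Record<Integer> read() {
            int value = i++; // capture the current value, then advance the counter
            return () -> value;
        }

        @Override
        public void close() {
        }
    }

    // Hypothetical driver: open the source, pull `count` records, then close it.
    static <T> List<T> drain(Source<T> source, int count) {
        List<T> values = new ArrayList<>();
        try (Source<T> s = source) { // AutoCloseable lets try-with-resources call close()
            s.open(new HashMap<>());
            for (int n = 0; n < count; n++) {
                values.add(s.read().getValue());
            }
        } catch (Exception e) {
            throw new IllegalStateException("source failed", e);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(drain(new CountingSource(), 3)); // prints [0, 1, 2]
    }
}
```

A real runtime would publish each value to the destination topic instead of collecting it in a list.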

Implement a custom sink

To create a custom sink, you need to write a Java class that implements the Sink interface:

public interface Sink<T> extends AutoCloseable {
    /**
     * Open Sink with configuration
     *
     * @param config initialization config
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(final Map<String, Object> config) throws Exception;

    /**
     * Write a message to Sink
     *
     * @param inputRecordContext Context of value
     * @param value value to write to sink
     * @throws Exception
     */
    void write(RecordContext inputRecordContext, T value) throws Exception;
}

For example, a simple sink implementation:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;

public class TestSink implements Sink<String> {

    private static final String FILENAME = "/tmp/test-out";
    private BufferedWriter bw = null;
    private FileWriter fw = null;

    @Override
    public void open(Map<String, Object> config) throws Exception {
        File file = new File(FILENAME);
        // If the file doesn't exist, create it
        if (!file.exists()) {
            file.createNewFile();
        }
        // Open the file in append mode
        fw = new FileWriter(file.getAbsoluteFile(), true);
        bw = new BufferedWriter(fw);
    }

    @Override
    public void write(RecordContext inputRecordContext, String value) throws Exception {
        try {
            bw.write(value);
            bw.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        try {
            if (bw != null) bw.close();
            if (fw != null) fw.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

The above example shows how sink reads data from Pulsar and writes to a file. Similar to the source interface, the "write" method in the sink interface has a RecordContext parameter. This parameter provides sink with the context of the value that needs to be written to the external system.

The RecordContext parameter can be used to implement sinks that provide different levels of messaging semantics or guarantees (such as exactly-once/effectively-once). We will discuss this in more depth in subsequent articles.
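
As a rough illustration of how a per-record context could support delivery guarantees, the sketch below lets a sink report the outcome of each write back through the context. Everything here is an assumption made for illustration: the ack() and fail() methods on this stand-in RecordContext, as well as ListSink, CountingContext, and deliver, are hypothetical and are not taken from this article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SinkAckSketch {
    // Simplified stand-ins (assumptions): the real RecordContext may expose different methods.
    interface RecordContext {
        void ack();  // hypothetical: signal that the value was written

        void fail(); // hypothetical: signal that the write failed
    }

    interface Sink<T> extends AutoCloseable {
        void open(Map<String, Object> config) throws Exception;

        void write(RecordContext context, T value) throws Exception;
    }

    // Sink that "writes" to an in-memory list and rejects empty strings.
    static class ListSink implements Sink<String> {
        final List<String> written = new ArrayList<>();

        @Override
        public void open(Map<String, Object> config) {
        }

        @Override
        public void write(RecordContext context, String value) {
            if (value.isEmpty()) {
                context.fail(); // report the failure instead of dropping it silently
                return;
            }
            written.add(value);
            context.ack(); // acknowledge only after the write succeeded
        }

        @Override
        public void close() {
        }
    }

    // Context that counts outcomes so the driver can inspect them.
    static class CountingContext implements RecordContext {
        int acked = 0;
        int failed = 0;

        @Override
        public void ack() {
            acked++;
        }

        @Override
        public void fail() {
            failed++;
        }
    }

    // Writes all values through the sink and returns {acked, failed}.
    static int[] deliver(List<String> values) {
        CountingContext context = new CountingContext();
        try (ListSink sink = new ListSink()) {
            sink.open(new HashMap<>());
            for (String value : values) {
                sink.write(context, value);
            }
        }
        return new int[] {context.acked, context.failed};
    }

    public static void main(String[] args) {
        int[] result = deliver(List.of("a", "", "b"));
        System.out.println(result[0] + " acked, " + result[1] + " failed"); // prints 2 acked, 1 failed
    }
}
```

Acknowledging only after a successful write is what would let a runtime redeliver failed records, which is the basis for at-least-once style guarantees.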

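Users can submit custom sources and sinks in a way similar to running the built-in ones. The format of the command to submit a custom source is as follows:

$ ./bin/pulsar-admin source create \
    --className <class-name> \
    --jar <jar-path> \
    --tenant <tenant> \
    --namespace <namespace> \
    --name <name> \
    --destinationTopicName <topic>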

An example of the command is as follows:

$ ./bin/pulsar-admin source create \
    --className org.apache.pulsar.io.twitter.TwitterFireHose \
    --jar ~/application.jar \
    --tenant test \
    --namespace ns1 \
    --name twitter-source \
    --destinationTopicName twitter_data

The format of the command to submit a custom sink to run in an existing Pulsar cluster is as follows:

$ ./bin/pulsar-admin sink create \
    --className <class-name> \
    --jar <jar-path> \
    --tenant test \
    --namespace <namespace> \
    --name <name> \
    --inputs <topics>

Example of the command:

$ ./bin/pulsar-admin sink create \
    --className org.apache.pulsar.io.cassandra \
    --jar ~/application.jar \
    --tenant test \
    --namespace ns1 \
    --name cassandra-sink \
    --inputs test_topic

Advantages of using the Pulsar IO framework

As mentioned above, the Pulsar IO framework runs on top of the existing Pulsar Functions framework and makes full use of it. As part of Pulsar IO, sources and sinks have all the advantages of Pulsar Functions described earlier, such as fault tolerance, parallelism, flexibility, load balancing, and on-demand updates.

So much for sharing about Pulsar IO. I hope the above content can help you to some extent and learn more knowledge. If you think the article is good, you can share it for more people to see.
