Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to carry on the simple analysis of Pulsar Kafka Client

2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

What this article shares with you is a simple analysis of how to carry out Pulsar Kafka Client. The editor thinks it is very practical, so I share it with you to learn. I hope you can get something after reading this article.

In order to facilitate Kafka users to use Pulsar,Pulsar to do some packaging of Kafka Client, so that Kafka users more convenient to use Pulsar.

Here's how Kafka Client sends messages to Pulsar, consumes messages from Pulsar, and how to use Pulsar Schema.

⌨️ introduces dependency

Org.apache.pulsar pulsar-client-kafka {project.version} relies on the client that introduced version 0.10.2.1 of Kafka, as well as the client encapsulated by Pulsar to Kafka Client.

⌨️ uses Kafka Schema

> add producer code

String topic = "persistent://public/default/test"

Properties props = new Properties (); props.put ("bootstrap.servers", "pulsar://localhost:6650")

Props.put ("key.serializer", IntegerSerializer.class.getName (); props.put ("value.serializer", StringSerializer.class.getName ())

Producer producer = new KafkaProducer (props)

For (int I = 0; I

< 10; i++) { producer.send(new ProducerRecord(topic, i, Integer.toString(i)));} producer.close();在上述配置中 topic 是指 Pulsar 中的 Topic,接着使用 Kafka 的配置方式来初始化各种配置,包括 Server 地址、key 的序列化与 value 的序列化类,然后构造一个 ProducerRecord 的类将其发送出去。 >

> > add consumer code

String topic = "persistent://public/default/test"

Properties props = new Properties (); props.put ("bootstrap.servers", "pulsar://localhost:6650"); props.put ("group.id", "my-subscription-name"); props.put ("enable.auto.commit", "false"); props.put ("key.deserializer", IntegerDeserializer.class.getName ()); props.put ("value.deserializer", StringDeserializer.class.getName ())

@ SuppressWarnings ("resource") Consumer consumer = new KafkaConsumer (props); consumer.subscribe (Arrays.asList (topic))

While (true) {ConsumerRecords records = consumer.poll; records.forEach (record-> {log.info ("Received record: {}", record);})

/ / Commit last offset consumer.commitSync ();} some configurations are similar to those of producer code, such as topic,Server, and so on. In addition, group.id of Kafka is used as the subscription name in the configuration Pulsar, autocommit is turned off, and deserialized classes are configured for key and value on the consumer side. Then, similar to regular consumers, they begin to consume messages.

⌨️ uses Pulsar Schema

In the above case, Kafka's Schema is used for serialization and deserialization, and of course, Pulsar's Schema is also supported for this process. Here is a brief introduction using AVRO. First define the pojo class that Schema needs to use. @ Data@ToString@EqualsAndHashCodepublic class Foo {@ Nullable private String field1; @ Nullable private String field2; private int field3;}

@ Data@ToString@EqualsAndHashCodepublic class Bar {private boolean field1;}

> producer side code

String topic = "persistent://public/default/test-avro"

Properties props = new Properties (); props.put ("bootstrap.servers", "pulsar://localhost:6650")

Props.put ("key.serializer", IntegerSerializer.class.getName (); props.put ("value.serializer", StringSerializer.class.getName ())

AvroSchema barSchema = AvroSchema.of (SchemaDefinition.builder (). WithPojo (Bar.class). Build ()); AvroSchema fooSchema = AvroSchema.of (SchemaDefinition.builder (). WithPojo (Foo.class). Build ())

Bar bar = new Bar (); bar.setField1 (true)

Foo foo = new Foo (); foo.setField1 ("field1"); foo.setField2 ("field2"); foo.setField3 (3)

Producer producer = new KafkaProducer (props, fooSchema, barSchema)

For (int I = 0; I

< 10; i++) { producer.send(new ProducerRecord(topic, i, foo, bar)); log.info("Message {} sent successfully", i);} producer.close(); 可以看到大部分配置同上面使用 Kafka Client 的配置是类似的,但是中间加入了一些 Pulsar 的 Schema,使用 Foo 作为 key,使用 Bar 类作为 value。 >

> > Consumer side code

String topic = "persistent://public/default/test-avro"

Properties props = new Properties (); props.put ("bootstrap.servers", "pulsar://localhost:6650"); props.put ("group.id", "my-subscription-name"); props.put ("enable.auto.commit", "false"); props.put ("key.deserializer", IntegerDeserializer.class.getName ()); props.put ("value.deserializer", StringDeserializer.class.getName ())

AvroSchema barSchema = AvroSchema.of (SchemaDefinition.builder (). WithPojo (Bar.class). Build ()); AvroSchema fooSchema = AvroSchema.of (SchemaDefinition.builder (). WithPojo (Foo.class). Build ())

Bar bar = new Bar (); bar.setField1 (true)

Foo foo = new Foo (); foo.setField1 ("field1"); foo.setField2 ("field2"); foo.setField3 (3)

@ SuppressWarnings ("resource") Consumer consumer = new PulsarKafkaConsumer (props, fooSchema, barSchema); consumer.subscribe (Arrays.asList (topic))

While (true) {ConsumerRecords records = consumer.poll; records.forEach (record-> {log.info ("Received record: {}", record);})

/ / Commit last offset consumer.commitSync ();}

The consumer side also has a similar configuration, using the same Schema as the producer side for data deserialization.

The above is how to carry on the simple analysis of Pulsar Kafka Client, the editor believes that there are some knowledge points that we may see or use in our daily work. I hope you can learn more from this article. For more details, please follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report