
How to Use Kafka in Java


This article explains how to use Kafka in Java. The approach introduced here is simple, fast, and practical. Let's take a look.

1. Maven dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

2. Producer

2.1 The producer sends messages

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @author Thomas
 * @Description: the simplest Kafka producer
 * @date 22:18 2019-7-5
 */
public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Kafka broker addresses, separated by commas
        properties.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Custom producer interceptor
        properties.put("interceptor.classes", "com.lt.kafka.producer.MyProducerInterceptor");
        // Custom message routing rule (decides which partition a message goes to)
        // properties.put("partitioner.class", "com.lt.kafka.producer.MyPartition");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(properties);
            for (int i = 20; i < 40; i++) {
                String msg = "This is Message:" + i;
                /*
                 * KafkaProducer invokes both the callback's onCompletion method and the
                 * producer interceptor's onAcknowledgement method. Key source line:
                 * Callback interceptCallback = this.interceptors == null
                 *         ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
                 */
                producer.send(new ProducerRecord<>("leixiang", msg), new MyCallback());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null)
                producer.close();
        }
    }
}
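The send call above registers a MyCallback, but the article never shows that class. A minimal sketch of what it could look like, assuming it only needs to log the result of each send (everything in this class is an assumption, not the article's code):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical stand-in for the MyCallback referenced above: log where the
// record landed, or the error if the send failed.
public class MyCallback implements Callback {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("MyCallback onCompletion: topic=" + metadata.topic()
                    + " partition=" + metadata.partition() + " offset=" + metadata.offset());
        }
    }
}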

2.2 Custom producer interceptor

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @author Thomas
 * @Description: custom producer interceptor
 * @date 22:21 2019-7-5
 */
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
    /**
     * Print the configuration.
     */
    public void configure(Map<String, ?> configs) {
        System.out.println(configs.toString());
    }

    /**
     * Intercepts every record before the producer sends it.
     */
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("Before interception =============");
        String topic = record.topic();
        String value = record.value();
        System.out.println("Message before interception: " + value);
        ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, value + " (intercepted)");
        System.out.println("Message after interception: " + record2.value());
        System.out.println("After interception ===============");
        return record2;
    }

    /**
     * Acknowledgement callback, similar to the callback's onCompletion method.
     * If both are set, KafkaProducer calls both.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (metadata != null)
            System.out.println("MyProducerInterceptor onAcknowledgement: RecordMetadata=" + metadata.toString());
        if (exception != null)
            exception.printStackTrace();
    }

    /**
     * Called when the interceptor is shut down.
     */
    public void close() {
        System.out.println("MyProducerInterceptor is closed!");
    }
}

2.3 Custom message routing rules

A custom routing rule decides which partition each message is sent to, according to your own needs. It must implement the Partitioner interface.

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * @author Thomas
 * @Description: custom message routing rule
 * @date 22:24 2019-7-5
 */
public class MyPartition implements Partitioner {
    public void configure(Map<String, ?> configs) {
    }

    public void close() {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Always route to partition 0; replace this with your own logic.
        return 0;
    }
}
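MyPartition above always returns partition 0 and is only a skeleton. As an illustration (this class is not from the article), a routing rule that spreads records across partitions by key hash could look like this; it would be enabled via the partitioner.class property shown in section 2.1:

import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Illustrative only: route records with the same key to the same partition by
// hashing the key bytes; keyless records fall back to partition 0.
public class KeyHashPartition implements Partitioner {
    public void configure(Map<String, ?> configs) {
    }

    public void close() {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null || numPartitions == null || numPartitions == 0) {
            return 0;
        }
        // Mask off the sign bit so the partition index is never negative.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}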
3. Consumer

3.1 Automatic commit

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @author Thomas
 * @Description: consumer with automatic offset commit
 * @date 22:26 2019-7-5
 */
public class AutoCommitConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
        props.put("group.id", "leixiang");
        props.put("enable.auto.commit", "true");
        // Must be set in order to read data produced earlier
        // props.put("auto.offset.reset", "earliest");
        /* Interval at which offsets are committed automatically */
        props.put("auto.commit.interval.ms", "1000");
        /*
         * Once the consumer has connected to the Kafka cluster, it uses heartbeats to
         * tell the cluster it is still alive. If no heartbeat reaches the server within
         * session.timeout.ms, the server considers the heartbeat lost and rebalances.
         */
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // A custom interceptor can pull in third-party plugins, e.g. for logging.
        // props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor");
        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            /* Topics to subscribe to; several can be subscribed at once, separated by commas */
            consumer.subscribe(Arrays.asList("leixiang"));
            while (true) {
                // Poll for data. If the buffer is empty, wait up to the given number of
                // milliseconds; with 0, return immediately with whatever records are available.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.2 Manual commit

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @author Thomas
 * @Description: consumer with manual offset commit
 * @date 22:28 2019-7-5
 */
public class ManualCommitConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
        props.put("group.id", "leixiang");
        props.put("enable.auto.commit", "false"); // commit manually
        /* Interval for automatic offset commits (unused when auto commit is off) */
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest"); // must be set in order to read earlier data
        /*
         * Once the consumer has connected to the Kafka cluster, it uses heartbeats to
         * tell the cluster it is still alive. If no heartbeat reaches the server within
         * session.timeout.ms, the server considers the heartbeat lost and rebalances.
         */
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // A custom interceptor can pull in third-party plugins, e.g. for logging.
        props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        /* Topics to subscribe to; several can be subscribed at once, separated by commas */
        consumer.subscribe(Arrays.asList("leixiang"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Process the message
                saveMessage(record);
                // Commit manually, optionally with an offset-commit callback
                // consumer.commitAsync(new MyOffsetCommitCallback());
                consumer.commitAsync();
            }
        }
    }

    public static void saveMessage(ConsumerRecord<String, String> record) {
        System.out.printf("Processing message: offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

Custom Consumer interceptor

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * @author Thomas
 * @Description: custom consumer interceptor
 * @date 22:29 2019-7-5
 */
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
    public void configure(Map<String, ?> configs) {
        System.out.println("MyConsumerInterceptor configs > " + configs.toString());
    }

    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.out.println("onConsume");
        return records;
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("onCommit");
    }

    public void close() {
        System.out.println("MyConsumerInterceptor is closed!");
    }
}

Custom offset commit callback

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

/**
 * @author Thomas
 * @Description: custom offset commit callback
 * @date 22:31 2019-7-5
 */
public class MyOffsetCommitCallback implements OffsetCommitCallback {
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (offsets != null)
            System.out.println("offsets > " + offsets.toString());
        if (exception != null)
            exception.printStackTrace();
    }
}

To use the callback, uncomment the consumer.commitAsync(new MyOffsetCommitCallback()) line in the manual-commit example above. At this point you should have a deeper understanding of how to use Kafka in Java. Try it out in practice.
