2025-03-28 Update From: SLTechnology News & Howtos > Development
Shulou (Shulou.com) 06/03 Report
This article explains how to use the Kafka Producer API. The content is simple and clear, and easy to learn; follow along to study how to use the Producer API.
1. Message sending process
Kafka's producer sends messages asynchronously. Two threads are involved in the sending process, the main thread and the Sender thread, along with a shared buffer, the RecordAccumulator. The main thread writes messages into the RecordAccumulator, and the Sender thread continually pulls messages out of the RecordAccumulator and sends them to the Kafka broker.
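To make the hand-off concrete, here is a toy sketch of the pattern described above: a "main" thread appends records to a shared accumulator and a separate "Sender" thread drains it. This is not Kafka's actual RecordAccumulator implementation, just an illustration of the two-thread, shared-buffer idea; the class and method names are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AccumulatorSketch {
    // The main thread puts n records into the accumulator; a "Sender"
    // thread drains them. Returns the records "sent", in order.
    static List<String> run(int n) {
        BlockingQueue<String> accumulator = new ArrayBlockingQueue<>(100);
        List<String> delivered = new ArrayList<>();
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    String record = accumulator.take();
                    if (record.equals("EOF")) break; // sentinel: no more records
                    delivered.add(record);           // stand-in for "send to broker"
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        try {
            for (int i = 0; i < n; i++) {
                accumulator.put("msg-" + i); // main thread hands records off
            }
            accumulator.put("EOF");
            sender.join(); // join gives us a safe view of 'delivered'
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // [msg-0, msg-1, msg-2, msg-3, msg-4]
    }
}
```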
Related parameters:
batch.size: the Sender does not send data until batch.size bytes have accumulated in a batch.
linger.ms: if the accumulated data has not reached batch.size in time, the Sender sends it anyway after waiting linger.ms.
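The two parameters above form an either/or trigger: a batch is sent once it holds batch.size bytes, or once linger.ms has elapsed, whichever comes first. A toy decision function, mirroring the described behavior rather than Kafka's internals:

```java
public class BatchTrigger {
    static final int BATCH_SIZE_BYTES = 16384; // batch.size: 16 KB
    static final long LINGER_MS = 5;           // linger.ms

    // A batch is ready when it is full OR when it has waited long enough.
    static boolean shouldSend(int accumulatedBytes, long elapsedMs) {
        return accumulatedBytes >= BATCH_SIZE_BYTES || elapsedMs >= LINGER_MS;
    }

    public static void main(String[] args) {
        System.out.println(shouldSend(16384, 0)); // true: batch is full
        System.out.println(shouldSend(100, 5));   // true: linger.ms elapsed
        System.out.println(shouldSend(100, 1));   // false: keep accumulating
    }
}
```

Setting linger.ms above 0 trades a little latency for larger batches and better throughput.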
2. Asynchronous send API
1) Import the dependency
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>
```

2) Write the code
The classes you need to use:
KafkaProducer: the producer object used to send data
ProducerConfig: provides the keys for the required configuration parameters
ProducerRecord: each piece of data must be wrapped in a ProducerRecord object
2.1) API without a callback

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        // Producer configuration keys can be taken from ProducerConfig
        // 1. Create the Kafka producer configuration
        Properties properties = new Properties();
        // 2. Specify the Kafka cluster to connect to
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.106:9091");
        // 3. ACK acknowledgement level
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 4. Number of retries
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 5. Batch size: 16 KB
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 6. Wait time
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 7. RecordAccumulator buffer size: 32 MB
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // 8. Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // 9. Create the producer object
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 10. Send data
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("first", "atguigu--" + i);
            producer.send(producerRecord);
        }
        // 11. Close the producer
        producer.close();
    }
}
```

2.2) API with a callback

The callback function is invoked when the producer receives the ack, and the call is asynchronous. The callback takes two parameters, RecordMetadata and Exception. If Exception is null, the message was sent successfully; if Exception is not null, the send failed.

Note: failed sends are retried automatically, so there is no need to retry manually inside the callback.

```java
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CallBackProducer {
    public static void main(String[] args) {
        // Producer configuration keys can be taken from ProducerConfig
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.106:9091,192.168.1.106:9092,192.168.1.106:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Create the producer object
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        /* Create the topic first:
           /opt/kafka/kafka03/bin/kafka-topics.sh --create \
               --zookeeper 192.168.1.106:2181,192.168.1.106:2182,192.168.1.106:2183 \
               --replication-factor 3 --partitions 2 --topic aaa
        */
        // Send data
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("bbb", "d", "bbb-atguigu++" + i);
            producer.send(producerRecord, (recordMetadata, e) -> {
                if (e == null) {
                    System.out.println("aaa" + recordMetadata.partition()
                            + "-" + recordMetadata.offset());
                } else {
                    e.printStackTrace();
                }
            });
        }
        // Close the producer
        producer.close();
    }
}
```

3. Synchronous send API
Synchronous sending means that after a message is sent, the current thread blocks until the ack is returned. Because the send method returns a Future object, we can achieve the effect of synchronous sending simply by calling the Future object's get() method.
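The blocking behavior the text relies on is a property of Future itself: get() does not return until the result is available. The sketch below demonstrates this with a plain CompletableFuture completed on another thread standing in for the broker acknowledgement; it is not a real Kafka send, and the names are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SyncViaFuture {
    // Simulates a send whose "ack" arrives later on another thread;
    // the caller blocks on Future.get() until it does.
    static String blockingResult() {
        CompletableFuture<String> ack = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(50); // simulate broker latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ack.complete("ack");
        }).start();
        Future<String> future = ack; // same interface send() returns
        try {
            return future.get();     // blocks until the "ack" arrives
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingResult()); // ack
    }
}
```

With a real producer, note that send(record).get() throws ExecutionException when the send ultimately fails, so synchronous code should handle it.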
```java
// 10. Send data synchronously
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> producerRecord =
            new ProducerRecord<>("first", "atguigu--" + i);
    // get() blocks until the ack returns; it throws InterruptedException
    // and ExecutionException, which the caller must handle
    producer.send(producerRecord).get();
}
```

4. Custom partitioner

Source of the default partitioning strategy: org.apache.kafka.clients.producer.internals.DefaultPartitioner

4.1) Custom partitioner code:

```java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        /* Custom partitioning rule: hash the key over the available partitions.
           floorMod avoids a negative partition number when hashCode() is negative. */
        List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
        int numPartitions = partitionInfos.size();
        return Math.floorMod(key.toString().hashCode(), numPartitions);
        /* Or pin every record to a fixed partition: */
        /* return 1; */
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

4.2) Using the custom partitioner in the producer:

```java
// Configure the partitioner's fully qualified class name (partitioner.class)
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
        "com.zhl.kafkademo.partitioner.MyPartitioner");
```

Full code:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionProducer {
    public static void main(String[] args) {
        // Producer configuration keys can be taken from ProducerConfig
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.106:9091,192.168.1.106:9092,192.168.1.106:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Configure the partitioner's fully qualified class name (partitioner.class)
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                "com.zhl.kafkademo.partitioner.MyPartitioner");
        // Create the producer object
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Send data
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("bbb", "d", "bbb-atguigu++" + i);
            producer.send(producerRecord, (recordMetadata, e) -> {
                if (e == null) {
                    System.out.println(recordMetadata.topic() + "-"
                            + recordMetadata.partition() + "-"
                            + recordMetadata.offset());
                } else {
                    e.printStackTrace();
                }
            });
        }
        // Close the producer
        producer.close();
    }
}
```

Thank you for reading. The above covers how to use the Kafka Producer API. After studying this article you should have a deeper understanding of the Producer API; actual usage still needs to be verified in practice.