
How to use the Kafka Producer and Consumer from Java code


This article explains how to use the Kafka Producer and Consumer from Java code. The content is fairly detailed; interested readers can use it as a reference, and we hope it is helpful.

To send and consume Kafka messages from Java code, we first have to pull in the relevant jar packages.

Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>

Gradle:

Compile ("org.apache.kafka:kafka_2.10:0.8.2.1")

In newer versions of Kafka (the exact version is unclear), the producer has been reimplemented in Java, while the consumer is still the Scala implementation; previously both the producer and the consumer were written in Scala. Here we use the Java version of the producer.

There is another point that requires special attention:

When we do not specify a key when sending a message, the producer distributes messages across partitions as follows:

Scala version of the producer: when the producer starts, it randomly picks a partition, and all subsequent messages are sent to that same partition; in other words, for the lifetime of the process the producer keeps sending to one partition.

Java version of the producer: partitions are chosen round-robin, so messages are spread more evenly.

So when using the Scala version of the producer, try to pass in a key so that messages are spread evenly across partitions (see the sketch below).
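To make the difference concrete, here is a minimal sketch of sending with and without a key using the Java producer. The broker address and topic name ("localhost:9092", "test-topic") are placeholders for illustration, not values from this article:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedSendExample {
    public static void main(String[] args) {
        Properties config = new Properties();
        // Placeholder broker address for this example
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

        // With a key: the key is hashed to pick the partition, so the same key always lands on the same partition.
        producer.send(new ProducerRecord<String, String>("test-topic", "user-42", "message with key"));

        // Without a key: the Java producer falls back to distributing messages round-robin across partitions.
        producer.send(new ProducerRecord<String, String>("test-topic", "message without key"));

        producer.close();
    }
}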

Here is the specific code:

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.commons.lang.SerializationUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import cn.qlt.study.domain.User;

public class KafkaUtil {

    private static KafkaProducer producer = null;
    private static ConsumerConnector consumer = null;

    static {
        // Producer configuration; for details see the source of the ProducerConfig class or the official documentation
        Map config = new HashMap();
        // Addresses of the Kafka brokers
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.90:9092,192.168.100.91:9092");
        // Value serializer: the object passed in is serialized into a byte array
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Key serializer: if a key is passed in, it is hashed to decide which partition the message goes to
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024 * 5);
        // How long messages wait before being sent to the Kafka server; 0 means send immediately without waiting
        config.put(ProducerConfig.LINGER_MS_CONFIG, 0);

        // Consumer configuration
        Properties props = new Properties();
        // zookeeper address
        props.put("zookeeper.connect", "192.168.100.90:2181");
        // group id
        props.put("group.id", "123");
        // Interval for automatically committing consumed offsets
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig consumerConfig = new ConsumerConfig(props);

        producer = new KafkaProducer(config);
        consumer = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    /**
     * Start a consumer.
     * @param topic       the topic to consume
     * @param handler     your own processing logic
     * @param threadCount number of consumer threads; this should be less than or equal to the
     *                    number of partitions, any more than that is useless
     */
    public static void startConsumer(String topic, final MqMessageHandler handler, int threadCount) throws Exception {
        if (threadCount)
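The snippet above stops before the body of startConsumer, and the MqMessageHandler interface it references (along with the User domain class) is not shown. The sketch below is one plausible way such a handler could look and be called; the interface shape, method name, and topic name are assumptions for illustration, not the author's original code.

// Hypothetical callback interface for startConsumer; the real definition is not shown in the article.
public interface MqMessageHandler {
    void handle(byte[] message) throws Exception;
}

// Example usage, assuming a topic named "user-topic" and two consumer threads
// (threadCount should be no larger than the number of partitions, as noted in the javadoc above).
public class ConsumerExample {
    public static void main(String[] args) throws Exception {
        KafkaUtil.startConsumer("user-topic", new MqMessageHandler() {
            public void handle(byte[] message) throws Exception {
                // Replace this with your own processing logic
                System.out.println("received " + message.length + " bytes");
            }
        }, 2);
    }
}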
