Kafka Notes (2): Using the Kafka Java API

2025-01-28 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--

[TOC]

The test code below uses the following topic:

```shell
$ kafka-topics.sh --describe --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
Topic:hadoop	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: hadoop	Partition: 0	Leader: 103	Replicas: 103,101,102	Isr: 103,101,102
	Topic: hadoop	Partition: 1	Leader: 101	Replicas: 101,102,103	Isr: 101,102,103
	Topic: hadoop	Partition: 2	Leader: 102	Replicas: 102,103,101	Isr: 102,103,101
```

Kafka Java API producer

For instructions on using the producer API, check the code comments of the org.apache.kafka.clients.producer.KafkaProducer class, which contain a very detailed description. The program code and a test follow.

The program code

KafkaProducerOps.java:

```java
package com.uplooking.bigdata.kafka.producer;

import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Produces data to a Kafka topic through this KafkaProducerOps.
 *
 * Producer
 */
public class KafkaProducerOps {
    public static void main(String[] args) throws IOException {
        /*
         * Load the configuration file (key=value format)
         * to minimize hard coding in the program:
         * don't write values into the code, configure them instead.
         */
        Properties properties = new Properties();
        InputStream in = KafkaProducerOps.class.getClassLoader()
                .getResourceAsStream("producer.properties");
        properties.load(in);

        // sample values, used in the partitioner example later
        String[] girls = new String[]{"Yao Huiying", "Liu Qian", "Zhou Xin", "Yang Liu"};

        /*
         * Two generic parameters:
         *   the first is the type of the record key in Kafka,
         *   the second is the type of the record value in Kafka.
         */
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
        String key = "1";
        String value = "Today's girls are beautiful";
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<String, String>(topic, key, value);
        producer.send(producerRecord);
        producer.close();
    }
}
```

Constants.java:

```java
package com.uplooking.bigdata.kafka.constants;

/**
 * Constants for the keys used by the producer.
 */
public interface Constants {
    /**
     * Configuration key for the producer topic.
     */
    String KAFKA_PRODUCER_TOPIC = "producer.topic";
}
```

producer.properties:

```properties
############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent
# so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

# custom topic setting
producer.topic=hadoop

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

In fact, this is the producer.properties file shipped under Kafka's config directory, modified accordingly. For the meaning of each field, check the code comments of the org.apache.kafka.clients.producer.KafkaProducer class.
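Where a resource file is inconvenient (for example in a quick test), the same settings can be assembled in code and passed straight to the KafkaProducer constructor. A minimal sketch, assuming the broker addresses and topic used throughout this article; `buildProducerProps` is a hypothetical helper, not part of the Kafka API:

```java
import java.util.Properties;

public class ProducerConfigDemo {

    // Hypothetical helper: builds the same settings as producer.properties
    static Properties buildProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "uplooking01:9092,uplooking02:9092,uplooking03:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // custom key, read back via Constants.KAFKA_PRODUCER_TOPIC
        props.put("producer.topic", "hadoop");
        return props;
    }

    public static void main(String[] args) {
        // a KafkaProducer could be constructed directly from these properties
        System.out.println(buildProducerProps().getProperty("producer.topic"));
    }
}
```

Either way, the properties object ends up identical; the file-based approach simply keeps cluster addresses out of the compiled code.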

Test

Start a console consumer in a terminal to monitor messages on the topic:

```shell
[uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181
```

Then execute the producer program and look at the terminal output:

```shell
[uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181
Today's girls are beautiful
```

Kafka Java API consumer

The program code

KafkaConsumerOps.java:

```java
package com.uplooking.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class KafkaConsumerOps {
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        InputStream in = KafkaConsumerOps.class.getClassLoader()
                .getResourceAsStream("consumer.properties");
        properties.load(in);

        Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        Collection<String> topics = Arrays.asList("hadoop");
        // the consumer subscribes to the topic
        consumer.subscribe(topics);
        ConsumerRecords<String, String> consumerRecords = null;
        while (true) {
            // pull data from the topic
            consumerRecords = consumer.poll(1000);
            // traverse each record
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                long offset = consumerRecord.offset();
                int partition = consumerRecord.partition();
                Object key = consumerRecord.key();
                Object value = consumerRecord.value();
                System.out.format("%d\t%d\t%s\t%s\n", offset, partition, key, value);
            }
        }
    }
}
```

consumer.properties:

```properties
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

# consumer group id
group.id=test-consumer-group

# consumer timeout
#consumer.timeout.ms=5000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
```

Test
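A note on the key.deserializer/value.deserializer settings: they mirror the producer's serializers, and Kafka's StringSerializer/StringDeserializer pair is essentially a UTF-8 round trip. A minimal sketch in plain Java with no Kafka dependency (`serialize` and `deserialize` are hypothetical helpers illustrating what the configured classes do):

```java
import java.nio.charset.StandardCharsets;

public class SerdeDemo {

    // what StringSerializer does on the producer side (a sketch)
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // what StringDeserializer does on the consumer side (a sketch)
    static String deserialize(byte[] wireBytes) {
        return new String(wireBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String value = "Today's girls are beautiful";
        // round trip: producer serializes, consumer deserializes
        System.out.println(deserialize(serialize(value)));
    }
}
```

This is why the producer's serializer and the consumer's deserializer must agree for each of key and value; mismatched pairs corrupt or fail to decode the records.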

First run the consumer code, then run the producer code; you will see output like the following at the consumer terminal:

```
<offset>	<partition>	1	Today's girls are beautiful
```

Each line shows the record's offset, partition, key, and value.

Kafka Java API partitioner

You can customize the partitioner to decide which partition a message is stored in; just implement the Partitioner interface in your code.

The program code

MyKafkaPartitioner.java:

```java
package com.uplooking.bigdata.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

/**
 * A custom partitioner that chooses the partition from the key of the data.
 *
 * Data can be distributed to different partitions according to the
 * hashCode of the key (or value).
 * Requirement: take the hashCode modulo the number of partitions of the topic.
 */
public class MyKafkaPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {
    }

    /**
     * Chooses the partition for the given record.
     *
     * @param topic      topic name
     * @param key        record key
     * @param keyBytes   serialized key
     * @param value      record value
     * @param valueBytes serialized value
     * @param cluster    metadata of the current cluster
     */
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionNums = cluster.partitionCountForTopic(topic);
        int targetPartition;
        if (key == null || keyBytes == null) {
            targetPartition = new Random().nextInt(10000) % partitionNums;
        } else {
            int hashCode = key.hashCode();
            // Math.abs guards against a negative hashCode producing a
            // negative (invalid) partition number
            targetPartition = Math.abs(hashCode % partitionNums);
            System.out.println("key: " + key + ", value: " + value
                    + ", hashCode: " + hashCode + ", partition: " + targetPartition);
        }
        return targetPartition;
    }

    public void close() {
    }
}
```

KafkaProducerOps.java (modified to send multiple records):

```java
package com.uplooking.bigdata.kafka.producer;

import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

/**
 * Produces data to a Kafka topic through this KafkaProducerOps.
 *
 * Producer
 */
public class KafkaProducerOps {
    public static void main(String[] args) throws IOException {
        /*
         * Load the configuration file (key=value format)
         * to minimize hard coding in the program.
         */
        Properties properties = new Properties();
        InputStream in = KafkaProducerOps.class.getClassLoader()
                .getResourceAsStream("producer.properties");
        properties.load(in);

        String[] girls = new String[]{"Yao Huiying", "Liu Qian", "Zhou Xin", "Yang Liu"};
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        Random random = new Random();
        int start = 1;
        String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
        // The source listing is truncated at this loop; the body below
        // follows the single-record version above, sending ten records
        // whose keys exercise the custom partitioner.
        for (int i = start; i < start + 10; i++) {
            String key = String.valueOf(i);
            String value = girls[random.nextInt(girls.length)];
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<String, String>(topic, key, value);
            producer.send(producerRecord);
        }
        producer.close();
    }
}
```
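To make the producer actually use the custom partitioner, register it in producer.properties via the partitioner.class setting (shown commented out earlier), e.g. `partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner`. The key branch of the partitioner is just a hashCode-modulo calculation, which can be checked without a Kafka cluster. A standalone sketch, assuming the 3-partition hadoop topic above; `partitionFor` is a hypothetical helper mirroring that branch:

```java
public class PartitionMathDemo {

    // hypothetical helper mirroring the key branch of MyKafkaPartitioner
    static int partitionFor(String key, int partitionCount) {
        // Math.abs guards against negative hashCodes
        return Math.abs(key.hashCode() % partitionCount);
    }

    public static void main(String[] args) {
        int partitions = 3; // the hadoop topic has 3 partitions
        for (String key : new String[]{"1", "2", "3"}) {
            System.out.println("key " + key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Because the mapping depends only on the key, records with the same key always land in the same partition, which preserves per-key ordering.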




© 2024 shulou.com SLNews company. All rights reserved.
