In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces "the example code of Kafka multithreaded Consumer". In the daily operation, I believe that many people have doubts about the example code of Kafka multithreaded Consumer. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts of "Kafka multithreaded Consumer instance code". Next, please follow the editor to study!
Multithreaded sample code: here to develop according to their own needs, I only give a simple example here, that is, a few partitions start several consumer, one by one corresponding.
Three classes:
Main:
Public static void main (String [] args) {
String bootstrapServers = "kafka01:9092,kafka02:9092"
String groupId = "test"
String topic = "testtopic"
Int consumerNum = 3
ConsumerGroup cg = new ConsumerGroup (consumerNum,bootstrapServers,groupId,topic)
Cg.execute ()
}
Import java.util.ArrayList
Import java.util.List
Public class ConsumerGroup {
Private List consumers
Public ConsumerGroup (int consumerNum,String bootstrapServers,String groupId,String topic) {
Consumers = new ArrayList (consumerNum)
For (int iTuno witi < consumerNum;i++) {
ConsumerRunnable ConsumerRunnable = new ConsumerRunnable (bootstrapServers,groupId,topic)
Consumers.add (ConsumerRunnable)
}
}
Public void execute () {
For (ConsumerRunnable consumerRunnable:consumers) {
New Thread (consumerRunnable) .start ()
}
}
}
Import java.util.Arrays
Import java.util.Properties
Import org.apache.kafka.clients.consumer.ConsumerRecord
Import org.apache.kafka.clients.consumer.ConsumerRecords
Import org.apache.kafka.clients.consumer.KafkaConsumer
Public class ConsumerRunnable implements Runnable {
Private final KafkaConsumer consumer
Public ConsumerRunnable (String bootstrapServers,String groupId,String topic) {
Properties props = new Properties ()
Props.put ("bootstrap.servers", bootstrapServers)
Props.put ("group.id", groupId)
Props.put ("enable.auto.commit", "true")
Props.put ("auto.commit.interval.ms", "1000")
Props.put ("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
Props.put ("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
Props.put ("auto.offset.reset", "earliest")
This.consumer = new KafkaConsumer (props)
Consumer.subscribe (Arrays.asList (topic))
}
@ Override
Public void run () {
While (true) {
ConsumerRecords records = consumer.poll (10)
For (ConsumerRecord record: records) {
System.out.printf ("offset =% d, key =% s, value =% s% n", record.offset (), record.key (), record.value ())
}
}
}
}
Detailed explanation of poll method:
(old version: multi-partition and multi-threading new version: one thread manages multiple socket connections)
But the new version of KafkaConsumer is dual-threaded, and the main thread is responsible for: message acquisition, rebalance,coordinator, displacement submission, etc.
The other is the background heartbeat thread.
Depending on the various configurations above, the poll method finds the offset and returns when enough available data is obtained, or when the wait time exceeds the specified timeout.
Java consumer is not thread-safe. If the same KafkaConsumer is used in multiple threads, a KafkaConsumer is not safe for multi-threaded assess exception will be reported. A synchronization lock can be added to protect it.
It has been said that 1000 of the timeout parameter of poll is a timeout setting. If you don't have a lot of data, just wait a second and return. For example, if you write a message for 5 seconds, you can set the timeout parameter to 5000 to maximize efficiency.
If there are no scheduled tasks, set Long.MAX_VALUE to wait indefinitely if it doesn't get enough data. Here we want to capture WakeupException.
At this point, the study of "the example code of Kafka multithreaded Consumer" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.