Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to use Kafka's High Level Consumer

2025-01-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)06/01 Report--

This article is about how to use Kafka High Level Consumer to share with you, the editor thinks it is very practical, so I share it with you to learn. I hope you can get something after reading this article.

Why use High Level Consumer

In some application scenarios, we want to read messages through multithreading, and we don't care about the order in which messages are consumed from Kafka, we just care that the data can be consumed. High Level is used to abstract this kind of consumption action.

Message consumption is in Consumer Group units. There can be multiple consumer in each Consumer Group, and each consumer is a thread. Each partition of topic can only be read by a certain consumer at the same time. Each partition corresponding to Consumer Group has the latest offset value, which is stored on the zookeeper. So there will be no duplication of consumption.

The design High Level Consumer High Level Consumer can and should be used in a multithreaded environment. The number of threads in the thread model (which also represents the number of consumer in the group) is related to the number of partition in the topic. Here are some rules:

When the number of threads provided is more than the number of partition, some threads will not receive messages

When the number of threads provided is less than the number of partition, some threads will receive messages from multiple partition

When a thread receives messages from multiple partition, there is no guarantee of the order in which they are received; it is possible to receive 5 messages from partition3, 6 messages from partition4, and then 10 messages from partition3

When more threads are added, it will cause kafka to do re-balance, which may change the correspondence between partition and threads.

Code sample ConsumerGroupExample

Package com.test.groups;import kafka.consumer.ConsumerConfig;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ConsumerGroupExample {private final ConsumerConnector consumer; private final String topic; private ExecutorService executor Public ConsumerGroupExample (String a_zookeeper, String a_groupId, String a_topic) {consumer = kafka.consumer.Consumer.createJavaConsumerConnector (createConsumerConfig (a_zookeeper, a_groupId)); this.topic = a popular topic;} public void shutdown () {if (consumer! = null) consumer.shutdown () If (executor! = null) executor.shutdown ();} public void run (int a_numThreads) {Map topicCountMap = new HashMap (); topicCountMap.put (topic, new Integer (a_numThreads)); Map consumerMap = consumer.createMessageStreams (topicCountMap); List streams = consumerMap.get (topic) / / now launch all the threads / / executor = Executors.newFixedThreadPool (a_numThreads); / / now create an object to consume the messages / / int threadNumber = 0; for (final KafkaStream stream: streams) {executor.submit (new ConsumerTest (stream, threadNumber)) ThreadNumber++;}} private static ConsumerConfig createConsumerConfig (String a_zookeeper, String a_groupId) {Properties props = new Properties (); props.put ("zookeeper.connect", a_zookeeper); props.put ("group.id", a_groupId) Props.put ("zookeeper.session.timeout.ms", "400"); props.put ("zookeeper.sync.time.ms", "200"); props.put ("auto.commit.interval.ms", "1000"); return new ConsumerConfig (props) } public static void main (String [] args) {String zooKeeper = args [0]; String groupId = args [1]; String topic = args [2]; int threads = Integer.parseInt (args [3]); ConsumerGroupExample example = new ConsumerGroupExample (zooKeeper, groupId, topic); example.run (threads) Try {Thread.sleep (10000);} catch (InterruptedException ie) {} example.shutdown ();}}

ConsumerTest

Package com.test.groups;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;public class ConsumerTest implements Runnable {private KafkaStream thread stream; private int thread number; public ConsumerTest (KafkaStream a_stream, int a_threadNumber) {m_threadNumber = a thread number; m_stream = a_stream } public void run () {ConsumerIterator it = m_stream.iterator (); while (it.hasNext ()) System.out.println ("Thread" + m_threadNumber + ":" + new String (it.next (). Message (); System.out.println ("Shutting down Thread:" + m_threadNumber) }} the above is how to use Kafka's High Level Consumer, and the editor believes that there are some knowledge points that we may see or use in our daily work. I hope you can learn more from this article. For more details, please follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report