Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What should be paid attention to when using Kafka Consumer

2025-03-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly explains "what you should pay attention to when using Kafka Consumer". Friends who are interested might as well take a look. The method introduced in this paper is simple, fast and practical. Let's let the editor take you to learn what you should pay attention to when using Kafka Consumer.

1. Characteristics:

Don't worry about offset, you will automatically read the last offset of the Consumer group in zookeeper.

II. Points for attention

1. It is a waste if there are more consumer than partition, because the design of kafka does not allow concurrency on a partition.

So the number of consumer should not be greater than the number of partition.

two。 If consumer is less than partition, one consumer will correspond to multiple partitions

Here we mainly distribute the numbers of multipliers and partition reasonably, otherwise the data in the partition will be taken unevenly.

It is better that the number of partiton is an integral multiple of the number of consumer, so the number of partition is very important.

For example, if you take 24, it is easy to set the number of consumer

3. If consumer reads data from multiple partition, the sequence between data is not guaranteed.

Kafka only guarantees that the data is ordered on one partition, but multiple partition will vary depending on the order in which you read it.

4. Adding or subtracting consumer,broker,partition will lead to rebalance

Therefore, the corresponding partition of consumer will change after rebalance.

5. When the data is not available in the High-level API, it will block.

3. The code is as follows:

Package kafkatest.kakfademo

Import java.io.UnsupportedEncodingException

Import java.util.HashMap

Import java.util.List

Import java.util.Map

Import java.util.Properties

Import java.util.concurrent.ExecutorService

Import java.util.concurrent.Executors

Import kafka.consumer.Consumer

Import kafka.consumer.ConsumerConfig

Import kafka.consumer.ConsumerIterator

Import kafka.consumer.KafkaStream

Import kafka.javaapi.consumer.ConsumerConnector

Public class ConsumerDemo1 {

Public static void main (String [] args) {

ConsumerDemo1 demo = new ConsumerDemo1 ()

Demo.test ()

}

@ SuppressWarnings ("rawtypes")

Public void test () {

String topicName = "test"

Int numThreads = 1

Properties properties = new Properties ()

Properties.put ("zookeeper.connect", "hadoop0:2181"); / / declare zk

Properties.put ("group.id", "group--demo"); / / you must use a different group name

/ / if producers and consumers are in the same group, the topic data in the same group cannot be accessed

ConsumerConnector consumer = Consumer

.createJava consumerConnector (new ConsumerConfig (properties))

Map topicCountMap = new HashMap ()

TopicCountMap.put (topicName, numThreads); / / get one data from the topic at a time

Map messageStreams = consumer

.createMessageStreams (topicCountMap)

/ / get the data received each time

List streams = messageStreams

.get (topicName)

/ / now launch all the threads

ExecutorService executor = Executors.newFixedThreadPool (numThreads)

/ / now create an object to consume the messages

/ /

Int threadNumber = 0

For (final KafkaStream stream: streams) {

Executor.execute (new ConsumerMsgTask (stream, threadNumber))

ThreadNumber++

}

}

Class ConsumerMsgTask implements Runnable {

Private KafkaStream m_stream

Private int m_threadNumber

Public ConsumerMsgTask (KafkaStream stream, int threadNumber) {

M_threadNumber = threadNumber

M_stream = stream

}

Public void run () {

ConsumerIterator it = m_stream.iterator ()

Long offset = 0

Try {

While (it.hasNext ())

Offset = it.next () .offset ()

Byte [] bytes = it.next () .message

String msg = new String (bytes, "UTF-8")

System.out.print ("offset:" + offset + ", msg:" + msg)

System.out.println ("Shutting down Thread:" + m_threadNumber)

} catch (UnsupportedEncodingException e) {

/ / TODO Auto-generated catch block

E.printStackTrace ()

}

}

}

}

IV. Experimental verification

At this point, I believe you have a deeper understanding of "what to pay attention to in the use of Kafka Consumer". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report