Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Kafka single-thread Consumer and detailed explanation of its parameters

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Please use a version after 0.9:

Sample code Properties props = new Properties (); props.put ("bootstrap.servers", "kafka01:9092,kafka02:9092"); props.put ("group.id", "test"); props.put ("enable.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1000") Props.put ("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put ("auto.offset.reset", "earliest"); KafkaConsumer consumer = new KafkaConsumer (props); consumer.subscribe (Arrays.asList ("foo", "bar")) Try {while (true) {ConsumerRecords records = consumer.poll (1000); for (ConsumerRecord record: records) {System.out.printf ("offset =% d, key =% s, value =% s% n", record.offset (), record.key (), record.value ());} finally {consumer.close () }

1. You only need to configure the server groupid autocommit serialization autooffsetreset of kafka (where bootstrap.server group.id key.deserializer value.deserializer must be specified)

2. Use these Properties to build consumer objects (KafkaConsumer has other constructs that can be passed in serialization)

3. Subscribe subscribe to topic list (you can use regular subscription Pattern.compile ("kafka.*")

You must specify a listener subscribe (Pattern pattern, ConsumerRebalanceListener listener) to use the rule; you can override this interface to implement the logic when the partition changes. Ignore this logic if enable.auto.commit = true is set.

4. Then loop the poll message (1000 here is the timeout setting, so if you don't have a lot of data, just wait a second)

5. Processing messages (offset key value is printed and processing logic is written here).

6. Close KafkaConsumer (the number of seconds to wait by passing a timeout value is 30 by default).

Detailed explanation of parameters

Bootstrap.server (it's best to use the hostname instead of the ip kafka internal hostname unless you've configured ip yourself)

Deserializer deserialization consumer takes a byte array from the broker side and returns it to the object type.

There are more than a dozen kinds by default: StringDeserializer LongDeserializer DoubleDeserializer.

You can also customize: define serializer format to create custom deserializer classes to implement Deserializer interface rewriting logic

Except for the four must-pass bootstrap.server group.id key.deserializer value.deserializer

And session.timeout.ms "time when the coordinator test failed"

Is to detect the consumer hang time in order to be timely rebalance default is 10 seconds can be set a smaller value to avoid message delay.

Max.poll.interval.ms "maximum time for consumer processing logic"

When the processing logic is complicated, you can set this value to avoid unnecessary rebalance, because twice the poll time exceeds this parameter, kafka thinks that the consumer can no longer keep up with the group, it will be kicked out of the group, and if the offset cannot be submitted, it will repeat consumption. The default is 5 minutes.

Auto.offset.reset "coping strategy for kafka without displacement or displacement crossing the boundary"

So if you start a group ab initio consumption, successfully submit the displacement, restart or continue to consume, this parameter is invalid.

So the interpretation of the three values is:

When there is a submitted offset under each partition, earliset consumption starts from the submitted offset; when there is no committed offset, it is consumed from the earliest shift.

Latest starts consumption from the submitted offset when there is a submitted offset under each partition; if there is no committed offset, the consumption starts after the offset when the newly generated data none topic under the partition contains the submitted offset; as long as there is no committed offset in one partition, an exception is thrown

(note that before the kafka-0.10.1.X version: the value of auto.offset.reset is smallest, and largest. (offest is saved in zk),

We are talking about the new version: after the kafka-0.10.1.X version, the value of auto.offset.reset has been changed to: earliest,latest, and none (offest is saved in a special topic name of kafka: _ _ consumer_offsets).

Whether enable.auto.commit automatically commits displacement

True auto-commit false requires the user to manually submit a recently set false control that only needs to be processed once.

Fetch.max.bytes consumer gets the maximum number of bytes at a time

Maximum number of messages returned by max.poll.records in a single poll

Default 500 if the consumption is very light, you can appropriately increase this value to increase the consumption speed.

Hearbeat.interval.ms consumer other team members perceive the time of rabalance

This value must be less than session.timeout.ms. If you detect that consumer is dead, you will not be able to perceive rabalance at all.

Time that connections.max.idle.ms closes the connection periodically

The default is 9 minutes, which can be set to-1 and never turn off.

For more blog posts on real-time computing, Kafka and other related technologies, welcome to follow real-time streaming computing.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report