2025-03-26 Update From: SLTechnology News&Howtos
Shulou (Shulou.com) 06/03 Report
This article explains how to reset consumer group offsets in Kafka 2.7. It walks through each reset strategy supported by kafka-consumer-groups.sh (Earliest, Latest, Current, Specified-Offset, Shift-By-N, DateTime, and Duration) and shows the equivalent Java API calls for each.
Resetting consumer offsets
Earliest
First, check the group's consumption progress before resetting the offsets:
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --describe
In the output (a screenshot in the original article), the LAG of every partition is 0, which means all messages have been consumed. Resetting with the Earliest strategy moves the group's offsets back to the earliest available offset of each partition, so all retained messages can be consumed again.
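As a small illustration of the LAG column, the sketch below computes lag as the log-end offset minus the group's committed offset for each partition. The class name, method name, and sample offsets are made up for this example; they are not part of Kafka or of the original article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LagExample {
    /** Lag of one partition: messages written but not yet consumed by the group. */
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        // Hypothetical {log-end offset, committed offset} pairs, keyed by partition.
        Map<Integer, long[]> partitions = new LinkedHashMap<>();
        partitions.put(0, new long[] {12, 12}); // fully consumed
        partitions.put(1, new long[] {15, 15}); // fully consumed
        partitions.put(2, new long[] {11, 4});  // 7 messages behind

        for (Map.Entry<Integer, long[]> e : partitions.entrySet()) {
            long[] offsets = e.getValue();
            System.out.println("partition " + e.getKey() + " lag = " + lag(offsets[0], offsets[1]));
        }
    }
}
```

After a reset to Earliest, the committed offset of each partition moves back to the earliest retained offset, so the lag becomes non-zero again as long as the partition still holds messages.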
Script command
The group must have no active members while its offsets are being reset.
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-earliest --execute
Checking the consumption progress again now shows the reset offsets, and the group's consumers can re-consume these messages.
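Script command (specify partition)
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:1,2 --to-earliest --execute
Java API
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Properties;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // The statements below belong inside a method body.
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    final String topic = "mytopic";
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        // Join the group and obtain a partition assignment before seeking.
        consumer.poll(0);
        Collection<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                .collect(Collectors.toList());
        consumer.seekToBeginning(partitions);
        // seekToBeginning is lazy; position() forces the seek to take effect.
        consumer.partitionsFor(topic).forEach(i -> consumer.position(new TopicPartition(topic, i.partition())));
    }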
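Note that seekToBeginning and seekToEnd are evaluated lazily: the seek happens only when poll or position is next called, so call position to make it take effect immediately. The new positions become the group's committed offsets once they are committed, for example by the auto-commit performed when the consumer is closed (enable.auto.commit defaults to true).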
Java API (specify partition)
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    final String topic = "mytopic";
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);
        // Reset only partitions 1 and 2.
        List<TopicPartition> partitions = new ArrayList<>();
        partitions.add(new TopicPartition(topic, 1));
        partitions.add(new TopicPartition(topic, 2));
        consumer.seekToBeginning(partitions);
        consumer.position(new TopicPartition(topic, 1));
        consumer.position(new TopicPartition(topic, 2));
    }
Latest
First, check the consumption progress before resetting the offsets.
In the output (a screenshot in the original article), none of the messages in Kafka have been consumed yet. Resetting with the Latest strategy moves the group's offsets to the end of each partition, so the existing messages are skipped and only new messages are consumed.
Script command
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-latest --execute
Script command (specify partition)
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:1,2 --to-latest --execute
Java API
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    final String topic = "mytopic";
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);
        consumer.seekToEnd(consumer.partitionsFor(topic).stream()
                .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                .collect(Collectors.toList()));
        // seekToEnd is lazy; position() forces the seek to take effect.
        consumer.partitionsFor(topic).forEach(i -> consumer.position(new TopicPartition(topic, i.partition())));
    }
Java API (specify partition)
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    final String topic = "mytopic";
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);
        // Reset only partitions 1 and 2.
        List<TopicPartition> partitions = new ArrayList<>();
        partitions.add(new TopicPartition(topic, 1));
        partitions.add(new TopicPartition(topic, 2));
        consumer.seekToEnd(partitions);
        consumer.position(new TopicPartition(topic, 1));
        consumer.position(new TopicPartition(topic, 2));
    }
Current
The Current strategy resets each partition to its currently committed offset. No application scenario for it comes to mind for now, so it is covered only briefly here and may be expanded later.
Script command
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-current --execute
Script command (specify partition)
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:1,2 --to-current --execute
Java API
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    final String topic = "mytopic";
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);
        consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(topic, info.partition()))
                .forEach(tp -> {
                    // committed() returns null when the group has no committed offset for tp.
                    long committedOffset = consumer.committed(tp).offset();
                    consumer.seek(tp, committedOffset);
                });
    }
Java API (specify partition)
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    final String topic = "mytopic";
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);
        TopicPartition tp1 = new TopicPartition(topic, 1);
        TopicPartition tp2 = new TopicPartition(topic, 2);
        consumer.seek(tp1, consumer.committed(tp1).offset());
        consumer.seek(tp2, consumer.committed(tp2).offset());
    }
Specified-Offset
Script command
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-offset 5 --execute
Script command (specify partition)
In practice the committed offsets of the partitions usually differ, so setting every partition to the same offset is rarely realistic; the partition needs to be specified:
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --to-offset 11 --execute
Java API
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    final String topic = "mytopic";
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);
        // Move every partition of the topic to offset 5.
        consumer.partitionsFor(topic).stream().forEach(pi -> {
            TopicPartition tp = new TopicPartition(topic, pi.partition());
            consumer.seek(tp, 5L);
        });
    }
Java API (specify partition)
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    final String topic = "mytopic";
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);
        // Move only partition 2 to offset 10.
        consumer.seek(new TopicPartition(topic, 2), 10L);
    }
Shift-By-N
Script command
A negative N moves the offsets backward; a positive N moves them forward.
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --shift-by -1 --execute
Script command (specify partition)
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --shift-by -2 --execute
Java API
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    final String topic = "mytopic";
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);
        for (PartitionInfo info : consumer.partitionsFor(topic)) {
            TopicPartition tp = new TopicPartition(topic, info.partition());
            // Move each partition back by one message from its committed offset.
            consumer.seek(tp, consumer.committed(tp).offset() - 1L);
        }
    }
Java API (specify partition)
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    final String topic = "mytopic";
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);
        TopicPartition tp = new TopicPartition(topic, 2);
        // Move partition 2 forward by two messages from its committed offset.
        consumer.seek(tp, consumer.committed(tp).offset() + 2L);
    }
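The Shift-By-N Java snippets seek to the committed offset plus or minus N without checking whether the result still lies inside the partition. A minimal sketch of a bounds check follows, assuming the earliest and latest offsets have already been fetched (for example with the consumer's beginningOffsets and endOffsets methods); the class and method names are hypothetical.

```java
class ShiftByExample {
    /**
     * Shifts a committed offset by n (negative = backward, positive = forward)
     * and clamps the result to the range [earliest, latest].
     */
    static long shift(long committed, long n, long earliest, long latest) {
        long target = committed + n;
        return Math.max(earliest, Math.min(latest, target));
    }

    public static void main(String[] args) {
        // Sample values only: a partition holding offsets 0 to 20.
        System.out.println(shift(10, -1, 0, 20)); // inside the range
        System.out.println(shift(1, -5, 0, 20));  // would fall below the earliest offset
        System.out.println(shift(19, 2, 0, 20));  // would pass the latest offset
    }
}
```

The clamped value can then be passed to consumer.seek for the partition in question.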
DateTime
Resetting the offsets to a point in time is often a convenient approach.
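When this strategy is used from Java, the date-time string first has to become an epoch-millisecond timestamp, which is the unit offsetsForTimes expects. The sketch below parses a string in the same shape as the --to-datetime argument; interpreting a value without a zone as UTC is an assumption of this example, and the class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

class DateTimeExample {
    /** Converts e.g. "2021-05-09T00:00:00.000" to epoch milliseconds, treating it as UTC (assumption). */
    static long toEpochMillis(String dateTime) {
        return LocalDateTime.parse(dateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        // The resulting value is what would be put into the offsetsForTimes lookup map.
        System.out.println(toEpochMillis("2021-05-09T00:00:00.000"));
    }
}
```

If the timestamp is meant in a local time zone, replace ZoneOffset.UTC with the appropriate offset before converting.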
Script command
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --to-datetime 2021-05-09T00:00:00.000 --execute
Script command (specify partition)
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --to-datetime 2020-05-09T00:00:00.000 --execute
Java API
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    final String topic = "mytopic";
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);
        // Target timestamp: 24 hours ago.
        long ts = new Date().getTime() - 24 * 60 * 60 * 1000;
        Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
                .map(pi -> new TopicPartition(topic, pi.partition()))
                .collect(Collectors.toMap(Function.identity(), tp -> ts));
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {
            // A null value means no message at or after ts exists; fall back to the committed offset.
            consumer.seek(entry.getKey(), entry.getValue() == null
                    ? consumer.committed(entry.getKey()).offset()
                    : entry.getValue().offset());
        }
    }
Java API (specify partition)
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.108:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytopic-consumer-group");
    final String topic = "mytopic";
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);
        // Target timestamp: 365 days ago. The 365L literal keeps the arithmetic in long;
        // as plain int arithmetic the product would overflow.
        long ts = new Date().getTime() - 365L * 24 * 60 * 60 * 1000;
        Map<TopicPartition, Long> timeToSearch = new HashMap<>();
        timeToSearch.put(new TopicPartition(topic, 2), ts);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {
            consumer.seek(entry.getKey(), entry.getValue() == null
                    ? consumer.committed(entry.getKey()).offset()
                    : entry.getValue().offset());
        }
    }
Duration
Script command
The duration is given in the Java Duration format PnDTnHnMnS, which is not covered in detail here. The offsets are reset to the point in time that lies the given duration before the current time.
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic --by-duration P1DT0H0M0S --execute
Script command (specify partition)
    bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.108:9092 --group mytopic-consumer-group --reset-offsets --topic mytopic:2 --by-duration P1DT0H0M0S --execute
Java API
Same as DateTime: compute the target timestamp and look up the matching offsets with offsetsForTimes.
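Since the Java API route matches the DateTime one, the only extra step is turning the duration into a timestamp. A minimal sketch follows, assuming the target is the current time minus the duration; the class and method names are hypothetical. Using java.time.Duration also sidesteps the overflow that plain int arithmetic such as 365 * 24 * 60 * 60 * 1000 runs into.

```java
import java.time.Duration;

class DurationExample {
    /** Returns the timestamp (epoch millis) that lies isoDuration before nowMillis. */
    static long timestampBefore(long nowMillis, String isoDuration) {
        // Duration.parse accepts the PnDTnHnMnS form, e.g. "P1DT0H0M0S" for one day.
        return nowMillis - Duration.parse(isoDuration).toMillis();
    }

    public static void main(String[] args) {
        long ts = timestampBefore(System.currentTimeMillis(), "P1DT0H0M0S");
        // ts is the value to look up for each partition, as in the DateTime snippets.
        System.out.println(ts);
    }
}
```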