
How to implement Kafka specified-offset consumption in Spring Boot

2025-03-29 Update From: SLTechnology News&Howtos

In this post, the editor shares how to implement Kafka specified-offset consumption in Spring Boot. Most people probably don't know much about it, so this article is shared for your reference; I hope you gain a lot from reading it. Let's get into it!

The Kafka consumption process will inevitably run into scenarios that require re-consumption. For example, suppose that after consuming Kafka data we write it to a database. If the database goes down at some point, the data consumed during the outage never reaches the database. To make up for the data lost while the database was down, we can seek the Kafka consumer's offset back to an earlier value and consume from there again.
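Before walking through the full service, here is a minimal, self-contained sketch of this replay idea. Everything concrete in it is a hypothetical placeholder: the broker address localhost:9092, the topic inventory-log, partition 0, the group id replay-demo, and the saved offset 1000.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetReplaySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replay-demo");             // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // commit manually after replay

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("inventory-log", 0); // hypothetical topic/partition
            consumer.assign(Collections.singletonList(partition)); // manual assignment, no group rebalance
            consumer.seek(partition, 1000L);                       // rewind to the offset saved before the outage
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
            consumer.commitSync(); // persist the new position once re-processing succeeds
        }
    }
}

The key calls are assign(), which takes ownership of a partition without group rebalancing, and seek(), which moves the read position before the next poll().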

First, create a Kafka consumer service

@Service
@Slf4j
// Implementing the CommandLineRunner interface makes Spring Boot run its run method automatically on startup
public class TspLogbookAnalysisService implements CommandLineRunner {
    @Override
    public void run(String... args) {
        // do something
    }
}

Building the Kafka consumption model

In the Kafka server, each topic has multiple partitions, and each partition maintains its own offset. Our goal is to have the Kafka consumer start consuming from a specified offset.
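If you are not sure how many partitions a topic has, or what offset range each one currently covers, you can inspect them before seeking. A short sketch, reusing a consumer configured as in the sketch above (the topic name inventory-log is again a hypothetical placeholder):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

// List the topic's partitions and print each one's current offset range.
List<TopicPartition> partitions = consumer.partitionsFor("inventory-log").stream()
        .map(info -> new TopicPartition(info.topic(), info.partition()))
        .collect(Collectors.toList());
Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions); // first retained offset per partition
Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);         // next offset to be written per partition
partitions.forEach(p -> System.out.printf("partition %d: offsets %d..%d%n",
        p.partition(), earliest.get(p), latest.get(p)));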

Here we use a one-to-one consumer -> partition consumption model: each consumer manages its own partition.

@Service
@Slf4j
public class TspLogbookAnalysisService implements CommandLineRunner {
    // declare as many consumer threads as there are kafka partitions: one partition per consumer thread
    private static final int consumeThreadNum = 9;
    // explicitly specify, for each partition, the offset from which consumption starts
    private List<Long> partitionOffsets = Lists.newArrayList(
            1111L, 1112L, 1113L, 1114L, 1115L, 1116L, 1117L, 1118L, 1119L);
    private ExecutorService executorService = Executors.newFixedThreadPool(consumeThreadNum);

    @Override
    public void run(String... args) {
        // loop to create the consumer threads
        IntStream.range(0, consumeThreadNum)
                .forEach(partitionIndex -> executorService.submit(() -> startConsume(partitionIndex)));
    }
}

How the Kafka consumer handles the offset

Declare the Kafka consumer configuration

private Properties buildKafkaConfig() {
    Properties kafkaConfiguration = new Properties();
    kafkaConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
    kafkaConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "");
    kafkaConfiguration.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "");
    kafkaConfiguration.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");
    kafkaConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "");
    kafkaConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "");
    kafkaConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "");
    kafkaConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");
    // ...more configuration items
    return kafkaConfiguration;
}
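The article leaves every value blank, so here is a hypothetically filled-in variant purely for orientation. None of these values come from the original, and AUTO_COMMIT_INTERVAL_MS_CONFIG is left out because it only matters when auto-commit is enabled:

// Example values only -- all of them are hypothetical placeholders to adapt.
kafkaConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
kafkaConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "tsp-logbook-analysis");
kafkaConfiguration.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
kafkaConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // used only when no committed offset exists
kafkaConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // this service commits manually below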

Create the Kafka consumer, handle the offset, and start the consumption task

private void startConsume(int partitionIndex) {
    // create the kafka consumer
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildKafkaConfig());
    try {
        // assign this consumer its corresponding partition
        // (kafkaProperties is assumed to be an injected configuration bean holding the topic name)
        TopicPartition partition = new TopicPartition(kafkaProperties.getKafkaTopic(), partitionIndex);
        consumer.assign(Lists.newArrayList(partition));
        // handle the consumer's offset
        if (CollectionUtils.isNotEmpty(partitionOffsets) && partitionOffsets.size() == consumeThreadNum) {
            Long seekOffset = partitionOffsets.get(partitionIndex);
            log.info("partition: {}, offset seek from {}", partition, seekOffset);
            consumer.seek(partition, seekOffset);
        }
        // start the data-consumption task
        kafkaRecordConsume(consumer, partition);
    } catch (Exception e) {
        log.error("kafka consume error: {}", ExceptionUtils.getFullStackTrace(e));
    } finally {
        try {
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }
}

Consumption logic and offset handling

private void kafkaRecordConsume(KafkaConsumer<String, String> consumer, TopicPartition partition) {
    while (true) {
        try {
            ConsumerRecords<String, String> records = consumer.poll(TspLogbookConstants.POLL_TIMEOUT);
            // concrete record-processing flow
            records.forEach(k -> handleKafkaInput(k.key(), k.value()));
            // very important: log the current consumer's offset and partition information
            // (if you later need to reassign the offset, recover the offset/partition values from this log)
            if (records.count() > 0) {
                String currentOffset = String.valueOf(consumer.position(partition));
                log.info("current records size is: {}, partition is: {}, offset is: {}",
                        records.count(), consumer.assignment(), currentOffset);
            }
            // commit the offset
            consumer.commitAsync();
        } catch (Exception e) {
            log.error("handleKafkaInput error {}", ExceptionUtils.getFullStackTrace(e));
        }
    }
}
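One design note on the commits above: the loop commits asynchronously for throughput, and startConsume issues a final synchronous commitSync() in its finally block so the last position is not lost on shutdown. If you ever need to commit an exact position rather than "everything polled so far", kafka-clients also accepts an explicit offset map. A sketch, reusing the consumer and partition variables from the code above:

import java.util.Collections;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

// Kafka's convention: the committed value is the offset of the NEXT record to read,
// which is exactly what position() returns after the last processed record.
long nextOffsetToRead = consumer.position(partition);
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffsetToRead)));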

That is all the content of this article, "How to implement Kafka specified-offset consumption in Spring Boot". Thank you for reading! I believe you now have a solid understanding of the technique, and I hope the content shared here helps you. If you want to learn more, welcome to follow the industry information channel!
