2025-02-24 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article mainly explains how to integrate Kafka into a Spring Boot project for batch consumption. Interested friends may wish to take a look: the method introduced here is simple, fast, and practical. Let the editor take you through how Spring Boot integrates Kafka for batch consumption.
Introduce the dependency

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.11.RELEASE</version>
</dependency>
```
Because my project uses Spring Boot 1.5.22.RELEASE, I reference the 1.3.11.RELEASE package. Readers can choose the matching version from the compatibility table; since any copy of it may be out of date, see the spring-kafka official website for the authoritative version.
Note: there is a pitfall here. If the package version does not match your Spring version, the following exception is thrown when the project starts:

```
java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor
```

Create the configuration class

```java
/** Kafka configuration class */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @Value("${kafka.group.id}")
    private String kafkaGroupId;

    @Value("${kafka.topic}")
    private String kafkaTopic;

    // The JAAS config and truststore cannot be read from inside the jar,
    // so they are referenced by absolute path on the host.
    public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf";
    public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks";

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Set the concurrency; keep it less than or equal to the topic's partition count
        factory.setConcurrency(5);
        // Enable batch listening
        factory.setBatchListener(Boolean.TRUE);
        factory.getContainerProperties().setPollTimeout(30000);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Set the access point; obtain the access point for your topic from the console
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        // Path of the SASL JAAS config file; like the truststore, it cannot be packaged into the jar
        System.setProperty("java.security.auth.login.config", CONFIG_PATH);
        // Path of the SSL root certificate truststore (remember to change xxx to your own path)
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH);
        // The truststore password stays unchanged
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        // Access protocol; currently SASL_SSL is supported
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // SASL authentication mechanism stays unchanged
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Commit offsets automatically
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);
        // Maximum allowed interval between two polls. If the consumer exceeds it without
        // a heartbeat, the broker considers it dead, removes it from the consumer group,
        // and triggers a rebalance. The default is 30s.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Maximum amount of data fetched at a time; has a large impact over the public network
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
        // Maximum number of records per poll. Do not set this too high: if one poll returns
        // more data than can be consumed before the next poll, a rebalance is triggered
        // and consumption stalls.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // Message deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group of this instance; fill in the group you applied for in the console.
        // Instances in the same group share the consumption of messages.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        // Disable hostname verification by setting it to empty
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        return props;
    }
}
```
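The `@Value` placeholders in the configuration class need matching entries in the application configuration. A minimal sketch, with hypothetical values — replace the servers, group, and topic with the ones from your own console:

```properties
# Hypothetical values -- substitute your own access point, consumer group, and topic
kafka.bootstrap.servers=kafka-broker-1:9093,kafka-broker-2:9093
kafka.group.id=my-consumer-group
kafka.topic=my-topic
```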
Note: here `factory.setConcurrency(5);` configures the concurrency to 5. Suppose our online topic has 12 partitions: then 3 threads are assigned 2 partitions each and 2 threads are assigned 3 partitions each, 3 × 2 + 2 × 3 = 12.
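The split above is just integer division with a remainder: each thread gets the base quota, and the leftover partitions go one apiece to the first threads. A standalone sketch of that arithmetic (the class and method names are mine, not part of the article's code):

```java
import java.util.Arrays;

public class PartitionMath {

    // Distribute `partitions` partitions across `consumers` threads:
    // each gets partitions / consumers, and the first (partitions % consumers)
    // threads get one extra.
    public static int[] assign(int partitions, int consumers) {
        int[] counts = new int[consumers];
        int base = partitions / consumers;
        int extra = partitions % consumers;
        for (int i = 0; i < consumers; i++) {
            counts[i] = base + (i < extra ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        // 12 partitions across 5 threads: two threads get 3, three threads get 2
        System.out.println(Arrays.toString(assign(12, 5))); // [3, 3, 2, 2, 2]
    }
}
```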
Kafka consumer

```java
/** Kafka message consumer */
@Component
public class KafkaMessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

    @KafkaListener(topics = {"${kafka.topic}"})
    public void listen(List<ConsumerRecord<String, String>> recordList) {
        for (ConsumerRecord<String, String> record : recordList) {
            // Print the partition and offset of each message
            LOGGER.info("Kafka Consume partition: {}, offset: {}", record.partition(), record.offset());
            String value = record.value();
            System.out.println("value = " + value);
            // Process business logic...
        }
    }
}
```
Because batch listening is enabled in the configuration class, the parameter of the `listen` method here is a list: `List<ConsumerRecord<String, String>>`.
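Inside the loop, "process business logic" often means writing the whole polled list somewhere in bulk rather than one record at a time. A hypothetical helper (not part of the article's code) that splits one polled batch into fixed-size chunks, e.g. for a batch database insert:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchProcessor {

    // Split one polled batch into chunks of at most `size` elements,
    // so each chunk can be handed to a bulk operation.
    static <T> List<List<T>> chunk(List<T> records, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < records.size(); i += size) {
            chunks.add(records.subList(i, Math.min(i + size, records.size())));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Simulate the values of one polled batch of 5 records
        List<String> polled = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(chunk(polled, 2)); // [[a, b], [c, d], [e]]
    }
}
```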
At this point, I believe you have a deeper understanding of how Spring Boot integrates Kafka for batch consumption. You might as well try it out in practice.