Shulou (shulou.com) — SLTechnology News & Howtos, Development — updated 2025-01-18
This article looks at Spring Boot's automatic configuration for Kafka: how to use it, where it falls short, and how to configure Kafka manually when the defaults are not enough.
Integration via Spring Boot auto-configuration
Spring Boot ships many auto-configurations, and Kafka is among them. Integrating Kafka via Spring Boot auto-configuration takes the following steps.
First, introduce the pom dependency for Kafka:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>
```
Next, add the Kafka-related properties to the configuration file, for producers and consumers respectively. At startup, the Spring Boot framework automatically reads these properties and creates the corresponding producers, consumers, and so on. A simple configuration is shown below.
```properties
# kafka default consumer configuration
spring.kafka.consumer.bootstrap-servers=192.168.0.15:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
# kafka default producer configuration
spring.kafka.producer.bootstrap-servers=192.168.0.15:9092
spring.kafka.producer.acks=-1
spring.kafka.client-id=kafka-producer
spring.kafka.producer.batch-size=5
```
Of course, configurations in actual production are more complex than the one above, and some customization is usually required. If neither the producer nor the consumer created by Spring Boot's auto-configuration satisfies our needs, we have to customize the relevant beans ourselves. Before doing that, let's first analyze how the auto-configuration works.
With the above configuration in place, a producer can be used as shown in the following code.
```java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {UserSSOApplication.class})
public class UserSSOApplicationTests {

    // inject the KafkaTemplate that Spring Boot creates automatically
    @Resource
    KafkaTemplate kafkaTemplate;

    @Test
    public void testKafkaSendMsg() {
        // send the message: topic, partition, key, value
        kafkaTemplate.send("test", 0, 12, "1222");
    }
}
```
The consumer is integrated via an annotation; the code is as follows.
```java
@Component
@Slf4j
public class KafkaMessageReceiver2 {

    // specify the topic to listen to and the current consumer group id
    @KafkaListener(topics = {"test"}, groupId = "receiver")
    public void registryReceiver(ConsumerRecord<Integer, String> integerStringConsumerRecords) {
        log.info(integerStringConsumerRecords.value());
    }
}
```
The above is the simplest configuration and a minimal working example. If you need more customized configuration, refer to the class org.springframework.boot.autoconfigure.kafka.KafkaProperties, which contains most of the available Kafka settings; they can all be added in the properties file.
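For illustration, a few more keys exposed by KafkaProperties are sketched below; the values are only examples and should be adjusted to your environment:

```properties
# additional keys exposed by KafkaProperties (example values)
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.max-poll-records=5
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.retries=3
spring.kafka.listener.ack-mode=manual_immediate
```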
Shortcomings of Spring Boot auto-configuration
The above integration relies on Spring Boot's auto-configuration, and virtually all of it is implemented in org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration. As you can see, it relies on @Configuration to register the beans, which covers most cases, as long as you are familiar with the settings in org.springframework.boot.autoconfigure.kafka.KafkaProperties.
But this approach has a problem: org.springframework.boot.autoconfigure.kafka.KafkaProperties does not cover every setting in org.apache.kafka.clients.producer.ProducerConfig. Some special configurations therefore cannot be applied through the beans Spring Boot creates automatically, which forces us to create the Producer and Consumer manually.
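As a side note, Spring Boot 2.x also offers an escape hatch for such keys: arbitrary client properties can be passed through under `spring.kafka.properties.*` (or the producer/consumer-specific variants). Whether this suffices depends on your Boot version and use case; the keys below are examples:

```properties
# pass arbitrary client keys that KafkaProperties has no dedicated field for
spring.kafka.producer.properties.max.in.flight.requests.per.connection=1
spring.kafka.properties.request.timeout.ms=30000
```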
```java
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;
    private final RecordMessageConverter messageConverter;

    public KafkaAutoConfiguration(KafkaProperties properties,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        this.properties = properties;
        this.messageConverter = messageConverter.getIfUnique();
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        if (this.messageConverter != null) {
            kafkaTemplate.setMessageConverter(this.messageConverter);
        }
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        DefaultKafkaProducerFactory<?, ?> factory =
                new DefaultKafkaProducerFactory<>(this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        return factory;
    }

    // ... remainder omitted
}
```

Manually configuring Kafka under Spring Boot
Since we need some special settings, we may have to configure the Kafka-related beans manually by creating a configuration class similar to org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration. Once beans of the corresponding types are created there, the matching bean definitions in org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration back off and no longer take effect, thanks to @ConditionalOnMissingBean.
All producer settings can be found in the ProducerConfig class, and all consumer settings in the ConsumerConfig class.
```java
/**
 * Kafka configuration. KafkaAutoConfiguration already creates default beans from the
 * configuration file, but its auto-configured properties do not cover everything,
 * so we create the related beans ourselves, configured as follows.
 *
 * @author zhoujy
 * @date December 17, 2018
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    // consumer properties map; ConsumerConfig exposes more configurable
    // attributes than Spring Boot auto-configuration does
    private Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "activity-service");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }

    /**
     * Instead of the DefaultKafkaConsumerFactory that Spring Boot creates by
     * default, redefine the creation method.
     */
    @Bean("consumerFactory")
    public DefaultKafkaConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerProperties());
    }

    // personalized consumer definition
    @Bean("listenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(
            DefaultKafkaConsumerFactory consumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        // use our DefaultKafkaConsumerFactory
        factory.setConsumerFactory(consumerFactory);
        // set the consumer ack mode to manual; adjust to your needs
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        // three concurrent listener containers; adjust to your needs
        factory.setConcurrency(3);
        // enable pulling messages in batches
        factory.setBatchListener(true);
        return factory;
    }

    /*
    @Bean // create a topic in code
    public NewTopic batchTopic() {
        return new NewTopic("topic.quick.batch", 8, (short) 1);
    }
    */

    // producer properties map; ProducerConfig exposes more configurable
    // attributes than Spring Boot auto-configuration does
    private Map<String, Object> producerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }

    /**
     * Instead of the DefaultKafkaProducerFactory created by default in
     * KafkaAutoConfiguration, redefine it.
     */
    @Bean("produceFactory")
    public DefaultKafkaProducerFactory produceFactory() {
        return new DefaultKafkaProducerFactory(producerProperties());
    }

    /**
     * Instead of the KafkaTemplate created by default in KafkaAutoConfiguration,
     * redefine it.
     */
    @Bean
    public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory) {
        return new KafkaTemplate(produceFactory);
    }
}
```
The producer is used exactly as with auto-configuration, by injecting KafkaTemplate directly. The main difference is on the consumer side.
Bulk consumption of messages
The consumer configuration above registers a bean, @Bean("listenerContainerFactory"), which can be designated as the container factory for a consumer in the annotation: containerFactory = "listenerContainerFactory" specifies that listenerContainerFactory is used for the listener.
Note the parameter of registryReceiver: compared with the previous consumer, ConsumerRecord is now wrapped in a List, because listenerContainerFactory is configured for batch consumption; without batch consumption it would be a single object.
Also note the second parameter, Acknowledgment, which can only be injected when the consumer's ack mode is set to AckMode.MANUAL_IMMEDIATE, meaning a manual ack is required.
```java
@Component
@Slf4j
public class KafkaMessageReceiver {

    /**
     * listenerContainerFactory enables pulling messages in batches, so the
     * parameter is a List; otherwise it would be a single ConsumerRecord.
     * @param integerStringConsumerRecords
     * @param acknowledgment
     */
    @KafkaListener(topics = {"test"}, containerFactory = "listenerContainerFactory")
    public void registryReceiver(List<ConsumerRecord<Integer, String>> integerStringConsumerRecords,
            Acknowledgment acknowledgment) {
        Iterator<ConsumerRecord<Integer, String>> it = integerStringConsumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord<Integer, String> consumerRecord = it.next();
            // do some work, then acknowledge manually
            acknowledgment.acknowledge();
        }
    }
}
```
If you do not want to consume messages in bulk, you can define another bean similar to @Bean("listenerContainerFactory"), simply without enabling batch consumption.
```java
@Bean("listenerContainerFactory2") // personalized consumer definition
public ConcurrentKafkaListenerContainerFactory listenerContainerFactory2(
        DefaultKafkaConsumerFactory consumerFactory) {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    // use our DefaultKafkaConsumerFactory
    factory.setConsumerFactory(consumerFactory);
    // set the consumer ack mode to manual; adjust to your needs
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    return factory;
}
```

Errors when integrating Kafka with Spring Boot
Timeout expired while fetching topic metadata
For this error, check the Kafka connection: whether the service is started and whether the host and port are correct.
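A quick first check, before digging into client configuration, is whether the broker's host and port are reachable at all. The following plain-Java sketch attempts a TCP connection; the address is just the example value from the configuration above, so replace it with your own:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerCheck {

    // returns true if a TCP connection to host:port succeeds within timeoutMs
    static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // example broker address from the earlier configuration; replace with your own
        System.out.println(reachable("192.168.0.15", 9092, 3000)
                ? "broker port reachable"
                : "broker port unreachable");
    }
}
```

Note that an open port only proves network connectivity; the broker must also be healthy, and its advertised listeners must match the address clients use.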
Kafka producer error: Expiring 10 record(s) for TOPIC:XXXXXX: xxx ms has passed since batch creation plus linger time
This kind of error suggests a version mismatch between Kafka and Spring. In my case it occurred with Spring Boot 2.1.2 against kafka_2.12-2.1.1; after changing the Kafka version to kafka_2.11-1.1.1, the problem was solved.