2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
Many newcomers are unsure how to send and consume Kafka messages from a Spring Boot application. This article walks through a producer and a consumer built with Spring Kafka, together with the configuration each one needs.
Before you start: this article assumes the Kafka and ZooKeeper clusters are already set up (see the links below). If Kafka reports that it cannot connect, delete all the log directories under the Kafka installation, restart the Kafka cluster, and then start the Spring Boot service.
ZooKeeper: https://my.oschina.net/u/3730149/blog/3071737
Kafka: https://my.oschina.net/u/3730149/blog/3071754
Producer
Maven dependencies
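<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gzh.kafka.producer</groupId>
    <artifactId>producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka-producer-master</name>
    <description>demo project for kafka producer</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>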
application.properties
server.port=8000
spring.application.name=kafka-producer
# kafka configuration
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# topic
kafka.app.topic.foo=test20180430
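The broker list is a single string of host:port pairs separated by commas, so a missing comma fuses several addresses into one invalid entry. As a minimal sketch using only the JDK (the class BootstrapServersDemo and its parse method are hypothetical helpers, not part of the project or of Spring Kafka), the value can be read and split like this:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration only: it is not part of the
// article's project and does not connect to Kafka.
class BootstrapServersDemo {

    // Splits "host1:port1,host2:port2" into trimmed, non-empty entries.
    static List<String> parse(String bootstrapServers) {
        List<String> servers = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }

    public static void main(String[] args) throws IOException {
        // Load the same key the producer uses from an in-memory properties text.
        Properties props = new Properties();
        props.load(new StringReader(
                "spring.kafka.producer.bootstrap-servers="
                        + "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094\n"));
        String value = props.getProperty("spring.kafka.producer.bootstrap-servers");
        for (String server : parse(value)) {
            System.out.println(server);
        }
    }
}
```

Running main prints one broker address per line, which is a quick way to check that the property holds three separate entries.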
Use Spring Boot to send Spring Kafka messages
Spring Kafka sends messages through the KafkaTemplate class, which wraps a Kafka Producer and provides convenience methods for sending data to Kafka topics. Its send methods are asynchronous and return a ListenableFuture; to send synchronously, block on the returned future. Spring Boot automatically configures and initializes KafkaTemplate from the properties in the application.properties file.
To make sending easy to test, the example uses Spring's scheduled tasks: the @EnableScheduling annotation on the class turns scheduling on, and the @Scheduled annotation specifies when messages are sent.
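The difference between the two sending styles comes down to what the caller does with the returned future. The following is only a stand-in sketch: fakeSend is a hypothetical method that returns a JDK CompletableFuture instead of calling Kafka, and the class name is made up. The future returned by kafkaTemplate.send also implements java.util.concurrent.Future, so the same get(timeout, unit) call is available on it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stand-in sketch: nothing here talks to Kafka.
class SendStylesDemo {

    // Hypothetical replacement for a send call that returns a future.
    static CompletableFuture<String> fakeSend(String topic, String message) {
        return CompletableFuture.supplyAsync(() -> topic + ":" + message);
    }

    // Asynchronous style: register a callback and return without waiting.
    static CompletableFuture<Void> sendAsync(String topic, String message, StringBuilder log) {
        return fakeSend(topic, message)
                .thenAccept(result -> log.append("sent ").append(result));
    }

    // Synchronous style: block until the result arrives or the timeout expires.
    static String sendSync(String topic, String message) {
        try {
            return fakeSend(topic, message).get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the send", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("send did not complete", e);
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        // Wait here only so the demo can print the callback's output.
        sendAsync("test20180430", "hello", log).join();
        System.out.println(log);
        System.out.println(sendSync("test20180430", "hello"));
    }
}
```

The asynchronous style keeps the calling thread free and suits high-throughput sending; the blocking style is simpler when the caller must know the outcome before continuing.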
package com.gzh.kafka.producer.component;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
@EnableScheduling
public class KafkaMessageProducer {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name read from application.properties
    @Value("${kafka.app.topic.foo}")
    private String topic;

    // Send a message every 5 seconds
    @Scheduled(cron = "0/5 * * * * ?")
    public void send() {
        String message = "Hello World---" + System.currentTimeMillis();
        LOG.info("topic=" + topic + ", message=" + message);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        // Log the outcome once the asynchronous send completes
        future.addCallback(
                success -> LOG.info("KafkaMessageProducer sent message successfully!"),
                fail -> LOG.error("KafkaMessageProducer failed to send message!", fail));
    }
}
Create a message producer startup class
package com.gzh.kafka.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }
}
At this point, the Spring Boot and Spring Kafka message producer application is complete. Start the ZooKeeper and Kafka servers, then start the producer application and check its console log. If the log shows that the message was sent successfully, the integration works.
You can also send messages in response to a web request.
package com.gzh.kafka.producer.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

@Service
public class KafkaMessageSendService {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSendService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name read from application.properties
    @Value("${kafka.app.topic.foo}")
    private String topic;

    public void send(String message) {
        LOG.info("topic=" + topic + ", message=" + message);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        // Log the outcome once the asynchronous send completes
        future.addCallback(
                success -> LOG.info("KafkaMessageProducer sent message successfully!"),
                fail -> LOG.error("KafkaMessageProducer failed to send message!", fail));
    }
}
Controller class that handles the request
package com.gzh.kafka.producer.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.gzh.kafka.producer.service.KafkaMessageSendService;

@RestController
@RequestMapping(value = "send", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public class KafkaMessageSendController {

    @Autowired
    private KafkaMessageSendService kafkaMessageSendService;

    // POST /send/sendMessage?message=... hands the message to the send service
    @RequestMapping(value = "/sendMessage", method = RequestMethod.POST)
    public String send(@RequestParam(required = true) String message) {
        try {
            kafkaMessageSendService.send(message);
        } catch (Exception e) {
            return "send failed.";
        }
        return message;
    }
}
Test the controller's endpoint through Swagger.
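The endpoint can also be called without Swagger. As a rough sketch, assuming the producer runs on localhost with the port 8000 configured above, the request URL combines the class-level mapping (send), the method-level mapping (/sendMessage), and a URL-encoded message parameter. SendMessageUrlDemo and buildUrl are hypothetical names used only for illustration:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper: only builds the URL, it does not send the request.
class SendMessageUrlDemo {

    // Builds the URL for a POST to the controller's sendMessage endpoint.
    static String buildUrl(String host, int port, String message) {
        try {
            return "http://" + host + ":" + port + "/send/sendMessage?message="
                    + URLEncoder.encode(message, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every Java platform must support.
            throw new IllegalStateException("UTF-8 is not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildUrl("localhost", 8000, "Hello World"));
    }
}
```

The resulting URL can be sent with any HTTP client using the POST method, since the controller only maps POST requests.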
Consumer
Maven dependencies
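<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gzh.kafka.consumer</groupId>
    <artifactId>consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka-consumer-master</name>
    <description>demo project for kafka consumer</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-kafka.version>1.3.4.RELEASE</spring-kafka.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>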
When using Spring Kafka, pay close attention to versions; mismatched versions produce all kinds of strange errors. The Spring website documents which kafka-clients version corresponds to each Spring Kafka version, and the kafka-clients version should in turn match the version of the Kafka server.
application.properties configuration
server.port=8001
spring.application.name=kafka-consumer
# kafka configuration
# commit offsets automatically after messages are consumed, so that consumption can resume where it left off
spring.kafka.consumer.enable-auto-commit=true
# consumer group
spring.kafka.consumer.group-id=guan
# kafka server addresses
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
# where to start when the group has no committed offset: latest (only new messages) or earliest (from the beginning)
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# topic
kafka.app.topic.foo=test20180430
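The auto-offset-reset setting only matters when the consumer group has no committed offset for a partition. The following is a simplified model of that rule, not Kafka client code: OffsetResetDemo and startingOffset are hypothetical names, a negative committed offset stands for "nothing committed yet", and the out-of-range case is left out.

```java
// Simplified, hypothetical model of how the starting offset is chosen.
class OffsetResetDemo {

    // committedOffset < 0 means the group has not committed an offset yet.
    static long startingOffset(long committedOffset, long logStart, long logEnd, String policy) {
        if (committedOffset >= 0) {
            // A committed offset always wins; the reset policy is not consulted.
            return committedOffset;
        }
        switch (policy) {
            case "earliest":
                return logStart; // read everything still stored in the partition
            case "latest":
                return logEnd;   // read only messages produced from now on
            default:
                throw new IllegalArgumentException("unsupported policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // A partition holding offsets 0..99, so the next message gets offset 100.
        System.out.println(startingOffset(-1, 0, 100, "latest"));
        System.out.println(startingOffset(-1, 0, 100, "earliest"));
        System.out.println(startingOffset(42, 0, 100, "latest"));
    }
}
```

With latest, as configured above, a brand-new group skips messages that were produced before it first connected; switch to earliest if those messages must be processed too.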
Annotating a method with @KafkaListener makes Spring Kafka create a message listener container for it automatically. Use the annotation to specify the topics to consume (you can also specify the consumer group and the partitions, and topic names can be matched with a regular expression). Once the consumer starts, it listens to those topics on the Kafka server and consumes messages as they arrive.
package com.gzh.kafka.consumer.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageConsumer.class);

    // Listen on the topic configured in application.properties
    @KafkaListener(topics = {"${kafka.app.topic.foo}"})
    public void receive(@Payload String message, @Headers MessageHeaders headers) {
        LOG.info("KafkaMessageConsumer received message: " + message);
        // Log every header that arrived with the message
        headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key)));
    }
}
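For the regular-expression option mentioned above, it helps to check which topic names a pattern would select before putting it into the listener. A minimal JDK-only sketch follows, assuming the whole topic name has to match the pattern; TopicPatternDemo, matchingTopics, and the extra topic names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: filters topic names locally, without contacting Kafka.
class TopicPatternDemo {

    // Returns the topics whose full name matches the regular expression.
    static List<String> matchingTopics(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("test20180430", "test20180501", "orders");
        // "test" followed by one or more digits
        System.out.println(matchingTopics("test\\d+", topics));
    }
}
```

Because the match is against the full name, a pattern such as test\d+ selects the dated test topics but leaves unrelated topics like orders alone.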
Create a message consumer startup class
package com.gzh.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}
Now that the consumer application is complete, verify that Spring Kafka messages are sent and received. First start the ZooKeeper server, then the Kafka server, then the producer (kafka-producer-master) application, and finally the consumer (kafka-consumer-master) application. Then check the producer and consumer logs: the consumer log should show that it received the messages.