
How to send and consume messages by integrating Spring Boot with Kafka


Many newcomers are unsure how to send and consume messages by integrating Spring Boot with Kafka. This article walks through a working setup step by step; hopefully it helps you solve the problem.

A note before we start: this setup uses kafka and zookeeper clusters. If kafka reports that it cannot connect, remember to delete the log directories under kafka, restart the kafka cluster, and then start the Spring Boot services.

Zookeeper: https://my.oschina.net/u/3730149/blog/3071737
Kafka: https://my.oschina.net/u/3730149/blog/3071754

Producer

Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gzh.kafka.producer</groupId>
    <artifactId>producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>kafka-producer-master</name>
    <description>demo project for kafka producer</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.properties

server.port=8000
spring.application.name=kafka-producer
# kafka configuration
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# topic
kafka.app.topic.foo=test20180430

Use Spring Boot to send Spring Kafka messages

Spring Kafka's KafkaTemplate class wraps a Producer and provides high-level operations for sending data to Kafka topics. It supports both asynchronous and synchronous sending; the send methods return a Future. Spring Boot automatically configures and initializes the KafkaTemplate based on the properties in the application.properties file.
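The code below uses the asynchronous callback style. For completeness, a synchronous send can be sketched by simply blocking on the future that KafkaTemplate returns. This is an illustrative sketch only; the class name and the timeout value are assumptions, not part of the original project:

package com.gzh.kafka.producer.component;

import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

// Illustrative sketch of a synchronous send: block on the future returned by KafkaTemplate.
@Component
public class KafkaSyncSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.app.topic.foo}")
    private String topic;

    public void sendAndWait(String message) throws Exception {
        // get() turns the asynchronous send into a synchronous one; the 10-second timeout is illustrative
        SendResult<String, String> result = kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
        System.out.println("sent to partition " + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset());
    }
}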

To make it easy to test message sending, a Spring scheduled task is used: the @EnableScheduling annotation on the class enables scheduling, and the @Scheduled annotation specifies when messages are sent.

package com.gzh.kafka.producer.component;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
@EnableScheduling
public class KafkaMessageProducer {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.app.topic.foo}")
    private String topic;

    // every 5 minutes; the exact cron expression was garbled in the source, so this value is an assumption
    @Scheduled(cron = "0 0/5 * * * ?")
    public void send() {
        String message = "Hello World---" + System.currentTimeMillis();
        LOG.info("topic=" + topic + ", message=" + message);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(
                success -> LOG.info("KafkaMessageProducer sent message successfully!"),
                fail -> LOG.error("KafkaMessageProducer failed to send message!"));
    }
}

Create a message producer startup class

package com.gzh.kafka.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }
}

At this point, the Spring Boot producer application integrating Spring Kafka is complete. Start the zookeeper and kafka servers, then start the producer application and check its console log: the log shows the message was sent successfully, which means the integration works.

You can also trigger message sending through a web request instead of a scheduled task:

package com.gzh.kafka.producer.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

@Service
public class KafkaMessageSendService {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSendService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.app.topic.foo}")
    private String topic;

    public void send(String message) {
        LOG.info("topic=" + topic + ", message=" + message);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(
                success -> LOG.info("KafkaMessageProducer sent message successfully!"),
                fail -> LOG.error("KafkaMessageProducer failed to send message!"));
    }
}

The controller class that handles the send request:

package com.gzh.kafka.producer.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.gzh.kafka.producer.service.KafkaMessageSendService;

@RestController
@RequestMapping(value = "send", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public class KafkaMessageSendController {

    @Autowired
    private KafkaMessageSendService kafkaMessageSendService;

    @RequestMapping(value = "/sendMessage", method = RequestMethod.POST)
    public String send(@RequestParam(required = true) String message) {
        try {
            kafkaMessageSendService.send(message);
        } catch (Exception e) {
            return "send failed.";
        }
        return message;
    }
}
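For a quick check without Swagger, the endpoint can be exercised with any HTTP client. Here is a minimal, illustrative Java sketch; the client class and the message value are not part of the original project, and the host/port follow server.port=8000 from application.properties above:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Illustrative client: POST a message to the producer's /send/sendMessage endpoint.
public class SendMessageClient {

    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8000/send/sendMessage");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
        byte[] body = "message=HelloKafka".getBytes(StandardCharsets.UTF_8);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body);
        }
        System.out.println("HTTP status: " + conn.getResponseCode());
    }
}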

You can also test the controller's requests through the Swagger UI.
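The original article does not show the Swagger setup itself. With the springfox 2.8.0 dependencies declared in the producer pom, a minimal configuration class might look like the sketch below; the class and package names are assumptions:

package com.gzh.kafka.producer.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

// Illustrative Swagger configuration: expose the producer's controllers in the Swagger UI.
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket api() {
        return new Docket(DocumentationType.SWAGGER_2)
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.gzh.kafka.producer.controller"))
                .paths(PathSelectors.any())
                .build();
    }
}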

Consumer

Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gzh.kafka.consumer</groupId>
    <artifactId>consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>kafka-consumer-master</name>
    <description>demo project for kafka consumer</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-kafka.version>1.3.4.RELEASE</spring-kafka.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Note that when using Spring Kafka you must pay attention to version compatibility, otherwise you will get all kinds of strange errors. The Spring website documents the correspondence between Spring Kafka and kafka-clients versions (the kafka-clients version should match the version of the kafka broker):

application.properties configuration

server.port=8001
spring.application.name=kafka-consumer
# kafka configuration
# automatically commit the offset after a message is consumed, so consumption can resume where it left off
spring.kafka.consumer.enable-auto-commit=true
# consumer group id
spring.kafka.consumer.group-id=guan
# kafka server addresses
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
# where to start consuming when there is no committed offset (earliest/latest)
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# topic
kafka.app.topic.foo=test20180430

Annotating a method with @KafkaListener tells Spring Kafka to automatically create a message listener container for it. Apply the annotation and specify the topic to be consumed (you can also specify the consumer group and partitions, and topic patterns are supported via regular expressions); once the consumer starts, it listens to that topic on the kafka server and consumes messages in real time.

package com.gzh.kafka.consumer.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageConsumer.class);

    @KafkaListener(topics = {"${kafka.app.topic.foo}"})
    public void receive(@Payload String message, @Headers MessageHeaders headers) {
        LOG.info("KafkaMessageConsumer received message: " + message);
        headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key)));
    }
}
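As mentioned above, @KafkaListener can also be pinned to specific partitions instead of listening to the whole topic. Here is a minimal illustrative sketch; the class name, listener id, and partition numbers are placeholders, not part of the original project:

package com.gzh.kafka.consumer.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

// Illustrative sketch: listen only to partitions 0 and 1 of the configured topic.
@Component
public class PartitionAwareConsumer {

    @KafkaListener(
            id = "partitionAwareListener",
            topicPartitions = @TopicPartition(topic = "${kafka.app.topic.foo}", partitions = {"0", "1"}))
    public void receive(String message) {
        System.out.println("received from a pinned partition: " + message);
    }
}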

Create a message consumer startup class

package com.gzh.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}

Now that the consumer application is complete, let's verify Spring Kafka message sending and receiving. First start the zookeeper servers, then the kafka servers, then the producer (kafka-producer-master) application, and finally the consumer (kafka-consumer-master) application. Watch the producer and consumer logs: they show the messages are sent and received successfully.

After reading the above, have you mastered how to send and consume messages by integrating Spring Boot with Kafka? Thank you for reading!
