This article explains the principles and usage of spring cloud stream with kafka. The approach introduced here is simple, fast and practical, so let's dive in.
Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.
The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.
Spring cloud stream is the suitor planning to unify the messaging-middleware harem. He is flexible, has Spring at his back, and wields all eighteen weapons (pub/sub messaging, consumer groups, stateful partitions, and so on). The harem currently houses Empress kafka of the Eastern Palace and Empress rabbitMQ of the Western Palace.
Gossip party: today let's talk about the relationship between spring cloud stream and kafka; rabbitMQ will stay in the cold palace for now.
1. The first lady of the main palace: kafka

Apache Kafka® is a distributed streaming platform. What exactly does that mean?
A streaming platform has three key capabilities:
Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
Store streams of records in a fault-tolerant durable way.
Process streams of records as they occur.
In kafka's own words: I am a stream processing platform and can do a lot of work:
I can handle publish/subscribe messages
I store messages in a stable, fault-tolerant way
I process records as soon as they arrive, and I am really fast
To sum up: fast, stable and accurate.
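Before bringing spring into the picture, here is what the publish capability looks like in plain Java with the kafka-clients library. A minimal sketch, assuming a broker already running on localhost:9092 and a topic named test1 (the same values used later in this article, not requirements):

// A minimal sketch of publishing with the plain kafka-clients API.
// localhost:9092 and topic test1 are assumptions, matching this article's later setup.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // try-with-resources closes the producer and flushes pending records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // the broker stores this record durably; any subscriber can read it later
            producer.send(new ProducerRecord<>("test1", "key", "hello kafka"));
        }
    }
}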
Kafka is very simple to run: download it from here, then start zookeeper first. The latest kafka download package includes a zookeeper, which can be used directly. After zookeeper starts, configure its ip and port in kafka's configuration file, config/server.properties.
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
Then run the following command under the bin directory to start kafka.

bin/kafka-server-start.sh -daemon config/server.properties

2. Kafka's personal manager: kafka-manager
Although kafka is now running, we still need a manager to tell us what is going on inside. I use kafka-manager here; the download address is here. Unfortunately only the source code is available, with no runnable binary, so you have to compile it yourself, which is quite slow. I provide a compiled version for you; click here.
Kafka-manager also needs to configure the relationship with kafka, in the conf/application.conf file, but the configuration is not kafka itself, but kafka-mounted zookeeper.
kafka-manager.zkhosts="localhost:2181"
Then start bin/kafka-manager (in a windows environment, kafka-manager.bat can be run instead).
There is a pit here: if you run it under windows, startup may fail with an error saying the input line is too long.
This is because the directory path is too long; shorten the kafka-manager-2.0.0.2 directory name and it will work properly.
After startup, add your cluster through Add Cluster, fill in the address and port of zookeeper, and make sure Kafka Version matches the version of kafka you are using; otherwise you may not see kafka's content.
Then we can see kafka's brokers, topics, consumers, partitions and other information.
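If you prefer verifying from code rather than from a UI, the kafka-clients AdminClient can list the same information. A minimal sketch, again assuming a broker on localhost:9092 (the class name TopicCheck is just for illustration):

// Lists all topic names the broker currently knows about.
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class TopicCheck {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // listTopics() returns a future; get() blocks until the broker answers
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}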
3. Your Majesty is here, spring cloud stream
The starting point of everything is still in start.spring.io.
This dark interface is spring's Halloween theme. What matters to us are the two dependencies on the right, which correspond to the following entries in pom.xml.
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-test-support</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
But this alone is not enough. If you run it directly, it will prompt you.
Caused by: java.lang.IllegalStateException: Unknown binder configuration: kafka
You also need to add a dependency package
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

4. Send messages, biubiubiu
After the spring cloud stream project skeleton is set up, we split the work into two parts: sending messages and receiving them. Let's look at sending first. The starting point is the configuration file, application.yml.
spring:
  cloud:
    stream:
      default-binder: kafka # default binder; if rabbitMQ is used, enter rabbit
      kafka:
        binder:
          brokers: # address of the kafka message middleware server
          - localhost:9092
      bindings:
        output: # channel name
          binder: kafka
          destination: test1 # destination to which the message is sent, corresponds to a kafka topic
          content-type: text/plain # format of the message
          group: output-group-1 # corresponds to a kafka consumer group
Note that output here means messages are published from this side; it corresponds to the subscription later. The name output is the name of the message channel and can be customized, as we'll see shortly.
Then we need to create a publisher:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
public class Producer {
    private Source mySource;

    public Producer(Source mySource) {
        super();
        this.mySource = mySource;
    }

    public Source getMysource() {
        return mySource;
    }

    public void setMysource(Source mySource) {
        this.mySource = mySource;
    }
}
@EnableBinding, as the name suggests, binds a channel; the bound channel name is the output above. Source.class is provided by spring and declares a bindable publishing channel whose channel name is output, corresponding to output in application.yml.
The source code makes this clear.
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Bindable interface with one output channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Source {

    /**
     * Name of the output channel.
     */
    String OUTPUT = "output";

    /**
     * @return output channel
     */
    @Output(Source.OUTPUT)
    MessageChannel output();
}
If we need to define our own channel, we can write a class ourselves, such as this one, and the channel name will be changed to my-out.
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MySource {

    String INPUT = "my-in";
    String OUTPUT = "my-out";

    @Input(INPUT)
    SubscribableChannel myInput();

    @Output(OUTPUT)
    MessageChannel myOutput();
}
In that case, application.yml will change.
        my-out:
          binder: kafka
          destination: mytest # destination of the message, corresponds to a kafka topic
          content-type: text/plain # format of the message
          group: output-group-2 # corresponds to a kafka consumer group
The @EnableBinding of Producer also needs to change. To keep things matched up, I wrote another class, MyProducer.
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(MySource.class)
public class MyProducer {
    private MySource mySource;

    public MyProducer(MySource mySource) {
        super();
        this.mySource = mySource;
    }

    public MySource getMysource() {
        return mySource;
    }

    public void setMysource(MySource mySource) {
        this.mySource = mySource;
    }
}
In this way, the message publishing part is done. Let's write a controller to send messages.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.wphmoon.kscs.service.ChatMessage;
import com.wphmoon.kscs.service.MyProducer;
import com.wphmoon.kscs.service.Producer;

@RestController
public class MyController {

    @Autowired
    private Producer producer;

    @Autowired
    private MyProducer myProducer;

    // get the String message via HTTP, publish it to broker using spring cloud stream
    @RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
    public String publishMessageString(@RequestBody String payload) {
        // send message to channel output
        producer.getMysource()
                .output()
                .send(MessageBuilder.withPayload(payload)
                        .setHeader("type", "string")
                        .build());
        return "success";
    }

    @RequestMapping(value = "/sendMyMessage/string", method = RequestMethod.POST)
    public String publishMyMessageString(@RequestBody String payload) {
        // send message to channel my-out
        myProducer.getMysource()
                .myOutput()
                .send(MessageBuilder.withPayload(payload)
                        .setHeader("type", "string")
                        .build());
        return "success";
    }
}
It's simple: just call the producer to send a string. I use postman to issue the request.
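If postman is not at hand, the same POST can be issued from code. A minimal sketch using spring's RestTemplate, assuming the application listens on localhost:8080 (the port is an assumption; the article never states it):

// Posts a plain string body to the controller's /sendMessage/string endpoint.
import org.springframework.web.client.RestTemplate;

public class SendDemo {
    public static void main(String[] args) {
        RestTemplate rest = new RestTemplate();
        String result = rest.postForObject(
                "http://localhost:8080/sendMessage/string",
                "what are you looking at?", String.class);
        System.out.println(result); // expect "success"
    }
}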
The message has been sent out; how do we receive it? Read on.
5. Receive the messages, come one come all
Similarly, we use the same spring cloud stream project skeleton to receive messages, starting with the application.yml file.
server:
  port: 8081
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers:
          - localhost:9092
      bindings:
        input:
          binder: kafka
          destination: test1
          content-type: text/plain
          group: input-group-1
        my-in:
          binder: kafka
          destination: mytest
          content-type: text/plain
          group: input-group-2
The focus is on input and my-in, which correspond to the earlier output and my-out.
By default, the Source class has a counterpart, Sink, also provided officially. The code is as follows.
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Bindable interface with one input channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Sink {

    /**
     * Input channel name.
     */
    String INPUT = "input";

    /**
     * @return input channel.
     */
    @Input(Sink.INPUT)
    SubscribableChannel input();
}
The Consumer class that uses it receives the messages; the code is as follows.
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
public class Consumer {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        logger.info("received a string message: " + message);
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type'] == 'chat'")
    public void handle(@Payload ChatMessage message) {
        final DateTimeFormatter df = DateTimeFormatter
                .ofLocalizedTime(FormatStyle.MEDIUM)
                .withZone(ZoneId.systemDefault());
        final String time = df.format(Instant.ofEpochMilli(message.getTime()));
        logger.info("received a complex message: [{}]: {}", time, message.getContents());
    }
}
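One thing to note: the controller above sets the header type to string, so only consume() fires for those messages; handle() would fire only for messages sent with type set to chat. Also, ChatMessage is imported but never shown in this article. A minimal sketch of what it might look like, inferred from the calls above (getTime() feeding Instant.ofEpochMilli suggests epoch millis; getContents() holds the text); the field names are guesses:

// Hypothetical ChatMessage POJO; not shown in the original article.
public class ChatMessage {
    private long time;      // assumed: epoch millis, consumed by Instant.ofEpochMilli
    private String contents; // assumed: the message body

    public ChatMessage() {
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public String getContents() {
        return contents;
    }

    public void setContents(String contents) {
        this.contents = contents;
    }
}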
The MySink and MyConsumer codes of our custom channel are as follows:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {

    String INPUT = "my-in";

    @Input(INPUT)
    SubscribableChannel myInput();
}

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(MySink.class)
public class MyConsumer {
    private static final Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    @StreamListener(target = MySink.INPUT)
    public void consume(String message) {
        logger.info("received a string message: " + message);
    }

    @StreamListener(target = MySink.INPUT, condition = "headers['type'] == 'chat'")
    public void handle(@Payload ChatMessage message) {
        final DateTimeFormatter df = DateTimeFormatter
                .ofLocalizedTime(FormatStyle.MEDIUM)
                .withZone(ZoneId.systemDefault());
        final String time = df.format(Instant.ofEpochMilli(message.getTime()));
        logger.info("received a complex message: [{}]: {}", time, message.getContents());
    }
}
That's it. When we send a message with postman as above, we can see it directly in the log here.
2019-10-29 18:43:14 INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.MyConsumer : received a string message: what are you looking at?
2019-10-29 18:43:14 INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.Consumer : received a string message: what are you looking at?

6. Take another look in kafka-manager
The destination we define in application.yml is a kafka topic, which can be seen in the topic list of kafka-manager.
And the consumers that receive the messages are also visible there.
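Besides eyeballing kafka-manager, the spring-cloud-stream-test-support dependency we added in the pom can verify the wiring without a running broker. A minimal JUnit 4 sketch, assuming the MySource interface above; whether the captured payload is still a raw String after conversion depends on the binder version and content-type setup:

// Sends on the my-out channel and asserts on what the test binder captured.
import static org.junit.Assert.assertEquals;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyProducerTest {

    @Autowired
    private MySource mySource;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void sendsToMyOut() {
        mySource.myOutput().send(MessageBuilder.withPayload("hello").build());
        // the test binder captures whatever was sent on the my-out channel
        Message<?> received = messageCollector.forChannel(mySource.myOutput()).poll();
        Object payload = received.getPayload();
        // depending on the binder version, the payload may already be converted to byte[]
        String text = payload instanceof byte[] ? new String((byte[]) payload) : payload.toString();
        assertEquals("hello", text);
    }
}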
This is the royal romance between spring cloud stream and kafka, but their political marriage is not that simple. We'll cover the complicated parts later; please look forward to the return of the king. (wild translator's note: The Return of the King)