2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
Spring Cloud Stream has several usage details that often trip up newcomers. This article walks through three of them: custom message channels, consumer groups, and message partitioning.
Custom message channel
In the last article, we mentioned two interfaces, Sink and Source, which define an input channel and an output channel respectively, while Processor has both an input and an output channel because it extends Source and Sink. Here we will define our own message channels, modeled on Sink and Source.
Based on the above, we first define an interface called MySink, as follows:
public interface MySink {
    String INPUT = "mychannel";

    @Input(INPUT)
    SubscribableChannel input();
}
Here we define a message input channel named mychannel. The parameter of the @Input annotation is the name of the message channel, and the method returns a SubscribableChannel object, which maintains the subscribers of the channel. Then, we define an interface named MySource, as follows:
public interface MySource {
    @Output(MySink.INPUT)
    MessageChannel output();
}
The @Output annotation gives the name of the message channel, which is again mychannel. The method returns a MessageChannel object, whose send method puts messages on the channel.
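To make the two roles concrete, here is a minimal plain-Java sketch of a channel that keeps a list of subscribers and delivers each sent payload to them. TinyChannel and its methods are hypothetical names invented for illustration; this is not how Spring implements SubscribableChannel or MessageChannel, and real channels carry Message objects rather than bare strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a subscribable channel; not Spring's implementation.
public class TinyChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Plays the role of subscribe on a SubscribableChannel: register a handler.
    public boolean subscribe(Consumer<String> handler) {
        return subscribers.add(handler);
    }

    // Plays the role of send on a MessageChannel: hand the payload to every subscriber.
    // Returns false when nobody is subscribed (a simplification chosen for this sketch).
    public boolean send(String payload) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
        return !subscribers.isEmpty();
    }

    public static void main(String[] args) {
        TinyChannel mychannel = new TinyChannel();
        mychannel.subscribe(p -> System.out.println("Received: " + p));
        mychannel.send("hello 123");
    }
}
```

In the real framework the binder sits between the two sides, so the sender and the receiver can live in different processes.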
Finally, we define a message receiving class, as follows:
@EnableBinding(value = {MySink.class})
public class SinkReceiver2 {
    // Log under the receiver's own class name
    private static Logger logger = LoggerFactory.getLogger(SinkReceiver2.class);

    @StreamListener(MySink.INPUT)
    public void receive(Object payload) {
        logger.info("Received:" + payload);
    }
}
This class binds the custom message channel and listens on it. Finally, we test it with a unit test, as follows:
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = StreamHelloApplication.class)
@EnableBinding(MySource.class)
public class StreamHelloApplicationTests {
    @Autowired
    private MySource mySource;

    @Test
    public void contextLoads() {
        mySource.output().send(MessageBuilder.withPayload("hello 123").build());
    }
}
Running the unit test, the receiver logs the message, indicating that it was sent and received successfully.
If you want to send an object, you can also send it directly without object conversion, as follows:
Send:
Book book = new Book(11, "Romance of the Three Kingdoms", "Luo Guanzhong");
mySource.output().send(MessageBuilder.withPayload(book).build());
Receive:
@StreamListener(MySink.INPUT)
public void receive(Book payload) {
    logger.info("Received:" + payload);
}
If we want to give a receipt after receiving successfully, it is also OK, as follows:
@StreamListener(MySink.INPUT)
@SendTo(Source.OUTPUT) // the channel the receipt is sent to
public String receive(Book payload) {
    logger.info("Received:" + payload);
    return "receive msg:" + payload;
}
The return value of the method is the receipt message, which is sent to the framework's default output channel. To receive this message, we need to listen on that channel, as follows:
@StreamListener(Source.OUTPUT)
public void receive2(String msg) {
    System.out.println("msg:" + msg);
}
Remember that the Source class must also be bound in the @EnableBinding annotation. With that in place, the receipt message is printed when the test runs.
Consumer group
A service may run as several instances at the same time. With no extra configuration, every instance receives every message, but sometimes we want each message to be handled by only one instance. Consumer groups solve this. The setup is simple: configure a group and a destination (topic) for the project, as follows:
spring.cloud.stream.bindings.mychannel.group=g1
spring.cloud.stream.bindings.mychannel.destination=dest1
This places the project in the g1 consumer group and sets the destination (topic) of the input channel to dest1. With the consumer configured, we configure the message sender as follows:
spring.cloud.stream.bindings.mychannel.destination=dest1
The sender's destination is also set to dest1 (if sending and receiving happen in the same application, this line can be omitted). Now we launch two instances of the project, making sure they use different ports. When we send a message again, only one of the two instances receives it, although which one is not deterministic.
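The delivery rule described above can be modeled in a few lines of plain Java: every group subscribed to a destination sees each message, but only one instance inside a group receives it. This is a simplified sketch under stated assumptions, not Spring Cloud Stream or broker code. GroupSketch, join, and publish are hypothetical names, the second group g2 is invented for contrast, and round-robin selection is used only to keep the sketch deterministic (a real broker makes its own choice).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of consumer groups on one destination; names are hypothetical.
public class GroupSketch {
    // group name -> inboxes of the instances that joined that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    private int counter = 0;

    // Register one application instance in a group and return its inbox.
    public List<String> join(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    // Every group sees the message, but only one instance per group receives it.
    public void publish(String payload) {
        for (List<List<String>> instances : groups.values()) {
            instances.get(counter % instances.size()).add(payload);
        }
        counter++;
    }

    public static void main(String[] args) {
        GroupSketch dest1 = new GroupSketch();
        List<String> instanceA = dest1.join("g1");
        List<String> instanceB = dest1.join("g1");
        List<String> other = dest1.join("g2"); // assumed second group, for contrast
        dest1.publish("hello 123");
        dest1.publish("hello 456");
        System.out.println("g1 instance A: " + instanceA);
        System.out.println("g1 instance B: " + instanceB);
        System.out.println("g2 instance:   " + other);
    }
}
```

In this model the two g1 instances split the messages between them, while the single g2 instance receives both, which is why instances of the same service should share one group name.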
Message partition
Sometimes, we may need messages with the same characteristics to be sent to the same consumer for processing. If we simply use the consumer group, we will not be able to achieve the function. At this time, we need to use message partitioning. After message partitioning, messages with the same characteristics can always be processed by the same consumer. The configuration method is as follows (the configuration here is based on the configuration of the consumer group):
Add the following configuration to the consumer:
spring.cloud.stream.bindings.mychannel.consumer.partitioned=true
spring.cloud.stream.instance-count=2
spring.cloud.stream.instance-index=0
I would like to make three points about this configuration:
1. The first line turns on message partitioning.
2. The second line gives the total number of consumer instances.
3. The third line gives the index of the current instance, starting from 0. When starting multiple instances, set a different index for each one on the command line at startup.
Then add the following configuration to the message producer:
spring.cloud.stream.bindings.mychannel.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.mychannel.producer.partitionCount=2
The first line sets the expression used to compute the partition key (here, the message payload itself), and the second line sets the number of message partitions.
Now we start multiple consumer instances again and send the same message several times. Because the partition key is the payload, every copy is processed by the same consumer.
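As a rough illustration of why identical payloads end up at the same consumer, the sketch below derives a partition from the key's hash code modulo the partition count, and treats that partition number as the instance-index of the consumer that receives the message. PartitionSketch and partitionFor are hypothetical names, and the formula is an assumption about the default selection strategy, so check the binder documentation for the exact rule your version applies.

```java
// Minimal sketch of hash-based partition selection; names are hypothetical.
public class PartitionSketch {
    // Assumed selection rule: hash of the key modulo the partition count.
    // Math.abs is applied after the modulo so the result is never negative.
    public static int partitionFor(Object key, int partitionCount) {
        return Math.abs(key.hashCode() % partitionCount);
    }

    public static void main(String[] args) {
        int partitionCount = 2; // mirrors producer.partitionCount=2
        String[] payloads = {"hello 123", "hello 123", "hello 456", "hello 123"};
        for (String payload : payloads) {
            // With partitionKeyExpression=payload, the payload itself is the key.
            int partition = partitionFor(payload, partitionCount);
            System.out.println(payload + " -> instance-index " + partition);
        }
    }
}
```

Equal keys always produce the same partition, while different keys may or may not land on different instances, so the partition key should be chosen from whatever property must stay together (for example, an order or user id).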