2025-04-02 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article shows how to use Spring Cloud Stream to work with RabbitMQ, RocketMQ, and Kafka, and why that abstraction is worth having.
Business systems depend more and more on MQ. Its importance keeps growing, and so do the demands placed on it: ordered consumption, transactional messages, message replay, and higher performance. These trends suggest that a better MQ solution than the one currently in use may be needed.
Suppose we put "replace RabbitMQ with RocketMQ" on the agenda. That would be a very large project. Many existing services use RabbitMQ-specific code, so replacing it means touching the code of every service. The operational risk is high, and the work consumes a lot of development and testing resources.
In hindsight, could we have hidden the RabbitMQ-specific code as much as possible when we first adopted it, leaving room for later upgrades and replacements?
This is where Spring Cloud Stream, a subproject of Spring Cloud, comes in. It is a framework for building message-driven microservices that provides one standard model for publishing and consuming messages and integrates message middleware from different vendors. Official binders exist for Kafka and RabbitMQ, and Alibaba provides one for RocketMQ.
I. A brief introduction to Spring Cloud Stream
A Spring Cloud Stream application sits on top of third-party message middleware. Applications communicate through input channels and output channels, which Spring Cloud Stream injects. The connection between a channel and the external message broker is made by a Binder.
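The channel and binder relationship described above can be pictured with a small, broker-free sketch in plain Java. It is not Spring Cloud Stream code: `SketchBinder`, `BrokerABinder`, `BrokerBBinder`, and `BinderSketch` are hypothetical names invented for illustration, and the two "brokers" are in-memory stand-ins. The point is only that the application code is written once against the binder contract.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical binder contract: the only broker-facing surface the application sees. */
interface SketchBinder {
    Consumer<String> bindProducer(String destination);

    void bindConsumer(String destination, Consumer<String> handler);
}

/** In-memory stand-in for one broker: delivers each payload unchanged. */
class BrokerABinder implements SketchBinder {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    @Override
    public Consumer<String> bindProducer(String destination) {
        return payload -> deliver(destination, payload);
    }

    @Override
    public void bindConsumer(String destination, Consumer<String> handler) {
        subscribers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }

    protected void deliver(String destination, String payload) {
        for (Consumer<String> handler : subscribers.getOrDefault(destination, List.of())) {
            handler.accept(payload);
        }
    }
}

/** In-memory stand-in for a second broker: tags payloads so the swap is observable. */
final class BrokerBBinder extends BrokerABinder {
    @Override
    protected void deliver(String destination, String payload) {
        super.deliver(destination, "[broker-B] " + payload);
    }
}

/** Application code: written once against SketchBinder, unaware of the broker behind it. */
final class BinderSketch {
    static List<String> sendAndReceive(SketchBinder binder) {
        List<String> received = new ArrayList<>();
        binder.bindConsumer("login-user", received::add);            // plays the input channel
        Consumer<String> output = binder.bindProducer("login-user"); // plays the output channel
        output.accept("Hello World...");
        return received;
    }
}
```

Calling `BinderSketch.sendAndReceive` with either binder runs the same application code; only the object passed in changes, which mirrors swapping one binder dependency for another.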
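II. RabbitMQ integration

1. Add the dependency

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

2. Define the message input and output channels

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;

    // Source.java: the output (sending) channel
    public interface Source {
        String OUTPUT = "myOutput";

        @Output(OUTPUT)
        MessageChannel message();
    }

    // Sink.java: the input (receiving) channel
    public interface Sink {
        String INPUT = "myInput";

        @Input(INPUT)
        SubscribableChannel sub1();
    }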
The output channel is the sender of a message and the input channel is the receiver.
myOutput and myInput are channel names; their properties are set later in the configuration file. Keep in mind that the two channel bindings should be defined in separate interfaces, otherwise hard-to-diagnose errors may occur.
3. Configure the message properties

    spring:
      cloud:
        stream:
          bindings:
            myOutput:
              destination: login-user
            myInput:                     # channel name, matching the channel the consumer listens on
              destination: login-user    # exchange
              group: logined-member      # consumer group
          rabbit:
            bindings:
              myOutput:
                producer:
                  routing-key-expression: headers.routingKey   # routing key used by the sender
                  delayed-exchange: true                       # enable the delayed exchange
              myInput:
                consumer:
                  binding-routing-key: login.user.succeed   # routing key the consumer listens on
                  delayed-exchange: true                    # enable the delayed exchange
                  auto-bind-dlq: true                       # bind a dead letter queue
                  republish-to-dlq: true                    # republish to the dead letter queue with error information

1) destination: the message topic name
In RabbitMQ it defines the exchange and becomes part of the queue name.
2) group: the consumer group
When no consumer group is defined and multiple instances are started, every instance receives its own copy of each message.
Once a consumer group is defined, the instances share one queue and messages are load-balanced across them. The queue name is built as destination.group, here login-user.logined-member.
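The difference between grouped and ungrouped consumers can be made concrete with a toy model in plain Java. This is only a sketch of the semantics described above, not binder code: `ConsumerGroupSketch` is a hypothetical class, the `.anonymous.N` suffix for ungrouped queues is a placeholder naming scheme, and round-robin stands in for whatever load distribution the broker really applies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of consumer group semantics; names are illustrative, not binder API. */
final class ConsumerGroupSketch {
    // queue name -> names of the instances consuming from it
    private final Map<String, List<String>> queues = new LinkedHashMap<>();
    // queue name -> how many messages the queue has handed out so far
    private final Map<String, Integer> cursors = new LinkedHashMap<>();
    private int anonymousCounter = 0;

    /** Grouped consumers share "destination.group"; ungrouped ones each get a private queue. */
    String subscribe(String destination, String group, String instance) {
        String queue = (group != null)
                ? destination + "." + group
                : destination + ".anonymous." + (++anonymousCounter); // placeholder naming
        queues.computeIfAbsent(queue, q -> new ArrayList<>()).add(instance);
        cursors.putIfAbsent(queue, 0);
        return queue;
    }

    /** Every queue gets a copy of the message; within a queue one instance is picked round-robin. */
    List<String> publish() {
        List<String> receivers = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : queues.entrySet()) {
            List<String> instances = entry.getValue();
            int next = cursors.get(entry.getKey());
            receivers.add(instances.get(next % instances.size()));
            cursors.put(entry.getKey(), next + 1);
        }
        return receivers;
    }
}
```

With two instances in the group logined-member, each published message reaches exactly one of them in turn. With no group, both instances receive every message.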
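binding-routing-key: the routing key with which the consumer's queue is bound to the exchange
delayed-exchange: declares the exchange as a delayed message exchange (the broker needs the delayed message exchange plugin)
auto-bind-dlq: automatically declares a dead letter queue and binds it
republish-to-dlq: republishes failed messages to the dead letter queue with error information attached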
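The dead letter behavior configured by auto-bind-dlq and republish-to-dlq can be pictured with another broker-free sketch in plain Java. Everything here is an assumption made for illustration: `DeadLetterSketch` is a hypothetical class, the retry count is passed in by the caller, and the `x-exception-message` header name should be checked against the binder documentation before relying on it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of "retry, then republish to the dead letter queue with error details". */
final class DeadLetterSketch {
    /** A dead-lettered message: the original payload plus diagnostic headers. */
    static final class DeadLetter {
        final String payload;
        final Map<String, String> headers;

        DeadLetter(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    final List<DeadLetter> deadLetterQueue = new ArrayList<>();

    /** Calls the handler up to maxAttempts times; returns true if one attempt succeeded. */
    boolean deliver(String payload, Consumer<String> handler, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(payload);
                return true;
            } catch (RuntimeException e) {
                lastFailure = e; // remember the most recent failure and try again
            }
        }
        // Retries exhausted: keep the payload and attach the error so it can be inspected later.
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("x-exception-message", String.valueOf(lastFailure.getMessage())); // illustrative name
        deadLetterQueue.add(new DeadLetter(payload, headers));
        return false;
    }
}
```

The design point is that a failed message is not lost: it ends up in a separate queue together with the reason it failed, which is what makes later diagnosis or manual replay possible.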
4. Implementation of sending and receiving messages
Send a message
    @Autowired
    private Source source;

    @GetMapping("/")
    public void sendSucceed() {
        source.message().send(MessageBuilder.withPayload("Hello World...")
                .setHeader("routingKey", "login.user.succeed")  // read by routing-key-expression
                .setHeader("version", "1.0")                    // matched by the listener condition
                .setHeader("x-delay", 5000)                     // delay in milliseconds
                .build());
    }
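Here you can set different headers on the message to trigger different behavior; for example, x-delay asks the delayed exchange to hold the message for 5000 ms. Each MQ supports different features, so which headers are available depends on the middleware in use.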
Receive messages
    // Handles only messages whose "version" header equals "1.0"
    @StreamListener(value = Sink.INPUT, condition = "headers['version']=='1.0'")
    public void receiveSucceed_v1(@Payload String message) {
        String msg = "StreamReceiver v1: " + message;
        log.error(msg);
    }

5. Bind the message channels

    @EnableBinding(value = {Source.class, Sink.class})
    @SpringBootApplication
    public class RabbitApplication {
        public static void main(String[] args) {
            SpringApplication.run(RabbitApplication.class, args);
        }
    }
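The condition attribute used on the listener in step 4 selects a handler by message header. A minimal plain-Java sketch of that idea follows. It is not how Spring evaluates the expression: `ConditionalDispatchSketch` is a hypothetical class, and Java predicates stand in for the expression string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Toy dispatcher: each listener carries a header condition and a payload handler. */
final class ConditionalDispatchSketch {
    private static final class Listener {
        final Predicate<Map<String, Object>> condition;
        final Consumer<String> handler;

        Listener(Predicate<Map<String, Object>> condition, Consumer<String> handler) {
            this.condition = condition;
            this.handler = handler;
        }
    }

    private final List<Listener> listeners = new ArrayList<>();

    /** Registers a handler that should only see messages whose headers satisfy the condition. */
    void register(Predicate<Map<String, Object>> condition, Consumer<String> handler) {
        listeners.add(new Listener(condition, handler));
    }

    /** Invokes every listener whose condition matches; returns how many were invoked. */
    int dispatch(Map<String, Object> headers, String payload) {
        int invoked = 0;
        for (Listener listener : listeners) {
            if (listener.condition.test(headers)) {
                listener.handler.accept(payload);
                invoked++;
            }
        }
        return invoked;
    }
}
```

Registering one handler per value of the version header lets a v1 and a v2 receiver share the same channel, each seeing only the messages meant for it.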
Once these five steps are done, messages can be sent and received normally. Apart from the dependency and the message property configuration, all of the code is written against abstractions and contains nothing specific to RabbitMQ.
III. RocketMQ integration
Starting from the RabbitMQ code above, you only need to change the dependency and the property configuration to switch to RocketMQ (broker-specific features excepted).
1. Add the dependency

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>

2. Configure the message properties

    spring:
      cloud:
        stream:
          bindings:
            myOutput:
              destination: login-user
              content-type: application/json
            myInput:                     # channel name, matching the channel the consumer listens on
              destination: login-user    # topic
              group: logined-member      # consumer group; instances in the same group share the load
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876

IV. Kafka integration

1. Add the dependency

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

2. Configure the message properties

    spring:
      cloud:
        stream:
          bindings:
            myOutput:
              destination: login-user
              content-type: application/json
            myInput:                     # channel name, matching the channel the consumer listens on
              destination: login-user    # topic
              group: logined-member      # consumer group; instances in the same group share the load
          kafka:
            binder:
              brokers: localhost:9092    # address of the Kafka broker
              auto-create-topics: true
As these three simple examples show, Spring Cloud Stream abstracts message publishing and consumption to a high degree, so one set of code can support several kinds of message middleware. It also makes it easy to mix different message middleware in the same system, which greatly broadens what you can do with them.
That covers how to use Spring Cloud Stream with RabbitMQ, RocketMQ, and Kafka. If you are facing a similar migration question, the analysis above may serve as a reference.