2025-03-26 Update From: SLTechnology News&Howtos shulou
This article shows how to use RocketMQ in SpringBoot. The content is easy to follow and, hopefully, clears up your questions. Let's walk through it step by step.
What is RocketMQ? #
Official statement:
As more and more queues and virtual topics are used, the ActiveMQ IO module encounters a bottleneck. We try our best to solve this problem by throttling, circuit breaker or downgrade, but the effect is not good. So we started to focus on the popular messaging solution Kafka at that time. Unfortunately, Kafka does not meet our requirements, especially in terms of low latency and high reliability.
You can see from this that RocketMQ is a low-latency, highly reliable, scalable, easy-to-use message middleware.
It has the following characteristics:
Supports the publish/subscribe (Pub/Sub) and peer-to-peer (P2P) message models
Guarantees reliable first-in-first-out (FIFO) and strictly ordered delivery within a queue
Provides rich message consumption modes, supporting both pull and push
Single-queue million-message accumulation capability, and 100-million-message accumulation capability
Supports multiple message protocols, such as JMS and MQTT
Distributed, highly available deployment architecture that satisfies at-least-once message delivery semantics
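To make the FIFO and pull/push points above more concrete, here is a minimal in-memory sketch. It is not RocketMQ code: the class `FifoQueueSketch` and its methods are hypothetical names invented for illustration, and the sketch only models one queue in one JVM.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy single queue (hypothetical, not a RocketMQ class) showing FIFO order and the two consumption modes. */
class FifoQueueSketch {
    private final Queue<String> messages = new ArrayDeque<>();
    private final List<Consumer<String>> listeners = new ArrayList<>();

    /** Push mode: registered listeners are called as soon as a message arrives. */
    void registerListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    /** Delivers the message to push listeners if any exist, otherwise stores it for pulling. */
    void send(String message) {
        if (listeners.isEmpty()) {
            messages.add(message);
        } else {
            listeners.forEach(listener -> listener.accept(message));
        }
    }

    /** Pull mode: the consumer asks for the oldest stored message; null means the queue is empty. */
    String pull() {
        return messages.poll();
    }

    public static void main(String[] args) {
        FifoQueueSketch pullQueue = new FifoQueueSketch();
        pullQueue.send("m1");
        pullQueue.send("m2");
        System.out.println(pullQueue.pull()); // m1 (first in, first out)
        System.out.println(pullQueue.pull()); // m2

        FifoQueueSketch pushQueue = new FifoQueueSketch();
        pushQueue.registerListener(msg -> System.out.println("pushed: " + msg));
        pushQueue.send("m3"); // pushed: m3
    }
}
```

In the real client the same distinction shows up as a pull consumer versus a push consumer with a registered listener; this article uses the push style.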
RocketMQ environment installation #
Download address: https://rocketmq.apache.org/dowloading/releases/
Download the binary or source release from the official site. Compiling from source requires Maven 3.2.x and JDK 8.
Package from the root directory:
mvn -Prelease-all -DskipTests clean package -U
After the build, the distribution/target/apache-rocketmq folder contains the complete, runnable distribution in three forms: a folder, a zip, and a tar.
Using rocketmq-4.6.0.zip:
Start the name server: mqnamesrv.cmd
Start the broker: mqbroker.cmd -n localhost:9876
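Before moving on, it can help to confirm that something is actually listening on the name server address. The following is a small sketch using only the JDK; `NameServerCheck` is a hypothetical helper name, and the host and port simply mirror the localhost:9876 value used above. It only checks that a TCP connection can be opened, not that the process behind the port is RocketMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
class NameServerCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // connection refused, timeout, or unknown host
            return false;
        }
    }

    public static void main(String[] args) {
        // 9876 is the name server port used in this article
        System.out.println("name server reachable: " + isReachable("localhost", 9876, 1000));
    }
}
```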
Using RocketMQ in a SpringBoot environment #
Getting started with SpringBoot: https://www.jb51.net/article/177449.htm
Commonly used Starters in SpringBoot: https://www.jb51.net/article/177451.htm
The current environment versions are:
SpringBoot 2.0.6.RELEASE
SpringCloud Finchley.RELEASE
SpringCloud Alibaba 0.2.1.RELEASE
RocketMQ 4.3.0
Add the dependency to the project:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>${rocketmq.version}</version>
</dependency>
Since we already have a project here, we will not go through creating one. The main thing is to see how to use RocketMQ.
Create a RocketMQProperties configuration property class, which contains the following:
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
    private boolean isEnable = false;
    private String namesrvAddr = "localhost:9876";
    private String groupName = "default";
    private int producerMaxMessageSize = 1024;
    private int producerSendMsgTimeout = 2000;
    private int producerRetryTimesWhenSendFailed = 2;
    private int consumerConsumeThreadMin = 5;
    private int consumerConsumeThreadMax = 30;
    private int consumerConsumeMessageBatchMaxSize = 1;
    // getters and setters omitted
}
The producers and consumers in all of our subsystems now share the following settings:
isEnable: whether MQ is enabled
namesrvAddr: name server (cluster) address
groupName: group name
These are unified to make integration between systems easier, and more settings can be added if other requirements come up. Default values are given in the class, and you can override them from a configuration file or a configuration center. The configuration is as follows:
# Producers that send the same type of message are set to the same group, which must be unique. It does not have to be set; by default rocketmq uses ip@pid (pid stands for the JVM name) as the unique identifier
rocketmq.groupName=please_rename_unique_group_name
# whether auto-configuration is enabled
rocketmq.isEnable=true
# mq nameserver address
rocketmq.namesrvAddr=127.0.0.1:9876
# maximum message length, default 1024*4 (4M)
rocketmq.producer.maxMessageSize=4096
# send message timeout, default 3000
rocketmq.producer.sendMsgTimeout=3000
# number of retries when sending a message fails, default 2
rocketmq.producer.retryTimesWhenSendFailed=2
# number of consumer threads
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
# number of messages consumed at a time, default 1
rocketmq.consumer.consumeMessageBatchMaxSize=1
# Note: with the flat field names in RocketMQProperties, Spring Boot binds keys such as rocketmq.producerMaxMessageSize;
# the dotted producer.* and consumer.* keys above only bind if the class exposes nested producer and consumer objects.
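The idea of "defaults in the class, overrides from configuration" can be sketched with plain JDK classes. This is not how Spring Boot binds properties internally; `RocketSettingsSketch` and its helper methods are hypothetical names, and the keys are copied from the configuration above purely as sample input.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical sketch: read settings from properties text, falling back to in-code defaults. */
class RocketSettingsSketch {

    /** Parses key=value lines in java.util.Properties format. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("could not read properties text", e);
        }
        return props;
    }

    /** Returns the configured integer, or defaultValue when the key is absent. */
    static int intOrDefault(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = parse(
                "rocketmq.namesrvAddr=127.0.0.1:9876\n"
              + "rocketmq.producer.sendMsgTimeout=3000\n");
        // configured value wins over the default
        System.out.println(props.getProperty("rocketmq.namesrvAddr", "localhost:9876")); // 127.0.0.1:9876
        System.out.println(intOrDefault(props, "rocketmq.producer.sendMsgTimeout", 2000)); // 3000
        // key is absent, so the default is used
        System.out.println(intOrDefault(props, "rocketmq.producer.retryTimesWhenSendFailed", 2)); // 2
    }
}
```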
Create a consumer interface RocketConsumer.java, which constrains the core steps every consumer needs:
import org.apache.rocketmq.client.consumer.listener.MessageListener;

/**
 * Consumer interface
 *
 * @author SimpleWu
 */
public interface RocketConsumer {

    /**
     * Initialize the consumer
     */
    void init();

    /**
     * Register the message listener
     *
     * @param messageListener
     */
    void registerMessageListener(MessageListener messageListener);
}
Create an abstract consumer AbstractRocketConsumer.java:
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;

/**
 * Basic consumer information
 *
 * @author SimpleWu
 */
public abstract class AbstractRocketConsumer implements RocketConsumer {

    protected String topics;
    protected String tags;
    protected MessageListener messageListener;
    protected String consumerTitel;
    protected MQPushConsumer mqPushConsumer;

    /**
     * Necessary information
     *
     * @param topics
     * @param tags
     * @param consumerTitel
     */
    public void necessary(String topics, String tags, String consumerTitel) {
        this.topics = topics;
        this.tags = tags;
        this.consumerTitel = consumerTitel;
    }

    public abstract void init();

    @Override
    public void registerMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }
}
A subclass must specify the topics, the tags, and the message listening logic.
public abstract void init(); is used to initialize the consumer and is implemented by the subclass.
Next we write the auto-configuration class RocketMQConfiguation.java, which initializes a default producer connection and loads all consumers.
@EnableConfigurationProperties({RocketMQProperties.class}) uses the properties class above
@Configuration marks the class as a configuration class
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") takes effect only when rocketmq.isEnable=true is specified in the configuration
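The effect of the property condition can be modeled in a few lines. The sketch below is a simplified stand-in, not Spring's implementation: `ConditionalOnPropertySketch` is a hypothetical class, the environment is a plain map, and details such as matchIfMissing and relaxed key names are left out on purpose.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of a "property must equal value" condition. */
class ConditionalOnPropertySketch {

    /** True only when prefix.name is present and equals havingValue (ignoring case). */
    static boolean matches(Map<String, String> env, String prefix, String name, String havingValue) {
        String value = env.get(prefix + "." + name);
        return value != null && havingValue.equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        System.out.println(matches(env, "rocketmq", "isEnable", "true")); // false, property is missing

        env.put("rocketmq.isEnable", "true");
        System.out.println(matches(env, "rocketmq", "isEnable", "true")); // true

        env.put("rocketmq.isEnable", "false");
        System.out.println(matches(env, "rocketmq", "isEnable", "true")); // false
    }
}
```

In other words, a service that imports the dependency but never sets rocketmq.isEnable=true gets neither the producer bean nor the consumer initialization.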
The core content is as follows:
import java.util.Map;

import javax.annotation.PostConstruct;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

/**
 * mq configuration
 *
 * @author SimpleWu
 */
@Configuration
@EnableConfigurationProperties({RocketMQProperties.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true")
public class RocketMQConfiguation {

    private RocketMQProperties properties;
    private ApplicationContext applicationContext;
    private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class);

    public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) {
        this.properties = properties;
        this.applicationContext = applicationContext;
    }

    /**
     * Inject a default producer
     *
     * @return
     * @throws MQClientException
     */
    @Bean
    public DefaultMQProducer getRocketMQProducer() throws MQClientException {
        if (StringUtils.isEmpty(properties.getGroupName())) {
            throw new MQClientException(-1, "groupName is blank");
        }
        if (StringUtils.isEmpty(properties.getNamesrvAddr())) {
            throw new MQClientException(-1, "nameServerAddr is blank");
        }
        DefaultMQProducer producer = new DefaultMQProducer(properties.getGroupName());
        producer.setNamesrvAddr(properties.getNamesrvAddr());
        // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
        // if different producers in the same JVM need to send messages to different mq clusters,
        // each one needs a different instanceName
        // producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(properties.getProducerMaxMessageSize());
        producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout());
        // number of retries when sending a message fails, default is 2
        producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed());
        try {
            producer.start();
            log.info("producer is start! groupName: {}, namesrvAddr: {}", properties.getGroupName(),
                    properties.getNamesrvAddr());
        } catch (MQClientException e) {
            log.error("producer is error {}", e.getMessage(), e);
            throw e;
        }
        return producer;
    }

    /**
     * SpringBoot loads all consumers at startup
     */
    @PostConstruct
    public void initConsumer() {
        Map<String, AbstractRocketConsumer> consumers =
                applicationContext.getBeansOfType(AbstractRocketConsumer.class);
        if (consumers == null || consumers.isEmpty()) {
            log.info("init rocket consumer 0");
            return;
        }
        for (AbstractRocketConsumer consumer : consumers.values()) {
            consumer.init();
            createConsumer(consumer);
            log.info("init success consumer title {}, topics {}, tags {}", consumer.consumerTitel,
                    consumer.topics, consumer.tags);
        }
    }

    /**
     * Create a consumer from the consumer information
     *
     * @param arc
     */
    public void createConsumer(AbstractRocketConsumer arc) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName());
        consumer.setNamesrvAddr(this.properties.getNamesrvAddr());
        consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin());
        consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax());
        // the listener field declared in AbstractRocketConsumer
        consumer.registerMessageListener(arc.messageListener);
        /*
         * Set whether consumption starts at the head or the tail of the queue when the consumer
         * starts for the first time. If it is not the first start, consumption continues from
         * the position of the last consumption.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        /* Set the consumption model, cluster or broadcast. Default is cluster. */
        // consumer.setMessageModel(MessageModel.CLUSTERING);
        /* Set the number of messages consumed at a time. Default is 1. */
        consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize());
        try {
            consumer.subscribe(arc.topics, arc.tags);
            consumer.start();
            arc.mqPushConsumer = consumer;
        } catch (MQClientException e) {
            log.error("info consumer title {}", arc.consumerTitel, e);
        }
    }
}
Then register the auto-configuration class in the file META-INF/spring.factories under the src/main/resources folder so that it is enabled at startup. After that, a service only needs to import the dependency:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.xcloud.config.rocketmq.RocketMQConfiguation
Next, import the dependency into the service. The configuration then creates each consumer from the necessary information held in our abstract class. This happens after all consumers have been initialized, and only consumers that are Spring beans are managed.
Let's take a look at how to create a consumer. The steps are very simple: extend AbstractRocketConsumer and add Spring's @Component annotation. You can customize the consumer's topics and tags in the class.
Whether the project should continue to start when consumer creation fails depends on the requirements of the project.
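Create a default consumer DefaultConsumerMQ.java:
import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

@Component
public class DefaultConsumerMQ extends AbstractRocketConsumer {

    /**
     * Initialize the consumer
     */
    @Override
    public void init() {
        // set the topic, tags and consumer title
        super.necessary("TopicTest", "*", "this is the title");
        // consumer-specific execution logic
        registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                msgs.forEach(msg -> {
                    System.out.printf("consumer message boyd %s %n", new String(msg.getBody()));
                });
                // mark the messages as successfully consumed
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }
}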
super.necessary("TopicTest", "*", "this is the title"); must be called. It means this consumer listens to all tags under the TopicTest topic. The title field is something I defined myself, so it has no meaning for the RocketMQ configuration.
We can inject Spring beans here to implement any processing logic.
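To illustrate what the "*" tag expression means, here is a simplified model of tag matching. It assumes the commonly documented subscription syntax, where "*" selects every tag and "tagA || tagB" selects either tag; `TagFilterSketch` is a hypothetical class written for this article and does not reproduce the client's or broker's real filtering code.

```java
/** Hypothetical, simplified model of matching a message tag against a subscription expression. */
class TagFilterSketch {

    /** Returns true when messageTag is selected by subExpression ("*" or "a || b" style). */
    static boolean matches(String subExpression, String messageTag) {
        if (subExpression == null || subExpression.isEmpty() || "*".equals(subExpression)) {
            return true; // subscribe to everything under the topic
        }
        if (messageTag == null) {
            return false;
        }
        for (String tag : subExpression.split("\\|\\|")) {
            if (tag.trim().equals(messageTag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "tags1"));            // true
        System.out.println(matches("tagA || tagB", "tagB"));  // true
        System.out.println(matches("tagA", "tags1"));         // false
    }
}
```

With the "*" used by DefaultConsumerMQ, a message sent to TopicTest with any tag, for example tags1, reaches the listener.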
Create a message sending class for testing:
// defaultMQProducer is the DefaultMQProducer bean injected into this class
@Override
public String qmtest(@PathVariable("name") String name) throws MQClientException, RemotingException,
        MQBrokerException, InterruptedException, UnsupportedEncodingException {
    Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET));
    // send the message to a Broker
    SendResult sendResult = defaultMQProducer.send(msg);
    // sendResult tells whether the message was delivered successfully
    System.out.printf("%s%n", sendResult);
    return null;
}
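One detail worth noting: the sender encodes the body with an explicit charset (RemotingHelper.DEFAULT_CHARSET, which is UTF-8), while the consumer shown earlier decodes with new String(msg.getBody()), which uses the JVM's default charset. A minimal sketch of a symmetric round trip, using only the JDK, might look like this; `BodyEncodingSketch` is a hypothetical helper, not part of RocketMQ.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: encode and decode message bodies with the same explicit charset. */
class BodyEncodingSketch {

    /** Encodes text as UTF-8 bytes, as the sender side does. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes UTF-8 bytes; avoids depending on the platform default charset. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "hello \u4f60\u597d"; // ASCII plus two non-ASCII characters
        String roundTrip = decode(encode(original));
        System.out.println(original.equals(roundTrip)); // true
    }
}
```

On a JVM whose default charset is already UTF-8 the two approaches behave the same; the explicit form simply removes that assumption.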
Let's test it with HTTP requests:
http://localhost:10001/demo/base/mq/hello
consumer message boyd hello
http://localhost:10001/demo/base/mq/Hey
consumer message boyd Hey
All right, the design of this simple starter is complete. There are further features such as ordered message production, ordered message consumption, asynchronous message production, and so on. Readers can refer to the official documentation to work with those on their own.
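For ordered messages, the usual idea is to route every message that shares a business key (for example an order id) to the same queue, so that FIFO within that queue preserves their order. In the real client this routing is done through a MessageQueueSelector passed to send; the sketch below only models the selection step with plain JDK types, and `OrderedQueueSelectorSketch` and its queue names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: pick a queue deterministically from a sharding key. */
class OrderedQueueSelectorSketch {

    /** The same key always maps to the same queue; floorMod keeps the index non-negative. */
    static <T> T select(List<T> queues, Object shardingKey) {
        int index = Math.floorMod(shardingKey.hashCode(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        // every message of order 7 goes to the same queue
        System.out.println(select(queues, 7)); // queue-3
        System.out.println(select(queues, 7)); // queue-3
        System.out.println(select(queues, 8)); // queue-0
    }
}
```

The trade-off is that ordering is only guaranteed per key: messages with different keys may land on different queues and be consumed in any relative order.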
ActiveMQ has not been verified in large-scale throughput scenarios, and its community is not very active. Dynamic scaling of a RabbitMQ cluster is troublesome, and it is not difficult to customize with the current programming language. Kafka supports the main MQ functions, but its features cannot meet the program's requirements, so it is not used; it is also not difficult to customize with the current programming language. RocketMQ, after being put to the test by users all over the world, has become very powerful. Its MQ features are relatively complete, it is distributed and scalable, and it supports complex MQ business scenarios (it can be the first choice when the business is complex).