In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-31 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article shows you what the RocketMQ push-pull mode is, which is concise and easy to understand. It will definitely brighten your eyes. I hope you can get something through the detailed introduction of this article.
There are two ways for consumer clients to get messages from message middleware and consume them. Strictly speaking, RocketMQ does not implement the PUSH pattern, but wraps the pull pattern in a layer. Although the name begins with Push, it is actually implemented in Pull. Broker is constantly polled for messages through Pull. When there is no new message, Broker suspends the request until a new message is generated, cancels the suspension, and returns the new message.
1. Overview 1.1, PULL mode
The consumer client actively pulls messages from the message middleware (MQ message server agent); in Pull mode, how to set the pull frequency of Pull messages needs to be considered. For example, 1000 messages may come continuously in 1 minute, and then no new messages will be generated within 2 hours (to sum up, "message delay and busy waiting"). If the time interval of each Pull is long, it will increase the delay of messages, that is, the time it takes for messages to arrive at consumers is longer, and the accumulation of messages in MQ becomes larger; if the time interval of each Pull is shorter, but there are no messages in MQ for a period of time, then there will be a lot of invalid Pull request RPC overhead, affecting the overall network performance of MQ.
1.2, PUSH mode
Message middleware (MQ message server agent) actively pushes messages to consumers; using Push mode, messages can be sent to consumers for consumption as real-time as possible. However, when the consumer's ability to process messages is weak (for example, the process of processing a message in the consumer-side business system is more complex, and there are more call links in it, the consumption time is longer. To sum up, it is the "slow consumption problem"), while MQ keeps Push messages to consumers, and the buffer on the consumer side may overflow, resulting in exceptions.
2. PUSH mode
The active push mode is easy to implement and avoids the complexity of the business logic of the consumer side pulled. The consumption of messages can be considered to be real-time, but it also has some disadvantages, which requires the consumer side to have a strong consumption capacity.
2.1.Code implementation public class Consumer1 {public static void main (String [] args) {try {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer (); consumer.setConsumerGroup ("consumer_push"); consumer.setNamesrvAddr ("10.10.12.203 String 9876") Consumer.subscribe ("TopicTest", "*") Consumer.registerMessageListener (new MessageListenerConcurrently () {@ Override public ConsumeConcurrentlyStatus consumeMessage (List paramList) ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {try {for (MessageExt msg: paramList) {String msgbody = new String (msg.getBody (), "utf-8") SimpleDateFormat sd = new SimpleDateFormat ("YYYY-MM-dd HH:mm:ss"); Date date = new Date (msg.getStoreTimestamp ()) System.out.println ("Consumer1=== deposit time:" + sd.format (date) + "= = MessageBody:" + msgbody) / / output message content}} catch (Exception e) {e.printStackTrace (); return ConsumeConcurrentlyStatus.RECONSUME_LATER / / try again later} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; / / consumption success}}); consumer.start () System.out.println ("Consumer1=== started successfully!");} catch (Exception e) {/ / TODO Auto-generated catch block e.printStackTrace ();}
For PUSH consumption, you need to register a listener Listener, to listen for the latest messages, conduct business processing, and feedback the consumption status of messages, consumption success (CONSUME_SUCCESS) and consumption retry (RECONSUME_LATER). The message retry will resend the consumption failure record regularly according to the time interval of the configured message delay level. (PS: the delay message will focus on)
Because the PUSH message method returns the status of the message, the server will maintain the consumption progress of each consumer, record the consumption progress internally, and update the consumption progress after the message is sent successfully.
The limitation of PUSH message method is that it takes up resources when HOLD resides in Consumer requests, so it is suitable to be used in scenarios where the number of client connections can be controlled.
The previous section describes the offset of each message queue for each consumer group corresponding to each topic stored on the server.
Check the consumption progress information on the server file: / usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json
3. PULL mode 3.1and code implementation (1) public class PullConsumer {private static final Map offseTable = new HashMap (); public static void main (String [] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer ("pullConsumer"); consumer.setNamesrvAddr ("10.10.12.203private static final Map offseTable 9876); consumer.start (consumer.start); Set mqs = consumer.fetchSubscribeMessageQueues (" TopicTest ") For (MessageQueue mq: mqs) {SINGLE_MQ: while (true) {try {PullResult pullResult = consumer.pullBlockIfNotFound (mq, null, getMessageQueueOffset (mq), 32); System.out.println ("=") System.out.println ("Consume from the queue:" + mq + "offset:" + getMessageQueueOffset (mq) + "result:" + pullResult.getPullStatus ()); putMessageQueueOffset (mq, pullResult.getNextBeginOffset ()); switch (pullResult.getPullStatus ()) {case FOUND: List messageExtList = pullResult.getMsgFoundList () For (MessageExt m: messageExtList) {System.out.print (new String (m.getBody ()) + "=");} System.out.println ("); case NO_MATCHED_MSG: break Case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break } catch (Exception e) {e.printStackTrace ();} consumer.shutdown ();} private static void putMessageQueueOffset (MessageQueue mq, long offset) {offseTable.put (mq, offset) } private static long getMessageQueueOffset (MessageQueue mq) {Long offset = offseTable.get (mq); if (offset! = null) return offset; return 0;}}
Results:
Each time you pull messages, you need to provide the offset and the number of messages pulled, and you need your business to achieve the consumption progress of the queues under each topic.
Code implementation (1) this way can only pull historical messages, the latest messages can not be pulled, can also be modified to achieve continuous pull.
3.2. Code implementation (2)
In the MQPullConsumer class, there is a MessageQueueListener that notifies Consumer when the queue changes. It is this excuse that helps us achieve load balancing in Pull mode.
Note that this interface does not exist in MQPushConsumer, but there is plenty of MessageListener in the above code.
Void registerMessageQueueListener (final String topic, final MessageQueueListener listener); public interface MessageQueueListener {void messageQueueChanged (final String topic, final Set mqAll, final Set mqDivided);}
With this Listener, we can dynamically know how many MessageQueue are allocated to the current Consumer. Then for these MessageQueue, we can open a thread pool to consume.
Public class PullConsumerExtend {public static void main (String [] args) throws MQClientException {/ / Consumer group final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService ("pullConsumer"); / / MQ NameService address scheduleService.getDefaultMQPullConsumer (). SetNamesrvAddr ("10.10.12.203 String 9876") / / load balancing mode scheduleService.setMessageModel (MessageModel.CLUSTERING); / / messages to be processed topic scheduleService.registerPullTaskCallback ("TopicTest", new PullTaskCallback () {@ Override public void doPullTask (MessageQueue mq, PullTaskContext context) {MQPullConsumer consumer = context.getPullConsumer () Try {long offset = consumer.fetchConsumeOffset (mq, false); if (offset < 0) offset = 0; PullResult pullResult = consumer.pull (mq, "*", offset, 32) System.out.println ("); System.out.println (" Consume from the queue: "+ mq +" offset: "+ offset +" result: "+ pullResult.getPullStatus ()) Switch (pullResult.getPullStatus ()) {case FOUND: List messageExtList = pullResult.getMsgFoundList () For (MessageExt m: messageExtList) {System.out.print (new String (m.getBody ()) + "=");} break Case NO_MATCHED_MSG: break; case NO_NEW_MSG: case OFFSET_ILLEGAL: break Default: break;} consumer.updateConsumeOffset (mq, pullResult.getNextBeginOffset ()); / / set the next pull interval context.setPullNextDelayTimeMillis (10000) } catch (Exception e) {e.printStackTrace ();}); scheduleService.start ();}}
Results:
Compared with * * code implementation (1) * *, this method is greatly improved. There is no need for the business to maintain the consumption progress of each consumption queue, but can be updated to the server.
It is also obvious that the time interval between pulling messages in each queue leads to message squeezing and fewer messages in the time period, which affects the performance of the server.
The above content is what the RocketMQ push-pull mode is, have you learned the knowledge or skills? If you want to learn more skills or enrich your knowledge reserve, you are welcome to follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.