In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article introduces you how to understand the RocketMQ consumption location, the content is very detailed, interested friends can use for reference, I hope it can be helpful to you.
RocketMQ specified the Topic theme and Tag when creating consumers. We found that the newly created consumers could not consume historical data, but could only consume the data sent by consumers after creation. What is the reason for this? can we consume all the news? Can we specify the time of the message to be consumed? The answer is yes, let's analyze it in detail.
Premise: our discussion is in cluster mode, and the broadcast mode is the same, but we use cluster mode to discuss the sample code.
The location of message consumption currently provides three ways: CONSUME_FROM_LAST_OFFSET (queue tail consumption), CONSUME_FROM_FIRST_OFFSET (queue head consumption), and CONSUME_FROM_TIMESTAMP (specified consumption time point).
Public enum ConsumeFromWhere {CONSUME_FROM_LAST_OFFSET, @ Deprecated CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST, @ Deprecated CONSUME_FROM_MIN_OFFSET, @ Deprecated CONSUME_FROM_MAX_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP,}
There are 6 ways to analyze the source code, and the other three have been abandoned and will not be discussed.
1. Consume from the end of the queue (default)
When we create a new consumer group to consume messages under a topic, the historical message is not consumed, and when the producer resends the message, it will receive the latest one. Let's analyze where it is set.
Some parameters are built in when the consumer is created, which is consumed from the end of the queue.
Consumption from the tail of the queue leads to the consumption of historical messages, which leads to the loss of part of the data, which can be set if it is only status data, and if the business data causes the data loss.
Setting this parameter takes effect only when the consumer group is created for the first time, but not later, because the consumer group has recorded the progress of consumption on the server and has a progress position.
Check the location of the consumption progress file, and we view the progress of the message consumption of this consumer_test_clustering consumer group under the TopicTest topic according to the contents of the previous sections. View the information on the Broker-a server node.
Check the progress of consumption according to the visual interface.
Check the consumption progress information on the server file: / usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json
2. Consume from the queue header
Write Consumer
Public class Consumer1 {public static void main (String [] args) {try {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer (); consumer.setConsumerGroup ("consumer_first_offset"); consumer.setNamesrvAddr ("10.10.12.203 String 9876") Consumer.subscribe ("TopicTest", "*"); consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET) Consumer.registerMessageListener (new MessageListenerConcurrently () {@ Override public ConsumeConcurrentlyStatus consumeMessage (List paramList) ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {try {for (MessageExt msg: paramList) {String msgbody = new String (msg.getBody (), "utf-8") SimpleDateFormat sd = new SimpleDateFormat ("YYYY-MM-dd HH:mm:ss"); Date date = new Date (msg.getStoreTimestamp ()) System.out.println ("Consumer1=== deposit time:" + sd.format (date) + "= = MessageBody:" + msgbody) / / output message content}} catch (Exception e) {e.printStackTrace (); return ConsumeConcurrentlyStatus.RECONSUME_LATER / / try again later} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; / / consumption success}}); consumer.start () System.out.println ("Consumer1=== started successfully!");} catch (Exception e) {/ / TODO Auto-generated catch block e.printStackTrace ();}
Set up the consumption group: consumer.setConsumerGroup ("consumer_first_offset")
Set the consumption location: consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET)
View its results
Consumption from scratch means that all messages currently stored on broker are consumed once, because RocketMQ persists messages to disk files, and a long time will lead to a large number of disk files. RocketMQ has a mechanism that only retains messages for a period of time. Previous messages will be deleted and can be deleted at a specified time point (no matter whether the message is consumed or not, the file will be deleted at the point in time)
3. Consume from a specified point in time
Consumer code
Public class Consumer1 {public static void main (String [] args) {try {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer (); consumer.setConsumerGroup ("consumer_time_offset"); consumer.setNamesrvAddr ("10.10.12.203 String 9876") Consumer.subscribe ("TopicTest", "*"); / / you can set when to start consumption and use the format yyyyMMddhhmmss consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) half an hour ago by default with setConsumeTimest consumer.setConsumeTimestamp (UtilAll.timeMillisToHumanString3 (System.currentTimeMillis ()-1800000L)) Consumer.registerMessageListener (new MessageListenerConcurrently () {@ Override public ConsumeConcurrentlyStatus consumeMessage (List paramList) ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {try {for (MessageExt msg: paramList) {String msgbody = new String (msg.getBody (), "utf-8") SimpleDateFormat sd = new SimpleDateFormat ("YYYY-MM-dd HH:mm:ss"); Date date = new Date (msg.getStoreTimestamp ()) System.out.println ("Consumer1=== deposit time:" + sd.format (date) + "= = MessageBody:" + msgbody) / / output message content}} catch (Exception e) {e.printStackTrace (); return ConsumeConcurrentlyStatus.RECONSUME_LATER / / try again later} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; / / consumption success}}); consumer.start () System.out.println ("Consumer1=== started successfully!");} catch (Exception e) {/ / TODO Auto-generated catch block e.printStackTrace ();}
Set consumption location: consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); set consumption time point: consumer.setConsumeTimestamp ("20181222171201")
If the offset read from the message progress service OffsetStore to MessageQueue is greater than or equal to 0, the read offset is used, and the above strategy will take effect only if the read offset is less than 0.
On how to understand the location of RocketMQ consumption to share here, I hope that the above content can be of some help to you, can learn more knowledge. If you think the article is good, you can share it for more people to see.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.