2025-04-02 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article analyzes the scenarios in which RocketMQ retries a message, shows a producer and a consumer that trigger a retry, and then follows the client and broker source code to explain how the retry is carried out.
1. Analysis
A message is retried in the following scenarios:
The business consumer explicitly returns ConsumeConcurrentlyStatus.RECONSUME_LATER, that is, the consumer's own business logic asks for the message to be delivered again.
The business consumer throws an exception, deliberately or not.
The message is not acknowledged because of a network problem.
Note the exception case: the broker redelivers the message whenever the business logic lets an exception escape, whether it was thrown deliberately or not. If the business code catches the exception and still reports success, no retry is triggered. A consumer that needs the retry should therefore return ConsumeConcurrentlyStatus.RECONSUME_LATER or null from the catch block, and log the failure together with the current retry count. Returning ConsumeConcurrentlyStatus.RECONSUME_LATER is recommended.
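The advice above can be sketched without the RocketMQ client on the classpath. In this minimal model, Status stands in for ConsumeConcurrentlyStatus and RetryAwareHandler.handle for the body of consumeMessage(); both names are hypothetical and are not part of RocketMQ. It only illustrates the pattern: catch the exception, log the retry count, and ask for a retry explicitly.

```java
import java.util.function.Consumer;

/** Stand-in for ConsumeConcurrentlyStatus (assumption: not the RocketMQ enum itself). */
enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

/** Hypothetical helper that models the body of a consumeMessage() callback. */
final class RetryAwareHandler {
    private RetryAwareHandler() {}

    static Status handle(String body, int reconsumeTimes, Consumer<String> businessLogic) {
        try {
            businessLogic.accept(body);
            return Status.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            // The exception is caught here, so a retry happens only because we request it.
            System.err.printf("consume failed, reconsumeTimes=%d, cause=%s%n", reconsumeTimes, e);
            return Status.RECONSUME_LATER;
        }
    }
}
```

In a real listener the same try/catch would wrap the business call, with msgs.get(0).getReconsumeTimes() supplying the retry count.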
The broker retries automatically only when the consumption mode is MessageModel.CLUSTERING (cluster mode); broadcast messages are not retried.
A message that still cannot be consumed after the maximum number of retries (16 by default) is delivered to the dead letter queue. The dead letter queue therefore needs to be monitored, and the messages in it need manual business compensation.
The retry interval is taken from the delay levels, so it grows as the number of retries increases.
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
To customize the intervals, set messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h in the broker configuration.
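To make the level numbering concrete, here is a small sketch that turns a messageDelayLevel string into a table from level number to delay in milliseconds. DelayLevels and parse are hypothetical names, the accepted unit suffixes (s, m, h, d) are an assumption of this sketch, and it is a simplified model rather than the broker's own parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: models how a delay-level string maps to levels 1..N. */
final class DelayLevels {
    private DelayLevels() {}

    /** Parses "1s 5s 10s ..." into level -> delay in milliseconds; levels start at 1. */
    static Map<Integer, Long> parse(String levelString) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levelString.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                case 'd': unitMillis = 86_400_000L; break;
                default: throw new IllegalArgumentException("unknown unit in " + part);
            }
            table.put(i + 1, value * unitMillis); // the first entry is level 1
        }
        return table;
    }
}
```

With the default string, level 3 maps to 10 s and level 18 to 2 h, which is why the position of each entry in the string matters when you customize it.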
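2. Code implementation
2.1. Producer
// Imports assume the org.apache.rocketmq 4.x client.
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("gumx_test_delay");
        producer.setNamesrvAddr("10.10.15.205:9876;10.10.15.206:9876");
        producer.start();
        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("TopicDelayTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Test delayed message==Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}
2.2. Consumer
// Imports assume the org.apache.rocketmq 4.x client.
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("gumx_test_delay_1");
        consumer.setNamesrvAddr("10.10.15.205:9876;10.10.15.206:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicDelayTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    // "yyyy" is the calendar year; "YYYY" would be the week-based year
                    SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    System.out.printf("Current time:%s Delay level:%s Retry count:%s Topic:%s Delay topic:%s Message body:%s %n",
                        sf.format(new Date()),
                        msgs.get(0).getDelayTimeLevel(),
                        msgs.get(0).getReconsumeTimes(),
                        msgs.get(0).getTopic(),
                        msgs.get(0).getProperties().get("REAL_TOPIC"),
                        new String(msgs.get(0).getBody(), "UTF-8"));
                    int i = 1 / 0; // fail on purpose
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
Check the results:
The intervals 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h are the delay levels of the default configuration. The output shows something odd: the delay level jumps directly from 0 to 3. An ordinary message has delay level 0 by default, and the second delivery is the first real retry. Why does it start at level 3? Let's analyze the source code to find out.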
3. Source code analysis
Let's take a look at the processing flow first.
3.1. Client code analysis
In the RocketMQ client source, DefaultMQPushConsumerImpl.java defines the retry limit:
private int getMaxReconsumeTimes() {
    // default reconsume times: 16
    if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
        return 16;
    } else {
        return this.defaultMQPushConsumer.getMaxReconsumeTimes();
    }
}
A consumer can set the maximum number of retries through maxReconsumeTimes. If it is left unset (-1), the default of 16 applies. Next, let's look at the client consumption code.
The entry point is the run() method of ConsumeRequest, an inner class of ConsumeMessageConcurrentlyService:
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
    ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
        }
    }
    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
        RemotingHelper.exceptionSimpleDesc(e),
        ConsumeMessageConcurrentlyService.this.consumerGroup,
        msgs,
        messageQueue);
    hasException = true;
}
Once the listener has returned a status for this batch of messages, the core method ConsumeMessageConcurrentlyService.processConsumeResult() is called to handle it:
// ackIndex = Integer.MAX_VALUE
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
    return;
// consumption status
switch (status) {
    case CONSUME_SUCCESS:
        // set the index of the last successfully consumed message
        if (ackIndex >= consumeRequest.getMsgs().size()) {
            ackIndex = consumeRequest.getMsgs().size() - 1;
        }
        int ok = ackIndex + 1;
        int failed = consumeRequest.getMsgs().size() - ok;
        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
        break;
    case RECONSUME_LATER:
        ackIndex = -1;
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
            consumeRequest.getMsgs().size());
        break;
    default:
        break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            // report the consumption result back to the broker
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }
        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);
            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
}
If the result is CONSUME_SUCCESS, ackIndex = msgs.size() - 1. Given the loop condition that guards sendMessageBack, for (int i = ackIndex + 1; i < msgs.size(); i++), nothing is sent back to the broker when consumption succeeds.
If the result is RECONSUME_LATER, ackIndex = -1, so every message in the batch is sent back to the broker; in other words, the whole batch has to be consumed again.
If sending the ACK message fails, the message is consumed again on the consumer side after a 5 s delay.
The consumer first sends the ACK message to the broker. If that succeeds, the broker drives the retry. If it fails, the task stays with the consumer, which resubmits the consumption task locally, by default after 5 s.
1) Set the value of ackIndex according to the consumption result.
2) On failure, act according to the consumption mode (cluster or broadcast): in broadcast mode the message is simply dropped; in cluster mode sendMessageBack is called.
3) Update the consumption progress. Whether or not consumption succeeded, the messages above are treated as consumed, which really just means advancing the consumption offset. (Failed messages are retried as newly created messages.)
When reporting the consumption status to the broker fails, this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()) takes the failed messages collected in msgBackFailed and resubmits them to the consumer 5 seconds later:
private void submitConsumeRequestLater(final List<MessageExt> msgs,
    final ProcessQueue processQueue, final MessageQueue messageQueue) {
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
        }
    }, 5000, TimeUnit.MILLISECONDS);
}
3.2. Server code analysis
When consumption fails, the client reports the consumption status to the broker. The broker handles this feedback in SendMessageProcessor.consumerSendMsgBack(). The core of the source is:
// set the topic to %RETRY% + consumerGroup
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
    newTopic,
    subscriptionGroupConfig.getRetryQueueNums(),
    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("topic[" + newTopic + "] not exist");
    return response;
}
if (!PermName.isWriteable(topicConfig.getPerm())) {
    response.setCode(ResponseCode.NO_PERMISSION);
    response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
    return response;
}
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("look message by offset failed, " + requestHeader.getOffset());
    return response;
}
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
    MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
// delay level
int delayLevel = requestHeader.getDelayLevel();
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
    maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
// once the retry count reaches the maximum, the message goes to the dead-letter queue
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
    || delayLevel < 0) {
    // reset the topic to %DLQ% + consumerGroup
    newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
    queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
    // basic parameter setup
    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
        DLQ_NUMS_PER_GROUP,
        PermName.PERM_WRITE, 0
    );
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return response;
    }
} else {
    // on the first retry delayLevel == 0, so the default delay level becomes 3
    if (0 == delayLevel) {
        delayLevel = 3 + msgExt.getReconsumeTimes();
    }
    msgExt.setDelayTimeLevel(delayLevel);
}
The code checks whether the current retry count is greater than or equal to the maximum. If the maximum has been reached, or the configured delay level is less than 0, the topic is recreated under the rule %DLQ% + consumerGroup and the message is subsequently sent to the dead letter queue.
A normal message enters the else branch. For a message that is retried for the first time, delayLevel is 0 by default and RocketMQ adds 3 to it. In other words, if no delay level is configured explicitly, the first retry is scheduled at the third delay level, that is, the message is redelivered 10 s later. The retry topic follows the default rule %RETRY% + consumerGroup.
Once the delay level is set, the retry count is updated to the current count plus 1 and the broker writes the message to disk:
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
// set the retry count to the current count plus 1
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
// persist the message to the commitlog file
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
What is msgInner? It is a MessageExtBrokerInner: for every retried message RocketMQ creates a new MessageExtBrokerInner object, a class that extends MessageExt.
Continuing into the storage logic, the putMessage(msgInner) method is implemented in DefaultMessageStore.java. The core line is:
PutMessageResult result = this.commitLog.putMessage(msg);
The call this.commitLog.putMessage(msg) is the one to focus on: it is where the message is actually written to the commit log, that is, persisted.
Continuing into the putMessage method of commitLog, we find the following core fragment:
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery: is the delay level of the message greater than 0?
    if (msg.getDelayTimeLevel() > 0) {
        // if the delay level exceeds the maximum delay level, cap it at the maximum
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }
        // set the message topic to SCHEDULE_TOPIC_XXXX
        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        // set the queue ID to the ID of the delay queue for this level
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
        // store the original topic and queue ID in the properties
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}
As you can see, a retried message has a delay level greater than 0, so the check returns true and the branch is entered. From this logic we learn that a retried message is not put back into its original queue; RocketMQ stores it under a different topic, SCHEDULE_TOPIC in the code. Its definition is:
public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
The topic name is changed to SCHEDULE_TOPIC_XXXX.
Here we can come to a conclusion:
For every message that a consumer fails to consume, RocketMQ creates a new message (the MessageExtBrokerInner object mentioned above), delivers it to a queue under the topic SCHEDULE_TOPIC_XXXX, and lets a scheduled task drive the retry. The retry interval follows the delay levels mentioned above, that is:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
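Putting the rule delayLevel = 3 + reconsumeTimes together with the default table gives the retry timeline. The sketch below is a simplified model, not RocketMQ code: RetryTimeline and its methods are hypothetical names, and it assumes the consumer leaves the delay level at 0 so that the broker picks the level, and that the level is capped at the last entry of the table.

```java
/** Hypothetical model of the default retry schedule. */
final class RetryTimeline {
    private RetryTimeline() {}

    /** Default delay table in seconds; index 0 is level 1. */
    static final long[] DELAY_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    /** Delay before the next delivery of a message that has already been retried reconsumeTimes times. */
    static long delaySeconds(int reconsumeTimes) {
        int level = Math.min(3 + reconsumeTimes, DELAY_SECONDS.length); // capped at the highest level
        return DELAY_SECONDS[level - 1];
    }

    /** Total waiting time across all retries before the message would go to the dead letter queue. */
    static long totalSeconds(int maxReconsumeTimes) {
        long total = 0;
        for (int n = 0; n < maxReconsumeTimes; n++) {
            total += delaySeconds(n);
        }
        return total;
    }
}
```

Under these assumptions the 16 default retries use levels 3 through 18, and totalSeconds(16) is 17140 s, roughly 4 hours 46 minutes of waiting before the message reaches the dead letter queue.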
At the same time, so that the message can be routed back later, the original topic and queue ID are stored in the message properties by the following code:
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
Here, the original topic and queue ID are backed up.
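The topic swap and the backup of the original destination can be modelled in a few lines. This is a sketch under stated assumptions: Msg is a hypothetical, stripped-down message, the property key "REAL_QID" and the mapping queueId = delayLevel - 1 are what I recall MessageConst.PROPERTY_REAL_QUEUE_ID and delayLevel2QueueId to be and should be checked against the RocketMQ source, while "REAL_TOPIC" is the key the consumer above reads.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the delay branch in CommitLog.putMessage. */
final class ScheduleRewrite {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    private ScheduleRewrite() {}

    /** Stripped-down message holding only the fields this step touches. */
    static final class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    static void rewriteForDelay(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // ordinary message: stored under its own topic
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel; // cap at the highest configured level
        }
        // back up the real destination before overwriting it
        msg.properties.put("REAL_TOPIC", msg.topic);
        msg.properties.put("REAL_QID", String.valueOf(msg.queueId)); // assumed key name
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1; // assumed: one delay queue per level
    }
}
```

The point of the backup is that the scheduled task later needs REAL_TOPIC and the real queue ID to put the message back where a consumer can see it.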
The article "RocketMQ deferred message" analyzes this in detail. Message retry and delayed messages follow the same flow: both go through the delayed-message topic and its queues, and a background scheduled task periodically scans for messages that are due and sends them back to their original topic and queue for consumption. The difference is that a retried message returns to the topic %RETRY% + consumerGroup, which has only one queue (queue 0), whereas a delayed message returns to the original queue of its original topic like an ordinary message.
3.3. Business processing of Dead letter
With the default mechanism, a message that keeps failing enters the dead letter queue once the maximum number of retries is reached.
We can also define our own retry limit according to business needs, and check on each consumption whether the current retry count has reached that limit.
For example, if the business still fails after three retries and further attempts are pointless, we can acknowledge the message by returning ConsumeConcurrentlyStatus.CONSUME_SUCCESS to the broker so that it is not redelivered. At the same time we store the message, together with its business parameters, in a dead-letter table that our own business defines in the database. Operators then query that table and perform the corresponding business compensation.
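A minimal sketch of that business-level cap follows. ConsumeStatus stands in for ConsumeConcurrentlyStatus, CappedRetryHandler is a hypothetical name, and an in-memory list stands in for the dead-letter table; a real implementation would write to a database inside the catch block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Stand-in for ConsumeConcurrentlyStatus (assumption: not the RocketMQ enum itself). */
enum ConsumeStatus { CONSUME_SUCCESS, RECONSUME_LATER }

/** Hypothetical handler that gives up after a business-defined number of retries. */
final class CappedRetryHandler {
    private final int maxBusinessRetries;
    private final List<String> deadLetterTable = new ArrayList<>(); // stand-in for a DB table

    CappedRetryHandler(int maxBusinessRetries) {
        this.maxBusinessRetries = maxBusinessRetries;
    }

    ConsumeStatus handle(String body, int reconsumeTimes, Consumer<String> businessLogic) {
        try {
            businessLogic.accept(body);
            return ConsumeStatus.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            if (reconsumeTimes >= maxBusinessRetries) {
                // Record the message for manual compensation, then acknowledge it
                // so that the broker stops redelivering.
                deadLetterTable.add(body);
                return ConsumeStatus.CONSUME_SUCCESS;
            }
            return ConsumeStatus.RECONSUME_LATER;
        }
    }

    List<String> deadLetters() {
        return deadLetterTable;
    }
}
```

The trade-off is that such messages never reach the broker's %DLQ% topic, so the business table becomes the only place where they can be found.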
RocketMQ itself marks a message that has reached the maximum number of retries (16 by default) as a dead-letter message and delivers it to the DLQ dead letter queue, where the business has to intervene manually. In the consumerSendMsgBack method of SendMessageProcessor, the general idea is to first check whether the retry count has reached the maximum or whether the delay level is less than 0. If either is true, the message is redirected to the dead-letter topic, which is %DLQ% + consumerGroup.
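The routing decision described above reduces to one condition. The following sketch models only that decision; SendBackRouting and Route are hypothetical names, and the real consumerSendMsgBack also creates the topics, picks a queue and stores the message.

```java
/** Hypothetical model of the retry/dead-letter routing decision on the broker. */
final class SendBackRouting {
    private SendBackRouting() {}

    /** Result of the decision: target topic and the delay level to apply. */
    static final class Route {
        final String topic;
        final int delayLevel;

        Route(String topic, int delayLevel) {
            this.topic = topic;
            this.delayLevel = delayLevel;
        }
    }

    static Route route(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes, int delayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            // retries exhausted, or the consumer asked for no further retry
            return new Route("%DLQ%" + consumerGroup, delayLevel);
        }
        if (delayLevel == 0) {
            delayLevel = 3 + reconsumeTimes; // broker-chosen level, starting at 3
        }
        return new Route("%RETRY%" + consumerGroup, delayLevel);
    }
}
```

For example, a first failure in group gumx_test_delay_1 with the default limit of 16 is routed to %RETRY%gumx_test_delay_1 at delay level 3, and the same message with a retry count of 16 is routed to %DLQ%gumx_test_delay_1.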
The figure shows the flow of messages involved in the whole message retry between related topics.