In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-30 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article mainly introduces "how to solve the abnormal consumption of RocketMQ messages". In the daily operation, I believe many people have doubts about how to solve the problem of abnormal consumption of RocketMQ messages. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful for you to answer the doubts of "how to solve the abnormal consumption of RocketMQ messages"! Next, please follow the editor to study!
Problem found:
In development, messages are repeatedly consumed when the project is restarted, but in fact, the messages have already been consumed.
Find the problem: 1.RocketMq console view, discover subscription group message delay
two。 From the message, the corresponding consumerGroup trackType of message Detail is not conume yet.
3. There is no error log in the project log, but according to the relevant business query database, it is found that the data has been processed 4. 5%. Business code breakpoint, did not throw any exception, through the resend message can also consume 5. 5. It is suspected that the scheduled task of rocketMq updating offset did not start.
However, the MQClientInstance timing task is normal through the source code breakpoint, but the offset updated each time is the original offet.
6. See if there is something wrong with consumption.
Since you are using spring-boot integrated client, trace the consumer source code, which is in the DefaultRocketMQListenerContainer.handleMessage method
However, everything is fine, and then follow up to DefaultMessageListenerConcurrently.
Public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {@ SuppressWarnings ("unchecked") @ Override public ConsumeConcurrentlyStatus consumeMessage (List msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt: msgs) {log.debug ("received msg: {}", messageExt); try {long now = System.currentTimeMillis (); handleMessage (messageExt) Long costTime = System.currentTimeMillis ()-now; log.debug ("consume {} cost: {} ms", messageExt.getMsgId (), costTime);} catch (Exception e) {log.warn ("consume message failed. MessageExt: {}, error: {} ", messageExt, e); context.setDelayLevelWhenNextConsume (delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER;}} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}
First of all, in the catch code block break point to see if there is a problem, the result found that did not come here, this is a fool, causing me to check the reasons from other aspects, wasting a lot of time. After debugging step by step, an exception is thrown in log.debug ("consume {} cost: {} ms", messageExt.getMsgId (), costTime); logging. This Nima can still make an exception when logging, but it is not an exception of Exception. I was relieved and thought it would be easy to solve the problem if I found the reason, but I didn't expect that this was just the beginning.
7. Check the exception information at the breakpoint in the upper-level call code ConsumeMessageConcurrentlyService
Java.lang.NoClassDefFoundError:Could not initialize class org.apache.rocketmq.common.message.MessageClientIDSetter
The reason is that the exception thrown directly by MessageClientIDSetter.getUniqID (this) is called in the getMsgId method called in the MessageClientExt class.
Judging from the abnormal information, there is something wrong with MessageClientIDSetter during initialization.
8. Look at the MessageClientIDSetter source code, there is a broken static code block, and then trace the breakpoint here. Static {byte [] ip; try {ip = UtilAll.getIP ();} catch (Exception e) {ip = createFakeIP ();} LEN = ip.length + 2 + 4 + 4 + 2; ByteBuffer tempBuffer = ByteBuffer.allocate (ip.length + 2 + 4); tempBuffer.position (0); tempBuffer.put (ip) TempBuffer.position (ip.length); tempBuffer.putInt (UtilAll.getPid ()); tempBuffer.position (ip.length + 2); tempBuffer.putInt (MessageClientIDSetter.class.getClassLoader (). HashCode ()); FIX_STRING = UtilAll.bytes2string (tempBuffer.array ()); setStartTime (System.currentTimeMillis ()); COUNTER = new AtomicInteger (0);}
Ip = UtilAll.getIP (); there is a problem, but instead of going to the catch code block, it jumps to the DefaultMqPushconsumerImpl.Another problem here is that the exception block does not have any handling and does not see the exception information. well, we can only continue the breakpoint debugging step by step.
Finally, the ipV6Check method of the UtillAll class executes to InetAddressValidator.getInstance (); there is a problem.
Private static boolean ipV6Check (byte [] ip) {if (ip.length! = 16) {throw new RuntimeException ("illegal ipv6 bytes");} InetAddressValidator validator = InetAddressValidator.getInstance (); return validator.isValidInet6Address (ipToIPv6Str (ip));}
But there is no problem debugging this code locally. Therefore, you can only debug during debug, and the error reported is a classNotFound exception.
9. This is another tricky problem, the source code looks at the class can also be found, and finally look at the maven dependency, only to find that there is a jar package conflict.
Judging from rocketMq's dependency, the version he needs is 1.6.
So we need to remove the 1.3.1 version
10. Find commons-validator that depends on version 1.3.1
Looking at this dense dependency, Idea does not have the function to find it, so it can only be found slowly. In the end, people who are no longer committed to it will remove the dependency and restart everything.
At this point, the study on "how to solve the abnormal consumption of RocketMQ messages" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un