2025-01-21 Update. From: SLTechnology News&Howtos, Shulou (Shulou.com), 06/03 report.
1. Preface
RocketMQ supports re-consumption when message consumption fails. The specific code is as follows:
    /* Register callback to execute on arrival of messages fetched from brokers. */
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            System.out.printf(Thread.currentThread().getName() + "%n");
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
That is, the listener returns ConsumeConcurrentlyStatus.RECONSUME_LATER. So how does the client turn that return value into a retry?
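The listener above returns RECONSUME_LATER unconditionally, which is convenient for tracing the retry path. As a minimal sketch of the more usual pattern, the retry status can be returned only when processing fails. The types below (ListenerStatus, RetryAwareListener) are hypothetical stand-ins written for this illustration, not RocketMQ classes:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ConsumeConcurrentlyStatus; not the RocketMQ enum.
enum ListenerStatus { CONSUME_SUCCESS, RECONSUME_LATER }

// Illustrative listener wrapper; the names are assumptions, not RocketMQ API.
final class RetryAwareListener {
    private final Consumer<String> handler;

    RetryAwareListener(Consumer<String> handler) {
        this.handler = handler;
    }

    // Process every message body; ask for redelivery if any of them throws.
    ListenerStatus consumeMessage(List<String> msgs) {
        try {
            for (String body : msgs) {
                handler.accept(body);
            }
            return ListenerStatus.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            return ListenerStatus.RECONSUME_LATER;
        }
    }

    public static void main(String[] args) {
        RetryAwareListener listener = new RetryAwareListener(body -> {
            if (body.isEmpty()) {
                throw new IllegalArgumentException("empty body");
            }
        });
        System.out.println(listener.consumeMessage(Arrays.asList("a", "b")));
        System.out.println(listener.consumeMessage(Arrays.asList("a", "")));
    }
}
```

With the real client, the same try/catch shape would sit inside the consumeMessage method of the registered MessageListenerConcurrently.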
2. Code flow
Take asynchronous push consumption as an example. For the overall message consumption flow, see https://blog.51cto.com/483181/2056301.
Assume the client has successfully pulled a message, that is, we are in DefaultMQPushConsumerImpl.pullMessage.
2.1 Get message callback

    public void pullMessage(final PullRequest pullRequest) {
        ...
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                        pullRequest.getMessageQueue(), pullResult, subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            ...
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispathToConsume);
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                ...
            }
        };
        ...
        try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback);
        } catch (Exception e) {
        }
    }
Because the request is asynchronous (CommunicationMode.ASYNC), pullCallback is invoked once pullKernelImpl has fetched the messages. We assume the pull succeeded, which is the FOUND branch.
That branch then submits the consume request by calling consumeMessageService.submitConsumeRequest.
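The callback flow described above can be sketched without the RocketMQ classes. In this illustration MiniPullResult, AsyncPullSketch and pullAsync are hypothetical names, and CompletableFuture stands in for the client's own asynchronous transport; only the FOUND branch forwards messages to consumption:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical miniature of a pull result; not the RocketMQ PullResult class.
final class MiniPullResult {
    enum Status { FOUND, NO_NEW_MSG }

    final Status status;
    final List<String> msgs;

    MiniPullResult(Status status, List<String> msgs) {
        this.status = status;
        this.msgs = msgs;
    }
}

final class AsyncPullSketch {
    // Messages handed over for consumption (stands in for submitConsumeRequest).
    final List<String> submitted = new ArrayList<>();

    // Callback body: only the FOUND branch forwards messages to consumption.
    void onSuccess(MiniPullResult result) {
        if (result == null) {
            return;
        }
        switch (result.status) {
            case FOUND:
                submitted.addAll(result.msgs);
                break;
            default:
                break;
        }
    }

    // Run the pull on another thread and invoke the callback when it completes.
    CompletableFuture<Void> pullAsync(Supplier<MiniPullResult> broker) {
        return CompletableFuture.supplyAsync(broker).thenAccept(this::onSuccess);
    }

    public static void main(String[] args) {
        AsyncPullSketch sketch = new AsyncPullSketch();
        sketch.pullAsync(() -> new MiniPullResult(
            MiniPullResult.Status.FOUND, Arrays.asList("m1", "m2"))).join();
        System.out.println(sketch.submitted);
    }
}
```

The caller returns immediately after starting the pull; the decision about what to do with the result lives entirely in the callback, which is the same division of labour as pullMessage and pullCallback.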
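2.2 Submit message request
submitConsumeRequest is declared on the ConsumeMessageService interface, which has two implementation classes: ConsumeMessageConcurrentlyService for concurrent consumption and ConsumeMessageOrderlyService for ordered consumption.
Let's take the concurrent implementation as an example.
2.3 Concurrent submit message request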
ConsumeMessageConcurrentlyService.java
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        ...
        ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
        }
    }
This creates a ConsumeRequest, which is a Runnable, and submits it to the consumeExecutor thread pool. So let's move on to ConsumeRequest.
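The submit-and-catch shape used here is plain java.util.concurrent. A minimal sketch, assuming a hypothetical helper named trySubmit, shows when RejectedExecutionException is thrown: a pool that has been shut down (or is saturated with a bounded queue) refuses new tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SubmitSketch {
    // Counts how many tasks actually ran; stands in for real message handling.
    static final AtomicInteger consumed = new AtomicInteger();

    // Try to hand the task to the pool; report false if the pool refuses it.
    static boolean trySubmit(ExecutorService pool, Runnable task) {
        try {
            pool.submit(task);
            return true;
        } catch (RejectedExecutionException e) {
            // The snippet in this article leaves this branch empty;
            // a caller could use the return value to retry later.
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println(trySubmit(pool, consumed::incrementAndGet));
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
        // A pool that has been shut down rejects new tasks.
        System.out.println(trySubmit(pool, consumed::incrementAndGet));
        System.out.println(consumed.get());
    }
}
```

Returning a boolean instead of swallowing the exception is a design choice of this sketch, so that a rejected request stays visible to the caller.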
2.4 ConsumeRequest

    class ConsumeRequest implements Runnable {
        private final List<MessageExt> msgs;
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;

        @Override
        public void run() {
            ...
            try {
                ...
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                ....
            }
            ....
            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. MessageQueue={}, msgs={}", messageQueue, msgs);
            }
        }
    }
First, look at the try-catch block. This is where the client's listener is called back to consume the messages:

    listener.consumeMessage(Collections.unmodifiableList(msgs), context)

This is the same consumeMessage method that we overrode when registering the listener:

    /* Register callback to execute on arrival of messages fetched from brokers. */
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            System.out.printf(Thread.currentThread().getName() + "%n");
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });

processConsumeResult is then called with the returned status to handle the result.
2.5 processConsumeResult

    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest) {
        ...
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                log.info("CLUSTERING...");
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
                log.info("msgBackFailed.isEmpty() [{}]", msgBackFailed.isEmpty());
                if (!msgBackFailed.isEmpty()) {
                    log.info("msgBackFailed [{}]", msgBackFailed);
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }
        ...
    }

2.5.1 The method first checks whether the message model is clustering or broadcasting. In broadcasting mode, the failed messages are only logged and then dropped.
In clustering mode, it calls sendMessageBack to send each failed message back to the broker, which will deliver it for consumption again later.
If sending a message back fails, the client increments its reconsume count and resubmits it for local consumption through submitConsumeRequestLater.

2.6 Send message to Broker

    public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
        int delayLevel = context.getDelayLevelWhenNextConsume();
        try {
            this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
            return true;
        } catch (Exception e) {
            ...
        }
        return false;
    }

    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            String brokerAddr = (null != brokerName)
                ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
                this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
        } catch (Exception e) {
            ....
        }
    }

    public void consumerSendMessageBack(
        final String addr,
        final MessageExt msg,
        final String consumerGroup,
        final int delayLevel,
        final long timeoutMillis,
        final int maxConsumeRetryTimes
    ) throws RemotingException, MQBrokerException, InterruptedException {
        ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
        requestHeader.setGroup(consumerGroup);
        requestHeader.setOriginTopic(msg.getTopic());
        requestHeader.setOffset(msg.getCommitLogOffset());
        requestHeader.setDelayLevel(delayLevel);
        requestHeader.setOriginMsgId(msg.getMsgId());
        requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
        log.info("addr [{}] request [{}] timeoutMillis [{}]", addr, request, timeoutMillis);
        RemotingCommand response = this.remotingClient.invokeSync(
            MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

This simply calls the API that sends the request to the broker over Netty. The request code is RequestCode.CONSUMER_SEND_MSG_BACK:

    public static final int CONSUMER_SEND_MSG_BACK = 36;

The broker stores the message in a file and also increments its reconsume count by 1.
For details, see the SendMessageProcessor.processRequest method, which will be covered later.
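The clustering branch of processConsumeResult in section 2.5 can be sketched in isolation. This is a simplified illustration with hypothetical types (Msg, SendBackSketch, handleFailures), not the RocketMQ implementation. It assumes ackIndex is the index of the last successfully consumed message, since the article elides how that value is computed, and it models sendMessageBack as a predicate that returns false when the send fails:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical message type; only the fields used by this sketch are modelled.
final class Msg {
    final String id;
    int reconsumeTimes;

    Msg(String id) {
        this.id = id;
    }
}

final class SendBackSketch {
    // Everything after ackIndex is treated as failed and sent back to the broker.
    // Messages whose send-back fails get their retry count bumped and are
    // returned so that the caller can resubmit them locally.
    static List<Msg> handleFailures(List<Msg> msgs, int ackIndex, Predicate<Msg> sendBack) {
        List<Msg> msgBackFailed = new ArrayList<>();
        for (int i = ackIndex + 1; i < msgs.size(); i++) {
            Msg msg = msgs.get(i);
            if (!sendBack.test(msg)) {
                msg.reconsumeTimes++;
                msgBackFailed.add(msg);
            }
        }
        return msgBackFailed;
    }

    public static void main(String[] args) {
        List<Msg> batch = Arrays.asList(new Msg("a"), new Msg("b"), new Msg("c"));
        // ackIndex = 0: "a" counts as consumed; "b" and "c" must be retried.
        // Pretend the send-back fails for "c" only.
        List<Msg> local = handleFailures(batch, 0, m -> !m.id.equals("c"));
        System.out.println(local.size() + " " + local.get(0).id + " " + local.get(0).reconsumeTimes);
    }
}
```

The returned list corresponds to msgBackFailed, the messages that the real client hands to submitConsumeRequestLater instead of the broker.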