
Principle and Application of retryAnotherBrokerWhenNotStoreOK in RocketMQ

2025-01-15 Update From: SLTechnology News&Howtos



Preface

This article examines the retryAnotherBrokerWhenNotStoreOK option of RocketMQ, based on the rocketmq-client 4.5.2 sources.

DefaultMQProducer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

    public class DefaultMQProducer extends ClientConfig implements MQProducer {

        private final InternalLogger log = ClientLogger.getLog();

        /**
         * Indicate whether to retry another broker on sending failure internally.
         */
        private boolean retryAnotherBrokerWhenNotStoreOK = false;

        public boolean isRetryAnotherBrokerWhenNotStoreOK() {
            return retryAnotherBrokerWhenNotStoreOK;
        }

        public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
            this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
        }

        //......
    }

DefaultMQProducer has a retryAnotherBrokerWhenNotStoreOK field, which defaults to false.

DefaultMQProducerImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

    public class DefaultMQProducerImpl implements MQProducerInner {
        private final InternalLogger log = ClientLogger.getLog();
        private final Random random = new Random();
        private final DefaultMQProducer defaultMQProducer;
        private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
            new ConcurrentHashMap<String, TopicPublishInfo>();
        private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
        private final RPCHook rpcHook;
        protected BlockingQueue<Runnable> checkRequestQueue;
        protected ExecutorService checkExecutor;
        private ServiceState serviceState = ServiceState.CREATE_JUST;
        private MQClientInstance mQClientFactory;
        private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
        private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
        private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
        private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
        private final ExecutorService defaultAsyncSenderExecutor;
        private ExecutorService asyncSenderExecutor;

        //......

        private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
        ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            this.makeSureStateOK();
            Validators.checkMessage(msg, this.defaultMQProducer);

            final long invokeID = random.nextLong();
            long beginTimestampFirst = System.currentTimeMillis();
            long beginTimestampPrev = beginTimestampFirst;
            long endTimestamp = beginTimestampFirst;
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                boolean callTimeout = false;
                MessageQueue mq = null;
                Exception exception = null;
                SendResult sendResult = null;
                // Only SYNC sends are retried: 1 initial attempt + retryTimesWhenSendFailed.
                int timesTotal = communicationMode == CommunicationMode.SYNC
                    ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                int times = 0;
                String[] brokersSent = new String[timesTotal];
                for (; times < timesTotal; times++) {
                    // Broker used by the previous attempt; null on the first attempt.
                    String lastBrokerName = null == mq ? null : mq.getBrokerName();
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    if (mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mq.getBrokerName();
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            if (times > 0) {
                                //Reset topic with namespace during resend.
                                msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                            }
                            long costTime = beginTimestampPrev - beginTimestampFirst;
                            if (timeout < costTime) {
                                callTimeout = true;
                                break;
                            }

                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            switch (communicationMode) {
                                case ASYNC:
                                    return null;
                                case ONEWAY:
                                    return null;
                                case SYNC:
                                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                        // The flag under study: retry on another broker instead of returning.
                                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                            continue;
                                        }
                                    }

                                    return sendResult;
                                default:
                                    break;
                            }
                        } catch (RemotingException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQClientException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQBrokerException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            switch (e.getResponseCode()) {
                                case ResponseCode.TOPIC_NOT_EXIST:
                                case ResponseCode.SERVICE_NOT_AVAILABLE:
                                case ResponseCode.SYSTEM_ERROR:
                                case ResponseCode.NO_PERMISSION:
                                case ResponseCode.NO_BUYER_ID:
                                case ResponseCode.NOT_IN_CURRENT_UNIT:
                                    continue;
                                default:
                                    if (sendResult != null) {
                                        return sendResult;
                                    }

                                    throw e;
                            }
                        } catch (InterruptedException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());

                            log.warn("sendKernelImpl exception", e);
                            log.warn(msg.toString());
                            throw e;
                        }
                    } else {
                        break;
                    }
                }

                if (sendResult != null) {
                    return sendResult;
                }

                String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                    times,
                    System.currentTimeMillis() - beginTimestampFirst,
                    msg.getTopic(),
                    Arrays.toString(brokersSent));

                info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

                MQClientException mqClientException = new MQClientException(info, exception);
                if (callTimeout) {
                    throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                }

                if (exception instanceof MQBrokerException) {
                    mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
                } else if (exception instanceof RemotingConnectException) {
                    mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
                } else if (exception instanceof RemotingTimeoutException) {
                    mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
                } else if (exception instanceof MQClientException) {
                    mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
                }

                throw mqClientException;
            }

            List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
            if (null == nsList || nsList.isEmpty()) {
                throw new MQClientException(
                    "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
            }

            throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
        }

        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
        }

        //......
    }

When communicationMode is SYNC, the sendDefaultImpl method of DefaultMQProducerImpl checks whether sendResult.getSendStatus() is SendStatus.SEND_OK. If it is not, the method checks defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK(): when that returns true it executes continue, otherwise it returns sendResult directly. The for loop keeps track of lastBrokerName and passes it to every call of selectOneMessageQueue(topicPublishInfo, lastBrokerName), which in turn delegates to mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName).

MQFaultStrategy

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/latency/MQFaultStrategy.java

    public class MQFaultStrategy {
        private final static InternalLogger log = ClientLogger.getLog();
        private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

        private boolean sendLatencyFaultEnable = false;

        private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
        private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

        public long[] getNotAvailableDuration() {
            return notAvailableDuration;
        }

        public void setNotAvailableDuration(final long[] notAvailableDuration) {
            this.notAvailableDuration = notAvailableDuration;
        }

        public long[] getLatencyMax() {
            return latencyMax;
        }

        public void setLatencyMax(final long[] latencyMax) {
            this.latencyMax = latencyMax;
        }

        public boolean isSendLatencyFaultEnable() {
            return sendLatencyFaultEnable;
        }

        public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
            this.sendLatencyFaultEnable = sendLatencyFaultEnable;
        }

        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            if (this.sendLatencyFaultEnable) {
                try {
                    int index = tpInfo.getSendWhichQueue().getAndIncrement();
                    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                        if (pos < 0)
                            pos = 0;
                        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                                return mq;
                        }
                    }

                    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                    if (writeQueueNums > 0) {
                        final MessageQueue mq = tpInfo.selectOneMessageQueue();
                        if (notBestBroker != null) {
                            mq.setBrokerName(notBestBroker);
                            mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                        }
                        return mq;
                    } else {
                        latencyFaultTolerance.remove(notBestBroker);
                    }
                } catch (Exception e) {
                    log.error("Error occurred when selecting message queue", e);
                }

                return tpInfo.selectOneMessageQueue();
            }

            // Default path (sendLatencyFaultEnable == false).
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }

        public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
            if (this.sendLatencyFaultEnable) {
                long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
                this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
            }
        }

        private long computeNotAvailableDuration(final long currentLatency) {
            for (int i = latencyMax.length - 1; i >= 0; i--) {
                if (currentLatency >= latencyMax[i])
                    return this.notAvailableDuration[i];
            }

            return 0;
        }
    }

MQFaultStrategy's selectOneMessageQueue method first checks whether sendLatencyFaultEnable is on. It defaults to false, in which case the method goes straight to tpInfo.selectOneMessageQueue(lastBrokerName).
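When sendLatencyFaultEnable is switched on, updateFaultItem maps the observed latency to an isolation period through the two arrays in the listing. The following is a minimal standalone sketch of that lookup, not the RocketMQ class itself: the class name LatencySketch and the helper durationFor are hypothetical, while the array values and the 30000 ms substitution for isolation are copied from the listing.

```java
final class LatencySketch {
    // Thresholds and durations (ms) copied from the MQFaultStrategy listing.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down; the first one the latency reaches wins.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // Hypothetical helper: isolation (a failed send) is treated as a 30000 ms latency.
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }

    public static void main(String[] args) {
        System.out.println(durationFor(80, false));   // below 550 ms: no isolation
        System.out.println(durationFor(600, false));  // reaches the 550 ms threshold
        System.out.println(durationFor(5, true));     // failed send, treated as 30000 ms
    }
}
```

Under these values a send that fails with an exception isolates the broker for 600000 ms, because 30000 already exceeds the highest threshold of 15000. None of this applies on the default path, where sendLatencyFaultEnable is false.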

TopicPublishInfo

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java

    public class TopicPublishInfo {
        private boolean orderTopic = false;
        private boolean haveTopicRouterInfo = false;
        private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
        private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
        private TopicRouteData topicRouteData;

        //......

        public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
            if (lastBrokerName == null) {
                return selectOneMessageQueue();
            } else {
                int index = this.sendWhichQueue.getAndIncrement();
                for (int i = 0; i < this.messageQueueList.size(); i++) {
                    int pos = Math.abs(index++) % this.messageQueueList.size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = this.messageQueueList.get(pos);
                    // Skip queues that live on the broker used by the previous attempt.
                    if (!mq.getBrokerName().equals(lastBrokerName)) {
                        return mq;
                    }
                }
                return selectOneMessageQueue();
            }
        }

        public MessageQueue selectOneMessageQueue() {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            return this.messageQueueList.get(pos);
        }

        //......
    }

When lastBrokerName is null, TopicPublishInfo's selectOneMessageQueue(lastBrokerName) simply calls the no-argument selectOneMessageQueue() to pick a MessageQueue. When lastBrokerName is not null, it loops at most messageQueueList.size() times looking for a MessageQueue whose brokerName differs from lastBrokerName; if no such queue is found, it falls back to the no-argument selectOneMessageQueue().
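The selection rule described above can be tried out in isolation. This is a simplified sketch under stated assumptions, not the RocketMQ implementation: QueueSelectionSketch, its nested Queue type and the of(...) factory are hypothetical names, and an AtomicInteger starting at 0 stands in for ThreadLocalIndex so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class QueueSelectionSketch {
    // Hypothetical stand-in for MessageQueue: just a broker name and a queue id.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;
    // Assumption: a plain counter from 0 replaces ThreadLocalIndex.
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    QueueSelectionSketch(List<Queue> queues) {
        this.queues = queues;
    }

    // Builds two queues per broker name, in the order given.
    static QueueSelectionSketch of(String... brokerNames) {
        List<Queue> list = new ArrayList<>();
        for (String name : brokerNames) {
            list.add(new Queue(name, 0));
            list.add(new Queue(name, 1));
        }
        return new QueueSelectionSketch(list);
    }

    // Plain round-robin over all queues.
    Queue selectOne() {
        int index = sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % queues.size();
        if (pos < 0) {
            pos = 0;
        }
        return queues.get(pos);
    }

    // Round-robin that skips queues on lastBrokerName, with a plain fallback.
    Queue selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.abs(index++) % queues.size();
            if (pos < 0) {
                pos = 0;
            }
            Queue q = queues.get(pos);
            if (!q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        return selectOne();
    }

    public static void main(String[] args) {
        QueueSelectionSketch s = of("broker-a", "broker-b");
        Queue first = s.selectOne(null);
        Queue second = s.selectOne(first.brokerName);
        System.out.println(first.brokerName + " then " + second.brokerName);
    }
}
```

With a single broker the loop finds no alternative and the fallback returns a queue on the same broker, so on a one-broker topic a retry necessarily goes back to the broker that just failed.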

Summary

When communicationMode is SYNC, DefaultMQProducerImpl's sendDefaultImpl method checks whether sendResult.getSendStatus() is SendStatus.SEND_OK. If it is not, the method checks defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK(): when that is true it executes continue and moves on to the next attempt (at most 1 + retryTimesWhenSendFailed attempts in total), otherwise it returns sendResult directly. The for loop maintains lastBrokerName and passes it to every call of selectOneMessageQueue(topicPublishInfo, lastBrokerName), which delegates to mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName). With the default sendLatencyFaultEnable of false, that call ends up in tpInfo.selectOneMessageQueue(lastBrokerName), which prefers a queue on a broker other than the one just tried.
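To see the effect of the flag end to end, here is a small simulation of the SYNC branch of the loop. It is a sketch under assumptions rather than RocketMQ code: RetryLoopSketch, pick, sendSync and demo are hypothetical names, brokers are plain strings with a fixed outcome each, and exceptions and timeouts are left out. The SendStatus constants match the names used by RocketMQ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RetryLoopSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    // Round-robin pick that avoids the broker used by the previous attempt if possible.
    static String pick(List<String> brokers, String lastBroker, int start) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get((start + i) % brokers.size());
            if (!candidate.equals(lastBroker)) {
                return candidate;
            }
        }
        return brokers.get(start % brokers.size());
    }

    // Mirrors the SYNC case: not SEND_OK plus flag set means "continue", else return.
    static SendStatus sendSync(List<String> brokers, Map<String, SendStatus> outcome,
                               int retryTimesWhenSendFailed,
                               boolean retryAnotherBrokerWhenNotStoreOK,
                               List<String> brokersSent) {
        int timesTotal = 1 + retryTimesWhenSendFailed;
        String lastBroker = null;
        SendStatus last = null;
        for (int times = 0; times < timesTotal; times++) {
            String broker = pick(brokers, lastBroker, times);
            lastBroker = broker;
            brokersSent.add(broker);
            last = outcome.get(broker);
            if (last != SendStatus.SEND_OK && retryAnotherBrokerWhenNotStoreOK) {
                continue;
            }
            return last;
        }
        // Attempts exhausted: the last non-OK result is still returned to the caller.
        return last;
    }

    // Hypothetical scenario: broker-a always times out on flush, broker-b stores fine.
    static String demo(boolean retryAnotherBrokerWhenNotStoreOK) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b");
        Map<String, SendStatus> outcome = new HashMap<>();
        outcome.put("broker-a", SendStatus.FLUSH_DISK_TIMEOUT);
        outcome.put("broker-b", SendStatus.SEND_OK);
        List<String> sent = new ArrayList<>();
        SendStatus status = sendSync(brokers, outcome, 2, retryAnotherBrokerWhenNotStoreOK, sent);
        return status + " via " + sent;
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // FLUSH_DISK_TIMEOUT via [broker-a]
        System.out.println(demo(true));  // SEND_OK via [broker-a, broker-b]
    }
}
```

One consequence worth keeping in mind: a status other than SEND_OK does not necessarily mean the first broker discarded the message, so enabling the flag may result in the same message being stored on two brokers. Consumers would then need to tolerate duplicates.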
