In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-07 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/01 Report--
This article shares with you the content of an example analysis of the fault avoidance mechanism designed by RocketMQ. The editor thinks it is very practical, so share it with you as a reference and follow the editor to have a look.
In order to simplify communication with the client, NameServer does not notify the client immediately when it finds a Broker failure. The fault avoidance mechanism is used to solve the problem that when the Broker fails, the Producer cannot perceive it in time, which leads to the failure of message transmission. It is not enabled by default. If it is enabled, the failed Broker will be temporarily excluded from the queue selection list when message delivery fails.
Class MQFaultStrategy:
Public class MQFaultStrategy {private final static InternalLogger log = ClientLogger.getLog (); private final LatencyFaultTolerance latencyFaultTolerance = new LatencyFaultToleranceImpl (); private boolean sendLatencyFaultEnable = false; private long [] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long [] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; public long [] getNotAvailableDuration () {return notAvailableDuration;} public void setNotAvailableDuration (final long [] notAvailableDuration) {this.notAvailableDuration = notAvailableDuration } public long [] getLatencyMax () {return latencyMax;} public void setLatencyMax (final long [] latencyMax) {this.latencyMax = latencyMax;} public boolean isSendLatencyFaultEnable () {return sendLatencyFaultEnable;} public void setSendLatencyFaultEnable (final boolean sendLatencyFaultEnable) {this.sendLatencyFaultEnable = sendLatencyFaultEnable } public MessageQueue selectOneMessageQueue (final TopicPublishInfo tpInfo, final String lastBrokerName) {/ / whether to enable the failure delay mechanism if (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue () .getAndIncrement (); for (int I = 0; I)
< tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //判断Queue是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums >0) {final MessageQueue mq = tpInfo.selectOneMessageQueue (); if (notBestBroker! = null) {mq.setBrokerName (notBestBroker); mq.setQueueId (tpInfo.getSendWhichQueue (). GetAndIncrement ()% writeQueueNums);} return mq } else {latencyFaultTolerance.remove (notBestBroker);}} catch (Exception e) {log.error ("Error occurred when selecting message queue", e);} return tpInfo.selectOneMessageQueue ();} / / default polling return tpInfo.selectOneMessageQueue (lastBrokerName) } public void updateFaultItem (final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {long duration = computeNotAvailableDuration (isolation? 30000: currentLatency); this.latencyFaultTolerance.updateFaultItem (brokerName, currentLatency, duration);} private long computeNotAvailableDuration (final long currentLatency) {for (int I = latencyMax.length-1; I > = 0 If -) {if (currentLatency > = latencyMax [I]) return this.notAvailableDuration [I];} return 0;}}
When choosing a lookup route, select the key steps of message queuing:
First select a message queue according to the polling algorithm
Determine whether the message queue is available from the fault list
Determine whether it is available in LatencyFaultToleranceImpl:
@ Overridepublic boolean isAvailable (final String name) {final FaultItem faultItem = this.faultItemTable.get (name); if (faultItem! = null) {return faultItem.isAvailable ();} return true;} public boolean isAvailable () {return (System.currentTimeMillis ()-startTimestamp) > = 0;}
Determine whether it is in the fault list and does not represent availability in the fault list.
Determine whether the current time is greater than or equal to the fault avoidance start time startTimestamp in the fault list
The updateFaultItem () method is called to update the fault list after the message is sent and when an exception occurs. ComputeNotAvailableDuration () calculates the length of the fault cycle according to the response time. The longer the response time, the longer the fault period. Network exception, Broker exception and client exception all have a fixed response time of 30s, and their failure cycle is 10 minutes. The message is sent successfully or the thread interrupt exception response time is less than 100 milliseconds, and the failure period is 0.
The updateFaultItem method of the LatencyFaultToleranceImpl class:
@ Overridepublic void updateFaultItem (final String name, final long currentLatency, final long notAvailableDuration) {FaultItem old = this.faultItemTable.get (name); if (null = = old) {final FaultItem faultItem = new FaultItem (name); faultItem.setCurrentLatency (currentLatency); faultItem.setStartTimestamp (System.currentTimeMillis () + notAvailableDuration); / / add fault list old = this.faultItemTable.putIfAbsent (name, faultItem) If (old! = null) {old.setCurrentLatency (currentLatency); old.setStartTimestamp (System.currentTimeMillis () + notAvailableDuration);}} else {old.setCurrentLatency (currentLatency); old.setStartTimestamp (System.currentTimeMillis () + notAvailableDuration);}}
FaultItem stores the Broker name, response time, fault avoidance start time and, most importantly, fault avoidance start time, which is used to determine whether the Queue is available
Thank you for reading! This is the end of the article on "example Analysis of the Fault avoidance Mechanism of RocketMQ Design". I hope the above content can be helpful to you, so that you can learn more knowledge. if you think the article is good, you can share it for more people to see!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 284
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.