
How to Understand RocketMQ, the 14.1k-Star Project on GitHub

2025-01-23 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--


A macro overview

RocketMQ transaction message delivery process:

In the source code, TransactionMQProducer.sendMessageInTransaction simply delegates to DefaultMQProducerImpl.sendMessageInTransaction. Stepping into that method, the whole flow of sending a transaction message is laid out clearly.

First, it runs pre-send checks and fills in the required properties, including marking the message as a prepared (half) transaction message.

Source code listing-1

```java
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
```

Enter the sending process:

Source code listing-2

```java
    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
```

Whether the local transaction runs depends on the result returned by the broker. If the half message was sent successfully, the local transaction is executed:

Source code listing-3

```java
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            // the broker (or its slave) is not in a good state: the half message is rolled back
            // and the local transaction is never executed
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }
```

Once the local transaction has finished, the second phase is carried out according to its state:

Source code listing-4

```java
    try {
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    // assemble the send result
    // ...
    return transactionSendResult;
}
```
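Listings 1 to 4 reduce to one decision: which state reaches endTransaction. The following is a self-contained sketch of that decision, not RocketMQ code. The two enums only copy constant names from RocketMQ's SendStatus and LocalTransactionState (including the UNKNOW spelling), TxFlowSketch is a hypothetical class, and a Supplier stands in for the local transaction. Because a Supplier cannot throw checked exceptions, the sketch catches RuntimeException where the real code catches Throwable.

```java
import java.util.function.Supplier;

// Constant names copied from RocketMQ's SendStatus; this enum is only a stand-in.
enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

// Constant names copied from RocketMQ's LocalTransactionState (note the UNKNOW spelling).
enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical model of listing-3: which state is handed to endTransaction.
final class TxFlowSketch {
    private TxFlowSketch() {}

    static LocalTransactionState stateAfterPhaseOne(SendStatus status,
                                                    Supplier<LocalTransactionState> localTx) {
        switch (status) {
            case SEND_OK:
                try {
                    // the half message is stored: run the local transaction
                    LocalTransactionState s = localTx.get();
                    return s == null ? LocalTransactionState.UNKNOW : s;
                } catch (RuntimeException e) {
                    // a failure leaves the outcome unknown; the broker will back-check later
                    return LocalTransactionState.UNKNOW;
                }
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // roll back the half message without running the local transaction
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.UNKNOW;
        }
    }
}
```

For example, passing SendStatus.SLAVE_NOT_AVAILABLE returns ROLLBACK_MESSAGE without ever invoking the supplier, mirroring the fact that the local transaction is skipped in that case.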

Next, we analyze each stage of this code in depth.

Phase Ⅰ sending: the inside story

Let's focus on the send method. Stepping into it, we find that RocketMQ sends the phase Ⅰ message of a transaction in SYNC (synchronous) mode:

Source code listing-5

```java
public SendResult send(Message msg, long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
```

This makes sense: whether the local transaction runs depends on the phase Ⅰ result, so the producer must block until the broker's ack arrives.

Let's look at the implementation of sendDefaultImpl in DefaultMQProducerImpl.java. Reading this method shows how the producer behaves during phase Ⅰ of sending a transaction message.

Note that this method is not specific to transaction messages, nor even to SYNC mode, so reading it gives a fairly complete picture of how RocketMQ sends messages in general.

The logic of this method is so tightly connected that I did not want to split it up. To save space, I replaced the more complex but less informative parts with comments, keeping the flow as intact as possible. Parts I consider important or easy to overlook are annotated, and several details are explained afterwards.

Source code listing-6

```java
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    // I. Message validity check. See below.
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Get the routing info of the current topic (mainly its brokers); if not cached, fetch it from the NameServer
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // II. Send retry mechanism. See below.
        int timesTotal = communicationMode == CommunicationMode.SYNC
            ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // mq == null on the first attempt; later attempts carry the previous broker
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // III. How does RocketMQ choose a queue when sending? The broker fault-avoidance mechanism
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        // Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // Core sending logic
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // Broker avoidance used when choosing a broker; only effective when sendLatencyFaultEnable == true
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        // IV. RocketMQ's three CommunicationModes. See below.
                        case ASYNC: // asynchronous mode
                            return null;
                        case ONEWAY: // one-way mode
                            return null;
                        case SYNC: // synchronous mode
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // ...
                    // retried automatically
                } catch (MQClientException e) {
                    // ...
                    // retried automatically
                } catch (MQBrokerException e) {
                    // ...
                    // retried automatically only when the response code is NOT_IN_CURRENT_UNIT (205);
                    // otherwise no retry, the exception is thrown
                } catch (InterruptedException e) {
                    // ...
                    // no retry, the exception is thrown
                }
            } else {
                break;
            }
        }
        if (sendResult != null) {
            return sendResult;
        }
        // Assemble the error info, which is finally thrown as an MQClientException
        // ...
        // On timeout, throw RemotingTooMuchRequestException
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }
        // Fill in the MQClientException details
        // ...
    }
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null)
        .setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
```

I. Message validity check

Source code listing-7

```java
Validators.checkMessage(msg, this.defaultMQProducer);
```

This method validates the message, covering both the topic and the message body. The topic name must follow the naming rules and must not be one of the built-in system topics. The message body length must be > 0 and must not exceed defaultMQProducer.getMaxMessageSize().

Source code listing-8

```java
// ...
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
        "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
```

II. Send retry mechanism

In SYNC mode the producer automatically retries a failed send. The total number of attempts is retryTimesWhenSendFailed + 1; since retryTimesWhenSendFailed defaults to 2, that means at most 3 attempts.

Note that not every failure is retried. From the source code above, a send is retried automatically in the following three cases:

When a RemotingException or an MQClientException is thrown.

When an MQBrokerException is thrown and its response code is NOT_IN_CURRENT_UNIT (205).

In SYNC mode, when no exception is thrown but the send status is not SEND_OK and retryAnotherBrokerWhenNotStoreOK is enabled.
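The retry rules above can be condensed into a single predicate. This is a hypothetical sketch that encodes the rules as this article reads them from listing-6; AttemptOutcome and RetryRules are invented names, and the set of broker response codes that triggers a retry may differ between client versions, so check the catch (MQBrokerException e) branch of your own client.

```java
// Hypothetical classification of how one send attempt ended.
enum AttemptOutcome { REMOTING_EXCEPTION, CLIENT_EXCEPTION, BROKER_EXCEPTION, INTERRUPTED, SYNC_NOT_SEND_OK }

// Hypothetical model of the retry rules described in the article.
final class RetryRules {
    // Response code quoted in the article for NOT_IN_CURRENT_UNIT.
    static final int NOT_IN_CURRENT_UNIT = 205;

    private RetryRules() {}

    // Only SYNC mode loops; ASYNC and ONEWAY make a single attempt in this loop.
    static int timesTotal(boolean syncMode, int retryTimesWhenSendFailed) {
        return syncMode ? 1 + retryTimesWhenSendFailed : 1;
    }

    static boolean shouldRetry(AttemptOutcome outcome, int brokerResponseCode,
                               boolean retryAnotherBrokerWhenNotStoreOK) {
        switch (outcome) {
            case REMOTING_EXCEPTION:
            case CLIENT_EXCEPTION:
                return true; // always retried
            case BROKER_EXCEPTION:
                return brokerResponseCode == NOT_IN_CURRENT_UNIT; // only this code, per the article
            case SYNC_NOT_SEND_OK:
                return retryAnotherBrokerWhenNotStoreOK; // opt-in switch on the producer
            case INTERRUPTED:
            default:
                return false; // the exception propagates to the caller
        }
    }
}
```

With the default retryTimesWhenSendFailed of 2, timesTotal(true, 2) gives the 3 attempts mentioned above, while timesTotal(false, 2) stays at 1.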

Before each attempt, the producer checks whether the time already spent since the first attempt exceeds the timeout (3000 ms by default). If it does, the message is not sent again: the call fails with a timeout and no further retries are made. Two points follow from this:

The producer's internal retries are invisible to the business application, and the send latency the application observes includes the time spent on all retries.

Once the timeout is exceeded, the send has definitively failed, and the failure is ultimately surfaced as a RemotingTooMuchRequestException.

Note that the official RocketMQ documentation gives the send timeout as 10 s (10000 ms), and many people online also believe RocketMQ's timeout is 10 s. However, the code clearly sets 3000 ms, and I confirmed by debugging that the default timeout is indeed 3000 ms.
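Because costTime in listing-6 is measured from the first attempt, all retries share a single timeout budget. The hypothetical simulation below replays only that timeout check with made-up attempt durations, with no network I/O, to show how a slow first attempt can leave no room for the remaining retries. TimeoutBudgetSketch is an invented name.

```java
import java.util.List;

// Hypothetical replay of the timeout check in sendDefaultImpl:
// elapsed time is counted from the first attempt, so retries share one budget.
final class TimeoutBudgetSketch {
    private TimeoutBudgetSketch() {}

    /**
     * @param timeoutMs      overall send timeout (3000 ms by default, per the article)
     * @param attemptCostsMs made-up duration of each failed attempt
     * @param timesTotal     maximum number of attempts
     * @return how many attempts were started before the budget ran out
     */
    static int attemptsStarted(long timeoutMs, List<Long> attemptCostsMs, int timesTotal) {
        long elapsed = 0; // corresponds to beginTimestampPrev - beginTimestampFirst
        int started = 0;
        for (int times = 0; times < timesTotal && times < attemptCostsMs.size(); times++) {
            if (timeoutMs < elapsed) {
                break; // callTimeout = true, later thrown as RemotingTooMuchRequestException
            }
            started++;
            elapsed += attemptCostsMs.get(times); // this attempt fails after its cost
        }
        return started;
    }
}
```

With a 3000 ms budget, three 1000 ms attempts all get to run, whereas a 2000 ms attempt followed by a 1500 ms one exhausts the budget after two attempts.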

III. The broker fault-avoidance mechanism

Source code listing-9

```java
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
```

This line selects the queue before each send.

It involves latencyFaultTolerance, a core mechanism behind the high availability of RocketMQ message sending. It is part of the producer's load balancing and is controlled by sendLatencyFaultEnable. The default is false, which leaves the broker fault-delay mechanism disabled; setting it to true enables it, and the producer has to turn it on explicitly (for example with DefaultMQProducer.setSendLatencyFaultEnable(true)).

With fault avoidance enabled, queue selection takes each broker's recent health into account and skips brokers that have been performing poorly; an unhealthy broker is avoided for a period of time. With it disabled, the next queue is picked in round-robin order, but on a retry the producer tries, where possible, to choose a queue on a different broker from the previous attempt. After every send, the broker's state is updated through the updateFaultItem method.
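The fallback behaviour (round-robin, trying to avoid the previous broker on retries) can be sketched as follows. SimpleQueue and RoundRobinSelector are hypothetical stand-ins for MessageQueue and TopicPublishInfo, an AtomicInteger plays the role of sendWhichQueue, and Math.floorMod replaces the Math.abs(index++) % size plus pos < 0 guard used in listing-11.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for MessageQueue: a broker name and a queue id.
final class SimpleQueue {
    final String brokerName;
    final int queueId;

    SimpleQueue(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }

    @Override
    public String toString() {
        return brokerName + "#" + queueId;
    }
}

// Hypothetical round-robin selector that avoids the last broker when it can.
final class RoundRobinSelector {
    private final List<SimpleQueue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    RoundRobinSelector(List<SimpleQueue> queues) {
        this.queues = queues;
    }

    SimpleQueue select(String lastBrokerName) {
        if (lastBrokerName == null) {
            return next(); // first attempt: plain round robin
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            // floorMod keeps the position non-negative even after the counter overflows
            SimpleQueue q = queues.get(Math.floorMod(index++, queues.size()));
            if (!q.brokerName.equals(lastBrokerName)) {
                return q; // retry: prefer a queue on a different broker
            }
        }
        return next(); // every queue lives on the last broker: stop avoiding it
    }

    private SimpleQueue next() {
        return queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size()));
    }
}
```

With queues broker-a#0, broker-a#1 and broker-b#0, a first select(null) returns broker-a#0, and a retry with lastBrokerName "broker-a" skips broker-a#1 and lands on broker-b#0.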

Source code listing-10

```java
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // Compute the avoidance period. isolation indicates whether the broker must be isolated.
        // If so, the latency is taken as 30s: the latency thresholds are scanned from the largest
        // down to the first one not exceeding 30s, and its index gives the avoidance period
        // (30s maps to 10 minutes). Otherwise the period is derived from this send's latency.
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
```
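To make the comment in listing-10 concrete, here is a standalone sketch of the latency-to-avoidance lookup. The two arrays are the defaults I recall from MQFaultStrategy (latencyMax and notAvailableDuration) in 4.x clients; treat the exact values as an assumption and check them against your version. FaultAvoidanceSketch is a hypothetical wrapper class.

```java
// Standalone sketch of the computeNotAvailableDuration lookup.
// ASSUMPTION: table values are the 4.x MQFaultStrategy defaults as recalled; verify them.
final class FaultAvoidanceSketch {
    // Latency thresholds, in ms.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    // Avoidance period once the matching threshold is reached, in ms.
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    private FaultAvoidanceSketch() {}

    // Scan from the largest threshold down; the first threshold the latency reaches wins.
    static long computeNotAvailableDuration(long currentLatencyMs) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatencyMs >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    // Mirrors updateFaultItem: isolation forces a 30000 ms latency, i.e. the largest bucket.
    static long avoidanceFor(long currentLatencyMs, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000L : currentLatencyMs);
    }
}
```

Under these tables a 600 ms send makes the broker unavailable for 30 s, while isolation == true maps to the 10-minute (600000 ms) avoidance described in the comment above.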

Let's dig into the selectOneMessageQueue method:

Source code listing-11

```java
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        // fault avoidance enabled
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // take the next message queue in order as the target queue
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // use this queue if its broker is available and is the same as the previous queue's broker,
                // or if this is the first attempt
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    // fault avoidance disabled: select the next queue by incrementing the index
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
```

IV. RocketMQ's three CommunicationModes

Source code listing-12

```java
public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}
```

All three modes describe how a message travels from the sender to the broker; they do not cover the broker delivering the message to subscribers. The differences between them are:

**One-way mode:** ONEWAY. The sender just sends the message and does not care how the broker processes it. Because so little work is involved, sending takes very little time and throughput is high, but there is no guarantee that the message is not lost. It suits high-volume but unimportant messages, such as heartbeats.

**Asynchronous mode:** ASYNC. After sending the message to the broker, the sender does not wait for the broker to process it; the call returns null and the result is handled on an asynchronous thread, which passes it back to the sender through a callback. If an exception occurs during asynchronous processing, the client retries internally before reporting failure (up to 3 attempts by default, invisible to the sender). This mode offers short waits, high throughput and reliable delivery, and suits high-volume but important messages.

**Synchronous mode:** SYNC. The sender waits until the broker has finished processing and explicitly returns success or failure. Before the sender receives a failure, the client likewise retries internally (up to 3 attempts by default, invisible to the sender). In this mode the sender blocks on the result, so waits are longer but delivery is reliable; it suits low-volume but important messages. Note that the phase Ⅰ half message of a transaction message is always sent in synchronous mode.
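The contrast between the three modes can be illustrated with a toy in-process model. Everything here is hypothetical: ToyBroker and ModesSketch are not RocketMQ classes, a CompletableFuture stands in for the network round trip, and the internal retries described above are left out. SYNC blocks on the ack, ASYNC hands the ack to a callback, and ONEWAY never looks at it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical in-process "broker" that stores a message and acks it on another thread.
final class ToyBroker {
    private final StringBuilder log = new StringBuilder();

    synchronized String store(String msg) {
        log.append(msg).append(';');
        return "SEND_OK:" + msg;
    }

    synchronized String stored() {
        return log.toString();
    }

    CompletableFuture<String> deliver(String msg) {
        return CompletableFuture.supplyAsync(() -> store(msg));
    }
}

final class ModesSketch {
    private ModesSketch() {}

    // SYNC: block until the ack arrives or the timeout expires.
    static String sendSync(ToyBroker broker, String msg, long timeoutMs) {
        try {
            return broker.deliver(msg).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the ack", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("send failed: " + e, e);
        }
    }

    // ASYNC: return at once; the ack reaches the caller through the callback.
    static CompletableFuture<Void> sendAsync(ToyBroker broker, String msg, Consumer<String> onSuccess) {
        return broker.deliver(msg).thenAccept(onSuccess);
    }

    // ONEWAY: fire and forget; the ack (or failure) is never observed.
    static void sendOneway(ToyBroker broker, String msg) {
        broker.deliver(msg);
    }
}
```

The returned future in sendAsync exists only so a caller (or a test) can wait for the callback; a real asynchronous send would simply return.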

The implementation differences are also visible in sendKernelImpl. ONEWAY is the simplest: it shares the SYNC branch and needs no special handling. Compared with the synchronous call, the asynchronous sendMessage call takes extra parameters: the callback (sendCallback), topicPublishInfo with the topic's routing metadata, the client instance (this.mQClientFactory), and the number of async retries (getRetryTimesWhenSendAsyncFailed()). In addition, in asynchronous mode a compressed message is cloned first.

Source code listing-13

```java
switch (communicationMode) {
    case ASYNC:
        Message tmpMessage = msg;
        boolean messageCloned = false;
        if (msgBodyCompressed) {
            // If msgbody was compressed, msgbody should be reset using prevBody.
            // Clone new message using commpressed message body and recover origin massage.
            // Fix bug: https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
        }

        if (topicWithNamespace) {
            if (!messageCloned) {
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }

        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
    case ONEWAY:
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}
```

The official documentation includes a diagram that describes the asynchronous communication process in detail (not reproduced here).

Phase Ⅱ sending

Source code listing-3 covers the execution of the local transaction; localTransactionState links the result of the local transaction to the phase Ⅱ send of the transaction message.

Note that if the phase Ⅰ result is SLAVE_NOT_AVAILABLE (or FLUSH_DISK_TIMEOUT / FLUSH_SLAVE_TIMEOUT), localTransactionState is set to ROLLBACK_MESSAGE and the local transaction is never executed. After that, the endTransaction method performs the phase Ⅱ commit or rollback, as shown in source code listing-4. Its implementation:

Source code listing-14

```java
public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // send the phase II message in oneway mode
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
```
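endTransaction identifies the half message by its commit-log offset, which it recovers by decoding offsetMsgId. The sketch below illustrates that decoding for an IPv4 broker, assuming the layout I understand MessageDecoder to use: 4 bytes of IP, 4 bytes of port and an 8-byte commit-log offset, big-endian and hex-encoded into 32 characters. OffsetMsgIdSketch is hypothetical, and the layout is an assumption worth checking against your version.

```java
import java.nio.ByteBuffer;

// Hypothetical encoder/decoder for an IPv4 offsetMsgId (layout assumed, see the lead-in).
final class OffsetMsgIdSketch {
    private OffsetMsgIdSketch() {}

    // ip (4 bytes) + port (4 bytes) + commit-log offset (8 bytes) -> 32 upper-case hex chars.
    static String encode(byte[] ipv4, int port, long commitLogOffset) {
        if (ipv4.length != 4) {
            throw new IllegalArgumentException("expected a 4-byte IPv4 address");
        }
        ByteBuffer buf = ByteBuffer.allocate(16); // big-endian by default
        buf.put(ipv4).putInt(port).putLong(commitLogOffset);
        StringBuilder sb = new StringBuilder(32);
        for (byte b : buf.array()) {
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    // The broker address "a.b.c.d:port" stored in the first 8 bytes.
    static String brokerAddress(String offsetMsgId) {
        ByteBuffer buf = ByteBuffer.wrap(hexToBytes(offsetMsgId));
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(buf.get() & 0xFF);
        }
        return ip + ":" + buf.getInt();
    }

    // The commit-log offset stored in the last 8 bytes (what setCommitLogOffset receives).
    static long commitLogOffset(String offsetMsgId) {
        return ByteBuffer.wrap(hexToBytes(offsetMsgId)).getLong(8);
    }

    private static byte[] hexToBytes(String hex) {
        if (hex.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex chars for an IPv4 id");
        }
        byte[] out = new byte[16];
        for (int i = 0; i < 16; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }
}
```

For example, encoding 192.168.0.1, port 10911 and offset 4096 yields C0A8000100002A9F0000000000001000, and decoding that string gives back 192.168.0.1:10911 and 4096.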

As I understand it, phase Ⅱ can be sent in oneway mode precisely because transaction messages have their own reliability mechanism: the back-check. If the oneway commit or rollback is lost, the broker will eventually ask the producer for the transaction's state.

Message back check

If, after a certain period, the broker still has not received a definite phase Ⅱ decision (commit or rollback) for a transaction message, it cannot know what happened on the producer side: the producer may have crashed, or it may have sent the commit but the request was lost to network jitter, and so on. The broker therefore initiates a back-check on its own.

The back-check mechanism is implemented mostly on the broker side. The RocketMQ broker uses three different topics, for half messages, op messages and real messages, to isolate transaction messages at their different stages, so that consumers only ever see messages whose commit has been finally confirmed. The detailed implementation is beyond the scope of this article; it deserves a separate article from the broker's perspective.
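The isolation idea can be pictured with an in-memory model. This is a hypothetical sketch of the concept only, not the broker's implementation (which keeps half and op messages in internal system topics): a prepared message waits in a hidden half store, the commit or rollback decision is recorded as an op entry, and only committed messages reach the real topic that consumers read. Half messages without an op entry are exactly the ones a back-check has to resolve.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of half / op / real topic isolation.
final class TxStoreSketch {
    private final Map<String, String> halfMessages = new LinkedHashMap<>(); // txId -> body, hidden from consumers
    private final Map<String, Boolean> opRecords = new LinkedHashMap<>();   // txId -> committed?
    private final List<String> realTopic = new ArrayList<>();               // what consumers can read

    void prepare(String txId, String body) {
        halfMessages.put(txId, body);
    }

    void end(String txId, boolean commit) {
        String body = halfMessages.get(txId);
        if (body == null || opRecords.containsKey(txId)) {
            return; // unknown or already resolved: ignore duplicates
        }
        opRecords.put(txId, commit); // the op entry marks the half message as resolved
        if (commit) {
            realTopic.add(body); // only a commit makes the message deliverable
        }
    }

    // Half messages with no op entry yet: candidates for a back-check.
    List<String> pendingTxIds() {
        List<String> pending = new ArrayList<>();
        for (String txId : halfMessages.keySet()) {
            if (!opRecords.containsKey(txId)) {
                pending.add(txId);
            }
        }
        return pending;
    }

    List<String> visibleToConsumers() {
        return new ArrayList<>(realTopic);
    }
}
```

After preparing t1, t2 and t3, committing t1 and rolling back t2, consumers see only t1's message and t3 remains pending for a back-check.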

From the producer's perspective, when a back-check request arrives from the broker, the producer checks the state of the local transaction for that message and answers commit or rollback accordingly, so the producer must provide a back-check implementation for such situations. Under normal circumstances it is not recommended to return UNKNOW deliberately, since that inevitably adds back-check overhead on the broker. Reserving the back-check for unexpected failures is the reasonable choice.
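On the producer side, a back-check is essentially a lookup of the local transaction's recorded outcome. A minimal sketch, assuming the business code records each outcome keyed by transaction ID (in practice, in the same database transaction as the business data). TxListenerSketch and RecordingTxListener are hypothetical; they only mirror the shape of RocketMQ's TransactionListener, whose executeLocalTransaction and checkLocalTransaction methods take Message objects rather than strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Constant names copied from RocketMQ's LocalTransactionState.
enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical, simplified listener shape (strings instead of Message objects).
interface TxListenerSketch {
    LocalTransactionState executeLocal(String txId, String body);

    LocalTransactionState check(String txId);
}

final class RecordingTxListener implements TxListenerSketch {
    // Outcome of each local transaction, as the business database would record it.
    private final Map<String, Boolean> outcomes = new ConcurrentHashMap<>();
    private final Predicate<String> businessLogic;

    RecordingTxListener(Predicate<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    @Override
    public LocalTransactionState executeLocal(String txId, String body) {
        boolean ok = businessLogic.test(body);
        outcomes.put(txId, ok);
        // Answer definitively whenever possible; UNKNOW only triggers extra back-checks.
        return ok ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    @Override
    public LocalTransactionState check(String txId) {
        Boolean ok = outcomes.get(txId);
        if (ok == null) {
            return LocalTransactionState.UNKNOW; // no record yet: the broker may ask again later
        }
        return ok ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }
}
```

The design choice to note is that check never re-runs the business logic; it only reports what was recorded, so repeated back-checks are cheap and idempotent.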

In addition, in version 4.7.1 back-checks are not unlimited: a message is checked at most 15 times, after which it is discarded:

Source code listing-15

```java
/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 15;
```
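The effect of transactionCheckMax can be modelled as a bounded loop. In this hypothetical sketch (not broker code; CheckResult and BackCheckSketch are invented names), each back-check that still comes back UNKNOWN uses up one of the 15 allowed checks, and the half message is discarded once they are all used. The real broker spreads these checks out over time on a periodic schedule; the loop collapses them into consecutive calls.

```java
import java.util.function.Supplier;

// Hypothetical outcomes of resolving one half message.
enum CheckResult { COMMIT, ROLLBACK, UNKNOWN, DISCARDED }

final class BackCheckSketch {
    static final int TRANSACTION_CHECK_MAX = 15; // default shown in listing-15

    private BackCheckSketch() {}

    // Ask the producer until it answers definitively or the check budget is used up.
    static CheckResult resolve(Supplier<CheckResult> askProducer) {
        for (int checkTimes = 0; checkTimes < TRANSACTION_CHECK_MAX; checkTimes++) {
            CheckResult r = askProducer.get(); // one back-check round trip
            if (r != CheckResult.UNKNOWN) {
                return r;
            }
        }
        return CheckResult.DISCARDED; // limit reached: the half message is discarded
    }
}
```

A producer that keeps answering UNKNOWN is asked exactly 15 times before the message is dropped, which is another reason not to return UNKNOW deliberately.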
