Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the trajectory of RocketMQ messages?

2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

This article mainly explains "what is the track of RocketMQ message". The content of the explanation is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "what the track of RocketMQ message is".

1. Send message track flow

First, let's take a look at how to enable the message trace on the message sender. The sample code is as follows:

Public class TraceProducer {public static void main (String [] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer ("ProducerGroupName", true); / / [@ 1] (https://my.oschina.net/u/1198) producer.setNamesrvAddr ("127.0.0.1 public static void main 9876"); producer.start (); for (int I = 0; I)

< 10; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); }} 从上述代码可以看出其关键点是在创建DefaultMQProducer时指定开启消息轨迹跟踪。我们不妨浏览一下DefaultMQProducer与启用消息轨迹相关的构造函数: public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) 参数如下: String producerGroup 生产者所属组名。 boolean enableMsgTrace 是否开启跟踪消息轨迹,默认为false。 String customizedTraceTopic 如果开启消息轨迹跟踪,用来存储消息轨迹数据所属的主题名称,默认为:RMQ_SYS_TRACE_TOPIC。 1.1 DefaultMQProducer构造函数public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { // [@1](https://my.oschina.net/u/1198) this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); //if client open the message trace feature if (enableMsgTrace) { // @2 try { AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; this.getDefaultMQProducerImpl().registerSendMessageHook( new SendMessageTraceHookImpl(traceDispatcher)); // [@3](https://my.oschina.net/u/2648711) } catch (Throwable e) { log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); } }} 代码@1:首先介绍一下其局部变量。 String producerGroup 生产者所属组。 RPCHook rpcHook 生产者发送钩子函数。 boolean enableMsgTrace 是否开启消息轨迹跟踪。 String customizedTraceTopic 定制用于存储消息轨迹的数据。 代码@2:用来构建AsyncTraceDispatcher,看其名:异步转发消息轨迹数据,稍后重点关注。 代码@3:构建SendMessageTraceHookImpl对象,并使用AsyncTraceDispatcher用来异步转发。 1.2 SendMessageTraceHookImpl钩子函数1.2.1 SendMessageTraceHookImpl类图 SendMessageHook 消息发送钩子函数,用于在消息发送之前、发送之后执行一定的业务逻辑,是记录消息轨迹的最佳扩展点。 TraceDispatcher 消息轨迹转发处理器,其默认实现类AsyncTraceDispatcher,异步实现消息轨迹数据的发送。下面对其属性做一个简单的介绍: int queueSize 异步转发,队列长度,默认为2048,当前版本不能修改。 int batchSize 批量消息条数,消息轨迹一次消息发送请求包含的数据条数,默认为100,当前版本不能修改。 int maxMsgSize 消息轨迹一次发送的最大消息大小,默认为128K,当前版本不能修改。 DefaultMQProducer traceProducer 用来发送消息轨迹的消息发送者。 ThreadPoolExecutor traceExecuter 线程池,用来异步执行消息发送。 AtomicLong discardCount 记录丢弃的消息个数。 Thread worker woker线程,主要负责从追加队列中获取一批待发送的消息轨迹数据,提交到线程池中执行。 ArrayBlockingQueue< TraceContext>

TraceContextQueue message track TraceContext queue, which is used to store messages to be sent to the server.

ArrayBlockingQueue

< Runnable>

AppenderQueue thread pool internal queue, default length 1024.

DefaultMQPushConsumerImpl hostConsumer consumer information, recording the trajectory information when the message is consumed.

The name of the topic used by String traceTopicName to trace the message track.

1.2.2 Source code analysis SendMessageTraceHookImpl1.2.2.1 sendMessageBeforepublic void sendMessageBefore (SendMessageContext context) {/ / if it is message trace data,then it doesn't recorded if (context = = null | | context.getMessage () .getTopic () .startsWith (AsyncTraceDispatcher) localDispatcher) .getTraceTopicName ()) {/ / @ 1 return;} / / build the context content of TuxeTraceContext TraceContext tuxeContext = new TraceContext (); tuxeContext.setTraceBeans (new ArrayList (1)); context.setMqTraceContext (tuxeContext) TuxeContext.setTraceType (TraceType.Pub); tuxeContext.setGroupName (context.getProducerGroup ()); / / @ 2 / / build the data bean object of message trace TraceBean traceBean = new TraceBean () / / @ 3 traceBean.setTopic (context.getMessage () .getTopic ()); traceBean.setTags (context.getMessage () .getTags ()); traceBean.setKeys (context.getMessage () .getKeys ()) TraceBean.setStoreHost (context.getBrokerAddr ()); traceBean.setBodyLength (context.getMessage (). GetBody () .length); traceBean.setMsgType (context.getMsgType ()); tuxeContext.getTraceBeans (). Add (traceBean);}

Code @ 1: if the topic topic is the Topic of the message track, return it directly.

Code @ 2: in the context of message sending, set the context for tracking the message trajectory, which mainly consists of a TraceBean collection, tracking type (TraceType.Pub), and the group to which the producer belongs.

Code @ 3: build a trace message, represented by TraceBean, recording the topic, tags, keys, sent to broker address, message body length, etc., of the original message.

As can be seen from the above, the main purpose of sendMessageBefore is to prepare part of the message tracking log and store it in the sending context when the message is sent, and the message track data will not be sent at this time.

1.2.2.2 sendMessageAfterpublic void sendMessageAfter (SendMessageContext context) {/ / if it is message trace data,then it doesn't recorded if (context = = null | | context.getMessage () .getTopic () .startsWith (AsyncTraceDispatcher) localDispatcher) .getTraceTopicName ()) / / @ 1 | | context.getMqTraceContext () = = null) {return;} if (context.getSendResult () = = null) {return } if (context.getSendResult (). GetRegionId () = = null | |! context.getSendResult () .isTraceOn ()) {/ / if switch is false,skip it return;} TraceContext tuxeContext = (TraceContext) context.getMqTraceContext (); TraceBean traceBean = tuxeContext.getTraceBeans () .get (0) / / @ 2 int costTime = (int) ((System.currentTimeMillis ()-tuxeContext.getTimeStamp ()) / tuxeContext.getTraceBeans () .size ()); / / @ 3 tuxeContext.setCostTime (costTime) / / @ 4 if (context.getSendResult () .getSendStatus () .equals (SendStatus.SEND_OK)) { TuxeContext.setSuccess (true) } else {tuxeContext.setSuccess (false);} tuxeContext.setRegionId (context.getSendResult (). GetRegionId ()); traceBean.setMsgId (context.getSendResult (). GetMsgId ()); traceBean.setOffsetMsgId (context.getSendResult (). GetOffsetMsgId ()); traceBean.setStoreTime (tuxeContext.getTimeStamp () + costTime / 2); localDispatcher.append (tuxeContext) / / @ 5}

Code @ 1: if the topic topic is the Topic of the message track, return it directly.

Code @ 2: although the TraceBean for obtaining tracking from MqTraceContext is designed as a List structure, there is always only one piece of data in the message delivery scenario, even if it is sent in batches.

Code @ 3: get the time it takes for the message to be sent to receive the response result.

Code @ 4: set costTime (time consuming), success (whether it is sent successfully), regionId (partition where broker is sent), msgId (message ID, globally unique), offsetMsgId (physical offset of the message, if it is a batch message, physical offset of the last message), storeTime. Here, the storage time of the message is represented by (the time spent by the client sending time + 1/2). Here is an estimate.

Code @ 5: forward the information that needs to be tracked to the Broker server via TraceDispatcher. The code is as follows:

Public boolean append (final Object ctx) {boolean result = traceContextQueue.offer ((TraceContext) ctx); if (! result) {log.info ("buffer full" + discardCount.incrementAndGet () + ", context is" + ctx);} return result;}

A key point here is the use of the offer method, which returns false immediately when the queue cannot hold the new element and does not block.

Next, we turn our attention to the implementation of TraceDispatcher.

1.3principles of TraceDispatcher implementation

TraceDispatcher, which is used to forward the client message trace data to Broker, and its default implementation class is AsyncTraceDispatcher.

1.3.1 TraceDispatcher constructor public AsyncTraceDispatcher (String traceTopicName, RPCHook rpcHook) throws MQClientException {/ / queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; this.batchSize = 100; this.maxMsgSize = 128000; this.discardCount = new AtomicLong (0L); this.traceContextQueue = new ArrayBlockingQueue (1024); this.appenderQueue = new ArrayBlockingQueue (queueSize) If (! UtilAll.isBlank (traceTopicName)) {this.traceTopicName = traceTopicName;} else {this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC } / / @ 1 this.traceExecuter = new ThreadPoolExecutor (/ /: 10, / / 20, / / 1000 * 60, / / TimeUnit.MILLISECONDS, / / this.appenderQueue, / / new ThreadFactoryImpl ("MQTraceSendThread_"); traceProducer = getAndCreateTraceProducer (rpcHook); / / @ 2}

Code @ 1: initializes the core properties, and in this version these values are "solidified" and cannot be modified by the user.

The queueSize queue length, which defaults to 2048, is the number of message tracks that the asynchronous thread pool can backlog.

The number of messages that batchSize sends to Broker in batches at a time. The default is 100.

When maxMsgSize reports the message track to Broker, the total size of the message body cannot exceed this value, and the default is 128k.

The message track data that is discarded during the whole operation of discardCount is that if the message TPS is too large to be processed by the asynchronous forwarding thread, the message track data will be actively discarded.

TraceContextQueue traceContext has a backlog of queues. After receiving the processing result, the client (message sender, message consumer) submits the message track to the queue and returns immediately.

AppenderQueue is submitted to the queue in the Broker thread pool.

TraceTopicName is used to receive the Topic of the message track. The default is RMQ_SYS_TRANS_HALF_TOPIC.

TraceExecuter is used for the asynchronous thread pool sent to the Broker service. The default number of core threads is 10, the maximum thread pool is 20, the queue stacking length is 2048, and the thread name is MQTraceSendThread_. 、

The Producer of the track of the message sent by traceProducer.

Code @ 2: call the getAndCreateTraceProducer method to create a Producer (message sender) for sending message tracks, and its implementation is described in more detail below.

1.3.2 getAndCreateTraceProducer explains private DefaultMQProducer getAndCreateTraceProducer (RPCHook rpcHook) {DefaultMQProducer traceProducerInstance = this.traceProducer; if (traceProducerInstance = = null) {/ / @ 1 traceProducerInstance = new DefaultMQProducer (rpcHook); traceProducerInstance.setProducerGroup (TraceConstants.GROUP_NAME); traceProducerInstance.setSendMsgTimeout (5000); traceProducerInstance.setVipChannelEnabled (false); / / The max size of message is 128K traceProducerInstance.setMaxMessageSize (maxMsgSize-10 * 1000) } return traceProducerInstance;}

Code @ 1: if a sender has not been established, create a message sender for sending message tracks, whose GroupName is: _ INNER_TRACE_PRODUCER, the message sending timeout is 5s, and the maximum allowable message size is 118K.

1.3.3 startpublic void start (String nameSrvAddr) throws MQClientException {if (isStarted.compareAndSet (false, true)) {/ / @ 1 traceProducer.setNamesrvAddr (nameSrvAddr); traceProducer.setInstanceName (TRACE_INSTANCE_NAME + "_" + nameSrvAddr); traceProducer.start ();} this.worker = new Thread (new AsyncRunnable (), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); / / @ 2 this.worker.setDaemon (true) This.worker.start (); this.registerShutDownHook ();}

It starts and is called when DefaultMQProducer is started, and if the trace message trace is enabled, it is called.

Code @ 1: if the sender used to send the message track is not started, set the nameserver address and start it.

Code @ 2: start a thread to execute the AsyncRunnable task, which we'll focus on next.

1.3.4 AsyncRunnableclass AsyncRunnable implements Runnable {private boolean stopped; public void run () {while (! stopped) {List contexts = new ArrayList (batchSize); / / @ 1 for (int I = 0; I

< batchSize; i++) { TraceContext context = null; try { //get trace data element from blocking Queue - traceContextQueue context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); // @2 } catch (InterruptedException e) { } if (context != null) { contexts.add(context); } else { break; } } if (contexts.size() >

0) {: AsyncAppenderRequest request = new AsyncAppenderRequest (contexts); / / @ 3 traceExecuter.submit (request) } else if (AsyncTraceDispatcher.this.stopped) {this.stopped = true;}

Code @ 1: build a message tracking Bean to be submitted, sending a maximum of batchSize at a time, with a default of 100.

Code @ 2: take a TraceContext to be submitted from the traceContextQueue and set the timeout to 5s, that is, if there is no TraceContext to be submitted in the queue, the maximum waiting time is 5s.

Code @ 3: submit the task AsyncAppenderRequest to the thread pool.

1.3.5 AsyncAppenderRequest#sendTraceDatapublic void sendTraceData (List contextList) {Map transBeanMap = new HashMap (); for (TraceContext context: contextList) {/ / @ 1 if (context.getTraceBeans () .isEmpty ()) {continue;} / / Topic value corresponding to original message entity content String topic = context.getTraceBeans () .get (0) .getTopic () / / @ 2 / / Use original message entity's topic as key String key = topic; List transBeanList = transBeanMap.get (key); if (transBeanList = = null) {transBeanList = new ArrayList (); transBeanMap.put (key, transBeanList);} TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean (context); / / @ 3 transBeanList.add (traceData) } for (Map.Entry entry: transBeanMap.entrySet ()) {/ / @ 4 flushData (entry.getValue ());}}

Code @ 1: traverses the collected message track data.

Code @ 2: gets the Topic where the message trace is stored.

Code @ 3: encode the TraceContext. Here is the transmission data of the message track. Take a closer look at it later to understand its upload format.

Code @ 4: sends the encoded data to the Broker server.

1.3.6 TraceDataEncoder#encoderFromContextBean

Depending on the type of message trace, there will be some differences in format, which are described below.

1.3.6.1 PUB (message sending) case Pub: {TraceBean bean = ctx.getTraceBeans () .get (0) / / append the content of context and traceBean to transferBean's TransData sb.append (ctx.getTraceType ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx.getTimeStamp ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx.getRegionId ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx.getGroupName ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getTopic ()). Append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getMsgId ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getTags ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getKeys ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getStoreHost ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getBodyLength ()) .append (TraceConstants.CONTENT) _ SPLITOR) / / .append (ctx.getCostTime ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getMsgType (). Ordinal ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getOffsetMsgId ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx.isSuccess ()) .append (TraceConstants.FIELD_SPLITOR) }

The protocol of message track data uses string concatenation, the separator of the field is 1, and the whole data ends with 2. This design is still a bit "incredible". Why not just use the json protocol?

1.3.6.2 SubBefore (before message consumption) for (TraceBean bean: ctx.getTraceBeans ()) {sb.append (ctx.getTraceType ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx.getTimeStamp ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx.getRegionId ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx.getGroupName ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx.getRequestId ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getMsgId ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getRetryTimes ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getKeys ()) .append (TraceConstants.FIELD_SPLITOR) / /}}

The tracks are spliced in the above order, with fields separated by 1 and each record ending with 2.

1.3.2.3 SubAfter (after message consumption) case SubAfter: {for (TraceBean bean: ctx.getTraceBeans ()) {sb.append (ctx.getTraceType ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx.getRequestId ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getMsgId ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx. GetCostTime () .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx.isSuccess ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (bean.getKeys ()) .append (TraceConstants.CONTENT_SPLITOR) / / .append (ctx.getContextCode ()) .append (TraceConstants.FIELD_SPLITOR) }}}

If the format coding is the same, there is no need to repeat it.

After the above source code tracking, the message track tracking process and the message track data coding protocol on the message sender are clear. Then we use a sequence diagram to end the explanation of this part.

In fact, at this point, we only pay attention to the message tracking of message sending, but what about the tracking of message consumption? In fact, the implementation principle is the same, that is, the implementation of specific hook functions before and after message consumption, in fact, the current class is ConsumeMessageTraceHookImpl, because its implementation is similar to the idea of sending messages, so it will not be introduced in detail.

2. How to store message track data

In fact, from the above analysis, we already know that the message track data of RocketMQ is stored on Broker, so how to specify the subject name of the message track? How to distribute its routing information? Is it created on each Broker or only on one of the platforms? RocketMQ supports the theme of system default and custom message tracks.

2.1 use the default theme name of the system

The default message track theme of RocketMQ is: RMQ_SYS_TRACE_TOPIC, so does the Topic need to be created manually? What about its routing information?

{if (this.brokerController.getBrokerConfig () .isTraceTopicEnable ()) {/ / @ 1 String topic = this.brokerController.getBrokerConfig () .getMsgTraceTopicName (); TopicConfig topicConfig = new TopicConfig (topic); this.systemTopicList.add (topic); topicConfig.setReadQueueNums (1); / / @ 2 topicConfig.setWriteQueueNums (1) This.topicConfigTable.put (topicConfig.getTopicName (), topicConfig);}}

The above code comes from the constructor of TopicConfigManager, which creates a topicConfigManager object when Broker starts, which is used to manage the routing information of topic.

Code @ 1: if Broker enables message tracking (traceTopicEnable=true), the topic routing information for the default message track is automatically created. Note that the number of read and write queues is 1.

2.2 user-defined message track topic

When you create a message sender and message consumer, you can display the Topic of the specified message track, for example:

Public DefaultMQProducer (final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) public DefaultMQPushConsumer (final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,final String customizedTraceTopic)

Specify the message track Topic through customizedTraceTopic.

Warm reminder: usually in the production environment, automatic theme creation will not be enabled, so RocketMQ operation and maintenance personnel are required to create the Topic in advance.

Thank you for your reading, the above is the content of "what is the track of RocketMQ messages?" after the study of this article, I believe you have a deeper understanding of how the track of RocketMQ messages is, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report