Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to understand PUBLISH message processing

2025-03-10 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/02 Report--

In this issue, the editor will bring you about how to understand PUBLISH message processing. The article is rich in content and analyzes and narrates it from a professional point of view. I hope you can get something after reading this article.

First of all, it is a two-way communication process for mqtt, that is, it allows both client to publish messages to broker and broker to publish messages to client.

Public void processPublish (Channel channel, MqttPublishMessage msg) {final MqttQoS qos = msg.fixedHeader (). QosLevel (); final String clientId = NettyUtils.clientID (channel); LOG.info ("Processing PUBLISH message. CId= {}, topic= {}, messageId= {}, qos= {} ", clientId, msg.variableHeader (). TopicName (), msg.variableHeader (). PacketId (), qos); switch (qos) {case AT_MOST_ONCE: this.qos0PublishHandler.receivedPublishQos0 (channel, msg); break; case AT_LEAST_ONCE: this.qos1PublishHandler.receivedPublishQos1 (channel, msg); break Case EXACTLY_ONCE: this.qos2PublishHandler.receivedPublishQos2 (channel, msg); break; default: LOG.error ("Unknown QoS-Type: {}", qos); break;}}

According to the qos of the published message, using different QosPublishHandler to deal with, QosPublishHandler has three specific implementations, which are Qos0PublishHandler,Qos1PublishHandler,Qos2PublishHandler.

First of all, I will explain the handling of Qos1PublishHandler.

Void receivedPublishQos1 (Channel channel, MqttPublishMessage msg) {/ / verify if topic can be write final Topic topic = new Topic (msg.variableHeader (). TopicName ()); String clientID = NettyUtils.clientID (channel); String username = NettyUtils.userName (channel); if (! m_authorizator.canWrite (topic, username, clientID)) {LOG.error ("MQTT client is not authorized to publish on topic. CId= {}, topic= {} ", clientID, topic); return;} final int messageID = msg.variableHeader (). PacketId (); / / route message to subscribers IMessagesStore.StoredMessage toStoreMsg = asStoredMessage (msg); toStoreMsg.setClientID (clientID); this.publisher.publish3Subscribers (toStoreMsg, topic, messageID); sendPubAck (clientID, messageID) If (msg.fixedHeader (). IsRetain ()) {if (! msg.payload (). IsReadable ()) {m_messagesStore.cleanRetained (topic);} else {/ / before wasn't stored m_messagesStore.storeRetained (topic, toStoreMsg) }} / / modify publish message, ByteBuf object from slice, memory leak MoquetteMessage moquetteMessage = new MoquetteMessage (msg.fixedHeader (), msg.variableHeader (), msg.content ()); m_interceptor.notifyTopicPublished (moquetteMessage, clientID, username); msg.content (). Release ();} 1. Authentication, whether the username under the client has permission to publish messages to the topic (write to topic). two。 Create an IMessagesStore.StoredMessage and push the message to all subscribers to the message. If (LOG.isTraceEnabled ()) {LOG.trace ("Sending publish message to subscribers. ClientId= {}, topic= {}, messageId= {}, payload= {}, "+" subscriptionTree= {} ", pubMsg.getClientID (), topic, messageID, DebugUtils.payload2Str (pubMsg.getPayload (), subscriptions.dumpTree ());} else if (LOG.isInfoEnabled ()) {LOG.info (" Sending publish message to subscribers. ClientId= {}, topic= {}, messageId= {} ", pubMsg.getClientID (), topic, messageID);} publish3Subscribers (pubMsg, topic); determine whether it is a trace mode. If so, all current subscription relationships will be printed to the log. Because this requires traversing the topic tree and consumes a lot, it is configurable and configured in moquette.cof. The processing logic of the core is below, and then move on to void publish3Subscribers (IMessagesStore.StoredMessage pubMsg, Topic topic) {List topicMatchingSubscriptions = subscriptions.matches (topic); final String topic1 = pubMsg.getTopic (); final MqttQoS publishingQos = pubMsg.getQos (); final ByteBuf origPayload = pubMsg.getPayload (); for (final Subscription sub: topicMatchingSubscriptions) {MqttQoS qos = lowerQosToTheSubscriptionDesired (sub, publishingQos); ClientSession targetSession = m_sessionsStore.sessionForClient (sub.getClientId ()) Boolean targetIsActive = this.connectionDescriptors.isConnected (sub.getClientId ()); / / TODO move all this logic into messageSender, which puts into the flightZone only the messages that pull out of the queue. If (targetIsActive) {if (LOG.isDebugEnabled ()) {LOG.debug ("Sending PUBLISH message to active subscriber. CId= {}, topicFilter= {}, qos= {} ", sub.getClientId (), sub.getTopicFilter (), qos);} / / we need to retain because duplicate only copy r indexes and don't retain () causing / / refCnt = 0 ByteBuf payload = origPayload.retainedDuplicate (); MqttPublishMessage publishMsg If (qos! = MqttQoS.AT_MOST_ONCE) {/ / QoS 1 or 2 int messageId = targetSession.inFlightAckWaiting (pubMsg); / / set the PacketIdentifier only for QoS > 0 publishMsg = notRetainedPublishWithMessageId (topic1, qos, payload, messageId);} else {publishMsg = notRetainedPublish (topic1, qos, payload) } this.messageSender.sendPublish (targetSession, publishMsg);} else {if (! targetSession.isCleanSession ()) {if (LOG.isDebugEnabled ()) {LOG.debug ("Storing pending PUBLISH inactive message. CId= {}, topicFilter= {}, qos= {} ", sub.getClientId (), sub.getTopicFilter (), qos);} / / store the message in targetSession queue to deliver targetSession.enqueue (pubMsg);}

It is roughly divided into the following steps

2.1. Find out the matching subscription set list according to topic, which is explained separately because it involves a relatively large calculation.

Public List matches (Topic topic) {Queue tokenQueue = new LinkedBlockingDeque (topic.getTokens ()); List matchingSubs = new ArrayList (); subscriptions.get () .matches (tokenQueue, matchingSubs); / / when the client requests a subscription using a wildcard topic filter, the client's subscription may be duplicated, so the published message may match multiple filters. For this situation, the server must distribute the message to the client with the highest QoS level that all subscriptions match. The server can then distribute copies of the message to each / / matching subscriber according to the QoS level of the subscription. Map subsForClient = new HashMap (); for (ClientTopicCouple matchingCouple: matchingSubs) {Subscription existingSub = subsForClient.get (matchingCouple.clientID); Subscription sub = this.subscriptionsStore.getSubscription (matchingCouple); if (sub = = null) {/ / if the m_sessionStore hasn't the sub because the client disconnected continue } / / update the selected subscriptions if not present or if has a greater qos if (existingSub = = null | | existingSub.getRequestedQos () .value () < sub.getRequestedQos () .value ()) {subsForClient.put (matchingCouple.clientID, sub) / / notice that the subscription obtained from session is finally saved here, rather than from the topic directory, because it is possible that client was not online at that time or the qos level of the subscription changed}} return new ArrayList (subsForClient.values ()). } as you can see, a queue will be created first, and the topic level such as / a/b/c will be stored in the queue, and there will be three transports in the queue. The reason why the queue is used instead of list is that when matching is carried out later, you need to ensure that the match starts at the first level, rather than the last void matches (Queue tokens, List matchingSubs) {Token t = tokens.poll () / / check if t is null tokens finished if (t = = null) {matchingSubs.addAll (m_subscriptions); / / check if it has got a MULTI child and add its subscriptions for (TreeNode n: m_children) {if (n.getToken ()) = = Token.MULTI | | n.getToken () = = Token.SINGLE) {matchingSubs.addAll (n.subscriptions ());} return } / / we are on MULTI, than add subscriptions and return if (m_token = = Token.MULTI) {matchingSubs.addAll (m_subscriptions); return } for (TreeNode n: m_children) {if (n.getToken (). Match (t)) {/ / Create a copy of token, else if navigate 2 sibling it / / consumes 2 elements on the queue instead of one n.matches (new LinkedBlockingQueue (tokens), matchingSubs); / / TODO don't create a copy n.matches (tokens, matchingSubs);}

This code is difficult to understand and involves iterating over the topic tree io.moquette.spi.impl.subscriptions.TreeNode, which is illustrated by a diagram below

In addition, students who are not familiar with topic matching rules can take a look at https://github.com/mcxiaoke/mqtt/blob/master/mqtt/04-OperationalBehavior.md here.

If there are five client of A, B, C, D, E, where A subscribes / test/#,B subscribes / test/hello/#,C subscribes / test/hello/beijing

D subscribed to / test/+/hello, and now E has posted a message to topic-name for / test/hello/shanghai asking which client should receive this message.

Draw the topic tree as follows (please forgive me for drawing by hand)

First analyze the whole process of E publishing messages:

2.1.1./test/hello/shanghai is put into queue and taken out in the order of test,hello,shanghai

2.1.2. In the first round, matching test,test is not empty, RootNode (actually null) is not #, and execute to the child node under traversal RootNode. RootNode has only one child node, test,test.equals (test), and then the current treenode becomes test.

2.1.3. The hello,hello is not empty and the test is not # from the queue. Traverse test, a child node of treenode. Test has three child nodes, namely (#, +, hello).

2.1. The child node is #, # .mathcs (hello), the current node is #, and then the shanghai,shanghai is not empty from the queue, # is #, the current iteration is terminated, and node A matching is put into the matching list

2.1.3.2 Child node is +, + .mathcs (hello) the current node is +, and then take the shanghai from the queue (some students here may have questions, why can they still take out the shanghai, because the next iteration is the new queue of new). Shanghai is not empty, + is not #, so it does not match, then match + the child node of the treenode, + only one child node.

2.1. 3.2.1 if the current node is hello,hello.mathcs (shanghai), the iteration is terminated, so D will not put the matching list.

1.3.3 the child node is hello,hello.equals (hello), and the current node becomes hello. Then take out from the queue that shanghai,shanghai is not empty and hello is not #. Traversing hello, a child node under treenode, hello first has two child nodes, namely (#, beijing).

2.1.3.3.1 the child node is #, # .mathcs (hello), and the current node is #. What is taken out of the queue is empty, so it goes directly to the first if branch, puts B into the matching list, and exits the method.

2.1. 3.3.2 Child node is beijing,beijing.equals (shanghai) does not hold, exit iteration

In the end, the only two client that can match successfully are Amai B.

That is to say, if it can be successfully matched, either every level of / test/hello/shanghai can be matched successfully, "+" can reach any single level, or a certain level is "#" (corresponding to the above two if branches)

At this point, the analysis of the whole process of having topic-name matching the topic tree composed of topic-filters is complete. Next, let's go back to the method of publish3Subscribers above to explain the action after matching client.

2.2 iterate through the matching client to determine that the qos,qos takes the minimum value of the qos required by the subscription request and the qos of the published message itself, which is stipulated by the mqtt protocol itself. this is because the basic unit of subscription is a certain topic, and the subscriber can only subscribe to a topic, not a message, while the basic unit of publishing a message is a message, and there can be many messages under a topic. Different messages can have different qos, so the real qos is determined by both the subscriber and the publisher, to minimize the qos for performance reasons.

2.3 determine whether the client is still online according to the connection descriptor. If not, and the client requests to keep the session, save the message to the BlockingQueue of the session of the client and send it to the client again after the client is online again. This corresponds to the action of a republish when the connection is established, as shown in step 10 of https://blog.51cto.com/13579730/2073630.

2.3. If online, different processing is done according to qos, if qos is 0, which is relatively simple, and qos is 1 or 2, then the message will be put into outboundFlightZone first, a messageId will be generated, and then sent through PersistentQueueMessageSender.

Io.moquette.spi.impl.PersistentQueueMessageSender#sendPublish, the specific distribution logic is relatively simple, which will not be explained in detail here. The relationship between classes is

ProtocolProcessor-- "qos1PublishHandler--" MessagesPublisher-- "PersistentQueueMessageSender, the basic logic is to send through ConnectionDescriptorStore. If you send a failed qos1 or 2 message that requires saving the session, you will keep it until the repeat is successful.

At this point, the core logic of the publish3Subscribers method is over, let's go back to the io.moquette.spi.impl.Qos1PublishHandler#receivedPublishQos1 method

3. Send PUBACK messages

4. If it is a retain message, but there is no paylod, clearing the retain message under the topic can be understood as the client actively asked for removal, because it sent an empty message. If there is a payload, the retain message is stored. For the reserved message, let me briefly summarize the https://github.com/mcxiaoke/mqtt/blob/master/mqtt/0303-PUBLISH.md here in detail. Only one retain message will always be stored under each topic. If two messages are sent, the latter will overwrite the previous one. 2. If the client sends a zero-byte retain message, broker will clean up the retain message under the topic, because broker will not store zero-byte retain message; 3. The reserved message on the server side is not part of the session state. The server should keep that message until the client deletes it. 4. When client receives a retain message sent by broker, it can be understood as the first message of a newly created subscription by client.

5. Wake up interceptor

At this point, moquette has finished explaining the processing of PUBLISH messages. It only explains the processing of qos1, because the processing of qos0 is relatively simple, while we don't have any application scenarios for qos2. In addition, here are some confusing concepts, cleanSession and retain messages.

1.retain messages are not part of session, they are not hooked to client, but to topic-filter. That is, when the clientsession that publishes the retain message no longer exists, but the retain message still exists, unless a client actively deletes it

two。 For the client that requires the retention of the session, there will be a process of broker actively resending the message. This action is really when the client re-establishes the connection. See step 10 of the https://blog.51cto.com/13579730/2073630 here, because broker has the responsibility to resend the qos1 and qos2 messages to the client that requires the retention of the session.

3. When publishing and subscribing to client, broker also has a process of actively sending messages, as shown here in step 8 of https://blog.51cto.com/13579730/2073914.

4.qos1 messages and qos2 are stored in a BlockingQueue in clientsession when message delivery fails or when client is not online, while retain messages are directly stored in IMessagesStore when broker receives them, and the underlying layer is a Map.

The above is the editor for you to share how to understand the PUBLISH message processing, if you happen to have similar doubts, you might as well refer to the above analysis to understand. If you want to know more about it, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report