Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Several lines of code closely related to message sending and their usage in java rocketmq

2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/03 Report--

In this issue, the editor will bring you a few lines of code and usage closely related to message sending in java rocketmq. The article is rich in content and analyzed and described from a professional point of view. I hope you can get something after reading this article.

Preface

Several lines of code closely related to message delivery:

1. DefaultMQProducer producer = new DefaultMQProducer ("ProducerGroupName")

2. Producer.start ()

3. Message msg = new Message (...)

4. SendResult sendResult = producer.send (msg)

5. Producer.shutdown ()

So what is done behind these lines of code when they are executed?

one。 The first is DefaultMQProducer.start.

@ Overridepublic void start () throws MQClientException {this.defaultMQProducerImpl.start ();}

The implementation class DefaultMQProducerImpl, which generates messages by default, is called.

Calling the defaultMQProducerImpl.start () method, DefaultMQProducerImpl.start () initializes the MQClientInstance instance object, and the MQClientInstance instance object calls its own start method to start some services, such as pulling the message service PullMessageService.Start () and starting the load balancing service RebalanceService.Start (), such as the network communication service MQClientAPIImpl.Start ().

In addition, the information related to the production message is executed, such as registering produceGroup, new a TopicPublishInfo object and using the default TopicKey as the key value to form a key-value pair to be stored in the topicPublishInfoTable of DefaultMQProducerImpl.

After efaultMQProducerImpl.start (), the acquired MQClientInstance instance object calls the sendHeartbeatToAllBroker () method and continuously sends heartbeats to broker. Yin'b can use the following image to roughly describe the DefaultMQProducerImpl.start () process:

What is involved in the three sections in the figure above:

1.1 initialize MQClientInstance

Only one MQClientInstance instance object can be generated by a client, using factory mode and singleton pattern. The MQClientInstance.start () method starts some services. The source code is as follows:

Public void start () throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null = = this.clientConfig.getNamesrvAddr ()) {this.mQClientAPIImpl.fetchNameServerAddr ();} / Start request-response channelthis.mQClientAPIImpl.start (); / / Start various schedule tasksthis.startScheduledTask (); / / Start pull servicethis.pullMessageService.start (); / / Start rebalance servicethis.rebalanceService.start () / / Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl () .start (false); log.info ("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException ("The Factory object [" + this.getClientId () + "] has been created before, and failed., null); default:break;}

1.2 registering producer

This process registers the current producer object with the producerTable of the MQClientInstance instance object. There can be only one instance of a producerGroup in a jvm (a client). There are probably the following methods for MQClientInstance to operate producerTable:

-selectProducer-- updateTopicRouteInfoFromNameServer-- prepareHeartbeatData-- isNeedUpdateTopicRouteInfo-- shutdown

Note:

Different clientId,MQClientManager will be given according to different MQClientInstance

Different MQProducer and MQConsumer will be given according to different group,MQClientInstance

1.3 add routes to the routing information table

TopicPublishInfoTable definition:

Public class DefaultMQProducerImpl implements MQProducerInner {private final Logger log = ClientLogger.getLog (); private final Random random = new Random (); private final DefaultMQProducer defaultMQProducer;private final ConcurrentMap topicPublishInfoTable = new ConcurrentHashMap ()

It is a Maptype data structure with topic as key. When DefaultMQProducerImpl.start (), a TopicPublishInfo of key=MixAll.DEFAULT_TOPIC is created by default and stored in topicPublishInfoTable.

1.4 send heartbeat packet

When MQClientInstance sends heartbeats to broker, it calls sendHeartbeatToAllBroker () and gets all the broker addresses from the brokerAddrTable of the MQClientInstance instance object to send heartbeats to these broker.

SendHeartbeatToAllBroker involves the prepareHeartbeatData () method, which generates heartbeatData data, and when sending a heartbeat, heartbeatData serves as the body of the heartbeat. Some of the code related to producer is as follows:

/ / Producerfor (Map.Entry entry: this.producerTable.entrySet ()) {MQProducerInner impl = entry.getValue (); if (impl! = null) {ProducerData producerData = new ProducerData (); producerData.setGroupName (entry.getKey ()); heartbeatData.getProducerDataSet () .add (producerData);}

2. SendResult sendResult = producer.send (msg)

DefaultMQProducer.send (msg) is called first, followed by sendDefaultImpl:

Public SendResult send (Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl (msg, CommunicationMode.SYNC, null, timeout);}

What did sendDefaultImpl do?

2.1. Get topicPublishInfo

Obtain the corresponding topicPublishInfo from the topicPublishInfoTable according to the topic of the msg, update the routing information if not, and pull the latest routing information from the nameserver side. Pulling the latest routing information from the nameserver side is roughly as follows:

First getTopicRouteInfoFromNameServer, then topicRouteData2TopicPublishInfo.

2.2 Select the queue to send the message

Ordinary messages: by default, selectOneMessageQueue selects a queue (MessageQueue) from the messageQueueList in topicPublishInfo to send messages, and the queue is selected by long polling by default.

Its mechanism is as follows: normally, queue is selected sequentially to send; if a node times out, the same broker is skipped the next time queue is selected. Different queue selection strategies form several modes of production messages, such as sequential messages and transaction messages.

Sequential messages: send a group of messages that need to be consumed sequentially to the same queue in the same broker. Assuming the payment of the same order number and the refund need to be placed in the same queue, you can implement MessageQueueSelector yourself during send and select queue according to the parameter arg field.

Private SendResult sendSelectImpl (Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {. }

Transaction message: only when the message is sent successfully and the local operation is executed successfully, the commit transaction message is sent, the transaction commits, the message fails, the rollback message is sent directly, and the rollback is carried out. The specific implementation will be analyzed separately.

2.3 encapsulate the message body communication packet and send the packet

First, according to the getBrokerName in the acquired MessageQueue, call findBrokerAddressInPublish to get the message to store the corresponding broker address, and if not find it, reacquire the address with the new routing information:

BrokerAddrTable.get (brokerName) .get (MixAll.MASTER_ID)

It is known that all the acquired broker are master (id=0)

Then, the information related to the message is packaged into a RemotingCommand packet whose RequestCode.SEND_MESSAGE

According to the obtained broke address, the packet is sent to the corresponding broker. The default is to send a timeout of 3 seconds.

Encapsulates the header of the message request packet:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader (); requestHeader.setProducerGroup (this.defaultMQProducer.getProducerGroup ()); requestHeader.setTopic (msg.getTopic ()); requestHeader.setDefaultTopic (this.defaultMQProducer.getCreateTopicKey ()); requestHeader.setDefaultTopicQueueNums (this.defaultMQProducer.getDefaultTopicQueueNums ()); requestHeader.setQueueId (mq.getQueueId ()); requestHeader.setSysFlag (sysFlag); requestHeader.setBornTimestamp (System.currentTimeMillis ()); requestHeader.setFlag (msg.getFlag ()); requestHeader.setProperties (MessageDecoder.messageProperties2String (msg.getProperties (); requestHeader.setReconsumeTimes (0); requestHeader.setUnitMode (this.isUnitMode ()) RequestHeader.setBatch (msg instanceof MessageBatch)

Send message packets (normal messages default to synchronization):

SendResult sendResult = null;switch (communicationMode) {case SYNC: sendResult = this.mQClientFactory.getMQClientAPIImpl (). SendMessage (brokerAddr, mq.getBrokerName (), msg, requestHeader, timeout, communicationMode, context, this); break

Process the response packet from the broker side:

Private SendResult sendMessageSync (final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand response = this.remotingClient.invokeSync (addr, request, timeoutMillis); assert response! = null;return this.processSendResponse (brokerName, msg, response);}

After the broker side processes the request packet, it stores the message to commitLog.

The above are several lines of code and usage closely related to message sending in java rocketmq shared by Xiaobian. If you happen to have similar doubts, please refer to the above analysis to understand. If you want to know more about it, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report