Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the principle and function of RocketMQTemplate

2025-04-04 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

This article introduces the relevant knowledge of "what is the principle and function of RocketMQTemplate". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!

Order

This paper mainly studies RocketMQTemplate.

RocketMQTemplate

RocketmqFIQspringMusbootMutel 2.0.3Methods sources.jarAcheUnixorgUnixapacheUnixRocketmqqqSpringUniqqSpringUnix coreMQTemplate.java

Public class RocketMQTemplate extends AbstractMessageSendingTemplate implements InitializingBean, DisposableBean {private static final Logger log = LoggerFactory.getLogger (RocketMQTemplate.class); private DefaultMQProducer producer; private ObjectMapper objectMapper; private String charset = "UTF-8"; private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash (); private final Map cache = new ConcurrentHashMap (); / / only put TransactionMQProducer by knowledge! / /. @ Override public void afterPropertiesSet () throws Exception {if (producer! = null) {producer.start ();}} @ Override protected void doSend (String destination, Message message) {SendResult sendResult = syncSend (destination, message); log.debug ("send message to `{}` finished. Result: {} ", destination, sendResult);} @ Override protected Message doConvert (Object payload, Map headers, MessagePostProcessor postProcessor) {String content; if (payload instanceof String) {content = (String) payload;} else {/ / If payload not as string, use objectMapper change it. Try {content = objectMapper.writeValueAsString (payload);} catch (JsonProcessingException e) {log.error ("convert payload to String failed. Payload: {} ", payload); throw new RuntimeException (" convert to payload to String failed. ", e);}} MessageBuilder builder = MessageBuilder.withPayload (content); if (headers! = null) {builder.copyHeaders (headers);} builder.setHeaderIfAbsent (MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN); Message message = builder.build () If (postProcessor! = null) {message = postProcessor.postProcessMessage (message);} return message;} @ Override public void destroy () {if (Objects.nonNull (producer)) {producer.shutdown () } for (Map.Entry kv: cache.entrySet ()) {if (Objects.nonNull (kv.getValue () {kv.getValue () .shutdown ();}} cache.clear ();} /.}

RocketMQTemplate inherits spring-messaging 's AbstractMessageSendingTemplate, implements InitializingBean and DisposableBean interfaces, and provides syncSend, syncSendOrderly, asyncSend, asyncSendOrderly, sendOneWay, sendOneWayOrderly, sendMessageInTransaction and other methods.

The afterPropertiesSet method executes producer.start (); the destroy method executes producer.shutdown () and the shutdown of TransactionMQProducer and empties the cache collection

What is called inside the doSend method is the syncSend method, and the returned sendResult is only debug output; the doConvert method does no processing for payload of String type, other types use objectMapper.writeValueAsString to String as content, then construct message, execute postProcessor.postProcessMessage, and then return

SyncSend

RocketmqFIQspringMusbootMutel 2.0.3Methods sources.jarAcheUnixorgUnixapacheUnixRocketmqqqSpringUniqqSpringUnix coreMQTemplate.java

/ * Same to {@ link # syncSend (String, Message)} with send timeout specified in addition. * * @ param destination formats: `topicName: tags` * @ param message {@ link org.springframework.messaging.Message} * @ param timeout send timeout with millis * @ param delayLevel level for the delay message * @ return {@ link SendResult} * / public SendResult syncSend (String destination, Message message, long timeout Int delayLevel) {if (Objects.isNull (message) | | Objects.isNull (message.getPayload () {log.error ("syncSend failed. Destination: {}, message is null ", destination); throw new IllegalArgumentException (" `message`and `message.payload` cannot be null ");} try {long now = System.currentTimeMillis (); org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage (objectMapper, charset, destination, message); if (delayLevel > 0) {rocketMsg.setDelayTimeLevel (delayLevel) } SendResult sendResult = producer.send (rocketMsg, timeout); long costTime = System.currentTimeMillis ()-now; log.debug ("send message cost: {} ms, msgId: {}", costTime, sendResult.getMsgId ()); return sendResult;} catch (Exception e) {log.error ("syncSend failed. Destination: {}, message: {} ", destination, message); throw new MessagingException (e.getMessage (), e);}} / * * syncSend batch messages ina given timeout. * * @ param destination formats: `topicName: tags` * @ param messages Collection of {@ link org.springframework.messaging.Message} * @ param timeout send timeout with millis * @ return {@ link SendResult} * / public SendResult syncSend (String destination Collection msg:messages) {if (Objects.isNull (msg) | | Objects.isNull (msg.getPayload () {log.warn ("Found a message empty in the batch, skip it") Continue;} rocketMsg = RocketMQUtil.convertToRocketMessage (objectMapper, charset, destination, msg); rmqMsgs.add (rocketMsg);} SendResult sendResult = producer.send (rmqMsgs, timeout); long costTime = System.currentTimeMillis ()-now Log.debug ("send messages cost: {} ms, msgId: {}", costTime, sendResult.getMsgId ()); return sendResult;} catch (Exception e) {log.error ("syncSend with batch failed. Destination: {}, messages.size: {} ", destination, messages.size (); throw new MessagingException (e.getMessage (), e);}}

The syncSend method supports single and multiple org.springframework.messaging.Message, among which the interface of a single Message supports delayLevel

SyncSendOrderly

RocketmqFIQspringMusbootMutel 2.0.3Methods sources.jarAcheUnixorgUnixapacheUnixRocketmqqqSpringUniqqSpringUnix coreMQTemplate.java

/ * Same to {@ link # syncSendOrderly (String, Message, String)} with send timeout specified in addition. * * @ param destination formats: `topicName: tags` * @ param message {@ link org.springframework.messaging.Message} * @ param hashKey use this key to select queue. For example: orderId, productId... * @ param timeout send timeout with millis * @ return {@ link SendResult} * / public SendResult syncSendOrderly (String destination, Message message, String hashKey, long timeout) {if (Objects.isNull (message) | | Objects.isNull (message.getPayload () {log.error ("syncSendOrderly failed. Destination: {}, message is null ", destination); throw new IllegalArgumentException (" `message`and `message.payload` cannot be null ");} try {long now = System.currentTimeMillis (); org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage (objectMapper, charset, destination, message); SendResult sendResult = producer.send (rocketMsg, messageQueueSelector, hashKey, timeout) Long costTime = System.currentTimeMillis ()-now; log.debug ("send message cost: {} ms, msgId: {}", costTime, sendResult.getMsgId ()); return sendResult;} catch (Exception e) {log.error ("syncSendOrderly failed. Destination: {}, message: {} ", destination, message); throw new MessagingException (e.getMessage (), e);}}

The syncSendOrderly method calls the producer.send (rocketMsg, messageQueueSelector, hashKey, timeout) method, and returns SendResult synchronously.

AsyncSend

RocketmqFIQspringMusbootMutel 2.0.3Methods sources.jarAcheUnixorgUnixapacheUnixRocketmqqqSpringUniqqSpringUnix coreMQTemplate.java

/ * Same to {@ link # asyncSend (String, Message, SendCallback)} with send timeout and delay level specified in addition. * * @ param destination formats: `topicName: tags` * @ param message {@ link org.springframework.messaging.Message} * @ param sendCallback {@ link SendCallback} * @ param timeout send timeout with millis * @ param delayLevel level for the delay message * / public void asyncSend (String destination, Message message, SendCallback sendCallback, long timeout Int delayLevel) {if (Objects.isNull (message) | | Objects.isNull (message.getPayload () {log.error ("asyncSend failed. Destination: {}, message is null ", destination); throw new IllegalArgumentException (" `message` and `message.payload` cannot be null ");} try {org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage (objectMapper, charset, destination, message); if (delayLevel > 0) {rocketMsg.setDelayTimeLevel (delayLevel) } producer.send (rocketMsg, sendCallback, timeout);} catch (Exception e) {log.info ("asyncSend failed. Destination: {}, message: {} ", destination, message); throw new MessagingException (e.getMessage (), e);}}

The asyncSend method needs to pass SendCallback, and the internal execution is producer.send (rocketMsg, sendCallback, timeout).

AsyncSendOrderly

RocketmqFIQspringMusbootMutel 2.0.3Methods sources.jarAcheUnixorgUnixapacheUnixRocketmqqqSpringUniqqSpringUnix coreMQTemplate.java

/ * Same to {@ link # asyncSendOrderly (String, Message, String, SendCallback)} with send timeout specified in * addition. * * @ param destination formats: `topicName: tags` * @ param message {@ link org.springframework.messaging.Message} * @ param hashKey use this key to select queue. For example: orderId, productId... * @ param sendCallback {@ link SendCallback} * @ param timeout send timeout with millis * / public void asyncSendOrderly (String destination, Message message, String hashKey, SendCallback sendCallback, long timeout) {if (Objects.isNull (message) | | Objects.isNull (message.getPayload () {log.error ("asyncSendOrderly failed. Destination: {}, message is null ", destination); throw new IllegalArgumentException (" `message`and `message.payload` cannot be null ");} try {org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage (objectMapper, charset, destination, message); producer.send (rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout) } catch (Exception e) {log.error ("asyncSendOrderly failed. Destination: {}, message: {} ", destination, message); throw new MessagingException (e.getMessage (), e);}}

The internal execution of the asyncSendOrderly method is producer.send (rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout)

SendOneWay

RocketmqFIQspringMusbootMutel 2.0.3Methods sources.jarAcheUnixorgUnixapacheUnixRocketmqqqSpringUniqqSpringUnix coreMQTemplate.java

/ * * Similar to UDP, this method won't wait for * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss. *

* One-way transmission is used for cases requiring moderate reliability, such as log collection. * * @ param destination formats: `topicName: tags` * @ param message {@ link org.springframework.messaging.Message} * / public void sendOneWay (String destination, Message message) {if (Objects.isNull (message) | | Objects.isNull (message.getPayload () {log.error ("sendOneWay failed. Destination: {}, message is null ", destination); throw new IllegalArgumentException (" `message`and `message.payload` cannot be null ");} try {org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage (objectMapper, charset, destination, message); producer.sendOneway (rocketMsg);} catch (Exception e) {log.error (" sendOneWay failed. Destination: {}, message: {} ", destination, message); throw new MessagingException (e.getMessage (), e);}}

The internal execution of the sendOneWay method is producer.sendOneway (rocketMsg)

SendOneWayOrderly

RocketmqFIQspringMusbootMutel 2.0.3Methods sources.jarAcheUnixorgUnixapacheUnixRocketmqqqSpringUniqqSpringUnix coreMQTemplate.java

/ * Same to {@ link # sendOneWay (String, Message)} with send orderly with hashKey by specified. * * @ param destination formats: `topicName: tags` * @ param message {@ link org.springframework.messaging.Message} * @ param hashKey use this key to select queue. For example: orderId, productId... * / public void sendOneWayOrderly (String destination, Message message, String hashKey) {if (Objects.isNull (message) | | Objects.isNull (message.getPayload () {log.error ("sendOneWayOrderly failed. Destination: {}, message is null ", destination); throw new IllegalArgumentException (" `message`and `message.payload` cannot be null ");} try {org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage (objectMapper, charset, destination, message); producer.sendOneway (rocketMsg, messageQueueSelector, hashKey);} catch (Exception e) {log.error (" sendOneWayOrderly failed. Destination: {}, message: {} ", destination, message); throw new MessagingException (e.getMessage (), e);}}

The internal execution of the sendOneWayOrderly method is producer.sendOneway (rocketMsg, messageQueueSelector, hashKey)

SendMessageInTransaction

RocketmqFIQspringMusbootMutel 2.0.3Methods sources.jarAcheUnixorgUnixapacheUnixRocketmqqqSpringUniqqSpringUnix coreMQTemplate.java

/ * Send Spring Message in Transaction * * @ param txProducerGroup the validate txProducerGroup name, set null if using the default name * @ param destination destination formats: `topicName: tags` * @ param message message {@ link org.springframework.messaging.Message} * @ param arg ext arg * @ throws MessagingException * / public TransactionSendResult sendMessageInTransaction (final String txProducerGroup, final String destination, final Message message Final Object arg) throws MessagingException {try {TransactionMQProducer txProducer = this.stageMQProducer (txProducerGroup) Org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage (objectMapper, charset, destination, message); return txProducer.sendMessageInTransaction (rocketMsg, arg);} catch (MQClientException e) {throw RocketMQUtil.convert (e);}}

The internal execution of the sendMessageInTransaction method is txProducer.sendMessageInTransaction (rocketMsg, arg)

Summary

RocketMQTemplate inherits spring-messaging 's AbstractMessageSendingTemplate, implements InitializingBean and DisposableBean interfaces, and provides syncSend, syncSendOrderly, asyncSend, asyncSendOrderly, sendOneWay, sendOneWayOrderly, sendMessageInTransaction and other methods.

The afterPropertiesSet method executes producer.start (); the destroy method executes producer.shutdown () and the shutdown of TransactionMQProducer and empties the cache collection

What is called inside the doSend method is the syncSend method, and the returned sendResult is only debug output; the doConvert method does no processing for payload of String type, other types use objectMapper.writeValueAsString to String as content, then construct message, execute postProcessor.postProcessMessage, and then return

This is the end of the content of "what is the principle and function of RocketMQTemplate". Thank you for your reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report