In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-09 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly introduces "the principle and usage of DefaultRocketMQListenerContainer in rocketmq". In daily operation, I believe that many people have doubts about the principle and usage of DefaultRocketMQListenerContainer in rocketmq. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts about "the principle and usage of DefaultRocketMQListenerContainer in rocketmq". Next, please follow the editor to study!
Order
This paper mainly studies the DefaultRocketMQListenerContainer of rocketmq.
DefaultRocketMQListenerContainer
RocketmqFLQListenerContainer.java
Public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {private final static Logger log = LoggerFactory.getLogger (DefaultRocketMQListenerContainer.class); private ApplicationContext applicationContext; / * * The name of the DefaultRocketMQListenerContainer instance * / private String name; private long suspendCurrentQueueTimeMillis = 1000; / * * Message consume retry strategy
-1 minute no retry,put into DLQ directly
0,broker control retry frequency
* > 0Delient client control retry frequency. * / private int delayLevelWhenNextConsume = 0; private String nameServer; private AccessChannel accessChannel = AccessChannel.LOCAL; private String consumerGroup; private String topic; private int consumeThreadMax = 64; private String charset = "UTF-8"; private ObjectMapper objectMapper; private RocketMQListener rocketMQListener; private RocketMQMessageListener rocketMQMessageListener; private DefaultMQPushConsumer consumer; private Class messageType; private boolean running; / / The following properties came from @ RocketMQMessageListener. Private ConsumeMode consumeMode; private SelectorType selectorType; private String selectorExpression; private MessageModel messageModel; private long consumeTimeout; / /. Public void setRocketMQMessageListener (RocketMQMessageListener anno) {this.rocketMQMessageListener = anno; this.consumeMode = anno.consumeMode (); this.consumeThreadMax = anno.consumeThreadMax (); this.messageModel = anno.messageModel (); this.selectorExpression = anno.selector_Expression (); this.selectorType = anno.selectorType (); this.consumeTimeout = anno.consumeTimeout ();} @ Override public void setupMessageListener (RocketMQListener rocketMQListener) {this.rocketMQListener = rocketMQListener } @ Override public void destroy () {this.setRunning (false); if (Objects.nonNull (consumer)) {consumer.shutdown ();} log.info ("container destroyed, {}", this.toString ();} @ Override public boolean isAutoStartup () {return true;} @ Override public void stop (Runnable callback) {stop () Callback.run ();} @ Override public void start () {if (this.isRunning ()) {throw new IllegalStateException ("container already running. "+ this.toString ();} try {consumer.start ();} catch (MQClientException e) {throw new IllegalStateException (" Failed to start RocketMQ push consumer ", e);} this.setRunning (true); log.info (" running container: {} ", this.toString ()) } @ Override public void stop () {if (this.isRunning ()) {if (Objects.nonNull (consumer)) {consumer.shutdown ();} setRunning (false);}} @ Override public boolean isRunning () {return running;} private void setRunning (boolean running) {this.running = running } @ Override public int getPhase () {/ / Returning Integer.MAX_VALUE only suggests that / / we will be the first bean to shutdown and last bean to start return Integer.MAX_VALUE;} @ Override public void afterPropertiesSet () throws Exception {initRocketMQPushConsumer (); this.messageType = getMessageType (); log.debug ("RocketMQ messageType: {}", messageType.getName ()) } @ Override public void setApplicationContext (ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext } @ Override public String toString () {return "DefaultRocketMQListenerContainer {" + "consumerGroup='" + consumerGroup +'\'+ ", nameServer='" + nameServer +'\'+ ", topic='" + topic +'\'+ ", consumeMode=" + consumeMode + ", selectorType=" + selectorType + " SelectorExpression=' "+ selectorExpression +'\'+", messageModel= "+ messageModel +'}' } private void initRocketMQPushConsumer () throws MQClientException {Assert.notNull (rocketMQListener, "Property 'rocketMQListener' is required"); Assert.notNull (consumerGroup, "Property' consumerGroup' is required"); Assert.notNull (nameServer, "Property 'nameServer' is required"); Assert.notNull (topic, "Property' topic' is required") RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk (applicationContext.getEnvironment (), this.rocketMQMessageListener.accessKey (), this.rocketMQMessageListener.secretKey ()); boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace (); if (Objects.nonNull (rpcHook)) {consumer = new DefaultMQPushConsumer (consumerGroup, rpcHook, new AllocateMessageQueueAveragely (), enableMsgTrace, this.applicationContext.getEnvironment (). ResolveRequiredPlaceholders (this.rocketMQMessageListener.customizedTraceTopic ()); consumer.setVipChannelEnabled (false); consumer.setInstanceName (RocketMQUtil.getInstanceName (rpcHook, consumerGroup));} else {log.debug ("Access-key or secret-key not configure in" + this + "."); consumer = new DefaultMQPushConsumer (consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment (). ResolveRequiredPlaceholders (this.rocketMQMessageListener.customizedTraceTopic ());} String customizedNameServer = this.applicationContext.getEnvironment (). ResolveRequiredPlaceholders (this.rocketMQMessageListener.nameServer ()); if (customizedNameServer! = null) {consumer.setNamesrvAddr (customizedNameServer);} else {consumer.setNamesrvAddr (nameServer);} if (accessChannel! = null) {consumer.setAccessChannel (accessChannel) } consumer.setConsumeThreadMax (consumeThreadMax); if (consumeThreadMax)
< consumer.getConsumeThreadMin()) { consumer.setConsumeThreadMin(consumeThreadMax); } consumer.setConsumeTimeout(consumeTimeout); consumer.setInstanceName(this.name); switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } } private Class getMessageType() { Class targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); Type[] interfaces = targetClass.getGenericInterfaces(); Class superclass = targetClass.getSuperclass(); while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { interfaces = superclass.getGenericInterfaces(); superclass = targetClass.getSuperclass(); } if (Objects.nonNull(interfaces)) { for (Type type : interfaces) { if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length >0) {return (Class) actualTypeArguments [0];} else {return Object.class;} return Object.class;} else {return Object.class }} / /.}
DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle and ApplicationContextAware interfaces. The setRocketMQMessageListener method sets consumeMode, consumeThreadMax, messageModel, selectorExpression, selectorType and consumeTimeout based on the information annotated by RocketMQMessageListener.
The afterPropertiesSet method executes the initRocketMQPushConsumer and getMessageType methods; the initRocketMQPushConsumer method creates different DefaultMQPushConsumer according to whether the rpcHook is null or not, and then configures the consumer; according to messageModel, selectorType, consumeMode, etc. If the rocketMQListener type is RocketMQPushConsumerLifecycleListener, execute the prepareStart method of RocketMQPushConsumerLifecycleListener
The setupMessageListener method mainly saves the rocketMQListener;isAutoStartup method, returns the true;start method, mainly executes the consumer.start () method, while the stop and destroy methods mainly execute consumer.shutdown ().
DefaultMessageListenerConcurrently
RocketmqFLQListenerContainer.java
Public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {@ SuppressWarnings ("unchecked") @ Override public ConsumeConcurrentlyStatus consumeMessage (List msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt: msgs) {log.debug ("received msg: {}", messageExt); try {long now = System.currentTimeMillis (); rocketMQListener.onMessage (doConvertMessage (messageExt)) Long costTime = System.currentTimeMillis ()-now; log.debug ("consume {} cost: {} ms", messageExt.getMsgId (), costTime);} catch (Exception e) {log.warn ("consume message failed. MessageExt: {} ", messageExt, e); context.setDelayLevelWhenNextConsume (delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER;}} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}
The DefaultMessageListenerConcurrently method implements the MessageListenerConcurrently interface; its consumeMessage method uses the for loop try catch to perform a rocketMQListener.onMessage (doConvertMessage (messageExt)) callback, all of which successfully return ConsumeConcurrentlyStatus.CONSUME_SUCCESS and ConsumeConcurrentlyStatus.RECONSUME_LATER in case of an exception
DefaultMessageListenerOrderly
RocketmqFLQListenerContainer.java
Public class DefaultMessageListenerOrderly implements MessageListenerOrderly {@ SuppressWarnings ("unchecked") @ Override public ConsumeOrderlyStatus consumeMessage (List msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt: msgs) {log.debug ("received msg: {}", messageExt); try {long now = System.currentTimeMillis (); rocketMQListener.onMessage (doConvertMessage (messageExt)) Long costTime = System.currentTimeMillis ()-now; log.info ("consume {} cost: {} ms", messageExt.getMsgId (), costTime);} catch (Exception e) {log.warn ("consume message failed. MessageExt: {} ", messageExt, e); context.setSuspendCurrentQueueTimeMillis (suspendCurrentQueueTimeMillis); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}} return ConsumeOrderlyStatus.SUCCESS;}}
DefaultMessageListenerOrderly implements the MessageListenerOrderly interface. Its consumeMessage method uses the for loop try catch to perform rocketMQListener.onMessage (doConvertMessage (messageExt)) callbacks, all of which successfully return ConsumeOrderlyStatus.SUCCESS and ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT in case of an exception.
Summary
DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle and ApplicationContextAware interfaces. The setRocketMQMessageListener method sets consumeMode, consumeThreadMax, messageModel, selectorExpression, selectorType and consumeTimeout based on the information annotated by RocketMQMessageListener.
The afterPropertiesSet method executes the initRocketMQPushConsumer and getMessageType methods; the initRocketMQPushConsumer method creates different DefaultMQPushConsumer according to whether the rpcHook is null or not, and then configures the consumer; according to messageModel, selectorType, consumeMode, etc. If the rocketMQListener type is RocketMQPushConsumerLifecycleListener, execute the prepareStart method of RocketMQPushConsumerLifecycleListener
The setupMessageListener method mainly saves the rocketMQListener;isAutoStartup method, returns the true;start method, mainly executes the consumer.start () method, while the stop and destroy methods mainly execute consumer.shutdown ().
At this point, the study of "the principle and usage of DefaultRocketMQListenerContainer in rocketmq" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.