Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the function of ListenerContainerConfiguration in rocketmq

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

What is the role of ListenerContainerConfiguration in rocketmq? I believe many inexperienced people don't know what to do about it. Therefore, this paper summarizes the causes and solutions of the problem. Through this article, I hope you can solve this problem.

ListenerContainerConfiguration

RocketmqkoSpringMubootMutual 2.0.3 Methodissources.jarAcheUnixorgUniverse apacheUnixRocketmqqqSpringSpringListenerContainerConfiguration.java

@ Configurationpublic class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {private final static Logger log = LoggerFactory.getLogger (ListenerContainerConfiguration.class); private ConfigurableApplicationContext applicationContext; private AtomicLong counter = new AtomicLong (0); private StandardEnvironment environment; private RocketMQProperties rocketMQProperties; private ObjectMapper objectMapper; public ListenerContainerConfiguration (ObjectMapper rocketMQMessageObjectMapper, StandardEnvironment environment, RocketMQProperties rocketMQProperties) {this.objectMapper = rocketMQMessageObjectMapper; this.environment = environment; this.rocketMQProperties = rocketMQProperties @ Override public void setApplicationContext (ApplicationContext applicationContext) throws BeansException {this.applicationContext = (ConfigurableApplicationContext) applicationContext;} @ Override public void afterSingletonsInstantiated () {Map beans = this.applicationContext.getBeansWithAnnotation (RocketMQMessageListener.class); if (Objects.nonNull (beans)) {beans.forEach (this::registerContainer);}} private void registerContainer (String beanName, Object bean) {Class clazz = AopProxyUtils.ultimateTargetClass (bean) If (! RocketMQListener.class.isAssignableFrom (bean.getClass () {throw new IllegalStateException (clazz + "is not instance of" + RocketMQListener.class.getName ());} RocketMQMessageListener annotation = clazz.getAnnotation (RocketMQMessageListener.class); validate (annotation); String containerBeanName = String.format ("% slots% s", DefaultRocketMQListenerContainer.class.getName (), counter.incrementAndGet ()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext GenericApplicationContext.registerBean (containerBeanName, DefaultRocketMQListenerContainer.class, ()-> createRocketMQListenerContainer (containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean (containerBeanName, DefaultRocketMQListenerContainer.class); if (! container.isRunning ()) {try {container.start ();} catch (Exception e) {log.error ("Started container failed. {} ", container, e); throw new RuntimeException (e);}} log.info (" Register the listener to container, listenerBeanName: {}, containerBeanName: {} ", beanName, containerBeanName);} private DefaultRocketMQListenerContainer createRocketMQListenerContainer (String name, Object bean, RocketMQMessageListener annotation) {DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer (); String nameServer = environment.resolvePlaceholders (annotation.nameServer ()); nameServer = StringUtils.isEmpty (nameServer)? RocketMQProperties.getNameServer (): nameServer; String accessChannel = environment.resolvePlaceholders (annotation.accessChannel ()); container.setNameServer (nameServer); if (! StringUtils.isEmpty (accessChannel)) {container.setAccessChannel (AccessChannel.valueOf (accessChannel));} container.setTopic (environment.resolvePlaceholders (annotation.topic (); container.setConsumerGroup (environment.resolvePlaceholders (annotation.consumerGroup (); container.setRocketMQMessageListener (annotation) Container.setRocketMQListener ((RocketMQListener) bean); container.setObjectMapper (objectMapper); container.setName (name); / / REVIEW ME, use the same clientId or multiple? Return container;} private void validate (RocketMQMessageListener annotation) {if (annotation.consumeMode () = = ConsumeMode.ORDERLY & & annotation.messageModel () = = MessageModel.BROADCASTING) {throw new BeanDefinitionValidationException ("Bad annotation definition in @ RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");}

ListenerContainerConfiguration implements ApplicationContextAware and SmartInitializingSingleton interfaces.

Its setApplicationContext method saves applicationContext; and its afterSingletonsInstantiated method gets the bean marked with RocketMQMessageListener annotations, and then executes the registerContainer one by one.

The registerContainer method first determines whether the bean is an implementation class of RocketMQListener, if not, throws an IllegalStateException; and then gets the information of the RocketMQMessageListener annotation to determine whether an unsupported attribute is set; then creates a DefaultRocketMQListenerContainer through createRocketMQListenerContainer and registers it to applicationContext, and then executes the start method for the container without running

RocketMQMessageListener

RocketmqkoSpringMubootMutel 2.0.3 Methodissources.jarUnixorgUniverse apacheUnix RocketmqqqSpringUniverse annotationGetMQMessageListener.Java

@ Target (ElementType.TYPE) @ Retention (RetentionPolicy.RUNTIME) @ Documentedpublic @ interface RocketMQMessageListener {String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}"; String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}"; String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}" String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}"; / * * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve * load balance. It's required and needs to be globally unique. * See here for further discussion. * / String consumerGroup (); / * * Topic name. * / String topic (); / * * Control how to selector message. * * @ see SelectorType * / SelectorType selectorType () default SelectorType.TAG; / * * Control which message can be select. Grammar please see {@ link SelectorType#TAG} and {@ link SelectorType#SQL92} * / String selector_Expression () default "*"; / * Control consume mode, you can choice receive message concurrently or orderly. * / ConsumeMode consumeMode () default ConsumeMode.CONCURRENTLY; / * * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice. * / MessageModel messageModel () default MessageModel.CLUSTERING; / * * Max consumer thread number. * / int consumeThreadMax () default 64; / * Max consumer timeout, default 30s. * / long consumeTimeout () default 3000L; / * The property of "access-key". * / String accessKey () default ACCESS_KEY_PLACEHOLDER; / * The property of "secret-key" * / String secretKey () default SECRET_KEY_PLACEHOLDER; / * * Switch flag instance for message trace. * / boolean enableMsgTrace () default true; / * * The name value of message trace topic.If you don't config,you can use the default trace topic name. * / String customizedTraceTopic () default TRACE_TOPIC_PLACEHOLDER; / * The property of "name-server" * / String nameServer () default NAME_SERVER_PLACEHOLDER; / * The property of "access-channel" * / String accessChannel () default ACCESS_CHANNEL_PLACEHOLDER;}

RocketMQMessageListener annotations define consumerGroup, topic, selectorType, selectorExpression, consumeMode, messageModel, consumeThreadMax, consumeTimeout, accessKey, secretKey, enableMsgTrace, customizedTraceTopic, nameServer, accessChannel attributes

Summary

ListenerContainerConfiguration implements ApplicationContextAware and SmartInitializingSingleton interfaces.

Its setApplicationContext method saves applicationContext; and its afterSingletonsInstantiated method gets the bean marked with RocketMQMessageListener annotations, and then executes the registerContainer one by one.

The registerContainer method first determines whether the bean is an implementation class of RocketMQListener, if not, throws an IllegalStateException; and then gets the information of the RocketMQMessageListener annotation to determine whether an unsupported attribute is set; then creates a DefaultRocketMQListenerContainer through createRocketMQListenerContainer and registers it to applicationContext, and then executes the start method for the container without running

After reading the above, have you mastered the role of ListenerContainerConfiguration in rocketmq? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report