2025-10-21 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains how to work around a pitfall encountered when integrating RocketMQ with Spring Boot: a selectorExpression configured through a placeholder ends up filtering out every message.
Application scenario
When implementing a RocketMQ consumer, the @RocketMQMessageListener annotation is generally used to define the consumer group, the topic, and the selectorExpression (the rule used to filter and select messages). To support dynamic filtering, these attributes are usually written as placeholder expressions, and the actual values are switched dynamically through a configuration center such as Apollo or Spring Cloud Config.
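To make the filtering rule concrete, here is a minimal sketch of how a tag-style selectorExpression can be thought of. It is a simplified model, not RocketMQ's own code: the class TagFilterSketch and the helper matches are hypothetical names, and the sketch assumes the common convention that "*" subscribes to everything and that tags are separated by "||".

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class TagFilterSketch {

    /** Hypothetical helper: a simplified model of tag matching, not RocketMQ's implementation. */
    static boolean matches(String selectorExpression, String messageTag) {
        // An empty expression or "*" subscribes to every message under the topic.
        if (selectorExpression == null || selectorExpression.isEmpty() || "*".equals(selectorExpression)) {
            return true;
        }
        // Otherwise the expression is treated as a list of tags separated by "||".
        Set<String> tags = Arrays.stream(selectorExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(t -> !t.isEmpty())
                .collect(Collectors.toSet());
        return tags.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "order"));                // true
        System.out.println(matches("order || refund", "refund")); // true
        System.out.println(matches("order || refund", "audit"));  // false
        // An unresolved placeholder is just an odd tag name that no message carries.
        System.out.println(matches("${rocketmq.selectorExpression}", "order")); // false
    }
}
```

The last call is the interesting one: if the placeholder is never replaced by its configured value, the expression behaves like a tag that no producer uses, so nothing is delivered.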
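Add the dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>
Consumer code
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// The listener must be a Spring bean so that the starter can find the annotation
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.group}", topic = "${rocketmq.topic}", selectorExpression = "${rocketmq.selectorExpression}")
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("data consumed is: " + s);
    }
}
Troubleshooting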
The default selectorExpression of the @RocketMQMessageListener annotation is *, which means that all messages under the current topic are received. If we try to configure tags dynamically with the ${rocketmq.selectorExpression} placeholder, we find that every message is filtered out. Tracing the source (ListenerContainerConfiguration.java) shows why: when the listener container is created, the selectorExpression is first resolved from the environment, but the resolved value is then overwritten, so the filter condition ends up being the literal placeholder string.
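Stripped of Spring and RocketMQ, the overwrite comes down to the order of two setter calls. The sketch below is an illustration only: ListenerMeta, Container, resolve, buggyOrder and safeOrder are hypothetical stand-ins, and safeOrder shows one possible ordering that avoids the problem, not necessarily how the library itself was fixed.

```java
import java.util.Collections;
import java.util.Map;

public class SelectorOverwriteSketch {

    /** Hypothetical stand-in for the annotation: it holds the raw, unresolved attribute value. */
    static final class ListenerMeta {
        final String selectorExpression;

        ListenerMeta(String selectorExpression) {
            this.selectorExpression = selectorExpression;
        }
    }

    /** Hypothetical stand-in for the listener container. */
    static final class Container {
        private String selectorExpression = "*";

        void setSelectorExpression(String value) {
            this.selectorExpression = value;
        }

        // Copies attributes from the metadata, including the raw selectorExpression.
        void setListenerMeta(ListenerMeta meta) {
            this.selectorExpression = meta.selectorExpression;
        }

        String getSelectorExpression() {
            return selectorExpression;
        }
    }

    /** Toy resolver: only handles a value that is exactly "${key}". */
    static String resolve(String raw, Map<String, String> props) {
        if (raw.startsWith("${") && raw.endsWith("}")) {
            String key = raw.substring(2, raw.length() - 1);
            return props.getOrDefault(key, raw);
        }
        return raw;
    }

    /** Resolve first, copy the metadata second: the resolved value is lost. */
    static String buggyOrder(ListenerMeta meta, Map<String, String> props) {
        Container container = new Container();
        container.setSelectorExpression(resolve(meta.selectorExpression, props));
        container.setListenerMeta(meta); // overwrites the resolved value with the raw one
        return container.getSelectorExpression();
    }

    /** Copy the metadata first, resolve second: the resolved value survives. */
    static String safeOrder(ListenerMeta meta, Map<String, String> props) {
        Container container = new Container();
        container.setListenerMeta(meta);
        container.setSelectorExpression(resolve(meta.selectorExpression, props));
        return container.getSelectorExpression();
    }

    public static void main(String[] args) {
        Map<String, String> props =
                Collections.singletonMap("rocketmq.selectorExpression", "tagA || tagB");
        ListenerMeta meta = new ListenerMeta("${rocketmq.selectorExpression}");

        System.out.println(buggyOrder(meta, props)); // ${rocketmq.selectorExpression}
        System.out.println(safeOrder(meta, props));  // tagA || tagB
    }
}
```

The relevant library source follows for comparison.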
@Override
public void afterSingletonsInstantiated() {
    // Get all beans annotated with @RocketMQMessageListener
    Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
    if (Objects.nonNull(beans)) {
        // Register a container for each bean
        beans.forEach(this::registerContainer);
    }
}

private void registerContainer(String beanName, Object bean) {
    Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
    // Verify that the bean implements the RocketMQListener interface
    if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
        throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
    }
    // Get the @RocketMQMessageListener annotation on the bean
    RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
    // Resolve group and topic so that placeholders are supported
    String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
    String topic = this.environment.resolvePlaceholders(annotation.topic());
    boolean listenerEnabled = (boolean) rocketMQProperties.getConsumer().getListeners()
        .getOrDefault(consumerGroup, Collections.EMPTY_MAP).getOrDefault(topic, true);
    if (!listenerEnabled) {
        log.debug("Consumer Listener (group:{}, topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic);
        return;
    }
    validate(annotation);
    String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
    GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
    // Register the container bean; the supplier calls createRocketMQListenerContainer
    genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
        () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
    DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);
    if (!container.isRunning()) {
        try {
            container.start();
        } catch (Exception e) {
            log.error("Started container failed. {}", container, e);
            throw new RuntimeException(e);
        }
    }
    log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}

private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
    DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
    container.setRocketMQMessageListener(annotation);
    String nameServer = environment.resolvePlaceholders(annotation.nameServer());
    nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
    String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
    container.setNameServer(nameServer);
    if (!StringUtils.isEmpty(accessChannel)) {
        container.setAccessChannel(AccessChannel.valueOf(accessChannel));
    }
    container.setTopic(environment.resolvePlaceholders(annotation.topic()));
    // The placeholder has been resolved to its configured value here
    String tags = environment.resolvePlaceholders(annotation.selectorExpression());
    if (!StringUtils.isEmpty(tags)) {
        container.setSelectorExpression(tags);
    }
    container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
    // Here selectorExpression is overwritten with the raw placeholder string from the annotation
    container.setRocketMQMessageListener(annotation);
    container.setRocketMQListener((RocketMQListener) bean);
    container.setObjectMapper(objectMapper);
    container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
    container.setName(name); // REVIEW ME, use the same clientId or multiple?
    return container;
}
Problem solving
ListenerContainerConfiguration registers the containers in afterSingletonsInstantiated, the callback of the SmartInitializingSingleton interface, which runs only after all singleton beans have been instantiated. We can therefore resolve the placeholders ourselves and write the resolved values back into the annotation through reflection before ListenerContainerConfiguration runs.
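import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.StandardEnvironment;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;

/**
 * Runs during Spring Boot startup, before the RocketMQ listener containers are initialized,
 * and uses reflection to replace placeholders in the annotation with their resolved values.
 */
@Configuration
public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private StandardEnvironment environment;

    @Override
    @SuppressWarnings("unchecked")
    public void afterPropertiesSet() throws Exception {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        for (Object bean : beans.values()) {
            Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
                continue;
            }
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
            // The annotation instance is a JDK proxy; its handler keeps the attribute values in "memberValues".
            // On JDK 16+ this access needs --add-opens java.base/sun.reflect.annotation=ALL-UNNAMED.
            InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
            Field field = invocationHandler.getClass().getDeclaredField("memberValues");
            field.setAccessible(true);
            Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
            for (Map.Entry<String, Object> entry : memberValues.entrySet()) {
                // Resolve only String attributes; enum and numeric attributes must keep their types
                if (entry.getValue() instanceof String) {
                    entry.setValue(environment.resolvePlaceholders((String) entry.getValue()));
                }
            }
        }
    }
}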
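The write-back step can be tried in isolation, without Spring or reflection. The following sketch makes several assumptions: resolve is a hypothetical toy resolver that imitates the general "${key}" and "${key:default}" placeholder syntax and leaves unknown placeholders unchanged, and the map is a plain stand-in for an annotation's attribute values. It only rewrites String values, since an annotation may also declare enum or numeric attributes whose types should not change.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MemberValueResolveSketch {

    // Matches ${key} and ${key:default}
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}:]+)(?::([^}]*))?\\}");

    /** Hypothetical toy resolver, not Spring's implementation. */
    static String resolve(String raw, Map<String, String> props) {
        Matcher m = PLACEHOLDER.matcher(raw);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.get(m.group(1));
            if (value == null) {
                value = m.group(2); // fall back to the default, if one was given
            }
            if (value == null) {
                value = m.group();  // leave an unresolvable placeholder unchanged
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    /** Resolves placeholders in String values only; other value types are left untouched. */
    static void resolveStringMembers(Map<String, Object> memberValues, Map<String, String> props) {
        for (Map.Entry<String, Object> entry : memberValues.entrySet()) {
            if (entry.getValue() instanceof String) {
                entry.setValue(resolve((String) entry.getValue(), props));
            }
        }
    }

    public static void main(String[] args) {
        // Illustrative attribute names and values
        Map<String, Object> members = new LinkedHashMap<>();
        members.put("topic", "${rocketmq.topic}");
        members.put("selectorExpression", "${rocketmq.selectorExpression:*}");
        members.put("consumeThreadMax", 64);

        resolveStringMembers(members, Collections.singletonMap("rocketmq.topic", "orders"));

        System.out.println(members); // {topic=orders, selectorExpression=*, consumeThreadMax=64}
    }
}
```

Using entry.setValue rather than put keeps the loop from touching the map's structure while it is being iterated.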
Alternatively, this bug has been fixed in version 2.1.0 of the dependency. Upgrading to 2.1.0 or later is recommended, provided that it does not cause dependency conflicts.