2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains how to implement message filtering in RocketMQ.
RocketMQ supports two filtering modes: expression-based filtering and class-based filtering. Expression-based filtering is further divided into TAG mode and SQL92 mode. The sections below introduce the filtering mechanism of each mode with code samples and look into how message filtering works.
1. TAG mode filtering
When sending messages, we set a TAG on each message. Messages of the same broad category are placed under one topic, and the TAG lets us subdivide them further. A consumer is usually not interested in every message under a topic, so it can filter by TAG and subscribe only to the kinds of messages it cares about.
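1.1 Producer

    // Imports assume the org.apache.rocketmq client (4.x or later).
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("producer_test");
            producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
            producer.start();
            String[] tags = {"TagA", "TagB", "TagC", "TagD"};
            for (int i = 0; i < 40; i++) {
                try {
                    // Cycle through the four tags so each one gets 10 messages
                    String tag = tags[i % tags.length];
                    // Build the message
                    Message msg = new Message("GumxTest" /* Topic */,
                            tag /* Tag */,
                            ("RocketMQ message test, message TAG=" + tag + " == " + i)
                                    .getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            producer.shutdown();
        }
    }

The topic is GumxTest, and 10 messages are sent for each of the tags "TagA", "TagB", "TagC" and "TagD".

1.2 Consumer

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;

    public class Consumer {
        public static void main(String[] args) {
            try {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
                consumer.setConsumerGroup("consumer_tags");
                consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
                // Subscribe only to messages tagged TagA or TagC
                consumer.subscribe("GumxTest", "TagA || TagC");
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                            ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
                        try {
                            for (MessageExt msg : paramList) {
                                String msgbody = new String(msg.getBody(), "utf-8");
                                // "yyyy" is the calendar year; "YYYY" would be the week-based year
                                SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                Date date = new Date(msg.getStoreTimestamp());
                                // Print the message content
                                System.out.println("Consumer1=== store time : " + sd.format(date)
                                        + " == MessageBody: " + msgbody);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER; // retry later
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // consumed successfully
                    }
                });
                consumer.start();
                System.out.println("Consumer=== started successfully!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

Result: the consumer receives only the messages tagged TagA or TagC. When a consumer subscribes to several Tags on the same topic, the Tags are separated by "||".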
Within the same consumer group, every consumer must subscribe to a given topic with the same Tag expression.
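To make the "||" subscription semantics concrete, here is a minimal stdlib-only sketch of how an expression such as "TagA || TagC" selects messages by tag. The class TagExpressionSketch and its methods are hypothetical names used only for illustration; this models the behaviour and is not the RocketMQ client or broker code. Treating "*" (or an empty expression) as "all tags" follows the usual RocketMQ convention.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of the RocketMQ client: models how a
// subscription expression such as "TagA || TagC" selects messages by tag.
class TagExpressionSketch {

    // Splits "TagA || TagC" into {TagA, TagC}.
    // An empty result means "no restriction" ("*", null, or blank expression).
    static Set<String> parse(String expression) {
        Set<String> tags = new HashSet<>();
        if (expression == null || expression.trim().isEmpty() || expression.trim().equals("*")) {
            return tags;
        }
        for (String part : expression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    // True when a message carrying messageTag would be delivered
    // to a consumer subscribed with the given expression.
    static boolean matches(String expression, String messageTag) {
        Set<String> tags = parse(expression);
        return tags.isEmpty() || tags.contains(messageTag);
    }

    public static void main(String[] args) {
        String[] sent = {"TagA", "TagB", "TagC", "TagD"};
        for (String tag : sent) {
            System.out.println(tag + " -> " + matches("TagA || TagC", tag));
        }
    }
}
```

Run against the four tags from the producer above, the sketch accepts TagA and TagC and rejects TagB and TagD, which mirrors what the consumer in section 1.2 receives.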
2. SQL expression filtering
SQL92 expression filtering matches messages by evaluating a SQL filter expression against the properties of each message. When sending a message, set its user properties with the putUserProperty method.
Supported syntax:
Numerical comparison, such as >, >=, <, <=, BETWEEN, =
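The idea of property-based filtering can be sketched without a broker. The example below is a stdlib-only model: SketchMessage, the property name "a" and the expression "a > 5" are all hypothetical, and the predicate is a hand-written equivalent of that expression, not a SQL92 parser. It also assumes that a message with a missing or non-numeric property does not match. In the real Java client the expression itself is normally passed to the broker with consumer.subscribe(topic, MessageSelector.bySql("a > 5")), and the broker has to have property filtering enabled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Stdlib-only model of property-based filtering. SketchMessage and the
// property name "a" are hypothetical and are not RocketMQ classes.
class SqlFilterSketch {

    static class SketchMessage {
        final String body;
        final Map<String, String> userProperties = new HashMap<>();

        SketchMessage(String body) {
            this.body = body;
        }

        // Plays the role of putUserProperty: values are stored as strings.
        void putUserProperty(String name, String value) {
            userProperties.put(name, value);
        }
    }

    // Hand-written equivalent of the filter expression "a > 5".
    // Assumption of this sketch: a missing or non-numeric property never matches.
    static final Predicate<SketchMessage> A_GREATER_THAN_5 = msg -> {
        String raw = msg.userProperties.get("a");
        if (raw == null) {
            return false;
        }
        try {
            return Long.parseLong(raw) > 5;
        } catch (NumberFormatException e) {
            return false;
        }
    };

    // Returns the bodies of the messages that satisfy the condition.
    static List<String> filter(List<SketchMessage> messages, Predicate<SketchMessage> condition) {
        List<String> delivered = new ArrayList<>();
        for (SketchMessage msg : messages) {
            if (condition.test(msg)) {
                delivered.add(msg.body);
            }
        }
        return delivered;
    }

    // Ten messages whose property "a" runs from 0 to 9.
    static List<SketchMessage> sampleMessages() {
        List<SketchMessage> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            SketchMessage msg = new SketchMessage("msg-" + i);
            msg.putUserProperty("a", String.valueOf(i));
            messages.add(msg);
        }
        return messages;
    }

    public static void main(String[] args) {
        // prints [msg-6, msg-7, msg-8, msg-9]
        System.out.println(filter(sampleMessages(), A_GREATER_THAN_5));
    }
}
```

Unlike TAG mode, where each message carries a single tag, this style lets one message expose several properties and lets the consumer combine conditions on them.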