In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly introduces "what is the meaning of RocketMQ sequential messages". In daily operation, I believe that many people have doubts about what RocketMQ sequential messages mean. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts of "what is the meaning of RocketMQ sequential messages?" Next, please follow the editor to study!
We know that because of the nature of message queues, messages are not consumed sequentially. RocketMQ does not provide so-called sequential messages for us to use, but sometimes some scenarios need to receive messages sequentially. Today we focus on how to implement this function. Although RocketMQ does not provide sequential consumption, we can implement it in disguise. We know that messages need to be placed in a queue before they can be consumed, and the characteristic of the queue itself is FIFO first-in, first-out, which can be achieved by putting messages that need to be sequenced into a queue.
1. Scene analysis
Scenario: messages between the two business systems are transmitted through MQ, and data from business system An is transmitted to business system B, requiring accurate and real-time messages. However, the original data of business system A may be modified, which requires real-time changes in business system B. Ensure the real-time, consistency and reliability of the message.
1.1. Default message production process analysis
The topic test_1 sends messages to the dual-master Broker1 and Broker2 of RocketMQ. The test_1 topic on each broker corresponds to four queues. Messages with a message id of 001001 are created (create) and updated (update). The MQ cluster is dual-master. Using the default message sending algorithm, messages polled are discarded to each queue.
By default, messages are distributed to different queues of each broker according to the polling algorithm, ensuring that the messages in each queue are evenly distributed. When there are multiple consumers in the cluster, multiple consumers will be dispersed to different queues to consume messages to ensure that messages can be consumed in real time.
Because the order of the message itself cannot be guaranteed if it is put into different queues, and the updated message may be consumed first. When the message consumption is created, the business needs to judge whether the message is up-to-date and needs to check the database to verify that it is updated. If not, the latest message is discarded to ensure the consistency of data between business system An and business system B, which increases the difficulty of business processing.
1.2. Analysis of sequential message production process
When we produce messages, we can put the messages of the same message ID into the same queue, and ensure that the messages that need to be consumed sequentially are put into the same queue, so that the messages in the queue are orderly. But at the same time, it is also necessary to ensure that the consumption of messages is orderly in order to ensure the sequential consumption of messages.
In the cluster mode, consumers in the same consumption group share the consumption of message queues under their subscription topics, and the same message queue will only be consumed by one consumer in the consumption group at the same time. A consumer can assign multiple consumption queues at the same time.
For normal messages in cluster mode, the thread pool defaults to creating 20 (configurable) threads. Multithreading pulls messages from the queue to increase concurrency and speed up message consumption.
For sequential messages in cluster mode, sequential consumption is a single thread, and a thread can only go to one queue to obtain data. When you need to obtain messages in a queue, you need to lock the message queue (PS: the principle will be analyzed in detail according to the source code later).
For sequential messages in broadcast mode, sequential consumption is single-threaded and consumed directly. There is no need to lock the message queue because there is no competition with each other.
2. Write Producer2.1, customize queue selector public static void main (String [] args) throws Exception {/ / Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer ("order_group_test_1"); / / Launch the instance. Producer.setNamesrvAddr ("10.10.12.203 new String 9876"); producer.start (); String [] tags = new String [] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int I = 0; I < 100; iTunes +) {int orderId = I% 10 / / Create a message instance, specifying topic, tag and message body. Message msg = new Message ("order_test_1", tags [I% tags.length], "KEY" + I, ("Hello RocketMQ" + I) .getBytes (RemotingHelper.DEFAULT_CHARSET)) SendResult sendResult = producer.send (msg, new MessageQueueSelector () {@ Override public MessageQueue select (List mqs, Message msg, Object arg) {Integer id = (Integer) arg; int index = id% mqs.size () Return mqs.get (index);}}, orderId); System.out.printf ("% s% n", sendResult);} / / server shutdown producer.shutdown ();}
We put them into different queues according to the sequence number of 100 messages, take the module according to the sequence number, and put the same into a queue.
2.2. Queue selector provided
Above is our algorithm for implementing a custom queue selector, and RocketMQ also provides three queue selection algorithms
From the picture, we can see that there are three kinds.
SelectMessageQueueByHash: select queue through hash.
SelectMessageQueueByRandom: randomly select queue.
SelectMessageQueueByMachineRoom: select queue for computer room (not implemented)
Let's take a look at the implementation separately.
2.2.1, SelectMessageQueueByHashpublic class SelectMessageQueueByHash implements MessageQueueSelector {public MessageQueue select (List mqs, Message msg, Object arg) {int value = arg.hashCode (); if (value < 0) {value = Math.abs (value);} value% = mqs.size (); return ((MessageQueue) mqs.get (value)) }}
If we look at the source code, we can find that the HashCode is obtained by the parameters provided, if it is negative, the absolute value is taken, and the hash value is modeled with the total number of queues to obtain its queue.
2.2.2, SelectMessageQueueByRandompublic class SelectMessageQueueByRandom implements MessageQueueSelector {private Random random; public SelectMessageQueueByRandom () {this.random = new Random (System.currentTimeMillis ());} public MessageQueue select (List mqs, Message msg, Object arg) {int value = this.random.nextInt (mqs.size ()); return ((MessageQueue) mqs.get (value));}}
Generate a random number within the number of queues, and obtain the queue through random numbers.
2.2.3. SelectMessageQueueByMachineRoom (not implemented) public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {private Set consumeridcs; public MessageQueue select (List mqs, Message msg, Object arg) {return null;} public Set getConsumeridcs () {return this.consumeridcs;} public void setConsumeridcs (Set consumeridcs) {this.consumeridcs = consumeridcs;}}
We found that the select method is null, but it is not implemented. We need to do it ourselves.
Although RocketMQ provides three queue selection algorithms (actually 2, which are not implemented by SelectMessageQueueByMachineRoom), it is not recommended to use them. Different business rules have different queue selection algorithms. It is recommended to implement them manually.
3. Write Consumerpublic static void main (String [] args) {try {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer (); consumer.setConsumerGroup ("order_consumer_test_push"); consumer.setNamesrvAddr ("10.10.12.203 DefaultMQPushConsumer consumer 9876); consumer.subscribe (" order_test_1 "," * ") Consumer.registerMessageListener (new MessageListenerOrderly () {@ Override public ConsumeOrderlyStatus consumeMessage (List paramList) ConsumeOrderlyContext paramConsumeOrderlyContext) {try {for (MessageExt msg: paramList) {String msgbody = new String (msg.getBody (), "utf-8") System.out.println ("MessageBody:" + msgbody); / / output message content}} catch (Exception e) {e.printStackTrace () Return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;} return ConsumeOrderlyStatus.SUCCESS; / / consumption success}}); consumer.start () } catch (Exception e) {/ / TODO Auto-generated catch block e.printStackTrace ();}}
View the result
We find two typical sets of numbers, the tail number is 6 and 9, the tail number is 6, and the tail number is 9, but the result is the same, consumed sequentially.
At this point, the study of "what is the meaning of RocketMQ sequential messages" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.