
What are the basic concepts and principles of RocketMQ?

2025-03-17 Update From: SLTechnology News&Howtos


This article explains the basic concepts and principles of RocketMQ in detail, for reference.

RocketMQ usage

Basic concept

ProducerGroup

Producers that share the same attributes (the topic of the messages they handle and the processing logic, typically the same application deployed across multiple clients) can be grouped into one ProducerGroup. In the transaction message mechanism, if producer A goes down after sending a message, leaving the transaction message in the PREPARED state until it times out, the broker checks back with another producer in the same group to confirm whether the message should be committed or rolled back.

ConsumerGroup

Consumers that apply the same logic to the same messages can be merged into one ConsumerGroup. Consumers in the same group can jointly consume (CLUSTERING) the messages of a topic, which provides distributed parallel processing.

Topic

The logical management unit of the message.

Queue

The physical management unit of messages. A Topic can contain multiple Queues; introducing Queues lets message storage be distributed and clustered, so it can scale horizontally.

Consumption progress management

The broker side of RocketMQ does not push messages; it stores them regardless of whether consumers consume them. A consumer that wants messages sends a request to the broker to fetch them, and the consumption record is maintained by the consumer. RocketMQ provides two ways to store consumption records: on the machine where the consumer runs, or on the broker. Users can also implement the consumption progress storage interface themselves.

By default, cluster consumption (CLUSTERING) saves the record on the broker side, while broadcast consumption (BROADCASTING) keeps the record locally on the consumer.
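The storage choice described above can be sketched as a small interface with a default per consumption mode. This is only an illustration under stated assumptions: ConsumeModeSketch, ProgressStoreSketch and InMemoryProgressStore are hypothetical names, not RocketMQ classes, and the in-memory map stands in for a real disk or broker store.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical types for illustration; they only mirror the idea in the text.
enum ConsumeModeSketch { CLUSTERING, BROADCASTING }

interface ProgressStoreSketch {
    void update(String queue, long offset); // record progress for one queue
    long read(String queue);                // -1 when nothing has been recorded
    String location();                      // where the record is kept
}

// In-memory stand-in; a real store would persist to local disk or to the broker.
final class InMemoryProgressStore implements ProgressStoreSketch {
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();
    private final String location;

    InMemoryProgressStore(String location) {
        this.location = location;
    }

    @Override
    public void update(String queue, long offset) {
        offsets.put(queue, offset);
    }

    @Override
    public long read(String queue) {
        return offsets.getOrDefault(queue, -1L);
    }

    @Override
    public String location() {
        return location;
    }

    // Default described in the text: broker for CLUSTERING, local for BROADCASTING.
    static ProgressStoreSketch defaultFor(ConsumeModeSketch mode) {
        return new InMemoryProgressStore(mode == ConsumeModeSketch.CLUSTERING ? "broker" : "local");
    }
}
```

A user-defined store would implement the same three operations against whatever storage the application prefers.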

Sequential message

If a user implements MessageQueueSelector so that a batch of messages (usually sharing the same unique identifier) is sent to the same Queue, that batch is consumed in order (and by the same consumer).

Transaction message

Such a message has multiple states and is sent in two phases. In the first phase, a message in the PREPARED state is sent; consumers cannot see a message in this state. After the message is sent, the user's TransactionExecutor interface is called back to perform the local transaction (for example, a database operation). If the transaction succeeds, commit is returned and the broker is asked to commit the message. Once the message is in the committed state, it becomes visible to consumers.
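The two-phase flow can be modeled with a toy in-memory broker. This is a sketch for intuition only: TxBrokerSketch and its methods are hypothetical and do not correspond to the RocketMQ API, and the timeout and check-back path is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Toy in-memory model of the two-phase flow; none of these types are RocketMQ classes.
final class TxBrokerSketch {
    enum State { PREPARED, COMMITTED, ROLLED_BACK }

    private final Map<String, State> states = new LinkedHashMap<>();

    // Phase 1: store the message as PREPARED; consumers cannot see it yet.
    void prepare(String msgId) {
        states.put(msgId, State.PREPARED);
    }

    // Phase 2: the producer reports the outcome of its local transaction.
    void end(String msgId, boolean commit) {
        states.put(msgId, commit ? State.COMMITTED : State.ROLLED_BACK);
    }

    // Only committed messages are visible to consumers.
    List<String> visibleToConsumers() {
        List<String> visible = new ArrayList<>();
        for (Map.Entry<String, State> entry : states.entrySet()) {
            if (entry.getValue() == State.COMMITTED) {
                visible.add(entry.getKey());
            }
        }
        return visible;
    }

    // Producer side: send PREPARED, run the local transaction, then commit or roll back.
    static void sendInTransaction(TxBrokerSketch broker, String msgId, BooleanSupplier localTx) {
        broker.prepare(msgId);
        boolean ok;
        try {
            ok = localTx.getAsBoolean();
        } catch (RuntimeException e) {
            ok = false; // a failed local transaction is treated as a rollback
        }
        broker.end(msgId, ok);
    }
}
```

For example, sendInTransaction(broker, "m1", () -> true) leaves "m1" visible, while a local transaction that returns false or throws leaves the message hidden.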

Basic principles

Overview

RocketMQ uses Topics to manage the messages of different applications. A producer must specify the Topic of each message it sends; a consumer, after startup, subscribes to the corresponding Topic and can then consume its messages. A Topic is a logical concept. In the physical implementation, one Topic is composed of multiple Queues. The advantage of using multiple Queues is that Broker storage can be distributed and system performance improves.

In RocketMQ, when a producer sends a message to the Broker, it has to decide which Queue to send it to. By default, the producer polls the Queues in turn (the Queues of all Brokers are merged into one List and polled).

On the consumer side, fixed Queues are assigned to each consumer (as long as the total number of Queues does not change), and the consumer pulls unconsumed messages from those Queues for processing.
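The default polling behaviour amounts to a round-robin walk over the merged queue list. A minimal sketch, assuming the queues are represented as plain list elements; RoundRobinPicker is a hypothetical name and this is not the client's actual code:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal round-robin picker over a flat queue list; an illustration only.
final class RoundRobinPicker {
    private final AtomicInteger counter = new AtomicInteger();

    // Returns the next queue in turn; floorMod keeps the index valid even if the counter overflows.
    <T> T next(List<T> queues) {
        if (queues.isEmpty()) {
            throw new IllegalArgumentException("no queues available");
        }
        int index = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(index);
    }
}
```

With three queues in the list, successive calls return the first, second and third queue and then start again from the first.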

Producer

Logical overview of the Producer side (belonging to client):

The logic on the producer side is relatively simple: send the message to a Queue. Which Queue is used can be controlled by the user (through the MessageQueueSelector interface); by default, the Queue is chosen by polling. The producer pulls the Topics of all Brokers and their Queue information (that is, the TopicRoute information) from the NameServer and builds a List from it locally, so all Queue information is visible inside MessageQueueSelector.

RocketMQ manages the messages of a topic with multiple Queues, which makes it easy to scale horizontally and improves system throughput. The cost of this distribution is that global ordering cannot be guaranteed (in many cases, global ordering is not needed).

When RocketMQ says it supports sequential messages, it actually means ordering at the Queue level. If the user sends a batch of messages that must be ordered (for example, the successive operations on one e-commerce order number, or the insert, delete and update operations on one database primary key) to a fixed Queue, and the consumer consumes from that Queue, the messages are consumed in order.

Question 1: for a Queue carrying sequential messages, can the cluster be expanded dynamically without stopping the service?
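One common way to pin related messages to a fixed Queue is to derive the queue index from the business ID. The sketch below shows only that selection rule, the kind of logic a MessageQueueSelector implementation might apply; OrderQueueChooser and the numeric orderId are assumptions made for illustration.

```java
import java.util.List;

// Sketch of a selection rule for ordered messages; not part of the RocketMQ API.
final class OrderQueueChooser {
    private OrderQueueChooser() {}

    // The same orderId with the same queue count always maps to the same index.
    static int indexFor(long orderId, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    // Picks the queue for a given order from the list of available queues.
    static <T> T choose(List<T> queues, long orderId) {
        return queues.get(indexFor(orderId, queues.size()));
    }
}
```

Note that the mapping depends on the queue count: order 10 maps to index 2 with four queues but to index 0 with five, so messages for one order could land in different Queues if the count changes while they are in flight.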

Consumer

The consumer logic is a little more complex. At first glance, the consumer side needs to handle at least the following:

1. Message acquisition

2. Management and storage of offset (consumption progress)

3. The distribution of Queue (rebalance) under the cluster consumption mode.
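For the third item, one plausible way to distribute Queues is to sort both the Queues and the consumer IDs and give each consumer a contiguous slice. The sketch below assumes that averaging strategy for illustration; AverageAllocator is a hypothetical name and the text does not say which strategy RocketMQ uses by default.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Averaging allocation sketch: every consumer computes its own slice from the same sorted inputs.
final class AverageAllocator {
    private AverageAllocator() {}

    static List<String> allocate(String self, List<String> consumerIds, List<String> queues) {
        List<String> consumers = new ArrayList<>(consumerIds);
        List<String> sortedQueues = new ArrayList<>(queues);
        Collections.sort(consumers);    // all group members must see the same order
        Collections.sort(sortedQueues);

        int index = consumers.indexOf(self);
        if (index < 0) {
            return new ArrayList<>();   // not a member of the group: nothing to consume
        }

        int base = sortedQueues.size() / consumers.size();  // queues every consumer gets
        int extra = sortedQueues.size() % consumers.size(); // first 'extra' consumers get one more
        int start = index * base + Math.min(index, extra);
        int count = base + (index < extra ? 1 : 0);
        return new ArrayList<>(sortedQueues.subList(start, start + count));
    }
}
```

Because each consumer derives its slice independently from the same sorted lists, the assignment stays fixed until the set of Queues or consumers changes.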

RocketMQ provides two forms of consumer: PushConsumer and PullConsumer. With PullConsumer, the user actively calls the appropriate interface to pull unconsumed messages. With PushConsumer, the user registers a callback for message processing, and the client invokes that callback whenever there are unconsumed messages. From the user's point of view there is both pull and push, but the RocketMQ broker itself only stores messages and does not push them, so the underlying implementation of PushConsumer also uses a long-lived connection to actively pull unconsumed messages from the broker and then invokes the user's callback.
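The idea of "push built on pull" can be shown with a toy model in which the broker is just a list and the client pulls a batch and then calls the listener. PushOverPullSketch is a hypothetical class for illustration; networking, long polling and threading are all omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model: the "broker" only stores messages; the client pulls and then calls the listener.
final class PushOverPullSketch {
    private final List<String> brokerQueue = new ArrayList<>(); // stands in for one queue on the broker
    private int nextOffset = 0;                                 // consumer-side progress

    void store(String message) {
        brokerQueue.add(message);
    }

    // One pull round: fetch up to batchSize messages from nextOffset and hand them to the listener.
    int pullOnce(int batchSize, Consumer<List<String>> listener) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int end = Math.min(brokerQueue.size(), nextOffset + batchSize);
        if (end == nextOffset) {
            return 0; // nothing new; a real client would wait and pull again
        }
        List<String> batch = new ArrayList<>(brokerQueue.subList(nextOffset, end));
        listener.accept(batch); // user callback, as with a push-style consumer
        nextOffset = end;
        return batch.size();
    }
}
```

Calling pullOnce in a loop gives the user the impression of being pushed messages, although every message is in fact fetched by the client.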

Overview of PushConsumer-side logic:

Using PushConsumer is very simple:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupName");
    consumer.subscribe("TopicName", "*"); // "*" matches every tag; a tag expression looks like a | b | c
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.println("Consume Message Num:" + msgs.size());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // the whole batch was handled
        }
    });
    consumer.start();
    // add shutdown hook to execute consumer.shutdown()

The subscribe call on line 2 subscribes to messages under a topic that match certain tags. Filtering is done on the server side (there is also filtering logic on the consumer side). Tags are separated by "|". The tags are parsed into a SubscriptionData object, which mainly stores the set of tag strings and the corresponding set of tag hash codes (the broker stores and filters on the hash codes of the tag values, possibly to speed up filtering and save storage space).

The main logic starts when start is called on line 10. The main implementation of the consumer is DefaultMQPushConsumerImpl, which holds a small set of collaborating objects.
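The tag parsing and the two levels of filtering can be sketched as follows. TagFilterSketch is a hypothetical stand-in for the subscription data described above, and using String.hashCode as the hash function is an assumption made for illustration.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Sketch of tag-expression parsing plus hash-based and string-based matching.
final class TagFilterSketch {
    final Set<String> tags = new LinkedHashSet<>();
    final Set<Integer> codes = new LinkedHashSet<>();
    final boolean matchAll;

    TagFilterSketch(String expression) {
        String expr = expression == null ? "" : expression.trim();
        matchAll = expr.isEmpty() || expr.equals("*");
        if (!matchAll) {
            for (String raw : expr.split("\\|")) {
                String tag = raw.trim();
                if (!tag.isEmpty()) {
                    tags.add(tag);
                    codes.add(tag.hashCode()); // assumed hash function for this sketch
                }
            }
        }
    }

    // Broker-style check: compares hash codes only, so a colliding tag can slip through.
    boolean matchesByCode(int tagHashCode) {
        return matchAll || codes.contains(tagHashCode);
    }

    // Consumer-style check: compares the actual tag string.
    boolean matchesByTag(String tag) {
        return matchAll || tags.contains(tag);
    }
}
```

The strings "Aa" and "BB" have the same Java hash code, so a filter built from "Aa" accepts "BB" by hash code but rejects it by string comparison. A collision like this is one plausible reason for a second check on the consumer side.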

The main functions of each object in DefaultMQPushConsumerImpl are as follows:

RebalancePushImpl: mainly responsible for deciding which Queue the current consumer should consume messages from.

PullAPIWrapper: holds the long-lived connection and is responsible for pulling messages from the broker; ConsumeMessageService then calls back the user's Listener to run the consumption logic.

ConsumeMessageService: implements the so-called "push" (passive) consumption mechanism. Messages pulled from the Broker are wrapped in a ConsumeRequest and submitted to ConsumeMessageService, which calls back the user's Listener to consume them.

OffsetStore: maintains the current consumer's consumption record (offset). There are two implementations, Local and Remote. Local stores the progress on the local disk and suits the BROADCASTING consumption model, while Remote stores the progress on the Broker and suits the CLUSTERING consumption model.

MQClientFactory: a catch-all class that manages the clients (consumer, producer) and provides functional interfaces for each service (Rebalance, PullMessage, etc.) to call; most of the logic is done in this class.

Use

Producer return value

When sending a message, delivery has failed only if an exception is thrown. In all other cases, the application layer should decide how to proceed based on the returned value, as follows:

SendStatus             Meaning
SEND_OK                Sent successfully.
FLUSH_DISK_TIMEOUT     Sent successfully, but the broker failed to flush to disk. If the server goes down, the message will be lost.
FLUSH_SLAVE_TIMEOUT    Writing to the slave failed. If the master goes down, the message is lost.
SLAVE_NOT_AVAILABLE    No slave is available.

Note: in a cluster with multiple masters and no slaves, if the brokerRole of a master is SYNC_MASTER, sending a message always returns SLAVE_NOT_AVAILABLE. In the latest version (3.1.14 or above), transaction messages will then always fail (if the return value of a transaction message is not SEND_OK, ROLL_BACK is performed directly).

If the application requires that a message become visible to consumers only when the producer's send status is SEND_OK, transaction messages can be used to achieve this. Starting from version 3.0.14, RocketMQ checks the SendStatus of transaction messages, and if it is not SEND_OK, it rolls the transaction message back directly.
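The trade-off logic on the application side can be sketched as a small policy class. The enum below is a local mirror of the status names listed above, defined only so the example is self-contained; SendStatusSketch and SendResultPolicy are hypothetical names, not the client library's types.

```java
// Local mirror of the status names from the text, for illustration only.
enum SendStatusSketch { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

final class SendResultPolicy {
    private SendResultPolicy() {}

    // True when the send returned normally but the message could still be lost if a server goes down.
    static boolean atRiskOfLoss(SendStatusSketch status) {
        switch (status) {
            case SEND_OK:
                return false;
            case FLUSH_DISK_TIMEOUT:  // not yet safely on the master's disk
            case FLUSH_SLAVE_TIMEOUT: // not yet replicated to the slave
            case SLAVE_NOT_AVAILABLE: // no slave to replicate to
                return true;
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }

    // Rule for transaction messages as described in the text: anything but SEND_OK is rolled back.
    static boolean shouldRollbackTransaction(SendStatusSketch status) {
        return status != SendStatusSketch.SEND_OK;
    }
}
```

An application that cannot tolerate loss might resend or raise an alert when atRiskOfLoss returns true, while a log-collection style application might simply ignore it.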

Consumer return value

When using PushConsumer (the application's consumption logic runs in a callback):

Non-sequential messages (ConsumeConcurrentlyStatus)

Return value       Meaning
CONSUME_SUCCESS    Consumption succeeded.
RECONSUME_LATER    Consumption failed. Re-consume this batch of messages later.

Explanation of RECONSUME_LATER:

This batch of messages is sent back (sendBack) to the broker and re-consumed later. You can set the consume batch size to 1 so that repeated consumption is avoided to some extent, although this may reduce efficiency. Another method is to control which messages of the batch are re-consumed through the ConsumeConcurrentlyContext parameter of the user's callback (MessageListenerConcurrently).

Specifically, you control the ackIndex variable of the context. For a batch of messages (the List), messages at positions [0, ackIndex] are treated as consumed successfully, while messages at positions (ackIndex, msgs.size()) are treated as failed. If the return value is RECONSUME_LATER, sendBack is called to send the failed messages back to the broker. (Judging from the code, this only applies to consumers in the CLUSTERING consumption model; with BROADCASTING the failed messages are simply discarded.) One more detail: if the sendBack call fails, the consumer tries to re-consume those messages locally (by constructing the corresponding ConsumeRequest). This processing mode (consume first; send messages that failed to consume back to the broker; re-consume locally any messages that could not be sent back) keeps retrying until consumption succeeds or the send-back to the broker succeeds.
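The ackIndex rule splits a batch into a succeeded prefix and a failed remainder. A minimal sketch of that split, assuming the batch is an ordinary list; AckIndexSketch is a hypothetical helper and does not perform any sendBack itself.

```java
import java.util.ArrayList;
import java.util.List;

// Splits a consumed batch according to the ackIndex convention described in the text.
final class AckIndexSketch {
    private AckIndexSketch() {}

    // First index that counts as failed, clamped to the bounds of the batch.
    private static int firstFailed(int batchSize, int ackIndex) {
        return Math.max(0, Math.min(batchSize, ackIndex + 1));
    }

    // Messages at positions 0..ackIndex count as consumed successfully.
    static <T> List<T> succeeded(List<T> batch, int ackIndex) {
        return new ArrayList<>(batch.subList(0, firstFailed(batch.size(), ackIndex)));
    }

    // Messages after ackIndex count as failed and are candidates for being sent back.
    static <T> List<T> failed(List<T> batch, int ackIndex) {
        return new ArrayList<>(batch.subList(firstFailed(batch.size(), ackIndex), batch.size()));
    }
}
```

With a batch of four messages and ackIndex set to 1, the first two messages count as consumed and the last two as failed; an ackIndex of -1 marks the whole batch as failed.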

Sequential messages (ConsumeOrderlyStatus)

Return value                      Meaning
SUCCESS                           Message processing succeeded.
ROLLBACK                          Roll back the message; this seems to be used for transaction messages.
COMMIT                            Commit the message; this seems to be used for transaction messages.
SUSPEND_CURRENT_QUEUE_A_MOMENT    Suspend the current queue for a period of time.

Question: how to understand the ROLLBACK and COMMIT on the consumer side?

General message

Usage tips

Cluster building

1. Basic configuration

Run ./bin/mqbroker -p > conf/broker.conf to dump the default values of all parameters, then modify the configuration according to the needs of your own cluster.

2. Cluster selection

The RocketMQ cluster supports the following configuration modes:

Cluster mode: single Master
Characteristics: one Broker instance, or multiple Broker instances with the Topic configured on only one Broker.
Applicable scenario: testing.

Cluster mode: multiple Masters, no Slave
Characteristics: multiple Broker instances form a cluster, and the brokerId of each is 0 (that is, all roles are Master). After a master goes down, the unconsumed messages on that master cannot be consumed for the time being.
Applicable scenario: applications that can tolerate message loss (messages not consumed by a consumer), such as log collection.

Cluster mode: multiple Masters, multiple Slaves
Characteristics: each master has a slave as a high-availability (HA) backup. When master and slave use synchronous double write (with asynchronous replication, any message not yet written to the slave may be lost), messages can still be consumed from the slave after the master goes down. However, automatic failover is not supported when the master goes down (so producer writes are not supported at that point).

Question: how is the switch done manually? Does the Slave's configuration have to be changed to Master and the broker instance restarted? That cost is rather high; is switching by sending a command supported?

When building a cluster with multiple masters and no slaves, the brokerRole of each master must be ASYNC_MASTER. If it is configured as SYNC_MASTER, the SendStatus returned when a producer sends a message will always be SLAVE_NOT_AVAILABLE.

3. System parameter optimization

Refer to the parameters in bin/os.sh and focus on the physical memory reservation parameters (vm.min_free_kbytes).

4. Startup of broker

When you start a broker in cluster mode, you need to set the list of nameserver addresses. The list must include all nameserver addresses, because RocketMQ nameservers do not synchronize with each other and rely on the broker to report to each of them. If there are three nameservers A, B and C, and only A is specified at startup while B and C are forgotten, a client that happens to connect to B or C to get the broker information will fail.

Message usage

Problem sorting

Question:

1. What is the server disk configuration (focus on disk configuration and total disk capacity, for example: n * 4T SATA 7200)?

2. Are the server's disks configured as RAID? Which RAID level? Is the disk flush mode SYNC or ASYNC?

3. Which method is used to build rocketmq (multi-master without slave, multi-master and multi-slave)? If it is master-slave mode, is master-slave synchronization SYNC or ASYNC mode?

Answer:

1. Usually about 3 TB of disk; in practice, 12 SAS 15,000 rpm disks of 600 GB each (12 * 600 GB) configured as RAID 10.

2. The disk flush mode is usually asynchronous.

3. Most clusters do not enable slave, while a small number of clusters enable slave in sync mode.

