In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
This article will explain in detail what the basic concepts and principles of RocketMQ are, and the content of the article is of high quality, so the editor will share it with you for reference. I hope you will have a certain understanding of the relevant knowledge after reading this article.
RocketMQ usage
Basic concept
ProducerGroup
Usually some producer with the same attributes (the type of message processed-topic, and the message processing logic flow-distributed multiple clients) can be classified as the same group. In the transaction message mechanism, if a producer-A that sends a message goes down, so that the transaction message is always in the PREPARED state and times out, the broker will check back other producer of the same group to confirm whether the message should be commit or rollback.
ConsumerGroup
A consumer that has the same logic to consume the same message can be merged into a group. Consumers in the same group can jointly consume (CLUSTERING) messages corresponding to topic to achieve the function of distributed parallel processing.
Topoic
The logical management unit of the message.
Queue
The physical management unit of the message. The introduction of multiple Queue,Queue in a Topic enables message storage to be distributed and clustered, with the ability to scale horizontally.
Consumption schedule management
The broker side of RocketMQ is not responsible for pushing messages and stores them regardless of whether consumers consume them or not. Whoever wants to consume the message sends a request to broker to get the message, and the consumption record is maintained by consumer. RocketMQ provides two storage methods to keep consumption records: one is on the server where consumer is located, and the other is on the broker server. Users can also implement the corresponding consumption progress storage interface by themselves.
By default, using cluster consumption (CLUSTERING) will save the record on the broker side, while using broadcast consumption (BROADCASTING) will keep the consumption record locally.
Sequential message
If a user implements MessageQueueSelector for a batch of messages (usually with the same unique label ID) and selects the same Queue, the consumption of this batch of messages will be sequential (and the consumption will be completed by the same consumer).
Transaction message
Such a message has multiple states and is sent in two phases. In the first stage, a message with PREPARED status is sent. At this time, consumer cannot see the message in this state. After the message is sent, the user's TransactionExecutor interface is called back to perform the corresponding transaction operation (such as database). When the transaction operation is successful, commit is returned to the message, and broker is asked to perform commit operation on the message. The message that becomes commit status is visible to consumer.
Basic principles
Overview
RocketMQ uses Topic to manage messages for different applications. For the producer, to send the message, you need to specify the Topic of the message, and for the consumer, after startup, you need to subscribe to the corresponding Topic, and then you can consume the corresponding message. Topic is a logical concept. In physical implementation, one Topic is composed of multiple Queue. The advantage of using multiple Queue is that Broker storage can be distributed and system performance can be improved.
In RocketMQ, when producer sends messages to Broker, it needs to decide which queue to send. By default, producer will poll to send messages to each queue (all Queue under broker are merged into one List to poll).
For consumer, a fixed queue is assigned to each consumer (if the total number of queues does not change), and consumer pulls unconsumed messages from the fixed queue for processing.
Producer
Logical overview of the Producer side (belonging to client):
The logic on the producer side is relatively simple. You can send the message to a Queue, which Queue can be controlled by the user (MessageQueueSelector interface). By default, the polling method is selected as Queue. On the producer side, the Topic of all Broker and the corresponding Queue information (that is, TopicRoute information) are pulled locally from NameServer, and then a List is built according to it. So in MessageQueueSelector, you can see all the Queue information.
RocketMQ manages the messages of topic with multiple Queue, which makes it easy to scale horizontally and provide system throughput. The problem caused by this distribution is that the overall order cannot be achieved (in many cases, the global sequence is not needed).
When RocketMQ mentions support for sequential messages, he actually refers to the order based on the Queue level. If the user sends a batch of messages that need to satisfy the order (such as a series of subsequent operations of an order number of e-commerce, such as insert, delete, update of a primary key of the database, etc.) to a fixed Queue, the consumer of the message is consumed from this Queue, and the message is consumed sequentially.
Question 1: for the queue of sequential messages, can the cluster be dynamically expanded under non-stop service?
Consumer
The consumer logic is a little more complex. On initial thinking, the processor side needs to at least deal with:
1. Message acquisition
2. Management and storage of offset (consumption progress)
3. The distribution of Queue (rebalance) under the cluster consumption mode.
RocketMQ provides two different forms of Consumer:PushConsumer and PullConsumer. As the name implies, for PullConsumer, users need to actively call the appropriate interface to pull unconsumed messages. For PushConsumer, the CallBack provided by the user for message processing will actively call back the CallBack to process the message when there is a message that has not been consumed. From the user's point of view, there are pull and push in Consumer, but the broker side of RocketMQ itself only stores all messages and is not responsible for push messages, so the underlying implementation of PushConsumer also has a long connection to actively pull unconsumed messages on broker, and then call back the user's callback logic.
Overview of PushConsumer-side logic:
The code for using PushConsumer yourself is very simple:
one
two
three
four
five
six
seven
eight
nine
ten
11 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("groupName")
Consumer.subscribe ("TopicName", "*"); / / a | b | c
Consumer.registerMessaeListener (new MessageListenerConcurrently () {
Public ConsumeConcurrentlyStatus consumeMessage (List msgs
ConsumeConcurrentlyContext context) {
System.out.println ("Consume Message Num:" + msgs.size ())
Return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
}
});
Consumer.start ()
/ / add shutdown hook to execute consumer.shutdown ()
The subscribe in line 2 is a message that is scheduled under some topic and conforms to some tag. This filter is filtered on the server side (there is also filtering logic on the consumer side). Tag is separated by "|". These tag will be parsed into SubscriptionData to store information, mainly storing the string collection of tag and the corresponding hashcode set of these tag (the storage and filtering on the broker side are actually the hashcode corresponding to tagging values, possibly to speed up filtering and save storage space).
The main logic starts after line 10 calls the start function. The main implementation of Consumer is DefaultMQPushConsumerImpl, which contains simple object relationships like the following figure:
The main functions of each object in DefaultMQPushConsumerImpl are as follows:
RebalancePushImpl: mainly responsible for deciding which Queue the current consumer should consume messages from.
PullAPIWrapper: persistent connection, which is responsible for pulling messages from broker, and then uses ConsumeMessageService to call back the user's Listener to perform message consumption logic.
ConsumeMessageService: implements the so-called "Push- passive" consumption mechanism; messages pulled from Broker are encapsulated and submitted to ConsumeMessageSerivce as ConsumeRequest. This service is responsible for calling back users' Listener consumption messages.
OffsetStore: maintains the current consumer consumption record (offset); there are two implementations, Local and Rmote,Local are stored on the local disk and are suitable for the BROADCASTING broadcast consumption model, while Remote stores the consumption progress on Broker, which is suitable for the CLUSTERING cluster consumption model
MQClientFactory: a hodgepodge that manages client (consumer, producer) and provides multiple functional interfaces for each Service (Rebalance, PullMessage, etc.) to call; most of the logic is done in this class
Use
Producer return value
When sending a message, the delivery fails only if an exception is thrown. In other cases, the application layer makes the corresponding trade-off processing logic according to the returned value as follows:
SendStatus
Return value interpretation
SEND_OK sent successfully
FLUSH_DISK_TIMEOUT was sent successfully, but broker flushing failed. If the server goes down, the message will be lost.
FLUSH_SLAVE_TIMEOUT write slave failed; if master goes down, message is lost
SLAVE_NOT_AVAILABLE is never available
Note: when configuring a cluster with multiple master and no slave, if the brokerRole of master is SYNC_MASTER, the sending message will always return this value; the latest version (3.1.14 or above) the transaction message will always fail (if the return value of send _ OK is processed in the transaction message, ROLL_BACK will be performed directly)
When the application side clearly indicates that the message sent by producer is in the status of SEND_OK, it is visible to consumer. Transaction messages can be used to accomplish this function. Starting from version 3.0.14, RocketMQ starts to check SendStatus for transaction messages, and if it is not SEND_OK, it directly performs the rollback of transaction messages.
Consumer return value
When using PushConsumer (using callBack callback to execute application consumption logic)
Non-sequential messages (ConsumeConcurrentlyStatus)
Return value interpretation
CONSUME_SUCCESS consumption is successful
RECONSUME_LATER consumption failed. Re-consume this batch of messages later.
RECONSUMER_LATER 's explanation:
This batch of messages will be sendBack to broker, and will be re-consumed later. You can set parameters to make the "batch" of bulk consumption as one, so that repeated consumption can be avoided to a certain extent. However, after this setting, it may be less efficient. Another method is to control the repeated consumption of this batch of messages through the corresponding ConsumeConcurrentlyContext parameter in the user-specified CallBack (MessageListenerConcurrently).
The specific method is to control the ackIndex variable of context. This variable means that for this batch of messages (List), the messages in [0, ackIndex] are consumed successfully, while the messages in (ackIndex, Lst.size) are consumed failed. If the return value is RECONSUME_LATER, sendBack will be called and sent back to broker for the failed messages. (from the code point of view, this function only works on the consumer of the CLUSTERING consumption model and is directly discarded by BROADCASTING). There is also a small tips that, after a failed call to SendBack, attempts to repeatedly consume these postback failed messages locally in consumer (construct the corresponding ConsumeRequest). This processing mode (consumption first, messages that failed to consume try to send back to broker, messages that failed to send back to broker try to reconsume on the server side) keep trying until consumption succeeds or postback to broker succeeds.
Sequential messages (ConsumerOrderlyStatus)
Return value interpretation
SUCCESS message processing succeeded
ROLLBACK rollback messages-seems to be used in transaction messages
COMMIT commit message-seems to be used in transaction messages
SUSPEND_CURRENT_QUEUE_A_MOMENT current queue is suspended for a period of time
Question: how to understand the ROLLBACK and COMMIT on the consumer side?
General message
Use TIPS
Cluster building
1. Basic configuration
Use. / bin/mqbroker-p > conf/broker.conf to check the default values of all parameters, and modify the corresponding configuration according to the needs of your own cluster
two。 Cluster selection
The RocketMQ cluster supports some of the following modes of configuration:
The characteristics of cluster mode are applicable to the scenario.
One Broker instance for a single Master
Or multiple Broker instances, but Topic is only configured on one Broker; test
Multiple Master, no Slave, multiple Broker instances form a cluster, and the brokerID is all 0 (that is, the roles are all Master)
After master is hung up, this unconsumed message on master cannot be consumed for the time being.
Application scenarios that can tolerate message loss (not consumed by consumer), such as log collection
Multi-Master and multi-Slave each master has a high availability of slave HA for backup. When master-slave uses synchronous double write (asynchronous, any message that has not been written to slave may be lost), master hangs, and messages can be consumed from slave. However, automatic Failover is not supported when master is hung up (so producer writing is not supported).
Question: how to switch manually? Do you need to change the configuration of Slave to Master, and then restart the broker instance? The cost is a little high, do you support sending commands to switch?
When building a cluster with multiple master and no slave, the brokerRole configuration of master must be ASYNC_MASTER. If configured as SYNC_MASTER, the SendStatus of the return value will always be SLAVE_NOT_AVAILABLE when producer sends a message.
3. System parameter optimization
Refer to the parameters in bin/os.sh and focus on the physical memory reservation parameters (vm.min_free_kbytes).
4. Startup of broker
When you start broker in cluster mode, you need to set the list of the nameserver address. It must be noted here that all nameserver addresses must be included, because the nameserver of rocketmq is not synchronized, and broker is required to actively report. If there are three nameserver: A B C, and only An is specified at startup, but B / C is forgotten, the client will fail if it just links B or C to get the broker information.
Message usage
Problem sorting
Question:
1. What is the server disk configuration (focus on disk configuration and total disk capacity, for example: n * 4T SATA 7200)?
2. Has the disk of the server been raid? What kind of raid do you make? Is the disk swiping mode SYNC or ASYNC?
3. Which method is used to build rocketmq (multi-master without slave, multi-master and multi-slave)? If it is master-slave mode, is master-slave synchronization SYNC or ASYNC mode?
Answer:
1. Usually 3T disks, but actually 12 sas 15000-turn disks of 600,000,12* 600g have done raid10.
2. the disk refresh mode is usually asynchronous.
3. Most clusters do not enable slave, while a small number of clusters enable slave in sync mode.
On the basic concepts and principles of RocketMQ what is shared here, I hope that the above content can be of some help to you, can learn more knowledge. If you think the article is good, you can share it for more people to see.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.