In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article mainly introduces the "advanced must-see summary of RocketMQ knowledge points". In the daily operation, I believe that many people have doubts about the advanced must-see summary of RocketMQ knowledge points. The editor consulted all kinds of materials and sorted out simple and useful methods of operation. I hope it will be helpful to answer the doubts of "advanced must-see RocketMQ knowledge points summary". Next, please follow the editor to study!
Overall architecture design of RocketMQ
The overall architecture design is mainly divided into four parts, namely: Producer, Consumer, Broker, NameServer.
In order to be more realistic, I draw cluster deployment, such as Broker, I also draw the master and slave.
Producer: a message producer that can be deployed in a cluster. It will first establish a persistent connection with the random one in the NameServer cluster to know which Broker Master the current Topic to be sent is stored on, and then establish a persistent connection with it to support multiple load balancing modes to send messages.
Consumer: message consumer, can also be deployed in clusters. It will also first establish a persistent connection with the random one in the NameServer cluster to know which Broker Master or Slave the current Topic of the message is stored on, and then they establish a persistent connection to support cluster consumption and broadcast consumption of messages.
Broker: mainly responsible for message storage, query consumption, and supports master-slave deployment. A Master can support reading and writing to multiple Slave,Master, while Slave only supports reading. Broker registers its own routing information with each NameServer in the cluster.
NameServer: a simple Topic routing registry that supports dynamic registration and discovery of Broker and saves the relationship between Topic and Borker. Usually it is also a cluster deployment, but each NameServer does not communicate with each other, and each NameServer has complete routing information, that is, stateless.
Let me summarize the interaction between them with one more paragraph:
Start the NameServer cluster first, and there is no data interaction between each NameServer. After Broker starts, heartbeats will be sent to all NameServer periodically (every 30s), including: IP, Port, TopicInfo,NameServer will regularly scan the Broker survival list. If there is no heartbeat for more than 120s, remove the Broker related information, which means offline.
In this way, each NameServer knows the relevant information of all the Broker in the cluster. At this time, the Producer goes online and can know which Broker the Topic message it wants to send is on the Broker, establish a persistent connection with the corresponding Broker (Master role), and send the message.
Consumer online can also know which Topic it wants to receive Broker from NameServer, establish a connection with the corresponding Master and Slave, and receive messages.
The simple workflow is described above. I believe you already have some impression on the overall data flow. Let's take a look at the details of each part.
NameServer
It is characterized by lightweight and stateless. The role is similar to the situation of Zookeeper, from the above description, we know that its two main functions are: Broker management, routing information management.
Overall, it's relatively simple. I'll post some more fields to give you a more intuitive impression of what it stores.
Producer
Producer is nothing more than a message producer. First of all, it needs to know which Broker the message is sent to, so the mapping of Topic and Broker from a NameServer is stored in local memory every 30s. If a new Broker is found, a long-term connection is established with it, and a heartbeat is sent to Broker to maintain the connection every 30s.
And the Broker that can be sent is polled to send the message to achieve the purpose of load balancer. In the case of synchronous delivery, if the delivery fails, it will be recast twice by default (retryTimesWhenSendFailed = 2), and the broker that failed last time will not be selected and will be delivered to other broker.
It will also be retried in the event of an asynchronous send failure, and the default is twice (retryTimesWhenSendAsyncFailed = 2), but only on the same Broker.
Producer startup process
Then let's take a look at the startup process of Producer to see what has been done.
The general startup flowchart is clear, but some details may not be clear, such as rebalancing, TBW102, and scheduled tasks. Don't worry.
Some people may ask why this producer wants to pull services and rebalance.
Because both Producer and Consumer need to use MQClientInstance, and the same clientId shares a MQClientInstance, clientId is put together through native IP and instanceName (the default is default), so multiple Producer and Consumer actually use a MQClientInstance.
For the scheduled tasks, please take a look at the following figure:
Producer message flow
Let's take a look at the process of sending a message. it's not very complicated, it's just to find out which Broker the Topic to send the message is on, and then send the message.
Now you know what TBW102 is for, that is, the Broker startup that accepts automatic theme creation registers this default theme with NameServer, so that when Producer sends a message for a new Topic, it knows which Broker can automatically create a theme and then sends it to that Broker.
When Broker receives this message, it finds that no corresponding topic is found, but it accepts the creation of a new topic, which creates the corresponding Topic routing information.
Disadvantages of automatically creating themes
Automatically create a topic, then it is possible that messages on that topic will only be sent to one Broker, which does not play the role of load balancing.
After the request to create a new Topic arrives at the Broker, the Broker creates the corresponding routing information, but the heartbeat is sent every 30s, so it takes up to 30s for the NameServer to know the routing information of the new Topic.
Assuming that the sender is still sending messages continuously and quickly, there is actually no routing information about this Topic on the NameServer, so there is an opportunity to allow other automatically created Broker to also create corresponding Topic routing information, so that the Broker in the cluster can accept the Topic information to achieve the purpose of load balancing, but some Broker may not receive it.
If the sender does not send any of the routing information within 30 seconds after sending it, and the previous Broker updates the routing information to the NameServer with the heartbeat, then the Producer that sends the Topic message can only know from the NameServer that the Topic message can only be sent to the previous Broker. This will be unbalanced. If there are many new topic messages, the Broker will have a high load.
Therefore, it is not recommended that online enable allows automatic creation of themes, that is, autoCreateTopicEnable parameters.
Failure delay mechanism for sending messages
One parameter is sendLatencyFaultEnable, which is not enabled by default. The purpose of this parameter is to back off the Broker that timed out for a period of time.
Sending a message records the time it takes to send the message at this time, and if it exceeds a certain amount of time, the Broker is not allowed to be sent for a period of time.
For example, if the sending time exceeds the 15000ms, the message cannot be sent to the Broker within 600000 ms.
This mechanism is actually very critical, and the high probability of sending timeout indicates that the Broker load is high, so avoid it for a while and let it slow down, which is also the key to achieving high availability of message delivery.
Make a brief summary
Producer pulls routing information from NameSrv every 30s to update the local routing table, establishes a long connection with a new Broker, and sends a heartbeat to Broker every 30s.
Do not turn on autoCreateTopicEnable in a production environment.
Producer improves the high availability of message delivery through retry and delay mechanisms.
Broker
Broker is a little more complicated, but very important. Roughly divided into the following five modules, let's take a look at the picture of the official website.
Remoting remote module to handle customer requests. Client Manager manages the client to maintain subscribed topics. Store Service provides message storage query service. HA Serivce, master-slave synchronization is highly available. Index Serivce, which is indexed by specifying key to facilitate query.
There are several modules that have nothing to say, so let's not analyze them. Take a look at the storage first.
Storage of Broker
RocketMQ storage uses a local file storage system, which is efficient and reliable.
It mainly involves three types of files, namely, CommitLog, ConsumeQueue, and IndexFile.
CommitLog
Messages for all topics in RocketMQ are stored in CommitLog. A single CommitLog defaults to 1G, and the file name is named with the starting offset, fixed 20 bits, and the deficiency is preceded by 0. For example, 00000000000000000000 represents the first file, and the second file name is 000000001073741824, indicating that the starting offset is 1073741824. In this way, the corresponding file can be found with the offset.
All messages are written sequentially, and if the file size is exceeded, the next file is opened.
ConsumeQueue
ConsumeQueue message consumption queue can be thought of as the index of messages in CommitLog. Because CommitLog is a mixture of all topics, messages can be found more efficiently through the index.
ConsumeQueue stores entries of a fixed size, storing only 8 bytes of commitlog physical offset, 4 bytes of message length and 8 bytes of Tag hash value, and fixed 20 bytes.
In the actual storage, ConsumeQueue corresponds to a Queue under a Topic, each file is about 5.72m, composed of 30w pieces of data.
Consumers first get the real physical address of the message from ConsumeQueue, and then go to CommitLog to get the message.
IndexFile
IndexFile is the index file, which provides an additional means of finding messages without affecting the main process.
Query the corresponding message through Key or time interval, the file name is named after the creation timestamp, the size of a fixed single IndexFile file is about 400m, and an IndexFile stores 2000W indexes.
Let's look at how the contents of the above three files are generated:
The message is first stored in Commitlog, and then a ReputMessageService thread forwards the message to the message consumption queue file and index file in near real time, that is, it is generated asynchronously.
Message flushing mechanism
RocketMQ provides two options: synchronous and asynchronous flushing of messages. We all know that the efficiency of flushing is relatively low. The efficiency is the highest when simply stored in memory, but the reliability is not high. There are roughly the following situations that affect the reliability of messages:
Broker is violently shut down, such as kill-9Broker, the operating system is down, the machine is powered off, the machine is broken, and the disk is broken.
If it is all 1-4, synchronous flushing will certainly be fine. If it is asynchronous, some messages may be lost. 5 and 6 will have to rely on the copy mechanism. If synchronous double writes must be stable, but the performance is too poor. If asynchronous, it is possible to lose some messages.
So you need to look at the scenario to use synchronous, asynchronous flushing and replica double writing mechanisms.
Page caching and memory mapping
Commitlog is mixed storage, so all messages are written sequentially, and the sequential writing of files is basically the same as the speed of writing to memory.
And RocketMQ files all use memory mapping, that is, Mmap, to map the program virtual page directly to the page cache, without the need for a copy of the kernel state to the user state, let's take a look at the picture drawn by my previous article.
Page cache is actually the cache of files by the operating system, which is used to accelerate the reading and writing of files, that is to say, the writing of files is first written to the page cache, the operating system will flush the disk irregularly (time is uncontrollable), and the reading of files will be loaded into the page cache first. And the contents of the adjacent blocks will be pre-read according to the locality principle.
In fact, it is also because of the use of memory mapping mechanism, so the file storage of RocketMQ uses a fixed-length structure to facilitate mapping the entire file to memory at once.
File pre-allocation and file preheating
The memory mapping is only mapped, and the data will only be loaded into memory when a missing page break occurs when the page is actually read, so RocketMQ makes some optimizations to prevent performance jitter at run time.
File pre-allocation
The default size of CommitLog is 1G. When the size limit is exceeded, new files need to be prepared, and RocketMQ acts as a background thread AllocateMappedFileService. Constantly processing AllocateRequest,AllocateRequest is actually a pre-allocated request, which prepares the allocation of the next file in advance to prevent the allocation of files during the message writing process, resulting in jitter.
File preheating
There is a warmMappedFile method that iterates over each page of the currently mapped file, writes a 0 byte, and then calls mlock and madvise (MADV_WILLNEED).
Mlock: you can lock some or all of the address space used by a process in physical memory to prevent it from being swapped into swap space.
Madvise: advise the operating system that this file will be accessed in the near future, so it might be a good idea to read a few pages in advance.
Make a brief summary
CommitLog uses hybrid storage, where all Topic exist together, write sequentially, and the file name is named after the starting offset.
Messages are written to CommitLog and then distributed to ConsumerQueue and IndexFile through background threads.
Consumers first read the ConsumerQueue to get the physical address of the real message, and then access the CommitLog to get the real message.
The mmap mechanism is used to reduce one copy, and file pre-allocation and file preheating are used to improve performance.
Synchronous and asynchronous flushing are provided, and the appropriate mechanism is selected according to the scene.
HA of Broker
The slave Broker will establish a long connection with the master Broker, then obtain the maximum offset of the master Broker commitlog, and start pulling messages from the master Broker. The master Broker will return a certain number of messages and cycle to achieve master-slave data synchronization.
The consumer message will first request the master Broker, and if the master Broker feels a little stressed right now, it will return the suggestion of pulling the message from the Broker, and then the consumer will pull the message from the server.
Consumer
There are two modes of consumption, namely broadcast mode and cluster mode.
Broadcast mode: each consumer under a grouping consumes a complete Topic message.
Cluster mode: consumers under a grouping split up consumption of Topic messages.
Generally speaking, we use cluster mode.
While consumer messages are divided into push and pull modes, take a detailed look at my article message queue push-pull mode, analyze the advantages and disadvantages of RokcetMQ and Kafka message push-pull mode and push-pull mode from the source code level respectively.
Load balancing Mechanism on the Consumer side
Consumer will regularly obtain the number of queues under Topic, and then find all the consumer information of the same consumer group subscribed to the Topic. The default allocation policy is similar to paging sort allocation.
Put the queue in order, and then the consumers in order, for example, there are 9 queues, 3 consumers, then consumer-1 consumes messages of queue 0, 1, 2, consumer-2 consumes queues 3, 4, 5, and so on.
So if the load is too large, then add queues, plus consumers, through the load balancing mechanism can feel the rebalancing, uniform load.
Retry of Consumer message consumption
It is inevitable that message consumption fails, so you need to provide a retry of consumption failure, while the general consumption failure is either due to incorrect message structure or some status that cannot be processed temporarily, so it is not appropriate to retry immediately.
RocketMQ sets a retry queue for each consumer group, Topic is% RETRY%+consumerGroup, and sets a number of retry levels to delay the retry.
In order to take advantage of the delay queue function of RocketMQ, the retried message is first saved in the delay queue with the name of Topic "SCHEDULE_TOPIC_XXXX", and the original Topic information is stored in the extension field of the message.
Delay returns to the retry queue after a period of time, and then Consumer consumes the retry queue topic to get the previous message.
If the consumption fails for more than a certain number of retries, it will be moved to the dead letter queue, that is, Topic% DLQ% "+ ConsumerGroup. Storing the dead letter queue is considered a success in consumption, because there is nothing you can do about it, so let it go for a while.
Then we can process these messages in the dead letter queue manually.
Global and local order of messages
The global order is to eliminate all concurrency, one Topic and one queue, and the concurrency of Producer and Consuemr is one.
Local order actually refers to a queue order, and multiple queues can still be parallel.
You can specify Producer for a business to send only this queue through MessageQueueSelector, and then Comsuer accepts messages through MessageListenerOrderly, which is actually locked consumption.
There will be a mqLockTable in Broker. When creating a task to pull messages, sequential messages need to be locked in Broker before the message queue is successfully locked.
In fact, strict sequential messages are very difficult. Suppose everything is all right now. If a Broker goes down and rebalancing occurs, the consumer instance corresponding to the queue will change, and disorder may occur. If you want to keep the strict order, you can only make the whole cluster unavailable.
Some points of attention
1. Subscription messages are stored in ConsumerGroup, so each Consumer in ConsumerGroup needs to have the same subscription.
Because subscription messages are uploaded with the heartbeat, if the Consumer subscription information in a ConsumerGroup is different, then they will overwrite each other.
For example, consumer A subscribes to Topic a, consumer B subscribes to Topic b, and consumer A goes to Broker to get the message, then B's heartbeat is sent, Broker is updated, and then he receives a request from A. he looks confused and doesn't have this subscription relationship.
2. RocketMQ master / slave read / write separation
Read from can only be read and cannot be written, and only the offset read by the current client and the maximum offset accepted by the current Broker exceed the limit of physical memory will be read when it is small, so under normal circumstances, traffic can not be shared.
3. Adding machines alone will not increase the consumption speed, and the number of queues needs to keep up.
4. As mentioned earlier, do not allow automatic creation of topics
Best practices for RocketMQ
Some of these best practices are referenced from the official website.
The use of Tags
It is recommended to apply a Topic and use tages to mark different businesses, because the tages setting is more flexible, and a Topic for each application is very clear, which can be identified intuitively.
The use of Keys
If there is a unique identification on the message service, please enter it in the keys field to facilitate future location search.
Improve the consumption power of Consumer
1. Improve the parallelism of consumption: increase the number of queues and consumers, improve the parallel consumption thread of a single consumer, the parameter consumeThreadMax.
2. Batch consumption, set the consumeMessageBatchMaxSize parameter, so that you can get more than one message at a time, and then, for example, a update statement needs to be executed ten times before it is executed once.
3. Skip non-core messages. In order to preserve those core messages when the load is heavy, set those non-core messages. For example, after 1W messages are stacked, you will directly return to consumption success and skip non-core messages.
Addressing of NameServer
Use HTTP static server addressing (default) so that NameServer can discover it dynamically.
JVM option
The following is copied from the official website:
If you are not concerned about the startup time of the RocketMQ Broker, use the "pre-touch" Java heap to ensure that each page will be allocated during JVM initialization.
For those who don't care about startup time, you can enable it:-XX:+AlwaysPreTouch disabling bias locking may reduce JVM pauses,-XX:-UseBiasedLocking for garbage collection, a G1 collector with JDK 1.8 is recommended.
-XX:+UseG1GC-XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25-XX:InitiatingHeapOccupancyPercent=30
In addition, do not set the value of-XX:MaxGCPauseMillis too low, otherwise JVM will use a younger generation to achieve this goal, which will lead to very frequent minor GC, so it is recommended to use rolling GC log files:
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5-XX:GCLogFileSize=30m
Linux kernel parameters
The following is copied from the official website:
Vm.extra_free_kbytes, which tells VM to reserve additional available memory between the threshold for background recycling (kswapd) and the threshold for direct recycling (by allocating processes). RocketMQ uses this parameter to avoid long delays in memory allocation. (related to specific kernel versions) vm.min_free_kbytes, if set to lower than 1024KB, will cleverly destroy the system, and the system is prone to deadlocks under high load. Vm.max_map_count, which limits the maximum number of memory-mapped areas a process may have. RocketMQ will use mmap to load CommitLog and ConsumeQueue, so it is recommended that you set a larger value for this parameter. (agressiveness-- > aggressiveness) vm.swappiness, which defines how actively the kernel swaps memory pages. Higher values increase aggression, while lower values reduce the amount of exchange. It is recommended that you set the value to 10 to avoid switching delays. File descriptor limits,RocketMQ needs to open file descriptors for files (CommitLog and ConsumeQueue) and network connections. We recommend setting the value of the file descriptor to 655350. Disk scheduler,RocketMQ recommends the use of the Imax O deadline scheduler, which attempts to provide a guaranteed delay for requests.
At this point, the study on the "advanced must-see summary of RocketMQ knowledge points" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.