2025-03-26 Update From: SLTechnology News&Howtos
This article explains how to use RabbitMQ correctly for asynchronous programming: the scenarios where asynchronous processing fits, and the pitfalls to watch for around message compensation, exchange and queue routing, and dead letters.
1 Applicable scenarios
1.1 Branch processes that serve the main process
In a registration flow, writing the data to the DB is the main process, while sending coupons or a welcome message to the user after registration is a branch process that is not time-critical.
1.2 Users do not need to see the results in real time
For example, after a delivery order is placed, the dispatch and delivery steps can be handled asynchronously. When each stage completes, a push notification or text message lets the user know.
1.3 MQ
MQ is used for buffering and distributing tasks, smoothing traffic peaks, decoupling services, and broadcasting messages.
Of course, MQ is not the only way to implement asynchronous processing.
For example, start a new thread to do the work and return a Future.
There are also asynchronous frameworks, such as Vert.x, which work through callbacks.
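The thread-plus-Future approach mentioned above can be sketched with the JDK alone. This is a minimal illustration, not code from the original article: the class name `AsyncWelcomeDemo` and the `sendWelcome` task are hypothetical stand-ins for a real branch process.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncWelcomeDemo {
    // Hypothetical branch task: pretend to send a welcome message.
    static String sendWelcome(long userId) {
        return "welcome sent to user " + userId;
    }

    // Submit the branch task to a pool and return immediately with a Future.
    static Future<String> sendWelcomeAsync(ExecutorService pool, long userId) {
        return pool.submit(() -> sendWelcome(userId));
    }

    // Runs the task asynchronously and waits for its result.
    static String demo(long userId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> pending = sendWelcomeAsync(pool, userId);
            // The caller is free to do other work here; get() blocks only
            // at the point where the result is actually needed.
            return pending.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(42L));
    }
}
```

Unlike MQ, this keeps the task inside one process, so the work is lost if the process dies before the task runs.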
2 Pitfalls of asynchronous processing
2.1 Asynchronous processing needs message compensation to close the loop
RabbitMQ can persist messages to disk, so message data survives an MQ failure. Even so, messages can still be lost while being sent, transmitted, or processed in an asynchronous flow. No MQ can guarantee 100% availability, so the business design must consider how the asynchronous flow continues when MQ is unavailable.
Therefore, an asynchronous flow must include compensation, or a main-and-backup design in which both paths can do the work.
2.1.1 Case
A welcome message is sent asynchronously after user registration.
Writing the registered user to the DB is the synchronous process
The member service sending a welcome message after it receives the message is the asynchronous process.
Blue line: MQ asynchronous processing (main line), where messages may be lost (dotted lines represent asynchronous calls)
Green line: the compensation Job periodically compensates messages (backup line) to make up for messages lost on the main line
Considering the extreme scenario in which the MQ middleware fails, the backup line's processing capacity must match that of the main line.
Code example
UserController handles registration and sends the asynchronous message. The registration method registers 10 users per call, and each user's registration message has a 50% chance of not being sent.
MemberService, the membership service, listens for registration-success messages and sends the welcome text message. It uses a ConcurrentHashMap to store the IDs of users who have already been sent a text message, which makes the operation idempotent and avoids sending duplicates when the same user is compensated.
For an MQ consumer, the processing logic must deduplicate (be idempotent), for the following reasons:
MQ messages may be duplicated because of misconfiguration or instability in the middleware itself.
Duplicates from automatic compensation
In this case the same message may arrive through both MQ and compensation, so duplicates are inevitable. For the sake of high cohesion, the compensation Job itself does not deduplicate.
Duplicates from manual compensation
When messages pile up, asynchronous processing is inevitably delayed. If a manual compensation function exists, someone is likely to compensate by hand when processing is delayed, and some time later the consumer receives the message and processes it again.
In one MQ failure, hundreds of thousands of fund-disbursement messages piled up in MQ, and the business could not process them in time. The operations staff assumed the program was at fault and processed the items manually through the admin backend. After the MQ system recovered, the messages were processed again, and a large amount of funds was issued twice.
Asynchronous processing must allow for duplicate messages, so the processing logic must be idempotent to prevent repeated processing.
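The ConcurrentHashMap-based idempotency described above can be sketched as follows. This is a simplified stand-in for the article's MemberService, not its actual code: the class name `IdempotentWelcome` and the counter that replaces the real SMS call are assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentWelcome {
    // IDs of users already welcomed; only key presence matters.
    private final ConcurrentHashMap<Long, Boolean> welcomed = new ConcurrentHashMap<>();
    // Stand-in for the real SMS gateway: counts how many messages were sent.
    private final AtomicInteger smsSent = new AtomicInteger();

    // Returns true if this call sent the message, false if it was a duplicate.
    boolean welcome(long userId) {
        // putIfAbsent is atomic: only the first caller for a given ID sees null,
        // even if the MQ listener and the compensation job race on the same user.
        if (welcomed.putIfAbsent(userId, Boolean.TRUE) != null) {
            return false;
        }
        smsSent.incrementAndGet();
        return true;
    }

    int smsCount() {
        return smsSent.get();
    }
}
```

An in-memory map only deduplicates within one process and is lost on restart, so with several service instances the "already sent" record would need to live in shared storage such as the DB.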
Next, define the backup operation: the compensation Job.
It is a scheduled task that compensates every 5 seconds. Because the Job does not know which users' registration messages may have been lost, it compensates in full.
Compensation logic
Compensate every 5 seconds, 5 users at a time in order. Each run starts from the last user ID handled by the previous run.
The compensation task is submitted to a thread pool for "asynchronous" processing to increase processing capacity.
To achieve high cohesion, the main line and the backup line should process messages with the same method. In this case, both the MQ listener in MemberService and CompensationJob call welcome.
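The batch-and-offset logic described above might look like the sketch below. It is not the article's CompensationJob: the class name `CompensationSketch`, the in-memory list standing in for the user table, and the synchronous call to the welcome handler (instead of a thread pool) are all simplifying assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

class CompensationSketch {
    static final int BATCH_SIZE = 5;

    // Stand-in for the user table: registered user IDs in ascending order.
    private final List<Long> registeredIds;
    // The same welcome method the MQ listener calls (it must be idempotent).
    private final LongConsumer welcome;
    // Last user ID handled by the previous run.
    private long offset = 0;

    CompensationSketch(List<Long> registeredIds, LongConsumer welcome) {
        this.registeredIds = new ArrayList<>(registeredIds);
        this.welcome = welcome;
    }

    // One compensation run: take the next batch after the offset,
    // hand every user to welcome, then advance the offset.
    synchronized List<Long> runOnce() {
        List<Long> batch = new ArrayList<>();
        for (long id : registeredIds) {
            if (id > offset && batch.size() < BATCH_SIZE) {
                batch.add(id);
            }
        }
        for (long id : batch) {
            welcome.accept(id);
        }
        if (!batch.isEmpty()) {
            offset = batch.get(batch.size() - 1);
        }
        return batch;
    }
}
```

A scheduler such as `ScheduledExecutorService.scheduleWithFixedDelay` could call `runOnce` every 5 seconds. Because the job hands every user in the batch to welcome, including those MQ already delivered, it relies on welcome being idempotent.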
The simple compensation logic here is only a demo. Real production code must:
Set parameters such as the compensation frequency, the batch size per run, and the size of the compensation thread pool to values that give enough compensation throughput.
Delay the backup line's compensation appropriately
For example, compensate only users who registered more than 30 seconds ago, so that compensation is staggered from the real-time MQ flow on the main line and the two do not conflict.
Persist to the DB the offset that records which user compensation has reached.
Make the compensation Job itself highly available, for example by using a task system such as xxl-job or ElasticJob.
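The delay requirement in the list above amounts to an age check on each user before compensating. A minimal sketch, assuming each user record carries a registration timestamp; the class and method names are hypothetical:

```java
import java.time.Duration;
import java.time.Instant;

class CompensationEligibility {
    // Minimum age of a registration before the backup line touches it.
    static final Duration MIN_AGE = Duration.ofSeconds(30);

    // True when the user registered at least MIN_AGE before "now",
    // leaving the main MQ line time to deliver the message first.
    static boolean eligible(Instant registeredAt, Instant now) {
        return !registeredAt.plus(MIN_AGE).isAfter(now);
    }
}
```

Passing `now` in as a parameter, rather than reading the clock inside the method, keeps the check easy to test.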
Run the program, call the registration method to register 10 users, and check the log.
The log shows:
Of the 10 users, four had their messages delivered successfully by MQ: 1, 5, 7, 8
The first run of the compensation task compensates users 2, 3 and 4, the second run compensates users 6 and 9, and the third run compensates user 10
The highest standard for closed-loop message compensation
The backup line should have enough throughput to compensate the full data set. That is, if the backup line is good enough, shutting MQ down entirely only slightly affects the timeliness of processing, and the flow still executes correctly.
Summary
In real development, consider the scenarios in which an asynchronous flow loses messages or processing is interrupted.
The asynchronous flow needs a backup line for compensation, such as the full compensation used here, so that the business can continue through compensation even if the asynchronous flow fails completely.
2.2 Pitfalls of RabbitMQ broadcast and work queue modes
First decide whether a message should be broadcast or go through a work queue.
Message broadcast
Different consumers each consume their own copy of the same message.
Work queue mode
Different consumers share the data of one queue, and each message is consumed only once, by one of the consumers.
Take the registration message of a single user as an example:
The member service needs to listen for it in order to send a welcome text message.
The marketing service needs to listen for it in order to send a small gift to the new user.
However, the member and marketing services may each run multiple instances. A user's message should be broadcast to the different services at the same time (broadcast mode), but among instances of the same service (such as member service 1 and member service 2) it should be processed exactly once, by whichever instance receives it (work queue mode).
When implementing the code, confirm how the MQ system's mechanisms work so that messages are routed as expected.
RocketMQ implements this in a simple and direct way: if consumers belong to the same group, a message is consumed by only one consumer in that group; if consumers belong to different groups, each group consumes the message once.
RabbitMQ's message routing model is queue + exchange: the queue carries messages, and the exchange decides how messages are routed to queues.
Step 1: the member service listens for new-user registration messages sent by the user service
If two member service instances are started, the registration message of a given user should be consumed by only one of them.
Declare the RabbitMQ trio of queue, exchange, and binding.
The queue is an anonymous queue
The exchange is a DirectExchange, and the routing key that binds it to the anonymous queue is an empty string
After receiving a message, the instance prints the port it is running on.
Configuration of message publishers, consumers, and MQ
After two program instances are started on ports 12345 and 45678, one message is sent, and the output log shows that both instances of the same member service received it.
So where is the problem?
Binding relationship between the RabbitMQ direct exchange and queues
RabbitMQ's direct exchange routes messages by routingKey. Every time the program starts it creates an anonymous (randomly named) queue, so each member service instance has its own queue, bound to the direct exchange with an empty routingKey.
The user service also uses an empty routingKey when sending messages, so when the direct exchange receives a message it finds that both queues match and forwards the message to both.
Fix
The member service instances should not use anonymous queues; they should all use the same queue.
Replace the anonymous queue in the code above with an ordinary named queue:
private static final String QUEUE = "newuserQueue";

@Bean
public Queue queue() {
    return new Queue(QUEUE);
}
With this change, a given message is received by only one of the two instances, and successive messages are distributed to the instances in round-robin fashion.
Current exchange and queue relationship
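The difference between one anonymous queue per instance and a single shared queue can be illustrated with a toy in-memory model. This is not RabbitMQ or Spring AMQP code: `ToyQueue` and `ToyDirectExchange` are hypothetical classes that only mimic the two rules described above, namely that a direct exchange copies a message to every queue bound with a matching routing key, and that a queue hands each message to one of its consumers in turn.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DirectRoutingModel {
    // A queue hands each message to exactly one of its consumers, in turn.
    static class ToyQueue {
        private final List<List<String>> inboxes = new ArrayList<>();
        private int next = 0;

        List<String> addConsumer() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void deliver(String msg) {
            if (inboxes.isEmpty()) return;
            inboxes.get(next).add(msg);
            next = (next + 1) % inboxes.size();
        }
    }

    // A direct exchange copies a message to every queue whose binding key matches.
    static class ToyDirectExchange {
        private final Map<String, List<ToyQueue>> bindings = new HashMap<>();

        void bind(ToyQueue queue, String routingKey) {
            bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
        }

        void publish(String routingKey, String msg) {
            for (ToyQueue q : bindings.getOrDefault(routingKey, new ArrayList<>())) {
                q.deliver(msg);
            }
        }
    }

    // Buggy setup: each instance has its own queue, both bound with "".
    static int[] anonymousQueues(int messages) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        ToyQueue q1 = new ToyQueue();
        ToyQueue q2 = new ToyQueue();
        exchange.bind(q1, "");
        exchange.bind(q2, "");
        List<String> instance1 = q1.addConsumer();
        List<String> instance2 = q2.addConsumer();
        for (int i = 0; i < messages; i++) exchange.publish("", "user" + i);
        return new int[] {instance1.size(), instance2.size()};
    }

    // Fixed setup: both instances consume from one shared queue.
    static int[] sharedQueue(int messages) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        ToyQueue shared = new ToyQueue();
        exchange.bind(shared, "");
        List<String> instance1 = shared.addConsumer();
        List<String> instance2 = shared.addConsumer();
        for (int i = 0; i < messages; i++) exchange.publish("", "user" + i);
        return new int[] {instance1.size(), instance2.size()};
    }
}
```

In this model, four published messages reach both instances four times each with per-instance queues, and are split two and two with the shared queue.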
Step 2: the user service broadcasts messages to the member and marketing services
Both the member service and the marketing service are expected to receive the broadcast message, but within each service the message should be received only once, by one instance.
Declare a queue and a FanoutExchange, then simulate two member services and two marketing services.
Register four users. The log shows that each user registration message is received by either the member service or the marketing service, which is not a broadcast. A FanoutExchange is clearly in use, so why doesn't it work?
A fanout (broadcast) exchange ignores the routingKey and broadcasts messages to all bound queues. In this case the two member services and the two marketing services are all bound to the same queue, so each message is received only once across the four instances.
Fix
Split the queue: the member service and the marketing service each use their own queue, bound to the fanout exchange
Current exchange and queue structure
The log output confirms that for each MQ message, the member service and the marketing service each receive one copy. Each message is broadcast to both services, and within each service the two instances receive messages in round-robin fashion.
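The shared-queue mistake and the split-queue fix can be compared with another small in-memory model. Again this is a toy, not RabbitMQ code: the class `FanoutRoutingModel`, the queue names, and the instance names are all assumptions chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FanoutRoutingModel {
    // Queue name -> instances competing for messages on that queue.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();
    // Queue name -> number of messages delivered so far (drives round-robin).
    private final Map<String, Integer> cursors = new HashMap<>();
    // Instance name -> number of messages received.
    private final Map<String, Integer> received = new HashMap<>();

    void addConsumer(String queue, String instance) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>()).add(instance);
    }

    // Fanout: copy the message to every bound queue; each queue then
    // hands its copy to one of its consumers in turn.
    void publish() {
        for (Map.Entry<String, List<String>> e : queues.entrySet()) {
            List<String> instances = e.getValue();
            int turn = cursors.merge(e.getKey(), 1, Integer::sum) - 1;
            received.merge(instances.get(turn % instances.size()), 1, Integer::sum);
        }
    }

    int receivedBy(String instance) {
        return received.getOrDefault(instance, 0);
    }

    // Buggy setup: all four instances share one queue.
    static FanoutRoutingModel sharedQueue() {
        FanoutRoutingModel m = new FanoutRoutingModel();
        for (String i : new String[] {"member1", "member2", "promotion1", "promotion2"}) {
            m.addConsumer("newuserQueue", i);
        }
        return m;
    }

    // Fixed setup: one queue per service, two instances on each.
    static FanoutRoutingModel splitQueues() {
        FanoutRoutingModel m = new FanoutRoutingModel();
        m.addConsumer("memberQueue", "member1");
        m.addConsumer("memberQueue", "member2");
        m.addConsumer("promotionQueue", "promotion1");
        m.addConsumer("promotionQueue", "promotion2");
        return m;
    }
}
```

After four calls to `publish`, each instance in the shared-queue setup has one message, so neither service saw all four registrations. In the split-queue setup each instance has two, so every registration reached both services exactly once.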
If the routing mode for asynchronous messages is misconfigured, messages may be processed more than once, or important services may not receive them at all, and either outcome causes business logic errors.
Summary
In a microservice scenario where multiple instances of different services listen for messages, different services generally each need to receive the same message, while the instances of one service only need to take turns receiving messages. Confirm that the MQ routing configuration meets these requirements so that messages are neither duplicated nor missed.
2.3 Dead letters blocking MQ
Dead-letter messages that can never be processed may block MQ.
If a thread pool's task queue has no upper bound, it may eventually cause an OOM. MQ needs the same attention to backlog. A backlog caused by a sudden burst of traffic is not a big problem, because increasing consumer capacity appropriately should resolve it. In many cases, however, the queue backs up and blocks because it holds a large number of messages that can never be processed.
2.3.1 Case
The user service sends a message after a user registers, and the member service listens for it and issues coupons to the user. But because the user was not saved successfully, the member service fails every time it processes the message. The message is requeued and then fails again. A message that keeps bouncing around MQ like this is a dead letter.
As dead letters fill MQ, consumers spend a lot of time reprocessing them, which holds up the consumption of normal messages, and MQ may eventually collapse under the volume of data.
Define a queue and a direct exchange, then bind the queue to the exchange
sendMessage sends messages to MQ, one message per call, using an auto-incrementing identifier as the message content
On receiving a message, the consumer immediately throws an NPE to simulate a processing error
Call the sendMessage API to send two messages, then open the RabbitMQ console. The two messages stay in the queue and are redelivered continuously, producing a redelivery QPS of 1063.
The log also shows a large number of exceptions.
Fix
The simplest way to stop a dead letter from being requeued indefinitely:
When processing fails, throw AmqpRejectAndDontRequeueException directly so that the message is not requeued.
throw new AmqpRejectAndDontRequeueException("error");
However, I prefer to retry the same message a few times, which handles occasional failures caused by network problems, and to deliver the message to a dedicated DLX (dead-letter exchange) only if it still fails. Messages that arrive from the DLX might only be logged and trigger an alert, and they are not redelivered even if handling them throws an exception.
For this problem, Spring AMQP provides a simple solution:
Define a dead-letter exchange and a dead-letter queue. They are ordinary exchanges and queues that are dedicated to handling dead-letter messages.
Build a RetryOperationsInterceptor through RetryInterceptorBuilder to handle retries on failure. The policy is at most 5 attempts (4 retries) with exponential backoff: a 1 second delay before the first retry, 2 seconds before the second, and so on, up to a maximum delay of 10 seconds. If the fourth retry still fails, RepublishMessageRecoverer republishes the message to a DLX
Define the handler for the dead-letter queue. In this case it only writes a log entry
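The retry policy described above (at most 5 attempts, delays doubling from 1 second and capped at 10 seconds, then hand-off to a dead-letter handler) can be sketched in plain Java to make the schedule concrete. This is not Spring AMQP code and does not use RetryInterceptorBuilder or RepublishMessageRecoverer; the class `RetryThenDeadLetter` and its parameters are hypothetical, and the `sleeper` argument exists only so the waiting can be replaced in tests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

class RetryThenDeadLetter {
    static final int MAX_ATTEMPTS = 5;      // first attempt + 4 retries
    static final long INITIAL_MS = 1_000;   // delay before the first retry
    static final double MULTIPLIER = 2.0;   // exponential backoff factor
    static final long MAX_MS = 10_000;      // cap on any single delay

    // Delay before each retry, in milliseconds.
    static List<Long> retryDelays() {
        List<Long> delays = new ArrayList<>();
        long delay = INITIAL_MS;
        for (int retry = 1; retry < MAX_ATTEMPTS; retry++) {
            delays.add(Math.min(delay, MAX_MS));
            delay = (long) (delay * MULTIPLIER);
        }
        return delays;
    }

    // Tries the handler up to MAX_ATTEMPTS times. When every attempt fails,
    // the message goes to deadLetter instead of being retried forever.
    // Returns the number of attempts that were made.
    static int process(String msg, Consumer<String> handler,
                       Consumer<String> deadLetter, LongConsumer sleeper) {
        List<Long> delays = retryDelays();
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                handler.accept(msg);
                return attempt;
            } catch (RuntimeException e) {
                if (attempt == MAX_ATTEMPTS) {
                    break;
                }
                sleeper.accept(delays.get(attempt - 1));
            }
        }
        deadLetter.accept(msg);
        return MAX_ATTEMPTS;
    }
}
```

With these constants the 10 second cap is never reached, because the fourth and last retry waits 8 seconds. A sixth attempt would be the first to be capped.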
Run the program, send two messages, and check the log:
The four retries of msg2 come at intervals of 1, 2, 4 and 8 seconds. Counting the first failed attempt, that is 5 attempts in total.
After the 4 retries, RepublishMessageRecoverer sends the message to the DLX
The dead-letter handler outputs "got dead message msg2".
Although the two messages are sent almost at the same time, msg2 does not start processing until all four retries of msg1 have finished, because SimpleMessageListenerContainer has only one consumer thread by default. This performance problem can be avoided by adding consumer threads:
Set the concurrentConsumers parameter to 10 to run 10 worker threads
You can also set the maxConcurrentConsumers parameter so that SimpleMessageListenerContainer adjusts the number of consumer threads dynamically.