How to deal with message consumption failure

2025-03-26 Update From: SLTechnology News&Howtos

This article explains how to deal with message consumption failure, a situation many people run into in real projects. Let's walk through how to handle it step by step.

I. Introduction

Before introducing message middleware (MQ), let's briefly look at why we need it.

For example, on an e-commerce platform, a user placing an order typically goes through the following process.

After the user places an order and the order is created, the third-party payment platform is called to deduct the amount from the user's account. If the deduction succeeds, the payment platform notifies the corresponding business system, which then updates the order status, calls the warehouse API to deduct inventory, and notifies logistics to deliver the goods.

Now imagine that updating the order status, deducting inventory, and notifying logistics are all completed synchronously within one method. If the user pays successfully and the order status is updated, but the inventory deduction or the logistics notification fails, the user has already paid and yet the whole transaction fails.

One failure the boss can pretend not to see, but if thousands of orders fail, the business losses caused by the system will be huge, and the boss may not be able to sit still!

Therefore, for this kind of business scenario, architects introduce asynchronous communication to keep the service highly available. The general process is as follows:

When the order system receives the deduction result from the payment platform, it updates the order status and sends an order message to the MQ message middleware.

At the other end, the warehouse system asynchronously listens for messages from the order system. After receiving an order message, it deducts inventory and notifies the logistics company to deliver the goods.

With this optimized process, even if the inventory deduction fails, the user's transaction is not affected.
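The decoupling described above can be sketched with an in-memory queue standing in for the MQ broker. This is only an illustration under stated assumptions: `OrderFlowSketch`, `onPaymentResult`, and `warehouseConsumeOne` are hypothetical names that do not come from the article, and a real system would use a broker such as RabbitMQ rather than `LinkedBlockingQueue`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an in-memory queue stands in for the MQ broker.
class OrderFlowSketch {
    // Stand-in for the queue between the order system and the warehouse system.
    final BlockingQueue<String> orderQueue = new LinkedBlockingQueue<>();
    // Order IDs whose status has been updated to "paid".
    final List<String> paidOrders = new ArrayList<>();
    // Order IDs for which inventory was deducted and delivery was requested.
    final List<String> shippedOrders = new ArrayList<>();

    // Order system: update the order status and publish a message.
    // It does not wait for the warehouse system.
    void onPaymentResult(String orderId) {
        paidOrders.add(orderId);
        orderQueue.offer(orderId);
    }

    // Warehouse system: consume one message if there is one.
    // Returns false when the queue is empty. A failure here does not
    // roll back the order status that was already updated.
    boolean warehouseConsumeOne(boolean simulateFailure) {
        String orderId = orderQueue.poll();
        if (orderId == null) {
            return false;
        }
        if (simulateFailure) {
            throw new IllegalStateException("inventory deduction failed for " + orderId);
        }
        shippedOrders.add(orderId);
        return true;
    }

    public static void main(String[] args) {
        OrderFlowSketch flow = new OrderFlowSketch();
        flow.onPaymentResult("order-1");
        try {
            flow.warehouseConsumeOne(true);
        } catch (IllegalStateException e) {
            System.out.println("warehouse failed: " + e.getMessage());
        }
        System.out.println("paid=" + flow.paidOrders + ", shipped=" + flow.shippedOrders);
    }
}
```

Note that in this sketch a failed consumption simply drops the message: the order stays paid, but nothing is shipped.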

As The Mythical Man-Month puts it, there is no silver bullet in software engineering!

Introducing MQ message middleware brings problems of its own. If the middleware suddenly goes down and messages cannot be sent, the warehouse system will not receive the order message and the goods cannot be delivered!

The mainstream solution in the industry is cluster deployment in a one-master, multi-slave mode to achieve high availability. Even if one machine suddenly goes down, the service as a whole remains available, and operations staff can restart the failed node so that it returns to normal.

But there is another problem: what if the warehouse system has received the order message, but a business-processing exception or a server exception means the inventory is not deducted and the goods are not delivered?

How should we deal with it at this time?

This is the scenario we are going to discuss today: what should we do if message consumption fails?

II. Solutions

For the scenario where message consumption fails, we usually handle it as follows:

When message consumption fails, the message is re-pushed.

If the number of retries exceeds the maximum, the failed message is stored in a database, and someone then intervenes manually to troubleshoot the problem and retry by hand.

When the client fails to consume a message, we wrap the failed message in a message retry object, set the maximum number of retries, and re-push the message to the MQ message middleware. Once the number of retries exceeds the maximum, the failed message is stored in MongoDB so that the failure details can be queried later.
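The retry-then-store policy described above can be sketched in a few lines of plain Java. This is a simplified illustration, not the article's component: `RetryPolicySketch` and its members are hypothetical names, a loop iteration stands in for each re-push through the broker, and an in-memory list stands in for the MongoDB collection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the retry-then-store policy; no real broker or database is used.
class RetryPolicySketch {
    // Stand-in for the MongoDB collection holding messages that need manual intervention.
    final List<String> failedStore = new ArrayList<>();
    final int maxAttempts;

    RetryPolicySketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Runs the handler up to maxAttempts times and returns the number of attempts made.
    // Each loop iteration stands in for one delivery of the (re-pushed) message.
    int consume(String message, Consumer<String> handler) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                handler.accept(message);
                return attempts; // consumed successfully
            } catch (RuntimeException e) {
                // consumption failed: fall through and try again if attempts remain
            }
        }
        failedStore.add(message); // attempts exhausted: keep the message for manual handling
        return attempts;
    }

    public static void main(String[] args) {
        RetryPolicySketch policy = new RetryPolicySketch(3);
        int attempts = policy.consume("order-1", m -> {
            throw new IllegalStateException("inventory service unavailable");
        });
        System.out.println("attempts=" + attempts + ", stored=" + policy.failedStore);
    }
}
```

In a real deployment each retry is a separate delivery from the broker, so the attempt count has to travel with the message instead of living in a local variable.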

Based on the model above, we can write a common retry component. Without further ado, let's do it!

III. Code practice

This compensation service uses RabbitMQ as the message middleware; the approach is similar for other message middleware!

3.1. Create a message retry entity class

    import java.io.Serializable;
    import java.util.Date;

    import lombok.Data;
    import lombok.EqualsAndHashCode;
    import lombok.experimental.Accessors;

    @Data
    @EqualsAndHashCode(callSuper = false)
    @Accessors(chain = true)
    public class MessageRetryDTO implements Serializable {

        private static final long serialVersionUID = 1L;

        /** Original message body */
        private String bodyMsg;

        /** Source ID */
        private String sourceId;

        /** Source description */
        private String sourceDesc;

        /** Exchange */
        private String exchangeName;

        /** Routing key */
        private String routingKey;

        /** Queue */
        private String queueName;

        /** Status, 1: initialized, 2: successful, 3: failed */
        private Integer status = 1;

        /** Maximum number of retries */
        private Integer maxTryCount = 3;

        /** Current number of retries */
        private Integer currentRetryCount = 0;

        /** Retry interval (ms) */
        private Long retryIntervalTime = 0L;

        /** Task failure information */
        private String errorMsg;

        /** Creation time */
        private Date createTime;

        @Override
        public String toString() {
            return "MessageRetryDTO{" +
                    "bodyMsg='" + bodyMsg + '\'' +
                    ", sourceId='" + sourceId + '\'' +
                    ", sourceDesc='" + sourceDesc + '\'' +
                    ", exchangeName='" + exchangeName + '\'' +
                    ", routingKey='" + routingKey + '\'' +
                    ", queueName='" + queueName + '\'' +
                    ", status=" + status +
                    ", maxTryCount=" + maxTryCount +
                    ", currentRetryCount=" + currentRetryCount +
                    ", retryIntervalTime=" + retryIntervalTime +
                    ", errorMsg='" + errorMsg + '\'' +
                    ", createTime=" + createTime +
                    '}';
        }

        /**
         * Increment the retry counter, then check whether another retry is allowed.
         *
         * @return true if the message may be retried, false once the maximum is reached
         */
        public boolean checkRetryCount() {
            retryCountCalculate();
            return this.currentRetryCount < this.maxTryCount;
        }

        /** Increment the retry counter */
        private void retryCountCalculate() {
            this.currentRetryCount = this.currentRetryCount + 1;
        }
    }
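One detail of `checkRetryCount()` is worth spelling out: it increments the counter before comparing it with `maxTryCount`, so with the default `maxTryCount = 3` a task that always fails appears to be executed three times in total (the first delivery plus two re-pushes). The sketch below copies only that counter logic into a stand-alone class to make this visible; `RetryCounterSketch` and `executionsForAlwaysFailingTask` are hypothetical names used for illustration.

```java
// Hypothetical stand-alone copy of the counter logic in MessageRetryDTO, without Lombok.
class RetryCounterSketch {
    private final int maxTryCount;
    private int currentRetryCount = 0;

    RetryCounterSketch(int maxTryCount) {
        this.maxTryCount = maxTryCount;
    }

    // Same order of operations as checkRetryCount(): increment first, then compare.
    boolean checkRetryCount() {
        currentRetryCount = currentRetryCount + 1;
        return currentRetryCount < maxTryCount;
    }

    // Counts how many times a task that always fails would be executed.
    static int executionsForAlwaysFailingTask(int maxTryCount) {
        RetryCounterSketch counter = new RetryCounterSketch(maxTryCount);
        int executions = 1; // the first delivery
        while (counter.checkRetryCount()) {
            executions++; // each allowed retry is one more execution
        }
        return executions;
    }

    public static void main(String[] args) {
        System.out.println(executionsForAlwaysFailingTask(3));
    }
}
```

Read this way, `maxTryCount` behaves as a cap on total executions rather than on re-pushes, which matters when choosing its value.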

3.2. Write the abstract retry service class

    import java.nio.charset.StandardCharsets;
    import java.util.Date;
    import java.util.Map;
    import java.util.Objects;

    // JSONObject is assumed to be Alibaba fastjson, judging by the parseObject/toJSON calls
    import com.alibaba.fastjson.JSONObject;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.mongodb.core.MongoTemplate;

    public abstract class CommonMessageRetryService {

        private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Autowired
        private MongoTemplate mongoTemplate;

        /**
         * Entry point: wrap the received message and process it.
         *
         * @param message the received message
         */
        public void initMessage(Message message) {
            log.info("{} received message: {}, business data: {}", this.getClass().getName(),
                    message.toString(), new String(message.getBody(), StandardCharsets.UTF_8));
            try {
                // wrap the message in a retry entity
                MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
                if (log.isInfoEnabled()) {
                    log.info("deserialized message: {}", messageRetryDto.toString());
                }
                prepareAction(messageRetryDto);
            } catch (Exception e) {
                log.warn("failed to handle message, error:", e);
            }
        }

        /**
         * Execute the task, then retry or give up depending on the result.
         *
         * @param retryDto the retry entity
         */
        protected void prepareAction(MessageRetryDTO retryDto) {
            try {
                execute(retryDto);
                doSuccessCallBack(retryDto);
            } catch (Exception e) {
                log.error("task execution failed, business data: " + retryDto.toString(), e);
                // execution failed: decide whether to keep retrying
                if (retryDto.checkRetryCount()) {
                    if (log.isInfoEnabled()) {
                        log.info("retrying message: {}", retryDto.toString());
                    }
                    retrySend(retryDto);
                } else {
                    if (log.isWarnEnabled()) {
                        log.warn("maximum number of retries reached, business data: " + retryDto.toString(), e);
                    }
                    doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
                }
            }
        }

        /**
         * Task executed successfully: invoke the success callback.
         *
         * @param messageRetryDto the retry entity
         */
        private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
            try {
                successCallback(messageRetryDto);
            } catch (Exception e) {
                log.warn("success callback failed, queue description: {}, error reason: {}",
                        messageRetryDto.getSourceDesc(), e.getMessage());
            }
        }

        /**
         * Task execution failed: store the message, then invoke the failure callback.
         *
         * @param messageRetryDto the retry entity
         */
        private void doFailCallBack(MessageRetryDTO messageRetryDto) {
            try {
                saveMessageRetryInfo(messageRetryDto);
                failCallback(messageRetryDto);
            } catch (Exception e) {
                log.warn("failure callback failed, queue description: {}, error reason: {}",
                        messageRetryDto.getSourceDesc(), e.getMessage());
            }
        }

        /** Execute the task (implemented by subclasses) */
        protected abstract void execute(MessageRetryDTO messageRetryDto);

        /** Callback after successful execution (implemented by subclasses) */
        protected abstract void successCallback(MessageRetryDTO messageRetryDto);

        /** Callback after failed execution (implemented by subclasses) */
        protected abstract void failCallback(MessageRetryDTO messageRetryDto);

        /**
         * Build the message retry entity.
         *
         * @param message the received message
         * @return the retry entity carried in the header, or a new one for a first delivery
         */
        private MessageRetryDTO buildMessageRetryInfo(Message message) {
            // if the header already carries a retry entity, return it
            Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
            if (messageHeaders.containsKey("message_retry_info")) {
                Object retryMsg = messageHeaders.get("message_retry_info");
                if (Objects.nonNull(retryMsg)) {
                    return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
                }
            }
            // otherwise wrap the business message in a new retry entity
            MessageRetryDTO messageRetryDto = new MessageRetryDTO();
            messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
            messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
            messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
            messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
            messageRetryDto.setCreateTime(new Date());
            return messageRetryDto;
        }

        /**
         * Re-publish the failed message to the queue.
         *
         * @param retryDto the retry entity
         */
        private void retrySend(MessageRetryDTO retryDto) {
            // put the retry entity into the header; the original message body stays unchanged
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
            // serialized as a JSON string so that buildMessageRetryInfo can parse it back
            messageProperties.setHeader("message_retry_info", JSONObject.toJSONString(retryDto));
            Message message = new Message(retryDto.getBodyMsg().getBytes(StandardCharsets.UTF_8), messageProperties);
            rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
        }

        /**
         * Store the failed message in MongoDB.
         *
         * @param retryDto the retry entity
         */
        private void saveMessageRetryInfo(MessageRetryDTO retryDto) {
            try {
                mongoTemplate.save(retryDto, "message_retry_info");
            } catch (Exception e) {
                log.error("failed to store the failed message in MongoDB, message data: " + retryDto.toString(), e);
            }
        }
    }
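`MessageRetryDTO` declares `retryIntervalTime`, but `retrySend` re-publishes the message immediately and never reads that field. If a delay between retries is wanted, one possible approach (an assumption, not something the article implements) is to compute a growing delay from the retry count and apply it when re-publishing, for example through a per-message TTL combined with a dead-letter exchange or the RabbitMQ delayed message exchange plugin. The sketch below covers only the delay calculation; `RetryDelaySketch` and `delayMillis` are hypothetical names.

```java
// Hypothetical helper: computes a capped exponential backoff delay for a retry.
class RetryDelaySketch {

    // Returns baseMillis * 2^(retryCount - 1), capped at maxMillis.
    // Returns 0 when no delay is configured or when no retry has happened yet.
    // Assumes maxMillis is small enough that doubling it cannot overflow a long.
    static long delayMillis(long baseMillis, int retryCount, long maxMillis) {
        if (baseMillis <= 0 || retryCount <= 0) {
            return 0L;
        }
        long delay = baseMillis;
        // double once per retry after the first, stopping as soon as the cap is reached
        for (int i = 1; i < retryCount && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    public static void main(String[] args) {
        for (int retry = 1; retry <= 5; retry++) {
            System.out.println("retry " + retry + " -> " + delayMillis(1000L, retry, 60000L) + " ms");
        }
    }
}
```

A cap keeps the delay bounded when `maxTryCount` is set high, and returning 0 for a non-positive base preserves the article's default of `retryIntervalTime = 0L`, which means an immediate retry.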

3.3. Write the listener service class

Using the component in a consumer application is also very simple. For example, the inventory deduction operation can be handled as follows:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class OrderServiceListener extends CommonMessageRetryService {

        private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);

        /**
         * Listen for the "order placed successfully" message sent by the order system.
         *
         * @param message the received message
         */
        @RabbitListener(queues = "mq.order.add")
        public void consume(Message message) {
            log.info("received order success message: {}", message.toString());
            super.initMessage(message);
        }

        @Override
        protected void execute(MessageRetryDTO messageRetryDto) {
            // call the inventory deduction service; throw a business exception on failure
        }

        @Override
        protected void successCallback(MessageRetryDTO messageRetryDto) {
            // callback after business processing succeeded
        }

        @Override
        protected void failCallback(MessageRetryDTO messageRetryDto) {
            // callback after business processing failed
        }
    }

When message consumption fails and the maximum number of retries is exceeded, the message is stored in MongoDB. From there, as with any regular database operation, you can query the failed messages through a web interface and retry them for specific scenarios!
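The manual step can be sketched as a small service that lists stored failures and replays one on demand. This is a hedged illustration only: `ManualRetrySketch`, `StoredMessage`, and the method names are hypothetical, an in-memory map stands in for the `message_retry_info` collection, and the status codes follow the article's convention (2: successful, 3: failed).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of querying and manually retrying stored failed messages.
class ManualRetrySketch {
    static final int STATUS_SUCCESS = 2;
    static final int STATUS_FAILED = 3;

    // Minimal stand-in for a stored retry record.
    static class StoredMessage {
        final String id;
        final String body;
        int status = STATUS_FAILED;

        StoredMessage(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    // Stand-in for the MongoDB collection, keyed by message ID.
    private final Map<String, StoredMessage> store = new LinkedHashMap<>();

    void saveFailed(String id, String body) {
        store.put(id, new StoredMessage(id, body));
    }

    // What a web interface would list: every message still marked as failed.
    List<StoredMessage> findFailed() {
        List<StoredMessage> result = new ArrayList<>();
        for (StoredMessage m : store.values()) {
            if (m.status == STATUS_FAILED) {
                result.add(m);
            }
        }
        return result;
    }

    // Replays one failed message through the given handler.
    // Returns true and marks it successful only if the handler does not throw.
    boolean retry(String id, Consumer<String> handler) {
        StoredMessage m = store.get(id);
        if (m == null || m.status != STATUS_FAILED) {
            return false;
        }
        try {
            handler.accept(m.body);
            m.status = STATUS_SUCCESS;
            return true;
        } catch (RuntimeException e) {
            return false; // still failed; it stays in the list for another attempt
        }
    }
}
```

Updating the status after a successful replay keeps the failed list limited to messages that still need attention.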
