Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to solve distributed transactions with RocketMQ

2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

This article focuses on "how to solve distributed transactions with RocketMQ". Interested friends may wish to have a look at it. The method introduced in this paper is simple, fast and practical. Let's let the editor take you to learn "how RocketMQ solves distributed transactions".

How to ensure consistency:

RocketMQ solution for distributed transactions (reliable message final consistency scheme)

1. System A sends a prepared message to MQ. If the prepared message fails, then cancel the operation directly.

2. If the message is sent successfully, the local transaction (executeLocalTransaction) is executed. If it succeeds, it tells MQ to send a confirmation message, and if it fails, it tells MQ to send a rollback message.

3. If a confirmation message is sent, then system B will receive the confirmation message and execute the local transaction.

4. In step 2 above, sending a confirmation or rollback message failed due to network reasons, but broker has a polling mechanism. According to the unique id query local transaction status, MQ will automatically regularly poll all prepared messages to call back your interface (checkLocalTransaction), asking you whether this message is a failure of local transaction processing, and whether all messages that do not send confirmation should continue to retry or roll back? In the first version, you can check the database to see if the local transaction was executed before, and if it is rolled back, then roll back here, too. This is to avoid the possibility that the local transaction executes successfully and the confirmation message fails to be sent.

PS: this scheme does not support the rollback of transaction initiating services, but most Internet applications do not require transaction initiators to roll back. If transaction initiators are required to roll back, strong consistency schemes such as 2PC, 3PC, TCC and other strong consistency schemes should be adopted to implement distributed transactions, such as LCN.

Order-inventory-distributed transaction

Here through an example to talk about RocketMQ to achieve the specific coding of distributed transactions.

Scenario: when an order is issued, the order service generates the order. When the order is paid successfully, the order status is modified and the inventory service is notified to deduct the inventory.

Database design: CREATE TABLE `buy_ order` (`id`int (11) NOT NULL, `order_ id`varchar 'NOT NULL DEFAULT' 'COMMENT' order id', `buy_ num` int (11) DEFAULT NULL COMMENT 'purchase quantity', `good_ id`int (11) DEFAULT NULL COMMENT 'commodity ID', `user_ id` int (11) DEFAULT NULL COMMENT' user ID', `pay_ status` int (11) DEFAULT NULL COMMENT 'payment status, 0: no payment 1: paid', PRIMARY KEY (`id`) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ciCREATE TABLE `yzy_ repo` (`id` int (11) NOT NULL AUTO_INCREMENT, `good_ name` varchar 'NOT NULL DEFAULT' 'COMMENT' commodity name', `num`int (11) NOT NULL DEFAULT'0' COMMENT 'inventory quantity', PRIMARY KEY (`id`) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT=' test, inventory table'

The main method to start the actual combat order service service package com.transaction.order;import com.alibaba.dubbo.config.annotation.Reference;import com.transaction.repository.IRepositoryService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Service;import org.springframework.web.client.RestTemplate;import java.util.List;@Servicepublic class OrderService {@ Autowired OrderDao orderDao; public final int PAY_DONE = 1 / * check whether the order exists and the status is payment completion * * / public boolean checkOrderPaySuccess (String orderId) {List allOrders = orderDao.findAll (); return allOrders.stream () .anyMatch (order-> order.getOrderId (). Equals (orderId) & & order.getPayStatus () = = PAY_DONE) } / * Update order is completed for payment * * / public void updatePayStatusByOrderId (String orderId) {orderDao.updatePayStatusByOrderId (orderId, PAY_DONE);} / * generate order. The default status is unpaid * * / public void save (String orderId, int num, int goodId,int userId) {YzyOrder yzyOrder = new YzyOrder (); yzyOrder.setOrderId (orderId) YzyOrder.setBuyNum (num); yzyOrder.setGoodId (goodId); yzyOrder.setUserId (userId); orderDao.save (yzyOrder);}} Business process 1. Create an order with an unpaid status in the order table

Execute curl '127.0.0.1virtual 8081 on the terminal or browser to save the numb picture 2 minutes goodwill cards idcards 1percent username idcards 1001'

/ * * generate order API * @ param num * @ param goodId * @ param userId * @ return * / @ GetMapping ("save") public String makeOrder (@ RequestParam ("num") int num, @ RequestParam ("good_id") int goodId @ RequestParam ("user_id") int userId) {orderService.save (UUID.randomUUID () .toString (), num, goodId,userId) Return "success"; 2. After the payment is completed, the inventory will be deducted through the MQ notification service.

OrderController:pay sends the MQ transaction message that the order payment is successful. Note here that you don't call OrderService::updatePayStatusByOrderId directly and then send a normal MQ message. Instead, it first sends the transaction message to MQ, and then MQ calls back the TransactionListener::executeLocalTransaction of the order service, where the order status is updated to ensure the consistency between sending the transaction message and updating the order status.

GetMapping ("pay") public String pay (@ RequestParam ("order_id") String orderId) throws UnsupportedEncodingException, MQClientException, JsonProcessingException {transactionProducer.sendOrderPaySucessEvent (orderId); return "success";} 3. The order server's transaction message listener @ Componentpublic class TransactionProducer implements InitializingBean {private TransactionMQProducer producer; @ Autowired private OrderService orderService; @ Autowired private OrderDao orderDao; @ Override public void afterPropertiesSet () throws Exception {producer = new TransactionMQProducer ("order-pay-group"); producer.setNamesrvAddr ("mq01.stag.kk.srv:9876;mq02.stag.kk.srv:9876") ThreadFactory threadFactory = new ThreadFactoryBuilder (). SetNameFormat ("transaction-thread-name-%s"). Build (); ThreadPoolExecutor executor = new ThreadPoolExecutor (2,5,60, TimeUnit.SECONDS, new ArrayBlockingQueue (30), threadFactory); producer.setExecutorService (executor) / / set the callback producer.setTransactionListener to send the message (new TransactionListener () {/ * determine whether to execute the local transaction according to the result sent by the message * * callback to this method means that the message has been successfully sent to MQ You can update the order status to "payment successful" * / @ Override public LocalTransactionState executeLocalTransaction (Message msg, Object arg) {/ / determine whether the transaction message requires commit and rollback ObjectMapper objectMapper = new ObjectMapper () based on whether the local transaction is executed or not. LocalTransactionState state = LocalTransactionState.UNKNOW; try {OrderRecord record = objectMapper.readValue (msg.getBody (), OrderRecord.class) / / MQ has received the transaction message sent by the TransactionProducer send method. Execute the local transaction / / record the order information locally orderService.updatePayStatusByOrderId (record.getOrderId ()); state = LocalTransactionState.COMMIT_MESSAGE;} catch (UnsupportedEncodingException e) {e.printStackTrace () State = LocalTransactionState.ROLLBACK_MESSAGE;} catch (IOException e) {e.printStackTrace (); state = LocalTransactionState.ROLLBACK_MESSAGE;} return state } / * RocketMQ callback tells broker whether the message was successfully delivered according to whether the local transaction was successfully executed * @ return * / @ Override public LocalTransactionState checkLocalTransaction (MessageExt msg) {ObjectMapper objectMapper = new ObjectMapper (); LocalTransactionState state = LocalTransactionState.UNKNOW OrderRecord record = null; try {record = objectMapper.readValue (msg.getBody (), OrderRecord.class);} catch (IOException e) {e.printStackTrace () } try {/ / judge whether the transaction is executed successfully based on whether there is a transaction_id corresponding to the transfer record boolean isLocalSuccess = orderService.checkOrderPaySuccess (record.getOrderId ()); if (isLocalSuccess) {state = LocalTransactionState.COMMIT_MESSAGE } else {state = LocalTransactionState.ROLLBACK_MESSAGE;}} catch (Exception e) {e.printStackTrace ();} return state;}}); producer.start () } public void sendOrderPaySucessEvent (String orderId) throws JsonProcessingException, UnsupportedEncodingException, MQClientException {ObjectMapper objectMapper = new ObjectMapper (); YzyOrder order = orderDao.findAll (). Stream () .filter (item- > item.getOrderId (). Equals (orderId)) .filter (Collectors.toList ()) .get (0); if (order = = null) {System.out.println ("not found order" + orderId) } / / construct the sent transaction message OrderRecord record = new OrderRecord (); record.setUserId (order.getUserId ()); record.setOrderId (orderId); record.setBuyNum (order.getBuyNum ()); record.setPayStatus (order.getPayStatus ()); record.setGoodId (order.getGoodId ()) Message message = new Message ("Order-Success", "", record.getOrderId (), objectMapper.writeValueAsString (record) .getBytes (RemotingHelper.DEFAULT_CHARSET)); TransactionSendResult result = producer.sendMessageInTransaction (message, null); System.out.println ("send transaction messages, orderId =" + record.getOrderId () + "+ result.toString ());}} 4. Inventory service deduction of inventory

Issues to pay attention to:

1. Deduction of inventory should prevent it from being deducted into a negative number in the case of concurrency.

two。 The way to update inventory by select before update is to add distributed locks or database optimistic locks, and update statements need to be idempotent.

UPDATE t_yue SET money=$new_money WHERE id=$good_id AND money=$old_money

3. Pay attention to the consumption idempotent processing through msgId or orderId

In the case of @ Override public int reduce (Integer buyNum, Integer goodId) {/ / concurrent, in order to prevent inventory from being deducted into a negative number, there are three solutions / / 1. Select for update (must be placed in the transaction) / / 2. This logic plus distributed lock / / 3. Add a version field to the database to optimistically lock while (true) {Optional repoOption = repositoryDao.findById (goodId); if (! repoOption.isPresent ()) {return 0;} YzyRepo repo = repoOption.get (); / / avoid database inventory deduction less than zero if (repo.getNum ()-buyNum)

< 0) { return -1; } repo.setNum(repo.getNum() - buyNum); int affect = repositoryDao.updateGoodNum(repo.getNum() - buyNum, repo.getNum(), goodId); if(affect >

0) {return affect;}} at this point, I believe you have a deeper understanding of "how RocketMQ solves distributed transactions". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report