2025-04-04 Update From: SLTechnology News&Howtos, Shulou (Shulou.com)
Recently I ran into a delayed-task scenario in a production environment. I surveyed the current mainstream schemes, weighed their advantages and disadvantages, and settled on a final plan. This article records the research process and the implementation of the preliminary plan.
Comparison of candidate schemes
Here are several schemes for implementing delayed tasks, with their advantages, disadvantages and applicable scenarios.
Scheme: DelayQueue, the delay queue built into the JDK. Advantages: simple to implement. Disadvantages: data is held in memory, unreliable. Applicable scenario: scenarios with relatively low consistency requirements.
Scheme: scheduling framework and MySQL with short-interval polling. Advantages: simple to implement. Disadvantages: obvious performance bottleneck. Applicable scenario: small data volume and relatively low real-time requirements.
Scheme: DLX and TTL of RabbitMQ, generally called the dead letter queue scheme. Advantages: asynchronous interaction can smooth out traffic peaks. Disadvantages: the length of the delay is not controllable, and performance degrades if the data must be persisted. Applicable scenario: -
Scheme: scheduling framework and Redis with short-interval polling. Advantages: data persistence, high performance. Disadvantages: difficult to implement. Applicable scenario: often seen in payment result callback schemes.
Scheme: (name not preserved in this text). Advantages: high real-time performance. Disadvantages: high memory consumption. Applicable scenario: scenarios with high real-time requirements.
If the data volume is small and the real-time requirement is relatively low, short-interval polling of MySQL with a scheduling framework is the best choice. However, the scenario I encountered involves a relatively large data volume and only a modest real-time requirement, so scanning the database would certainly put great pressure on the MySQL instance. I remember seeing, a long time ago, a PPT called "the Evolution of Box Technology aggregation payment system", which contained a picture that gave me some inspiration:
It uses exactly the scheme of a scheduling framework and Redis with short-interval polling to implement delayed tasks, and, to spread the load across the application, the scheme in the picture is also sharded. Because my current business need is urgent, the first phase does not consider sharding and implements only a simplified version.
Since the PPT contains no code or framework, some technical points had to be worked out independently. The following reproduces the implementation details of the whole solution.
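To make the short-interval polling idea concrete before going into details, here is a minimal in-memory sketch. It rests on assumptions that are not stated in the PPT: tasks are kept ordered by their due timestamp (the way a Redis sorted set orders members by score), and each polling round fetches a limited number of tasks whose due time has passed. The class DueTaskStore and its methods are hypothetical names, and a TreeMap stands in for Redis.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a store ordered by due time (not real Redis access). */
final class DueTaskStore {

    // key = due time in epoch millis (plays the role of a score), value = order IDs due then
    private final TreeMap<Long, Deque<String>> byDueTime = new TreeMap<>();

    /** Registers an order to become due delayMillis after nowMillis. */
    synchronized void add(String orderId, long nowMillis, long delayMillis) {
        byDueTime.computeIfAbsent(nowMillis + delayMillis, k -> new ArrayDeque<>()).add(orderId);
    }

    /** One polling round: removes and returns at most limit orders with due time <= nowMillis. */
    synchronized List<String> pollDue(long nowMillis, int limit) {
        List<String> due = new ArrayList<>();
        Iterator<Map.Entry<Long, Deque<String>>> it =
                byDueTime.headMap(nowMillis, true).entrySet().iterator();
        while (it.hasNext() && due.size() < limit) {
            Deque<String> ids = it.next().getValue();
            while (!ids.isEmpty() && due.size() < limit) {
                due.add(ids.pollFirst());
            }
            if (ids.isEmpty()) {
                it.remove(); // drop the bucket once all of its orders were handed out
            }
        }
        return due;
    }

    public static void main(String[] args) {
        DueTaskStore store = new DueTaskStore();
        // Fixed clock values are used so the result does not depend on wall-clock time.
        store.add("ORDER_ID_10086", 0L, 5_000L);
        store.add("ORDER_ID_10087", 0L, 6_000L);
        store.add("ORDER_ID_10088", 0L, 10_000L);
        System.out.println(store.pollDue(4_000L, 10));  // []
        System.out.println(store.pollDue(6_000L, 10));  // [ORDER_ID_10086, ORDER_ID_10087]
        System.out.println(store.pollDue(10_000L, 10)); // [ORDER_ID_10088]
    }
}
```

In a real deployment a scheduler would call something like pollDue at a short fixed interval, and the limit keeps a single round from pulling too many tasks into the application at once.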
Scenario design
The actual production scenario is that a system I am responsible for needs to connect with an external funder, and the corresponding attachments must be pushed 30 minutes after each fund order is placed. This is simplified here to delayed processing of order information: each order placed records an order message (called OrderMessage for now), which must be processed asynchronously after a delay of 5 to 15 seconds.
Implementation ideas for the rejected candidate schemes
Below, the other four rejected candidates are introduced, and their implementation is analyzed with some pseudo-code and process descriptions.
JDK built-in delay queue
DelayQueue is a blocking-queue implementation whose elements must implement the Delayed interface. Here's a simple example:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DelayQueueMain {

    private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);

    public static void main(String[] args) throws Exception {
        DelayQueue<OrderMessage> queue = new DelayQueue<>();
        // default delay: 5 seconds
        OrderMessage message = new OrderMessage("ORDER_ID_10086");
        queue.add(message);
        // delay: 6 seconds
        message = new OrderMessage("ORDER_ID_10087", 6);
        queue.add(message);
        // delay: 10 seconds
        message = new OrderMessage("ORDER_ID_10088", 10);
        queue.add(message);
        // single daemon worker thread that consumes expired messages
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setName("DelayWorker");
            thread.setDaemon(true);
            return thread;
        });
        LOGGER.info("start scheduling thread...");
        executorService.execute(() -> {
            while (true) {
                try {
                    // take() blocks until the head element's delay has expired
                    OrderMessage task = queue.take();
                    LOGGER.info("delayed processing order message, {}", task.getDescription());
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class OrderMessage implements Delayed {

        private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        /** default delay: 5000 ms */
        private static final long DELAY_MS = 1000L * 5;

        /** order ID */
        private final String orderId;

        /** creation timestamp */
        private final long timestamp;

        /** expiration time */
        private final long expire;

        /** description */
        private final String description;

        public OrderMessage(String orderId, long expireSeconds) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + expireSeconds * 1000L;
            this.description = String.format("order [%s]-creation time is: %s, timeout is: %s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }

        public OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + DELAY_MS;
            this.description = String.format("order [%s]-creation time is: %s, timeout is: %s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }

        public String getOrderId() {
            return orderId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public long getExpire() {
            return expire;
        }

        public String getDescription() {
            return description;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // remaining delay; zero or negative means the element has expired
            return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            // Long.compare avoids the overflow that casting a long difference to int can cause
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}
Note that OrderMessage implements the Delayed interface; the key is to implement Delayed#getDelay() and Delayed#compareTo(). Running the main() method produces:
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - delayed processing order message, order [ORDER_ID_10086]-creation time is: 2019-08-20 10:16:08, timeout is: 2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - delayed processing order message, order [ORDER_ID_10087]-creation time is: 2019-08-20 10:16:08, timeout is: 2019-08-20 10:16:14
[DelayWorker] INFO club.throwable.delay.DelayQueueMain - delayed processing order message, order [ORDER_ID_10088]-creation time is: 2019-08-20 10:16:08, timeout is: 2019-08-20 10:16:18
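The ordering behaviour that getDelay() and compareTo() give the queue can also be checked in isolation. The following is a minimal sketch, not part of the original example: DelayOrderDemo and its nested DelayedItem are hypothetical names, and the delays (100 to 300 ms) are arbitrary. Items are added out of order and come back from take() in expiry order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayOrderDemo {

    /** Hypothetical minimal Delayed element: an ID plus an absolute expiry time. */
    static final class DelayedItem implements Delayed {
        final String id;
        private final long expireAtMillis;

        DelayedItem(String id, long delayMillis) {
            this.id = id;
            this.expireAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until expiry, converted to the requested unit.
            return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Long.compare avoids int overflow when the two delays are far apart.
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Adds three items out of order and returns their IDs in the order take() releases them. */
    static List<String> drainInExpiryOrder() {
        DelayQueue<DelayedItem> queue = new DelayQueue<>();
        queue.add(new DelayedItem("slow", 300));
        queue.add(new DelayedItem("fast", 100));
        queue.add(new DelayedItem("medium", 200));
        List<String> order = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                order.add(queue.take().id); // blocks until the head element has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for delayed items", e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInExpiryOrder()); // [fast, medium, slow]
    }
}
```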
Scheduling Framework + MySQL
Polling a MySQL table at short intervals with a scheduling framework is less difficult to implement, and it should be preferred when the service has just gone online, the table holds little data and the real-time requirement is not high. However, attention must be paid to the pressure that polling puts on the MySQL instance.
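As a rough illustration of what one polling round has to do, here is a minimal in-memory sketch. It is an assumption-laden stand-in rather than the implementation used in this article: a List replaces the MySQL table, the handler is a hypothetical callback, and the class and method names (PollingRoundSketch, Row, pollOnce) are invented for the example. It shows the three things a round needs to track: a per-round limit, a status value, and a maximum retry count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class PollingRoundSketch {

    static final int PENDING = 0;
    static final int SUCCESS = 1;
    static final int FAIL = -1;

    /** Hypothetical in-memory row standing in for one record of the polled table. */
    static final class Row {
        final String orderId;
        int status = PENDING;
        int retryTimes = 0;

        Row(String orderId) {
            this.orderId = orderId;
        }
    }

    /**
     * One polling round: handles at most limit PENDING rows.
     * A row becomes SUCCESS when the handler returns true; otherwise its retry
     * counter is incremented and it becomes FAIL once maxRetryTimes is reached.
     * Returns the number of rows handled in this round.
     */
    static int pollOnce(List<Row> table, int limit, int maxRetryTimes, Predicate<Row> handler) {
        int handled = 0;
        for (Row row : table) {
            if (handled >= limit) {
                break;
            }
            if (row.status != PENDING) {
                continue;
            }
            handled++;
            if (handler.test(row)) {
                row.status = SUCCESS;
            } else if (++row.retryTimes >= maxRetryTimes) {
                row.status = FAIL;
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row("10086"));
        table.add(new Row("10087"));
        // Hypothetical handler: processing of order 10087 always fails.
        Predicate<Row> handler = row -> !row.orderId.equals("10087");
        for (int round = 1; round <= 3; round++) {
            System.out.println("round " + round + ": handled " + pollOnce(table, 10, 2, handler));
        }
        // prints: round 1: handled 2, round 2: handled 1, round 3: handled 0
    }
}
```

A scheduling framework would invoke such a round at a fixed short interval; without the status and retry bookkeeping, failed rows would be picked up again on every round.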
Add Quartz, the MySQL Java driver and spring-boot-starter-jdbc as dependencies (this is only a convenient, relatively lightweight setup; in production a more suitable framework can be chosen according to the scenario):
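<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>2.1.7.RELEASE</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.1</version>
    <scope>test</scope>
</dependency>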
Suppose the table is designed as follows:
CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;

USE `delayTask`;

CREATE TABLE `t_order_message`
(
    id           BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
    order_id     VARCHAR(50) NOT NULL COMMENT 'order ID',
    create_time  DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation date time',
    edit_time    DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'modification date time',
    retry_times  TINYINT     NOT NULL DEFAULT 0 COMMENT 'retries',
    order_status TINYINT     NOT NULL DEFAULT 0 COMMENT 'order status',
    INDEX idx_order_id (order_id),
    INDEX idx_create_time (create_time)
) COMMENT 'order information table';

# write two rows of test data
INSERT INTO t_order_message(order_id) VALUES ('10086'), ('10087');
Write code:
// constants
public class OrderConstants {

    public static final int MAX_RETRY_TIMES = 5;
    public static final int PENDING = 0;
    public static final int SUCCESS = 1;
    public static final int FAIL = -1;
    public static final int LIMIT = 10;
}

// entity (@Builder and @Data are Lombok annotations)
@Builder
@Data
public class OrderMessage {

    private Long id;
    private String orderId;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private Integer retryTimes;
    private Integer orderStatus;
}

// DAO (@RequiredArgsConstructor is a Lombok annotation; Lists is Guava's com.google.common.collect.Lists)
@RequiredArgsConstructor
public class OrderMessageDao {

    private final JdbcTemplate jdbcTemplate;

    // maps every row of the result set to an OrderMessage
    private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
        List<OrderMessage> list = Lists.newArrayList();
        while (r.next()) {
            list.add(OrderMessage.builder()
                    .id(r.getLong("id"))
                    .orderId(r.getString("order_id"))
                    .createTime(r.getTimestamp("create_time").toLocalDateTime())
                    .editTime(r.getTimestamp("edit_time").toLocalDateTime())
                    .retryTimes(r.getInt("retry_times"))
                    .orderStatus(r.getInt("order_status"))
                    .build());
        }
        return list;
    };

    public List<OrderMessage> selectPendingRecords(LocalDateTime start, LocalDateTime end, List<Integer> statusList, int maxRetryTimes, int limit) {
        StringJoiner joiner = new StringJoiner(",");
        statusList.forEach(s -> joiner.add(String.valueOf(s)));
        return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time