
How to use Redis to implement a distributed task scheduler in Java

Updated: 2025-03-26 (SLTechnology News&Howtos, Database)


How can Redis be used to implement a distributed task scheduler in Java? This article walks through the problem and one workable solution in detail, in the hope of giving anyone facing the same problem a simple and practical approach.

Code example

Before going any further into the implementation, let's take a look at how the scheduler is used.

class Demo {
    public static void main(String[] args) {
        var redis = new RedisStore();
        // "sample" is the task group name
        var store = new RedisTaskStore(redis, "sample");
        // 5s is the lifetime of the task lock
        var scheduler = new DistributedScheduler(store, 5);
        // register a one-time task
        scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {
            System.out.println("once1");
        }));
        // register a periodic task
        scheduler.register(Trigger.periodOfDelay(5, 5), Task.of("period2", () -> {
            System.out.println("period2");
        }));
        // register a cron task
        scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {
            System.out.println("cron3");
        }));
        // set the global version number
        scheduler.version(1);
        // register the listener
        scheduler.listener(ctx -> {
            System.out.println(ctx.task().name() + " is complete");
        });
        // start the scheduler
        scheduler.start();
    }
}

When a code upgrade adds or removes tasks (or changes their schedule), you only need to increment the global version number. Processes that are already running reschedule their tasks automatically, and tasks that are no longer registered are cleared automatically. A newly added task is never scheduled by a process still running the old code (that process does not contain the task's code), and a removed task is unscheduled in processes running the old code.

For example, suppose we need to cancel the period2 task and add a period4 task:

class Demo {
    public static void main(String[] args) {
        var redis = new RedisStore();
        // "sample" is the task group name
        var store = new RedisTaskStore(redis, "sample");
        // 5s is the lifetime of the task lock
        var scheduler = new DistributedScheduler(store, 5);
        // register a one-time task
        scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {
            System.out.println("once1");
        }));
        // register a cron task
        scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {
            System.out.println("cron3");
        }));
        // register a periodic task
        scheduler.register(Trigger.periodOfDelay(5, 10), Task.of("period4", () -> {
            System.out.println("period4");
        }));
        // increment the global version number
        scheduler.version(2);
        // register the listener
        scheduler.listener(ctx -> {
            System.out.println(ctx.task().name() + " is complete");
        });
        // start the scheduler
        scheduler.start();
    }
}
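The effect of a version bump can be pictured as a set difference between the task names registered by the old and the new code. The sketch below is only an illustration of that idea; `TaskDiff` and its methods are hypothetical names, not part of the scheduler described in this article.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: compares the task names of two code versions.
final class TaskDiff {
    // Tasks present in the old version but no longer registered (to be cleared).
    static Set<String> removed(Set<String> oldNames, Set<String> newNames) {
        Set<String> result = new TreeSet<>(oldNames);
        result.removeAll(newNames);
        return result;
    }

    // Tasks that only the new version registers (old processes cannot run them).
    static Set<String> added(Set<String> oldNames, Set<String> newNames) {
        Set<String> result = new TreeSet<>(newNames);
        result.removeAll(oldNames);
        return result;
    }
}
```

With the two demos above, the removed set would contain period2 and the added set period4, while once1 and cron3 are simply rescheduled.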

cron4j

<dependency>
    <groupId>it.sauronsoftware.cron4j</groupId>
    <artifactId>cron4j</artifactId>
    <version>2.2.5</version>
</dependency>

This open source library includes basic cron expression parsing and task scheduling, but its scheduler is not needed here. Only two things are used: its expression parsing and a simple method that tells whether the current time matches the expression (that is, whether it is time to run the task).

The time precision required for cron tasks is very low, so it is enough to check once a minute whether the current time matches.

// API summary of the cron4j class (method bodies omitted)
class SchedulingPattern {
    // whether the expression is valid
    static boolean validate(String cronExpr);
    // whether the task should run now (checked once a minute)
    boolean match(long nowTs);
}
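A minimal sketch of the once-a-minute check, under stated assumptions: `MinuteTicker` is a hypothetical helper, and the `LongPredicate` stands in for whatever matcher is used (in this article, the pattern's `match` method). It shows how to find the delay until the next minute boundary and how to evaluate the matcher against the start of the current minute.

```java
import java.util.function.LongPredicate;

// Hypothetical helper: decides when a cron pattern is evaluated next.
final class MinuteTicker {
    static final long MINUTE_MS = 60_000L;

    // Milliseconds from nowMs until the start of the next minute (1..60000).
    static long delayToNextMinute(long nowMs) {
        return MINUTE_MS - Math.floorMod(nowMs, MINUTE_MS);
    }

    // Evaluate the matcher against the start of the current minute, so every
    // check made within the same minute gives the same answer.
    static boolean shouldRun(LongPredicate matcher, long nowMs) {
        long minuteStart = nowMs - Math.floorMod(nowMs, MINUTE_MS);
        return matcher.test(minuteStart);
    }
}
```

A scheduler thread could sleep for `delayToNextMinute(System.currentTimeMillis())` and then call `shouldRun` for each cron task.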

Mutual exclusion of tasks

Because this is a distributed task scheduler, in a multi-process environment only one process may run a given task at a given point in time. A Redis distributed lock makes this easy. The lock has to be held for a certain amount of time (5s by default).

All processes schedule the task at the same moment, but only one of them can grab the lock. Because clocks in a distributed environment are never perfectly synchronized, processes on different machines fire within a small time window of each other, and the lock must outlive that window. The default here is 5s (configurable), which means the clock difference between machines must not exceed 5s. If it does, the task can be scheduled more than once.

public boolean grabTask(String name) {
    var holder = new Holder<Boolean>();
    redis.execute(jedis -> {
        var lockKey = keyFor("task_lock", name);
        // SET key value NX EX lockAge: succeeds only if the key does not exist yet
        var ok = jedis.set(lockKey, "true", SetParams.setParams().nx().ex(lockAge));
        holder.value(ok != null);
    });
    return holder.value();
}
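To make the lock semantics concrete without a Redis server, here is an in-memory stand-in for `SET key value NX EX seconds`. `FakeLockStore` is a hypothetical class written for illustration only, and the explicit `nowMs` parameter is an assumption that keeps the example deterministic; a real deployment relies on Redis for both atomicity and expiry.

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for Redis "SET key value NX EX seconds" (illustration only).
final class FakeLockStore {
    private final ConcurrentHashMap<String, Long> expiries = new ConcurrentHashMap<>();
    private final long lockAgeMs;

    FakeLockStore(long lockAgeSeconds) {
        this.lockAgeMs = lockAgeSeconds * 1000L;
    }

    // Returns true only for the first caller within the lock's lifetime.
    boolean grabTask(String name, long nowMs) {
        boolean[] won = {false};
        // compute() runs atomically per key, mirroring the atomicity of SET NX
        expiries.compute(name, (key, expiry) -> {
            if (expiry == null || expiry <= nowMs) { // no lock yet, or lock expired
                won[0] = true;
                return nowMs + lockAgeMs;
            }
            return expiry;                           // still held by another caller
        });
        return won[0];
    }
}
```

Two processes whose clocks differ by less than the lock age both call `grabTask` for the same firing, and only the first one gets `true`. Once the skew exceeds the lock age, the second call arrives after expiry and also wins, which is the repeated scheduling described above.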

Global version number

A global version number is attached to the task list. When the business needs to add or remove scheduled tasks, changing the version number triggers a task reload in every process. Each process polls the global version number (a Redis key); if it has changed, the process reloads the task list configuration and reschedules all tasks immediately.

private void scheduleReload() {
    // compare versions once per second
    this.scheduler.scheduleWithFixedDelay(() -> {
        try {
            if (this.reloadIfChanged()) {
                this.rescheduleTasks();
            }
        } catch (Exception e) {
            LOG.error("reloading tasks error", e);
        }
    }, 0, 1, TimeUnit.SECONDS);
}
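The article does not show `reloadIfChanged()`, so the following is only a guess at its core: compare the remote version with the one this process last loaded. `VersionWatcher` is a hypothetical class, and the `LongSupplier` is an assumed stand-in for reading the version key from Redis.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the version comparison behind a reload check.
final class VersionWatcher {
    private final LongSupplier remoteVersion; // assumed: reads the version key from the store
    private long localVersion;

    VersionWatcher(LongSupplier remoteVersion, long initialVersion) {
        this.remoteVersion = remoteVersion;
        this.localVersion = initialVersion;
    }

    // True when the remote version differs from the one this process last loaded.
    synchronized boolean reloadIfChanged() {
        long remote = remoteVersion.getAsLong();
        if (remote == localVersion) {
            return false;
        }
        // a real scheduler would also reload the trigger list at this point
        localVersion = remote;
        return true;
    }
}
```

Because the local version is updated on the first detection, the poll that follows returns `false` again, so tasks are rescheduled once per version change rather than once per second.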

Rescheduling tasks first cancels all currently scheduled tasks, and then schedules all tasks that have just been loaded.

private void rescheduleTasks() {
    this.cancelAllTasks();
    this.scheduleTasks();
}

private void cancelAllTasks() {
    this.futures.forEach((name, future) -> {
        LOG.warn("cancelling task {}", name);
        // false: do not interrupt a task that is already running
        future.cancel(false);
    });
    this.futures.clear();
}

Because tasks need to be persisted, a simple serialization format was designed: the task configuration properties are joined into one string with text separators.

// one-time task (startTime)
ONCE@2019-04-29T15:26:29.946+0800
// periodic task (startTime, endTime, period); the end time here means the task never ends
PERIOD@2019-04-29T15:26:29.949+0800 | 292278994-08-17T15:12:55.807+0800 | 5
// cron task, once a minute
CRON@*/1 *

$ redis-cli
127.0.0.1:6379> hgetall sample_triggers
1) "task3"
2) "CRON@*/1 *"
3) "task2"
4) "PERIOD@2019-04-29T15:26:29.949+0800 | 292278994-08-17T15:12:55.807+0800 | 5"
5) "task1"
6) "ONCE@2019-04-29T15:26:29.946+0800"
7) "task4"
8) "PERIOD@2019-04-29T15:26:29.957+0800 | 292278994-08-17T15:12:55.807+0800 | 10"
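Reading such a string back is a matter of splitting on the separators. The sketch below is a hypothetical parser, not the article's code: `TriggerSpec` is an invented name, the fields are kept as raw strings rather than converted to dates, and it assumes that `@` separates the type from the body and `|` separates properties, as the samples above suggest.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical parser for the "TYPE@field | field | ..." strings shown above.
final class TriggerSpec {
    final String type;          // ONCE, PERIOD or CRON
    final List<String> fields;  // raw property strings, not yet interpreted

    private TriggerSpec(String type, List<String> fields) {
        this.type = type;
        this.fields = fields;
    }

    static TriggerSpec parse(String text) {
        int at = text.indexOf('@');
        if (at <= 0) {
            throw new IllegalArgumentException("missing type prefix: " + text);
        }
        String type = text.substring(0, at);
        String body = text.substring(at + 1);
        if (type.equals("CRON")) {
            // a cron expression is a single property and is kept whole
            return new TriggerSpec(type, List.of(body.trim()));
        }
        // ONCE and PERIOD properties are separated by '|'
        List<String> fields = Arrays.stream(body.split("\\|"))
                .map(String::trim)
                .collect(Collectors.toList());
        return new TriggerSpec(type, fields);
    }
}
```

For the `task2` entry above, `parse` yields type `PERIOD` with three fields: the start time, the end time, and the period `5`.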

Thread pool

Time scheduling runs on its own thread (a single-thread pool), and the tasks themselves run on a separate thread pool whose size is configurable.

class DistributedScheduler {
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private ExecutorService executor = Executors.newFixedThreadPool(threads);
}

The thread pools are separated so that task execution (which may block on IO) cannot disturb the timing accuracy of the scheduling thread.
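A small sketch of that hand-off, using only the standard `java.util.concurrent` executors: the timing thread does nothing but pass the task body to the worker pool. `HandOffDemo` and `runsOffTimerThread` are hypothetical names, and the pool size of 2 and the 10 ms delay are arbitrary choices for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class HandOffDemo {
    // Returns true if the task body ran on a worker thread rather than the timing thread.
    static boolean runsOffTimerThread() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(2);
        AtomicReference<Thread> timerThread = new AtomicReference<>();
        AtomicReference<Thread> taskThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            timer.schedule(() -> {
                timerThread.set(Thread.currentThread());
                // hand the (possibly slow) task body to the worker pool and return at once
                workers.execute(() -> {
                    taskThread.set(Thread.currentThread());
                    done.countDown();
                });
            }, 10, TimeUnit.MILLISECONDS);
            return done.await(5, TimeUnit.SECONDS)
                    && timerThread.get() != taskThread.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow();
            workers.shutdownNow();
        }
    }
}
```

Because the timing thread returns as soon as it has submitted the work, a slow task delays only a worker thread, not the next trigger.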

Support for non-mutually exclusive tasks

A mutually exclusive task is run by only one process at a time. A non-mutually exclusive task takes no distributed lock and can be run by multiple processes at the same time. Tasks are mutually exclusive by default.

class Task {
    /**
     * Whether to skip multi-process mutual exclusion
     * (true means no mutual exclusion: multiple processes can run the task at the same time).
     */
    private boolean concurrent;
    private String name;
    private Runnable runner;
    ...
    public static Task of(String name, Runnable runner) {
        return new Task(name, false, runner);
    }

    public static Task concurrent(String name, Runnable runner) {
        return new Task(name, true, runner);
    }
}
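How the `concurrent` flag might be consulted before a run can be sketched as follows. This is an assumption about the control flow, not code from the article: `RunGate` is a hypothetical name and the `Predicate` stands in for the store's `grabTask(name)` call.

```java
import java.util.function.Predicate;

// Hypothetical gate: decides whether this process runs a task on this firing.
final class RunGate {
    // grabLock stands in for the store's grabTask(name)
    static boolean shouldRun(boolean concurrent, String name, Predicate<String> grabLock) {
        // concurrent tasks never touch the lock; the others run only if the lock is won
        return concurrent || grabLock.test(name);
    }
}
```

The short-circuit matters: a concurrent task must not take the lock at all, otherwise it would block other processes from running the same task.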

Callback interface

Users of the scheduler may need to monitor how tasks are running, so a callback interface has been added. It is still simple for now: it reports the result of a run (success or exception) and how long the run took.

class TaskContext {
    private Task task;
    private long cost; // run time
    private boolean ok;
    private Throwable e;
}

interface ISchedulerListener {
    public void onComplete(TaskContext ctx);
}
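One plausible way to produce such a report is to wrap each run in a timer and a catch block. The sketch below uses hypothetical names (`RunReport`, `TaskRunner`) and a plain `Consumer` instead of the article's listener interface; the millisecond unit for the cost is also an assumption.

```java
import java.util.function.Consumer;

// Hypothetical result object, mirroring the fields of the context shown above.
final class RunReport {
    final String taskName;
    final long costMs;
    final boolean ok;
    final Throwable error;

    RunReport(String taskName, long costMs, boolean ok, Throwable error) {
        this.taskName = taskName;
        this.costMs = costMs;
        this.ok = ok;
        this.error = error;
    }
}

final class TaskRunner {
    // Runs the body, measures its duration, and always notifies the listener.
    static void runAndReport(String name, Runnable body, Consumer<RunReport> listener) {
        long start = System.nanoTime();
        Throwable error = null;
        try {
            body.run();
        } catch (Throwable t) {
            error = t; // a failing task is reported, not rethrown
        }
        long costMs = (System.nanoTime() - start) / 1_000_000L;
        listener.accept(new RunReport(name, costMs, error == null, error));
    }
}
```

Catching the failure inside the wrapper also keeps one broken task from killing the worker thread that ran it.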

Support for other storage backends

At present, task storage is implemented only for Redis and for memory. Extending it to ZooKeeper, etcd, or a relational database is also feasible: just implement the following interface.

interface ITaskStore {
    public long getRemoteVersion();
    public Map getAllTriggers();
    public void saveAllTriggers(long version, Map triggers);
    public boolean grabTask(String name);
}
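As a rough idea of what an implementation involves, here is a single-process, in-memory store. It is a sketch under assumptions, not the article's Memory implementation: the interface is redeclared as `SketchTaskStore` because the original `Map` type parameters are not shown, and `Map<String, String>` (task name to serialized trigger) is a guess based on the Redis hash dump above.

```java
import java.util.HashMap;
import java.util.Map;

// Assumed shape of the store: task name -> serialized trigger string.
interface SketchTaskStore {
    long getRemoteVersion();
    Map<String, String> getAllTriggers();
    void saveAllTriggers(long version, Map<String, String> triggers);
    boolean grabTask(String name);
}

// Single-process store; useful for tests or when no coordination is needed.
final class MemorySketchStore implements SketchTaskStore {
    private long version;
    private Map<String, String> triggers = new HashMap<>();

    public synchronized long getRemoteVersion() {
        return version;
    }

    public synchronized Map<String, String> getAllTriggers() {
        return new HashMap<>(triggers); // defensive copy
    }

    public synchronized void saveAllTriggers(long version, Map<String, String> triggers) {
        // version and triggers are replaced together so readers never see a mix
        this.version = version;
        this.triggers = new HashMap<>(triggers);
    }

    // With a single process there is nobody to compete with, so the lock is always granted.
    public boolean grabTask(String name) {
        return true;
    }
}
```

A backend shared between processes has to make `grabTask` atomic across machines and give the lock an expiry, which is what the Redis `SET ... NX EX` call provides.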
