

What is the use of APScheduler?


Today I would like to talk about how APScheduler is used; many people may not know much about it. To help you understand it better, I have summarized the following content, and I hope you get something out of this article.

1. Brief introduction

APScheduler is a task-scheduling library written in Python. It is cross-platform: it does not rely on the Linux crontab service and also runs on Windows.

The official documentation is at https://apscheduler.readthedocs.io/en/latest/index.html


APScheduler has four components:

Triggers determine when a scheduled task is executed.

Job stores hold the scheduled jobs and can persist them, either in a database or in Redis.

# stored in redis
from apscheduler.jobstores.redis import RedisJobStore

# stored in mongo
from apscheduler.jobstores.mongodb import MongoDBJobStore

# stored in a relational database via SQLAlchemy
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

Executors run the task in a thread or process pool when it is due to be executed.

Schedulers tie the other components together and provide the interface for adding and managing jobs.

# run in the background
from apscheduler.schedulers.background import BackgroundScheduler

# run in blocking mode, in the foreground
from apscheduler.schedulers.blocking import BlockingScheduler

Jobs added to a persistent job store are saved and survive scheduler restarts.
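To see how the four components fit together, here is a minimal end-to-end sketch (the SQLite job store, the thread pool size, and the tick() job are illustrative assumptions, not a prescribed setup):

from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor

def tick():
    # a trivial job used only for illustration
    print('Tick! The time is:', datetime.now())

# job store persists jobs; executor runs them in threads; the scheduler ties everything together
scheduler = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')},
    executors={'default': ThreadPoolExecutor(10)})

# the 'interval' trigger fires tick() every 10 seconds
scheduler.add_job(tick, 'interval', seconds=10)
scheduler.start()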

2. Installation

pip install apscheduler

3. Triggers

date: runs the job once at a specific date and time.

from datetime import date, datetime
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

def my_job(text):
    print(text)

# run at 00:00:00 on November 6, 2019
sched.add_job(my_job, 'date', run_date=date(2019, 11, 6), args=['text'])

# run at 16:30:05 on November 6, 2019; a detailed time can be specified
sched.add_job(my_job, 'date', run_date=datetime(2019, 11, 6, 16, 30, 5), args=['text'])

# the run date can also be given as a string
sched.add_job(my_job, 'date', run_date='2019-11-06 16:30:05', args=['text'])

# run immediately
sched.add_job(my_job, 'date', args=['text'])

sched.start()

interval: used to run a job at fixed intervals of time.

weeks (int) - number of weeks between runs
days (int) - number of days between runs
hours (int) - number of hours between runs
minutes (int) - number of minutes between runs
seconds (int) - number of seconds between runs
start_date (datetime | str) - starting point of the interval calculation
end_date (datetime | str) - end point of the interval calculation
timezone (datetime.tzinfo | str) - time zone used for the date/time calculations
jitter (int | None) - delay each job execution by at most this many seconds, chosen at random

from datetime import datetime

# run every two hours
sched.add_job(job_function, 'interval', hours=2)

# run every two hours, between 09:30:00 on October 10, 2018 and 11:00:00 on June 15, 2019
sched.add_job(job_function, 'interval', hours=2, start_date='2018-10-10 09:30:00', end_date='2019-06-15 11:00:00')

cron: used to run a job periodically at particular times of day, in the style of crontab.

Common parameters

year (int | str) - 4-digit year
month (int | str) - month (1-12)
day (int | str) - day of month (1-31)
week (int | str) - ISO week (1-53)
day_of_week (int | str) - number or name of weekday (0-6 or mon, tue, wed, thu, fri, sat, sun)
hour (int | str) - hour (0-23)
minute (int | str) - minute (0-59)
second (int | str) - second (0-59)
start_date (datetime | str) - earliest possible date/time to trigger on (inclusive)
end_date (datetime | str) - latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo | str) - time zone
jitter (int | None) - delay each job execution by at most this many seconds, chosen at random

Common examples

# run at 00:00, 01:00, 02:00 and 03:00 on the third Friday of June, July, August, November and December
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

# run at 5:30 from Monday to Friday, until May 30th, 2014
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

# register via a decorator; runs on the last Sunday of every month
@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

# a standard crontab expression can also be used
from apscheduler.triggers.cron import CronTrigger
sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))

# add a random delay of up to 120 seconds to each run
sched.add_job(job_function, 'cron', hour='*', jitter=120)

calendarinterval: used to run a job at calendar-based intervals, at a specific time of day.

Its parameters are similar to those of interval.

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job_function():
    print("Hello World")

sched = BlockingScheduler()

# run this job at 15:36:00 once every month
sched.add_job(job_function, 'calendarinterval', months=1, hour=15, minute=36)

# run at 15:36 every two months, between 2019-06-16 and 2020-03-16
sched.add_job(job_function, 'calendarinterval', months=2, start_date='2019-06-16',
              end_date='2020-03-16', hour=15, minute=36)

sched.start()

4. Job stores

REDIS_CONF = {
    "password": "xxxxx",
    "host": "192.168.137.120",
    "port": 6379,
    "db": 0
}

from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# job stores
job_stores = {
    # store jobs in redis (jobs_key and run_times_key are the redis key names; see the complete example below)
    'redis': RedisJobStore(jobs_key=jobs_key, run_times_key=run_times_key, **REDIS_CONF),
    # store jobs in mongo
    'mongo': MongoDBJobStore(),
    # store jobs in a relational database via SQLAlchemy
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

# executors
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

executors = {
    'default': ThreadPoolExecutor(20),     # up to 20 threads
    'processpool': ProcessPoolExecutor(5)  # up to 5 processes
}

job_defaults = {
    'coalesce': False,   # when several runs of the same job are due at once, run them all instead of merging them
    'max_instances': 3   # at most three instances of the same job may run concurrently
}

# build the scheduler from the configuration
scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults, timezone=utc)

5. Executors

Executors run the task in a thread or process pool when it is scheduled to be executed.

# thread pool executor
from apscheduler.executors.pool import ThreadPoolExecutor

executors = {
    'default': ThreadPoolExecutor(20)  # up to 20 threads run at the same time
}
scheduler = BackgroundScheduler(executors=executors)

# process pool executor
from apscheduler.executors.pool import ProcessPoolExecutor

executors = {
    'default': ProcessPoolExecutor(5)  # up to 5 processes run at the same time
}

6. Schedulers

BlockingScheduler: use it when the scheduler is the only thing running in the process.

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()
scheduler.start()
# the program blocks here

BackgroundScheduler: runs in the background; use it inside an existing application or framework.

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.start()
# the program does not block here

AsyncIOScheduler: use it when your program uses asyncio (a minimal sketch follows this list).

GeventScheduler: use it when your program uses gevent.

TornadoScheduler: use it when your program is based on Tornado.

TwistedScheduler: use it when your program uses Twisted.

QtScheduler: use it when your application is a Qt application.
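As a quick illustration of one of these variants, a minimal AsyncIOScheduler sketch might look like this (the tick() job and the 5-second interval are assumptions made only for the demonstration):

import asyncio
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler

def tick():
    print('Tick! The time is:', datetime.now())

scheduler = AsyncIOScheduler()
scheduler.add_job(tick, 'interval', seconds=5)
scheduler.start()

# keep the asyncio event loop running so the scheduler can fire jobs
try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass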

7. Three methods of configuration

Method 1

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

executors = {
    'default': ThreadPoolExecutor(20),     # maximum number of threads
    'processpool': ProcessPoolExecutor(5)  # maximum number of processes
}

job_defaults = {
    'coalesce': False,
    'max_instances': 3  # maximum number of concurrently running instances of the same job
}

# pass the configuration as keyword arguments when constructing the scheduler
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

Method 2

from apscheduler.schedulers.background import BackgroundScheduler

# pass the configuration as a dictionary
scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
        'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

Method 3

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}

job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

scheduler = BackgroundScheduler()

# use the scheduler's configure() method to set up the job stores and executors after construction
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

8. Starting the scheduler

scheduler.start()

For BlockingScheduler, this call blocks so the process does not exit; the scheduler runs as a stand-alone process.

For BackgroundScheduler, the call returns immediately, so the scheduler runs inside your application rather than as a separate process.
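For example, a minimal sketch of keeping a BackgroundScheduler alive in a plain script might look like this (the sleep loop and the lambda job are assumptions; inside a web framework the framework's own process keeps the scheduler alive instead):

import time
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.add_job(lambda: print('job ran'), 'interval', seconds=5)
scheduler.start()  # returns immediately; jobs run in background threads

try:
    # keep the main thread alive, otherwise the process exits and the jobs stop
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()  # wait for running jobs to finish before exiting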

9. Task management

Method 1

job = scheduler.add_job(myfunc, 'interval', minutes=2)  # add a job
job.remove()   # remove the job
job.pause()    # pause the job
job.resume()   # resume the job

scheduler.shutdown()            # shut down the scheduler (shutdown is a scheduler method, not a job method)
scheduler.shutdown(wait=False)  # shut down without waiting for running jobs to finish

Method 2

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  # add a job
scheduler.remove_job('my_job_id')  # remove the job
scheduler.pause_job('my_job_id')   # pause the job
scheduler.resume_job('my_job_id')  # resume the job

Modifying a job: change the job's configurable attributes.

job.modify(max_instances=6, name='Alternate name')

# change the trigger of an existing job
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

The get_jobs() method returns the list of scheduled jobs as Job instances.
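For instance, a small sketch of inspecting the job list (assuming a scheduler with jobs already added; id, name and next_run_time are standard Job attributes):

for job in scheduler.get_jobs():
    # each Job instance exposes its id, name and next scheduled run time
    print(job.id, job.name, job.next_run_time)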

10. Logging

If logging is not already configured in the project:

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

To integrate with the project's existing logger (here a Django logger):

logger = logging.getLogger("django")

...

scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
scheduler._logger = logger

11. A complete example

REDIS_CONF = {
    "password": "xxxxx",
    "host": "192.168.137.120",
    "port": 6379,
    "db": 0
}

logger = logging.getLogger("django")

jobs_key = 'collection_api_apscheduler.jobs'
run_times_key = 'collection_api_apscheduler.run_times'

job_stores = {
    'default': RedisJobStore(jobs_key=jobs_key, run_times_key=run_times_key, **REDIS_CONF)
}

executors = {
    'default': {'type': 'threadpool', 'max_workers': 60}
}

job_defaults = {
    'coalesce': True,          # when several runs of the same job are due at once, merge them and run only once
    'max_instances': 3,
    'misfire_grace_time': 30,  # still run the job if it is at most 30 seconds late
}

scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
scheduler._logger = logger

# if the persistent job store already contains jobs, resume them
if scheduler.get_jobs():
    scheduler.resume()

# add scheduled tasks
scheduler.add_job(handle_news_task, 'date', id='handle_news_task', replace_existing=True)
scheduler.add_job(...)

scheduler.start()

After reading the above, do you have a better understanding of how APScheduler is used? If you want to learn more, please follow the industry information channel. Thank you for your support.
