How to use celery

2025-02-25 Update From: SLTechnology News&Howtos


Shulou (Shulou.com) 06/03 Report

This article explains in detail how to use Celery. The editor finds it quite practical and shares it here as a reference; I hope you get something out of it after reading.

1. What is Celery

Celery is a simple, flexible, and reliable distributed system for processing large numbers of messages: an asynchronous task queue focused on real-time processing that also supports task scheduling.

Celery architecture

The Celery architecture consists of three parts: the message middleware (message broker), the task execution unit (worker), and the task result store.

Message middleware: Celery does not provide a message service itself, but it integrates easily with third-party message middleware such as RabbitMQ and Redis.

Task execution unit: the worker is the task execution unit provided by Celery. Workers run concurrently on the nodes of a distributed system.

Task result store: the task result store holds the results of tasks executed by workers. Celery supports storing results in several backends, including AMQP and Redis.
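To make the three parts concrete, here is a purely illustrative in-process sketch (this is a toy, not Celery itself): a queue stands in for the broker, a thread for the worker, and a plain dict for the result store.

```python
import queue
import threading
import uuid

broker = queue.Queue()   # message middleware: holds pending task messages
result_store = {}        # task result store: maps task id -> result


def add(x, y):
    return x + y

TASKS = {'add': add}     # registry of known tasks


def submit(name, *args):
    """Publish a task message to the broker and return its id."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, name, args))
    return task_id


def worker():
    """Task execution unit: consume messages and store results."""
    while True:
        task_id, name, args = broker.get()
        if name is None:  # shutdown sentinel
            break
        result_store[task_id] = TASKS[name](*args)


t = threading.Thread(target=worker)
t.start()
tid = submit('add', 4, 5)
broker.put((None, None, None))  # tell the worker to stop
t.join()
print(result_store[tid])  # -> 9
```

In real Celery the broker and result store are external services (RabbitMQ, Redis), and workers are separate processes on possibly different machines; the message flow is the same.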

Version support

Celery 4.0 runs on:

Python (2.7, 3.4, 3.5)

PyPy (5.4, 5.5)

This is the last version to support Python 2.7; from the next version (Celery 5.x) onward, Python 3.5 or newer is required.

If you're running an older version of Python, you need to be running an older version of Celery:

Python 2.6: Celery series 3.1 or earlier.

Python 2.5: Celery series 3.0 or earlier.

Python 2.4: Celery series 2.2 or earlier.

Celery is a project with minimal funding, so we don't support Microsoft Windows. Please don't open any issues related to that platform.
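Given these version constraints, a quick interpreter check before relying on modern Celery can save confusion; a minimal standard-library sketch:

```python
import sys

# Minimal sketch: refuse to run on interpreters too old for Celery 5.x,
# which requires Python 3.5 or newer.
if sys.version_info < (3, 5):
    raise RuntimeError("Celery 5.x requires Python 3.5 or newer")
print("interpreter version is sufficient for Celery 5.x")
```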

2. Use cases

Asynchronous tasks: submit time-consuming operations to Celery for asynchronous execution, such as sending SMS/email, pushing messages, or processing audio and video.

Scheduled tasks: run something on a schedule, such as daily statistics.
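For the scheduled case, the "run at" time that Celery's apply_async(eta=...) expects is a UTC timestamp, typically computed as "current UTC time plus a delay". A minimal stdlib sketch of that computation (no Celery required), mirroring the approach used later in this article:

```python
from datetime import datetime, timedelta

# Compute a "run 10 seconds from now" eta in UTC,
# the form expected by apply_async(eta=...).
local_now = datetime.now()
utc_now = datetime.utcfromtimestamp(local_now.timestamp())
delay = timedelta(seconds=10)
task_time = utc_now + delay
print(task_time - utc_now)  # -> 0:00:10
```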

3. Installation and configuration

pip install celery

Message middleware: RabbitMQ/Redis

app = Celery('task name', backend='xxx', broker='xxx')

4. Celery executes asynchronous tasks

Create a project celerytest

Create a py file: celery_app_task.py

import celery

# broker = 'redis://127.0.0.1:6379/2'  # without a password
backend = 'redis://:123456@127.0.0.1:6379/1'
broker = 'redis://:123456@127.0.0.1:6379/2'

cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def add(x, y):
    return x + y

Create py file: add_task.py, add task

from celery_app_task import add

result = add.delay(4, 5)
print(result.id)

Create the py file run.py to execute the task, or use the command: celery worker -A celery_app_task -l info

Note: on Windows: celery worker -A celery_app_task -l info -P eventlet

from celery_app_task import cel

if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info'])

Create a py file: result.py to view the results of task execution

from celery.result import AsyncResult
from celery_app_task import cel

# note: `async` is a reserved word in Python 3.7+, so use another name
async_result = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # async_result.forget()  # delete the result
elif async_result.failed():
    print('execution failed')
elif async_result.status == 'PENDING':
    print('task waiting')
elif async_result.status == 'RETRY':
    print('task is retrying after an exception')
elif async_result.status == 'STARTED':
    print('task has started executing')

Execute add_task.py to add a task and get the task id.

Execute run.py, or execute the command: celery worker -A celery_app_task -l info

Execute result.py to check the status of the task and get the result.

Multitask structure

pro_cel
├── celery_task          # celery-related folder
│   ├── celery.py        # celery connection and configuration; must be named celery.py
│   ├── tasks1.py        # task functions
│   └── tasks2.py        # task functions
├── check_result.py      # check results
└── send_task.py         # trigger tasks

celery.py

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # include the following two task files; Celery looks in the
             # corresponding py files for tasks, so multiple tasks can be grouped
             include=['celery_task.tasks1', 'celery_task.tasks2'])

# time zone
cel.conf.timezone = 'Asia/Shanghai'
# whether to use UTC
cel.conf.enable_utc = False

tasks1.py

import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery task result: %s" % res

tasks2.py

import time
from celery_task.celery import cel

@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2 task result: %s" % res

check_result.py

from celery.result import AsyncResult
from celery_task.celery import cel

async_result = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # async_result.forget()  # delete the result; it is not removed automatically on completion
    # async_result.revoke(terminate=True)   # terminate no matter what state the task is in
    # async_result.revoke(terminate=False)  # terminate only if the task has not started yet
elif async_result.failed():
    print('execution failed')
elif async_result.status == 'PENDING':
    print('task waiting')
elif async_result.status == 'RETRY':
    print('task is retrying after an exception')
elif async_result.status == 'STARTED':
    print('task has started executing')

send_task.py

from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# immediately tell celery to execute the test_celery task, passing one parameter
result = test_celery.delay('first execution')
print(result.id)
result = test_celery2.delay('second execution')
print(result.id)

Add a task (execute send_task.py), start the worker: celery worker -A celery_task -l info -P eventlet, and check the result of task execution (execute check_result.py).

5. Celery performs scheduled tasks

Set the time at which celery executes a task: add_task.py

from celery_app_task import add
from datetime import datetime, timedelta

# method 1
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# method 2
ctime = datetime.now()
# eta defaults to UTC time
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# use apply_async and set the time
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

For crontab-like periodic tasks in the multitask structure, celery.py is modified as follows:

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             include=['celery_task.tasks1', 'celery_task.tasks2'])

cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # any name you like
    'add-every-10-seconds': {
        # execute the test_celery function under tasks1
        'task': 'celery_task.tasks1.test_celery',
        # execute every 2 seconds
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # pass parameters
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.tasks1.test_celery',
    #     # execute at 08:42 on April 11 of each year
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}

Start a beat: celery beat -A celery_task -l info

Start the worker: celery worker -A celery_task -l info -P eventlet

6. Use celery in django

Create a celeryconfig.py under the project directory

import djcelery
djcelery.setup_loader()

CELERY_IMPORTS = ('app01.tasks',)
# can prevent deadlocks in some situations
CELERYD_FORCE_EXECV = True
# number of concurrent workers
CELERYD_CONCURRENCY = 4
# allow retries
CELERY_ACKS_LATE = True
# each worker executes at most 100 tasks before being destroyed; prevents memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 100
# task timeout
CELERYD_TASK_TIME_LIMIT = 12 * 30

Create a tasks.py under the app01 directory

from celery import task

@task
def add(a, b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a + b)

View function views.py

from django.shortcuts import render, HttpResponse
from app01.tasks import add
from datetime import datetime, timedelta

def test(request):
    # result = add.delay(2, 3)
    ctime = datetime.now()
    # eta defaults to UTC time
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

settings.py

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

from djagocele import celeryconfig
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

This concludes the article on "how to use celery". I hope the above content has been of some help and that you have learned something from it. If you think the article is good, please share it for more people to see.
