2025-02-25 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article explains in detail how to use Celery. It is a practical topic, so it is shared here as a reference; I hope you get something out of it.
1. What is Celery
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages.
It is an asynchronous task queue focused on real-time processing.
It also supports task scheduling.
Celery architecture
The architecture of Celery consists of three parts: the message middleware (message broker), the task execution unit (worker), and the task result storage (task result store).
Message middleware: Celery does not provide a message service itself, but it integrates easily with third-party message middleware such as RabbitMQ and Redis.
Task execution unit: the worker is the task execution unit provided by Celery. Workers run concurrently on the nodes of a distributed system.
Task result storage: the task result store holds the results of tasks executed by workers. Celery can store task results in several ways, including AMQP and Redis.
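To make the three roles concrete, here is a minimal sketch that uses only the Python standard library and does not use Celery at all. The names broker, result_store, send_task, and worker are hypothetical stand-ins chosen for illustration: a queue plays the message middleware, threads play the workers, and a dict plays the result store.

```python
import queue
import threading
import uuid

# Hypothetical stand-ins: a Queue plays the broker, a dict plays the result store.
broker = queue.Queue()
result_store = {}
store_lock = threading.Lock()


def add(x, y):
    return x + y


def send_task(func, *args):
    """Producer side: put a message on the broker and return its task id."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, func, args))
    return task_id


def worker():
    """Worker side: consume messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, func, args = message
        outcome = func(*args)
        with store_lock:
            result_store[task_id] = outcome


def run_demo():
    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.start()
    ids = [send_task(add, 4, 5), send_task(add, 1, 3)]
    for _ in threads:
        broker.put(None)  # one sentinel per worker, queued after the tasks
    for t in threads:
        t.join()
    # The caller looks results up by task id, as it would in a result store.
    return [result_store[i] for i in ids]


if __name__ == "__main__":
    print(run_demo())
```

The caller only ever holds a task id, which is the same pattern the Celery examples in this article follow: submit a task, keep the id, and look up the result later.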
Version support
Celery version 4.0 runs on:
Python (2.7, 3.4, 3.5)
PyPy (5.4, 5.5)
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you're running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4: Celery series 2.2 or earlier.
The Celery project notes: "Celery is a project with minimal funding, so we don't support Microsoft Windows. Please don't open any issues related to that platform."
2. Use cases
Asynchronous tasks: hand time-consuming operations to Celery for asynchronous execution, such as sending SMS or email, message push, and audio or video processing.
Scheduled tasks: run something on a regular schedule, such as daily statistics.
3. Installation and configuration of Celery
    pip install celery
Message middleware: RabbitMQ or Redis
    app = Celery('task_name', backend='xxx', broker='xxx')
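The backend and broker arguments are connection URLs. As a rough sketch, assuming Redis is the middleware and using the redis://:password@host:port/db form that appears later in this article, the URLs could be assembled like this. The helper redis_url is hypothetical and is not part of Celery.

```python
from urllib.parse import quote


def redis_url(host, port, db, password=None):
    """Build a redis:// URL; the password, if any, follows a bare colon."""
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"


# Example values only; substitute your own host, database numbers, and password.
broker = redis_url("127.0.0.1", 6379, 2, password="123456")
backend = redis_url("127.0.0.1", 6379, 1, password="123456")
print(broker)
print(backend)
```

Using different database numbers for the broker and the backend keeps queued messages and stored results apart.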
4. Celery executes asynchronous tasks
Create a project celerytest
Create a py file: celery_app_task.py
    import celery
    import time

    # broker = 'redis://127.0.0.1:6379/2'  # form without a password
    backend = 'redis://:123456@127.0.0.1:6379/1'
    broker = 'redis://:123456@127.0.0.1:6379/2'
    cel = celery.Celery('test', backend=backend, broker=broker)

    @cel.task
    def add(x, y):
        return x + y
Create a py file: add_task.py, which adds a task
    from celery_app_task import add

    result = add.delay(4, 5)
    print(result.id)
Create a py file: run.py, which runs the worker. Alternatively, start the worker from the command line: celery worker -A celery_app_task -l info (this is the Celery 4.x form; from Celery 5.0 the -A option goes before the subcommand: celery -A celery_app_task worker -l info)
Note: on Windows: celery worker -A celery_app_task -l info -P eventlet
    from celery_app_task import cel

    if __name__ == '__main__':
        cel.worker_main()
        # cel.worker_main(argv=['--loglevel=info'])
Create a py file: result.py, to view the result of a task
    from celery.result import AsyncResult
    from celery_app_task import cel

    # `async` is a reserved word from Python 3.7 on, so the variable is named async_result
    async_result = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

    if async_result.successful():
        result = async_result.get()
        print(result)
        # async_result.forget()  # delete the result
    elif async_result.failed():
        print('execution failed')
    elif async_result.status == 'PENDING':
        print('task waiting')
    elif async_result.status == 'RETRY':
        print('task is retrying after exception')
    elif async_result.status == 'STARTED':
        print('task has started to be executed')
Execute add_task.py to add a task and get the task ID.
Execute run.py, or run the command: celery worker -A celery_app_task -l info
Execute result.py to check the status of the task and get the result.
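The chain of status checks in result.py can also be written as a lookup table. The sketch below runs without Celery: the keys are Celery's built-in state names, while the messages and the describe_state helper are illustrative assumptions. As far as I know, STARTED is only reported when the task_track_started setting is enabled, so a running task may otherwise still show as PENDING.

```python
# Built-in Celery state names mapped to messages (the messages are illustrative).
STATE_MESSAGES = {
    "PENDING": "task waiting",
    "STARTED": "task has started to be executed",
    "RETRY": "task is retrying after exception",
    "FAILURE": "execution failed",
    "SUCCESS": "execution succeeded",
    "REVOKED": "task was revoked",
}


def describe_state(state):
    """Return a readable message for a task state string."""
    return STATE_MESSAGES.get(state, f"unknown state: {state}")


if __name__ == "__main__":
    for name in ("PENDING", "SUCCESS", "SOMETHING_ELSE"):
        print(name, "->", describe_state(name))
```

In result.py this would be called with the status attribute of the AsyncResult object.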
Multitask structure
    pro_cel
    ├── celery_task          # Celery-related folder
    │   ├── celery.py        # Celery connection and configuration; the file must have this name
    │   ├── tasks1.py        # task functions
    │   └── tasks2.py        # task functions
    ├── check_result.py      # check results
    └── send_task.py         # trigger tasks
celery.py
    from celery import Celery

    cel = Celery('celery_demo',
                 broker='redis://127.0.0.1:6379/1',
                 backend='redis://127.0.0.1:6379/2',
                 # the two task modules below; Celery looks in these
                 # files for tasks, so tasks can be grouped by module
                 include=['celery_task.tasks1',
                          'celery_task.tasks2'])

    # time zone
    cel.conf.timezone = 'Asia/Shanghai'
    # whether to use UTC
    cel.conf.enable_utc = False
tasks1.py
    import time
    from celery_task.celery import cel

    @cel.task
    def test_celery(res):
        time.sleep(5)
        return "test_celery task result: %s" % res
tasks2.py
    import time
    from celery_task.celery import cel

    @cel.task
    def test_celery2(res):
        time.sleep(5)
        return "test_celery2 task result: %s" % res
check_result.py
    from celery.result import AsyncResult
    from celery_task.celery import cel

    # `async` is a reserved word from Python 3.7 on, so the variable is named async_result
    async_result = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

    if async_result.successful():
        result = async_result.get()
        print(result)
        # async_result.forget()  # delete the result; results are not deleted automatically after execution
        # async_result.revoke(terminate=True)   # terminate the task no matter what stage it is in
        # async_result.revoke(terminate=False)  # the task can be stopped only if it has not started yet
    elif async_result.failed():
        print('execution failed')
    elif async_result.status == 'PENDING':
        print('task waiting')
    elif async_result.status == 'RETRY':
        print('task is retrying after exception')
    elif async_result.status == 'STARTED':
        print('task has started to be executed')
send_task.py
    from celery_task.tasks1 import test_celery
    from celery_task.tasks2 import test_celery2

    # tell Celery to run the test_celery task right away, passing one argument
    result = test_celery.delay('execution of the first')
    print(result.id)
    result = test_celery2.delay('execution of the second')
    print(result.id)
Add the tasks (execute send_task.py), start a worker: celery worker -A celery_task -l info -P eventlet, then check the result of task execution (execute check_result.py).
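When several tasks are submitted at once, it can be convenient to wait for all of them rather than checking each ID by hand. The following is a sketch under stated assumptions: wait_for_all is a hypothetical helper, not a Celery API, and it relies only on the ready() and get() methods that AsyncResult objects provide. FakeResult is a stand-in so the example runs without a broker.

```python
import time


def wait_for_all(results, timeout=10.0, poll_interval=0.1):
    """Poll result handles until all are ready or the timeout expires.

    Each handle only needs ready() and get(). Returns the values in
    submission order, or raises TimeoutError.
    """
    results = list(results)
    deadline = time.monotonic() + timeout
    pending = results
    while pending:
        pending = [r for r in pending if not r.ready()]
        if pending:
            if time.monotonic() > deadline:
                raise TimeoutError(f"{len(pending)} task(s) still not ready")
            time.sleep(poll_interval)
    return [r.get() for r in results]


class FakeResult:
    """Hypothetical stand-in for AsyncResult, ready after a fixed delay."""

    def __init__(self, value, ready_after):
        self._value = value
        self._ready_at = time.monotonic() + ready_after

    def ready(self):
        return time.monotonic() >= self._ready_at

    def get(self):
        return self._value


if __name__ == "__main__":
    handles = [FakeResult("first", 0.0), FakeResult("second", 0.3)]
    print(wait_for_all(handles, timeout=2.0))
```

With real tasks, the objects returned by the delay calls in send_task.py could be collected in a list and passed to the helper in place of the FakeResult objects.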
5. Celery performs scheduled tasks
Schedule a task to run at a specific time: add_task.py
    from celery_app_task import add
    from datetime import datetime, timedelta

    # Method 1
    # v1 = datetime(2019, 2, 13, 18, 19, 56)
    # print(v1)
    # v2 = datetime.utcfromtimestamp(v1.timestamp())
    # print(v2)
    # result = add.apply_async(args=[1, 3], eta=v2)
    # print(result.id)

    # Method 2
    ctime = datetime.now()
    # Celery uses UTC time by default, so convert the local time to UTC
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay

    # use apply_async and set the time
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
Periodic tasks similar to crontab: in the multitask structure, modify celery.py as follows
    from datetime import timedelta
    from celery import Celery
    from celery.schedules import crontab

    cel = Celery('tasks',
                 broker='redis://127.0.0.1:6379/1',
                 backend='redis://127.0.0.1:6379/2',
                 include=['celery_task.tasks1',
                          'celery_task.tasks2'])
    cel.conf.timezone = 'Asia/Shanghai'
    cel.conf.enable_utc = False

    cel.conf.beat_schedule = {
        # the name is arbitrary
        'add-every-10-seconds': {
            # run the test_celery function in tasks1
            'task': 'celery_task.tasks1.test_celery',
            # run every 2 seconds
            # 'schedule': 1.0,
            # 'schedule': crontab(minute="*/1"),
            'schedule': timedelta(seconds=2),
            # arguments to pass
            'args': ('test',)
        },
        # 'add-every-12-seconds': {
        #     'task': 'celery_task.tasks1.test_celery',
        #     # run at 08:42 on April 11 every year
        #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        #     'args': (16, 16)
        # },
    }
Start beat: celery beat -A celery_task -l info
Start a worker to execute the tasks: celery worker -A celery_task -l info -P eventlet
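Returning to the one-off scheduling in add_task.py earlier in this section: the local-to-UTC conversion for eta can also be written with timezone-aware datetimes, which avoids datetime.utcfromtimestamp (deprecated since Python 3.12). This is a sketch, and eta_in is a hypothetical helper name rather than a Celery function.

```python
from datetime import datetime, timedelta, timezone


def eta_in(seconds, now=None):
    """Return a timezone-aware UTC datetime `seconds` after `now`.

    `now` must be timezone-aware if given; it defaults to the current time.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return now.astimezone(timezone.utc) + timedelta(seconds=seconds)


if __name__ == "__main__":
    # 18:19:56 in a UTC+8 zone is 10:19:56 UTC; ten seconds later is 10:20:06 UTC.
    local = datetime(2019, 2, 13, 18, 19, 56, tzinfo=timezone(timedelta(hours=8)))
    print(eta_in(10, now=local).isoformat())
```

The returned value could then be passed as eta, for example add.apply_async(args=[4, 3], eta=eta_in(10)). For a simple delay relative to now, apply_async also accepts a countdown argument in seconds.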
6. Using Celery in Django
Create a celeryconfig.py under the project directory
    import djcelery
    djcelery.setup_loader()

    CELERY_IMPORTS = ('app01.tasks',)
    # can prevent deadlocks in some situations
    CELERYD_FORCE_EXECV = True
    # number of concurrent workers
    CELERYD_CONCURRENCY = 4
    # allow retries
    CELERY_ACKS_LATE = True
    # each worker is destroyed after executing at most 100 tasks, which can prevent memory leaks
    CELERYD_MAX_TASKS_PER_CHILD = 100
    # timeout
    CELERYD_TASK_TIME_LIMIT = 12 * 30
Create a tasks.py under the app01 directory
    from celery import task

    @task
    def add(a, b):
        with open('a.text', 'a', encoding='utf-8') as f:
            f.write('a')
        print(a + b)
View function: views.py
    from django.shortcuts import render, HttpResponse
    from app01.tasks import add
    from datetime import datetime, timedelta

    def test(request):
        # result = add.delay(2, 3)
        ctime = datetime.now()
        # Celery uses UTC time by default, so convert the local time to UTC
        utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
        time_delay = timedelta(seconds=5)
        task_time = utc_ctime + time_delay
        result = add.apply_async(args=[4, 3], eta=task_time)
        print(result.id)
        return HttpResponse('ok')
settings.py
    INSTALLED_APPS = [
        ...
        'djcelery',
        'app01'
    ]
    ...
    from djagocele import celeryconfig
    BROKER_BACKEND = 'redis'
    BROKER_URL = 'redis://127.0.0.1:6379/1'
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
This concludes the article on how to use Celery. I hope the content above is of some help and that you learned something from it. If you found the article useful, please share it so more people can see it.