How to install and use Celery
This article shows how to install and use Celery, from basic asynchronous tasks to scheduled tasks and use inside a Django project. Hopefully the detailed walkthrough below is useful.
Introduction
Celery is a Distributed Task Queue. "Distributed" means there can be multiple workers, and "queue" means tasks are executed asynchronously.
Celery core module
Celery has five core roles
Task
The unit of work. Tasks can be asynchronous (called on demand) or periodic (run on a schedule).
Broker
The message broker: it receives tasks (messages) from the producer and stores them in a queue; the consumer of those tasks is the Worker. Celery itself does not provide a queue service, so Redis or RabbitMQ is recommended to provide it.
Worker
The unit that executes tasks. It monitors the message queue and, when a task arrives, picks it up and executes it.
Beat
The periodic task scheduler. It sends tasks to the Broker according to the configured schedule.
Backend
Used to store the execution results of tasks.
The relationship between these roles: application code and Beat produce tasks, the Broker queues them, Workers consume and execute them, and the Backend stores the results.
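As a rough sketch (the module name, host and URLs below are placeholders rather than anything from this article), the roles map onto a Celery application like this:

# sketch.py -- how the five roles show up in practice (hypothetical names and URLs)
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://127.0.0.1:6379/0',    # Broker: where tasks are queued
    backend='redis://127.0.0.1:6379/1',   # Backend: where results are stored
)

@app.task                                 # Task: a plain function registered with the app
def add(x, y):
    return x + y

# Worker: started from the command line, e.g.  celery -A sketch worker -l info
# Beat:   started with, e.g.                   celery -A sketch beat -l info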
Installation
Celery 4.x no longer supports the Windows platform. 3.1.26 is the last release of the 3.x series, and 3.1.25 is what gets installed below:
pip install celery
pip install celery==3.1.25
The recommended brokers are RabbitMQ and Redis. RabbitMQ needs no extra Python module; as long as the service itself is ready, it can be used directly.
If you want to use Redis, prepare the Redis service and also install the redis module:
pip install redis
The celery and redis packages can also be installed together with the following command:
pip install -U 'celery[redis]'
Verification
Use the command celery --version to check the version and verify the installation at the same time:
> celery --version
'celery' is not an internal or external command, nor is it a runnable program or batch file.
The error here is reported because celery has not been added to the environment variables, so the program cannot be found. I don't want to add it either, so I just type the full path:
> G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery --version
3.1.25 (Cipater)
Basic operation
Run a simple task here, and finally get the execution result of the task.
Create a task
First, write a small piece of code:
# task1.py
from celery import Celery

# create the Celery instance
app = Celery(
    'tasks',
    broker='redis://192.168.246.11:6379/0',
)

# create a task
@app.task
def add(x, y):
    print("calculate the sum of 2 values: %s %s" % (x, y))
    return x + y
If you use RabbitMQ, change the broker to broker='amqp://192.168.3.108'.
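For reference, broker URLs follow a scheme://user:password@host:port/vhost_or_db pattern; the hosts and credentials in this sketch are placeholders rather than values from this article:

from celery import Celery

# Hypothetical broker URL formats (hosts and credentials are placeholders)
redis_broker = 'redis://:mypassword@192.168.246.11:6379/0'         # optional password, port and database number
rabbit_broker = 'amqp://user:password@192.168.3.108:5672/myvhost'  # RabbitMQ with user, password and vhost

app = Celery('tasks', broker=redis_broker)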
Start Worker
Start Celery Worker to start listening and perform tasks:
$ celery -A task1 worker --loglevel=info
$ celery -A task1 worker -l debug    # or start it like this
The -A parameter is followed by the Celery instance; the instance name can be omitted, the full form being task1.app. You need to change to the same directory as the task1 file to run the command, or make sure the file's directory is on Python's module search path, because the argument after -A is imported as a Python module. So I ran the Worker like this:
G:\> G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery -A Steed.Documents.PycharmProjects.Celery.task1 worker --loglevel=info
[WARNING/MainProcess] ...\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default. ...

celery@IDX-xujf v3.1.25 (Cipater)
Windows-10-10.0.17134-SP0

[config]
.> app:         tasks:0x1fb5056fda0
.> transport:   redis://192.168.246.11:6379/0
.> results:     disabled://
.> concurrency: 4 (prefork)

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . Steed.Documents.PycharmProjects.Celery.task1.add

[INFO/MainProcess] Connected to redis://192.168.246.11:6379/0
[INFO/MainProcess] mingle: searching for neighbors
[INFO/MainProcess] mingle: all alone
[WARNING/MainProcess] celery@IDX-xujf ready.
To send a task to the Worker, call its delay() method. The following is done in IDLE:
>>> import sys
>>> dir = r"G:\Steed\Documents\PycharmProjects\Celery"
>>> sys.path.append(dir)    # my task file is not on the module search path; IDLE cannot find it otherwise
>>> from task1 import add
>>> add.delay(1, 2)
>>>
Worker displays the following information
Received task: task1.add[4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a]
calculate the sum of 2 values: 1 2
Task task1.add[4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a] succeeded in 0s: 3
The above only sends the task; the return value is not received anywhere. This time, keep the returned result object:
>>> t = add.delay(3, 4)
>>> type(t)      # check the type of the returned value
>>> t.get()      # this call raises an error
Traceback (most recent call last):
  File "", line 1, in <module>
    t.get()
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\Lib\site-packages\celery\result.py", line 169, in get
    no_ack=no_ack,
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\Lib\site-packages\celery\backends\base.py", line 616, in _is_disabled
    'No result backend configured. '
NotImplementedError: No result backend configured. Please see the documentation for more information.
The problem is in the instantiation: no backend was defined there, and the backend is where task results are saved.
Get the returned result
Modify the original task code, adding the backend parameter to the instantiation to specify where task results are saved. Here the results are stored in the same Redis:
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://192.168.246.11',
    backend='redis://192.168.246.11',  # the port number is omitted this time
)

@app.task
def add(x, y):
    print("calculate the sum of 2 values: %s %s" % (x, y))
    return x + y
Then restart the Worker and restart IDLE. Now the task's return value can be retrieved:
>>> t = add.delay(1, 1)
>>> t.get()
2
>>>
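With a result backend configured, a result can also be looked up again later by its task id. A small sketch, assuming the task1 module above (the variable names here are only illustrative):

from celery.result import AsyncResult
from task1 import app, add

t = add.delay(1, 1)
task_id = t.id                           # the id can be stored somewhere, e.g. in a database
later = AsyncResult(task_id, app=app)    # rebuild a result handle from the id
print(later.get(timeout=10))             # fetch the stored result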
If it is RabbitMQ, the initial setting of app is written as follows:
app = Celery(
    'tasks',
    broker='amqp://192.168.3.108',
    backend='rpc://192.168.3.108',   # in newer versions rpc is meant to replace amqp as the RabbitMQ result backend
    # backend='amqp://192.168.3.108',  # for old versions without rpc, only amqp is available
)
Other operations
get() blocks
The task above finishes too quickly, so prepare a task that takes a while to execute:
import time

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()
Calling get() on a task blocks until the task returns a result, so there is no asynchronous effect:
>>> t = upper.delay("abc")
>>> t.get()
'ABC'
ready() checks whether the task is finished, without blocking
The ready() method returns whether the task has completed. Wait until it returns True, then call get() and the result comes back immediately:
>>> t = upper.delay("abcd")
>>> t.ready()
False
>>> t.ready()
False
>>> t.ready()
False
>>> t.ready()
True
>>> t.get()
'ABCD'
>>>
get() with a timeout
A timeout can also be set on get(); if it expires, an exception is raised:
>>> t = upper.delay("abcde")
>>> t.get(timeout=11)
'ABCDE'
>>> t = upper.delay("abcde")
>>> t.get(timeout=1)
Traceback (most recent call last):
  File "", line 1, in <module>
    t.get(timeout=1)
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\result.py", line 169, in get
    no_ack=no_ack,
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\backends\base.py", line 238, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
>>>
Task error report
If a task raises an error during execution, for example this call:
>>> t = upper.delay(123)
>>>
Then the error will be displayed on the Worker side:
Task task1.upper[11820ee6-6936-4680-93c2-462487ec927e] raised unexpected: AttributeError("'int' object has no attribute 'upper'",)
Traceback (most recent call last):
  File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\app\trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\app\trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "G:\Steed\Documents\PycharmProjects\Celery\task1.py", line 25, in upper
    return v.upper()
AttributeError: 'int' object has no attribute 'upper'
Then, when fetching the result, the error is raised again as an exception, which is not very friendly:
>>> t.get()
Traceback (most recent call last):
  File "", line 1, in <module>
    t.get()
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\result.py", line 175, in get
    raise meta['result']
AttributeError: 'int' object has no attribute 'upper'
>>>
get() can be told to return only the error result, without raising the exception:
>>> t.get(propagate=False)
AttributeError("'int' object has no attribute 'upper'",)
>>>
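The result object also exposes status checks, so the error case can be detected before calling get(); a brief sketch using the same task:

from task1 import upper

t = upper.delay(123)
t.ready()          # True once the task has finished, successfully or not
t.failed()         # True if the task raised an exception
t.successful()     # True only if it finished without error
t.state            # e.g. 'PENDING', 'SUCCESS' or 'FAILURE'
if t.failed():
    print(t.get(propagate=False))   # the exception object, without re-raising it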
The full error message is stored in the traceback attribute:
>>> t.traceback
'Traceback (most recent call last):\n  File "g:\\steed\\documents\\pycharmprojects\\venv\\celery\\lib\\site-packages\\celery\\app\\trace.py", line 240, in trace_task\n    R = retval = fun(*args, **kwargs)\n  File "g:\\steed\\documents\\pycharmprojects\\venv\\celery\\lib\\site-packages\\celery\\app\\trace.py", line 438, in __protected_call__\n    return self.run(*args, **kwargs)\n  File "G:\\Steed\\Documents\\PycharmProjects\\Celery\\task1.py", line 25, in upper\n    return v.upper()\nAttributeError: \'int\' object has no attribute \'upper\'\n'
>>>
Summary
Start Celery Worker to start monitoring and perform tasks
$ celery -A tasks worker --loglevel=info
Invoke task
>>> from tasks import add
>>> t = add.delay(4, 4)
Get the results synchronously
>>> t.get()
>>> t.get(timeout=1)
Check whether the task is completed
>>> t.ready()
If there is an error, get the error result without triggering the exception
>>> t.get(propagate=False)
>>> t.traceback    # print exception details
Using Celery in a project
Celery can be configured as an application. Suppose the application is named CeleryPro, with the following directory layout:
CeleryPro
 ├─ __init__.py
 ├─ celery.py
 └─ tasks.py
The connection file here must be named celery.py; the other file names can be anything.
Celery file
The file name must be celery.py:
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery(
    'CeleryPro',
    broker='redis://192.168.246.11',
    backend='redis://192.168.246.11',
    include=['CeleryPro.tasks'],
)

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()
About the first line, from __future__ import absolute_import, unicode_literals: I am not sure what unicode_literals is for here, but absolute_import means absolute imports. Because this file is named celery, a plain import celery would by default import this file itself, while what is really needed is the celery package, hence the absolute import. If you do want to import this file, write it as from .celery import ..., with a leading dot; that is exactly what tasks.py does below.
Tasks file
The first two lines of this file include that extra dot, so the import here refers to the celery.py file above. After that, just write the various tasks and add the decorator:
from __future__ import absolute_import, unicode_literals
from .celery import app
import time

@app.task
def add(x, y):
    print("calculate the sum of 2 values: %s %s" % (x, y))
    return x + y

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()
Start the worker
At startup, the -A parameter is followed by the application name CeleryPro. You also need to cd to CeleryPro's parent directory first, otherwise the module will not be found:
> G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery -A CeleryPro worker -l info
[WARNING/MainProcess] ...\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default. ...

celery@IDX-xujf v3.1.25 (Cipater)
Windows-10-10.0.17134-SP0

[config]
.> app:         CeleryPro:0x21deadaf470
.> transport:   redis://192.168.246.11:6379//
.> results:     redis://192.168.246.11/
.> concurrency: 4 (prefork)

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.tasks.add
  . CeleryPro.tasks.upper

[INFO/MainProcess] Connected to redis://192.168.246.11:6379//
[INFO/MainProcess] mingle: searching for neighbors
[INFO/MainProcess] mingle: all alone
[INFO/MainProcess] celery@IDX-xujf ready.
Various ways to start
Note that CeleryPro is used here:
celery -A CeleryPro worker --loglevel=info    # foreground start, not recommended
celery -A CeleryPro worker -l info            # foreground start, abbreviated
celery multi start w1 -A CeleryPro -l info    # background start, recommended
Call a task
Tasks are likewise called from CeleryPro's parent directory; otherwise the usage is the same as before.
All operations are performed in CeleryPro's parent directory; in other words, just make sure that parent directory is on Python's path, either via the environment or with sys.path.append().
The idea is that celery is packaged as an application inside your project: all of its contents live in the CeleryPro folder, and CeleryPro is used as a module of the project. Since the project's home directory is added to the path when the project starts anyway, the packaged app is very convenient to use inside the project.
Start multiple Worker in the background
Start the command:
celery -A project_name worker --loglevel=info   : foreground start command
celery multi start w1 -A project_name -l info   : background start command
celery multi restart w1 -A project_name -l info : background restart command
celery multi stop w1 -A project_name -l info    : background stop command
The difference between foreground and background: the background is started through multi.
w1 is the worker's name. Multiple workers can be started in the background, each with its own name (a sketch of starting several at once follows below).
Even if all the workers are down, users can still submit tasks; the tasks are kept until a worker comes back up, executes them and returns the results.
If a worker started in the foreground is disconnected, its tasks disappear; if a worker started in the background is disconnected, the tasks in the background remain. (I don't fully understand what this sentence means.)
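As that sketch, several named workers can be started (or stopped) with a single multi command; the node names below are just examples:

celery multi start w1 w2 w3 -A CeleryPro -l info    # start three named background workers at once
celery multi stop w1 w2 w3                          # stop the same three nodes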
Checking how many Celery workers are currently running
It seems this can only be checked with ps. Below, three background workers are started, ps is run, then one worker is stopped and ps is run again:
[root@Python3 ~]# celery multi start W1 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
    > w1@Python3: OK
[root@Python3 ~]# celery multi start w2 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
    > w2@Python3: OK
[root@Python3 ~]# celery multi start w3 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
    > w3@Python3: OK
[root@Python3 ~]# ps -ef | grep celery
root  1346     1  0 20:49 ?      00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@Python3
root  1350  1346  0 20:49 ?      00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@Python3
root  1360     1  0 20:49 ?      00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root  1364  1360  0 20:49 ?      00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root  1374     1  0 20:49 ?      00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root  1378  1374  0 20:49 ?      00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root  1391  1251  0 20:55 pts/0  00:00:00 grep --color=auto celery
[root@Python3 ~]# celery multi stop w1
celery multi v4.2.1 (windowlicker)
> Stopping nodes...
    > w1@Python3: TERM -> 1346
[root@Python3 ~]# ps -ef | grep celery
root  1360     1  0 20:49 ?      00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root  1364  1360  0 20:49 ?      00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root  1374     1  0 20:49 ?      00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root  1378  1374  0 20:49 ?      00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root  1398  1251  0 20:57 pts/0  00:00:00 grep --color=auto celery
[root@Python3 ~]#
Not supported on the Windows platform
The error message is as follows:
File "g:\ steed\ documents\ pycharmprojects\ venv\ celery\ lib\ site-packages\ celery\ platforms.py", line 429, in detached raise RuntimeError ('This platform does not support detach.') RuntimeError: This platform does not support detach. > w1@IDX-xujf: * Child terminated with errorcode 1FAILED
Take a look at line 429 of code based on the error message:
    if not resource:
        raise RuntimeError('This platform does not support detach.')
resource is checked here, and the exception is raised directly if it is empty. So what exactly is resource? Search for that variable name in the same file:
# the value of resource is obtained near the top of the file
resource = try_import('resource')

# the try_import method above is in another file
def try_import(module, default=None):
    """Try to import and return module, or return None
    if the module does not exist."""
    try:
        return importlib.import_module(module)
    except ImportError:
        return default

# further down there is a method whose comment indicates that resource is None on Windows
def get_fdmax(default=None):
    """Return the maximum number of open file descriptors
    on this system.

    :keyword default: Value returned if there's no file
                      descriptor limit.
    """
    try:
        return os.sysconf('SC_OPEN_MAX')
    except:
        pass
    if resource is None:  # Windows
        return default
    fdmax = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if fdmax == resource.RLIM_INFINITY:
        return default
    return fdmax
So what happens here is an attempt to import the module resource, and that module is only available on Unix.
Scheduled task
Scheduled tasks differ a lot between version 3 and version 4; version 4 adds another way of defining them.
Celery3
Continuing with the previous two tasks, you only need to add some configuration (conf) to celery, setting up a schedule for each task.
The settings on app.conf are all uppercase here; they are case-sensitive and cannot be written in lowercase:
# CeleryPro/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
import time

@app.task
def add(x, y):
    print("calculate the sum of 2 values: %s %s" % (x, y))
    return x + y

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

app = Celery(
    'CeleryPro',
    broker='redis://192.168.246.11',
    backend='redis://192.168.246.11',
    include=['CeleryPro.tasks'],
)

app.conf.CELERYBEAT_SCHEDULE = {
    'add every 10 seconds': {
        'task': 'CeleryPro.tasks.add',
        'schedule': timedelta(seconds=10),  # a timedelta object can be used
        # 'schedule': 10,                   # a plain number of seconds is also supported
        'args': (1, 2),
    },
    'upper every 2 minutes': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': crontab(minute='*/2'),
        'args': ('abc',),
    },
}

# app.conf.CELERY_TIMEZONE = 'UTC'
app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()
The result expiry is set with CELERY_TASK_RESULT_EXPIRES=3600. The default is one day; according to the documentation, expired results are cleared by a built-in periodic task:
A built-in periodic task will delete the results after this time (celery.task.backend_cleanup).
After the settings are done, start the Worker and start Beat, and everything is OK:
G:\Steed\Documents\PycharmProjects\Celery> G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery.exe -A CeleryPro worker -l info
G:\Steed\Documents\PycharmProjects\Celery> G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery.exe -A CeleryPro beat -l info
Settings in version 3 are all uppercase; in version 4 they were changed to lowercase, and many names changed as well. The mapping between old and new settings is documented here:
http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration
Celery4
The advantage of the new version is that scheduled tasks can be defined separately, like ordinary tasks, using the @app.on_after_configure.connect decorator; version 3 does not have this decorator.
Write code
Create a separate py file to store scheduled tasks:
# CeleryPro/periodic4.py
from __future__ import absolute_import, unicode_literals
from .celery import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # execute every 10 seconds
    sender.add_periodic_task(10.0, hello.s(), name='hello every 10')  # give the schedule a name
    # execute every 30 seconds, with a 10-second expiry on the task
    sender.add_periodic_task(30, upper.s('abcdefg'), expires=10)
    # the execution period can be set like a Linux crontab schedule
    sender.add_periodic_task(
        crontab(hour='*', minute='*/2', day_of_week='*'),
        add.s(11, 22),
    )

@app.task
def hello():
    print('Hello World')

@app.task
def upper(arg):
    return arg.upper()

@app.task
def add(x, y):
    print("calculate the sum of 2 values: %s %s" % (x, y))
    return x + y
There are three schedules in total.
The name parameter gives the schedule a name, so the task is reported using that name, like this: hello every 10. Otherwise the default display is the call itself, like this: CeleryPro.periodic4.upper('abcdefg').
The expires parameter sets a task expiry time. If the task has not been executed by then, it may be discarded (not tested).
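For comparison, the same expires option (and a countdown delay) can also be passed when invoking an ordinary task by hand via apply_async; a small sketch with arbitrary values:

from CeleryPro.tasks import add

# delay(11, 22) is shorthand for apply_async(args=(11, 22)).
result = add.apply_async(args=(11, 22), countdown=5, expires=10)
# countdown=5: run roughly 5 seconds from now
# expires=10:  if no worker picks it up within ~10 seconds, the task is marked expired and skipped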
Modify the previous celery.py file, adding the newly written task file to the include list. While I'm at it, I'll use RabbitMQ here:
# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery(
    'CeleryPro',
    broker='amqp://192.168.3.108',
    backend='rpc',
    include=['CeleryPro.tasks', 'CeleryPro.periodic4'],
)

app.conf.timezone = 'UTC'              # scheduled tasks default to UTC time
# app.conf.timezone = 'Asia/Shanghai'  # can also be changed to Beijing time

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()
Start worker
The startup method is the same as before:
[root@Python3 ~]# celery -A CeleryPro worker -l info
/usr/local/lib/python3.6/site-packages/celery/platforms.py:796: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=0 euid=0 gid=0 egid=0

celery@Python3 v4.2.1 (windowlicker)
Linux-3.10.0-862.el7.x86_64-x86_64-with-centos-7.5.1804-Core

[config]
.> app:         CeleryPro:0x7ffb0c8b2908
.> transport:   amqp://guest:**@192.168.3.108:5672//
.> results:     rpc://
.> concurrency: 1 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.periodic4.add
  . CeleryPro.periodic4.hello
  . CeleryPro.periodic4.upper
  . CeleryPro.tasks.add
  . CeleryPro.tasks.upper

[INFO/MainProcess] Connected to amqp://guest:**@192.168.3.108:5672//
[INFO/MainProcess] mingle: searching for neighbors
[INFO/MainProcess] mingle: all alone
[INFO/MainProcess] celery@Python3 ready.
After startup, look at the [tasks] section: the newly added scheduled tasks are listed, and the previous tasks are all still there.
Start Beat
Here the -A parameter is followed by the full CeleryPro.periodic4, which is slightly different from the parameter used to start the Worker:
[root@Python3 ~]# celery -A CeleryPro.periodic4 beat -l info
celery beat v4.2.1 (windowlicker) is starting.
LocalTime -> 2018-10-01 12:45:04
Configuration ->
    . broker -> amqp://guest:**@192.168.3.108:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[INFO/MainProcess] beat: Starting...
[INFO/MainProcess] Scheduler: Sending due task hello every 10 (CeleryPro.periodic4.hello)
[INFO/MainProcess] Scheduler: Sending due task CeleryPro.periodic4.upper('abcdefg') (CeleryPro.periodic4.upper)
Right after startup, the two interval-based tasks are sent to the Worker to run, and then keep being sent at the defined intervals.
The other task, set with crontab, is only sent when the time matches. At that moment it was minute 45, so it did not run until minute 46.
The old approach still works
As mentioned above, the new version mainly adds an extra decorator. Without the newly provided decorator, scheduled tasks can still be written in the configuration:
# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery(
    'CeleryPro',
    broker='amqp://192.168.3.108',
    backend='rpc',
    include=['CeleryPro.tasks'],
)

app.conf.beat_schedule = {
    'every 5 seconds': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': 5,
        'args': ('xyz',),
    },
}

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()
This configures an ordinary task to start on a schedule. CeleryPro.periodic4 has been removed from the include list here, but it doesn't matter if you keep it.
The task file tasks.py is the same as before, as follows:
# CeleryPro/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
import time

@app.task
def add(x, y):
    print("calculate the sum of 2 values: %s %s" % (x, y))
    return x + y

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()
Finally, start the Worker and start Beat to try it out:
[root@Python3 ~]# celery -A CeleryPro beat -l info
For Beat, the -A parameter can be CeleryPro or CeleryPro.tasks; the effect for the schedule defined here is the same. In addition, if periodic4.py is added back to the include list and Beat is started with the CeleryPro.periodic4 parameter, the schedules defined there will start as well.
crontab is also supported here, in the same way as before: replace the value of the schedule parameter with a crontab(...) call.
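A hedged sketch of what that could look like in the same configuration style (the schedule and arguments below are just examples, not from the original project):

from celery.schedules import crontab

app.conf.beat_schedule = {
    'upper every 2 minutes': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': crontab(minute='*/2'),   # a crontab object instead of a number of seconds
        'args': ('xyz',),
    },
}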
Summary
The above two methods of scheduled tasks have their own application scenarios.
If you want to change the function that the task executes, you have to change the code and restart Worker.
What is discussed here is changing the schedule (adding, cancelling, or modifying the cycle) while the task functions stay the same. With the @app.on_after_configure.connect decorator the schedule is written inside a function, and it does not seem possible to add new schedules dynamically; the advantage is that the structure is clearer.
With the latter method, simply update the configuration in the app.conf.beat_schedule dictionary and restart Beat for the change to take effect.
An example of crontab
Here are some examples of crontab:
crontab()                                              Execute every minute.
crontab(minute=0, hour=0)                              Execute daily at midnight.
crontab(minute=0, hour='*/3')                          Execute every three hours: 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0, hour='0,3,6,9,12,15,18,21')          Same as previous.
crontab(minute='*/15')                                 Execute every 15 minutes.
crontab(day_of_week='sunday')                          Execute every minute (!) on Sundays.
crontab(minute='*', hour='*', day_of_week='sun')       Same as previous.
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri')   Execute every ten minutes, but only between 3-4 am, 5-6 pm and 10-11 pm on Thursdays or Fridays.
crontab(minute=0, hour='*/2,*/3')                      Execute every even hour, and every hour divisible by three. This means: every hour except 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm.
crontab(minute=0, hour='*/5')                          Execute every hour divisible by 5. This means it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of 15, which is divisible by 5).
crontab(minute=0, hour='*/3,8-17')                     Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(day_of_month='2')                              Execute on the second day of every month.
crontab(day_of_month='2-30/3')                         Execute on every even numbered day.
crontab(day_of_month='1-7,15-21')                      Execute on the first and third weeks of the month.
crontab(day_of_month='11', month_of_year='5')          Execute on the 11th of May every year.
crontab(month_of_year='*/3')                           Execute every day on the first month of every quarter.
Calendar (Solar schedules)
Version 4 also provides this way of specifying a schedule. If you have a task that should be executed according to sunrise, sunset, dawn or dusk, you can use the solar schedule type:
All events are calculated based on UTC time, so they are not affected by the time zone setting. Example of the official website:
from celery.schedules import solar

app.conf.beat_schedule = {
    # Executes at sunset in Melbourne
    'add-at-melbourne-sunset': {
        'task': 'tasks.add',
        'schedule': solar('sunset', -37.81753, 144.96715),
        'args': (16, 16),
    },
}
The solar function takes three arguments: the event, the latitude and the longitude. The sign convention for latitude and longitude is as follows:
Sign    Argument     Meaning
+       latitude     North
-       latitude     South
+       longitude    East
-       longitude    West
The supported event types are as follows:
dawn_astronomical    Execute at the moment after which the sky is no longer completely dark; this is when the sun is 18 degrees below the horizon.
dawn_nautical        Execute when there is enough sunlight for the horizon and some objects to be distinguishable; formally, when the sun is 12 degrees below the horizon.
dawn_civil           Execute when there is enough light for objects to be distinguishable so that outdoor activities can commence; formally, when the sun is 6 degrees below the horizon.
sunrise              Execute when the upper edge of the sun appears over the eastern horizon in the morning.
solar_noon           Execute when the sun is highest above the horizon on that day.
sunset               Execute when the trailing edge of the sun disappears over the western horizon in the evening.
dusk_civil           Execute at the end of civil twilight, when objects are still distinguishable and some stars and planets are visible; formally, when the sun is 6 degrees below the horizon.
dusk_nautical        Execute when the sun is 12 degrees below the horizon; objects are no longer distinguishable, and the horizon is no longer visible to the naked eye.
dusk_astronomical    Execute at the moment after which the sky becomes completely dark; formally, when the sun is 18 degrees below the horizon.
Best practices used in Django
When using Celery in Django, the Celery configuration can be written directly in Django's settings.py. Task functions go in a tasks.py file placed in each app's directory; every app can have its own tasks.py, and all the tasks are shared.
Create a directory structure
Create a Django project named UsingCeleryWithDjango, with an app named app01. After creating the project, create a CeleryPro directory under the project directory and a celery.py file inside it. The directory structure is as follows:
UsingCeleryWithDjango
│
├─ manage.py
│
├─ app01
│   │  admin.py
│   │  apps.py
│   │  models.py
│   │  tests.py
│   │  views.py
│   └─ __init__.py
│
├─ CeleryPro
│   │  celery.py
│   └─ __init__.py
│
├─ templates
│
└─ UsingCeleryWithDjango
       settings.py
       urls.py
       wsgi.py
       __init__.py
Only the structure and location of CeleryPro need attention here; everything else is the default content of a newly created Django project.
The CeleryPro/celery.py file is used to create an instance of celery.
The CeleryPro/__init__.py file makes sure celery is loaded when Django starts, so that the @shared_task decorator from the celery module, used later in the apps, will use this app.
CeleryPro sample code
The official sample files are on GitHub; there are some differences between version 3 and version 4.
The latest version: https://github.com/celery/celery/tree/master/examples/django
Version 3.1: https://github.com/celery/celery/tree/3.1/examples/django
You can also switch between different versions of the branch view in Github.
I'm going to do it with version 3.1.25 on Windows.
# UsingCeleryWithDjango/CeleryPro/__init__.py
from __future__ import absolute_import, unicode_literals

__author__ = '749B'

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

# UsingCeleryWithDjango/CeleryPro/celery.py
from __future__ import absolute_import
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'UsingCeleryWithDjango.settings')

from django.conf import settings  # noqa

app = Celery('CeleryPro')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')

# Automatically discover tasks under all apps. This is the official example,
# but with the new style of INSTALLED_APPS entries in newer Django it cannot find the tasks:
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
#
# This is the INSTALLED_APPS part of settings.py:
# INSTALLED_APPS = [
#     'django.contrib.admin',
#     'django.contrib.auth',
#     'django.contrib.contenttypes',
#     'django.contrib.sessions',
#     'django.contrib.messages',
#     'django.contrib.staticfiles',
#     'app01.apps.App01Config',  # written this way, autodiscovery cannot find the tasks
#     # 'app01',                 # written this way, it can
# ]
#
# Alternatively, without changing settings.INSTALLED_APPS, pass the list of apps
# as a parameter. That is what I do here:
app.autodiscover_tasks(['app01'])

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
There is a pitfall here; the comments above should make it clear enough.
Task file tasks
Create a tasks.py file under the app (in the same directory as models.py) and create the tasks there.
- app01/
    - app01/tasks.py
    - app01/models.py
The functions created in tasks.py use the @shared_task decorator; these tasks are shared by all apps.
# UsingCeleryWithDjango/app01/tasks.py
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)
Set up settings.py
This is Django's configuration file, but the celery configuration can now also be written here:
# UsingCeleryWithDjango/UsingCeleryWithDjango/settings.py
# (the rest of the Django settings are omitted)

# Celery settings
BROKER_URL = 'redis://192.168.246.11/0'
CELERY_RESULT_BACKEND = 'redis://192.168.246.11/0'
Only the most basic settings are made here: Redis is used to receive tasks and to store task results; everything else is left at the defaults.
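For illustration, a task defined this way could then be triggered from a Django view; a hedged sketch (this view and its URL wiring are hypothetical, not part of the original project):

# app01/views.py (hypothetical example)
from django.http import JsonResponse
from app01.tasks import add

def start_add(request):
    # queue the task on the broker and return immediately with its id
    result = add.delay(4, 4)
    return JsonResponse({'task_id': result.id})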
Start Worker
The startup command is the same as before; the key point is the parameter after -A:
G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango> G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro worker -l info
[WARNING/MainProcess] ...\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default. ...

celery@IDX-xujf v3.1.25 (Cipater)
Windows-10-10.0.17134-SP0

[config]
.> app:         CeleryPro:0x27f5e4dbe80
.> transport:   redis://192.168.246.11:6379/0
.> results:     redis://192.168.246.11/0
.> concurrency: 4 (prefork)

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.celery.debug_task
  . app01.tasks.add
  . app01.tasks.mul
  . app01.tasks.xsum

[INFO/MainProcess] mingle: searching for neighbors
[INFO/MainProcess] mingle: all alone
[INFO/MainProcess] Connected to redis://192.168.246.11:6379/0
[WARNING/MainProcess] ...\celery\fixups\django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
[WARNING/MainProcess] celery@IDX-xujf ready.
The startup above succeeded; as long as all of the expected tasks appear under [tasks], everything is fine.
Regarding that [tasks] section, which lists the names of all our custom tasks, let's look at how these names are collected.
Get all the tasks
All tasks can be obtained through app.tasks, where app is the instance created by app = Celery('CeleryPro') in CeleryPro/celery.py. It was given an alias by from .celery import app as celery_app in CeleryPro/__init__.py, so in this project it is celery_app.tasks.
Printing celery_app.tasks gives a result like this:
{'celery.chord_unlock': ..., 'celery.group': ..., 'app01.tasks.xsum': ..., 'celery.backend_cleanup': ..., 'app01.tasks.add': ..., 'celery.map': ..., 'app01.tasks.mul': ..., 'celery.chain': ..., 'CeleryPro.celery.debug_task': ..., 'celery.starmap': ..., 'celery.chord': ..., 'celery.chunks': ...}
Our tasks are all there, but there are many others as well (all starting with celery.). Earlier the Worker was started with the -l info parameter; with -l debug those tasks are also shown. In other words, celery filters the list when starting the Worker: debug mode prints everything, info mode prints only user-defined tasks. The next step is to look at the source code to see how the filtering is done.
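Before digging into the source, the same filter can be sketched in a few lines of user code (celery_app here is the instance exported from CeleryPro, as described above):

from CeleryPro import celery_app

# list only user-defined tasks, i.e. apply the same filter the info-level banner applies
user_tasks = sorted(name for name in celery_app.tasks if not name.startswith('celery.'))
for name in user_tasks:
    print(name)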
The following is excerpted from the source code for analysis:
# celery/apps/worker.py
# First, some strings that get printed to the console at startup.
# This is the LOGO; not the point here.
ARTLINES = [
    ' --------------',
    '---- **** -----',
    '--- * ***  * --',
    '-- * - **** ---',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- *** --- * ---',
    '-- ******* ----',
    '--- ***** -----',
    ' --------------',
]

# This string is the template for the printed task list. Before it is output
# to the console, format() is applied so the task list can be filled in dynamically.
EXTRA_INFO_FMT = """
[tasks]
{tasks}
"""

# The class has many methods; only the relevant ones are shown here.
class Worker(WorkController):

    # This is the method that generates the task list.
    # The logic is simple: whether a task name starting with 'celery.' is kept
    # depends on include_builtins. True outputs every task; False filters out
    # the ones starting with 'celery.'. The concrete value of include_builtins
    # is decided in the extra_info method below.
    def tasklist(self, include_builtins=True, sep='\n', int_='celery.'):
        return sep.join(
            '  . {0}'.format(task) for task in sorted(self.app.tasks)
            if (not task.startswith(int_) if not include_builtins else task)
        )

    # This method calls the tasklist method above. It first checks the startup
    # log level, sets include_builtins according to whether the level is at or
    # below debug, and finally formats EXTRA_INFO_FMT with the result of tasklist.
    def extra_info(self):
        if self.loglevel <= logging.INFO:
            include_builtins = self.loglevel <= logging.DEBUG
            tasklist = self.tasklist(include_builtins=include_builtins)
            return EXTRA_INFO_FMT.format(tasks=tasklist)
Separately, although Celery 4 does not support Windows directly, the worker can still be run on Windows with the eventlet pool, using the -P eventlet parameter:
> G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery ... -l info -P eventlet

celery@IDX-xujf v4.2.1 (windowlicker)
Windows-10-10.0.17134-SP0

[config]
.> app:         CeleryPro:0x16ad81d16a0
.> transport:   redis://192.168.246.11:6379/0
.> results:     redis://192.168.246.11/0
.> concurrency: 4 (eventlet)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.celery.debug_task
  . app01.tasks.add
  . app01.tasks.mul
  . app01.tasks.xsum

[INFO/MainProcess] Connected to redis://192.168.246.11:6379/0
[INFO/MainProcess] mingle: searching for neighbors
[INFO/MainProcess] mingle: all alone
[WARNING/MainProcess] ...\celery\fixups\django.py:200: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
[INFO/MainProcess] celery@IDX-xujf ready.
[INFO/MainProcess] pidbox: Connected to redis://192.168.246.11:6379/0.
[INFO/MainProcess] Received task: app01.tasks.add[2b56d6b7-012f-44db-bf4b-2d85d22dcd8d]
[INFO/MainProcess] Task app01.tasks.add[2b56d6b7-012f-44db-bf4b-2d85d22dcd8d] succeeded in 0.0s: 7
Above is the Worker's log. After startup a task was submitted, and this time it was processed normally.
Use Django_Celery_Beat
First register it in settings' INSTALLED_APPS:
INSTALLED_APPS = [
    ...
    'django_celery_beat',
]
django_celery_beat uses the database and will automatically create several tables, so just run the migration:
> python manage.py migrate
Operations to perform:
  Apply all migrations: admin, auth, contenttypes, django_celery_beat, sessions
Running migrations:
  Applying django_celery_beat.0001_initial... OK
  Applying django_celery_beat.0002_auto_20161118_0346... OK
  Applying django_celery_beat.0003_auto_20161209_0049... OK
  Applying django_celery_beat.0004_auto_20170221_0000... OK
  Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
  Applying django_celery_beat.0006_auto_20180210_1226... OK
After logging in to the Django admin you can see the new tables:
Tasks are configured in the Periodic tasks table; the other tables hold the various kinds of execution schedules.
Configuring a task
First go to the Intervals table and create a new schedule; here a cycle of every 5 seconds is created.
Then go to the Periodic tasks table, select the task to be performed, and associate it with a cycle.
The tasks selectable here are the ones registered through automatic task discovery:
There is also a section for the task's arguments, with two boxes containing JSON: positional arguments go in the upper box and keyword arguments in the lower one:
The JSON here is deserialized and passed to the task function as *args and **kwargs.
That's it; once this task is configured, the other schedule types work the same way, so they are not tried here.
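The same configuration can also be created in code through the django_celery_beat models instead of the admin; a sketch, assuming the app and task names used in this project:

import json
from django_celery_beat.models import IntervalSchedule, PeriodicTask

# an "every 5 seconds" schedule, like the one created in the Intervals table
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=5,
    period=IntervalSchedule.SECONDS,
)

# the equivalent of a row in the Periodic tasks table
PeriodicTask.objects.create(
    interval=schedule,
    name='add every 5 seconds',   # must be unique
    task='app01.tasks.add',       # registered task name
    args=json.dumps([3, 4]),      # positional arguments as JSON
)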
Start Beat
You still need to start a Beat to send the tasks on schedule. Start the Worker first, then start Beat with the extra parameter "-S django":
G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango> G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery ... -l info -S django
celery beat v4.2.1 (windowlicker) is starting.
LocalTime -> 2018-10-08 14:43:43
Configuration ->
    . broker -> redis://192.168.246.11:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> django_celery_beat.schedulers.DatabaseScheduler
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 seconds (5s)
[INFO/MainProcess] beat: Starting...
[INFO/MainProcess] Writing entries...
[INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[INFO/MainProcess] Writing entries...
G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>
Note: every time a task is modified, Beat must be restarted for the latest configuration to take effect. This has a big impact on Intervals tasks (the ones executed every fixed interval); crontab tasks don't seem to be affected as much.
The above is how to install and use Celery.