
How to Implement Cluster Management with Celery


This article shows how to use Celery to implement cluster management. The editor finds it very practical and shares it here for you to learn from; I hope you get something out of it.

Architecture: one task-publishing node (172.16.77.175, which also runs the AMQP broker) and two worker nodes, docker1 and docker2.

The Celery app used as the example here is myapp:

root@workgroup0:~/celeryapp# ls myapp
agent.py  celery.py  config.py  __init__.py
root@workgroup0:~/celeryapp#

Common code section:

celery.py (note: 172.16.77.175 is the IP address of the task-publishing node):

from __future__ import absolute_import
from celery import Celery

app = Celery('myapp',
             broker='amqp://guest@172.16.77.175//',
             backend='amqp://guest@172.16.77.175//',
             include=['myapp.agent'])
app.config_from_object('myapp.config')

if __name__ == '__main__':
    app.start()
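The broker address is hard-coded here, so every node carries the same literal IP. One small variation (my own sketch; BROKER_HOST is a hypothetical environment variable, not something the original project defines) is to read the host from the environment and fall back to the address above:

# celery.py variant (sketch): broker host taken from an environment variable.
from __future__ import absolute_import
import os
from celery import Celery

broker_host = os.environ.get('BROKER_HOST', '172.16.77.175')  # hypothetical variable
broker_url = 'amqp://guest@%s//' % broker_host

app = Celery('myapp',
             broker=broker_url,
             backend=broker_url,
             include=['myapp.agent'])
app.config_from_object('myapp.config')

if __name__ == '__main__':
    app.start()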

config.py:

from __future__ import absolute_import
from kombu import Queue, Exchange
from datetime import timedelta

CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_DEFAULT_EXCHANGE = 'agent'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_QUEUES = (
    Queue('machine1', exchange='agent', routing_key='machine1'),
    Queue('machine2', exchange='agent', routing_key='machine2'),
)
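Exchange is imported above but not used directly; the same two queues can also be declared with an explicit Exchange object, which makes the direct-exchange routing a little more visible. This is only an equivalent sketch of the declaration, not a change in behaviour:

# Equivalent queue declaration with an explicit direct exchange (sketch).
from kombu import Queue, Exchange

agent_exchange = Exchange('agent', type='direct')

CELERY_QUEUES = (
    Queue('machine1', exchange=agent_exchange, routing_key='machine1'),
    Queue('machine2', exchange=agent_exchange, routing_key='machine2'),
)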

__init__.py: (blank)

agent.py on the task-publishing node:

from __future__ import absolute_import
from myapp.celery import app

@app.task
def add(x, y):
    return {'the value is': str(x + y)}

@app.task
def writefile():
    out = open('/tmp/data.txt', 'w')
    out.write('hello' + '\n')
    out.close()

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

@app.task
def getl(stri):
    return getlength(stri)

def getlength(stri):
    return len(stri)

agent.py on docker1:

from __future__ import absolute_import
from myapp.celery import app

@app.task
def add(x, y):
    # node_name added to identify the node
    return {'value': str(x + y), 'node_name': 'docker1'}

@app.task
def writefile():
    out = open('/tmp/data.txt', 'w')
    out.write('hello' + '\n')
    out.close()

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

@app.task
def getl(stri):
    return getlength(stri)

def getlength(stri):
    return len(stri)

On docker2:

from __future__ import absolute_import
from myapp.celery import app

@app.task
def add(x, y):
    return {'value': str(x + y), 'node_name': 'docker2'}

@app.task
def writefile():
    out = open('/tmp/data.txt', 'w')
    out.write('hello' + '\n')
    out.close()

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

@app.task
def getl(stri):
    return getlength(stri)

def getlength(stri):
    return len(stri)
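Note that the three copies of agent.py differ only in the node_name string. If maintaining per-node copies becomes tedious, one variation (my own sketch, assuming the machine's hostname is an acceptable node label; in this setup the hosts are actually named workgroup1.hzg.com and workgroup2.hzg.com rather than docker1/docker2) is to derive the label at import time and share one file:

# agent.py variant (sketch): one shared file, node label taken from the hostname.
from __future__ import absolute_import
import socket
from myapp.celery import app

NODE_NAME = socket.gethostname()  # e.g. 'workgroup1.hzg.com' on the first worker

@app.task
def add(x, y):
    return {'value': str(x + y), 'node_name': NODE_NAME}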

In this example, I only test the add() function.

Start the worker on the docker1 node (specify the queue to listen on with -Q):

root@workgroup1:~/celeryapp# celery -A myapp worker -l info -Q machine1
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You're running the worker with superuser privileges: this is absolutely not recommended!
Please specify a different user using the -u option.
User information: uid=0 euid=0 gid=0 egid=0

 -------------- celery@workgroup1.hzg.com v3.1.17 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.13.0-…
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x7f472d73f190
- ** ---------- .> transport:   amqp://guest:**@172.16.77.175:5672//
- ** ---------- .> results:     amqp://guest@172.16.77.175//
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> machine1         exchange=machine1(direct) key=machine1

[tasks]
  . myapp.agent.add
  . myapp.agent.getl
  . myapp.agent.mul
  . myapp.agent.writefile
  . myapp.agent.xsum

[2015-10-18 15:…: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//
[2015-10-18 15:…: INFO/MainProcess] mingle: searching for neighbors
[2015-10-18 15:…: INFO/MainProcess] mingle: sync with 1 nodes
[2015-10-18 15:…: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:…: WARNING/MainProcess] celery@workgroup1.hzg.com ready.

Start worker on docker2:

root@workgroup2:~/celeryapp# celery -A myapp worker -l info -Q machine2
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You're running the worker with superuser privileges: this is absolutely not recommended!
Please specify a different user using the -u option.
User information: uid=0 euid=0 gid=0 egid=0

 -------------- celery@workgroup2.hzg.com v3.1.18 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.13.0-…
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x7f708cb8ec10
- ** ---------- .> transport:   amqp://guest:**@172.16.77.175:5672//
- ** ---------- .> results:     amqp://guest@172.16.77.175//
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> machine2         exchange=machine2(direct) key=machine2

[tasks]
  . myapp.agent.add
  . myapp.agent.getl
  . myapp.agent.mul
  . myapp.agent.writefile
  . myapp.agent.xsum

[2015-10-18 15:…: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//
[2015-10-18 15:…: INFO/MainProcess] mingle: searching for neighbors
[2015-10-18 15:…: INFO/MainProcess] mingle: sync with 1 nodes
[2015-10-18 15:…: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:…: WARNING/MainProcess] celery@workgroup2.hzg.com ready.
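With both workers up, it can be worth double-checking that each one really consumes only from its own queue. A quick way is Celery's standard inspect command (the exact output is not reproduced here from the original environment, so treat this as a sketch):

root@workgroup0:~/celeryapp# celery -A myapp inspect active_queues

workgroup1 should report only machine1, and workgroup2 only machine2.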

From the task-publishing node, publish a computing task to docker1:

root@workgroup0:~/celeryapp# ls
default.etcd  hots.sh  hotswap.py  myapp  myapp1tmp  people.db  resp  sora  test.py
root@workgroup0:~/celeryapp# python
Python 2.7.6 (default, Mar 22 2014, 22:59:56)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from myapp.agent import add
>>> res = add.apply_async(args=[122, 34], queue='machine1', routing_key='machine1')
>>> res.get()
{u'value': u'156', u'node_name': u'docker1'}
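The call above targets a single queue; the same pattern scales to "run this task on every node" by looping over the queues defined in config.py and collecting one AsyncResult per node. A minimal sketch of that idea (my own illustration, not code from the original article):

# Sketch: send the same add() task to every node's queue and gather the replies.
from myapp.agent import add

queues = ['machine1', 'machine2']            # queue names from myapp/config.py
results = [add.apply_async(args=[122, 34], queue=q, routing_key=q) for q in queues]
for q, res in zip(queues, results):
    print('%s -> %s' % (q, res.get()))       # get() blocks until that node has answered

Because each worker's add() includes node_name in its return value, it is easy to see which node handled which call.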

Back to the single machine1 call: get() shows the reply came from docker1, and the docker1 worker log confirms it:

[2015-10-18 15:…: INFO/MainProcess] Task myapp.agent.add[c487a9a2-e5cc-462b-a131-784b363a1952] succeeded in 0.03602907s: {'value': '156', 'node_name': 'docker1'}

docker2, meanwhile, has received nothing; its log still ends at:

[2015-10-18 15:…: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:…: WARNING/MainProcess] celery@workgroup2.hzg.com ready.

Publish a task to docker2:

>>> res = add.apply_async(args=[1440, 900], queue='machine2', routing_key='machine2')
>>> res.get()
{u'value': u'2340', u'node_name': u'docker2'}
>>>

The above is how to use Celery to implement cluster management. The editor believes these are knowledge points we may see or use in daily work; I hope you can learn more from this article. For more details, please follow the industry information channel.
