celery是一个基于分布消息传递的异步任务队列.它一定需要建立在一个分布的消息传递机制上,这个消息传递机制就是celery文档里常说的broker。
celery隐藏了rabbitmq接口的实现细节,既充当了publisher(client)又充当了consumer (worker)的角色。
'''思考一下,如果我们用rabbitmq自己实现任务队列,有一天我们不想用rabbit了怎么办?我们换个思维,如果没有celery,让你自己设计一个异步任务队列你怎么做。首先,要有一个发起任务的client,选定一定保存任务信息的媒介,由一个worker去一直监听这个信息媒介,这个worker最好是多进程的,另外可以兼容尽可能多得信息媒介。好吧,这个不就是celery所做的事儿么,celery兼容多个broker,既是任务发起者又是执行者,另外支持多进程…还有好多通用功能考虑。
'''
假设项目的目录结构是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| celery -A task.celeryapp worker -l info -c 5
celery -A task.celeryapp flower -l info --basic_auth=user1:111111 flower -A task.celeryapp --port=8091
celery beat -A task.celeryapp -l info
celery -B -A task.celeryapp worker -l info
celery multi restart w1 -B -A task.celeryapp -l info
celery -A task.celeryapp inspect stats
celery=Celery() celery.config_from_object('task.celeryconfig') celery.send_task('tq.tasks.test', ("hello world",))
from task.tasks import test print test.delay('param from invoke ')
... celery.config_from_object('task.celeryconfig') from celery.task.control import revoke revoke('aac00b9c-f701-4a21-bed4-53a8b865a39a', terminate=True)
from celery.result import AsyncResult res=AsyncResult(t.task_id) res.state <!---->
res.revoke(terminate=True)
json.dumps(celeryapp.control.inspect().conf(), indent=2)
celeryapp.control.inspect().registered_tasks()
celeryapp.control.inspect().active()
celeryapp.control.inspect().stats()
|
1 2 3
| 在使用flower时遇到 'stats' inspect method failed , 最终通安装指定版本的kombu 解决 4.5.0 pipenv install kombu==4.5.0
|