Python中使用Celery队列。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB, Amazon SQS,CouchDB, SQLAlchemy ,Django ORM, IronMQ。推荐使用RabbitMQ、Redis作为消息队列。
任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache。建议使用与消息中间件一样的服务。
这里我们使用Redis来作为消息中间件和任务结果存储。
假设Redis服务已正常安装且处理启动状态。
首先需要安装Python的redis、celery依赖
pip install redis celery
创建一个任务 task.py
from celery import Celery
app = Celery('tasks', broker='redis://172.17.0.2:6379/0', backend='redis://172.17.0.2:6379/0')
@app.task
def add(x, y):
return x + y
启动这个任务
celery -A task worker --loglevel=info
另建一个脚本来执行任务 pub.py
from task import add
for i in range(1000):
add.delay(i, i)
# result = add.delay(2,1)
#
# print(result.ready())
#
# print(result.state)
#
# print(result.get())
另开一个终端执行该脚本,然后就可以在两个终端中看到任务的执行情况了。