Celery can be integrated into a Django project.
Assume we already have a Django project named pyseg.
First, create mycelery.py under pyseg/pyseg:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pyseg.settings')
app = Celery('pyseg')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
Add the following to pyseg/pyseg/settings.py:
CELERY_BROKER_URL = 'redis://172.17.0.2:6379/0'
CELERY_RESULT_BACKEND = 'redis://172.17.0.2:6379/0'
CELERY_TASK_SERIALIZER = 'json'
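Because the app is configured with namespace='CELERY', each lowercase Celery setting (broker_url, result_backend, and so on) is read from an uppercase Django setting that carries the CELERY_ prefix. The sketch below only imitates that naming convention in plain Python; strip_namespace is a hypothetical helper written for illustration, not part of Celery.

```python
# Illustration only: imitates the naming convention, not Celery's internal code.
def strip_namespace(settings, namespace="CELERY"):
    """Return {lowercase_setting: value} for keys starting with '<namespace>_'."""
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)
    }


django_settings = {
    "DEBUG": True,  # unrelated Django setting, ignored by the helper
    "CELERY_BROKER_URL": "redis://172.17.0.2:6379/0",
    "CELERY_RESULT_BACKEND": "redis://172.17.0.2:6379/0",
    "CELERY_TASK_SERIALIZER": "json",
}

celery_config = strip_namespace(django_settings)
print(sorted(celery_config))  # ['broker_url', 'result_backend', 'task_serializer']
```

One practical consequence of this convention is that a misspelled key does not map to the setting you intended, so it is worth checking the spelling of every CELERY_ name.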
Add the following to pyseg/pyseg/__init__.py:
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that task will use this app.
from .mycelery import app as celery_app
__all__ = ['celery_app']
Create tasks.py in any app (the examples below assume an app named myapp):
from __future__ import absolute_import, unicode_literals
from pyseg.mycelery import app
@app.task
def add(x, y):
    return x + y

@app.task
def minus(x, y):
    return x - y
Start the Celery worker from the project root:
celery -A pyseg.mycelery worker -l info
Tasks can then be called from views:
from myapp.tasks import add, minus
add.delay(1, 2)
minus.delay(2, 1)
Now start Django to see the tasks run.
Scheduled tasks
- Interval tasks
Add the following to pyseg/pyseg/settings.py:
from datetime import timedelta
CELERY_BEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (2, 1),
    },
}
- Crontab tasks
# crontab task
# call tasks.add every day at 7:30
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
    # Executes every day at 7:30 a.m.
    'add-every-morning': {
        'task': 'myapp.tasks.add',
        'schedule': crontab(hour=7, minute=30),
        'args': (2, 1),
    },
}
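Note that the task key must be the full dotted path to the task function (for example myapp.tasks.add); otherwise Celery reports that the task cannot be found.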
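The reason the full path matters is that, by default, Celery names a task after the module it is defined in plus the function name, and the schedule entry looks the task up by that name. The sketch below models the lookup with a plain dictionary; register, run, and registry are hypothetical names for illustration, and Celery's real task registry is more involved.

```python
# Simplified model of a name-based task registry (not Celery's implementation).
registry = {}


def register(module_name):
    """Hypothetical decorator: store a function under '<module>.<function>'."""
    def decorator(func):
        registry[f"{module_name}.{func.__name__}"] = func
        return func
    return decorator


@register("myapp.tasks")
def add(x, y):
    return x + y


def run(task_name, args):
    """Look a task up by name and call it, as a schedule entry would."""
    if task_name not in registry:
        raise KeyError(f"unregistered task: {task_name}")
    return registry[task_name](*args)


print(run("myapp.tasks.add", (2, 1)))  # 3

try:
    run("tasks.add", (2, 1))  # short name: nothing is registered under it
except KeyError as exc:
    print("lookup failed:", exc)
```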
Both kinds of schedule (interval and crontab) require starting an additional beat process:
celery -A pyseg.mycelery beat -l info
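To make the two schedule types concrete, the following sketch computes when each would fire next, using only the standard library. It is a rough model under simplifying assumptions (naive datetimes, no time zones) and is not how the beat process is implemented; next_interval_run and next_daily_run are hypothetical helpers.

```python
from datetime import datetime, timedelta


def next_interval_run(last_run, every=timedelta(seconds=30)):
    """Interval schedule: the next run is a fixed delay after the last one."""
    return last_run + every


def next_daily_run(now, hour=7, minute=30):
    """Daily schedule at hour:minute, like crontab(hour=7, minute=30)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next run is tomorrow.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 1, 1, 8, 0, 0)  # arbitrary example timestamp
print(next_interval_run(now))  # 2024-01-01 08:00:30
print(next_daily_run(now))     # 2024-01-02 07:30:00
```

The interval schedule depends only on the previous run, while the crontab schedule depends on wall-clock time, which is why a crontab entry without a day_of_week argument fires every day.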