本篇是FastAPI使用Celery做队列之二,与前篇不同的是,本篇是从实际项目中抽取的代码,并使用Redis做broker。
首先我们新添加个celery_queue.py
,定义celery实例和相关配置,以及定时任务的执行,代码如下:
from celery import Celery
from celery.schedules import crontab
from tasks import *
celery_instance = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')
celery_instance.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json']
)
# 注册任务,实测不注册也可以,但是需要执行 from tasks import add_task,也就是说,这里的注册是为了让celery_instance知道这个任务的存在
# celery_instance.task(add_task)
celery_instance.conf.beat_schedule = {
# 'add-every-30-seconds': {
# 'task': 'tasks.add_task',
# 'schedule': 30.0,
# 'args': (16, 16)
# },
'add-every-minute': {
'task': 'tasks.add_task',
'schedule': crontab(minute='*'),
'args': (32, 32)
},
}
第二步,我们新添加一个task.py
,定义具体执行任务的函数,代码如下
from celery import shared_task
from celery import Celery
# 要跟celery_queue.py中的保持一致,共用一个Celery实例
celery_instance = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')
# 注意点:这里的@celery_instance.task不能写成@shared_task,否则在FastAPI中调用会报错
@celery_instance.task
def add_task(x, y):
return x + y
@celery_instance.task
def minus_task(x, y):
return x - y
第三步,在main.py
中引入task.py
并添加路由
from tasks import *
@app.post("/add")
def add(x: int, y: int):
result = add_task.delay(x, y).get()
return {"result": result}
@app.post("/minus")
def minus(x: int, y: int):
result = minus_task.delay(x, y).get()
return {"result": result}
第四步,安装celery依赖并启动celery实例
pip3 install celery
celery -A celery_queue worker -B -l INFO
最后,启动FastAPI
uvicorn main:app --reload
定义在celery_queue.py
中的任务就会定时执行,也可以通过路由/add
和/minus
去手动执行任务。
1 thought on “FastAPI使用Celery做队列之二”