要在FastAPI中使用Celery队列和定时执行任务,需要遵循以下步骤:
-
首先,在项目中安装Celery和相应的消息队列,比如RabbitMQ或Redis。
-
创建一个包含Celery配置的文件,比如celeryconfig.py。这个文件应该包含Celery的配置信息,例如消息队列的连接信息、任务序列化方式、任务结果序列化方式等等。
-
创建一个tasks.py文件,这个文件定义了你要执行的任务。你可以使用@celery.task装饰器来标识任务函数。
-
在FastAPI应用程序中,创建一个celery对象并与Celery配置文件连接。然后,从tasks.py文件导入要执行的任务并注册到celery对象中。
-
在FastAPI应用程序中,定义定时任务。你可以使用Celery的beat调度器来完成这个任务,beat调度器使用类似于Crontab的语法来指定定时任务的时间和频率。
-
启动Celery Worker和Beat调度器并运行FastAPI应用程序。
下面是一个使用Celery队列和定时任务的FastAPI示例:
# app.py
from fastapi import FastAPI
from celery import Celery
from celery.schedules import crontab
from tasks import add_task
app = FastAPI()
# 创建一个Celery对象,并配置它
celery = Celery('tasks', broker='pyamqp://guest@localhost//')
celery.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json']
)
# 将任务注册到Celery对象中
celery.task(add_task)
# 定义定时任务
celery.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add_task',
'schedule': 30.0,
'args': (16, 16) # 这个任务的参数
},
'add-every-minute': {
'task': 'tasks.add_task',
'schedule': crontab(minute='*'),
'args': (32, 32) # 这个任务的参数
},
}
# 启动Celery Worker和Beat调度器
celery.worker_main(['celery', 'worker', '-l', 'INFO'])
celery.beat_main(['celery', 'beat', '-l', 'INFO'])
# FastAPI的路由
@app.get('/')
async def root():
return {'message': 'Hello, world!'}
# tasks.py
from celery import shared_task
@shared_task
def add_task(x, y):
return x + y
在上面的代码中,通过Celery的@shared_task装饰器来声明add_task作为一个Celery任务,然后将其注册到Celery对象中。在定义定时任务时,使用了两个不同的时间段,一个是每30秒执行一次,另一个是每分钟执行一次。你可以随意修改这些时间段,使它们符合实际需求。
最后,在命令行中运行python app.py
来启动应用程序和Celery Worker。在另一个终端中,运行celery -A app.celery beat
来启动Beat调度器。现在你可以访问FastAPI路由并观察你的Celery任务被执行和定时执行了。
Update 2024-01-11
本篇中的代码是我参考网上的写的,实际执行过程中可能会报错,为此,我另写了一篇FastAPI使用Celery做队列之二,用的是实际项目中可正常运行的代码,请参考它,本文不另作修改了。