FastAPI怎么使用Celery队列并定时执行任务

要在FastAPI中使用Celery队列和定时执行任务,需要遵循以下步骤:

  1. 首先,在项目中安装Celery和相应的消息队列,比如RabbitMQ或Redis。

  2. 创建一个包含Celery配置的文件,比如celeryconfig.py。这个文件应该包含Celery的配置信息,例如消息队列的连接信息、任务序列化方式、任务结果序列化方式等等。

  3. 创建一个tasks.py文件,这个文件定义了你要执行的任务。你可以使用@celery.task装饰器来标识任务函数。

  4. 在FastAPI应用程序中,创建一个celery对象并与Celery配置文件连接。然后,从tasks.py文件导入要执行的任务并注册到celery对象中。

  5. 在FastAPI应用程序中,定义定时任务。你可以使用Celery的beat调度器来完成这个任务,beat调度器使用类似于Crontab的语法来指定定时任务的时间和频率。

  6. 启动Celery Worker和Beat调度器并运行FastAPI应用程序。

下面是一个使用Celery队列和定时任务的FastAPI示例:

# app.py

from fastapi import FastAPI
from celery import Celery
from celery.schedules import crontab
from tasks import add_task

app = FastAPI()

# 创建一个Celery对象,并配置它
celery = Celery('tasks', broker='pyamqp://guest@localhost//')
celery.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json']
)

# 将任务注册到Celery对象中
celery.task(add_task)

# 定义定时任务
celery.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add_task',
        'schedule': 30.0,
        'args': (16, 16)  # 这个任务的参数
    },
    'add-every-minute': {
        'task': 'tasks.add_task',
        'schedule': crontab(minute='*'),
        'args': (32, 32)  # 这个任务的参数
    },
}

# 启动Celery Worker和Beat调度器
celery.worker_main(['celery', 'worker', '-l', 'INFO'])
celery.beat_main(['celery', 'beat', '-l', 'INFO'])

# FastAPI的路由
@app.get('/')
async def root():
    return {'message': 'Hello, world!'}
# tasks.py

from celery import shared_task

@shared_task
def add_task(x, y):
    return x + y

在上面的代码中,通过Celery的@shared_task装饰器来声明add_task作为一个Celery任务,然后将其注册到Celery对象中。在定义定时任务时,使用了两个不同的时间段,一个是每30秒执行一次,另一个是每分钟执行一次。你可以随意修改这些时间段,使它们符合实际需求。

最后,在命令行中运行python app.py来启动应用程序和Celery Worker。在另一个终端中,运行celery -A app.celery beat来启动Beat调度器。现在你可以访问FastAPI路由并观察你的Celery任务被执行和定时执行了。

Update 2024-01-11

本篇中的代码是我参考网上的写的,实际执行过程中可能会报错,为此,我另写了一篇FastAPI使用Celery做队列之二,用的是实际项目中可正常运行的代码,请参考它,本文不另作修改了。

Leave a Comment

豫ICP备19001387号-1