一、介绍
Celery 是基于异步消息传递的任务队列/作业队列。它可以用作应用程序的后台任务处理器,您可以在其中转储任务以在后台或任何给定时刻执行。它可以配置为同步或异步执行任务。
异步消息传递是一个系统,其中消息存储在发送方和接收方之间。这允许发送者在发送消息后立即继续处理其他事情,也可以在接收者采取任何操作之前堆叠多条消息。
Celery 的主要因素:
- 任务
- 任务代理
- 结果后端
- 程序
任务是您要发送给 Celery 以在其工作线程中执行的功能。这些是普通的 Python 函数,带有装饰器将它们与其他函数分开。
任务代理是消息传递系统,您将使用它与 Celery 通信以发送任务以供执行。Celery 支持许多代理,如 Redis、RabbitMQ、Amazon SQS 等。
结果后端再次是一个消息传递系统,但在这种情况下,Celery 使用它来存储任务执行结果,然后可以从您的应用程序访问该结果。它支持许多结果后端,如Redis,RabbitMQ(AMQP),SQLAlchemy等。
worker 是不言自明的,它是一个 Celery 进程,在后台运行,等待任务到达任务代理,通常多个工作线程一起运行以实现并发执行。
二、Celery 中的任务生命周期
Celery 执行任务的过程可以分解为:
- 任务注册
- 任务执行
- 结果存储
您的应用程序将任务发送到任务代理,然后由工作线程保留执行,最后任务执行的结果存储在结果后端中。
三、Celery 的应用
了解Celery 可能有点忙,不知道它的正确应用。您可以通过许多不同的方式将其集成到应用程序中。
Celery 最常见的应用:
- 定期执行 – 需要在间隔后定期运行的任务,例如发送每月新闻稿。
- 第三方执行 – 与第三方服务交互的任务,例如通过 SMTP 发送电子邮件。
- 长时间执行 – 需要很长时间才能完成执行的任务,例如压缩文件。
四、使用 Celery 创建您的第一个程序
在本节中,您将学习如何将 Celery 任务集成到 Python 程序中。
4.1、准备
要完成本指南,您将需要:
- Python 3.7 或更高版本
- Redis Server,按照 Ubuntu 20.04 上的安装和配置 Redis 进行操作
4.2、安装 Celery
Celery 是用 Python 编写的,可以使用 Python 的软件包安装程序(pip)进行安装。
安装最新版本的 Celery :
pip install celery
安装所需的依赖项以将 Redis 与 Celery 一起使用:
pip install celery[redis]
4.3、编写你的第一个 Celery 任务
在这里,我们分解了一个非常基本的程序,演示了如何编写现有函数或将现有函数转换为 Celery 任务。您可以复制并粘贴最后提到的最终代码来自己测试。
导入和初始化 Celery 对象
从 celery python 包导入类并将其初始化为任何变量,这里我们使用了该变量。传递给类的第一个参数是我们应用程序的名称。Celery
app
from celery import Celery
app = Celery(
'tasks',
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
由于我们将 Redis 用于我们的代理和后端,因此我们使用关键字参数指定了它。如果不使用 Redis 的默认配置,则可以使用以下格式为环境编写连接字符串。
redis://username:password@hostname:port/db
创建基本 Celery 任务
您可以使用装饰器将任何函数转换为 Celery 任务,该装饰器添加了我们现有函数作为任务运行所需的所有必要功能。@app.task
@app.task
def multiply(a, b):
import time
time.sleep(5)
return a * b
最终代码
您可以将最终代码复制并粘贴到一个名为新文件中,以按照下一节中给出的说明进行操作。tasks.py
from celery import Celery
app = Celery(
'tasks',
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
@app.task
def multiply(a, b):
import time
time.sleep(5)
return a * b
4.4、管理 Celery
写完任务后,您需要工作人员在您执行任务时处理任务。
启动 Celery
通过运行给定的命令来启动 Celery 工作线程,以按照下一节中给出的说明进行操作。确保您位于保存的同一目录中tasks.py
启动 Celery :
celery -A tasks worker -n worker1 -P prefork -l INFO
预期输出:
使用的参数:
-A
的缩写,用于指定工作人员将使用的应用程序。--app
-n
是它的缩写,用于指定工作人员的名称。--hostname
-P
是用于指定池类型的缩写,下面将讨论工作器类型。--pool
-l
是它的缩写,用于指定我们工作人员的日志级别。--loglevel
您还可以使用 的缩写在后台运行工作线程。-D
--detach
停止 Celery
处理完所有任务后,您可以通过运行给定的命令手动关闭工作线程。
结束运行的 Celery :
ps auxww | awk '/celery(.*)worker/ {print $2}' | xargs kill -9
4.5、在 Celery 程序中执行任务
现在,工作线程已启动并准备好处理队列,请打开新控制台并执行命令以打开 python 控制台。确保您位于保存的同一目录中python
tasks.py
导入任务:
from tasks import multiply
执行任务:
使用函数将任务执行请求发送到消息代理。.delay()
task1 = mutliply.delay(512, 100)
检查任务状态:
用于检查任务的当前状态。.state
task1.state
获取任务执行结果:
用于获取任务执行结果。.get()
task1.get()
预期输出:
您可以使用演示的步骤将 Celery 集成到应用程序的工作流中。
五、程序类型
选择正确的程序类型非常重要,因为它对执行时间和效率有重大影响。本文的这一部分将指导您了解 Celery 中可用的各种类型的工作人员。
Celery 的类型:
- 独奏
- 预分叉
- 事件小品
- 格万特
顾名思义,Solo 是一个内联池,这意味着任务不会同时处理。它只创建一个线程,并使用该线程执行任务。
用单人池开始 Celery 程序
celery -A tasks worker --pool=solo --loglevel=info
需要逐个运行的任务的理想选择。需要放弃并发并使用独奏池的用例并不多。
Prefork 池使用 Python 内置的多处理库,它可以同时处理多个任务。线程数可以用标志调整。--concurrency
使用预叉池启动 Celery 程序
celery -A tasks worker --pool=prefork --concurrency=4 --loglevel=info
如果您的任务受 CPU 限制,则是一个理想的选择。如果任务的大部分时间都使用 CPU,则称为 CPU 受限,并且只有在 CPU 更快的情况下才能运行得更快。
CPU 绑定任务的示例:文件转换、压缩、搜索算法等。
Eventlet & Gevent pool 使用协程(也称为绿色线程)执行任务,而不是生成传统线程。它可以同时处理多个任务。可以使用 flag 调整协程的数量。--concurrency
使用事件池启动 Celery 程序
celery -A tasks worker --pool=eventlet --concurrency=500 --loglevel=info
使用 Gevent 池启动 Celery 程序
celery -A tasks worker --pool=gevent --concurrency=500 --loglevel=info
I/O 绑定任务的理想选择。当主要瓶颈是等待 I/O 操作完成的时间时,任务称为 I/O 绑定。您可以将并发数设置得很高,因为这不受可用 CPU 数量的限制,这与预分叉不同。
I/O 绑定任务的示例:发送电子邮件、发出 API 请求等。
注意:eventlet 和 gevent 不是 Python 标准库的一部分,您必须通过运行或
pip install celery[eventlet]
pip install celery[gevent]
六、结论
您可以使用给定的信息开始将 Celery 集成到您的应用程序中,但这还不是全部,Celery 可以实现更多。大多数 SaaS(软件即服务)Web 应用程序使用 Celery 作为后台任务处理器来执行各种操作。