使用 Celery 在 Python 中执行异步任务队列

一、介绍

Celery 是基于异步消息传递的任务队列/作业队列。它可以用作应用程序的后台任务处理器,您可以在其中转储任务以在后台或任何给定时刻执行。它可以配置为同步或异步执行任务。

异步消息传递是一个系统,其中消息存储在发送方和接收方之间。这允许发送者在发送消息后立即继续处理其他事情,也可以在接收者采取任何操作之前堆叠多条消息。

Celery 的主要因素:

  • 任务
  • 任务代理
  • 结果后端
  • 程序

任务是您要发送给 Celery 以在其工作线程中执行的功能。这些是普通的 Python 函数,带有装饰器将它们与其他函数分开。

任务代理是消息传递系统,您将使用它与 Celery 通信以发送任务以供执行。Celery 支持许多代理,如 Redis、RabbitMQ、Amazon SQS 等。

结果后端再次是一个消息传递系统,但在这种情况下,Celery 使用它来存储任务执行结果,然后可以从您的应用程序访问该结果。它支持许多结果后端,如Redis,RabbitMQ(AMQP),SQLAlchemy等。

worker 是不言自明的,它是一个 Celery 进程,在后台运行,等待任务到达任务代理,通常多个工作线程一起运行以实现并发执行。

二、Celery 中的任务生命周期

Celery 执行任务的过程可以分解为:

  • 任务注册
  • 任务执行
  • 结果存储

您的应用程序将任务发送到任务代理,然后由工作线程保留执行,最后任务执行的结果存储在结果后端中。

三、Celery 的应用

了解Celery 可能有点忙,不知道它的正确应用。您可以通过许多不同的方式将其集成到应用程序中。

Celery 最常见的应用:

  • 定期执行 – 需要在间隔后定期运行的任务,例如发送每月新闻稿。
  • 第三方执行 – 与第三方服务交互的任务,例如通过 SMTP 发送电子邮件。
  • 长时间执行 – 需要很长时间才能完成执行的任务,例如压缩文件。

四、使用 Celery 创建您的第一个程序

在本节中,您将学习如何将 Celery 任务集成到 Python 程序中。

4.1、准备

要完成本指南,您将需要:

  • Python 3.7 或更高版本
  • Redis Server,按照 Ubuntu 20.04 上的安装和配置 Redis 进行操作

4.2、安装 Celery

Celery 是用 Python 编写的,可以使用 Python 的软件包安装程序(pip)进行安装。

安装最新版本的 Celery :

pip install celery

安装所需的依赖项以将 Redis 与 Celery 一起使用:

pip install celery[redis]

 4.3、编写你的第一个 Celery 任务

在这里,我们分解了一个非常基本的程序,演示了如何编写现有函数或将现有函数转换为 Celery 任务。您可以复制并粘贴最后提到的最终代码来自己测试。

导入和初始化 Celery 对象

从 celery python 包导入类并将其初始化为任何变量,这里我们使用了该变量。传递给类的第一个参数是我们应用程序的名称。Celeryapp

from celery import Celery



app = Celery(

    'tasks',

    broker="redis://localhost:6379/0",

    backend="redis://localhost:6379/0"

)

由于我们将 Redis 用于我们的代理和后端,因此我们使用关键字参数指定了它。如果不使用 Redis 的默认配置,则可以使用以下格式为环境编写连接字符串。

redis://username:password@hostname:port/db

创建基本 Celery 任务

您可以使用装饰器将任何函数转换为 Celery 任务,该装饰器添加了我们现有函数作为任务运行所需的所有必要功能。@app.task

@app.task

def multiply(a, b):

    import time

    time.sleep(5)

    return a * b

最终代码

您可以将最终代码复制并粘贴到一个名为新文件中,以按照下一节中给出的说明进行操作。tasks.py

from celery import Celery



app = Celery(

    'tasks',

    broker="redis://localhost:6379/0",

    backend="redis://localhost:6379/0"

)



@app.task

def multiply(a, b):

   import time

   time.sleep(5)

   return a * b

4.4、管理 Celery

写完任务后,您需要工作人员在您执行任务时处理任务。

启动 Celery 

通过运行给定的命令来启动 Celery 工作线程,以按照下一节中给出的说明进行操作。确保您位于保存的同一目录中tasks.py

启动 Celery :

celery -A tasks worker -n worker1 -P prefork -l INFO

预期输出:

Start Worker Output

使用的参数:

  • -A的缩写,用于指定工作人员将使用的应用程序。--app
  • -n是它的缩写,用于指定工作人员的名称。--hostname
  • -P是用于指定池类型的缩写,下面将讨论工作器类型。--pool
  • -l是它的缩写,用于指定我们工作人员的日志级别。--loglevel

您还可以使用 的缩写在后台运行工作线程。-D--detach

停止 Celery

处理完所有任务后,您可以通过运行给定的命令手动关闭工作线程。

结束运行的 Celery :

ps auxww | awk '/celery(.*)worker/ {print $2}' | xargs kill -9

4.5、在 Celery 程序中执行任务

现在,工作线程已启动并准备好处理队列,请打开新控制台并执行命令以打开 python 控制台。确保您位于保存的同一目录中pythontasks.py

导入任务:

from tasks import multiply

执行任务:

使用函数将任务执行请求发送到消息代理。.delay()

task1 = mutliply.delay(512, 100)

检查任务状态:

用于检查任务的当前状态。.state

task1.state

获取任务执行结果:

用于获取任务执行结果。.get()

task1.get()

预期输出:

Python Console Output

您可以使用演示的步骤将 Celery 集成到应用程序的工作流中。

 五、程序类型

选择正确的程序类型非常重要,因为它对执行时间和效率有重大影响。本文的这一部分将指导您了解 Celery 中可用的各种类型的工作人员。

 Celery 的类型:

  • 独奏
  • 预分叉
  • 事件小品
  • 格万特

顾名思义,Solo 是一个内联池,这意味着任务不会同时处理。它只创建一个线程,并使用该线程执行任务。

用单人池开始 Celery 程序

celery -A tasks worker --pool=solo --loglevel=info

需要逐个运行的任务的理想选择。需要放弃并发并使用独奏池的用例并不多。

Prefork 池使用 Python 内置的多处理库,它可以同时处理多个任务。线程数可以用标志调整。--concurrency

使用预叉池启动 Celery 程序

celery -A tasks worker --pool=prefork --concurrency=4 --loglevel=info

如果您的任务受 CPU 限制,则是一个理想的选择。如果任务的大部分时间都使用 CPU,则称为 CPU 受限,并且只有在 CPU 更快的情况下才能运行得更快。

CPU 绑定任务的示例:文件转换、压缩、搜索算法等。

Eventlet & Gevent pool 使用协程(也称为绿色线程)执行任务,而不是生成传统线程。它可以同时处理多个任务。可以使用 flag 调整协程的数量。--concurrency

使用事件池启动 Celery 程序

celery -A tasks worker --pool=eventlet --concurrency=500 --loglevel=info

使用 Gevent 池启动 Celery 程序

celery -A tasks worker --pool=gevent --concurrency=500 --loglevel=info

I/O 绑定任务的理想选择。当主要瓶颈是等待 I/O 操作完成的时间时,任务称为 I/O 绑定。您可以将并发数设置得很高,因为这不受可用 CPU 数量的限制,这与预分叉不同。

I/O 绑定任务的示例:发送电子邮件、发出 API 请求等。

注意:eventlet 和 gevent 不是 Python 标准库的一部分,您必须通过运行或pip install celery[eventlet]pip install celery[gevent]

六、结论

您可以使用给定的信息开始将 Celery 集成到您的应用程序中,但这还不是全部,Celery 可以实现更多。大多数 SaaS(软件即服务)Web 应用程序使用 Celery 作为后台任务处理器来执行各种操作。

赞(0)
未经允许不得转载:主机百科 » 使用 Celery 在 Python 中执行异步任务队列