Python学习笔记:Python库Celery详细教程

什么是Celery

Celery 是一个简单、灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护此类系统所需的工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。

官方网站:https ://docs.celeryproject.org/en/stable/

上面的定义听起来很复杂吧?不用担心。我将用一个真实世界的例子来简化定义。

假设你在你最喜欢的餐厅与你的女朋友共进晚餐。

  1. 服务员已经来找你并接受了你的订单。
  2. 服务员去了厨房,将你的订单告知Cook。
  3. Cook在 Queue 中有很多订单。他们正在从队列中一个一个地处理。
  4. 你的订单来自队列。一个厨子做好了菜,确认了服务员。
  5. 服务员把菜端到你的餐桌上。
  6. 你和你女朋友吃得很开心。

现在让我们将上述事件与 Celery 联系起来。与上面的例子类比。

  1. 订单是一条消息。消息是关于要执行什么任务和该任务的输入参数的信息。
  2. 煮菜是在 Celery 中执行的任务。一个任务(在编程中是一个函数)并且包含作用于输入并产生一些输出的动作/代码。
  3. Cook是Celery 的一名工人。工人是执行这些任务的程序,即;执行任务/功能。餐厅中可以有一名或多名工人,就像一名或多名厨师一样。
  4. Order Queue 是 Celery 中的一个任务队列。任务队列是要由工作人员执行的任务队列。

注意:Celery 使用 Message Broker 和它的 Messaging Queue 来进行操作。你可以阅读有关此主题的深入了解。但在这里我试图简化事情,让你清楚地了解 Celery 是如何工作的。

在 Celery 中,你可以假设订单或消息是要执行的任务,Waiter 是消息代理,订单队列是任务队列,而厨师是执行任务的 Worker。

当人们在餐厅点单时,任务队列会填满煮菜任务,例如 Task1、Task2、Task3……等等……Cook/工作人员从订单队列中获取订单或消息或任务并进行处理。

在 Celery 中,你可以创建可由工作人员执行的任务。客户端可以调用任务,任务由工作人员而不是客户端执行。一台或多台机器上可以有一个或多个 Celery worker(这就是为什么它在定义中被称为分布式)。

好的,足够的理论。现在让我们进入技术并使用 Celery 编写一个简单的程序。

准备工作

  1. 你需要 Redis/RabbitMQ 之一。我在这里使用 Redis。我已经安装了 Redis 并在 6379 端口上运行。
  2. pip安装相关的库
pip install celery
pip install redis

开始使用Celery

让我们创建一个文件夹 CeleryTest

在上述文件夹CeleryTest)中创建文件 tasks.py 并在文件中写入以下代码。请仔细阅读代码。

from celery import Celery
BROKER_URL = 'redis://localhost:6379/0'
celery_app = Celery('Restaurant', broker=BROKER_URL)
@celery_app.task
def cooking_task(table_no, dishes):
print("Started cooking for Table Number : "+table_no)
for dish in dishes:
print("Cooking : "+dish)
print("Done cooking for Table Number : "+table_no)
  • 我们从 celery 包中导入了 Celery 类。
  • 我们使用 Redis 创建了 BROKER_URL。
  • 我们使用 Celery 类创建了 celery_app 实例,方法是将模块名称作为 Restaurant 传递,将 broker 作为 Redis 传递。
  • 我们用 @celery_app.task 装饰器装饰了 cooking_task 函数。用 @celery_app.task 装饰器装饰的函数被认为是celery任务。

现在转到命令提示符并在你的 tasks.py 所在的同一文件夹中,运行以下命令以运行我们可以执行此任务的 celery worker。

celery -A tasks worker --pool=solo --loglevel=info

你应该看到上面的输出。确保你看到以红线标记的日志,以确保我们的工作人员正常运行。好的,现在我们的 worker 正在运行,可以执行任务 cooking_task。

现在我们将任务提交给我们的工人。

在上面的文件夹( CeleryTest)中创建文件 test.py 并在文件中写入以下代码。

from tasks import cooking_task
table_1_dishes = ["Chicken Biryani", "Lemon chicken", "Pepper chicken"]
result = cooking_task.delay("Table-1", table_1_dishes)
print(result)
  • 我们从任务中导入了 cooking_task
  • 我们通过 cooking_task.delay(*args, **kwargs) 函数通过传递相应的输入(例如 Table-No 和菜式)来调用此任务。

在 test.py 控制台中,你将看到打印的任务 ID。

转到工作人员控制台并检查。

我们的工作人员收到了任务并执行了它。

编辑 test.py 并开始为我们的工作人员分配更多任务

from tasks import cooking_task
table_1_dishes = ["Chicken Biryani", "Lemon chicken", "Pepper chicken"]
result = cooking_task.delay("Table-1", table_1_dishes)
print(result)
table_2_dishes = ["Mutton Biryani", "Egg Biryani"]
result2 = cooking_task.apply_async(args=["Table-2", table_2_dishes])
print(result2)
table_3_dishes = ["Butter Naan", "Andhra Chicken"]
result3 = cooking_task.apply_async(args=["Table-3", table_3_dishes])
print(result3)
table_4_dishes = ["Chicken Manchurian", "Chicken Noodles"]
result4 = cooking_task.apply_async(args=["Table-4", table_4_dishes])
print(result4)

在 test.py 控制台中,我们打印了 4 个任务 ID

返回工作人员控制台并检查。

工人执行四项任务

你已经了解了什么是 celery、如何在 celery 中编写任务、如何运行 worker 以及如何调用 celery 任务。我们将在下一篇文章中深入探讨Celery的使用场景跟Celery Workers、Pool 及其并发配置。

Celery使用的场景

1) 你想在后台运行一些任务。

2) 你想将一些任务(长时间运行或耗时或 CPU 消耗)下发到运行在不同机器上的 Celery Worker,以提高主应用程序服务器的响应。

3) 你想定期安排任务。

4) 你想并行执行任务或扩展你的软件或应用程序系统。

根据我的经验,当满足上述条件时,你可以在电子商务、医疗保健、银行等不同领域使用 Python Celery。

后台运行任务

假设你有一个电子商务应用程序。当用户下订单时,你的应用程序需要发送有关订单更新(例如下订单、付款、发货等)的短信和电子邮件。

假设你有一个 Django/Flask 应用程序,当用户订购时,你不应该以相同的顺序发送电子邮件/短信 django/flask http 请求。如果你这样做,它只会延迟用户的响应,因为电子邮件服务器/文本消息服务器可能不会快速响应。

在这里你需要创建/定义发送电子邮件/短信的任务。你相应地调用这些任务。Celery 工作人员将在后台处理这些任务,但对用户的响应会很快。

分布式任务

假设你有一个数据分析应用程序。当用户请求对数据进行一些复杂的数据分析计算时,它已经来到你的 Django/Flask 应用程序,但是你不想在你的 Django/Flask 应用程序运行的同一台机器上运行这个计算,因为计算需要很多时间并且还吃掉了这台机器的 CPU,导致 Django/Flask 应用程序响应不佳。你的 Django/Flask 应用程序将无法响应不同用户的进一步请求。那你在这里做什么?

很简单,你在 Python Celery 中创建/定义一个用于复杂数据分析计算的任务,并在不同机器上运行 celery worker,在不同机器上运行 Django/Flask 应用程序。

现在,当用户请求对数据进行一些复杂的数据分析计算时,它已经到达你的 Django/Flask 应用程序,但你调用了相应的 Celery 任务。由于 Celery worker 在不同的机器上运行,因此计算发生在不同的机器上。

定时任务

假设你有一个电子商务应用程序。你希望每天通过电子邮件向用户发送产品推荐。你可以创建/定义一个 Celery 任务并使用 Celery beat 安排它。

并行执行任务

假设你有 celery 任务,但你只有一名工人在执行所有任务。你的应用程序无法满足用户需求的要求。

你希望同时运行多个任务以提高应用程序的性能。

答案是增加多台机器上的 celery worker 数量。

Celery Workers、Pool 及其并发配置

在说这个Celery Workers跟Pool之前我们先看下Celery启动worker的命令。

celery -A tasks worker --pool=prefork --concurrency=1 --loglevel=info
  • -A, -app <app> 这个命令选项创建了Celery对象。
  • worker用于启动Celery worker
  • –loglevel用于记录日志可以使用DEBUG|INFO|WARNING|ERROR|CRITICAL|FATAL
  • –pool=prefork,这个选项是设置Celery 运行任务不同的方式,可以选择prefork|eventlet|gevent|solo
  • –concurrency=1,这个选项是worker并发运行的任务数量

通过上面的说明,Celery Pool应该是重点,让我们继续深入去了解pool运行任务方式有什么不一样。

Celery 提供了使用“pool”选项运行任务的不同方式。我们必须根据我们正在执行的任务类型(CPU 绑定或 I/O 或网络绑定)选择“pool”。Celery提供了5种不同方式的Pool,我将一个个介绍。

  • Solo

Solo 只创建一个线程并使用该线程运行 celery 任务。并且无法提供并发数。

celery -A tasks worker --pool=solo --loglevel=info

例如; 你已经定义了一个任务,该任务下载任务中指定的电影。

现在你要下载10部电影,你提交了10个任务。因为我们有一个Work在跑。Worker从Queue中取出任务,开始在线程中运行。因为我们这里只有一个线程。在现有任务完成之前,它不能选择另一个任务。

假设下载一部电影的平均时间为一小时。现在这个 Worker 需要 10 个小时来完成所有 10 个任务,即下载所有 10 部电影。所以 pool solo 不适合这类任务。

  • prefork

prefork 使用多重处理,可以提供并发选项。(建议提供运行Celery Worker的机器的CPU数量),prefork 是 CPU 绑定的最佳池选项。

celery -A tasks worker --pool=prefork --concurrency=4 --loglevel=info

例如; 我们有 Celery Worker 在 4 个 CPU 机器上运行。你已经定义了一项任务,该任务执行一些复杂的数学计算。

现在你想在 10 个不同的数据集上运行这个计算,你已经提交了 10 个任务。因为我们有一个Work在跑。Worker 从 Queue 中取出任务并开始在这个进程中运行。由于我们这里有 4 个进程,它可以同时运行 4 个任务。

  • eventlet

eventlet 和 gevent 是 I/O 和网络的最佳池选项。

注意:使用eventlet跟gevent需要额外安装相对应的包。

pip install eventlet/gevent

Eventlet 是 Python 的并发网络库,允许你更改运行代码的方式,而不是编写代码的方式。

    • 它使用 epoll 或 kqueue 或 libevent 来实现高度可扩展的非阻塞 I/O。
    • 协程确保开发人员使用类似于线程的阻塞式编程,但提供非阻塞 I/O 的优势。
    • 事件分派是隐式的,这意味着你可以轻松地从 Python 解释器中使用 Eventlet,或者将其作为大型应用程序的一小部分。

eventlet 不会使用并发选项创建多个线程。相反,它所做的是仅创建一个线程并使用称为事件循环的概念处理一个线程的并发。

celery -A tasks worker --pool=eventlet --concurrency=10 --loglevel=info

例如; 你已经定义了一个任务,该任务下载任务中指定的电影。

现在你要下载10部电影,你提交了10个任务。因为我们有一个Work在跑。Worker从Queue中取出任务,开始在线程中运行。由于我们的 eventlet 以 10 并发运行,因此所有任务都开始下载相应的电影。

根据网络速度,所有 10 个文件的下载速度都非常快。

  • gevent

gevent 是一个基于协程的Python网络库,它使用greenlet在libev或libuv事件循环之上提供高级同步 API 。

功能包括:

  • 基于libev或libuv的快速事件循环。
  • 基于 greenlet 的轻量级执行单元。
  • 重新使用 Python 标准库中的概念的 API(例如,有事件和队列)。
  • 支持 SSL 的协作套接字
  • 通过线程池、dnspython 或 c-ares 执行的合作 DNS 查询。
  • 猴子补丁实用程序使第 3 方模块变得合作
  • TCP/UDP/HTTP 服务器
  • 子流程支持(通过gevent.subprocess)
  • 线程池

gevent受到 eventlet 的启发,但具有更一致的 API、更简单的实现和更好的性能。

gevent 跟Eventlet一样不会使用并发选项创建多个线程。相反,它所做的是仅创建一个线程并使用称为事件循环的概念处理一个线程的并发

celery -A tasks worker --pool=gevent --concurrency=10 --loglevel=info

例如; 你已经定义了一个任务,该任务下载任务中指定的电影。

现在你要下载10部电影,你提交了10个任务。因为我们有一个Work在跑。Worker从Queue中取出任务,开始在线程中运行。由于我们的 eventlet 以 10 并发运行,因此所有任务都开始下载相应的电影。

根据网络速度,所有 10 个文件的下载速度都非常快。

  • threads

新加入的模式,高并发的情况性能不如协程。

  • processes

兼容别名跟prefork一样

给TA打赏
共{{data.count}}人
人已打赏
python

Python学习笔记:pymysql菜鸟教程

2023-2-23 16:04:41

python

Python学习笔记:Loguru日志模块详解(一个强大的日志记录模块)

2023-2-23 16:20:13

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索
打开微信,扫描左侧二维码,关注【旅游人lvyouren】,发送【101】获取验证码,输入获取到的验证码即可解锁复制功能,解锁之后可复制网站任意一篇文章,验证码每月更新一次。
提交