相关文章推荐
内向的火柴  ·  WPF - 善用路由事件 - ...·  4 月前    · 
坏坏的西装  ·  ‘RegAsm.exe‘ ...·  4 月前    · 
耍酷的梨子  ·  mysqldump got error ...·  7 月前    · 
吐血总结,避坑指南,异步任务Celery使用看这个就够了

吐血总结,避坑指南,异步任务Celery使用看这个就够了

写在前面

自学Python很久,Django小项目也写过不少,对Redis也有点概念。

很早前(多年前)就需要一个异步任务功能,当时看过celery、django-q,磕磕碰碰也调通了,弄好了,但是很多概念一直理解不深。

近期又一个重要项目需要异步任务,包括周期性任务,还需要一些灵活的可视化配置,但是我对异步任务的机制忘得差不多了,哎。

没办法,下定决心,这次认真学习,起码记好笔记,希望能做到以下几点:

  • 尽量知其然知其所以然
  • 入门要掌握,复杂功能留下扩展空间
  • 下次有异步任务需求,能够快速运行,只需编写修改任务逻辑即可

吐血总结版

通过学习实验,加上自己的理解。

我觉得学习异步任务需要 掌握几个名词、熟悉工作流程、理解几个概念,知道一些踩坑点 ,具体如下:

掌握几个名词

异步任务或定时任务 ,其实可以理解为将任务发起和任务执行解耦,确保程序主线程正常运行。

定时任务本质上就是异步任务。

异步任务 需要由 任务发起方、任务队列、任务执行者、任务结果存储 几部分互相配合。

任务发起方 ——下面称为生产者,如核心主函数,django核心视图代码等。

任务队列 ——即消息队列,用于缓存待执行任务,一般使用Redis。

任务执行者 ——由Celery提供核心功能,通过Worker命令启动一个Worker,能够自动连接消息队列并读取待执行任务,交由Worker来处理。

任务结果存储 ——可选,将任务执行结果进行持久化保持,可以使用Redis或Mysql等数据库,以便后续查看。

熟悉工作流程

1、定义一个Celery 应用实例,称之为app,导入任务函数,可添加个性化配置

2、编写任务函数,通过@app.task装饰一下,这个是消费者核心代码。

3、在需要使用异步任务的地方(生产者), 调用 之前编写的任务函数。

先导入,再使用,使用delay方法或apply_async方法。

from tasks.task01 import send_email result = send_email.delay('yuan') #queue可选,指定发送任务到哪个队列 result = send_email.apply_async(('yuan',),queue="testq")

4、生产者调用任务时,马上向app配置的任务队列发送任务,其实就是 任务函数名和参数 。此时生产者马上获取一个任务ID——taskid,主程序继续运行,后续可以拿这个taskid来查询任务执行结果。

5、Worker实时监听任务队列并取出任务,分配空闲worker来执行。

6、定时任务,其实就是有人定时向消息队列发送异步任务

理解几个概念

  1. Celery 和Django无关,它是一个通用Python库,Celery Worker 命令是启动消费者,监听消息队列,和是否Django项目无关。
  2. 一般项目中定时任务或者周期性任务,无需生产者参与,需要启动Celery Beat程序,他根据配置文件中定时配置,按要求不停的向消息队列中发送异步任务
  3. 在保证连通性前提下,无论Worker是否启动,Beat或生产者都可以往消息队列中发送任务

踩坑点

  1. 确保有安装Celery、Redis等常用的Python扩展库
  2. windows下执行运行Worker,任务不执行,必需使用-P eventlet启动,同时连接Redis必需使用IP地址,不能使用localhost
  3. 启动Worker时必需和消息队列保持连通,修改任务函数后,必需重启Worker
  4. 启动Worker时可以指定消息队列,但是必需在配置文件中配置,或调用任务时指定队列名
  5. 如果都是使用默认队列celery,启动Worker时可能会收到大量历史任务并进行处理
  6. 定时任务celery beat如果没有及时关闭,会一直按要求发送异步任务,产生大量历史遗留任务

常用命令

#帮助文档 多看看 
celery --help 
#常规启动Worker 
celery -A tasks worker --loglevel=INFO 
#Windows下启动Worker 
celery -A tasks worker --loglevel=INFO -P eventlet 
#关闭Worker 
Ctrl + C
,可能需要连续按 
#启动Beat程序 可以帮我们定时发送任务到消息队列 
celery -A tasks beat --loglevel=INFO 

代码示例

1、目录结构

新建一个celery02学习目录,文件结构如下:

celery02/tasks/__init__.py
            /celery.py
            /tasks01.py
            /tasks02.py
        /produce_task_1.py
        /produce_task_2.py

其中tasks是消费者模块,有管理Celery实例的celery.py文件,有多个任务函数文件。

produce_task_1可以模拟消费者,测试异步任务调用;

produce_task_2模拟定时任务调用。

2、各部分代码解读

celery.py文件就是定义了一个可用的Celery实例app,在任务函数中可以使用。

后续项目只需修改include需要加载的任务文件即可。

#celery.py
from celery import Celery
from datetime import timedelta
app = Celery('celery_demo',
     broker='redis://127.0.0.1:6379/2',
     backend='redis://127.0.0.1:6379/1',
     # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
     include=['tasks.task01',
              'tasks.task02'
app.conf.timezone = 'Asia/Shanghai' # 时区
app.conf.enable_utc = False # 是否使用UTC
app.conf.task_default_queue = "celery02"  #修改默认队列,可以不要
#配置文件定时任务
app.conf.beat_schedule = {
    'sendmail-every-10-seconds': {
        'task': 'tasks.task01.send_email',
        'schedule': timedelta(seconds=10),
        'args': ('李四',)
}

任务文件定义的异步任务的核心逻辑,这里只是作为示例

#task01.py
import time
from tasks.celery import app
#这是关键,穿上这件衣服就是异步任务函数了
@app.task
def send_email(res):
    print("开始向%s发送邮件任务"%res)
    time.sleep(5)
    print("完成向%s发送邮件任务"%res)
    return "mail ok"

另一个异步处理任务

#task02.py
import time
from tasks.celery import app
@app.task
def send_msg(name):
    print("开始向%s发送短信任务"%name)
    time.sleep(5)
    print("完成向%s发送短信任务"%name)
    return "msg ok"

通过一个脚本文件模拟生产者调用异步任务

#produce_task_1.py
from tasks.task01 import send_email
from tasks.task02 import send_msg
from datetime import datetime
v1 = datetime.now()
print(f"当前时间:{v1}")
# 立即告知celery去执行test_celery任务,并传入一个参数
#result = send_email.apply_async(('yuan',),queue="testq")
result = send_email.delay('yuan')
print(f"任务ID{result.id}")
result = send_msg.delay('yuan')
print(f"任务ID{result.id}")

通过一个脚本文件模拟生产者触发定时任务

#produce_task_2.py
from tasks.task01 import send_email
from tasks.task02 import send_msg
from datetime import datetime
#方式一 固定时间
v1 = datetime(2022, 11, 22, 16, 14, 00)
print(f"当前时间:{v1}")
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(f"任务运行时间:{v2}")
result = send_email.apply_async(args=["定时任务-指定时间"], eta=v2)
print(f"任务ID{result.id}")
# 方式二
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())